from typing import Any

import httpx


class WazuhAdapter:
    """Async client for the Wazuh manager API and, optionally, the Wazuh indexer.

    Manager API calls first obtain a token from ``/security/user/authenticate``
    using HTTP basic auth and then send it as a ``Bearer`` token. Indexer
    searches go straight to ``indexer_url`` and use HTTP basic auth when
    indexer credentials are configured.

    A new ``httpx.AsyncClient`` (and a new token) is created for every public
    call; nothing is cached on the instance.
    """

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
        indexer_url: str | None = None,
        indexer_username: str | None = None,
        indexer_password: str | None = None,
        *,
        verify_tls: bool = False,
    ) -> None:
        """Store connection settings; no network I/O happens here.

        Args:
            base_url: Root URL of the manager API; trailing slashes are removed.
            username: Manager API user used for basic auth when requesting a token.
            password: Password for ``username``.
            indexer_url: Root URL of the indexer; ``None`` leaves alert search
                unconfigured (stored as an empty string).
            indexer_username: Basic-auth user for the indexer, if any.
            indexer_password: Password for ``indexer_username``.
            verify_tls: Passed to ``httpx.AsyncClient(verify=...)``. Defaults to
                ``False`` to keep the historical behaviour (certificate
                verification disabled); set to ``True`` to verify server
                certificates.
        """
        self.base_url = base_url.rstrip("/")
        self.username = username
        self.password = password
        self.indexer_url = (indexer_url or "").rstrip("/")
        self.indexer_username = indexer_username
        self.indexer_password = indexer_password
        self.verify_tls = verify_tls

    def _new_client(self, timeout: float, **client_kwargs: Any) -> httpx.AsyncClient:
        """Build an ``httpx.AsyncClient`` with this adapter's TLS setting."""
        return httpx.AsyncClient(verify=self.verify_tls, timeout=timeout, **client_kwargs)

    async def _authenticate(self, client: httpx.AsyncClient) -> str:
        """Request a raw token from the manager API using basic auth.

        Raises:
            httpx.HTTPStatusError: If the authentication request returns an
                error status.
            RuntimeError: If the response body is empty after stripping.
        """
        auth_url = f"{self.base_url}/security/user/authenticate?raw=true"
        response = await client.post(auth_url, auth=(self.username, self.password))
        response.raise_for_status()
        token = response.text.strip()
        if not token:
            raise RuntimeError("Wazuh authentication returned an empty token.")
        return token

    async def _get_with_bearer(
        self,
        client: httpx.AsyncClient,
        token: str,
        path: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """GET ``base_url + path`` with a bearer token and decode the JSON body.

        Returns:
            The decoded JSON body, or ``{"status_code": <code>}`` when the
            response has no content.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        url = f"{self.base_url}{path}"
        response = await client.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params=params,
        )
        response.raise_for_status()
        if not response.content:
            return {"status_code": response.status_code}
        return response.json()

    async def _fetch(
        self,
        path: str,
        params: dict[str, Any] | None = None,
        timeout: float = 20.0,
    ) -> dict[str, Any]:
        """Open a client, authenticate, and perform one bearer-authenticated GET."""
        async with self._new_client(timeout) as client:
            token = await self._authenticate(client)
            return await self._get_with_bearer(client, token, path, params=params)

    async def auth_test(self) -> dict[str, Any]:
        """Authenticate and report success with the first 12 characters of the token."""
        async with self._new_client(15.0) as client:
            token = await self._authenticate(client)
        return {"authenticated": True, "token_prefix": token[:12]}

    async def get_version(self) -> dict[str, Any]:
        """Return API metadata from ``/``, falling back to ``/manager/info``.

        Both attempts share one client and one token.
        """
        async with self._new_client(15.0) as client:
            token = await self._authenticate(client)
            # API root should return API metadata when authenticated.
            try:
                return await self._get_with_bearer(client, token, "/")
            except httpx.HTTPStatusError:
                # Fallback endpoint for environments restricting root access.
                return await self._get_with_bearer(client, token, "/manager/info")

    async def get_manager_info(self) -> dict[str, Any]:
        """Return the JSON body of ``GET /manager/info``."""
        return await self._fetch("/manager/info", timeout=15.0)

    async def list_agents(
        self, limit: int = 50, offset: int = 0, select: str | None = None
    ) -> dict[str, Any]:
        """Return the JSON body of ``GET /agents``.

        ``select`` is only sent as a query parameter when it is truthy.
        """
        params: dict[str, Any] = {"limit": limit, "offset": offset}
        if select:
            params["select"] = select
        return await self._fetch("/agents", params=params)

    async def list_manager_logs(
        self,
        limit: int = 50,
        offset: int = 0,
        q: str | None = None,
        sort: str | None = None,
    ) -> dict[str, Any]:
        """Return the JSON body of ``GET /manager/logs``.

        ``q`` and ``sort`` are only sent as query parameters when truthy.
        """
        params: dict[str, Any] = {"limit": limit, "offset": offset}
        if q:
            params["q"] = q
        if sort:
            params["sort"] = sort
        return await self._fetch("/manager/logs", params=params)

    async def search_alerts(
        self,
        query: str,
        limit: int = 50,
        minutes: int = 120,
    ) -> dict[str, Any]:
        """Search ``wazuh-alerts-*`` on the indexer, newest first.

        Args:
            query: Query-string expression placed in the ``query_string`` clause.
            limit: Maximum number of hits requested (``size``).
            minutes: Width of the ``@timestamp`` window, ending at ``now``.

        Returns:
            The decoded JSON search response.

        Raises:
            RuntimeError: If no indexer URL is configured.
            httpx.HTTPStatusError: If the indexer returns an error status.
        """
        if not self.indexer_url:
            raise RuntimeError("Wazuh indexer URL is not configured.")

        body: dict[str, Any] = {
            "size": limit,
            "sort": [{"@timestamp": {"order": "desc"}}],
            "query": {
                "bool": {
                    "must": [{"query_string": {"query": query}}],
                    "filter": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{minutes}m",
                                    "lte": "now",
                                }
                            }
                        }
                    ],
                }
            },
        }

        # Only attach basic auth when a username is configured: handing httpx a
        # (None, None) tuple fails while it builds the Authorization header.
        auth: tuple[str, str] | None = None
        if self.indexer_username is not None:
            auth = (self.indexer_username, self.indexer_password or "")

        async with self._new_client(20.0, auth=auth) as client:
            response = await client.post(
                f"{self.indexer_url}/wazuh-alerts-*/_search", json=body
            )
            response.raise_for_status()
            return response.json()