from __future__ import annotations

from typing import Any

import httpx


class AbuseIpdbAdapter:
    """Small async client for the AbuseIPDB ``check`` endpoint.

    The adapter stores a base URL and an API key and issues one GET request
    per :meth:`check_ip` call using a short-lived ``httpx.AsyncClient``.
    """

    # Range accepted for ``maxAgeInDays`` per the AbuseIPDB API documentation.
    _MIN_AGE_DAYS = 1
    _MAX_AGE_DAYS = 365

    # Per-request timeout, in seconds, passed to ``httpx.AsyncClient``.
    _TIMEOUT_SECONDS = 20.0

    def __init__(self, base_url: str, api_key: str) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the API; trailing slashes are stripped so
                that endpoint paths can be appended with a single ``/``.
            api_key: AbuseIPDB API key. May be empty, in which case
                :meth:`check_ip` refuses to run.
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key

    def _headers(self) -> dict[str, str]:
        """Return the request headers, or an empty dict when no key is set."""
        if not self.api_key:
            return {}
        return {"Key": self.api_key, "Accept": "application/json"}

    def _check_params(
        self, ip: str, max_age_in_days: int, verbose: bool
    ) -> dict[str, Any]:
        """Build the query parameters for the ``check`` endpoint."""
        # Clamp to the documented range instead of letting the API reject
        # out-of-range values with a validation error.
        age = min(
            self._MAX_AGE_DAYS,
            max(self._MIN_AGE_DAYS, int(max_age_in_days)),
        )
        params: dict[str, Any] = {
            "ipAddress": ip.strip(),
            "maxAgeInDays": age,
        }
        # NOTE(review): AbuseIPDB documents ``verbose`` as a flag, so sending
        # ``verbose=false`` may still enable verbose output. Only include the
        # parameter when verbose output is actually requested.
        if verbose:
            params["verbose"] = "true"
        return params

    async def check_ip(
        self, ip: str, max_age_in_days: int = 90, verbose: bool = True
    ) -> dict[str, Any]:
        """Query the ``check`` endpoint for a single IP address.

        Args:
            ip: Address to look up; surrounding whitespace is stripped.
            max_age_in_days: Look-back window for reports. Coerced with
                ``int()`` and clamped to the range 1-365.
            verbose: When true, request verbose output from the API.

        Returns:
            The decoded JSON response body, or ``{"status_code": <code>}``
            when the response has no body.

        Raises:
            RuntimeError: If no API key is configured, or if the API answers
                with an error status (the response text is included in the
                message).
            httpx.RequestError: Transport-level failures (timeouts,
                connection errors) propagate unchanged from ``httpx``.
        """
        if not self.api_key:
            raise RuntimeError("AbuseIPDB API key is not configured")

        url = f"{self.base_url}/check"
        params = self._check_params(ip, max_age_in_days, verbose)
        headers = self._headers()

        async with httpx.AsyncClient(timeout=self._TIMEOUT_SECONDS) as client:
            response = await client.get(url, headers=headers, params=params)

        # The body is fully read by ``client.get``, so the response remains
        # usable after the client has been closed.
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            detail = response.text.strip()
            raise RuntimeError(
                f"AbuseIPDB returned {response.status_code} for {url}. Response: {detail}"
            ) from exc

        if not response.content:
            return {"status_code": response.status_code}
        return response.json()