| 1234567891011121314151617181920212223242526272829303132333435363738 |
- from __future__ import annotations
- from typing import Any
- import httpx
- class AbuseIpdbAdapter:
- def __init__(self, base_url: str, api_key: str) -> None:
- self.base_url = base_url.rstrip("/")
- self.api_key = api_key
- def _headers(self) -> dict[str, str]:
- return {"Key": self.api_key, "Accept": "application/json"} if self.api_key else {}
- async def check_ip(self, ip: str, max_age_in_days: int = 90, verbose: bool = True) -> dict[str, Any]:
- if not self.api_key:
- raise RuntimeError("AbuseIPDB API key is not configured")
- url = f"{self.base_url}/check"
- params = {
- "ipAddress": ip.strip(),
- "maxAgeInDays": max(1, int(max_age_in_days)),
- "verbose": "true" if verbose else "false",
- }
- headers = self._headers()
- async with httpx.AsyncClient(timeout=20.0) as client:
- response = await client.get(url, headers=headers, params=params)
- try:
- response.raise_for_status()
- except httpx.HTTPStatusError as exc:
- detail = response.text.strip()
- raise RuntimeError(
- f"AbuseIPDB returned {response.status_code} for {url}. Response: {detail}"
- ) from exc
- return response.json() if response.content else {"status_code": response.status_code}
|