暫無描述

abuseipdb.py 1.3KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. from __future__ import annotations
  2. from typing import Any
  3. import httpx
  4. class AbuseIpdbAdapter:
  5. def __init__(self, base_url: str, api_key: str) -> None:
  6. self.base_url = base_url.rstrip("/")
  7. self.api_key = api_key
  8. def _headers(self) -> dict[str, str]:
  9. return {"Key": self.api_key, "Accept": "application/json"} if self.api_key else {}
  10. async def check_ip(self, ip: str, max_age_in_days: int = 90, verbose: bool = True) -> dict[str, Any]:
  11. if not self.api_key:
  12. raise RuntimeError("AbuseIPDB API key is not configured")
  13. url = f"{self.base_url}/check"
  14. params = {
  15. "ipAddress": ip.strip(),
  16. "maxAgeInDays": max(1, int(max_age_in_days)),
  17. "verbose": "true" if verbose else "false",
  18. }
  19. headers = self._headers()
  20. async with httpx.AsyncClient(timeout=20.0) as client:
  21. response = await client.get(url, headers=headers, params=params)
  22. try:
  23. response.raise_for_status()
  24. except httpx.HTTPStatusError as exc:
  25. detail = response.text.strip()
  26. raise RuntimeError(
  27. f"AbuseIPDB returned {response.status_code} for {url}. Response: {detail}"
  28. ) from exc
  29. return response.json() if response.content else {"status_code": response.status_code}