| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from __future__ import annotations
- import ipaddress
- import time
- from typing import Any
- import httpx
class GeoIpAdapter:
    """Resolve public IP addresses to a country through a GeoIP HTTP API.

    Two providers are supported: ``"ipwhois"`` (https://ipwho.is, the default)
    and ``"ip-api"`` (http://ip-api.com). Successful lookups are kept in an
    in-memory, per-instance cache for ``cache_ttl_seconds``.
    """

    def __init__(self, provider: str = "ipwhois", cache_ttl_seconds: int = 21600) -> None:
        """Configure the provider and the cache lifetime.

        Args:
            provider: Provider name, matched case-insensitively after
                stripping whitespace. ``"ipwho.is"`` is accepted as an alias
                of ``"ipwhois"``; an empty or ``None`` value selects
                ``"ipwhois"``. Any other value is stored unchanged and makes
                ``lookup`` report ``unsupported_provider``.
            cache_ttl_seconds: Lifetime of cached results in seconds; values
                below 60 are raised to 60.
        """
        normalized = (provider or "").strip().lower()
        if normalized in {"ipwho.is", "ipwhois"}:
            normalized = "ipwhois"
        self.provider = normalized or "ipwhois"
        self.cache_ttl_seconds = max(60, int(cache_ttl_seconds))
        # Maps the looked-up IP string to (expiry as a time.time() value, result).
        self._cache: dict[str, tuple[float, dict[str, Any]]] = {}

    def _is_private_or_invalid(self, ip: str) -> bool:
        """Return True when *ip* is unparsable or not a publicly routable address."""
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return True
        # A zone index ("fe80::1%eth0") only makes sense for non-global
        # addresses, and its free-form text must never reach a provider URL.
        if getattr(addr, "scope_id", None):
            return True
        return (
            addr.is_private
            # is_global additionally excludes 100.64.0.0/10 (RFC 6598 shared
            # address space), which is_private does not cover.
            or not addr.is_global
            or addr.is_loopback
            or addr.is_reserved
            or addr.is_link_local
            or addr.is_multicast
            or addr.is_unspecified
        )

    def _purge_expired(self, now: float) -> None:
        """Drop every cache entry whose expiry time is at or before *now*."""
        stale = [key for key, (expires_at, _) in self._cache.items() if expires_at <= now]
        for key in stale:
            del self._cache[key]

    @staticmethod
    def _failure(provider_label: str, candidate: str, data: dict[str, Any]) -> dict[str, Any]:
        """Build the result returned when a provider reports an unsuccessful lookup."""
        return {
            "ok": False,
            "country_code": None,
            "country_name": None,
            "error": str(data.get("message") or "lookup_failed"),
            "provider": provider_label,
            "ip": candidate,
        }

    async def _fetch_json(self, url: str, **request_kwargs: Any) -> dict[str, Any]:
        """GET *url* and return the decoded JSON object.

        An empty body or a JSON payload that is not an object yields ``{}``.
        HTTP error statuses raise via ``raise_for_status``; transport errors
        from httpx propagate unchanged.
        """
        async with httpx.AsyncClient(timeout=8.0) as client:
            response = await client.get(url, **request_kwargs)
            response.raise_for_status()
            data = response.json() if response.content else {}
        return data if isinstance(data, dict) else {}

    async def _lookup_ip_api(self, candidate: str) -> dict[str, Any]:
        """Query ip-api.com for *candidate* and normalize the response."""
        # NOTE(review): plain HTTP is used here; confirm whether the provider
        # offers HTTPS for this endpoint before changing the scheme.
        data = await self._fetch_json(
            f"http://ip-api.com/json/{candidate}",
            params={"fields": "status,message,countryCode,country,lat,lon,query"},
        )
        if str(data.get("status", "")).lower() != "success":
            return self._failure("ip-api", candidate, data)
        return {
            "ok": True,
            "country_code": data.get("countryCode"),
            "country_name": data.get("country"),
            "lat": data.get("lat"),
            "lon": data.get("lon"),
            "provider": "ip-api",
            "ip": data.get("query", candidate),
            "cached": False,
        }

    async def _lookup_ipwhois(self, candidate: str) -> dict[str, Any]:
        """Query ipwho.is for *candidate* and normalize the response."""
        data = await self._fetch_json(f"https://ipwho.is/{candidate}")
        if not bool(data.get("success", False)):
            return self._failure("ipwho.is", candidate, data)
        return {
            "ok": True,
            "country_code": data.get("country_code"),
            "country_name": data.get("country"),
            "lat": data.get("latitude"),
            "lon": data.get("longitude"),
            "provider": "ipwho.is",
            "ip": data.get("ip", candidate),
            "cached": False,
        }

    async def lookup(self, ip: str) -> dict[str, Any]:
        """Look up the country of *ip*.

        Args:
            ip: IPv4 or IPv6 address; surrounding whitespace is ignored.
                ``None`` or any non-string value is treated as missing.

        Returns:
            A dict that always has ``"ok"`` and ``"country_code"``. On
            success it also has ``country_name``, ``lat``, ``lon``,
            ``provider``, ``ip`` and ``cached`` (True when served from the
            cache). On failure ``"error"`` holds ``"missing_ip"``,
            ``"private_or_invalid_ip"``, ``"unsupported_provider:<name>"`` or
            the provider's message (``"lookup_failed"`` if it sent none).

        Raises:
            httpx.HTTPError: Transport failures and HTTP error statuses are
                not caught here and propagate to the caller.
        """
        candidate = ip.strip() if isinstance(ip, str) else ""
        if not candidate:
            return {"ok": False, "country_code": None, "error": "missing_ip"}
        if self._is_private_or_invalid(candidate):
            return {"ok": False, "country_code": None, "error": "private_or_invalid_ip"}

        now = time.time()
        cached = self._cache.get(candidate)
        if cached and cached[0] > now:
            out = dict(cached[1])
            out["cached"] = True
            return out
        # Without this sweep, expired entries for IPs that are never asked
        # about again would stay in memory forever.
        self._purge_expired(now)

        if self.provider == "ip-api":
            result = await self._lookup_ip_api(candidate)
        elif self.provider == "ipwhois":
            result = await self._lookup_ipwhois(candidate)
        else:
            return {"ok": False, "country_code": None, "country_name": None, "error": f"unsupported_provider:{self.provider}"}

        if result["ok"]:
            # Only successes are cached. Store a copy so that a caller
            # mutating the returned dict cannot alter the cached entry.
            self._cache[candidate] = (now + self.cache_ttl_seconds, dict(result))
        return result
|