from __future__ import annotations import ipaddress import time from typing import Any import httpx class GeoIpAdapter: def __init__(self, provider: str = "ipwhois", cache_ttl_seconds: int = 21600) -> None: normalized = (provider or "").strip().lower() if normalized in {"ipwho.is", "ipwhois"}: normalized = "ipwhois" self.provider = normalized or "ipwhois" self.cache_ttl_seconds = max(60, int(cache_ttl_seconds)) self._cache: dict[str, tuple[float, dict[str, Any]]] = {} def _is_private_or_invalid(self, ip: str) -> bool: try: addr = ipaddress.ip_address(ip) except ValueError: return True return ( addr.is_private or addr.is_loopback or addr.is_reserved or addr.is_link_local or addr.is_multicast or addr.is_unspecified ) async def lookup(self, ip: str) -> dict[str, Any]: candidate = ip.strip() if not candidate: return {"ok": False, "country_code": None, "error": "missing_ip"} if self._is_private_or_invalid(candidate): return {"ok": False, "country_code": None, "error": "private_or_invalid_ip"} now = time.time() cached = self._cache.get(candidate) if cached and cached[0] > now: out = dict(cached[1]) out["cached"] = True return out if self.provider == "ip-api": url = f"http://ip-api.com/json/{candidate}" params = {"fields": "status,message,countryCode,country,lat,lon,query"} async with httpx.AsyncClient(timeout=8.0) as client: response = await client.get(url, params=params) response.raise_for_status() data = response.json() if response.content else {} if str(data.get("status", "")).lower() != "success": return { "ok": False, "country_code": None, "country_name": None, "error": str(data.get("message") or "lookup_failed"), "provider": "ip-api", "ip": candidate, } result: dict[str, Any] = { "ok": True, "country_code": data.get("countryCode"), "country_name": data.get("country"), "lat": data.get("lat"), "lon": data.get("lon"), "provider": "ip-api", "ip": data.get("query", candidate), "cached": False, } elif self.provider == "ipwhois": url = f"https://ipwho.is/{candidate}" async with httpx.AsyncClient(timeout=8.0) as client: response = await client.get(url) response.raise_for_status() data = response.json() if response.content else {} if not bool(data.get("success", False)): return { "ok": False, "country_code": None, "country_name": None, "error": str(data.get("message") or "lookup_failed"), "provider": "ipwho.is", "ip": candidate, } result = { "ok": True, "country_code": data.get("country_code"), "country_name": data.get("country"), "lat": data.get("latitude"), "lon": data.get("longitude"), "provider": "ipwho.is", "ip": data.get("ip", candidate), "cached": False, } else: return {"ok": False, "country_code": None, "country_name": None, "error": f"unsupported_provider:{self.provider}"} self._cache[candidate] = (now + self.cache_ttl_seconds, result) return result