Aucune description

geoip.py 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from __future__ import annotations
  2. import ipaddress
  3. import time
  4. from typing import Any
  5. import httpx
  6. class GeoIpAdapter:
  7. def __init__(self, provider: str = "ipwhois", cache_ttl_seconds: int = 21600) -> None:
  8. normalized = (provider or "").strip().lower()
  9. if normalized in {"ipwho.is", "ipwhois"}:
  10. normalized = "ipwhois"
  11. self.provider = normalized or "ipwhois"
  12. self.cache_ttl_seconds = max(60, int(cache_ttl_seconds))
  13. self._cache: dict[str, tuple[float, dict[str, Any]]] = {}
  14. def _is_private_or_invalid(self, ip: str) -> bool:
  15. try:
  16. addr = ipaddress.ip_address(ip)
  17. except ValueError:
  18. return True
  19. return (
  20. addr.is_private
  21. or addr.is_loopback
  22. or addr.is_reserved
  23. or addr.is_link_local
  24. or addr.is_multicast
  25. or addr.is_unspecified
  26. )
  27. async def lookup(self, ip: str) -> dict[str, Any]:
  28. candidate = ip.strip()
  29. if not candidate:
  30. return {"ok": False, "country_code": None, "error": "missing_ip"}
  31. if self._is_private_or_invalid(candidate):
  32. return {"ok": False, "country_code": None, "error": "private_or_invalid_ip"}
  33. now = time.time()
  34. cached = self._cache.get(candidate)
  35. if cached and cached[0] > now:
  36. out = dict(cached[1])
  37. out["cached"] = True
  38. return out
  39. if self.provider == "ip-api":
  40. url = f"http://ip-api.com/json/{candidate}"
  41. params = {"fields": "status,message,countryCode,country,lat,lon,query"}
  42. async with httpx.AsyncClient(timeout=8.0) as client:
  43. response = await client.get(url, params=params)
  44. response.raise_for_status()
  45. data = response.json() if response.content else {}
  46. if str(data.get("status", "")).lower() != "success":
  47. return {
  48. "ok": False,
  49. "country_code": None,
  50. "country_name": None,
  51. "error": str(data.get("message") or "lookup_failed"),
  52. "provider": "ip-api",
  53. "ip": candidate,
  54. }
  55. result: dict[str, Any] = {
  56. "ok": True,
  57. "country_code": data.get("countryCode"),
  58. "country_name": data.get("country"),
  59. "lat": data.get("lat"),
  60. "lon": data.get("lon"),
  61. "provider": "ip-api",
  62. "ip": data.get("query", candidate),
  63. "cached": False,
  64. }
  65. elif self.provider == "ipwhois":
  66. url = f"https://ipwho.is/{candidate}"
  67. async with httpx.AsyncClient(timeout=8.0) as client:
  68. response = await client.get(url)
  69. response.raise_for_status()
  70. data = response.json() if response.content else {}
  71. if not bool(data.get("success", False)):
  72. return {
  73. "ok": False,
  74. "country_code": None,
  75. "country_name": None,
  76. "error": str(data.get("message") or "lookup_failed"),
  77. "provider": "ipwho.is",
  78. "ip": candidate,
  79. }
  80. result = {
  81. "ok": True,
  82. "country_code": data.get("country_code"),
  83. "country_name": data.get("country"),
  84. "lat": data.get("latitude"),
  85. "lon": data.get("longitude"),
  86. "provider": "ipwho.is",
  87. "ip": data.get("ip", candidate),
  88. "cached": False,
  89. }
  90. else:
  91. return {"ok": False, "country_code": None, "country_name": None, "error": f"unsupported_provider:{self.provider}"}
  92. self._cache[candidate] = (now + self.cache_ttl_seconds, result)
  93. return result