Aucune description

ioc_feed.py 5.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
"""
ioc_feed.py — Public threat-intel feed fetcher.

Sources:
  Feodo Tracker — botnet C2 IPs (Emotet, TrickBot, QakBot, …)
  ThreatFox     — IPs, domains, and hashes from live malware C2 (Abuse.ch)
  URLhaus       — malware distribution domains (Abuse.ch)
  MalwareBazaar — SHA256 malware hashes (Abuse.ch)
"""
from __future__ import annotations

import io
import logging
import zipfile
from typing import Any

import httpx
  13. logger = logging.getLogger(__name__)
  14. FEODO_IP_URL = "https://feodotracker.abuse.ch/downloads/ipblocklist.txt"
  15. THREATFOX_API = "https://threatfox-api.abuse.ch/api/v1/"
  16. URLHAUS_DOMAINS_URL = "https://urlhaus.abuse.ch/downloads/text_online/"
  17. BAZAAR_HASHES_URL = "https://bazaar.abuse.ch/export/txt/sha256/full/"
  18. _TIMEOUT = 30.0
  19. class IocFeedAdapter:
  20. async def fetch_feodo_ips(self) -> dict[str, str]:
  21. """Feodo Tracker IP blocklist — plain text, one IP per line, comments start with #."""
  22. result: dict[str, str] = {}
  23. try:
  24. async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
  25. r = await client.get(FEODO_IP_URL)
  26. r.raise_for_status()
  27. for line in r.text.splitlines():
  28. line = line.strip()
  29. if not line or line.startswith("#"):
  30. continue
  31. # Format: "1.2.3.4" or "1.2.3.4 # comment"
  32. ip = line.split()[0]
  33. if _is_valid_ip(ip):
  34. result[ip] = "feodo_c2"
  35. logger.info("feodo_tracker ips=%d", len(result))
  36. except Exception as exc:
  37. logger.warning("feodo_tracker fetch failed: %s", exc)
  38. return result
  39. async def fetch_threatfox(self, days: int = 3) -> tuple[dict[str, str], dict[str, str], dict[str, str]]:
  40. """
  41. ThreatFox — returns (ips, domains, hashes) dicts.
  42. ioc_type values: "ip:port", "domain", "url", "md5_hash", "sha256_hash"
  43. """
  44. ips: dict[str, str] = {}
  45. domains: dict[str, str] = {}
  46. hashes: dict[str, str] = {}
  47. try:
  48. async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
  49. r = await client.post(
  50. THREATFOX_API,
  51. json={"query": "get_iocs", "days": days},
  52. headers={"Content-Type": "application/json"},
  53. )
  54. r.raise_for_status()
  55. payload: dict[str, Any] = r.json()
  56. for entry in payload.get("data") or []:
  57. ioc_type = str(entry.get("ioc_type") or "")
  58. ioc_value = str(entry.get("ioc") or "").strip()
  59. malware = str(entry.get("malware_printable") or "threatfox").replace(" ", "_")[:32]
  60. tag = f"threatfox_{malware.lower()}"
  61. if ioc_type == "ip:port":
  62. ip = ioc_value.split(":")[0]
  63. if _is_valid_ip(ip):
  64. ips[ip] = tag
  65. elif ioc_type == "domain":
  66. if ioc_value:
  67. domains[ioc_value] = tag
  68. elif ioc_type == "url":
  69. # extract hostname from URL
  70. host = _hostname_from_url(ioc_value)
  71. if host:
  72. domains[host] = tag
  73. elif ioc_type in {"sha256_hash"}:
  74. if len(ioc_value) == 64:
  75. hashes[ioc_value] = tag
  76. logger.info("threatfox ips=%d domains=%d hashes=%d", len(ips), len(domains), len(hashes))
  77. except Exception as exc:
  78. logger.warning("threatfox fetch failed: %s", exc)
  79. return ips, domains, hashes
  80. async def fetch_urlhaus_domains(self) -> dict[str, str]:
  81. """URLhaus — online malware distribution domains (plain text list)."""
  82. result: dict[str, str] = {}
  83. try:
  84. async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
  85. r = await client.get(URLHAUS_DOMAINS_URL)
  86. r.raise_for_status()
  87. for line in r.text.splitlines():
  88. line = line.strip()
  89. if not line or line.startswith("#"):
  90. continue
  91. # Lines are full URLs like "http://evil.com/path"
  92. host = _hostname_from_url(line)
  93. if host and not _is_valid_ip(host):
  94. result[host] = "urlhaus_malware"
  95. logger.info("urlhaus domains=%d", len(result))
  96. except Exception as exc:
  97. logger.warning("urlhaus fetch failed: %s", exc)
  98. return result
  99. async def fetch_bazaar_hashes(self) -> dict[str, str]:
  100. """MalwareBazaar SHA256 full export (plain text, one hash per line)."""
  101. result: dict[str, str] = {}
  102. try:
  103. async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
  104. r = await client.get(BAZAAR_HASHES_URL)
  105. r.raise_for_status()
  106. for line in r.text.splitlines():
  107. line = line.strip()
  108. if not line or line.startswith("#"):
  109. continue
  110. if len(line) == 64 and all(c in "0123456789abcdefABCDEF" for c in line):
  111. result[line.lower()] = "bazaar_malware"
  112. logger.info("bazaar hashes=%d", len(result))
  113. except Exception as exc:
  114. logger.warning("bazaar fetch failed: %s", exc)
  115. return result
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
  119. def _is_valid_ip(value: str) -> bool:
  120. parts = value.split(".")
  121. if len(parts) != 4:
  122. return False
  123. try:
  124. return all(0 <= int(p) <= 255 for p in parts)
  125. except ValueError:
  126. return False
  127. def _hostname_from_url(url: str) -> str:
  128. """Extract hostname from a URL-like string, stripping scheme and path."""
  129. url = url.strip()
  130. if "://" in url:
  131. url = url.split("://", 1)[1]
  132. return url.split("/")[0].split(":")[0].lower()