| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- """
- ioc_feed.py — Public threat-intel feed fetcher.
- Sources:
- Feodo Tracker — botnet C2 IPs (Emotet, TrickBot, QakBot, …)
- ThreatFox — IPs, domains, and hashes from live malware C2 (Abuse.ch)
- URLhaus — malware distribution domains (Abuse.ch)
- MalwareBazaar — SHA256 malware hashes (Abuse.ch)
- """
- from __future__ import annotations
- import logging
- from typing import Any
- import httpx
- logger = logging.getLogger(__name__)  # module-level logger, named after this module
- FEODO_IP_URL = "https://feodotracker.abuse.ch/downloads/ipblocklist.txt"  # fetched with GET by fetch_feodo_ips
- THREATFOX_API = "https://threatfox-api.abuse.ch/api/v1/"  # JSON endpoint; fetch_threatfox POSTs a "get_iocs" query to it
- URLHAUS_DOMAINS_URL = "https://urlhaus.abuse.ch/downloads/text_online/"  # fetched with GET by fetch_urlhaus_domains
- BAZAAR_HASHES_URL = "https://bazaar.abuse.ch/export/txt/sha256/full/"  # fetched with GET by fetch_bazaar_hashes
- _TIMEOUT = 30.0  # passed as `timeout=` to every httpx.AsyncClient this module creates (seconds)
class IocFeedAdapter:
    """Fetch indicators of compromise (IOCs) from public Abuse.ch feeds.

    Every ``fetch_*`` coroutine is best-effort: any ordinary exception raised
    while downloading or parsing a feed is logged as a warning and whatever
    was collected so far (normally nothing) is returned, so an unavailable
    feed does not raise into the caller.

    All returned dicts map an indicator (IP, host name or SHA256 digest) to a
    short source tag such as ``"feodo_c2"`` or ``"threatfox_<malware>"``.
    """

    # Characters accepted in a hex digest (both cases; output is lower-cased).
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    @staticmethod
    async def _get_text(url: str) -> str:
        """GET *url* and return the response body as text.

        Raises whatever httpx raises for transport errors, and
        ``httpx.HTTPStatusError`` (via ``raise_for_status``) for 4xx/5xx.
        """
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.text

    @staticmethod
    def _data_lines(text: str) -> list[str]:
        """Return the stripped lines of *text*, skipping blanks and ``#`` comments."""
        stripped = (raw.strip() for raw in text.splitlines())
        return [line for line in stripped if line and not line.startswith("#")]

    @staticmethod
    def _normalize_sha256(value: str) -> str:
        """Return *value* lower-cased if it is exactly 64 hex digits, else ``""``."""
        if len(value) == 64 and all(ch in IocFeedAdapter._HEX_DIGITS for ch in value):
            return value.lower()
        return ""

    @staticmethod
    def _parse_threatfox_entries(
        entries: list[Any],
    ) -> tuple[dict[str, str], dict[str, str], dict[str, str]]:
        """Split ThreatFox IOC records into ``(ips, domains, hashes)`` dicts.

        Records that are not dicts are skipped. Only the ``ip:port``,
        ``domain``, ``url`` and ``sha256_hash`` types are kept; every other
        type (including ``md5_hash``) is ignored.
        """
        ips: dict[str, str] = {}
        domains: dict[str, str] = {}
        hashes: dict[str, str] = {}
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            ioc_type = str(entry.get("ioc_type") or "")
            ioc_value = str(entry.get("ioc") or "").strip()
            # Tag is "threatfox_<malware name>", spaces replaced, capped at 32 chars.
            malware = str(entry.get("malware_printable") or "threatfox").replace(" ", "_")[:32]
            tag = f"threatfox_{malware.lower()}"
            if ioc_type == "ip:port":
                ip = ioc_value.split(":")[0]
                if _is_valid_ip(ip):
                    ips[ip] = tag
            elif ioc_type == "domain":
                # Lower-case so keys agree with hosts extracted from URLs,
                # which _hostname_from_url already lower-cases.
                domain = ioc_value.lower()
                if domain:
                    domains[domain] = tag
            elif ioc_type == "url":
                host = _hostname_from_url(ioc_value)
                if host:
                    domains[host] = tag
            elif ioc_type == "sha256_hash":
                # Same validation/normalisation as fetch_bazaar_hashes so the
                # two hash sources produce comparable keys.
                digest = IocFeedAdapter._normalize_sha256(ioc_value)
                if digest:
                    hashes[digest] = tag
        return ips, domains, hashes

    async def fetch_feodo_ips(self) -> dict[str, str]:
        """Feodo Tracker IP blocklist — plain text, one IP per line, comments start with #.

        Returns a dict mapping each valid IPv4 address to ``"feodo_c2"``;
        empty if the download fails.
        """
        result: dict[str, str] = {}
        try:
            text = await self._get_text(FEODO_IP_URL)
            for line in self._data_lines(text):
                # Format: "1.2.3.4" or "1.2.3.4 # comment"
                ip = line.split()[0]
                if _is_valid_ip(ip):
                    result[ip] = "feodo_c2"
            logger.info("feodo_tracker ips=%d", len(result))
        except Exception as exc:
            logger.warning("feodo_tracker fetch failed: %s", exc)
        return result

    async def fetch_threatfox(self, days: int = 3) -> tuple[dict[str, str], dict[str, str], dict[str, str]]:
        """
        ThreatFox — returns (ips, domains, hashes) dicts.

        *days* is sent as the ``days`` field of the ``get_iocs`` query.

        ioc_type values handled: "ip:port", "domain", "url", "sha256_hash".
        Other types (e.g. "md5_hash") are ignored. Domains and hashes are
        lower-cased; hashes must be 64 hex digits.

        All three dicts are empty if the request fails or the response does
        not contain a list of IOCs.
        """
        ips: dict[str, str] = {}
        domains: dict[str, str] = {}
        hashes: dict[str, str] = {}
        try:
            async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
                response = await client.post(
                    THREATFOX_API,
                    json={"query": "get_iocs", "days": days},
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
                payload: dict[str, Any] = response.json()
            entries = payload.get("data") or []
            if not isinstance(entries, list):
                # A non-list "data" (e.g. an error message string) must not be
                # iterated element by element; report it and treat as empty.
                logger.warning(
                    "threatfox returned no IOC list (query_status=%s)",
                    payload.get("query_status"),
                )
                entries = []
            ips, domains, hashes = self._parse_threatfox_entries(entries)
            logger.info("threatfox ips=%d domains=%d hashes=%d", len(ips), len(domains), len(hashes))
        except Exception as exc:
            logger.warning("threatfox fetch failed: %s", exc)
        return ips, domains, hashes

    async def fetch_urlhaus_domains(self) -> dict[str, str]:
        """URLhaus — online malware distribution domains (plain text list).

        Returns a dict mapping each host name to ``"urlhaus_malware"``. Hosts
        that are IPv4 addresses are skipped. Empty if the download fails.
        """
        result: dict[str, str] = {}
        try:
            text = await self._get_text(URLHAUS_DOMAINS_URL)
            for line in self._data_lines(text):
                # Lines are full URLs like "http://evil.com/path"
                host = _hostname_from_url(line)
                if host and not _is_valid_ip(host):
                    result[host] = "urlhaus_malware"
            logger.info("urlhaus domains=%d", len(result))
        except Exception as exc:
            logger.warning("urlhaus fetch failed: %s", exc)
        return result

    async def fetch_bazaar_hashes(self) -> dict[str, str]:
        """MalwareBazaar SHA256 full export (plain text, one hash per line).

        Returns a dict mapping each lower-cased 64-hex-digit line to
        ``"bazaar_malware"``; empty if the download fails.
        """
        # NOTE(review): confirm this endpoint serves plain text rather than a
        # compressed archive — a binary body would simply match no lines and
        # yield zero hashes without any warning.
        result: dict[str, str] = {}
        try:
            text = await self._get_text(BAZAAR_HASHES_URL)
            for line in self._data_lines(text):
                digest = self._normalize_sha256(line)
                if digest:
                    result[digest] = "bazaar_malware"
            logger.info("bazaar hashes=%d", len(result))
        except Exception as exc:
            logger.warning("bazaar fetch failed: %s", exc)
        return result
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
- def _is_valid_ip(value: str) -> bool:
- parts = value.split(".")
- if len(parts) != 4:
- return False
- try:
- return all(0 <= int(p) <= 255 for p in parts)
- except ValueError:
- return False
- def _hostname_from_url(url: str) -> str:
- """Extract hostname from a URL-like string, stripping scheme and path."""
- url = url.strip()
- if "://" in url:
- url = url.split("://", 1)[1]
- return url.split("/")[0].split(":")[0].lower()
|