| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- """
- ioc_list_service.py — Build and push Wazuh CDB threat-intel lists.
- Sources merged per refresh cycle:
- 1. Feodo Tracker — botnet C2 IPs
- 2. ThreatFox — IPs, domains, hashes from live C2
- 3. URLhaus — malware distribution domains
- 4. MalwareBazaar — SHA256 hashes
- 5. ioc_trace (local) — high-confidence VT/AbuseIPDB matches from this pipeline
- After writing the list files to disk (bind-mounted into the Wazuh container),
- the service calls PUT /manager/restart via the Wazuh API so analysisd
- recompiles the CDB binary files and reloads rules.
- """
- from __future__ import annotations
- import logging
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
- from app.adapters.ioc_feed import IocFeedAdapter
- from app.adapters.wazuh import WazuhAdapter
- from app.repositories.mvp_repo import MvpRepository
- logger = logging.getLogger(__name__)
- _CDB_COMMENT = (
- "# Auto-generated by soc-integrator ioc_list_service\n"
- "# DO NOT EDIT MANUALLY — changes will be overwritten on next refresh\n"
- "# Format: key:source_tag\n"
- )
- class IocListService:
- def __init__(
- self,
- repo: MvpRepository,
- wazuh_adapter: WazuhAdapter,
- feed_adapter: IocFeedAdapter,
- lists_path: str = "/ioc-lists",
- confidence_threshold: float = 0.7,
- lookback_days: int = 30,
- ) -> None:
- self.repo = repo
- self.wazuh_adapter = wazuh_adapter
- self.feed_adapter = feed_adapter
- self.lists_path = Path(lists_path)
- self.confidence_threshold = confidence_threshold
- self.lookback_days = lookback_days
- async def refresh(self) -> dict[str, Any]:
- """
- Full refresh cycle:
- 1. Fetch public feeds
- 2. Merge with local ioc_trace confirmed hits
- 3. Write CDB list files to disk
- 4. Restart Wazuh analysisd so lists are recompiled
- Returns a summary dict with counts per list and timing.
- """
- started_at = datetime.now(timezone.utc).isoformat()
- errors: list[str] = []
- # ------------------------------------------------------------------
- # 1. Fetch public feeds
- # ------------------------------------------------------------------
- feodo_ips = await self.feed_adapter.fetch_feodo_ips()
- tf_ips, tf_domains, tf_hashes = await self.feed_adapter.fetch_threatfox(days=3)
- uh_domains = await self.feed_adapter.fetch_urlhaus_domains()
- bz_hashes = await self.feed_adapter.fetch_bazaar_hashes()
- # ------------------------------------------------------------------
- # 2. Merge with local confirmed hits from ioc_trace
- # ------------------------------------------------------------------
- local_ips = {
- v: "local_vt_abuseipdb"
- for v in self.repo.get_confirmed_iocs(
- "ip", self.confidence_threshold, self.lookback_days
- )
- }
- local_domains = {
- v: "local_vt"
- for v in self.repo.get_confirmed_iocs(
- "domain", self.confidence_threshold, self.lookback_days
- )
- }
- local_hashes = {
- v: "local_vt"
- for v in self.repo.get_confirmed_iocs(
- "hash", self.confidence_threshold, self.lookback_days
- )
- }
- # Public feeds take precedence (more authoritative tag), local fills gaps
- all_ips: dict[str, str] = {**local_ips, **feodo_ips, **tf_ips}
- all_domains: dict[str, str] = {**local_domains, **uh_domains, **tf_domains}
- all_hashes: dict[str, str] = {**local_hashes, **bz_hashes, **tf_hashes}
- # ------------------------------------------------------------------
- # 3. Write CDB files
- # ------------------------------------------------------------------
- self.lists_path.mkdir(parents=True, exist_ok=True)
- try:
- _write_cdb(self.lists_path / "malicious-ip", all_ips)
- except Exception as exc:
- errors.append(f"write malicious-ip: {exc}")
- logger.error("failed to write malicious-ip: %s", exc)
- try:
- _write_cdb(self.lists_path / "malicious-domains", all_domains)
- except Exception as exc:
- errors.append(f"write malicious-domains: {exc}")
- logger.error("failed to write malicious-domains: %s", exc)
- try:
- _write_cdb(self.lists_path / "malware-hashes", all_hashes)
- except Exception as exc:
- errors.append(f"write malware-hashes: {exc}")
- logger.error("failed to write malware-hashes: %s", exc)
- # ------------------------------------------------------------------
- # 4. Restart Wazuh analysisd to recompile CDB binaries
- # ------------------------------------------------------------------
- wazuh_restarted = False
- wazuh_error: str | None = None
- if not errors:
- try:
- await self.wazuh_adapter.restart_manager()
- wazuh_restarted = True
- logger.info("wazuh manager restart triggered — CDB lists reloaded")
- except Exception as exc:
- wazuh_error = str(exc)
- errors.append(f"wazuh_restart: {exc}")
- logger.error("wazuh restart failed: %s", exc)
- else:
- wazuh_error = "skipped due to write errors"
- finished_at = datetime.now(timezone.utc).isoformat()
- return {
- "started_at": started_at,
- "finished_at": finished_at,
- "lists": {
- "malicious_ips": len(all_ips),
- "malicious_domains": len(all_domains),
- "malware_hashes": len(all_hashes),
- },
- "sources": {
- "feodo_ips": len(feodo_ips),
- "threatfox_ips": len(tf_ips),
- "threatfox_domains": len(tf_domains),
- "threatfox_hashes": len(tf_hashes),
- "urlhaus_domains": len(uh_domains),
- "bazaar_hashes": len(bz_hashes),
- "local_confirmed_ips": len(local_ips),
- "local_confirmed_domains": len(local_domains),
- "local_confirmed_hashes": len(local_hashes),
- },
- "wazuh_restarted": wazuh_restarted,
- "wazuh_error": wazuh_error,
- "errors": errors,
- }
- def list_status(self) -> dict[str, Any]:
- """Return current entry counts for each CDB file (reads from disk)."""
- out: dict[str, Any] = {}
- for name in ("malicious-ip", "malicious-domains", "malware-hashes"):
- path = self.lists_path / name
- if path.exists():
- lines = [
- l for l in path.read_text().splitlines()
- if l.strip() and not l.startswith("#")
- ]
- out[name] = {"entries": len(lines), "exists": True}
- else:
- out[name] = {"entries": 0, "exists": False}
- return out
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
- def _write_cdb(path: Path, entries: dict[str, str]) -> None:
- """Write a Wazuh CDB list file. Format: key:tag per line."""
- lines = [_CDB_COMMENT]
- for key, tag in sorted(entries.items()):
- # CDB keys must not contain colons (except ip:port which we already strip)
- safe_key = str(key).replace(":", "_")
- lines.append(f"{safe_key}:{tag}\n")
- path.write_text("".join(lines), encoding="utf-8")
- logger.info("wrote %s entries → %s", len(entries), path)
|