Nessuna descrizione

ioc_list_service.py 7.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. """
  2. ioc_list_service.py — Build and push Wazuh CDB threat-intel lists.
  3. Sources merged per refresh cycle:
  4. 1. Feodo Tracker — botnet C2 IPs
  5. 2. ThreatFox — IPs, domains, hashes from live C2
  6. 3. URLhaus — malware distribution domains
  7. 4. MalwareBazaar — SHA256 hashes
  8. 5. ioc_trace (local) — high-confidence VT/AbuseIPDB matches from this pipeline
  9. After writing the list files to disk (bind-mounted into the Wazuh container),
  10. the service calls PUT /manager/restart via the Wazuh API so analysisd
  11. recompiles the CDB binary files and reloads rules.
  12. """
  13. from __future__ import annotations
  14. import logging
  15. from datetime import datetime, timezone
  16. from pathlib import Path
  17. from typing import Any
  18. from app.adapters.ioc_feed import IocFeedAdapter
  19. from app.adapters.wazuh import WazuhAdapter
  20. from app.repositories.mvp_repo import MvpRepository
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Comment header that _write_cdb() places at the top of every generated list
# file. Each line starts with "#" and ends with a newline, so entry lines can
# be appended directly after it; list_status() skips lines starting with "#"
# when counting entries.
_CDB_COMMENT = (
    "# Auto-generated by soc-integrator ioc_list_service\n"
    "# DO NOT EDIT MANUALLY — changes will be overwritten on next refresh\n"
    "# Format: key:source_tag\n"
)
  27. class IocListService:
  28. def __init__(
  29. self,
  30. repo: MvpRepository,
  31. wazuh_adapter: WazuhAdapter,
  32. feed_adapter: IocFeedAdapter,
  33. lists_path: str = "/ioc-lists",
  34. confidence_threshold: float = 0.7,
  35. lookback_days: int = 30,
  36. ) -> None:
  37. self.repo = repo
  38. self.wazuh_adapter = wazuh_adapter
  39. self.feed_adapter = feed_adapter
  40. self.lists_path = Path(lists_path)
  41. self.confidence_threshold = confidence_threshold
  42. self.lookback_days = lookback_days
  43. async def refresh(self) -> dict[str, Any]:
  44. """
  45. Full refresh cycle:
  46. 1. Fetch public feeds
  47. 2. Merge with local ioc_trace confirmed hits
  48. 3. Write CDB list files to disk
  49. 4. Restart Wazuh analysisd so lists are recompiled
  50. Returns a summary dict with counts per list and timing.
  51. """
  52. started_at = datetime.now(timezone.utc).isoformat()
  53. errors: list[str] = []
  54. # ------------------------------------------------------------------
  55. # 1. Fetch public feeds
  56. # ------------------------------------------------------------------
  57. feodo_ips = await self.feed_adapter.fetch_feodo_ips()
  58. tf_ips, tf_domains, tf_hashes = await self.feed_adapter.fetch_threatfox(days=3)
  59. uh_domains = await self.feed_adapter.fetch_urlhaus_domains()
  60. bz_hashes = await self.feed_adapter.fetch_bazaar_hashes()
  61. # ------------------------------------------------------------------
  62. # 2. Merge with local confirmed hits from ioc_trace
  63. # ------------------------------------------------------------------
  64. local_ips = {
  65. v: "local_vt_abuseipdb"
  66. for v in self.repo.get_confirmed_iocs(
  67. "ip", self.confidence_threshold, self.lookback_days
  68. )
  69. }
  70. local_domains = {
  71. v: "local_vt"
  72. for v in self.repo.get_confirmed_iocs(
  73. "domain", self.confidence_threshold, self.lookback_days
  74. )
  75. }
  76. local_hashes = {
  77. v: "local_vt"
  78. for v in self.repo.get_confirmed_iocs(
  79. "hash", self.confidence_threshold, self.lookback_days
  80. )
  81. }
  82. # Public feeds take precedence (more authoritative tag), local fills gaps
  83. all_ips: dict[str, str] = {**local_ips, **feodo_ips, **tf_ips}
  84. all_domains: dict[str, str] = {**local_domains, **uh_domains, **tf_domains}
  85. all_hashes: dict[str, str] = {**local_hashes, **bz_hashes, **tf_hashes}
  86. # ------------------------------------------------------------------
  87. # 3. Write CDB files
  88. # ------------------------------------------------------------------
  89. self.lists_path.mkdir(parents=True, exist_ok=True)
  90. try:
  91. _write_cdb(self.lists_path / "malicious-ip", all_ips)
  92. except Exception as exc:
  93. errors.append(f"write malicious-ip: {exc}")
  94. logger.error("failed to write malicious-ip: %s", exc)
  95. try:
  96. _write_cdb(self.lists_path / "malicious-domains", all_domains)
  97. except Exception as exc:
  98. errors.append(f"write malicious-domains: {exc}")
  99. logger.error("failed to write malicious-domains: %s", exc)
  100. try:
  101. _write_cdb(self.lists_path / "malware-hashes", all_hashes)
  102. except Exception as exc:
  103. errors.append(f"write malware-hashes: {exc}")
  104. logger.error("failed to write malware-hashes: %s", exc)
  105. # ------------------------------------------------------------------
  106. # 4. Restart Wazuh analysisd to recompile CDB binaries
  107. # ------------------------------------------------------------------
  108. wazuh_restarted = False
  109. wazuh_error: str | None = None
  110. if not errors:
  111. try:
  112. await self.wazuh_adapter.restart_manager()
  113. wazuh_restarted = True
  114. logger.info("wazuh manager restart triggered — CDB lists reloaded")
  115. except Exception as exc:
  116. wazuh_error = str(exc)
  117. errors.append(f"wazuh_restart: {exc}")
  118. logger.error("wazuh restart failed: %s", exc)
  119. else:
  120. wazuh_error = "skipped due to write errors"
  121. finished_at = datetime.now(timezone.utc).isoformat()
  122. return {
  123. "started_at": started_at,
  124. "finished_at": finished_at,
  125. "lists": {
  126. "malicious_ips": len(all_ips),
  127. "malicious_domains": len(all_domains),
  128. "malware_hashes": len(all_hashes),
  129. },
  130. "sources": {
  131. "feodo_ips": len(feodo_ips),
  132. "threatfox_ips": len(tf_ips),
  133. "threatfox_domains": len(tf_domains),
  134. "threatfox_hashes": len(tf_hashes),
  135. "urlhaus_domains": len(uh_domains),
  136. "bazaar_hashes": len(bz_hashes),
  137. "local_confirmed_ips": len(local_ips),
  138. "local_confirmed_domains": len(local_domains),
  139. "local_confirmed_hashes": len(local_hashes),
  140. },
  141. "wazuh_restarted": wazuh_restarted,
  142. "wazuh_error": wazuh_error,
  143. "errors": errors,
  144. }
  145. def list_status(self) -> dict[str, Any]:
  146. """Return current entry counts for each CDB file (reads from disk)."""
  147. out: dict[str, Any] = {}
  148. for name in ("malicious-ip", "malicious-domains", "malware-hashes"):
  149. path = self.lists_path / name
  150. if path.exists():
  151. lines = [
  152. l for l in path.read_text().splitlines()
  153. if l.strip() and not l.startswith("#")
  154. ]
  155. out[name] = {"entries": len(lines), "exists": True}
  156. else:
  157. out[name] = {"entries": 0, "exists": False}
  158. return out
  159. # ---------------------------------------------------------------------------
  160. # Helpers
  161. # ---------------------------------------------------------------------------
  162. def _write_cdb(path: Path, entries: dict[str, str]) -> None:
  163. """Write a Wazuh CDB list file. Format: key:tag per line."""
  164. lines = [_CDB_COMMENT]
  165. for key, tag in sorted(entries.items()):
  166. # CDB keys must not contain colons (except ip:port which we already strip)
  167. safe_key = str(key).replace(":", "_")
  168. lines.append(f"{safe_key}:{tag}\n")
  169. path.write_text("".join(lines), encoding="utf-8")
  170. logger.info("wrote %s entries → %s", len(entries), path)