Bez popisu

test-ioc-pipeline.py 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. #!/usr/bin/env python3
  2. """
  3. test-ioc-pipeline.py — End-to-end test: Wazuh syslog → soc-integrator IOC evaluation → IRIS alert.
  4. Flow:
  5. 1. Send a syslog event with embedded IOC payload to Wazuh (via Docker exec)
  6. 2. Wait for Wazuh indexer to index the alert
  7. 3. Trigger POST /wazuh/sync-to-mvp
  8. 4. Verify ioc_evaluated > 0 and decision_source == "direct_api" (VT/AbuseIPDB, no Shuffle)
  9. 5. Verify IRIS alert created if IOC matched
  10. Log formats used (both fire level-8 rules):
  11. ioc_ips → FortiGate IPS: logid=0419016386 ... attack=IOC.IP.Match ioc_type=ip → rule 110318
  12. ioc_dns → soc_event=dns_ioc event_type=ioc_dns_traffic query=<domain> → rule 110301
  13. Usage:
  14. python3 scripts/test-ioc-pipeline.py
  15. python3 scripts/test-ioc-pipeline.py --ioc-type domain --ioc-value "evil.example.com"
  16. python3 scripts/test-ioc-pipeline.py --ioc-value "198.51.100.1" --wait 30
  17. python3 scripts/test-ioc-pipeline.py --no-send # sync only, skip sending
  18. Env vars:
  19. INTEGRATOR_URL default: http://localhost:8088
  20. INTEGRATOR_API_KEY default: dev-internal-key
  21. """
  22. from __future__ import annotations
  23. import argparse
  24. import datetime
  25. import json
  26. import os
  27. import socket
  28. import ssl
  29. import subprocess
  30. import sys
  31. import time
  32. import urllib.request
  33. import uuid
  34. # ---------------------------------------------------------------------------
  35. # Config
  36. # ---------------------------------------------------------------------------
  37. INTEGRATOR_URL = os.environ.get("INTEGRATOR_URL", "http://localhost:8088")
  38. INTEGRATOR_KEY = os.environ.get("INTEGRATOR_API_KEY", "dev-internal-key")
  39. WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1"
  40. SYSLOG_PORT = 514
  41. SYSLOG_SRC_IP = "172.16.22.253" # must be in allowed-ips
  42. SSL_CTX = ssl.create_default_context()
  43. SSL_CTX.check_hostname = False
  44. SSL_CTX.verify_mode = ssl.CERT_NONE
  45. PASS = "\033[32m✓\033[0m"
  46. FAIL = "\033[31m✗\033[0m"
  47. INFO = "\033[36m·\033[0m"
  48. WARN = "\033[33m!\033[0m"
  49. # ---------------------------------------------------------------------------
  50. # HTTP helpers
  51. # ---------------------------------------------------------------------------
  52. def _get(url: str, headers: dict | None = None) -> dict:
  53. req = urllib.request.Request(url, headers=headers or {})
  54. with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r:
  55. return json.loads(r.read())
  56. def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict:
  57. body = json.dumps(data or {}).encode()
  58. h = {"Content-Type": "application/json", **(headers or {})}
  59. req = urllib.request.Request(url, data=body, headers=h, method="POST")
  60. with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as r:
  61. return json.loads(r.read())
  62. def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict:
  63. url = f"{INTEGRATOR_URL}{path}"
  64. if params:
  65. url += ("&" if "?" in url else "?") + params
  66. headers = {"X-Internal-API-Key": INTEGRATOR_KEY}
  67. if method == "POST":
  68. return _post(url, data, headers)
  69. return _get(url, headers)
  70. # ---------------------------------------------------------------------------
  71. # Syslog helpers
  72. # ---------------------------------------------------------------------------
  73. def _ts() -> tuple[str, str]:
  74. now = datetime.datetime.now(datetime.timezone.utc)
  75. return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")
  76. def _send_via_docker(raw_line: str, port: int) -> bool:
  77. """Send raw syslog line from inside the Wazuh container via /dev/udp."""
  78. py_cmd = (
  79. f"import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); "
  80. f"s.sendto({repr((raw_line + chr(10)).encode())}, ('127.0.0.1',{port})); s.close()"
  81. )
  82. result = subprocess.run(
  83. ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd],
  84. capture_output=True, timeout=10,
  85. )
  86. return result.returncode == 0
  87. def build_ioc_ips_log(event_id: str, ioc_value: str) -> str:
  88. """
  89. FortiGate IPS format that fires rule 110318 (A2-08 IOC IP indicator, level 8).
  90. Requires: logid=0419016386 + attack field + action=dropped + ioc_type=ip
  91. """
  92. date, ts = _ts()
  93. return (
  94. f"date={date} time={ts} devname=FG-TEST-FW devid=FGT60E0000000001 "
  95. f"logid=0419016386 type=utm subtype=ips level=critical vd=root "
  96. f"srcip={ioc_value} srcport=54321 srcintf=port1 "
  97. f"dstip=172.16.1.5 dstport=80 dstintf=port2 "
  98. f"policyid=1 proto=6 action=dropped service=HTTP "
  99. f"attack=IOC.IP.Match ioc_type=ip ioc_value={ioc_value} "
  100. f"event_id={event_id}"
  101. )
  102. def build_ioc_dns_log(event_id: str, domain: str) -> str:
  103. """
  104. SOC DNS IOC format that fires rule 110301 (A1-01 DNS IOC traffic, level 8).
  105. Requires: soc_event=dns_ioc + event_type=ioc_dns_traffic
  106. """
  107. return (
  108. f"soc_event=dns_ioc event_type=ioc_dns_traffic "
  109. f"event_id={event_id} "
  110. f"src_ip=10.26.45.214 query={domain} "
  111. f"action=blocked severity=medium"
  112. )
  113. # ---------------------------------------------------------------------------
  114. # Pretty helpers
  115. # ---------------------------------------------------------------------------
  116. def step(n: int | str, label: str) -> None:
  117. print(f"\n\033[1mStep {n}: {label}\033[0m")
  118. def ok(msg: str) -> None:
  119. print(f" {PASS} {msg}")
  120. def fail(msg: str) -> None:
  121. print(f" {FAIL} {msg}")
  122. def info(msg: str) -> None:
  123. print(f" {INFO} {msg}")
  124. def warn(msg: str) -> None:
  125. print(f" {WARN} {msg}")
  126. # ---------------------------------------------------------------------------
  127. # Main test
  128. # ---------------------------------------------------------------------------
  129. def run(args: argparse.Namespace) -> int:
  130. errors = 0
  131. event_id = f"ioc-test-{uuid.uuid4().hex[:12]}"
  132. print(f"\n\033[1mIOC Pipeline Test\033[0m (Wazuh → soc-integrator → IRIS)")
  133. print(f" ioc_type : {args.ioc_type}")
  134. print(f" ioc_value : {args.ioc_value}")
  135. print(f" event_id : {event_id}")
  136. # ------------------------------------------------------------------
  137. # Step 0: Health check
  138. # ------------------------------------------------------------------
  139. step(0, "Health check")
  140. try:
  141. h = _get(f"{INTEGRATOR_URL}/health")
  142. if h.get("ok"):
  143. ok(f"soc-integrator reachable at {INTEGRATOR_URL}")
  144. else:
  145. fail(f"soc-integrator unhealthy: {h}")
  146. return 1
  147. except Exception as exc:
  148. fail(f"Cannot reach soc-integrator: {exc}")
  149. return 1
  150. # ------------------------------------------------------------------
  151. # Step 1: Snapshot IRIS alert count before
  152. # ------------------------------------------------------------------
  153. step(1, "Snapshot IRIS alert count before sync")
  154. max_id_before = 0
  155. try:
  156. resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=5&sort_by=alert_id&sort_dir=desc")
  157. alerts_before = (resp.get("data") or {}).get("alerts", {}).get("data", [])
  158. max_id_before = max((a["alert_id"] for a in alerts_before), default=0)
  159. ok(f"Latest alert_id before = {max_id_before}")
  160. except Exception as exc:
  161. warn(f"Could not snapshot IRIS alerts (verification skipped): {exc}")
  162. # ------------------------------------------------------------------
  163. # Step 2: Direct smoke test — /mvp/ioc/evaluate without Shuffle
  164. # ------------------------------------------------------------------
  165. step(2, "Direct /mvp/ioc/evaluate smoke test (confirm no Shuffle)")
  166. try:
  167. eval_resp = _integrator(
  168. "/mvp/ioc/evaluate", method="POST",
  169. data={"ioc_type": args.ioc_type, "ioc_value": args.ioc_value},
  170. )
  171. eval_data = eval_resp.get("data", {})
  172. decision_source = eval_data.get("decision_source", "")
  173. confidence = eval_data.get("confidence", 0.0)
  174. matched = eval_data.get("matched", False)
  175. severity = eval_data.get("severity", "")
  176. iocs = eval_data.get("iocs", [])
  177. info(f"decision_source : {decision_source}")
  178. info(f"matched : {matched} confidence={confidence:.4f} severity={severity}")
  179. if iocs:
  180. for ioc in iocs:
  181. info(f"ioc match : {ioc.get('reason','')[:80]}")
  182. if decision_source == "direct_api":
  183. ok("decision_source=direct_api — VT/AbuseIPDB path confirmed, no Shuffle")
  184. elif decision_source == "skipped":
  185. ok("IOC skipped (empty/placeholder value) — try a real IP for full VT result")
  186. else:
  187. fail(f"Unexpected decision_source={decision_source!r} — check evaluate_ioc()")
  188. errors += 1
  189. except Exception as exc:
  190. fail(f"/mvp/ioc/evaluate failed: {exc}")
  191. errors += 1
  192. # ------------------------------------------------------------------
  193. # Step 3: Send IOC syslog event to Wazuh
  194. # ------------------------------------------------------------------
  195. step(3, "Send IOC syslog event to Wazuh")
  196. sent_ok = False
  197. if args.no_send:
  198. info("Skipped (--no-send)")
  199. else:
  200. if args.ioc_type == "domain":
  201. log_msg = build_ioc_dns_log(event_id, args.ioc_value)
  202. rule_note = "rule 110301 (A1-01 DNS IOC, level 8)"
  203. else:
  204. log_msg = build_ioc_ips_log(event_id, args.ioc_value)
  205. rule_note = "rule 110318 (A2-08 FortiGate IPS IOC, level 8)"
  206. info(f"Format : {rule_note}")
  207. info(f"Log : {log_msg[:100]}...")
  208. sent_ok = _send_via_docker(log_msg, SYSLOG_PORT)
  209. if sent_ok:
  210. ok(f"Syslog sent via {WAZUH_CONTAINER} (event_id={event_id})")
  211. else:
  212. fail(f"Failed to send syslog — is {WAZUH_CONTAINER} running?")
  213. errors += 1
  214. # ------------------------------------------------------------------
  215. # Step 4: Wait for Wazuh indexer
  216. # ------------------------------------------------------------------
  217. step(4, f"Wait {args.wait}s for Wazuh indexer")
  218. if args.no_send:
  219. info("Skipped (--no-send)")
  220. else:
  221. for remaining in range(args.wait, 0, -5):
  222. print(f" {INFO} {remaining}s remaining...", end="\r", flush=True)
  223. time.sleep(min(5, remaining))
  224. print()
  225. ok("Wait complete")
  226. # ------------------------------------------------------------------
  227. # Step 5: Trigger sync
  228. # ------------------------------------------------------------------
  229. step(5, "Trigger Wazuh → IRIS sync")
  230. min_sev = args.min_severity
  231. params = f"limit=50&minutes={args.minutes}&q=*&min_severity={min_sev}"
  232. s: dict = {}
  233. try:
  234. resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params)
  235. s = resp["data"]["sync"]
  236. info(f"min_severity_applied : {s['min_severity_applied']}")
  237. info(f"processed : {s['processed']}")
  238. info(f"skipped_existing : {s['skipped_existing']}")
  239. info(f"skipped_filtered : {s.get('skipped_filtered', 0)}")
  240. info(f"ingested : {s['ingested']}")
  241. info(f"ioc_evaluated : {s.get('ioc_evaluated', 0)}")
  242. info(f"ioc_matched : {s.get('ioc_matched', 0)}")
  243. info(f"ioc_rejected : {s.get('ioc_rejected', 0)}")
  244. info(f"iris_alert_ids : {s['iris_alert_ids']}")
  245. if s.get("errors"):
  246. fail(f"Sync errors: {s['errors']}")
  247. errors += 1
  248. else:
  249. ok("Sync completed without errors")
  250. except Exception as exc:
  251. fail(f"Sync request failed: {exc}")
  252. return errors + 1
  253. # ------------------------------------------------------------------
  254. # Step 6: Verify IOC was evaluated
  255. # ------------------------------------------------------------------
  256. step(6, "Verify IOC evaluation in sync result")
  257. ioc_evaluated = s.get("ioc_evaluated", 0)
  258. ioc_matched = s.get("ioc_matched", 0)
  259. ioc_rejected = s.get("ioc_rejected", 0)
  260. if args.no_send:
  261. info("Send skipped — checking for any IOC evaluations this sync window")
  262. if ioc_evaluated > 0:
  263. ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}")
  264. else:
  265. info("No IOC events in this sync window (expected with --no-send)")
  266. elif sent_ok:
  267. if ioc_evaluated > 0:
  268. ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}")
  269. if ioc_matched > 0:
  270. ok(f"IOC matched — IRIS alert(s) should be created: {s['iris_alert_ids']}")
  271. else:
  272. ok(f"IOC not matched ({args.ioc_value} is clean or below threshold)")
  273. elif s.get("skipped_existing", 0) > 0:
  274. warn("Event may have been deduped from a prior run — each run uses a fresh event_id")
  275. warn("If this repeats, check that Wazuh rule actually fires: see logtest instructions")
  276. else:
  277. fail(
  278. f"ioc_evaluated=0 — event not indexed by Wazuh\n"
  279. f" Verify with: docker exec {WAZUH_CONTAINER} "
  280. f"tail -f /var/ossec/logs/alerts/alerts.log | grep {event_id}"
  281. )
  282. errors += 1
  283. # ------------------------------------------------------------------
  284. # Step 7: Verify IRIS alert was created (if matched)
  285. # ------------------------------------------------------------------
  286. step(7, "Check IRIS alerts for new IOC alert")
  287. if ioc_matched > 0:
  288. try:
  289. resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=10&sort_by=alert_id&sort_dir=desc")
  290. alerts_after = (resp.get("data") or {}).get("alerts", {}).get("data", [])
  291. new_alerts = [
  292. a for a in alerts_after
  293. if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"
  294. ]
  295. if new_alerts:
  296. ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:")
  297. for a in new_alerts:
  298. print(f" alert_id={a['alert_id']} sev={a.get('alert_severity_id')} "
  299. f"title={a.get('alert_title', '')[:55]}")
  300. else:
  301. warn(f"ioc_matched={ioc_matched} but no new wazuh-sourced IRIS alerts found "
  302. f"(may already exist from a prior run)")
  303. except Exception as exc:
  304. fail(f"Could not check IRIS alerts: {exc}")
  305. errors += 1
  306. elif ioc_evaluated > 0:
  307. ok(f"IOC evaluated, not matched — {args.ioc_value} scored below thresholds (expected for test IPs)")
  308. else:
  309. info("No IRIS alert check (no IOC evaluation this run)")
  310. # ------------------------------------------------------------------
  311. # Summary
  312. # ------------------------------------------------------------------
  313. print()
  314. print("─" * 65)
  315. if errors == 0:
  316. print(f" {PASS} All checks passed")
  317. else:
  318. print(f" {FAIL} {errors} check(s) failed")
  319. print("─" * 65)
  320. print()
  321. print("Re-run options:")
  322. print(f" --ioc-type domain --ioc-value evil.example.com (DNS IOC, rule 110301)")
  323. print(f" --ioc-value 198.51.100.42 (IP IOC, rule 110318)")
  324. print(f" --no-send --minutes 60 (sync only, wider window)")
  325. print()
  326. return errors
  327. # ---------------------------------------------------------------------------
  328. # CLI
  329. # ---------------------------------------------------------------------------
  330. if __name__ == "__main__":
  331. parser = argparse.ArgumentParser(
  332. description="End-to-end IOC pipeline test: Wazuh syslog → soc-integrator → IRIS",
  333. formatter_class=argparse.RawDescriptionHelpFormatter,
  334. )
  335. parser.add_argument("--ioc-type", default="ip", choices=["ip", "domain"],
  336. help="IOC type (default: ip)")
  337. parser.add_argument("--ioc-value", default="198.51.100.42",
  338. help="IOC value embedded in syslog event (default: 198.51.100.42)")
  339. parser.add_argument("--min-severity", default="low",
  340. choices=["informational", "low", "medium", "high", "critical"],
  341. help="Min severity filter for sync (default: low)")
  342. parser.add_argument("--wait", type=int, default=20,
  343. help="Seconds to wait for Wazuh indexer (default: 20)")
  344. parser.add_argument("--minutes", type=int, default=5,
  345. help="Sync lookback window in minutes (default: 5)")
  346. parser.add_argument("--no-send", action="store_true",
  347. help="Skip sending syslog — just sync and verify")
  348. args = parser.parse_args()
  349. sys.exit(run(args))