#!/usr/bin/env python3 """ test-ioc-pipeline.py — End-to-end test: Wazuh syslog → soc-integrator IOC evaluation → IRIS alert. Flow: 1. Send a syslog event with embedded IOC payload to Wazuh (via Docker exec) 2. Wait for Wazuh indexer to index the alert 3. Trigger POST /wazuh/sync-to-mvp 4. Verify ioc_evaluated > 0 and decision_source == "direct_api" (VT/AbuseIPDB, no Shuffle) 5. Verify IRIS alert created if IOC matched Log formats used (both fire level-8 rules): ioc_ips → FortiGate IPS: logid=0419016386 ... attack=IOC.IP.Match ioc_type=ip → rule 110318 ioc_dns → soc_event=dns_ioc event_type=ioc_dns_traffic query= → rule 110301 Usage: python3 scripts/test-ioc-pipeline.py python3 scripts/test-ioc-pipeline.py --ioc-type domain --ioc-value "evil.example.com" python3 scripts/test-ioc-pipeline.py --ioc-value "198.51.100.1" --wait 30 python3 scripts/test-ioc-pipeline.py --no-send # sync only, skip sending Env vars: INTEGRATOR_URL default: http://localhost:8088 INTEGRATOR_API_KEY default: dev-internal-key """ from __future__ import annotations import argparse import datetime import json import os import socket import ssl import subprocess import sys import time import urllib.request import uuid # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- INTEGRATOR_URL = os.environ.get("INTEGRATOR_URL", "http://localhost:8088") INTEGRATOR_KEY = os.environ.get("INTEGRATOR_API_KEY", "dev-internal-key") WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1" SYSLOG_PORT = 514 SYSLOG_SRC_IP = "172.16.22.253" # must be in allowed-ips SSL_CTX = ssl.create_default_context() SSL_CTX.check_hostname = False SSL_CTX.verify_mode = ssl.CERT_NONE PASS = "\033[32m✓\033[0m" FAIL = "\033[31m✗\033[0m" INFO = "\033[36m·\033[0m" WARN = "\033[33m!\033[0m" # --------------------------------------------------------------------------- # HTTP helpers # --------------------------------------------------------------------------- def _get(url: str, headers: dict | None = None) -> dict: req = urllib.request.Request(url, headers=headers or {}) with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r: return json.loads(r.read()) def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict: body = json.dumps(data or {}).encode() h = {"Content-Type": "application/json", **(headers or {})} req = urllib.request.Request(url, data=body, headers=h, method="POST") with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as r: return json.loads(r.read()) def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict: url = f"{INTEGRATOR_URL}{path}" if params: url += ("&" if "?" in url else "?") + params headers = {"X-Internal-API-Key": INTEGRATOR_KEY} if method == "POST": return _post(url, data, headers) return _get(url, headers) # --------------------------------------------------------------------------- # Syslog helpers # --------------------------------------------------------------------------- def _ts() -> tuple[str, str]: now = datetime.datetime.now(datetime.timezone.utc) return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S") def _send_via_docker(raw_line: str, port: int) -> bool: """Send raw syslog line from inside the Wazuh container via /dev/udp.""" py_cmd = ( f"import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); " f"s.sendto({repr((raw_line + chr(10)).encode())}, ('127.0.0.1',{port})); s.close()" ) result = subprocess.run( ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd], capture_output=True, timeout=10, ) return result.returncode == 0 def build_ioc_ips_log(event_id: str, ioc_value: str) -> str: """ FortiGate IPS format that fires rule 110318 (A2-08 IOC IP indicator, level 8). Requires: logid=0419016386 + attack field + action=dropped + ioc_type=ip """ date, ts = _ts() return ( f"date={date} time={ts} devname=FG-TEST-FW devid=FGT60E0000000001 " f"logid=0419016386 type=utm subtype=ips level=critical vd=root " f"srcip={ioc_value} srcport=54321 srcintf=port1 " f"dstip=172.16.1.5 dstport=80 dstintf=port2 " f"policyid=1 proto=6 action=dropped service=HTTP " f"attack=IOC.IP.Match ioc_type=ip ioc_value={ioc_value} " f"event_id={event_id}" ) def build_ioc_dns_log(event_id: str, domain: str) -> str: """ SOC DNS IOC format that fires rule 110301 (A1-01 DNS IOC traffic, level 8). Requires: soc_event=dns_ioc + event_type=ioc_dns_traffic """ return ( f"soc_event=dns_ioc event_type=ioc_dns_traffic " f"event_id={event_id} " f"src_ip=10.26.45.214 query={domain} " f"action=blocked severity=medium" ) # --------------------------------------------------------------------------- # Pretty helpers # --------------------------------------------------------------------------- def step(n: int | str, label: str) -> None: print(f"\n\033[1mStep {n}: {label}\033[0m") def ok(msg: str) -> None: print(f" {PASS} {msg}") def fail(msg: str) -> None: print(f" {FAIL} {msg}") def info(msg: str) -> None: print(f" {INFO} {msg}") def warn(msg: str) -> None: print(f" {WARN} {msg}") # --------------------------------------------------------------------------- # Main test # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> int: errors = 0 event_id = f"ioc-test-{uuid.uuid4().hex[:12]}" print(f"\n\033[1mIOC Pipeline Test\033[0m (Wazuh → soc-integrator → IRIS)") print(f" ioc_type : {args.ioc_type}") print(f" ioc_value : {args.ioc_value}") print(f" event_id : {event_id}") # ------------------------------------------------------------------ # Step 0: Health check # ------------------------------------------------------------------ step(0, "Health check") try: h = _get(f"{INTEGRATOR_URL}/health") if h.get("ok"): ok(f"soc-integrator reachable at {INTEGRATOR_URL}") else: fail(f"soc-integrator unhealthy: {h}") return 1 except Exception as exc: fail(f"Cannot reach soc-integrator: {exc}") return 1 # ------------------------------------------------------------------ # Step 1: Snapshot IRIS alert count before # ------------------------------------------------------------------ step(1, "Snapshot IRIS alert count before sync") max_id_before = 0 try: resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=5&sort_by=alert_id&sort_dir=desc") alerts_before = (resp.get("data") or {}).get("alerts", {}).get("data", []) max_id_before = max((a["alert_id"] for a in alerts_before), default=0) ok(f"Latest alert_id before = {max_id_before}") except Exception as exc: warn(f"Could not snapshot IRIS alerts (verification skipped): {exc}") # ------------------------------------------------------------------ # Step 2: Direct smoke test — /mvp/ioc/evaluate without Shuffle # ------------------------------------------------------------------ step(2, "Direct /mvp/ioc/evaluate smoke test (confirm no Shuffle)") try: eval_resp = _integrator( "/mvp/ioc/evaluate", method="POST", data={"ioc_type": args.ioc_type, "ioc_value": args.ioc_value}, ) eval_data = eval_resp.get("data", {}) decision_source = eval_data.get("decision_source", "") confidence = eval_data.get("confidence", 0.0) matched = eval_data.get("matched", False) severity = eval_data.get("severity", "") iocs = eval_data.get("iocs", []) info(f"decision_source : {decision_source}") info(f"matched : {matched} confidence={confidence:.4f} severity={severity}") if iocs: for ioc in iocs: info(f"ioc match : {ioc.get('reason','')[:80]}") if decision_source == "direct_api": ok("decision_source=direct_api — VT/AbuseIPDB path confirmed, no Shuffle") elif decision_source == "skipped": ok("IOC skipped (empty/placeholder value) — try a real IP for full VT result") else: fail(f"Unexpected decision_source={decision_source!r} — check evaluate_ioc()") errors += 1 except Exception as exc: fail(f"/mvp/ioc/evaluate failed: {exc}") errors += 1 # ------------------------------------------------------------------ # Step 3: Send IOC syslog event to Wazuh # ------------------------------------------------------------------ step(3, "Send IOC syslog event to Wazuh") sent_ok = False if args.no_send: info("Skipped (--no-send)") else: if args.ioc_type == "domain": log_msg = build_ioc_dns_log(event_id, args.ioc_value) rule_note = "rule 110301 (A1-01 DNS IOC, level 8)" else: log_msg = build_ioc_ips_log(event_id, args.ioc_value) rule_note = "rule 110318 (A2-08 FortiGate IPS IOC, level 8)" info(f"Format : {rule_note}") info(f"Log : {log_msg[:100]}...") sent_ok = _send_via_docker(log_msg, SYSLOG_PORT) if sent_ok: ok(f"Syslog sent via {WAZUH_CONTAINER} (event_id={event_id})") else: fail(f"Failed to send syslog — is {WAZUH_CONTAINER} running?") errors += 1 # ------------------------------------------------------------------ # Step 4: Wait for Wazuh indexer # ------------------------------------------------------------------ step(4, f"Wait {args.wait}s for Wazuh indexer") if args.no_send: info("Skipped (--no-send)") else: for remaining in range(args.wait, 0, -5): print(f" {INFO} {remaining}s remaining...", end="\r", flush=True) time.sleep(min(5, remaining)) print() ok("Wait complete") # ------------------------------------------------------------------ # Step 5: Trigger sync # ------------------------------------------------------------------ step(5, "Trigger Wazuh → IRIS sync") min_sev = args.min_severity params = f"limit=50&minutes={args.minutes}&q=*&min_severity={min_sev}" s: dict = {} try: resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params) s = resp["data"]["sync"] info(f"min_severity_applied : {s['min_severity_applied']}") info(f"processed : {s['processed']}") info(f"skipped_existing : {s['skipped_existing']}") info(f"skipped_filtered : {s.get('skipped_filtered', 0)}") info(f"ingested : {s['ingested']}") info(f"ioc_evaluated : {s.get('ioc_evaluated', 0)}") info(f"ioc_matched : {s.get('ioc_matched', 0)}") info(f"ioc_rejected : {s.get('ioc_rejected', 0)}") info(f"iris_alert_ids : {s['iris_alert_ids']}") if s.get("errors"): fail(f"Sync errors: {s['errors']}") errors += 1 else: ok("Sync completed without errors") except Exception as exc: fail(f"Sync request failed: {exc}") return errors + 1 # ------------------------------------------------------------------ # Step 6: Verify IOC was evaluated # ------------------------------------------------------------------ step(6, "Verify IOC evaluation in sync result") ioc_evaluated = s.get("ioc_evaluated", 0) ioc_matched = s.get("ioc_matched", 0) ioc_rejected = s.get("ioc_rejected", 0) if args.no_send: info("Send skipped — checking for any IOC evaluations this sync window") if ioc_evaluated > 0: ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}") else: info("No IOC events in this sync window (expected with --no-send)") elif sent_ok: if ioc_evaluated > 0: ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}") if ioc_matched > 0: ok(f"IOC matched — IRIS alert(s) should be created: {s['iris_alert_ids']}") else: ok(f"IOC not matched ({args.ioc_value} is clean or below threshold)") elif s.get("skipped_existing", 0) > 0: warn("Event may have been deduped from a prior run — each run uses a fresh event_id") warn("If this repeats, check that Wazuh rule actually fires: see logtest instructions") else: fail( f"ioc_evaluated=0 — event not indexed by Wazuh\n" f" Verify with: docker exec {WAZUH_CONTAINER} " f"tail -f /var/ossec/logs/alerts/alerts.log | grep {event_id}" ) errors += 1 # ------------------------------------------------------------------ # Step 7: Verify IRIS alert was created (if matched) # ------------------------------------------------------------------ step(7, "Check IRIS alerts for new IOC alert") if ioc_matched > 0: try: resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=10&sort_by=alert_id&sort_dir=desc") alerts_after = (resp.get("data") or {}).get("alerts", {}).get("data", []) new_alerts = [ a for a in alerts_after if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh" ] if new_alerts: ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:") for a in new_alerts: print(f" alert_id={a['alert_id']} sev={a.get('alert_severity_id')} " f"title={a.get('alert_title', '')[:55]}") else: warn(f"ioc_matched={ioc_matched} but no new wazuh-sourced IRIS alerts found " f"(may already exist from a prior run)") except Exception as exc: fail(f"Could not check IRIS alerts: {exc}") errors += 1 elif ioc_evaluated > 0: ok(f"IOC evaluated, not matched — {args.ioc_value} scored below thresholds (expected for test IPs)") else: info("No IRIS alert check (no IOC evaluation this run)") # ------------------------------------------------------------------ # Summary # ------------------------------------------------------------------ print() print("─" * 65) if errors == 0: print(f" {PASS} All checks passed") else: print(f" {FAIL} {errors} check(s) failed") print("─" * 65) print() print("Re-run options:") print(f" --ioc-type domain --ioc-value evil.example.com (DNS IOC, rule 110301)") print(f" --ioc-value 198.51.100.42 (IP IOC, rule 110318)") print(f" --no-send --minutes 60 (sync only, wider window)") print() return errors # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser( description="End-to-end IOC pipeline test: Wazuh syslog → soc-integrator → IRIS", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--ioc-type", default="ip", choices=["ip", "domain"], help="IOC type (default: ip)") parser.add_argument("--ioc-value", default="198.51.100.42", help="IOC value embedded in syslog event (default: 198.51.100.42)") parser.add_argument("--min-severity", default="low", choices=["informational", "low", "medium", "high", "critical"], help="Min severity filter for sync (default: low)") parser.add_argument("--wait", type=int, default=20, help="Seconds to wait for Wazuh indexer (default: 20)") parser.add_argument("--minutes", type=int, default=5, help="Sync lookback window in minutes (default: 5)") parser.add_argument("--no-send", action="store_true", help="Skip sending syslog — just sync and verify") args = parser.parse_args() sys.exit(run(args))