| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- #!/usr/bin/env python3
- """
- test-ioc-pipeline.py — End-to-end test: Wazuh syslog → soc-integrator IOC evaluation → IRIS alert.
- Flow:
- 1. Send a syslog event with embedded IOC payload to Wazuh (via Docker exec)
- 2. Wait for Wazuh indexer to index the alert
- 3. Trigger POST /wazuh/sync-to-mvp
- 4. Verify ioc_evaluated > 0 and decision_source == "direct_api" (VT/AbuseIPDB, no Shuffle)
- 5. Verify IRIS alert created if IOC matched
- Log formats used (both fire level-8 rules):
- ioc_ips → FortiGate IPS: logid=0419016386 ... attack=IOC.IP.Match ioc_type=ip → rule 110318
- ioc_dns → soc_event=dns_ioc event_type=ioc_dns_traffic query=<domain> → rule 110301
- Usage:
- python3 scripts/test-ioc-pipeline.py
- python3 scripts/test-ioc-pipeline.py --ioc-type domain --ioc-value "evil.example.com"
- python3 scripts/test-ioc-pipeline.py --ioc-value "198.51.100.1" --wait 30
- python3 scripts/test-ioc-pipeline.py --no-send # sync only, skip sending
- Env vars:
- INTEGRATOR_URL default: http://localhost:8088
- INTEGRATOR_API_KEY default: dev-internal-key
- """
- from __future__ import annotations
- import argparse
- import datetime
- import json
- import os
- import socket
- import ssl
- import subprocess
- import sys
- import time
- import urllib.request
- import uuid
- # ---------------------------------------------------------------------------
- # Config
- # ---------------------------------------------------------------------------
# soc-integrator endpoint and internal API key; both can be overridden
# through the environment.
INTEGRATOR_URL = os.getenv("INTEGRATOR_URL", "http://localhost:8088")
INTEGRATOR_KEY = os.getenv("INTEGRATOR_API_KEY", "dev-internal-key")

# Container the test syslog line is injected through, and the syslog port.
WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1"
SYSLOG_PORT = 514
SYSLOG_SRC_IP = "172.16.22.253"  # must be in allowed-ips

# TLS context with hostname checking and certificate verification disabled.
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE

# ANSI-coloured status markers printed by ok() / fail() / info() / warn().
PASS = "\033[32m✓\033[0m"
FAIL = "\033[31m✗\033[0m"
INFO = "\033[36m·\033[0m"
WARN = "\033[33m!\033[0m"
- # ---------------------------------------------------------------------------
- # HTTP helpers
- # ---------------------------------------------------------------------------
def _get(url: str, headers: dict | None = None) -> dict:
    """Issue a GET request to *url* and return the JSON-decoded response body.

    ``headers`` are optional extra request headers.  The request goes through
    the module-level ``SSL_CTX`` with a 15-second timeout; anything raised by
    ``urllib`` or by the JSON decoder propagates to the caller.
    """
    request = urllib.request.Request(url, headers=headers if headers else {})
    with urllib.request.urlopen(request, context=SSL_CTX, timeout=15) as response:
        raw_body = response.read()
    return json.loads(raw_body)
def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict:
    """POST *data* as a JSON document to *url* and return the decoded JSON reply.

    A missing or empty ``data`` is sent as ``{}``.  ``Content-Type`` defaults
    to ``application/json`` and may be overridden by an entry in ``headers``.
    The request goes through the module-level ``SSL_CTX`` with a 30-second
    timeout; errors from ``urllib`` or the JSON decoder propagate.
    """
    payload = json.dumps(data if data else {}).encode()
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    request = urllib.request.Request(
        url, data=payload, headers=merged_headers, method="POST"
    )
    with urllib.request.urlopen(request, context=SSL_CTX, timeout=30) as response:
        raw_body = response.read()
    return json.loads(raw_body)
def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict:
    """Call a soc-integrator endpoint with the internal API key header.

    ``path`` is appended to ``INTEGRATOR_URL``; ``params`` is a pre-encoded
    query string joined with ``?`` or ``&`` depending on whether the URL
    already has a query part.  Only the exact string ``"POST"`` selects a POST
    (sending ``data`` as JSON); every other ``method`` value results in a GET,
    for which ``data`` is ignored.  Returns the decoded JSON response.
    """
    url = f"{INTEGRATOR_URL}{path}"
    if params:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}{params}"
    auth_headers = {"X-Internal-API-Key": INTEGRATOR_KEY}
    if method != "POST":
        return _get(url, auth_headers)
    return _post(url, data, auth_headers)
- # ---------------------------------------------------------------------------
- # Syslog helpers
- # ---------------------------------------------------------------------------
def _ts() -> tuple[str, str]:
    """Return the current UTC date and time as ``(YYYY-MM-DD, HH:MM:SS)`` strings."""
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    date_part = f"{utc_now:%Y-%m-%d}"
    time_part = f"{utc_now:%H:%M:%S}"
    return date_part, time_part
def _send_via_docker(raw_line: str, port: int) -> bool:
    """Send one raw syslog line over UDP from inside the Wazuh container.

    Runs ``docker exec <WAZUH_CONTAINER> python3 -c ...`` so that a short
    Python snippet inside the container sends ``raw_line`` plus a trailing
    newline as a single UDP datagram to ``127.0.0.1:<port>``.

    Returns True when the ``docker exec`` command exits with status 0.
    Returns False otherwise, including when the ``docker`` executable cannot
    be started or the command does not finish within 10 seconds, so callers
    always get a boolean rather than an exception.
    """
    datagram = (raw_line + "\n").encode()
    # repr() of a bytes object is a valid Python literal, so the payload can
    # be embedded in the one-liner without any further quoting.
    py_cmd = (
        "import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); "
        f"s.sendto({datagram!r}, ('127.0.0.1',{port})); s.close()"
    )
    try:
        result = subprocess.run(
            ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd],
            capture_output=True, timeout=10,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers a missing/unrunnable docker binary; TimeoutExpired
        # covers a hung exec.  Both mean the line was not delivered.
        return False
    return result.returncode == 0
def build_ioc_ips_log(event_id: str, ioc_value: str) -> str:
    """
    Build a FortiGate IPS key=value log line stamped with the current UTC time.

    Meant to fire rule 110318 (A2-08 IOC IP indicator, level 8), which
    requires: logid=0419016386 + attack field + action=dropped + ioc_type=ip.
    ``ioc_value`` is written to both the ``srcip`` and ``ioc_value`` fields;
    ``event_id`` is appended as the last field.
    """
    log_date, log_time = _ts()
    fields = [
        f"date={log_date}",
        f"time={log_time}",
        "devname=FG-TEST-FW",
        "devid=FGT60E0000000001",
        "logid=0419016386",
        "type=utm",
        "subtype=ips",
        "level=critical",
        "vd=root",
        f"srcip={ioc_value}",
        "srcport=54321",
        "srcintf=port1",
        "dstip=172.16.1.5",
        "dstport=80",
        "dstintf=port2",
        "policyid=1",
        "proto=6",
        "action=dropped",
        "service=HTTP",
        "attack=IOC.IP.Match",
        "ioc_type=ip",
        f"ioc_value={ioc_value}",
        f"event_id={event_id}",
    ]
    return " ".join(fields)
def build_ioc_dns_log(event_id: str, domain: str) -> str:
    """
    Build a SOC DNS IOC key=value log line for the queried *domain*.

    Meant to fire rule 110301 (A1-01 DNS IOC traffic, level 8), which
    requires: soc_event=dns_ioc + event_type=ioc_dns_traffic.
    """
    fields = (
        "soc_event=dns_ioc",
        "event_type=ioc_dns_traffic",
        f"event_id={event_id}",
        "src_ip=10.26.45.214",
        f"query={domain}",
        "action=blocked",
        "severity=medium",
    )
    return " ".join(fields)
- # ---------------------------------------------------------------------------
- # Pretty helpers
- # ---------------------------------------------------------------------------
def step(n: int | str, label: str) -> None:
    """Print a bold ``Step <n>: <label>`` heading preceded by a blank line."""
    heading = f"\n\033[1mStep {n}: {label}\033[0m"
    print(heading)
def ok(msg: str) -> None:
    """Print *msg* as a success line, prefixed with the ``PASS`` marker."""
    line = f" {PASS} {msg}"
    print(line)
def fail(msg: str) -> None:
    """Print *msg* as a failure line, prefixed with the ``FAIL`` marker."""
    line = f" {FAIL} {msg}"
    print(line)
def info(msg: str) -> None:
    """Print *msg* as an informational line, prefixed with the ``INFO`` marker."""
    line = f" {INFO} {msg}"
    print(line)
def warn(msg: str) -> None:
    """Print *msg* as a warning line, prefixed with the ``WARN`` marker."""
    line = f" {WARN} {msg}"
    print(line)
- # ---------------------------------------------------------------------------
- # Main test
- # ---------------------------------------------------------------------------
def run(args: argparse.Namespace) -> int:
    """Run the end-to-end IOC pipeline test and return the number of failed checks.

    Steps performed, each announced on stdout:
      0. Health check of soc-integrator (returns 1 immediately on failure).
      1. Snapshot the highest IRIS alert_id as a baseline.
      2. POST /mvp/ioc/evaluate directly and check ``decision_source``.
      3. Send an IOC syslog event through the Wazuh container
         (skipped with ``--no-send``).
      4. Wait ``args.wait`` seconds for the Wazuh indexer, only when an
         event was actually sent.
      5. POST /wazuh/sync-to-mvp (returns ``errors + 1`` if the request fails).
      6. Check the IOC counters in the sync result.
      7. Look for new wazuh-sourced IRIS alerts above the step-1 baseline;
         skipped when no baseline could be taken.

    ``args`` must carry ``ioc_type``, ``ioc_value``, ``no_send``, ``wait``,
    ``minutes`` and ``min_severity``.  The return value is 0 when every check
    passed; the CLI entry point uses it as the process exit status.
    """
    errors = 0
    event_id = f"ioc-test-{uuid.uuid4().hex[:12]}"
    print("\n\033[1mIOC Pipeline Test\033[0m (Wazuh → soc-integrator → IRIS)")
    print(f" ioc_type : {args.ioc_type}")
    print(f" ioc_value : {args.ioc_value}")
    print(f" event_id : {event_id}")

    # ------------------------------------------------------------------
    # Step 0: Health check
    # ------------------------------------------------------------------
    step(0, "Health check")
    try:
        h = _get(f"{INTEGRATOR_URL}/health")
        if h.get("ok"):
            ok(f"soc-integrator reachable at {INTEGRATOR_URL}")
        else:
            fail(f"soc-integrator unhealthy: {h}")
            return 1
    except Exception as exc:
        fail(f"Cannot reach soc-integrator: {exc}")
        return 1

    # ------------------------------------------------------------------
    # Step 1: Snapshot IRIS alert count before
    # ------------------------------------------------------------------
    step(1, "Snapshot IRIS alert count before sync")
    max_id_before = 0
    # Tracks whether max_id_before is a real baseline; step 7 must not
    # compare against the placeholder 0 when the snapshot failed.
    snapshot_ok = False
    try:
        resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=5&sort_by=alert_id&sort_dir=desc")
        alerts_before = (resp.get("data") or {}).get("alerts", {}).get("data", [])
        max_id_before = max((a["alert_id"] for a in alerts_before), default=0)
        snapshot_ok = True
        ok(f"Latest alert_id before = {max_id_before}")
    except Exception as exc:
        warn(f"Could not snapshot IRIS alerts (verification skipped): {exc}")

    # ------------------------------------------------------------------
    # Step 2: Direct smoke test — /mvp/ioc/evaluate without Shuffle
    # ------------------------------------------------------------------
    step(2, "Direct /mvp/ioc/evaluate smoke test (confirm no Shuffle)")
    try:
        eval_resp = _integrator(
            "/mvp/ioc/evaluate", method="POST",
            data={"ioc_type": args.ioc_type, "ioc_value": args.ioc_value},
        )
        eval_data = eval_resp.get("data", {})
        decision_source = eval_data.get("decision_source", "")
        confidence = eval_data.get("confidence", 0.0)
        matched = eval_data.get("matched", False)
        severity = eval_data.get("severity", "")
        iocs = eval_data.get("iocs", [])
        info(f"decision_source : {decision_source}")
        info(f"matched : {matched} confidence={confidence:.4f} severity={severity}")
        for ioc in iocs or []:
            info(f"ioc match : {ioc.get('reason','')[:80]}")
        if decision_source == "direct_api":
            ok("decision_source=direct_api — VT/AbuseIPDB path confirmed, no Shuffle")
        elif decision_source == "skipped":
            ok("IOC skipped (empty/placeholder value) — try a real IP for full VT result")
        else:
            fail(f"Unexpected decision_source={decision_source!r} — check evaluate_ioc()")
            errors += 1
    except Exception as exc:
        fail(f"/mvp/ioc/evaluate failed: {exc}")
        errors += 1

    # ------------------------------------------------------------------
    # Step 3: Send IOC syslog event to Wazuh
    # ------------------------------------------------------------------
    step(3, "Send IOC syslog event to Wazuh")
    sent_ok = False
    if args.no_send:
        info("Skipped (--no-send)")
    else:
        if args.ioc_type == "domain":
            log_msg = build_ioc_dns_log(event_id, args.ioc_value)
            rule_note = "rule 110301 (A1-01 DNS IOC, level 8)"
        else:
            log_msg = build_ioc_ips_log(event_id, args.ioc_value)
            rule_note = "rule 110318 (A2-08 FortiGate IPS IOC, level 8)"
        info(f"Format : {rule_note}")
        info(f"Log : {log_msg[:100]}...")
        try:
            sent_ok = _send_via_docker(log_msg, SYSLOG_PORT)
        except (OSError, subprocess.TimeoutExpired) as exc:
            # A missing docker binary or a hung exec is a failed send, not a
            # reason to abort the whole run with a traceback.
            warn(f"docker exec raised: {exc}")
            sent_ok = False
        if sent_ok:
            ok(f"Syslog sent via {WAZUH_CONTAINER} (event_id={event_id})")
        else:
            fail(f"Failed to send syslog — is {WAZUH_CONTAINER} running?")
            errors += 1

    # ------------------------------------------------------------------
    # Step 4: Wait for Wazuh indexer
    # ------------------------------------------------------------------
    step(4, f"Wait {args.wait}s for Wazuh indexer")
    if args.no_send:
        info("Skipped (--no-send)")
    elif not sent_ok:
        # Nothing was delivered, so there is nothing to wait for.
        info("Skipped (syslog event was not sent)")
    else:
        # Count down in 5-second slices; the last slice may be shorter.
        for remaining in range(args.wait, 0, -5):
            print(f" {INFO} {remaining}s remaining...", end="\r", flush=True)
            time.sleep(min(5, remaining))
        print()
        ok("Wait complete")

    # ------------------------------------------------------------------
    # Step 5: Trigger sync
    # ------------------------------------------------------------------
    step(5, "Trigger Wazuh → IRIS sync")
    min_sev = args.min_severity
    params = f"limit=50&minutes={args.minutes}&q=*&min_severity={min_sev}"
    s: dict = {}
    try:
        resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params)
        s = resp["data"]["sync"]
        info(f"min_severity_applied : {s['min_severity_applied']}")
        info(f"processed : {s['processed']}")
        info(f"skipped_existing : {s['skipped_existing']}")
        info(f"skipped_filtered : {s.get('skipped_filtered', 0)}")
        info(f"ingested : {s['ingested']}")
        info(f"ioc_evaluated : {s.get('ioc_evaluated', 0)}")
        info(f"ioc_matched : {s.get('ioc_matched', 0)}")
        info(f"ioc_rejected : {s.get('ioc_rejected', 0)}")
        info(f"iris_alert_ids : {s['iris_alert_ids']}")
        if s.get("errors"):
            fail(f"Sync errors: {s['errors']}")
            errors += 1
        else:
            ok("Sync completed without errors")
    except Exception as exc:
        fail(f"Sync request failed: {exc}")
        return errors + 1

    # ------------------------------------------------------------------
    # Step 6: Verify IOC was evaluated
    # ------------------------------------------------------------------
    step(6, "Verify IOC evaluation in sync result")
    ioc_evaluated = s.get("ioc_evaluated", 0)
    ioc_matched = s.get("ioc_matched", 0)
    ioc_rejected = s.get("ioc_rejected", 0)
    if args.no_send:
        info("Send skipped — checking for any IOC evaluations this sync window")
        if ioc_evaluated > 0:
            ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}")
        else:
            info("No IOC events in this sync window (expected with --no-send)")
    elif sent_ok:
        if ioc_evaluated > 0:
            ok(f"ioc_evaluated={ioc_evaluated} matched={ioc_matched} rejected={ioc_rejected}")
            if ioc_matched > 0:
                ok(f"IOC matched — IRIS alert(s) should be created: {s['iris_alert_ids']}")
            else:
                ok(f"IOC not matched ({args.ioc_value} is clean or below threshold)")
        elif s.get("skipped_existing", 0) > 0:
            warn("Event may have been deduped from a prior run — each run uses a fresh event_id")
            warn("If this repeats, check that Wazuh rule actually fires: see logtest instructions")
        else:
            fail(
                f"ioc_evaluated=0 — event not indexed by Wazuh\n"
                f" Verify with: docker exec {WAZUH_CONTAINER} "
                f"tail -f /var/ossec/logs/alerts/alerts.log | grep {event_id}"
            )
            errors += 1

    # ------------------------------------------------------------------
    # Step 7: Verify IRIS alert was created (if matched)
    # ------------------------------------------------------------------
    step(7, "Check IRIS alerts for new IOC alert")
    if ioc_matched > 0 and not snapshot_ok:
        # Without a baseline every existing alert would look "new".
        warn("No alert_id baseline from step 1 — cannot tell new IRIS alerts "
             "from existing ones, check skipped")
    elif ioc_matched > 0:
        try:
            resp = _get(f"{INTEGRATOR_URL}/iris/alerts?per_page=10&sort_by=alert_id&sort_dir=desc")
            alerts_after = (resp.get("data") or {}).get("alerts", {}).get("data", [])
            new_alerts = [
                a for a in alerts_after
                if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"
            ]
            if new_alerts:
                ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:")
                for a in new_alerts:
                    print(f" alert_id={a['alert_id']} sev={a.get('alert_severity_id')} "
                          f"title={a.get('alert_title', '')[:55]}")
            else:
                warn(f"ioc_matched={ioc_matched} but no new wazuh-sourced IRIS alerts found "
                     f"(may already exist from a prior run)")
        except Exception as exc:
            fail(f"Could not check IRIS alerts: {exc}")
            errors += 1
    elif ioc_evaluated > 0:
        ok(f"IOC evaluated, not matched — {args.ioc_value} scored below thresholds (expected for test IPs)")
    else:
        info("No IRIS alert check (no IOC evaluation this run)")

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    print()
    print("─" * 65)
    if errors == 0:
        print(f" {PASS} All checks passed")
    else:
        print(f" {FAIL} {errors} check(s) failed")
    print("─" * 65)
    print()
    print("Re-run options:")
    print(" --ioc-type domain --ioc-value evil.example.com (DNS IOC, rule 110301)")
    print(" --ioc-value 198.51.100.42 (IP IOC, rule 110318)")
    print(" --no-send --minutes 60 (sync only, wider window)")
    print()
    return errors
- # ---------------------------------------------------------------------------
- # CLI
- # ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the IOC pipeline test."""
    cli = argparse.ArgumentParser(
        description="End-to-end IOC pipeline test: Wazuh syslog → soc-integrator → IRIS",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "--ioc-type",
        choices=["ip", "domain"],
        default="ip",
        help="IOC type (default: ip)",
    )
    cli.add_argument(
        "--ioc-value",
        default="198.51.100.42",
        help="IOC value embedded in syslog event (default: 198.51.100.42)",
    )
    cli.add_argument(
        "--min-severity",
        choices=["informational", "low", "medium", "high", "critical"],
        default="low",
        help="Min severity filter for sync (default: low)",
    )
    cli.add_argument(
        "--wait",
        type=int,
        default=20,
        help="Seconds to wait for Wazuh indexer (default: 20)",
    )
    cli.add_argument(
        "--minutes",
        type=int,
        default=5,
        help="Sync lookback window in minutes (default: 5)",
    )
    cli.add_argument(
        "--no-send",
        action="store_true",
        help="Skip sending syslog — just sync and verify",
    )
    return cli


if __name__ == "__main__":
    # The number of failed checks returned by run() becomes the exit status.
    parser = _build_parser()
    args = parser.parse_args()
    sys.exit(run(args))
|