#!/usr/bin/env python3 """ test-wazuh-iris-sync.py — End-to-end test: Wazuh → soc-integrator → IRIS alert sync. Steps: 1. Send test syslog events to Wazuh (optional, skip with --no-send) 2. Wait for Wazuh indexer to index them 3. Call POST /wazuh/sync-to-mvp 4. Verify IRIS alerts were created with source="wazuh" 5. Print a pass/fail summary Usage: python3 scripts/test-wazuh-iris-sync.py python3 scripts/test-wazuh-iris-sync.py --no-send # skip sending, just sync python3 scripts/test-wazuh-iris-sync.py --min-severity high python3 scripts/test-wazuh-iris-sync.py --minutes 60 # widen search window Env vars (override defaults): INTEGRATOR_URL default: http://localhost:8088 INTEGRATOR_API_KEY default: dev-internal-key IRIS_URL default: https://localhost:8443 IRIS_API_KEY required for IRIS verification (or set in soc-integrator/.env) """ from __future__ import annotations import argparse import json import os import ssl import subprocess import sys import time import urllib.request from pathlib import Path # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- INTEGRATOR_URL = os.environ.get("INTEGRATOR_URL", "http://localhost:8088") INTEGRATOR_KEY = os.environ.get("INTEGRATOR_API_KEY", "dev-internal-key") IRIS_URL = os.environ.get("IRIS_URL", "https://localhost:8443") # Try to read IRIS_API_KEY from env, then from soc-integrator/.env def _read_iris_key() -> str: if k := os.environ.get("IRIS_API_KEY"): return k env_file = Path(__file__).parent.parent / "soc-integrator" / ".env" if env_file.exists(): for line in env_file.read_text().splitlines(): if line.startswith("IRIS_API_KEY="): return line.split("=", 1)[1].strip() return "" IRIS_KEY = _read_iris_key() SSL_CTX = ssl.create_default_context() SSL_CTX.check_hostname = False SSL_CTX.verify_mode = ssl.CERT_NONE PASS = "\033[32m✓\033[0m" FAIL = "\033[31m✗\033[0m" INFO = "\033[36m·\033[0m" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _get(url: str, headers: dict | None = None) -> dict: req = urllib.request.Request(url, headers=headers or {}) with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r: return json.loads(r.read()) def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict: body = json.dumps(data or {}).encode() if data else b"" h = {"Content-Type": "application/json", **(headers or {})} req = urllib.request.Request(url, data=body, headers=h, method="POST") with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as r: return json.loads(r.read()) def _put(url: str, data: dict, headers: dict | None = None) -> dict: body = json.dumps(data).encode() h = {"Content-Type": "application/json", **(headers or {})} req = urllib.request.Request(url, data=body, headers=h, method="PUT") with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r: return json.loads(r.read()) def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict: url = f"{INTEGRATOR_URL}{path}" if params: url += ("&" if "?" in url else "?") + params headers = {"X-Internal-API-Key": INTEGRATOR_KEY} if method == "POST": return _post(url, data, headers) if method == "PUT": return _put(url, data or {}, headers) return _get(url, headers) def _iris_alerts(page: int = 1, per_page: int = 20) -> list[dict]: url = f"{INTEGRATOR_URL}/iris/alerts?page={page}&per_page={per_page}&sort_by=alert_id&sort_dir=desc" data = _get(url) return (data.get("data") or {}).get("alerts", {}).get("data", []) def step(n: int, label: str) -> None: print(f"\n\033[1mStep {n}: {label}\033[0m") def ok(msg: str) -> None: print(f" {PASS} {msg}") def fail(msg: str) -> None: print(f" {FAIL} {msg}") def info(msg: str) -> None: print(f" {INFO} {msg}") # --------------------------------------------------------------------------- # Main test # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> int: errors = 0 # ------------------------------------------------------------------ # Step 0: Health check # ------------------------------------------------------------------ step(0, "Health check") try: h = _get(f"{INTEGRATOR_URL}/health") if h.get("ok"): ok(f"soc-integrator reachable at {INTEGRATOR_URL}") else: fail(f"soc-integrator unhealthy: {h}") errors += 1 except Exception as exc: fail(f"Cannot reach soc-integrator: {exc}") return 1 # ------------------------------------------------------------------ # Step 1: Read current sync policy # ------------------------------------------------------------------ step(1, "Read sync policy") try: policy_resp = _integrator("/wazuh/sync-policy") current_min = policy_resp["data"]["sync"]["min_severity"] ok(f"Current min_severity = {current_min!r}") except Exception as exc: fail(f"Could not read sync policy: {exc}") errors += 1 current_min = "medium" # ------------------------------------------------------------------ # Step 2: Optionally send test events to Wazuh # ------------------------------------------------------------------ step(2, "Send test events to Wazuh") if args.no_send: info("Skipped (--no-send)") else: script = Path(__file__).parent / "test-firewall-syslog.py" if not script.exists(): info("test-firewall-syslog.py not found — skipping send") else: try: result = subprocess.run( [sys.executable, str(script), "--via-docker", "--scenario", args.scenario], capture_output=True, text=True, timeout=30, ) sent = result.stdout.count("✓") if sent: ok(f"Sent {sent} test event(s) (scenario={args.scenario})") else: fail(f"No events sent\n{result.stdout}\n{result.stderr}") errors += 1 except Exception as exc: fail(f"Failed to send test events: {exc}") errors += 1 # ------------------------------------------------------------------ # Step 3: Wait for Wazuh indexer # ------------------------------------------------------------------ step(3, f"Wait {args.wait}s for Wazuh indexer") if args.no_send: info("Skipped (--no-send)") else: for i in range(args.wait, 0, -5): print(f" {INFO} {i}s remaining...", end="\r", flush=True) time.sleep(min(5, i)) print() ok("Done") # ------------------------------------------------------------------ # Step 4: Snapshot IRIS alert count before sync # ------------------------------------------------------------------ step(4, "Snapshot IRIS alert count before sync") try: alerts_before = _iris_alerts(per_page=5) max_id_before = max((a["alert_id"] for a in alerts_before), default=0) ok(f"Latest IRIS alert_id before sync: {max_id_before}") except Exception as exc: info(f"Could not read IRIS alerts (verification will be skipped): {exc}") max_id_before = 0 # ------------------------------------------------------------------ # Step 5: Run sync # ------------------------------------------------------------------ step(5, "Run sync") min_sev = args.min_severity or current_min params = f"limit={args.limit}&minutes={args.minutes}&q=*&min_severity={min_sev}" try: resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params) s = resp["data"]["sync"] info(f"min_severity_applied : {s['min_severity_applied']}") info(f"processed : {s['processed']}") info(f"skipped_existing : {s['skipped_existing']}") info(f"skipped_filtered : {s.get('skipped_filtered', 0)}") info(f"ingested : {s['ingested']}") info(f"iris_alert_ids : {s['iris_alert_ids']}") if s.get("errors"): fail(f"Sync errors: {s['errors']}") errors += 1 else: ok("Sync completed without errors") except Exception as exc: fail(f"Sync request failed: {exc}") return errors + 1 # ------------------------------------------------------------------ # Step 6: Verify new IRIS alerts # ------------------------------------------------------------------ step(6, "Verify IRIS alerts") if not s["iris_alert_ids"]: if s["ingested"] == 0 and s["skipped_existing"] == s["processed"]: ok("All alerts already synced (no duplicates created) — dedup working") elif s["skipped_filtered"] > 0 and s["ingested"] == 0: ok(f"All new alerts filtered by min_severity={min_sev} — filter working") else: info("No new IRIS alerts created this run") else: try: alerts_after = _iris_alerts(per_page=10) new_alerts = [a for a in alerts_after if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"] if new_alerts: ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:") for a in new_alerts: print(f" alert_id={a['alert_id']} ref={a.get('alert_source_ref','')} title={a.get('alert_title','')[:55]}") else: fail(f"iris_alert_ids={s['iris_alert_ids']} but no matching IRIS alerts found") errors += 1 except Exception as exc: fail(f"Could not verify IRIS alerts: {exc}") errors += 1 # ------------------------------------------------------------------ # Step 7: Auto-sync status # ------------------------------------------------------------------ step(7, "Auto-sync worker status") try: st = _integrator("/wazuh/auto-sync/status")["data"] info(f"enabled : {st['enabled']}") info(f"task_running : {st['task_running']}") info(f"min_severity : {st['settings']['min_severity']}") if st.get("state", {}).get("last_status"): info(f"last_status : {st['state']['last_status']}") ok("Auto-sync status retrieved") except Exception as exc: fail(f"Could not read auto-sync status: {exc}") errors += 1 # ------------------------------------------------------------------ # Summary # ------------------------------------------------------------------ print() print("─" * 60) if errors == 0: print(f" {PASS} All checks passed") else: print(f" {FAIL} {errors} check(s) failed") print("─" * 60) return errors # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser(description="End-to-end test: Wazuh → soc-integrator → IRIS") parser.add_argument("--no-send", action="store_true", help="Skip sending test events to Wazuh") parser.add_argument("--scenario", default="rdp", help="Firewall scenario to send (default: rdp)") parser.add_argument("--wait", type=int, default=20, help="Seconds to wait for indexer (default: 20)") parser.add_argument("--minutes", type=int, default=5, help="Sync lookback window in minutes (default: 5)") parser.add_argument("--limit", type=int, default=20, help="Max alerts to sync (default: 20)") parser.add_argument("--min-severity", default=None, help="Override min_severity for this run (default: use policy)") args = parser.parse_args() sys.exit(run(args))