| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- #!/usr/bin/env python3
- """
- test-wazuh-iris-sync.py — End-to-end test: Wazuh → soc-integrator → IRIS alert sync.
- Steps:
- 1. Send test syslog events to Wazuh (optional, skip with --no-send)
- 2. Wait for Wazuh indexer to index them
- 3. Call POST /wazuh/sync-to-mvp
- 4. Verify IRIS alerts were created with source="wazuh"
- 5. Print a pass/fail summary
- Usage:
- python3 scripts/test-wazuh-iris-sync.py
- python3 scripts/test-wazuh-iris-sync.py --no-send # skip sending, just sync
- python3 scripts/test-wazuh-iris-sync.py --min-severity high
- python3 scripts/test-wazuh-iris-sync.py --minutes 60 # widen search window
- Env vars (override defaults):
- INTEGRATOR_URL default: http://localhost:8088
- INTEGRATOR_API_KEY default: dev-internal-key
- IRIS_URL default: https://localhost:8443
- IRIS_API_KEY required for IRIS verification (or set in soc-integrator/.env)
- """
- from __future__ import annotations
- import argparse
- import json
- import os
- import ssl
- import subprocess
- import sys
- import time
- import urllib.request
- from pathlib import Path
- # ---------------------------------------------------------------------------
- # Config
- # ---------------------------------------------------------------------------
# Endpoints and credentials. Each value can be overridden through the
# environment variable of the same name as its first argument below.
INTEGRATOR_URL = os.getenv("INTEGRATOR_URL", "http://localhost:8088")
INTEGRATOR_KEY = os.getenv("INTEGRATOR_API_KEY", "dev-internal-key")
IRIS_URL = os.getenv("IRIS_URL", "https://localhost:8443")
- # Try to read IRIS_API_KEY from env, then from soc-integrator/.env
def _read_iris_key(env_file: Path | None = None) -> str:
    """Return the IRIS API key, or an empty string when none is configured.

    Lookup order:
      1. the ``IRIS_API_KEY`` environment variable (an empty value is ignored);
      2. the first ``IRIS_API_KEY=`` line found in *env_file*.

    Args:
        env_file: dotenv-style file used as the fallback. Defaults to
            ``soc-integrator/.env`` one directory above this script's folder.

    Returns:
        The key with surrounding whitespace and one pair of matching
        surrounding quotes removed, or ``""`` if no key was found.
    """
    if key := os.environ.get("IRIS_API_KEY"):
        return key

    if env_file is None:
        env_file = Path(__file__).parent.parent / "soc-integrator" / ".env"
    # is_file() rather than exists(): a directory with that name must not
    # make read_text() raise.
    if not env_file.is_file():
        return ""

    # errors="replace": a stray undecodable byte elsewhere in the file must
    # not prevent reading the key.
    for raw_line in env_file.read_text(encoding="utf-8", errors="replace").splitlines():
        line = raw_line.strip()
        if not line.startswith("IRIS_API_KEY="):
            continue
        value = line.split("=", 1)[1].strip()
        # dotenv files commonly quote values; the quotes are not part of the key.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        return value
    return ""
# Resolved once at import time; empty string when no key is configured.
IRIS_KEY = _read_iris_key()

# Shared TLS context with certificate and hostname verification turned off.
# check_hostname has to be cleared first: the ssl module refuses
# verify_mode = CERT_NONE while hostname checking is still enabled.
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE

# ANSI-coloured status markers used by ok() / fail() / info():
# green check, red cross, cyan dot.
PASS, FAIL, INFO = (
    "\033[32m✓\033[0m",
    "\033[31m✗\033[0m",
    "\033[36m·\033[0m",
)
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
def _get(url: str, headers: dict | None = None) -> dict:
    """Send a GET request to *url* and return the JSON-decoded response body.

    Args:
        url: Absolute URL to fetch.
        headers: Optional request headers.

    The request uses the module-level ``SSL_CTX`` and a 15 second timeout.
    Errors from ``urllib`` and from JSON decoding propagate to the caller.
    """
    request = urllib.request.Request(url, headers=headers if headers else {})
    response = urllib.request.urlopen(request, context=SSL_CTX, timeout=15)
    with response:
        raw = response.read()
    return json.loads(raw)
def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict:
    """Send a POST request to *url* and return the JSON-decoded response body.

    Args:
        url: Absolute URL to post to.
        data: JSON-serialisable payload. ``None`` or an empty dict results in
            an empty request body.
        headers: Extra headers; they override the default
            ``Content-Type: application/json`` on conflict.

    The request uses the module-level ``SSL_CTX`` and a 30 second timeout.
    """
    payload = json.dumps(data).encode() if data else b""
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    request = urllib.request.Request(
        url, data=payload, headers=merged_headers, method="POST"
    )
    with urllib.request.urlopen(request, context=SSL_CTX, timeout=30) as response:
        raw = response.read()
    return json.loads(raw)
def _put(url: str, data: dict, headers: dict | None = None) -> dict:
    """Send a PUT request to *url* and return the JSON-decoded response body.

    Args:
        url: Absolute URL to send the request to.
        data: JSON-serialisable payload; always serialised, even when empty.
        headers: Extra headers; they override the default
            ``Content-Type: application/json`` on conflict.

    The request uses the module-level ``SSL_CTX`` and a 15 second timeout.
    """
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    request = urllib.request.Request(
        url,
        data=json.dumps(data).encode(),
        headers=merged_headers,
        method="PUT",
    )
    with urllib.request.urlopen(request, context=SSL_CTX, timeout=15) as response:
        raw = response.read()
    return json.loads(raw)
def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict:
    """Call a soc-integrator endpoint with the internal API key header.

    Args:
        path: Path appended to ``INTEGRATOR_URL``.
        method: ``"POST"`` or ``"PUT"``; any other value is sent as a GET.
        data: JSON payload for POST/PUT requests (ignored for GET).
        params: Pre-built query string (without a leading ``?``) to append.

    Returns:
        The JSON-decoded response body.
    """
    target = f"{INTEGRATOR_URL}{path}"
    if params:
        separator = "&" if "?" in target else "?"
        target = target + separator + params

    auth_headers = {"X-Internal-API-Key": INTEGRATOR_KEY}
    if method == "POST":
        return _post(target, data, auth_headers)
    elif method == "PUT":
        return _put(target, data if data else {}, auth_headers)
    else:
        return _get(target, auth_headers)
def _iris_alerts(page: int = 1, per_page: int = 20) -> list[dict]:
    """Fetch one page of IRIS alerts through soc-integrator, newest first.

    Args:
        page: 1-based page number passed to the endpoint.
        per_page: Page size passed to the endpoint.

    Returns:
        The list found at ``data.alerts.data`` in the response, or ``[]``
        when any level of that envelope is missing, ``null`` or of an
        unexpected type.

    Errors raised by ``_get`` (network, HTTP, JSON decoding) propagate.
    """
    url = (
        f"{INTEGRATOR_URL}/iris/alerts?page={page}&per_page={per_page}"
        "&sort_by=alert_id&sort_dir=desc"
    )
    node = _get(url)
    # Walk the envelope one level at a time so that a JSON null (or any
    # non-object) at *any* level yields an empty list instead of raising.
    for key in ("data", "alerts", "data"):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    return node if isinstance(node, list) else []
def step(n: int, label: str) -> None:
    """Print a bold ``Step N: label`` heading preceded by a blank line."""
    heading = f"Step {n}: {label}"
    print("\n\033[1m" + heading + "\033[0m")
def ok(msg: str) -> None:
    """Print *msg* as a passed check, prefixed with the ``PASS`` marker."""
    line = " {} {}".format(PASS, msg)
    print(line)
def fail(msg: str) -> None:
    """Print *msg* as a failed check, prefixed with the ``FAIL`` marker."""
    line = " {} {}".format(FAIL, msg)
    print(line)
def info(msg: str) -> None:
    """Print *msg* as an informational line, prefixed with the ``INFO`` marker."""
    line = " {} {}".format(INFO, msg)
    print(line)
- # ---------------------------------------------------------------------------
- # Main test
- # ---------------------------------------------------------------------------
def run(args: argparse.Namespace) -> int:
    """Run the end-to-end Wazuh → soc-integrator → IRIS sync check.

    Steps: health check, read sync policy, optionally send test events,
    wait for the indexer, snapshot the newest IRIS alert id, trigger the
    sync, verify the new IRIS alerts, and report the auto-sync status.

    Args:
        args: Parsed CLI options. Reads ``no_send``, ``scenario``, ``wait``,
            ``minutes``, ``limit`` and ``min_severity``.

    Returns:
        The number of failed checks (0 means every check passed). The
        function returns early with a non-zero count when soc-integrator is
        unreachable or the sync request itself fails.
    """
    # Local import so this function does not depend on urllib.parse having
    # been imported as a side effect of ``import urllib.request``.
    from urllib.parse import urlencode

    errors = 0

    # ------------------------------------------------------------------
    # Step 0: Health check
    # ------------------------------------------------------------------
    step(0, "Health check")
    try:
        h = _get(f"{INTEGRATOR_URL}/health")
        if h.get("ok"):
            ok(f"soc-integrator reachable at {INTEGRATOR_URL}")
        else:
            fail(f"soc-integrator unhealthy: {h}")
            errors += 1
    except Exception as exc:
        # Nothing else can work without the integrator — stop here.
        fail(f"Cannot reach soc-integrator: {exc}")
        return 1

    # ------------------------------------------------------------------
    # Step 1: Read current sync policy
    # ------------------------------------------------------------------
    step(1, "Read sync policy")
    try:
        policy_resp = _integrator("/wazuh/sync-policy")
        current_min = policy_resp["data"]["sync"]["min_severity"]
        ok(f"Current min_severity = {current_min!r}")
    except Exception as exc:
        fail(f"Could not read sync policy: {exc}")
        errors += 1
        current_min = "medium"  # fallback so the sync step can still run

    # ------------------------------------------------------------------
    # Step 2: Optionally send test events to Wazuh
    # ------------------------------------------------------------------
    step(2, "Send test events to Wazuh")
    send_attempted = False
    if args.no_send:
        info("Skipped (--no-send)")
    else:
        script = Path(__file__).parent / "test-firewall-syslog.py"
        if not script.exists():
            info("test-firewall-syslog.py not found — skipping send")
        else:
            send_attempted = True
            try:
                result = subprocess.run(
                    [sys.executable, str(script), "--via-docker", "--scenario", args.scenario],
                    capture_output=True, text=True, timeout=30,
                )
                # The sender prints one check mark per event it delivered.
                sent = result.stdout.count("✓")
                if sent:
                    ok(f"Sent {sent} test event(s) (scenario={args.scenario})")
                else:
                    fail(f"No events sent\n{result.stdout}\n{result.stderr}")
                    errors += 1
            except Exception as exc:
                fail(f"Failed to send test events: {exc}")
                errors += 1

    # ------------------------------------------------------------------
    # Step 3: Wait for Wazuh indexer
    # ------------------------------------------------------------------
    step(3, f"Wait {args.wait}s for Wazuh indexer")
    if args.no_send:
        info("Skipped (--no-send)")
    elif not send_attempted:
        # The sender script was missing, so there is nothing to wait for.
        info("Skipped (no test events were sent)")
    else:
        # Count down in 5 s slices; the last slice is shortened so the total
        # sleep equals args.wait.
        for remaining in range(args.wait, 0, -5):
            print(f" {INFO} {remaining}s remaining...", end="\r", flush=True)
            time.sleep(min(5, remaining))
        print()
        ok("Done")

    # ------------------------------------------------------------------
    # Step 4: Snapshot IRIS alert count before sync
    # ------------------------------------------------------------------
    step(4, "Snapshot IRIS alert count before sync")
    max_id_before: int | None
    try:
        alerts_before = _iris_alerts(per_page=5)
        max_id_before = max((a["alert_id"] for a in alerts_before), default=0)
        ok(f"Latest IRIS alert_id before sync: {max_id_before}")
    except Exception as exc:
        info(f"Could not read IRIS alerts (verification will be skipped): {exc}")
        # None (not 0) marks "no baseline": with 0 every existing wazuh
        # alert would later be mistaken for a newly created one.
        max_id_before = None

    # ------------------------------------------------------------------
    # Step 5: Run sync
    # ------------------------------------------------------------------
    step(5, "Run sync")
    min_sev = args.min_severity or current_min
    # urlencode escapes the user-supplied severity; safe="*" keeps the
    # wildcard query literal.
    params = urlencode(
        {"limit": args.limit, "minutes": args.minutes, "q": "*", "min_severity": min_sev},
        safe="*",
    )
    try:
        resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params)
        s = resp["data"]["sync"]
        info(f"min_severity_applied : {s['min_severity_applied']}")
        info(f"processed : {s['processed']}")
        info(f"skipped_existing : {s['skipped_existing']}")
        info(f"skipped_filtered : {s.get('skipped_filtered', 0)}")
        info(f"ingested : {s['ingested']}")
        info(f"iris_alert_ids : {s['iris_alert_ids']}")
        if s.get("errors"):
            fail(f"Sync errors: {s['errors']}")
            errors += 1
        else:
            ok("Sync completed without errors")
    except Exception as exc:
        # Without a sync result the remaining steps have nothing to verify.
        fail(f"Sync request failed: {exc}")
        return errors + 1

    # ------------------------------------------------------------------
    # Step 6: Verify new IRIS alerts
    # ------------------------------------------------------------------
    step(6, "Verify IRIS alerts")
    # Treated as optional here exactly as in the Step 5 report above.
    skipped_filtered = s.get("skipped_filtered", 0)
    if not s["iris_alert_ids"]:
        if s["ingested"] == 0 and s["skipped_existing"] == s["processed"]:
            ok("All alerts already synced (no duplicates created) — dedup working")
        elif skipped_filtered > 0 and s["ingested"] == 0:
            ok(f"All new alerts filtered by min_severity={min_sev} — filter working")
        else:
            info("No new IRIS alerts created this run")
    elif max_id_before is None:
        # Honour the Step 4 notice: without a baseline, "new" is undefined.
        info("Skipped (no pre-sync IRIS snapshot to compare against)")
    else:
        try:
            alerts_after = _iris_alerts(per_page=10)
            new_alerts = [
                a for a in alerts_after
                if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"
            ]
            if new_alerts:
                ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:")
                for a in new_alerts:
                    # "or ''" guards against a JSON null title/ref.
                    title = (a.get("alert_title") or "")[:55]
                    ref = a.get("alert_source_ref") or ""
                    print(f" alert_id={a['alert_id']} ref={ref} title={title}")
            else:
                fail(f"iris_alert_ids={s['iris_alert_ids']} but no matching IRIS alerts found")
                errors += 1
        except Exception as exc:
            fail(f"Could not verify IRIS alerts: {exc}")
            errors += 1

    # ------------------------------------------------------------------
    # Step 7: Auto-sync status
    # ------------------------------------------------------------------
    step(7, "Auto-sync worker status")
    try:
        st = _integrator("/wazuh/auto-sync/status")["data"]
        info(f"enabled : {st['enabled']}")
        info(f"task_running : {st['task_running']}")
        info(f"min_severity : {st['settings']['min_severity']}")
        if st.get("state", {}).get("last_status"):
            info(f"last_status : {st['state']['last_status']}")
        ok("Auto-sync status retrieved")
    except Exception as exc:
        fail(f"Could not read auto-sync status: {exc}")
        errors += 1

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    print()
    print("─" * 60)
    if errors == 0:
        print(f" {PASS} All checks passed")
    else:
        print(f" {FAIL} {errors} check(s) failed")
    print("─" * 60)
    return errors
- # ---------------------------------------------------------------------------
- # CLI
- # ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the command-line interface; the process exit status is the
    # number of failed checks returned by run().
    cli = argparse.ArgumentParser(description="End-to-end test: Wazuh → soc-integrator → IRIS")
    cli.add_argument("--no-send", action="store_true", help="Skip sending test events to Wazuh")
    cli.add_argument("--scenario", default="rdp", help="Firewall scenario to send (default: rdp)")
    # The three integer options share the same shape, so declare them as data.
    for flag, default_value, help_text in (
        ("--wait", 20, "Seconds to wait for indexer (default: 20)"),
        ("--minutes", 5, "Sync lookback window in minutes (default: 5)"),
        ("--limit", 20, "Max alerts to sync (default: 20)"),
    ):
        cli.add_argument(flag, type=int, default=default_value, help=help_text)
    cli.add_argument(
        "--min-severity",
        default=None,
        help="Override min_severity for this run (default: use policy)",
    )
    options = cli.parse_args()
    sys.exit(run(options))
|