Nav apraksta

test-wazuh-iris-sync.py 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. #!/usr/bin/env python3
  2. """
  3. test-wazuh-iris-sync.py — End-to-end test: Wazuh → soc-integrator → IRIS alert sync.
  4. Steps:
  5. 1. Send test syslog events to Wazuh (optional, skip with --no-send)
  6. 2. Wait for Wazuh indexer to index them
  7. 3. Call POST /wazuh/sync-to-mvp
  8. 4. Verify IRIS alerts were created with source="wazuh"
  9. 5. Print a pass/fail summary
  10. Usage:
  11. python3 scripts/test-wazuh-iris-sync.py
  12. python3 scripts/test-wazuh-iris-sync.py --no-send # skip sending, just sync
  13. python3 scripts/test-wazuh-iris-sync.py --min-severity high
  14. python3 scripts/test-wazuh-iris-sync.py --minutes 60 # widen search window
  15. Env vars (override defaults):
  16. INTEGRATOR_URL default: http://localhost:8088
  17. INTEGRATOR_API_KEY default: dev-internal-key
  18. IRIS_URL default: https://localhost:8443
  19. IRIS_API_KEY required for IRIS verification (or set in soc-integrator/.env)
  20. """
  21. from __future__ import annotations
  22. import argparse
  23. import json
  24. import os
  25. import ssl
  26. import subprocess
  27. import sys
  28. import time
  29. import urllib.request
  30. from pathlib import Path
  31. # ---------------------------------------------------------------------------
  32. # Config
  33. # ---------------------------------------------------------------------------
  34. INTEGRATOR_URL = os.environ.get("INTEGRATOR_URL", "http://localhost:8088")
  35. INTEGRATOR_KEY = os.environ.get("INTEGRATOR_API_KEY", "dev-internal-key")
  36. IRIS_URL = os.environ.get("IRIS_URL", "https://localhost:8443")
  37. # Try to read IRIS_API_KEY from env, then from soc-integrator/.env
  38. def _read_iris_key() -> str:
  39. if k := os.environ.get("IRIS_API_KEY"):
  40. return k
  41. env_file = Path(__file__).parent.parent / "soc-integrator" / ".env"
  42. if env_file.exists():
  43. for line in env_file.read_text().splitlines():
  44. if line.startswith("IRIS_API_KEY="):
  45. return line.split("=", 1)[1].strip()
  46. return ""
# Resolved once at import time; an empty string means no key was found.
# NOTE(review): IRIS_KEY is not referenced anywhere else in the visible
# source — confirm whether it is still needed.
IRIS_KEY = _read_iris_key()
# TLS context passed to every urlopen() call in this script. Certificate and
# hostname verification are both turned off — presumably because the local
# stack uses self-signed certificates (TODO confirm); do not reuse this
# context against untrusted hosts.
SSL_CTX = ssl.create_default_context()
# Order matters: check_hostname has to be disabled before verify_mode may be
# set to CERT_NONE, otherwise the ssl module raises ValueError.
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE
# ANSI-coloured status glyphs used by the ok()/fail()/info() printers:
# green check mark, red cross, cyan middle dot.
PASS = "\033[32m✓\033[0m"
FAIL = "\033[31m✗\033[0m"
INFO = "\033[36m·\033[0m"
  54. # ---------------------------------------------------------------------------
  55. # Helpers
  56. # ---------------------------------------------------------------------------
  57. def _get(url: str, headers: dict | None = None) -> dict:
  58. req = urllib.request.Request(url, headers=headers or {})
  59. with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r:
  60. return json.loads(r.read())
  61. def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict:
  62. body = json.dumps(data or {}).encode() if data else b""
  63. h = {"Content-Type": "application/json", **(headers or {})}
  64. req = urllib.request.Request(url, data=body, headers=h, method="POST")
  65. with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as r:
  66. return json.loads(r.read())
  67. def _put(url: str, data: dict, headers: dict | None = None) -> dict:
  68. body = json.dumps(data).encode()
  69. h = {"Content-Type": "application/json", **(headers or {})}
  70. req = urllib.request.Request(url, data=body, headers=h, method="PUT")
  71. with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r:
  72. return json.loads(r.read())
  73. def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict:
  74. url = f"{INTEGRATOR_URL}{path}"
  75. if params:
  76. url += ("&" if "?" in url else "?") + params
  77. headers = {"X-Internal-API-Key": INTEGRATOR_KEY}
  78. if method == "POST":
  79. return _post(url, data, headers)
  80. if method == "PUT":
  81. return _put(url, data or {}, headers)
  82. return _get(url, headers)
  83. def _iris_alerts(page: int = 1, per_page: int = 20) -> list[dict]:
  84. url = f"{INTEGRATOR_URL}/iris/alerts?page={page}&per_page={per_page}&sort_by=alert_id&sort_dir=desc"
  85. data = _get(url)
  86. return (data.get("data") or {}).get("alerts", {}).get("data", [])
  87. def step(n: int, label: str) -> None:
  88. print(f"\n\033[1mStep {n}: {label}\033[0m")
  89. def ok(msg: str) -> None:
  90. print(f" {PASS} {msg}")
  91. def fail(msg: str) -> None:
  92. print(f" {FAIL} {msg}")
  93. def info(msg: str) -> None:
  94. print(f" {INFO} {msg}")
  95. # ---------------------------------------------------------------------------
  96. # Main test
  97. # ---------------------------------------------------------------------------
  98. def run(args: argparse.Namespace) -> int:
  99. errors = 0
  100. # ------------------------------------------------------------------
  101. # Step 0: Health check
  102. # ------------------------------------------------------------------
  103. step(0, "Health check")
  104. try:
  105. h = _get(f"{INTEGRATOR_URL}/health")
  106. if h.get("ok"):
  107. ok(f"soc-integrator reachable at {INTEGRATOR_URL}")
  108. else:
  109. fail(f"soc-integrator unhealthy: {h}")
  110. errors += 1
  111. except Exception as exc:
  112. fail(f"Cannot reach soc-integrator: {exc}")
  113. return 1
  114. # ------------------------------------------------------------------
  115. # Step 1: Read current sync policy
  116. # ------------------------------------------------------------------
  117. step(1, "Read sync policy")
  118. try:
  119. policy_resp = _integrator("/wazuh/sync-policy")
  120. current_min = policy_resp["data"]["sync"]["min_severity"]
  121. ok(f"Current min_severity = {current_min!r}")
  122. except Exception as exc:
  123. fail(f"Could not read sync policy: {exc}")
  124. errors += 1
  125. current_min = "medium"
  126. # ------------------------------------------------------------------
  127. # Step 2: Optionally send test events to Wazuh
  128. # ------------------------------------------------------------------
  129. step(2, "Send test events to Wazuh")
  130. if args.no_send:
  131. info("Skipped (--no-send)")
  132. else:
  133. script = Path(__file__).parent / "test-firewall-syslog.py"
  134. if not script.exists():
  135. info("test-firewall-syslog.py not found — skipping send")
  136. else:
  137. try:
  138. result = subprocess.run(
  139. [sys.executable, str(script), "--via-docker", "--scenario", args.scenario],
  140. capture_output=True, text=True, timeout=30,
  141. )
  142. sent = result.stdout.count("✓")
  143. if sent:
  144. ok(f"Sent {sent} test event(s) (scenario={args.scenario})")
  145. else:
  146. fail(f"No events sent\n{result.stdout}\n{result.stderr}")
  147. errors += 1
  148. except Exception as exc:
  149. fail(f"Failed to send test events: {exc}")
  150. errors += 1
  151. # ------------------------------------------------------------------
  152. # Step 3: Wait for Wazuh indexer
  153. # ------------------------------------------------------------------
  154. step(3, f"Wait {args.wait}s for Wazuh indexer")
  155. if args.no_send:
  156. info("Skipped (--no-send)")
  157. else:
  158. for i in range(args.wait, 0, -5):
  159. print(f" {INFO} {i}s remaining...", end="\r", flush=True)
  160. time.sleep(min(5, i))
  161. print()
  162. ok("Done")
  163. # ------------------------------------------------------------------
  164. # Step 4: Snapshot IRIS alert count before sync
  165. # ------------------------------------------------------------------
  166. step(4, "Snapshot IRIS alert count before sync")
  167. try:
  168. alerts_before = _iris_alerts(per_page=5)
  169. max_id_before = max((a["alert_id"] for a in alerts_before), default=0)
  170. ok(f"Latest IRIS alert_id before sync: {max_id_before}")
  171. except Exception as exc:
  172. info(f"Could not read IRIS alerts (verification will be skipped): {exc}")
  173. max_id_before = 0
  174. # ------------------------------------------------------------------
  175. # Step 5: Run sync
  176. # ------------------------------------------------------------------
  177. step(5, "Run sync")
  178. min_sev = args.min_severity or current_min
  179. params = f"limit={args.limit}&minutes={args.minutes}&q=*&min_severity={min_sev}"
  180. try:
  181. resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params)
  182. s = resp["data"]["sync"]
  183. info(f"min_severity_applied : {s['min_severity_applied']}")
  184. info(f"processed : {s['processed']}")
  185. info(f"skipped_existing : {s['skipped_existing']}")
  186. info(f"skipped_filtered : {s.get('skipped_filtered', 0)}")
  187. info(f"ingested : {s['ingested']}")
  188. info(f"iris_alert_ids : {s['iris_alert_ids']}")
  189. if s.get("errors"):
  190. fail(f"Sync errors: {s['errors']}")
  191. errors += 1
  192. else:
  193. ok("Sync completed without errors")
  194. except Exception as exc:
  195. fail(f"Sync request failed: {exc}")
  196. return errors + 1
  197. # ------------------------------------------------------------------
  198. # Step 6: Verify new IRIS alerts
  199. # ------------------------------------------------------------------
  200. step(6, "Verify IRIS alerts")
  201. if not s["iris_alert_ids"]:
  202. if s["ingested"] == 0 and s["skipped_existing"] == s["processed"]:
  203. ok("All alerts already synced (no duplicates created) — dedup working")
  204. elif s["skipped_filtered"] > 0 and s["ingested"] == 0:
  205. ok(f"All new alerts filtered by min_severity={min_sev} — filter working")
  206. else:
  207. info("No new IRIS alerts created this run")
  208. else:
  209. try:
  210. alerts_after = _iris_alerts(per_page=10)
  211. new_alerts = [a for a in alerts_after if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"]
  212. if new_alerts:
  213. ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:")
  214. for a in new_alerts:
  215. print(f" alert_id={a['alert_id']} ref={a.get('alert_source_ref','')} title={a.get('alert_title','')[:55]}")
  216. else:
  217. fail(f"iris_alert_ids={s['iris_alert_ids']} but no matching IRIS alerts found")
  218. errors += 1
  219. except Exception as exc:
  220. fail(f"Could not verify IRIS alerts: {exc}")
  221. errors += 1
  222. # ------------------------------------------------------------------
  223. # Step 7: Auto-sync status
  224. # ------------------------------------------------------------------
  225. step(7, "Auto-sync worker status")
  226. try:
  227. st = _integrator("/wazuh/auto-sync/status")["data"]
  228. info(f"enabled : {st['enabled']}")
  229. info(f"task_running : {st['task_running']}")
  230. info(f"min_severity : {st['settings']['min_severity']}")
  231. if st.get("state", {}).get("last_status"):
  232. info(f"last_status : {st['state']['last_status']}")
  233. ok("Auto-sync status retrieved")
  234. except Exception as exc:
  235. fail(f"Could not read auto-sync status: {exc}")
  236. errors += 1
  237. # ------------------------------------------------------------------
  238. # Summary
  239. # ------------------------------------------------------------------
  240. print()
  241. print("─" * 60)
  242. if errors == 0:
  243. print(f" {PASS} All checks passed")
  244. else:
  245. print(f" {FAIL} {errors} check(s) failed")
  246. print("─" * 60)
  247. return errors
  248. # ---------------------------------------------------------------------------
  249. # CLI
  250. # ---------------------------------------------------------------------------
  251. if __name__ == "__main__":
  252. parser = argparse.ArgumentParser(description="End-to-end test: Wazuh → soc-integrator → IRIS")
  253. parser.add_argument("--no-send", action="store_true", help="Skip sending test events to Wazuh")
  254. parser.add_argument("--scenario", default="rdp", help="Firewall scenario to send (default: rdp)")
  255. parser.add_argument("--wait", type=int, default=20, help="Seconds to wait for indexer (default: 20)")
  256. parser.add_argument("--minutes", type=int, default=5, help="Sync lookback window in minutes (default: 5)")
  257. parser.add_argument("--limit", type=int, default=20, help="Max alerts to sync (default: 20)")
  258. parser.add_argument("--min-severity", default=None,
  259. help="Override min_severity for this run (default: use policy)")
  260. args = parser.parse_args()
  261. sys.exit(run(args))