#!/usr/bin/env python3 """ seed-kpi-test-data.py — Create test IRIS alerts and cases for KPI dashboard UI testing. Creates a spread of records covering every KPI state: On Track | Watch | Warning | Urgent | Critical | Breached | Resolved Usage: python3 scripts/seed-kpi-test-data.py [--alerts-only] [--cases-only] [--dry-run] Env vars (override defaults): IRIS_BASE_URL default: https://localhost:8443 IRIS_API_KEY required """ import argparse import json import os import ssl import sys import urllib.request from datetime import datetime, timedelta, timezone from pathlib import Path # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- def _read_env_file(path: str, key: str) -> str: p = Path(path) if not p.exists(): return "" for line in p.read_text().splitlines(): if line.startswith(f"{key}="): return line[len(key) + 1:].strip() return "" BASE_DIR = Path(__file__).parent.parent ENV_FILE = BASE_DIR / "soc-integrator" / ".env" IRIS_BASE_URL = ( os.environ.get("IRIS_BASE_URL") or _read_env_file(str(ENV_FILE), "IRIS_BASE_URL") or "https://localhost:8443" ).rstrip("/").replace("iriswebapp_nginx", "localhost") IRIS_API_KEY = ( os.environ.get("IRIS_API_KEY") or _read_env_file(str(ENV_FILE), "IRIS_API_KEY") or "" ) if not IRIS_API_KEY: sys.exit("error: IRIS_API_KEY not set. Export it or add it to soc-integrator/.env") # --------------------------------------------------------------------------- # HTTP helpers (no extra deps) # --------------------------------------------------------------------------- _ssl_ctx = ssl.create_default_context() _ssl_ctx.check_hostname = False _ssl_ctx.verify_mode = ssl.CERT_NONE def _req(method: str, path: str, body: dict | None = None) -> dict: url = f"{IRIS_BASE_URL}{path}" data = json.dumps(body).encode() if body else None headers = { "Authorization": f"Bearer {IRIS_API_KEY}", "Content-Type": "application/json", } req = urllib.request.Request(url, data=data, headers=headers, method=method) with urllib.request.urlopen(req, context=_ssl_ctx, timeout=15) as r: return json.loads(r.read()) def get(path: str) -> dict: return _req("GET", path) def post(path: str, body: dict) -> dict: return _req("POST", path, body) def put(path: str, body: dict) -> dict: return _req("PUT", path, body) # --------------------------------------------------------------------------- # Lookup tables # --------------------------------------------------------------------------- def _get_severity_ids() -> dict[str, int]: """Return name→id map for alert severities.""" data = get("/manage/severities/list") items = (data.get("data") or []) return {s["severity_name"].lower(): s["severity_id"] for s in items if "severity_name" in s} def _get_alert_status_ids() -> dict[str, int]: data = get("/manage/alert-status/list") items = data.get("data") or [] return {s["status_name"].lower(): s["status_id"] for s in items if "status_name" in s} def _get_resolution_status_ids() -> dict[str, int]: try: data = get("/manage/alert-resolutions/list") items = data.get("data") or [] return {s["resolution_status_name"].lower(): s["resolution_status_id"] for s in items if "resolution_status_name" in s} except Exception: return {} def _get_customer_id() -> int: try: data = get("/api/v2/customers") items = (data.get("data") or {}).get("customers") or [] if items: return items[0].get("customer_id", 1) except Exception: pass return 1 # --------------------------------------------------------------------------- # Alert scenarios # --------------------------------------------------------------------------- def _ts(offset_hours: float) -> str: """ISO timestamp offset_hours ago (UTC, naive — what IRIS expects).""" dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours) return dt.strftime("%Y-%m-%dT%H:%M:%S") def _date(offset_hours: float) -> str: """Date string (YYYY-MM-DD) offset_hours ago — for case close_date.""" dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours) return dt.strftime("%Y-%m-%d") # Each tuple: (label, severity, created_hours_ago, resolved_hours_after_creation_or_None) # SLA: High=4h Medium=8h Low=24h ALERT_SCENARIOS = [ # --- High severity (4h SLA) --- ("High / On Track (1h old)", "High", 1.0, None), # 75% remaining ("High / Watch (2.5h old)", "High", 2.5, None), # ~37% remaining → Watch ("High / Warning (3h old)", "High", 3.0, None), # 25% remaining ("High / Breached (6h old)", "High", 6.0, None), # 0% ("High / Resolved in SLA (2h)", "High", 4.0, 2.0), # resolved 2h after open → 50% KPI frozen ("High / Resolved breached (5h)", "High", 7.0, 5.0), # resolved after SLA breach → Resolved/0% # --- Medium severity (8h SLA) --- ("Medium / On Track (1h old)", "Medium", 1.0, None), ("Medium / Watch (3h old)", "Medium", 3.0, None), ("Medium / Warning (5h old)", "Medium", 5.0, None), ("Medium / Urgent (7h old)", "Medium", 7.0, None), ("Medium / Critical (7.8h old)", "Medium", 7.8, None), ("Medium / Breached (10h old)", "Medium", 10.0, None), ("Medium / Resolved in SLA (4h)", "Medium", 9.0, 4.0), # --- Low severity (24h SLA) --- ("Low / On Track (2h old)", "Low", 2.0, None), ("Low / Warning (14h old)", "Low", 14.0, None), ("Low / Breached (30h old)", "Low", 30.0, None), ("Low / Resolved in SLA (12h)", "Low", 25.0, 12.0), ] # Case scenarios: (label, tags, created_hours_ago, close_hours_after_creation_or_None) CASE_SCENARIOS = [ ("High / Open On Track", "High,wazuh", 1.0, None), ("High / Open Watch", "High,brute-force", 2.5, None), ("High / Breached", "High,lateral-movement", 6.0, None), ("High / Resolved in SLA", "High,exfiltration", 5.0, 2.0), ("Medium / Open Watch", "Medium,wazuh", 3.0, None), ("Medium / Open Urgent", "Medium,phishing", 7.0, None), ("Medium / Breached", "Medium,ransomware", 12.0, None), ("Medium / Resolved", "Medium,malware", 10.0, 5.0), ("Low / On Track", "Low,wazuh", 2.0, None), ("Low / Warning", "Low,recon", 14.0, None), ("Low / Resolved in SLA", "Low,policy", 26.0, 10.0), ] # --------------------------------------------------------------------------- # Create alerts # --------------------------------------------------------------------------- def create_alerts(sev_ids: dict, status_ids: dict, res_ids: dict, customer_id: int, dry_run: bool): new_id = status_ids.get("new") or 2 closed_id = status_ids.get("closed") or 6 # Pick any "true positive" resolution, falling back to first available res_tp_id = ( res_ids.get("true positive with impact") or res_ids.get("true positive without impact") or (list(res_ids.values())[0] if res_ids else 2) ) print(f"\n=== Creating {len(ALERT_SCENARIOS)} alerts ===") for label, sev_name, created_h, resolved_h in ALERT_SCENARIOS: sev_id = sev_ids.get(sev_name.lower()) or sev_ids.get("medium") or 3 created_ts = _ts(created_h) payload: dict = { "alert_title": f"[KPI Test] {label}", "alert_description": f"Seed data: {label}. Created {created_h}h ago.", "alert_severity_id": sev_id, "alert_status_id": new_id, "alert_customer_id": customer_id, "alert_source": "kpi-seed", "alert_source_ref": "seed-kpi-test-data", "alert_source_event_time": created_ts, "alert_creation_time": created_ts, } if resolved_h is not None: payload["alert_status_id"] = closed_id if res_tp_id: payload["alert_resolution_status_id"] = res_tp_id if dry_run: print(f" DRY-RUN {label}") continue try: resp = post("/alerts/add", payload) alert_data = resp.get("data") or {} aid = alert_data.get("alert_id", "?") print(f" created alert_id={aid} {label}") except Exception as exc: print(f" FAILED {label}: {exc}") # --------------------------------------------------------------------------- # Create cases # --------------------------------------------------------------------------- def create_cases(customer_id: int, dry_run: bool): print(f"\n=== Creating {len(CASE_SCENARIOS)} cases ===") for label, tags, created_h, close_h in CASE_SCENARIOS: open_date = _ts(created_h) # close_date: a date-only string (IRIS v2 close_date is a date, not datetime) close_date = _date(created_h - close_h) if close_h is not None else None payload: dict = { "case_name": f"[KPI Test] {label}", "case_description": f"Seed data: {label}. Opened {created_h}h ago.", "case_customer": customer_id, "case_tags": tags, "case_soc_id": "seed-kpi", } if dry_run: print(f" DRY-RUN {label}") continue try: resp = post("/api/v2/cases", payload) # v2 create returns the case object directly (no data wrapper) cid = resp.get("case_id") or (resp.get("data") or {}).get("case_id", "?") print(f" created case_id={cid} {label}") # Close the case if needed — IRIS v2: PUT /api/v2/cases/{id} with close_date if close_date and cid and cid != "?": try: put(f"/api/v2/cases/{cid}", {"close_date": close_date}) print(f" └─ closed at {close_date}") except Exception as exc: print(f" └─ close failed: {exc}") except Exception as exc: print(f" FAILED {label}: {exc}") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def _backdate_alerts_via_db(scenarios: list, dry_run: bool): """Update alert_creation_time and modification_history in Postgres via docker exec.""" import subprocess lines = [] for label, sev, created_h, resolved_h in scenarios: title_sql = label.replace("'", "''") lines.append( f"UPDATE alerts SET alert_creation_time = NOW() - INTERVAL '{int(created_h * 60)} minutes' " f"WHERE alert_title = '[KPI Test] {title_sql}';" ) if resolved_h is not None: elapsed_h = created_h - resolved_h # hours from now to resolution lines.append( f"WITH ts AS (SELECT EXTRACT(EPOCH FROM NOW() - INTERVAL '{int(elapsed_h * 60)} minutes') AS t) " f"UPDATE alerts SET modification_history = jsonb_build_object((SELECT t::text FROM ts), " f"'{{\"user\":\"seed\",\"action\":\"Alert resolved\"}}') " f"WHERE alert_title = '[KPI Test] {title_sql}';" ) sql = "\n".join(lines) print("\n--- Backdating alert timestamps via docker exec ---") if dry_run: print(" DRY-RUN (SQL would be):") print(sql[:500] + "...") return result = subprocess.run( ["docker", "exec", "iriswebapp_db", "psql", "-U", "postgres", "-d", "iris_db", "-c", sql], capture_output=True, text=True, ) if result.returncode != 0: print(f" WARN: backdate failed: {result.stderr[:300]}") else: print(" done.") def main(): parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--alerts-only", action="store_true") parser.add_argument("--cases-only", action="store_true") parser.add_argument("--backdate", action="store_true", help="Update alert_creation_time in PostgreSQL via docker exec after creation") parser.add_argument("--dry-run", action="store_true", help="Print what would be created without calling IRIS") args = parser.parse_args() print(f"IRIS: {IRIS_BASE_URL}") if args.dry_run: print("DRY-RUN mode — no requests will be made\n") if not args.cases_only: create_alerts({}, {}, {}, 1, dry_run=True) if args.backdate: _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=True) if not args.alerts_only: create_cases(1, dry_run=True) return print("Fetching IRIS lookup tables...") try: sev_ids = _get_severity_ids() status_ids = _get_alert_status_ids() res_ids = _get_resolution_status_ids() customer_id = _get_customer_id() except Exception as exc: sys.exit(f"error: could not reach IRIS at {IRIS_BASE_URL}: {exc}") print(f" severities: {sev_ids}") print(f" alert statuses:{status_ids}") print(f" resolution: {res_ids}") print(f" customer_id: {customer_id}") if not args.cases_only: create_alerts(sev_ids, status_ids, res_ids, customer_id, dry_run=False) if args.backdate: _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=False) if not args.alerts_only: create_cases(customer_id, dry_run=False) print("\ndone.") if __name__ == "__main__": main()