| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- #!/usr/bin/env python3
- """
- seed-kpi-test-data.py — Create test IRIS alerts and cases for KPI dashboard UI testing.
- Creates a spread of records covering every KPI state:
- On Track | Watch | Warning | Urgent | Critical | Breached | Resolved
- Usage:
- python3 scripts/seed-kpi-test-data.py [--alerts-only] [--cases-only] [--dry-run]
- Env vars (override defaults):
- IRIS_BASE_URL default: https://localhost:8443
- IRIS_API_KEY required
- """
- import argparse
- import json
- import os
- import ssl
- import sys
- import urllib.request
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- # ---------------------------------------------------------------------------
- # Config
- # ---------------------------------------------------------------------------
- def _read_env_file(path: str, key: str) -> str:
- p = Path(path)
- if not p.exists():
- return ""
- for line in p.read_text().splitlines():
- if line.startswith(f"{key}="):
- return line[len(key) + 1:].strip()
- return ""
# Repo root (this script lives one level below it) and the shared .env file.
BASE_DIR = Path(__file__).parent.parent
ENV_FILE = BASE_DIR / "soc-integrator" / ".env"

# Base URL precedence: env var > .env file > localhost default. The docker-internal
# hostname is rewritten so the script also works when run from the host.
IRIS_BASE_URL = (
    os.environ.get("IRIS_BASE_URL")
    or _read_env_file(str(ENV_FILE), "IRIS_BASE_URL")
    or "https://localhost:8443"
).rstrip("/").replace("iriswebapp_nginx", "localhost")

# API key precedence: env var > .env file. Required — abort early with a clear message.
IRIS_API_KEY = (
    os.environ.get("IRIS_API_KEY")
    or _read_env_file(str(ENV_FILE), "IRIS_API_KEY")
    or ""
)
if not IRIS_API_KEY:
    sys.exit("error: IRIS_API_KEY not set. Export it or add it to soc-integrator/.env")
- # ---------------------------------------------------------------------------
- # HTTP helpers (no extra deps)
- # ---------------------------------------------------------------------------
# Local IRIS typically runs with a self-signed certificate — skip TLS verification.
# (Test tooling only; do not reuse this context for production traffic.)
_ssl_ctx = ssl.create_default_context()
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = ssl.CERT_NONE
def _req(method: str, path: str, body: dict | None = None) -> dict:
    """Issue an HTTP request against IRIS and return the parsed JSON response.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", ...).
        path: URL path appended to IRIS_BASE_URL.
        body: Optional JSON payload; serialized and sent when not None.

    Raises:
        urllib.error.HTTPError / URLError on HTTP or transport failures;
        json.JSONDecodeError when the response body is not valid JSON.
    """
    url = f"{IRIS_BASE_URL}{path}"
    # `is not None`, not truthiness: an explicit empty dict {} is a valid JSON body
    # and must still be serialized and sent.
    data = json.dumps(body).encode() if body is not None else None
    headers = {
        "Authorization": f"Bearer {IRIS_API_KEY}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    with urllib.request.urlopen(req, context=_ssl_ctx, timeout=15) as r:
        return json.loads(r.read())
def get(path: str) -> dict:
    """GET *path* from IRIS and return the decoded JSON response."""
    return _req("GET", path)
def post(path: str, body: dict) -> dict:
    """POST *body* as JSON to *path* and return the decoded JSON response."""
    return _req("POST", path, body)
def put(path: str, body: dict) -> dict:
    """PUT *body* as JSON to *path* and return the decoded JSON response."""
    return _req("PUT", path, body)
- # ---------------------------------------------------------------------------
- # Lookup tables
- # ---------------------------------------------------------------------------
def _get_severity_ids() -> dict[str, int]:
    """Fetch alert severities from IRIS; returns lowercase name → id."""
    rows = get("/manage/severities/list").get("data") or []
    return {
        row["severity_name"].lower(): row["severity_id"]
        for row in rows
        if "severity_name" in row
    }
def _get_alert_status_ids() -> dict[str, int]:
    """Fetch alert statuses from IRIS; returns lowercase name → id."""
    rows = get("/manage/alert-status/list").get("data") or []
    return {
        row["status_name"].lower(): row["status_id"]
        for row in rows
        if "status_name" in row
    }
def _get_resolution_status_ids() -> dict[str, int]:
    """Fetch alert resolution statuses; returns {} when the endpoint errors."""
    try:
        rows = get("/manage/alert-resolutions/list").get("data") or []
        return {
            row["resolution_status_name"].lower(): row["resolution_status_id"]
            for row in rows
            if "resolution_status_name" in row
        }
    except Exception:
        # Best-effort lookup: older IRIS versions may not expose this endpoint.
        return {}
def _get_customer_id() -> int:
    """Return the first customer's id from IRIS, falling back to 1 on any failure."""
    try:
        payload = get("/api/v2/customers").get("data") or {}
        customers = payload.get("customers") or []
        if customers:
            return customers[0].get("customer_id", 1)
    except Exception:
        # Best-effort: customer 1 is the IRIS default.
        pass
    return 1
- # ---------------------------------------------------------------------------
- # Alert scenarios
- # ---------------------------------------------------------------------------
- def _ts(offset_hours: float) -> str:
- """ISO timestamp offset_hours ago (UTC, naive — what IRIS expects)."""
- dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours)
- return dt.strftime("%Y-%m-%dT%H:%M:%S")
- def _date(offset_hours: float) -> str:
- """Date string (YYYY-MM-DD) offset_hours ago — for case close_date."""
- dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours)
- return dt.strftime("%Y-%m-%d")
# Each tuple: (label, severity, created_hours_ago, resolved_hours_after_creation_or_None)
# SLA assumed by the dashboard: High=4h Medium=8h Low=24h.
# Percentages in the per-row comments are "SLA time remaining" at seed time.
ALERT_SCENARIOS = [
    # --- High severity (4h SLA) ---
    ("High / On Track (1h old)", "High", 1.0, None),        # 75% remaining
    ("High / Watch (2.5h old)", "High", 2.5, None),         # ~37% remaining → Watch
    ("High / Warning (3h old)", "High", 3.0, None),         # 25% remaining
    ("High / Breached (6h old)", "High", 6.0, None),        # 0%
    ("High / Resolved in SLA (2h)", "High", 4.0, 2.0),      # resolved 2h after open → 50% KPI frozen
    ("High / Resolved breached (5h)", "High", 7.0, 5.0),    # resolved after SLA breach → Resolved/0%
    # --- Medium severity (8h SLA) ---
    ("Medium / On Track (1h old)", "Medium", 1.0, None),
    ("Medium / Watch (3h old)", "Medium", 3.0, None),
    ("Medium / Warning (5h old)", "Medium", 5.0, None),
    ("Medium / Urgent (7h old)", "Medium", 7.0, None),
    ("Medium / Critical (7.8h old)", "Medium", 7.8, None),
    ("Medium / Breached (10h old)", "Medium", 10.0, None),
    ("Medium / Resolved in SLA (4h)", "Medium", 9.0, 4.0),
    # --- Low severity (24h SLA) ---
    ("Low / On Track (2h old)", "Low", 2.0, None),
    ("Low / Warning (14h old)", "Low", 14.0, None),
    ("Low / Breached (30h old)", "Low", 30.0, None),
    ("Low / Resolved in SLA (12h)", "Low", 25.0, 12.0),
]
# Case scenarios: (label, tags, created_hours_ago, close_hours_after_creation_or_None)
# Tags embed the severity name plus an attack-type label — presumably what the KPI
# dashboard parses severity from (verify against the dashboard's tag parser).
CASE_SCENARIOS = [
    ("High / Open On Track", "High,wazuh", 1.0, None),
    ("High / Open Watch", "High,brute-force", 2.5, None),
    ("High / Breached", "High,lateral-movement", 6.0, None),
    ("High / Resolved in SLA", "High,exfiltration", 5.0, 2.0),
    ("Medium / Open Watch", "Medium,wazuh", 3.0, None),
    ("Medium / Open Urgent", "Medium,phishing", 7.0, None),
    ("Medium / Breached", "Medium,ransomware", 12.0, None),
    ("Medium / Resolved", "Medium,malware", 10.0, 5.0),
    ("Low / On Track", "Low,wazuh", 2.0, None),
    ("Low / Warning", "Low,recon", 14.0, None),
    ("Low / Resolved in SLA", "Low,policy", 26.0, 10.0),
]
- # ---------------------------------------------------------------------------
- # Create alerts
- # ---------------------------------------------------------------------------
def create_alerts(sev_ids: dict, status_ids: dict, res_ids: dict, customer_id: int, dry_run: bool):
    """Create one IRIS alert per ALERT_SCENARIOS entry; resolved ones are created closed."""
    status_new = status_ids.get("new") or 2
    status_closed = status_ids.get("closed") or 6
    # Prefer a "true positive" resolution; otherwise take any available one (or 2).
    resolution_id = (
        res_ids.get("true positive with impact")
        or res_ids.get("true positive without impact")
        or (next(iter(res_ids.values())) if res_ids else 2)
    )
    print(f"\n=== Creating {len(ALERT_SCENARIOS)} alerts ===")
    for label, sev_name, created_h, resolved_h in ALERT_SCENARIOS:
        severity = sev_ids.get(sev_name.lower()) or sev_ids.get("medium") or 3
        when = _ts(created_h)
        payload: dict = {
            "alert_title": f"[KPI Test] {label}",
            "alert_description": f"Seed data: {label}. Created {created_h}h ago.",
            "alert_severity_id": severity,
            # Scenarios with a resolution offset are seeded already closed.
            "alert_status_id": status_closed if resolved_h is not None else status_new,
            "alert_customer_id": customer_id,
            "alert_source": "kpi-seed",
            "alert_source_ref": "seed-kpi-test-data",
            "alert_source_event_time": when,
            "alert_creation_time": when,
        }
        if resolved_h is not None and resolution_id:
            payload["alert_resolution_status_id"] = resolution_id
        if dry_run:
            print(f" DRY-RUN {label}")
            continue
        try:
            created = post("/alerts/add", payload).get("data") or {}
            aid = created.get("alert_id", "?")
            print(f" created alert_id={aid} {label}")
        except Exception as exc:
            print(f" FAILED {label}: {exc}")
- # ---------------------------------------------------------------------------
- # Create cases
- # ---------------------------------------------------------------------------
def create_cases(customer_id: int, dry_run: bool):
    """Create one IRIS case per CASE_SCENARIOS entry, then close those with a close offset.

    Args:
        customer_id: IRIS customer the cases are attached to.
        dry_run: When True, only print what would be created.
    """
    # NOTE(review): the original computed an open date via _ts(created_h) but never
    # sent it — the v2 create payload carries no open-date field, so cases are
    # created "now" and only close_date is backdated. Confirm whether the v2 API
    # accepts a backdated open date if open-time KPIs need real spread.
    print(f"\n=== Creating {len(CASE_SCENARIOS)} cases ===")
    for label, tags, created_h, close_h in CASE_SCENARIOS:
        # close_date: a date-only string (IRIS v2 close_date is a date, not datetime).
        # (created_h - close_h) is "hours ago" at which the case was closed.
        close_date = _date(created_h - close_h) if close_h is not None else None
        payload: dict = {
            "case_name": f"[KPI Test] {label}",
            "case_description": f"Seed data: {label}. Opened {created_h}h ago.",
            "case_customer": customer_id,
            "case_tags": tags,
            "case_soc_id": "seed-kpi",
        }
        if dry_run:
            print(f" DRY-RUN {label}")
            continue
        try:
            resp = post("/api/v2/cases", payload)
            # v2 create returns the case object directly (no data wrapper);
            # fall back to a wrapped shape just in case.
            cid = resp.get("case_id") or (resp.get("data") or {}).get("case_id", "?")
            print(f" created case_id={cid} {label}")
            # Close the case if needed — IRIS v2: PUT /api/v2/cases/{id} with close_date
            if close_date and cid and cid != "?":
                try:
                    put(f"/api/v2/cases/{cid}", {"close_date": close_date})
                    print(f" └─ closed at {close_date}")
                except Exception as exc:
                    print(f" └─ close failed: {exc}")
        except Exception as exc:
            print(f" FAILED {label}: {exc}")
- # ---------------------------------------------------------------------------
- # Main
- # ---------------------------------------------------------------------------
def _backdate_alerts_via_db(scenarios: list, dry_run: bool):
    """Update alert_creation_time and modification_history in Postgres via docker exec.

    Alerts are matched by their seeded "[KPI Test] <label>" title. Requires the
    iriswebapp_db container to be running and reachable via `docker exec`.
    Failures are reported as warnings, never raised.
    """
    import subprocess  # local import: only needed when --backdate is requested
    lines = []
    for label, sev, created_h, resolved_h in scenarios:
        # Escape single quotes for use inside SQL string literals.
        title_sql = label.replace("'", "''")
        lines.append(
            f"UPDATE alerts SET alert_creation_time = NOW() - INTERVAL '{int(created_h * 60)} minutes' "
            f"WHERE alert_title = '[KPI Test] {title_sql}';"
        )
        if resolved_h is not None:
            elapsed_h = created_h - resolved_h  # hours from now to resolution
            # Write a synthetic modification_history entry keyed by epoch-seconds
            # with action "Alert resolved" — presumably the shape the KPI logic
            # reads resolution times from; verify key format against that parser.
            lines.append(
                f"WITH ts AS (SELECT EXTRACT(EPOCH FROM NOW() - INTERVAL '{int(elapsed_h * 60)} minutes') AS t) "
                f"UPDATE alerts SET modification_history = jsonb_build_object((SELECT t::text FROM ts), "
                f"'{{\"user\":\"seed\",\"action\":\"Alert resolved\"}}') "
                f"WHERE alert_title = '[KPI Test] {title_sql}';"
            )
    sql = "\n".join(lines)
    print("\n--- Backdating alert timestamps via docker exec ---")
    if dry_run:
        print(" DRY-RUN (SQL would be):")
        print(sql[:500] + "...")
        return
    result = subprocess.run(
        ["docker", "exec", "iriswebapp_db", "psql", "-U", "postgres", "-d", "iris_db", "-c", sql],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        print(f" WARN: backdate failed: {result.stderr[:300]}")
    else:
        print(" done.")
def main():
    """CLI entry point: parse flags, then seed alerts and/or cases into IRIS."""
    cli = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    cli.add_argument("--alerts-only", action="store_true")
    cli.add_argument("--cases-only", action="store_true")
    cli.add_argument(
        "--backdate",
        action="store_true",
        help="Update alert_creation_time in PostgreSQL via docker exec after creation",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print what would be created without calling IRIS",
    )
    args = cli.parse_args()

    print(f"IRIS: {IRIS_BASE_URL}")

    # Dry-run path: placeholder lookups, nothing touches IRIS.
    if args.dry_run:
        print("DRY-RUN mode — no requests will be made\n")
        if not args.cases_only:
            create_alerts({}, {}, {}, 1, dry_run=True)
            if args.backdate:
                _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=True)
        if not args.alerts_only:
            create_cases(1, dry_run=True)
        return

    print("Fetching IRIS lookup tables...")
    try:
        severities = _get_severity_ids()
        statuses = _get_alert_status_ids()
        resolutions = _get_resolution_status_ids()
        customer = _get_customer_id()
    except Exception as exc:
        sys.exit(f"error: could not reach IRIS at {IRIS_BASE_URL}: {exc}")
    print(f" severities: {severities}")
    print(f" alert statuses:{statuses}")
    print(f" resolution: {resolutions}")
    print(f" customer_id: {customer}")

    if not args.cases_only:
        create_alerts(severities, statuses, resolutions, customer, dry_run=False)
        if args.backdate:
            _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=False)
    if not args.alerts_only:
        create_cases(customer, dry_run=False)
    print("\ndone.")


if __name__ == "__main__":
    main()
|