Нет описания

seed-kpi-test-data.py 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. #!/usr/bin/env python3
  2. """
  3. seed-kpi-test-data.py — Create test IRIS alerts and cases for KPI dashboard UI testing.
  4. Creates a spread of records covering every KPI state:
  5. On Track | Watch | Warning | Urgent | Critical | Breached | Resolved
  6. Usage:
  7. python3 scripts/seed-kpi-test-data.py [--alerts-only] [--cases-only] [--dry-run]
  8. Env vars (override defaults):
  9. IRIS_BASE_URL default: https://localhost:8443
  10. IRIS_API_KEY required
  11. """
  12. import argparse
  13. import json
  14. import os
  15. import ssl
  16. import sys
  17. import urllib.request
  18. from datetime import datetime, timedelta, timezone
  19. from pathlib import Path
  20. # ---------------------------------------------------------------------------
  21. # Config
  22. # ---------------------------------------------------------------------------
  23. def _read_env_file(path: str, key: str) -> str:
  24. p = Path(path)
  25. if not p.exists():
  26. return ""
  27. for line in p.read_text().splitlines():
  28. if line.startswith(f"{key}="):
  29. return line[len(key) + 1:].strip()
  30. return ""
  31. BASE_DIR = Path(__file__).parent.parent
  32. ENV_FILE = BASE_DIR / "soc-integrator" / ".env"
  33. IRIS_BASE_URL = (
  34. os.environ.get("IRIS_BASE_URL")
  35. or _read_env_file(str(ENV_FILE), "IRIS_BASE_URL")
  36. or "https://localhost:8443"
  37. ).rstrip("/").replace("iriswebapp_nginx", "localhost")
  38. IRIS_API_KEY = (
  39. os.environ.get("IRIS_API_KEY")
  40. or _read_env_file(str(ENV_FILE), "IRIS_API_KEY")
  41. or ""
  42. )
  43. if not IRIS_API_KEY:
  44. sys.exit("error: IRIS_API_KEY not set. Export it or add it to soc-integrator/.env")
  45. # ---------------------------------------------------------------------------
  46. # HTTP helpers (no extra deps)
  47. # ---------------------------------------------------------------------------
  48. _ssl_ctx = ssl.create_default_context()
  49. _ssl_ctx.check_hostname = False
  50. _ssl_ctx.verify_mode = ssl.CERT_NONE
  51. def _req(method: str, path: str, body: dict | None = None) -> dict:
  52. url = f"{IRIS_BASE_URL}{path}"
  53. data = json.dumps(body).encode() if body else None
  54. headers = {
  55. "Authorization": f"Bearer {IRIS_API_KEY}",
  56. "Content-Type": "application/json",
  57. }
  58. req = urllib.request.Request(url, data=data, headers=headers, method=method)
  59. with urllib.request.urlopen(req, context=_ssl_ctx, timeout=15) as r:
  60. return json.loads(r.read())
  61. def get(path: str) -> dict:
  62. return _req("GET", path)
  63. def post(path: str, body: dict) -> dict:
  64. return _req("POST", path, body)
  65. def put(path: str, body: dict) -> dict:
  66. return _req("PUT", path, body)
  67. # ---------------------------------------------------------------------------
  68. # Lookup tables
  69. # ---------------------------------------------------------------------------
  70. def _get_severity_ids() -> dict[str, int]:
  71. """Return name→id map for alert severities."""
  72. data = get("/manage/severities/list")
  73. items = (data.get("data") or [])
  74. return {s["severity_name"].lower(): s["severity_id"] for s in items if "severity_name" in s}
  75. def _get_alert_status_ids() -> dict[str, int]:
  76. data = get("/manage/alert-status/list")
  77. items = data.get("data") or []
  78. return {s["status_name"].lower(): s["status_id"] for s in items if "status_name" in s}
  79. def _get_resolution_status_ids() -> dict[str, int]:
  80. try:
  81. data = get("/manage/alert-resolutions/list")
  82. items = data.get("data") or []
  83. return {s["resolution_status_name"].lower(): s["resolution_status_id"]
  84. for s in items if "resolution_status_name" in s}
  85. except Exception:
  86. return {}
  87. def _get_customer_id() -> int:
  88. try:
  89. data = get("/api/v2/customers")
  90. items = (data.get("data") or {}).get("customers") or []
  91. if items:
  92. return items[0].get("customer_id", 1)
  93. except Exception:
  94. pass
  95. return 1
  96. # ---------------------------------------------------------------------------
  97. # Alert scenarios
  98. # ---------------------------------------------------------------------------
  99. def _ts(offset_hours: float) -> str:
  100. """ISO timestamp offset_hours ago (UTC, naive — what IRIS expects)."""
  101. dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours)
  102. return dt.strftime("%Y-%m-%dT%H:%M:%S")
  103. def _date(offset_hours: float) -> str:
  104. """Date string (YYYY-MM-DD) offset_hours ago — for case close_date."""
  105. dt = datetime.now(timezone.utc) - timedelta(hours=offset_hours)
  106. return dt.strftime("%Y-%m-%d")
  107. # Each tuple: (label, severity, created_hours_ago, resolved_hours_after_creation_or_None)
  108. # SLA: High=4h Medium=8h Low=24h
  109. ALERT_SCENARIOS = [
  110. # --- High severity (4h SLA) ---
  111. ("High / On Track (1h old)", "High", 1.0, None), # 75% remaining
  112. ("High / Watch (2.5h old)", "High", 2.5, None), # ~37% remaining → Watch
  113. ("High / Warning (3h old)", "High", 3.0, None), # 25% remaining
  114. ("High / Breached (6h old)", "High", 6.0, None), # 0%
  115. ("High / Resolved in SLA (2h)", "High", 4.0, 2.0), # resolved 2h after open → 50% KPI frozen
  116. ("High / Resolved breached (5h)", "High", 7.0, 5.0), # resolved after SLA breach → Resolved/0%
  117. # --- Medium severity (8h SLA) ---
  118. ("Medium / On Track (1h old)", "Medium", 1.0, None),
  119. ("Medium / Watch (3h old)", "Medium", 3.0, None),
  120. ("Medium / Warning (5h old)", "Medium", 5.0, None),
  121. ("Medium / Urgent (7h old)", "Medium", 7.0, None),
  122. ("Medium / Critical (7.8h old)", "Medium", 7.8, None),
  123. ("Medium / Breached (10h old)", "Medium", 10.0, None),
  124. ("Medium / Resolved in SLA (4h)", "Medium", 9.0, 4.0),
  125. # --- Low severity (24h SLA) ---
  126. ("Low / On Track (2h old)", "Low", 2.0, None),
  127. ("Low / Warning (14h old)", "Low", 14.0, None),
  128. ("Low / Breached (30h old)", "Low", 30.0, None),
  129. ("Low / Resolved in SLA (12h)", "Low", 25.0, 12.0),
  130. ]
  131. # Case scenarios: (label, tags, created_hours_ago, close_hours_after_creation_or_None)
  132. CASE_SCENARIOS = [
  133. ("High / Open On Track", "High,wazuh", 1.0, None),
  134. ("High / Open Watch", "High,brute-force", 2.5, None),
  135. ("High / Breached", "High,lateral-movement", 6.0, None),
  136. ("High / Resolved in SLA", "High,exfiltration", 5.0, 2.0),
  137. ("Medium / Open Watch", "Medium,wazuh", 3.0, None),
  138. ("Medium / Open Urgent", "Medium,phishing", 7.0, None),
  139. ("Medium / Breached", "Medium,ransomware", 12.0, None),
  140. ("Medium / Resolved", "Medium,malware", 10.0, 5.0),
  141. ("Low / On Track", "Low,wazuh", 2.0, None),
  142. ("Low / Warning", "Low,recon", 14.0, None),
  143. ("Low / Resolved in SLA", "Low,policy", 26.0, 10.0),
  144. ]
  145. # ---------------------------------------------------------------------------
  146. # Create alerts
  147. # ---------------------------------------------------------------------------
  148. def create_alerts(sev_ids: dict, status_ids: dict, res_ids: dict, customer_id: int, dry_run: bool):
  149. new_id = status_ids.get("new") or 2
  150. closed_id = status_ids.get("closed") or 6
  151. # Pick any "true positive" resolution, falling back to first available
  152. res_tp_id = (
  153. res_ids.get("true positive with impact")
  154. or res_ids.get("true positive without impact")
  155. or (list(res_ids.values())[0] if res_ids else 2)
  156. )
  157. print(f"\n=== Creating {len(ALERT_SCENARIOS)} alerts ===")
  158. for label, sev_name, created_h, resolved_h in ALERT_SCENARIOS:
  159. sev_id = sev_ids.get(sev_name.lower()) or sev_ids.get("medium") or 3
  160. created_ts = _ts(created_h)
  161. payload: dict = {
  162. "alert_title": f"[KPI Test] {label}",
  163. "alert_description": f"Seed data: {label}. Created {created_h}h ago.",
  164. "alert_severity_id": sev_id,
  165. "alert_status_id": new_id,
  166. "alert_customer_id": customer_id,
  167. "alert_source": "kpi-seed",
  168. "alert_source_ref": "seed-kpi-test-data",
  169. "alert_source_event_time": created_ts,
  170. "alert_creation_time": created_ts,
  171. }
  172. if resolved_h is not None:
  173. payload["alert_status_id"] = closed_id
  174. if res_tp_id:
  175. payload["alert_resolution_status_id"] = res_tp_id
  176. if dry_run:
  177. print(f" DRY-RUN {label}")
  178. continue
  179. try:
  180. resp = post("/alerts/add", payload)
  181. alert_data = resp.get("data") or {}
  182. aid = alert_data.get("alert_id", "?")
  183. print(f" created alert_id={aid} {label}")
  184. except Exception as exc:
  185. print(f" FAILED {label}: {exc}")
  186. # ---------------------------------------------------------------------------
  187. # Create cases
  188. # ---------------------------------------------------------------------------
  189. def create_cases(customer_id: int, dry_run: bool):
  190. print(f"\n=== Creating {len(CASE_SCENARIOS)} cases ===")
  191. for label, tags, created_h, close_h in CASE_SCENARIOS:
  192. open_date = _ts(created_h)
  193. # close_date: a date-only string (IRIS v2 close_date is a date, not datetime)
  194. close_date = _date(created_h - close_h) if close_h is not None else None
  195. payload: dict = {
  196. "case_name": f"[KPI Test] {label}",
  197. "case_description": f"Seed data: {label}. Opened {created_h}h ago.",
  198. "case_customer": customer_id,
  199. "case_tags": tags,
  200. "case_soc_id": "seed-kpi",
  201. }
  202. if dry_run:
  203. print(f" DRY-RUN {label}")
  204. continue
  205. try:
  206. resp = post("/api/v2/cases", payload)
  207. # v2 create returns the case object directly (no data wrapper)
  208. cid = resp.get("case_id") or (resp.get("data") or {}).get("case_id", "?")
  209. print(f" created case_id={cid} {label}")
  210. # Close the case if needed — IRIS v2: PUT /api/v2/cases/{id} with close_date
  211. if close_date and cid and cid != "?":
  212. try:
  213. put(f"/api/v2/cases/{cid}", {"close_date": close_date})
  214. print(f" └─ closed at {close_date}")
  215. except Exception as exc:
  216. print(f" └─ close failed: {exc}")
  217. except Exception as exc:
  218. print(f" FAILED {label}: {exc}")
  219. # ---------------------------------------------------------------------------
  220. # Main
  221. # ---------------------------------------------------------------------------
  222. def _backdate_alerts_via_db(scenarios: list, dry_run: bool):
  223. """Update alert_creation_time and modification_history in Postgres via docker exec."""
  224. import subprocess
  225. lines = []
  226. for label, sev, created_h, resolved_h in scenarios:
  227. title_sql = label.replace("'", "''")
  228. lines.append(
  229. f"UPDATE alerts SET alert_creation_time = NOW() - INTERVAL '{int(created_h * 60)} minutes' "
  230. f"WHERE alert_title = '[KPI Test] {title_sql}';"
  231. )
  232. if resolved_h is not None:
  233. elapsed_h = created_h - resolved_h # hours from now to resolution
  234. lines.append(
  235. f"WITH ts AS (SELECT EXTRACT(EPOCH FROM NOW() - INTERVAL '{int(elapsed_h * 60)} minutes') AS t) "
  236. f"UPDATE alerts SET modification_history = jsonb_build_object((SELECT t::text FROM ts), "
  237. f"'{{\"user\":\"seed\",\"action\":\"Alert resolved\"}}') "
  238. f"WHERE alert_title = '[KPI Test] {title_sql}';"
  239. )
  240. sql = "\n".join(lines)
  241. print("\n--- Backdating alert timestamps via docker exec ---")
  242. if dry_run:
  243. print(" DRY-RUN (SQL would be):")
  244. print(sql[:500] + "...")
  245. return
  246. result = subprocess.run(
  247. ["docker", "exec", "iriswebapp_db", "psql", "-U", "postgres", "-d", "iris_db", "-c", sql],
  248. capture_output=True, text=True,
  249. )
  250. if result.returncode != 0:
  251. print(f" WARN: backdate failed: {result.stderr[:300]}")
  252. else:
  253. print(" done.")
  254. def main():
  255. parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  256. parser.add_argument("--alerts-only", action="store_true")
  257. parser.add_argument("--cases-only", action="store_true")
  258. parser.add_argument("--backdate", action="store_true",
  259. help="Update alert_creation_time in PostgreSQL via docker exec after creation")
  260. parser.add_argument("--dry-run", action="store_true", help="Print what would be created without calling IRIS")
  261. args = parser.parse_args()
  262. print(f"IRIS: {IRIS_BASE_URL}")
  263. if args.dry_run:
  264. print("DRY-RUN mode — no requests will be made\n")
  265. if not args.cases_only:
  266. create_alerts({}, {}, {}, 1, dry_run=True)
  267. if args.backdate:
  268. _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=True)
  269. if not args.alerts_only:
  270. create_cases(1, dry_run=True)
  271. return
  272. print("Fetching IRIS lookup tables...")
  273. try:
  274. sev_ids = _get_severity_ids()
  275. status_ids = _get_alert_status_ids()
  276. res_ids = _get_resolution_status_ids()
  277. customer_id = _get_customer_id()
  278. except Exception as exc:
  279. sys.exit(f"error: could not reach IRIS at {IRIS_BASE_URL}: {exc}")
  280. print(f" severities: {sev_ids}")
  281. print(f" alert statuses:{status_ids}")
  282. print(f" resolution: {res_ids}")
  283. print(f" customer_id: {customer_id}")
  284. if not args.cases_only:
  285. create_alerts(sev_ids, status_ids, res_ids, customer_id, dry_run=False)
  286. if args.backdate:
  287. _backdate_alerts_via_db(ALERT_SCENARIOS, dry_run=False)
  288. if not args.alerts_only:
  289. create_cases(customer_id, dry_run=False)
  290. print("\ndone.")
  291. if __name__ == "__main__":
  292. main()