mvp_service.py (26KB) — MVP SOC integration service. Source listing follows.
  1. from __future__ import annotations
  2. import hashlib
  3. import logging
  4. import re
  5. import time
  6. from datetime import datetime, timezone
  7. from typing import Any
  8. import httpx
  9. from app.adapters.iris import IrisAdapter
  10. from app.adapters.pagerduty import PagerDutyAdapter
  11. from app.adapters.shuffle import ShuffleAdapter
  12. from app.adapters.wazuh import WazuhAdapter
  13. from app.config import settings
  14. from app.repositories.mvp_repo import MvpRepository
  15. logger = logging.getLogger(__name__)
  16. class MvpService:
  17. def __init__(
  18. self,
  19. repo: MvpRepository,
  20. wazuh_adapter: WazuhAdapter,
  21. shuffle_adapter: ShuffleAdapter,
  22. iris_adapter: IrisAdapter,
  23. pagerduty_adapter: PagerDutyAdapter,
  24. ) -> None:
  25. self.repo = repo
  26. self.wazuh_adapter = wazuh_adapter
  27. self.shuffle_adapter = shuffle_adapter
  28. self.iris_adapter = iris_adapter
  29. self.pagerduty_adapter = pagerduty_adapter
  30. def _is_off_hours(self, ts: datetime) -> bool:
  31. hour = ts.astimezone(timezone.utc).hour
  32. return hour < 6 or hour >= 20
  33. def _safe_excerpt(self, payload: Any) -> str:
  34. text = str(payload)
  35. return text[:300]
  36. def _primary_subject(self, event: dict[str, Any]) -> str:
  37. asset = event.get("asset", {})
  38. return str(asset.get("user") or asset.get("hostname") or "unknown")
  39. def _primary_observable(self, event: dict[str, Any]) -> str:
  40. network = event.get("network", {})
  41. return str(network.get("domain") or network.get("src_ip") or network.get("dst_ip") or "unknown")
  42. def _incident_key(self, event: dict[str, Any]) -> str:
  43. ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00")).astimezone(timezone.utc)
  44. day_bucket = ts.strftime("%Y-%m-%d")
  45. raw = "|".join(
  46. [
  47. str(event.get("event_type", "generic")),
  48. self._primary_subject(event),
  49. self._primary_observable(event),
  50. day_bucket,
  51. ]
  52. )
  53. return hashlib.sha256(raw.encode("utf-8")).hexdigest()
  54. def _effective_severity(self, event: dict[str, Any], policy: dict[str, Any]) -> tuple[str, int, list[str]]:
  55. severity = str(event.get("severity", "medium")).lower()
  56. risk_context = event.get("risk_context", {})
  57. network = event.get("network", {})
  58. weights = policy.get("risk", {}).get("weights", {})
  59. score = 0
  60. factors: list[str] = []
  61. allowed_country = policy.get("vpn", {}).get("allowed_country", "TH")
  62. country = str(network.get("country", "")).upper()
  63. if country and country != allowed_country:
  64. score += int(weights.get("outside_thailand", 50))
  65. factors.append("outside_country")
  66. if risk_context.get("admin_account"):
  67. score += int(weights.get("admin", 20))
  68. factors.append("admin_account")
  69. ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
  70. if risk_context.get("off_hours") or self._is_off_hours(ts):
  71. score += int(weights.get("off_hours", 15))
  72. factors.append("off_hours")
  73. if risk_context.get("first_seen_country"):
  74. score += int(weights.get("first_seen_country", 15))
  75. factors.append("first_seen_country")
  76. thresholds = policy.get("risk", {}).get("thresholds", {})
  77. if score >= int(thresholds.get("high", 70)):
  78. severity = "high" if severity in {"low", "medium"} else severity
  79. elif score >= int(thresholds.get("medium", 40)) and severity == "low":
  80. severity = "medium"
  81. return severity, score, factors
  82. def _is_exception(self, event: dict[str, Any], policy: dict[str, Any]) -> bool:
  83. if event.get("event_type") != "vpn_geo_anomaly":
  84. return False
  85. asset = event.get("asset", {})
  86. user = str(asset.get("user", ""))
  87. allowed_users = set(policy.get("vpn", {}).get("exception_users", []))
  88. return user in allowed_users
  89. def _extract_iris_case_id(self, iris_response: dict[str, Any]) -> str | None:
  90. if "case_id" in iris_response:
  91. return str(iris_response.get("case_id"))
  92. data = iris_response.get("data")
  93. if isinstance(data, dict) and "case_id" in data:
  94. return str(data.get("case_id"))
  95. return None
  96. def _parse_kv_pairs(self, text: str) -> dict[str, str]:
  97. pattern = r"([A-Za-z0-9_]+)=('(?:[^']*)'|\"(?:[^\"]*)\"|[^\s]+)"
  98. out: dict[str, str] = {}
  99. for key, raw in re.findall(pattern, text):
  100. value = raw.strip().strip("'").strip('"')
  101. out[key] = value
  102. return out
  103. def _severity_from_rule_level(self, rule_level: Any) -> str:
  104. try:
  105. level = int(rule_level)
  106. except (TypeError, ValueError):
  107. return "medium"
  108. if level >= 12:
  109. return "critical"
  110. if level >= 8:
  111. return "high"
  112. if level >= 4:
  113. return "medium"
  114. return "low"
  115. def _event_type_from_text(self, text: str, parsed: dict[str, str]) -> str:
  116. explicit = str(parsed.get("event_type") or "").strip().lower()
  117. usecase_id = str(parsed.get("usecase_id") or "").strip().upper()
  118. section = str(parsed.get("section") or "").strip().upper()
  119. source = str(parsed.get("source") or "").strip().lower()
  120. success = str(parsed.get("success") or "").strip().lower()
  121. has_geo = bool(parsed.get("country") or parsed.get("src_lat") or parsed.get("src_lon"))
  122. has_user = bool(parsed.get("user"))
  123. has_src_ip = bool(parsed.get("src_ip") or parsed.get("srcip"))
  124. explicit_success_login = explicit in {
  125. "vpn_login_success",
  126. "windows_auth_success",
  127. "auth_success",
  128. }
  129. # Production-first C1 detection:
  130. # successful auth/login + geo context on vpn/windows identity streams.
  131. if (
  132. (source in {"vpn", "fortigate", "windows", "identity"} or "vpn" in source)
  133. and has_geo
  134. and has_user
  135. and has_src_ip
  136. and (success == "true" or explicit_success_login)
  137. ):
  138. return "c1_impossible_travel"
  139. # Legacy simulator markers remain supported as fallback.
  140. if usecase_id.startswith("C1") or section == "C1":
  141. return "c1_impossible_travel"
  142. if explicit in {"c1_impossible_travel", "impossible_travel"}:
  143. return "c1_impossible_travel"
  144. if explicit == "vpn_geo_anomaly":
  145. return "vpn_geo_anomaly"
  146. if explicit:
  147. return explicit
  148. lowered = text.lower()
  149. if "impossible travel" in lowered:
  150. return "c1_impossible_travel"
  151. if "vpn" in lowered and ("geo" in lowered or "country" in lowered):
  152. return "vpn_geo_anomaly"
  153. if "domain" in lowered or "dns" in lowered:
  154. return "ioc_dns"
  155. if "c2" in lowered or "ips" in lowered or "ip " in lowered:
  156. return "ioc_ips"
  157. if "auth" in lowered and "fail" in lowered:
  158. return "auth_anomaly"
  159. return "generic"
  160. def _normalize_wazuh_hit(self, hit: dict[str, Any]) -> dict[str, Any]:
  161. src = hit.get("_source", {})
  162. full_log = str(src.get("full_log", ""))
  163. parsed = self._parse_kv_pairs(full_log)
  164. event_id = str(parsed.get("event_id") or src.get("id") or hit.get("_id") or f"wazuh-{int(time.time())}")
  165. timestamp = (
  166. src.get("@timestamp")
  167. or src.get("timestamp")
  168. or datetime.now(timezone.utc).isoformat()
  169. )
  170. rule = src.get("rule", {}) if isinstance(src.get("rule"), dict) else {}
  171. rule_desc = str(rule.get("description") or "")
  172. event_type = self._event_type_from_text(full_log, parsed)
  173. severity = str(parsed.get("severity", "")).lower() or self._severity_from_rule_level(rule.get("level"))
  174. src_ip = parsed.get("src_ip")
  175. if not src_ip:
  176. src_ip = parsed.get("srcip")
  177. dst_ip = parsed.get("dst_ip")
  178. if not dst_ip:
  179. dst_ip = parsed.get("dstip")
  180. domain = parsed.get("query") or parsed.get("domain")
  181. country = parsed.get("country")
  182. user = parsed.get("user") or (src.get("agent", {}) or {}).get("name")
  183. dst_port = parsed.get("dst_port") or parsed.get("dstport")
  184. event_action = parsed.get("event_action") or parsed.get("action")
  185. title = rule_desc or f"Wazuh alert {rule.get('id', '')}".strip()
  186. description = full_log or rule_desc or "Wazuh alert"
  187. src_lat_raw = parsed.get("src_lat")
  188. src_lon_raw = parsed.get("src_lon")
  189. try:
  190. src_lat = float(src_lat_raw) if src_lat_raw not in {None, ""} else None
  191. except (TypeError, ValueError):
  192. src_lat = None
  193. try:
  194. src_lon = float(src_lon_raw) if src_lon_raw not in {None, ""} else None
  195. except (TypeError, ValueError):
  196. src_lon = None
  197. return {
  198. "source": "wazuh",
  199. "event_type": event_type,
  200. "event_id": event_id,
  201. "timestamp": timestamp,
  202. "severity": severity if severity in {"low", "medium", "high", "critical"} else "medium",
  203. "title": title,
  204. "description": description,
  205. "asset": {
  206. "user": user,
  207. "hostname": (src.get("agent", {}) or {}).get("name"),
  208. "agent_id": (src.get("agent", {}) or {}).get("id"),
  209. },
  210. "network": {
  211. "src_ip": src_ip,
  212. "dst_ip": dst_ip,
  213. "dst_host": parsed.get("dst_host") or parsed.get("host"),
  214. "dst_port": int(dst_port) if str(dst_port or "").isdigit() else None,
  215. "domain": domain,
  216. "country": country,
  217. "src_lat": src_lat,
  218. "src_lon": src_lon,
  219. },
  220. "tags": ["wazuh", event_type, f"rule_{rule.get('id', 'unknown')}"],
  221. "risk_context": {
  222. "outside_thailand": bool(country and str(country).upper() != "TH"),
  223. },
  224. "raw": src,
  225. "payload": {
  226. **parsed,
  227. "event_action": event_action,
  228. "event_id": parsed.get("event_id"),
  229. "event_type": event_type,
  230. "success": parsed.get("success"),
  231. "logon_type": parsed.get("logon_type"),
  232. "account_type": parsed.get("account_type"),
  233. "is_admin": parsed.get("is_admin"),
  234. "is_service": parsed.get("is_service"),
  235. },
  236. }
  237. def normalize_wazuh_hit(self, hit: dict[str, Any]) -> dict[str, Any]:
  238. return self._normalize_wazuh_hit(hit)
  239. def _to_float(self, value: Any, default: float = 0.0) -> float:
  240. try:
  241. return float(value)
  242. except (TypeError, ValueError):
  243. return default
  244. def _severity_from_confidence(self, confidence: float) -> str:
  245. if confidence >= 0.9:
  246. return "high"
  247. if confidence >= 0.7:
  248. return "medium"
  249. return "low"
  250. def _extract_shuffle_verdict(self, shuffle_result: dict[str, Any] | None) -> dict[str, Any]:
  251. if not isinstance(shuffle_result, dict):
  252. return {
  253. "matched": False,
  254. "confidence": 0.0,
  255. "severity": "low",
  256. "evidence": "",
  257. "iocs": [],
  258. "reason": "no_shuffle_result",
  259. }
  260. flat = dict(shuffle_result)
  261. nested = shuffle_result.get("result")
  262. if isinstance(nested, dict):
  263. merged = dict(nested)
  264. merged.update(flat)
  265. flat = merged
  266. confidence = self._to_float(flat.get("confidence"), 0.0)
  267. matched_raw = flat.get("matched")
  268. if isinstance(matched_raw, bool):
  269. matched = matched_raw
  270. reason = "shuffle_explicit"
  271. else:
  272. matched = confidence >= 0.7
  273. reason = "confidence_threshold_fallback"
  274. severity_raw = str(flat.get("severity", "")).lower()
  275. severity = severity_raw if severity_raw in {"low", "medium", "high", "critical"} else self._severity_from_confidence(confidence)
  276. return {
  277. "matched": matched,
  278. "confidence": confidence,
  279. "severity": severity,
  280. "evidence": str(flat.get("evidence", "")),
  281. "iocs": flat.get("iocs", []),
  282. "reason": reason,
  283. "raw": shuffle_result,
  284. }
  285. async def ingest_incident(self, event: dict[str, Any]) -> dict[str, Any]:
  286. policy = self.repo.get_policy()
  287. incident_key = self._incident_key(event)
  288. if self._is_exception(event, policy):
  289. decision_trace = {
  290. "incident_key": incident_key,
  291. "policy_exception": True,
  292. "reason": "vpn_exception_user",
  293. }
  294. self.repo.upsert_incident(incident_key, severity="low", status="ignored_exception", iris_case_id=None)
  295. self.repo.add_event(
  296. incident_key=incident_key,
  297. event_id=event.get("event_id"),
  298. source=event.get("source", "unknown"),
  299. event_type=event.get("event_type", "generic"),
  300. raw_payload=event,
  301. decision_trace=decision_trace,
  302. )
  303. return {
  304. "incident_key": incident_key,
  305. "action_taken": "ignored_exception",
  306. "escalation_stub_sent": False,
  307. "decision_trace": decision_trace,
  308. }
  309. effective_severity, risk_score, risk_factors = self._effective_severity(event, policy)
  310. current = self.repo.get_incident(incident_key)
  311. action_taken = "updated_case" if current else "created_case"
  312. iris_case_id = current.get("iris_case_id") if current else None
  313. if not iris_case_id:
  314. case_payload = {
  315. "case_name": event.get("title", "SOC Incident"),
  316. "case_description": event.get("description", "Generated by soc-integrator MVP"),
  317. "case_customer": event.get("payload", {}).get("case_customer", settings.iris_default_customer_id),
  318. "case_soc_id": event.get("payload", {}).get("case_soc_id", settings.iris_default_soc_id),
  319. }
  320. iris_result = await self.iris_adapter.create_case(case_payload)
  321. iris_case_id = self._extract_iris_case_id(iris_result)
  322. else:
  323. update_payload = {
  324. "case_description": f"{event.get('description', 'Updated by soc-integrator MVP')} [event_id={event.get('event_id', '')}]"
  325. }
  326. try:
  327. await self.iris_adapter.update_case(iris_case_id, update_payload)
  328. except Exception:
  329. # Keep pipeline progressing for MVP even if update path is unsupported.
  330. pass
  331. stored = self.repo.upsert_incident(
  332. incident_key=incident_key,
  333. severity=effective_severity,
  334. status="open",
  335. iris_case_id=iris_case_id,
  336. )
  337. decision_trace = {
  338. "incident_key": incident_key,
  339. "risk_score": risk_score,
  340. "risk_factors": risk_factors,
  341. "effective_severity": effective_severity,
  342. "action_taken": action_taken,
  343. }
  344. self.repo.add_event(
  345. incident_key=incident_key,
  346. event_id=event.get("event_id"),
  347. source=event.get("source", "unknown"),
  348. event_type=event.get("event_type", "generic"),
  349. raw_payload=event,
  350. decision_trace=decision_trace,
  351. )
  352. escalate_severities = set(policy.get("escalate_severities", ["high", "critical"]))
  353. escalation_stub_sent = False
  354. stub_response: dict[str, Any] | None = None
  355. if effective_severity in escalate_severities:
  356. escalation_payload = {
  357. "incident_key": incident_key,
  358. "title": event.get("title", "SOC Incident"),
  359. "severity": effective_severity,
  360. "source": event.get("source", "soc-integrator"),
  361. "iris_case_id": iris_case_id,
  362. "event_summary": event.get("description", ""),
  363. "timestamp": event.get("timestamp"),
  364. }
  365. try:
  366. pd_result = await self.pagerduty_adapter.create_incident(escalation_payload)
  367. escalation_stub_sent = True
  368. stub_response = {"ok": True, "data": pd_result}
  369. self.repo.add_escalation_audit(
  370. incident_key=incident_key,
  371. status_code=200,
  372. success=True,
  373. response_excerpt=self._safe_excerpt(pd_result),
  374. )
  375. except Exception as exc:
  376. stub_response = {"ok": False, "error": str(exc)}
  377. self.repo.add_escalation_audit(
  378. incident_key=incident_key,
  379. status_code=502,
  380. success=False,
  381. response_excerpt=self._safe_excerpt(exc),
  382. )
  383. return {
  384. "incident_key": stored["incident_key"],
  385. "action_taken": action_taken,
  386. "iris_case_id": stored.get("iris_case_id"),
  387. "escalation_stub_sent": escalation_stub_sent,
  388. "stub_response": stub_response,
  389. "decision_trace": decision_trace,
  390. }
  391. async def evaluate_ioc(self, payload: dict[str, Any]) -> dict[str, Any]:
  392. policy = self.repo.get_policy()
  393. workflow_id = str(policy.get("shuffle", {}).get("ioc_workflow_id", "")).strip()
  394. shuffle_result: dict[str, Any] | None = None
  395. if workflow_id:
  396. shuffle_result = await self.shuffle_adapter.trigger_workflow(workflow_id, payload)
  397. verdict = self._extract_shuffle_verdict(shuffle_result)
  398. matched = bool(verdict["matched"])
  399. confidence = self._to_float(verdict["confidence"], 0.0)
  400. logger.info(
  401. "ioc evaluation workflow_id=%s matched=%s confidence=%.2f",
  402. workflow_id or "<none>",
  403. matched,
  404. confidence,
  405. )
  406. if matched:
  407. src_event = payload.get("source_event", {})
  408. event_id = src_event.get("event_id") or f"ioc-{int(time.time())}"
  409. if not isinstance(event_id, str):
  410. event_id = str(event_id)
  411. description = f"IOC evaluation result confidence={confidence:.2f}"
  412. evidence = str(verdict.get("evidence", "")).strip()
  413. if evidence:
  414. description = f"{description} evidence={evidence[:180]}"
  415. event = {
  416. "source": "shuffle",
  417. "event_type": "ioc_dns" if payload.get("ioc_type") == "domain" else "ioc_ips",
  418. "event_id": event_id,
  419. "timestamp": datetime.now(timezone.utc).isoformat(),
  420. "severity": verdict["severity"],
  421. "title": f"IOC match: {payload.get('ioc_value', 'unknown')}",
  422. "description": description,
  423. "asset": src_event.get("asset", {}),
  424. "network": src_event.get("network", {}),
  425. "tags": ["ioc", str(payload.get("ioc_type", "unknown"))],
  426. "risk_context": {},
  427. "raw": {
  428. "payload": payload,
  429. "shuffle": verdict.get("raw"),
  430. },
  431. "payload": {},
  432. }
  433. ingest_result = await self.ingest_incident(event)
  434. else:
  435. ingest_result = {"action_taken": "rejected"}
  436. return {
  437. "matched": matched,
  438. "confidence": confidence,
  439. "severity": verdict["severity"],
  440. "evidence": verdict["evidence"],
  441. "iocs": verdict["iocs"],
  442. "decision_source": verdict["reason"],
  443. "shuffle": shuffle_result,
  444. "result": ingest_result,
  445. }
  446. async def evaluate_vpn(self, payload: dict[str, Any]) -> dict[str, Any]:
  447. if not payload.get("success", False):
  448. return {
  449. "risk_score": 0,
  450. "risk_factors": [],
  451. "exception_applied": False,
  452. "action_taken": "rejected",
  453. }
  454. event = {
  455. "source": "wazuh",
  456. "event_type": "vpn_geo_anomaly",
  457. "event_id": payload.get("event_id") or f"vpn-{int(time.time())}",
  458. "timestamp": payload.get("event_time") or datetime.now(timezone.utc).isoformat(),
  459. "severity": "high",
  460. "title": f"VPN login anomaly: {payload.get('user', 'unknown')}",
  461. "description": f"VPN success from {payload.get('country_code', 'unknown')} for user {payload.get('user', 'unknown')}",
  462. "asset": {"user": payload.get("user")},
  463. "network": {"src_ip": payload.get("src_ip"), "country": payload.get("country_code")},
  464. "tags": ["vpn", "geo-anomaly"],
  465. "risk_context": {
  466. "outside_thailand": payload.get("country_code", "").upper() != "TH",
  467. "admin_account": bool(payload.get("is_admin", False)),
  468. "off_hours": bool(payload.get("off_hours", False)),
  469. "first_seen_country": bool(payload.get("first_seen_country", False)),
  470. },
  471. "raw": payload,
  472. "payload": {},
  473. }
  474. ingest_result = await self.ingest_incident(event)
  475. decision_trace = ingest_result.get("decision_trace", {})
  476. return {
  477. "risk_score": decision_trace.get("risk_score", 0),
  478. "risk_factors": decision_trace.get("risk_factors", []),
  479. "exception_applied": ingest_result.get("action_taken") == "ignored_exception",
  480. "action_taken": ingest_result.get("action_taken"),
  481. "incident_key": ingest_result.get("incident_key"),
  482. "iris_case_id": ingest_result.get("iris_case_id"),
  483. "escalation_stub_sent": ingest_result.get("escalation_stub_sent", False),
  484. }
  485. async def sync_wazuh_alerts(
  486. self,
  487. query: str = "soc_mvp_test=true OR event_type:*",
  488. limit: int = 50,
  489. minutes: int = 120,
  490. ) -> dict[str, Any]:
  491. raw = await self.wazuh_adapter.search_alerts(query=query, limit=limit, minutes=minutes)
  492. hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
  493. processed = 0
  494. ingested = 0
  495. skipped_existing = 0
  496. failed = 0
  497. errors: list[str] = []
  498. created_incidents: list[str] = []
  499. ioc_evaluated = 0
  500. ioc_matched = 0
  501. ioc_rejected = 0
  502. for hit in hits:
  503. processed += 1
  504. event = self._normalize_wazuh_hit(hit)
  505. event_id = str(event.get("event_id", "")).strip()
  506. if event_id and self.repo.has_event("wazuh", event_id):
  507. skipped_existing += 1
  508. continue
  509. try:
  510. if event.get("event_type") in {"ioc_dns", "ioc_ips"}:
  511. ioc_evaluated += 1
  512. payload = {
  513. "ioc_type": "domain" if event.get("event_type") == "ioc_dns" else "ip",
  514. "ioc_value": (event.get("network", {}) or {}).get("domain")
  515. or (event.get("network", {}) or {}).get("dst_ip")
  516. or (event.get("network", {}) or {}).get("src_ip")
  517. or "unknown",
  518. "source_event": {
  519. "event_id": event.get("event_id"),
  520. "asset": event.get("asset", {}),
  521. "network": event.get("network", {}),
  522. "raw": event.get("raw", {}),
  523. },
  524. }
  525. ioc_result = await self.evaluate_ioc(payload)
  526. if ioc_result.get("matched"):
  527. ioc_matched += 1
  528. ingested += 1
  529. incident_key = str((ioc_result.get("result", {}) or {}).get("incident_key", ""))
  530. if incident_key:
  531. created_incidents.append(incident_key)
  532. else:
  533. ioc_rejected += 1
  534. else:
  535. result = await self.ingest_incident(event)
  536. ingested += 1
  537. incident_key = str(result.get("incident_key", ""))
  538. if incident_key:
  539. created_incidents.append(incident_key)
  540. except Exception as exc:
  541. failed += 1
  542. errors.append(f"{event_id or 'unknown_event'}: {exc}")
  543. return {
  544. "query": query,
  545. "window_minutes": minutes,
  546. "limit": limit,
  547. "processed": processed,
  548. "ingested": ingested,
  549. "skipped_existing": skipped_existing,
  550. "failed": failed,
  551. "ioc_evaluated": ioc_evaluated,
  552. "ioc_matched": ioc_matched,
  553. "ioc_rejected": ioc_rejected,
  554. "incident_keys": created_incidents,
  555. "errors": errors[:10],
  556. "total_hits": (raw.get("hits", {}).get("total", {}) if isinstance(raw, dict) else {}),
  557. }
  558. async def dependency_health(self) -> dict[str, Any]:
  559. out: dict[str, Any] = {}
  560. async def timed(name: str, fn):
  561. start = time.time()
  562. try:
  563. result = await fn()
  564. out[name] = {
  565. "status": "up",
  566. "latency_ms": round((time.time() - start) * 1000, 2),
  567. "details": result,
  568. }
  569. except Exception as exc:
  570. out[name] = {
  571. "status": "down",
  572. "latency_ms": round((time.time() - start) * 1000, 2),
  573. "error": str(exc),
  574. }
  575. await timed("wazuh", self.wazuh_adapter.auth_test)
  576. await timed("shuffle", self.shuffle_adapter.health)
  577. await timed("iris", self.iris_adapter.whoami)
  578. async def pagerduty_stub_health():
  579. async with httpx.AsyncClient(timeout=10.0) as client:
  580. r = await client.get(settings.pagerduty_base_url)
  581. r.raise_for_status()
  582. return {"status_code": r.status_code}
  583. await timed("pagerduty_stub", pagerduty_stub_health)
  584. return out