from __future__ import annotations

import copy
from datetime import datetime, timezone
from typing import Any

from psycopg.types.json import Json

from app.db import get_conn


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


# Policy document written to policy_config (row id = 1) when no policy row
# exists yet, and returned by get_policy() if the row cannot be read back.
DEFAULT_POLICY: dict[str, Any] = {
    "escalate_severities": ["high", "critical"],
    "vpn": {
        "allowed_country": "TH",
        "exception_users": [],
    },
    "risk": {
        "weights": {
            "outside_thailand": 50,
            "admin": 20,
            "off_hours": 15,
            "first_seen_country": 15,
        },
        "thresholds": {
            "high": 70,
            "medium": 40,
        },
    },
    "shuffle": {
        "ioc_workflow_id": "",
    },
}


class MvpRepository:
    """Data-access helpers for policy, incident, event and audit tables.

    Every method opens its own connection and cursor through ``get_conn()``.
    Rows are accessed by column name, so the connection is assumed to use a
    mapping-style row factory (NOTE(review): confirm in ``app.db``).
    """

    def has_event(self, source: str, event_id: str) -> bool:
        """Return True if incident_events has a row for (source, event_id)."""
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT 1 FROM incident_events "
                "WHERE source = %s AND event_id = %s LIMIT 1",
                (source, event_id),
            )
            return cur.fetchone() is not None

    def ensure_policy(self) -> None:
        """Seed policy_config row id = 1 with DEFAULT_POLICY if it is absent.

        A single ``INSERT ... ON CONFLICT DO NOTHING`` is used instead of a
        SELECT followed by an INSERT, so two concurrent callers cannot both
        see "no row" and have the second INSERT fail on the id conflict.
        An existing row is left untouched.
        """
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO policy_config(id, data) VALUES (1, %s) "
                "ON CONFLICT (id) DO NOTHING",
                (Json(DEFAULT_POLICY),),
            )

    def get_policy(self) -> dict[str, Any]:
        """Return the stored policy document, seeding the default first.

        Returns:
            The ``data`` value of policy_config row id = 1 as a dict. If the
            row cannot be read back, an independent deep copy of
            DEFAULT_POLICY is returned.
        """
        self.ensure_policy()
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute("SELECT data FROM policy_config WHERE id = 1")
            row = cur.fetchone()
            if row:
                return dict(row["data"])
            # Deep copy: a shallow dict() would share the nested "vpn"/"risk"
            # dicts and lists with the module-level default, letting callers
            # mutate it by accident.
            return copy.deepcopy(DEFAULT_POLICY)

    def update_policy(self, data: dict[str, Any]) -> dict[str, Any]:
        """Replace the policy document (row id = 1) and return what was stored.

        Inserts the row if it does not exist; otherwise overwrites ``data``
        and sets ``updated_at`` to the database's NOW().
        """
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO policy_config(id, data, updated_at)
                VALUES (1, %s, NOW())
                ON CONFLICT (id)
                DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()
                RETURNING data
                """,
                (Json(data),),
            )
            row = cur.fetchone()
            return dict(row["data"])

    def get_incident(self, incident_key: str) -> dict[str, Any] | None:
        """Return the incident_index row for ``incident_key``, or None."""
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT incident_key, iris_case_id, status, severity, "
                "first_seen, last_seen "
                "FROM incident_index WHERE incident_key = %s",
                (incident_key,),
            )
            row = cur.fetchone()
            return dict(row) if row else None

    def upsert_incident(
        self,
        incident_key: str,
        severity: str,
        status: str,
        iris_case_id: str | None,
    ) -> dict[str, Any]:
        """Insert or update an incident_index row and return the stored row.

        On insert, ``first_seen`` and ``last_seen`` are both set to the
        current UTC time. On conflict with an existing ``incident_key``,
        ``status``, ``severity`` and ``last_seen`` are overwritten, while
        ``first_seen`` is left as is. A ``None`` ``iris_case_id`` does not
        clear a previously stored case id (COALESCE keeps the old value).
        """
        now = utc_now()
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO incident_index(
                    incident_key, iris_case_id, status, severity,
                    first_seen, last_seen
                )
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (incident_key)
                DO UPDATE SET
                    iris_case_id = COALESCE(
                        EXCLUDED.iris_case_id, incident_index.iris_case_id
                    ),
                    status = EXCLUDED.status,
                    severity = EXCLUDED.severity,
                    last_seen = EXCLUDED.last_seen
                RETURNING incident_key, iris_case_id, status, severity,
                          first_seen, last_seen
                """,
                (incident_key, iris_case_id, status, severity, now, now),
            )
            return dict(cur.fetchone())

    def add_event(
        self,
        incident_key: str,
        event_id: str | None,
        source: str,
        event_type: str,
        raw_payload: dict[str, Any],
        decision_trace: dict[str, Any],
    ) -> None:
        """Insert one incident_events row.

        ``raw_payload`` and ``decision_trace`` are sent as JSON parameters.
        """
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO incident_events(
                    incident_key, event_id, source, event_type,
                    raw_payload, decision_trace
                )
                VALUES (%s, %s, %s, %s, %s, %s)
                """,
                (
                    incident_key,
                    event_id,
                    source,
                    event_type,
                    Json(raw_payload),
                    Json(decision_trace),
                ),
            )

    def add_escalation_audit(
        self,
        incident_key: str,
        status_code: int | None,
        success: bool,
        response_excerpt: str | None,
    ) -> None:
        """Insert one escalation_audit row describing an escalation attempt."""
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO escalation_audit(
                    incident_key, status_code, success, response_excerpt
                )
                VALUES (%s, %s, %s, %s)
                """,
                (incident_key, status_code, success, response_excerpt),
            )