from __future__ import annotations from contextlib import contextmanager import psycopg from psycopg.rows import dict_row from app.config import settings def db_dsn() -> str: return ( f"host={settings.soc_integrator_db_host} " f"port={settings.soc_integrator_db_port} " f"dbname={settings.soc_integrator_db_name} " f"user={settings.soc_integrator_db_user} " f"password={settings.soc_integrator_db_password}" ) @contextmanager def get_conn(): with psycopg.connect(db_dsn(), row_factory=dict_row, autocommit=True) as conn: yield conn def init_schema() -> None: with get_conn() as conn, conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS incident_index ( incident_key TEXT PRIMARY KEY, iris_case_id TEXT, status TEXT NOT NULL, severity TEXT NOT NULL, first_seen TIMESTAMPTZ NOT NULL, last_seen TIMESTAMPTZ NOT NULL ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS incident_events ( id BIGSERIAL PRIMARY KEY, incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE, event_id TEXT, source TEXT NOT NULL, event_type TEXT NOT NULL, raw_payload JSONB NOT NULL, decision_trace JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS escalation_audit ( id BIGSERIAL PRIMARY KEY, incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE, attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), status_code INT, success BOOLEAN NOT NULL, response_excerpt TEXT ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS policy_config ( id INT PRIMARY KEY, data JSONB NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS ioc_trace ( id BIGSERIAL PRIMARY KEY, action TEXT NOT NULL, ioc_type TEXT NOT NULL, ioc_value TEXT NOT NULL, providers JSONB NOT NULL, request_payload JSONB NOT NULL, response_payload JSONB NOT NULL, matched BOOLEAN, severity TEXT, confidence DOUBLE PRECISION, error TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS correlation_state ( entity_key TEXT PRIMARY KEY, state JSONB NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS c_detection_events ( id BIGSERIAL PRIMARY KEY, usecase_id TEXT NOT NULL, entity TEXT NOT NULL, severity TEXT NOT NULL, evidence JSONB NOT NULL, event_ref JSONB NOT NULL, incident_key TEXT, matched_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_incident_events_incident_key_created_at ON incident_events(incident_key, created_at DESC);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_incident_events_source_event_id ON incident_events(source, event_id);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_escalation_audit_incident_key_attempted_at ON escalation_audit(incident_key, attempted_at DESC);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_ioc_trace_created_at ON ioc_trace(created_at DESC);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_ioc_trace_type_value ON ioc_trace(ioc_type, ioc_value);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_correlation_state_updated_at ON correlation_state(updated_at DESC);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_c_detection_events_usecase_matched_at ON c_detection_events(usecase_id, matched_at DESC);" ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_c_detection_events_entity_matched_at ON c_detection_events(entity, matched_at DESC);" )