| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- from __future__ import annotations
- from contextlib import contextmanager
- import psycopg
- from psycopg.rows import dict_row
- from app.config import settings
- def db_dsn() -> str:
- return (
- f"host={settings.soc_integrator_db_host} "
- f"port={settings.soc_integrator_db_port} "
- f"dbname={settings.soc_integrator_db_name} "
- f"user={settings.soc_integrator_db_user} "
- f"password={settings.soc_integrator_db_password}"
- )
- @contextmanager
- def get_conn():
- with psycopg.connect(db_dsn(), row_factory=dict_row, autocommit=True) as conn:
- yield conn
- def init_schema() -> None:
- with get_conn() as conn, conn.cursor() as cur:
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS incident_index (
- incident_key TEXT PRIMARY KEY,
- iris_case_id TEXT,
- status TEXT NOT NULL,
- severity TEXT NOT NULL,
- first_seen TIMESTAMPTZ NOT NULL,
- last_seen TIMESTAMPTZ NOT NULL
- );
- """
- )
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS incident_events (
- id BIGSERIAL PRIMARY KEY,
- incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE,
- event_id TEXT,
- source TEXT NOT NULL,
- event_type TEXT NOT NULL,
- raw_payload JSONB NOT NULL,
- decision_trace JSONB NOT NULL,
- created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
- );
- """
- )
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS escalation_audit (
- id BIGSERIAL PRIMARY KEY,
- incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE,
- attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
- status_code INT,
- success BOOLEAN NOT NULL,
- response_excerpt TEXT
- );
- """
- )
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS policy_config (
- id INT PRIMARY KEY,
- data JSONB NOT NULL,
- updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
- );
- """
- )
- cur.execute(
- "CREATE INDEX IF NOT EXISTS idx_incident_events_incident_key_created_at ON incident_events(incident_key, created_at DESC);"
- )
- cur.execute(
- "CREATE INDEX IF NOT EXISTS idx_incident_events_source_event_id ON incident_events(source, event_id);"
- )
- cur.execute(
- "CREATE INDEX IF NOT EXISTS idx_escalation_audit_incident_key_attempted_at ON escalation_audit(incident_key, attempted_at DESC);"
- )
|