| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from __future__ import annotations
- from contextlib import contextmanager
- import psycopg
- from psycopg.rows import dict_row
- from app.config import settings
def db_dsn() -> str:
    """Build the libpq connection string for the SOC integrator database.

    Host, port, database name, user and password are read from ``settings``.
    The string is assembled with ``psycopg.conninfo.make_conninfo`` instead of
    plain string interpolation, so a value that is empty or that contains
    whitespace, a single quote or a backslash (most likely the password) is
    quoted and escaped rather than corrupting the ``key=value`` list. For
    plain values the result is the same ``host=... port=... dbname=...
    user=... password=...`` string as before.

    Returns:
        A ``key=value`` conninfo string suitable for ``psycopg.connect``.

    Raises:
        psycopg.ProgrammingError: If the assembled string is not a valid
            conninfo (``make_conninfo`` validates its result).
    """
    # Local import: psycopg is already a dependency of this module, and this
    # keeps the block self-contained without touching the file-level imports.
    from psycopg.conninfo import make_conninfo

    # Keyword order is preserved in the output. Settings that are None are
    # omitted by make_conninfo instead of being rendered as the text "None".
    return make_conninfo(
        host=settings.soc_integrator_db_host,
        port=settings.soc_integrator_db_port,
        dbname=settings.soc_integrator_db_name,
        user=settings.soc_integrator_db_user,
        password=settings.soc_integrator_db_password,
    )
@contextmanager
def get_conn():
    """Yield a psycopg connection to the database described by ``db_dsn()``.

    The connection is opened with ``autocommit=True`` and the ``dict_row``
    row factory. It is entered as a context manager, so psycopg's own exit
    handling runs when the caller's ``with`` block finishes, whether it ends
    normally or with an exception.

    Yields:
        The open ``psycopg`` connection.
    """
    connection = psycopg.connect(
        db_dsn(),
        row_factory=dict_row,
        autocommit=True,
    )
    # Entering the connection delegates cleanup to psycopg's context manager.
    with connection:
        yield connection
def init_schema() -> None:
    """Create the integrator's tables and indexes when they do not exist yet.

    Every statement uses ``IF NOT EXISTS``. The statements are executed one by
    one, in order, on a single cursor obtained from ``get_conn()``. Tables
    created: ``incident_index``, ``incident_events``, ``escalation_audit``,
    ``policy_config`` and ``ioc_trace``, followed by their lookup indexes.
    """
    # Order matters: incident_index must exist before the tables whose
    # incident_key column references it, and tables before their indexes.
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS incident_index (
        incident_key TEXT PRIMARY KEY,
        iris_case_id TEXT,
        status TEXT NOT NULL,
        severity TEXT NOT NULL,
        first_seen TIMESTAMPTZ NOT NULL,
        last_seen TIMESTAMPTZ NOT NULL
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS incident_events (
        id BIGSERIAL PRIMARY KEY,
        incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE,
        event_id TEXT,
        source TEXT NOT NULL,
        event_type TEXT NOT NULL,
        raw_payload JSONB NOT NULL,
        decision_trace JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS escalation_audit (
        id BIGSERIAL PRIMARY KEY,
        incident_key TEXT NOT NULL REFERENCES incident_index(incident_key) ON DELETE CASCADE,
        attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        status_code INT,
        success BOOLEAN NOT NULL,
        response_excerpt TEXT
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS policy_config (
        id INT PRIMARY KEY,
        data JSONB NOT NULL,
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS ioc_trace (
        id BIGSERIAL PRIMARY KEY,
        action TEXT NOT NULL,
        ioc_type TEXT NOT NULL,
        ioc_value TEXT NOT NULL,
        providers JSONB NOT NULL,
        request_payload JSONB NOT NULL,
        response_payload JSONB NOT NULL,
        matched BOOLEAN,
        severity TEXT,
        confidence DOUBLE PRECISION,
        error TEXT,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        """,
        "CREATE INDEX IF NOT EXISTS idx_incident_events_incident_key_created_at ON incident_events(incident_key, created_at DESC);",
        "CREATE INDEX IF NOT EXISTS idx_incident_events_source_event_id ON incident_events(source, event_id);",
        "CREATE INDEX IF NOT EXISTS idx_escalation_audit_incident_key_attempted_at ON escalation_audit(incident_key, attempted_at DESC);",
        "CREATE INDEX IF NOT EXISTS idx_ioc_trace_created_at ON ioc_trace(created_at DESC);",
        "CREATE INDEX IF NOT EXISTS idx_ioc_trace_type_value ON ioc_trace(ioc_type, ioc_value);",
    )

    with get_conn() as connection:
        with connection.cursor() as cursor:
            for statement in ddl_statements:
                cursor.execute(statement)
|