View source code

feat: Wazuh→IRIS alert sync with severity filter

- soc-integrator creates IRIS Alerts (not Cases) from Wazuh indexer hits
  via ingest_wazuh_alert_to_iris() in mvp_service.py
- Severity filter: only alerts at/above min_severity reach IRIS
  (default: medium; persisted in policy_config table)
- GET /wazuh/sync-policy — read current threshold
- PUT /wazuh/sync-policy — update threshold at runtime (no restart needed)
- POST /wazuh/sync-to-mvp — new min_severity query param for per-run override
- GET /wazuh/auto-sync/status — now includes min_severity from policy
- Sync result includes skipped_filtered and min_severity_applied counters
- Add scripts/test-wazuh-iris-sync.py: 7-step end-to-end pipeline test
- Update README.md and scripts/README.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
tum 1 day ago
Parent
Commit
3be7c0d801

+ 56 - 5
README.md

@@ -135,14 +135,54 @@ GET  /mvp/health/dependencies
 Protected endpoints require header: `X-Internal-API-Key`
 Key from: `SOC_INTEGRATOR_INTERNAL_KEY` in `soc-integrator/.env`
 
+### Wazuh → IRIS alert sync
+
+The sync pipeline fetches alerts from the Wazuh indexer and creates **IRIS Alerts** (not cases)
+for each one. Dedup prevents re-processing the same alert across runs.
+
+```
+GET  /wazuh/sync-policy           Read current sync filter policy
+PUT  /wazuh/sync-policy           Update sync filter policy  (auth required)
+POST /wazuh/sync-to-mvp           Trigger manual sync
+GET  /wazuh/auto-sync/status      Show worker state and active settings
+```
+
+**Severity filter** — only alerts at or above `min_severity` are forwarded to IRIS:
+
+```bash
+# Read current threshold (default: medium)
+curl http://localhost:8088/wazuh/sync-policy
+
+# Raise threshold — only high + critical reach IRIS
+curl -X PUT http://localhost:8088/wazuh/sync-policy \
+  -H 'X-Internal-API-Key: dev-internal-key' \
+  -H 'Content-Type: application/json' \
+  -d '{"min_severity": "high"}'
+
+# Manual sync — override threshold for this run only
+curl -X POST "http://localhost:8088/wazuh/sync-to-mvp?limit=50&minutes=120&min_severity=low" \
+  -H 'X-Internal-API-Key: dev-internal-key'
+```
+
+Severity scale: `informational < low < medium < high < critical`
+
+Sync response fields:
+
+| Field | Meaning |
+|---|---|
+| `min_severity_applied` | Threshold used for this run |
+| `processed` | Wazuh alerts examined |
+| `skipped_existing` | Already synced (dedup) |
+| `skipped_filtered` | Below severity threshold |
+| `ingested` | New IRIS Alerts created |
+| `iris_alert_ids` | IDs of created IRIS Alerts |
+
 ### Other endpoints
 
 ```
 GET  /health
 GET  /wazuh/alerts
 GET  /wazuh/agents
-POST /wazuh/sync-to-mvp
-GET  /wazuh/auto-sync/status
 POST /ingest/wazuh-alert
 GET  /ioc/enrich
 POST /ioc/evaluate
@@ -205,7 +245,7 @@ python3 scripts/test-firewall-syslog.py --scenario rdp --via-docker
 The `--via-docker` flag sends from inside the container to preserve the firewall source IP
 through Docker NAT. Source IP must be in the `allowed-ips` list in `wazuh_manager.conf`.
 
-### Sync Wazuh alerts into MVP pipeline
+### Sync Wazuh alerts into IRIS
 
 ```bash
 curl -X POST "http://localhost:8088/wazuh/sync-to-mvp?limit=50&minutes=120&q=*" \
@@ -214,10 +254,21 @@ curl -X POST "http://localhost:8088/wazuh/sync-to-mvp?limit=50&minutes=120&q=*"
 
 Notes:
 
-- Reads from `wazuh-alerts-*` in Wazuh indexer.
-- Re-running is safe — dedupe applied by `source + event_id`.
+- Reads from `wazuh-alerts-*` in Wazuh indexer and creates IRIS Alerts.
+- Re-running is safe — dedup applied by `source + event_id`.
+- Only alerts at or above `min_severity` (policy default: `medium`) are forwarded.
 - Wazuh must fire rules before alerts appear (check `archives.log` first).
 
+### End-to-end pipeline test
+
+```bash
+python3 scripts/test-wazuh-iris-sync.py                        # full: send → wait → sync → verify
+python3 scripts/test-wazuh-iris-sync.py --no-send --minutes 60 # sync only
+python3 scripts/test-wazuh-iris-sync.py --min-severity critical # test filter
+```
+
+See `scripts/README.md` for full argument reference.
+
 ### Enable automatic sync worker
 
 ```bash

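The severity filter documented in the README diff above is a plain ordinal comparison over the five-level scale. A minimal sketch of that behaviour — `passes_filter` is an illustrative name, not a function from the codebase, and the fallback defaults mirror those described in the docs:

```python
# Ordinal positions for the documented scale:
# informational < low < medium < high < critical
SEVERITY_ORDER = {"informational": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}

def passes_filter(alert_severity: str, min_severity: str = "medium") -> bool:
    """True if the alert is at or above the configured threshold.

    Unknown alert severities fall back to "low"; unknown thresholds fall back
    to "medium" (assumed here to match the service defaults).
    """
    alert_rank = SEVERITY_ORDER.get(alert_severity.lower(), SEVERITY_ORDER["low"])
    min_rank = SEVERITY_ORDER.get(min_severity.lower(), SEVERITY_ORDER["medium"])
    return alert_rank >= min_rank

print(passes_filter("high", "medium"))    # True  — forwarded to IRIS
print(passes_filter("low", "medium"))     # False — counted as skipped_filtered
print(passes_filter("medium", "medium"))  # True  — "at or above" is inclusive
```

Note that the comparison is inclusive, which is why the docs say "at or above `min_severity`".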
+ 34 - 0
scripts/README.md

@@ -93,6 +93,40 @@ scripts/import-wazuh-dashboard.sh scripts/events/wazuh-fortigate-sim-dashboard.n
 scripts/import-wazuh-dashboard.sh scripts/events/wazuh-proposal-custom-rules-dashboard.ndjson
 ```
 
+## Wazuh → soc-integrator → IRIS end-to-end test
+
+End-to-end pipeline test: sends a test event to Wazuh, waits for indexing, triggers the sync,
+and verifies that an IRIS Alert was created with `source=wazuh`.
+
+```bash
+python3 scripts/test-wazuh-iris-sync.py
+python3 scripts/test-wazuh-iris-sync.py --no-send --minutes 60   # sync only, no new events
+python3 scripts/test-wazuh-iris-sync.py --min-severity critical  # test filter behaviour
+python3 scripts/test-wazuh-iris-sync.py --scenario ips_critical  # use a specific scenario
+```
+
+Arguments:
+
+- `--no-send` — skip sending test events (useful to verify an already-running pipeline)
+- `--scenario` — firewall scenario to send (default `rdp`); same options as `test-firewall-syslog.py`
+- `--wait` — seconds to wait for Wazuh indexer (default `20`)
+- `--minutes` — sync lookback window in minutes (default `5`)
+- `--limit` — max alerts to sync per run (default `20`)
+- `--min-severity` — override `min_severity` for this run without changing the policy
+
+Steps run:
+
+| Step | Check |
+|---|---|
+| 0 | soc-integrator health |
+| 1 | Read current sync policy (`min_severity`) |
+| 2 | Send test syslog event to Wazuh |
+| 3 | Wait for Wazuh indexer |
+| 4 | Snapshot latest IRIS alert ID |
+| 5 | Run sync, show all counters |
+| 6 | Verify new IRIS Alerts with `source=wazuh` |
+| 7 | Show auto-sync worker state |
+
 ## KPI test data seeder
 
 Create IRIS alerts and cases covering every KPI state for UI testing.
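Step 6 of the table above has three success shapes, depending on the sync counters. A sketch of that decision — `classify_run` is a hypothetical helper mirroring the test's verification logic, not part of the script:

```python
# Given the counters from a sync response, decide what the run demonstrated.
def classify_run(sync: dict) -> str:
    if sync["iris_alert_ids"]:
        return "created"    # new IRIS Alerts — verify them by ID
    if sync["ingested"] == 0 and sync["skipped_existing"] == sync["processed"]:
        return "dedup"      # everything was already synced on a previous run
    if sync["ingested"] == 0 and sync["skipped_filtered"] > 0:
        return "filtered"   # all new alerts fell below min_severity
    return "empty"          # nothing matched in the lookback window

print(classify_run({"iris_alert_ids": [], "ingested": 0,
                    "skipped_existing": 3, "processed": 3,
                    "skipped_filtered": 0}))  # dedup
```

This is why a run that creates zero IRIS Alerts can still pass: both the dedup and filter outcomes are expected behaviour, not failures.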

+ 306 - 0
scripts/test-wazuh-iris-sync.py

@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+"""
+test-wazuh-iris-sync.py — End-to-end test: Wazuh → soc-integrator → IRIS alert sync.
+
+Steps:
+  1. Send test syslog events to Wazuh (optional, skip with --no-send)
+  2. Wait for Wazuh indexer to index them
+  3. Call POST /wazuh/sync-to-mvp
+  4. Verify IRIS alerts were created with source="wazuh"
+  5. Print a pass/fail summary
+
+Usage:
+  python3 scripts/test-wazuh-iris-sync.py
+  python3 scripts/test-wazuh-iris-sync.py --no-send          # skip sending, just sync
+  python3 scripts/test-wazuh-iris-sync.py --min-severity high
+  python3 scripts/test-wazuh-iris-sync.py --minutes 60       # widen search window
+
+Env vars (override defaults):
+  INTEGRATOR_URL        default: http://localhost:8088
+  INTEGRATOR_API_KEY    default: dev-internal-key
+  IRIS_URL              default: https://localhost:8443
+  IRIS_API_KEY          required for IRIS verification (or set in soc-integrator/.env)
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import ssl
+import subprocess
+import sys
+import time
+import urllib.request
+from pathlib import Path
+
+
+# ---------------------------------------------------------------------------
+# Config
+# ---------------------------------------------------------------------------
+
+INTEGRATOR_URL = os.environ.get("INTEGRATOR_URL", "http://localhost:8088")
+INTEGRATOR_KEY = os.environ.get("INTEGRATOR_API_KEY", "dev-internal-key")
+IRIS_URL = os.environ.get("IRIS_URL", "https://localhost:8443")
+
+# Try to read IRIS_API_KEY from env, then from soc-integrator/.env
+def _read_iris_key() -> str:
+    if k := os.environ.get("IRIS_API_KEY"):
+        return k
+    env_file = Path(__file__).parent.parent / "soc-integrator" / ".env"
+    if env_file.exists():
+        for line in env_file.read_text().splitlines():
+            if line.startswith("IRIS_API_KEY="):
+                return line.split("=", 1)[1].strip()
+    return ""
+
+IRIS_KEY = _read_iris_key()
+
+SSL_CTX = ssl.create_default_context()
+SSL_CTX.check_hostname = False
+SSL_CTX.verify_mode = ssl.CERT_NONE
+
+PASS = "\033[32m✓\033[0m"
+FAIL = "\033[31m✗\033[0m"
+INFO = "\033[36m·\033[0m"
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _get(url: str, headers: dict | None = None) -> dict:
+    req = urllib.request.Request(url, headers=headers or {})
+    with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r:
+        return json.loads(r.read())
+
+
+def _post(url: str, data: dict | None = None, headers: dict | None = None) -> dict:
+    body = json.dumps(data or {}).encode() if data else b""
+    h = {"Content-Type": "application/json", **(headers or {})}
+    req = urllib.request.Request(url, data=body, headers=h, method="POST")
+    with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as r:
+        return json.loads(r.read())
+
+
+def _put(url: str, data: dict, headers: dict | None = None) -> dict:
+    body = json.dumps(data).encode()
+    h = {"Content-Type": "application/json", **(headers or {})}
+    req = urllib.request.Request(url, data=body, headers=h, method="PUT")
+    with urllib.request.urlopen(req, context=SSL_CTX, timeout=15) as r:
+        return json.loads(r.read())
+
+
+def _integrator(path: str, method: str = "GET", data: dict | None = None, params: str = "") -> dict:
+    url = f"{INTEGRATOR_URL}{path}"
+    if params:
+        url += ("&" if "?" in url else "?") + params
+    headers = {"X-Internal-API-Key": INTEGRATOR_KEY}
+    if method == "POST":
+        return _post(url, data, headers)
+    if method == "PUT":
+        return _put(url, data or {}, headers)
+    return _get(url, headers)
+
+
+def _iris_alerts(page: int = 1, per_page: int = 20) -> list[dict]:
+    url = f"{INTEGRATOR_URL}/iris/alerts?page={page}&per_page={per_page}&sort_by=alert_id&sort_dir=desc"
+    data = _get(url)
+    return (data.get("data") or {}).get("alerts", {}).get("data", [])
+
+
+def step(n: int, label: str) -> None:
+    print(f"\n\033[1mStep {n}: {label}\033[0m")
+
+
+def ok(msg: str) -> None:
+    print(f"  {PASS}  {msg}")
+
+
+def fail(msg: str) -> None:
+    print(f"  {FAIL}  {msg}")
+
+
+def info(msg: str) -> None:
+    print(f"  {INFO}  {msg}")
+
+
+# ---------------------------------------------------------------------------
+# Main test
+# ---------------------------------------------------------------------------
+
+def run(args: argparse.Namespace) -> int:
+    errors = 0
+
+    # ------------------------------------------------------------------
+    # Step 0: Health check
+    # ------------------------------------------------------------------
+    step(0, "Health check")
+    try:
+        h = _get(f"{INTEGRATOR_URL}/health")
+        if h.get("ok"):
+            ok(f"soc-integrator reachable at {INTEGRATOR_URL}")
+        else:
+            fail(f"soc-integrator unhealthy: {h}")
+            errors += 1
+    except Exception as exc:
+        fail(f"Cannot reach soc-integrator: {exc}")
+        return 1
+
+    # ------------------------------------------------------------------
+    # Step 1: Read current sync policy
+    # ------------------------------------------------------------------
+    step(1, "Read sync policy")
+    try:
+        policy_resp = _integrator("/wazuh/sync-policy")
+        current_min = policy_resp["data"]["sync"]["min_severity"]
+        ok(f"Current min_severity = {current_min!r}")
+    except Exception as exc:
+        fail(f"Could not read sync policy: {exc}")
+        errors += 1
+        current_min = "medium"
+
+    # ------------------------------------------------------------------
+    # Step 2: Optionally send test events to Wazuh
+    # ------------------------------------------------------------------
+    step(2, "Send test events to Wazuh")
+    if args.no_send:
+        info("Skipped (--no-send)")
+    else:
+        script = Path(__file__).parent / "test-firewall-syslog.py"
+        if not script.exists():
+            info("test-firewall-syslog.py not found — skipping send")
+        else:
+            try:
+                result = subprocess.run(
+                    [sys.executable, str(script), "--via-docker", "--scenario", args.scenario],
+                    capture_output=True, text=True, timeout=30,
+                )
+                sent = result.stdout.count("✓")
+                if sent:
+                    ok(f"Sent {sent} test event(s) (scenario={args.scenario})")
+                else:
+                    fail(f"No events sent\n{result.stdout}\n{result.stderr}")
+                    errors += 1
+            except Exception as exc:
+                fail(f"Failed to send test events: {exc}")
+                errors += 1
+
+    # ------------------------------------------------------------------
+    # Step 3: Wait for Wazuh indexer
+    # ------------------------------------------------------------------
+    step(3, f"Wait {args.wait}s for Wazuh indexer")
+    if args.no_send:
+        info("Skipped (--no-send)")
+    else:
+        for i in range(args.wait, 0, -5):
+            print(f"  {INFO}  {i}s remaining...", end="\r", flush=True)
+            time.sleep(min(5, i))
+        print()
+        ok("Done")
+
+    # ------------------------------------------------------------------
+    # Step 4: Snapshot IRIS alert count before sync
+    # ------------------------------------------------------------------
+    step(4, "Snapshot IRIS alert count before sync")
+    try:
+        alerts_before = _iris_alerts(per_page=5)
+        max_id_before = max((a["alert_id"] for a in alerts_before), default=0)
+        ok(f"Latest IRIS alert_id before sync: {max_id_before}")
+    except Exception as exc:
+        info(f"Could not read IRIS alerts (verification will be skipped): {exc}")
+        max_id_before = 0
+
+    # ------------------------------------------------------------------
+    # Step 5: Run sync
+    # ------------------------------------------------------------------
+    step(5, "Run sync")
+    min_sev = args.min_severity or current_min
+    params = f"limit={args.limit}&minutes={args.minutes}&q=*&min_severity={min_sev}"
+    try:
+        resp = _integrator("/wazuh/sync-to-mvp", method="POST", params=params)
+        s = resp["data"]["sync"]
+        info(f"min_severity_applied : {s['min_severity_applied']}")
+        info(f"processed            : {s['processed']}")
+        info(f"skipped_existing     : {s['skipped_existing']}")
+        info(f"skipped_filtered     : {s.get('skipped_filtered', 0)}")
+        info(f"ingested             : {s['ingested']}")
+        info(f"iris_alert_ids       : {s['iris_alert_ids']}")
+        if s.get("errors"):
+            fail(f"Sync errors: {s['errors']}")
+            errors += 1
+        else:
+            ok("Sync completed without errors")
+    except Exception as exc:
+        fail(f"Sync request failed: {exc}")
+        return errors + 1
+
+    # ------------------------------------------------------------------
+    # Step 6: Verify new IRIS alerts
+    # ------------------------------------------------------------------
+    step(6, "Verify IRIS alerts")
+    if not s["iris_alert_ids"]:
+        if s["ingested"] == 0 and s["skipped_existing"] == s["processed"]:
+            ok("All alerts already synced (no duplicates created) — dedup working")
+        elif s["skipped_filtered"] > 0 and s["ingested"] == 0:
+            ok(f"All new alerts filtered by min_severity={min_sev} — filter working")
+        else:
+            info("No new IRIS alerts created this run")
+    else:
+        try:
+            alerts_after = _iris_alerts(per_page=10)
+            new_alerts = [a for a in alerts_after if a["alert_id"] > max_id_before and a.get("alert_source") == "wazuh"]
+            if new_alerts:
+                ok(f"Found {len(new_alerts)} new IRIS alert(s) with source=wazuh:")
+                for a in new_alerts:
+                    print(f"       alert_id={a['alert_id']}  ref={a.get('alert_source_ref','')}  title={a.get('alert_title','')[:55]}")
+            else:
+                fail(f"iris_alert_ids={s['iris_alert_ids']} but no matching IRIS alerts found")
+                errors += 1
+        except Exception as exc:
+            fail(f"Could not verify IRIS alerts: {exc}")
+            errors += 1
+
+    # ------------------------------------------------------------------
+    # Step 7: Auto-sync status
+    # ------------------------------------------------------------------
+    step(7, "Auto-sync worker status")
+    try:
+        st = _integrator("/wazuh/auto-sync/status")["data"]
+        info(f"enabled      : {st['enabled']}")
+        info(f"task_running : {st['task_running']}")
+        info(f"min_severity : {st['settings']['min_severity']}")
+        if st.get("state", {}).get("last_status"):
+            info(f"last_status  : {st['state']['last_status']}")
+        ok("Auto-sync status retrieved")
+    except Exception as exc:
+        fail(f"Could not read auto-sync status: {exc}")
+        errors += 1
+
+    # ------------------------------------------------------------------
+    # Summary
+    # ------------------------------------------------------------------
+    print()
+    print("─" * 60)
+    if errors == 0:
+        print(f"  {PASS}  All checks passed")
+    else:
+        print(f"  {FAIL}  {errors} check(s) failed")
+    print("─" * 60)
+    return errors


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="End-to-end test: Wazuh → soc-integrator → IRIS")
+    parser.add_argument("--no-send", action="store_true", help="Skip sending test events to Wazuh")
+    parser.add_argument("--scenario", default="rdp", help="Firewall scenario to send (default: rdp)")
+    parser.add_argument("--wait", type=int, default=20, help="Seconds to wait for indexer (default: 20)")
+    parser.add_argument("--minutes", type=int, default=5, help="Sync lookback window in minutes (default: 5)")
+    parser.add_argument("--limit", type=int, default=20, help="Max alerts to sync (default: 20)")
+    parser.add_argument("--min-severity", default=None,
+                        help="Override min_severity for this run (default: use policy)")
+    args = parser.parse_args()
+    sys.exit(run(args))

+ 34 - 3
soc-integrator/app/main.py

@@ -38,6 +38,7 @@ from app.models import (
     SimLogRunRequest,
     ShuffleLoginRequest,
     ShuffleProxyRequest,
+    SyncPolicyRequest,
     TriggerShuffleRequest,
     WazuhIngestRequest,
 )
@@ -1600,15 +1601,18 @@ async def wazuh_manager_logs(
     response_model=ApiResponse,
     dependencies=[Depends(require_internal_api_key)],
     summary="Sync Wazuh to MVP",
-    description="Fetch Wazuh alerts from indexer and pass them through MVP ingest/evaluation logic.",
+    description="Fetch Wazuh alerts from indexer and create IRIS Alerts for each. Returns iris_alert_ids of created alerts.",
 )
 async def wazuh_sync_to_mvp(
     limit: int = 50,
     minutes: int = 120,
     q: str = "soc_mvp_test=true OR event_type:*",
+    min_severity: str | None = None,
 ) -> ApiResponse:
     try:
-        result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
+        result = await mvp_service.sync_wazuh_alerts(
+            query=q, limit=limit, minutes=minutes, min_severity=min_severity
+        )
     except Exception as exc:
         raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
     return ApiResponse(data={"sync": result})
@@ -1618,11 +1622,12 @@ async def wazuh_sync_to_mvp(
     "/wazuh/auto-sync/status",
     response_model=ApiResponse,
     summary="Wazuh auto-sync status",
-    description="Show auto-sync enablement, settings, task runtime state, and last sync result.",
+    description="Show auto-sync enablement, settings, task runtime state, and last sync result (iris_alert_ids of created IRIS Alerts).",
 )
 async def wazuh_auto_sync_status() -> ApiResponse:
     state = getattr(app.state, "wazuh_auto_sync_state", {})
     task = getattr(app.state, "wazuh_auto_sync_task", None)
+    policy = mvp_service.repo.get_policy()
     return ApiResponse(
         data={
             "enabled": settings.wazuh_auto_sync_enabled,
@@ -1632,6 +1637,7 @@ async def wazuh_auto_sync_status() -> ApiResponse:
                 "limit": settings.wazuh_auto_sync_limit,
                 "minutes": settings.wazuh_auto_sync_minutes,
                 "query": settings.wazuh_auto_sync_query,
+                "min_severity": policy.get("sync", {}).get("min_severity", "medium"),
             },
             "state": state,
         }
@@ -1639,6 +1645,31 @@
 
 
 @app.get(
+    "/wazuh/sync-policy",
+    response_model=ApiResponse,
+    summary="Get Wazuh sync policy",
+    description="Return the current sync policy (min_severity threshold for IRIS alert creation).",
+)
+async def get_sync_policy() -> ApiResponse:
+    policy = mvp_service.repo.get_policy()
+    return ApiResponse(data={"sync": policy.get("sync", {"min_severity": "medium"})})
+
+
+@app.put(
+    "/wazuh/sync-policy",
+    response_model=ApiResponse,
+    dependencies=[Depends(require_internal_api_key)],
+    summary="Update Wazuh sync policy",
+    description="Set the minimum severity level for forwarding Wazuh alerts to IRIS. Persists immediately; auto-sync picks it up on the next cycle.",
+)
+async def put_sync_policy(body: SyncPolicyRequest) -> ApiResponse:
+    policy = mvp_service.repo.get_policy()
+    policy.setdefault("sync", {})["min_severity"] = body.min_severity
+    mvp_service.repo.update_policy(policy)
+    return ApiResponse(data={"sync": policy["sync"]})
+
+
+@app.get(
     "/monitor/db/tables",
     response_model=ApiResponse,
     dependencies=[Depends(require_internal_api_key)],
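The PUT handler above updates the policy with `setdefault`, which matters because the policy dict holds other sections besides `sync`. A sketch of that update semantics on a plain dict (the keys besides `sync` are taken from `DEFAULT_POLICY`; no persistence layer is involved here):

```python
# setdefault creates the "sync" section if it is missing and leaves every
# other policy key (e.g. escalate_severities) untouched — so a policy row
# written before this feature existed is upgraded in place.
policy = {"escalate_severities": ["high", "critical"]}   # no "sync" section yet

policy.setdefault("sync", {})["min_severity"] = "high"
print(policy["sync"])                    # {'min_severity': 'high'}
print(policy["escalate_severities"])     # ['high', 'critical'] — unchanged

# A second update overwrites only the threshold:
policy.setdefault("sync", {})["min_severity"] = "low"
print(policy["sync"]["min_severity"])    # low
```

Had the handler assigned `policy["sync"] = {...}` outright, any future keys under `sync` would be dropped on every update; `setdefault` plus key assignment avoids that.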

+ 7 - 0
soc-integrator/app/models.py

@@ -381,6 +381,13 @@ class IrisAlertCreateRequest(BaseModel):
     payload: dict[str, Any] = Field(default_factory=dict, description="Additional IRIS alert fields merged into the request.")
 
 
+class SyncPolicyRequest(BaseModel):
+    min_severity: Literal["informational", "low", "medium", "high", "critical"] = Field(
+        description="Minimum severity to forward to IRIS. Alerts below this level are skipped.",
+        examples=["high"],
+    )
+
+
 class ApiResponse(BaseModel):
     ok: bool = True
     message: str = "ok"
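Because `SyncPolicyRequest` types `min_severity` as a `Literal`, FastAPI rejects any other value with a validation error before the handler runs. The effect can be sketched without pydantic — `validate_min_severity` is illustrative only, not part of the codebase:

```python
ALLOWED = ("informational", "low", "medium", "high", "critical")

def validate_min_severity(value: str) -> str:
    """Mirror the Literal constraint: accept only the five documented levels."""
    if value not in ALLOWED:
        raise ValueError(f"min_severity must be one of {ALLOWED}, got {value!r}")
    return value

print(validate_min_severity("high"))   # high
try:
    validate_min_severity("extreme")   # not on the scale — rejected
except ValueError as exc:
    print("rejected:", exc)
```

In the real endpoint the rejection surfaces as an HTTP 422 from FastAPI's request validation, so the policy table can never hold an off-scale threshold.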

+ 3 - 0
soc-integrator/app/repositories/mvp_repo.py

@@ -14,6 +14,9 @@ def utc_now() -> datetime:
 
 DEFAULT_POLICY: dict[str, Any] = {
     "escalate_severities": ["high", "critical"],
+    "sync": {
+        "min_severity": "medium",  # "informational" | "low" | "medium" | "high" | "critical"
+    },
     "vpn": {
         "allowed_country": "TH",
         "exception_users": [],

+ 79 - 5
soc-integrator/app/services/mvp_service.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import hashlib
+import json
 import logging
 import re
 import time
@@ -18,6 +19,22 @@ from app.repositories.mvp_repo import MvpRepository
 
 logger = logging.getLogger(__name__)
 
+_IRIS_SEVERITY_ID: dict[str, int] = {
+    "critical": 5,
+    "high": 4,
+    "medium": 3,
+    "low": 2,
+    "informational": 1,
+}
+
+_SEVERITY_ORDER: dict[str, int] = {
+    "informational": 0,
+    "low": 1,
+    "medium": 2,
+    "high": 3,
+    "critical": 4,
+}
+
 
 class MvpService:
     def __init__(
@@ -546,18 +563,68 @@ class MvpService:
             "escalation_stub_sent": ingest_result.get("escalation_stub_sent", False),
         }
 
+    async def ingest_wazuh_alert_to_iris(self, event: dict[str, Any]) -> dict[str, Any]:
+        """Create an IRIS Alert from a normalised Wazuh event and record it for dedup."""
+        event_id = str(event.get("event_id", "")).strip()
+        severity_str = (event.get("severity") or "medium").lower()
+        severity_id = _IRIS_SEVERITY_ID.get(severity_str, 3)
+
+        payload: dict[str, Any] = {
+            "alert_title": event.get("title") or f"Wazuh alert {event_id}",
+            "alert_description": event.get("description") or "",
+            "alert_severity_id": severity_id,
+            "alert_status_id": 1,  # Unassigned
+            "alert_source": "wazuh",
+            "alert_source_ref": event_id,
+            "alert_source_event_time": event.get("timestamp") or datetime.now(timezone.utc).isoformat(),
+            "alert_customer_id": settings.iris_default_customer_id or 1,
+            "alert_note": json.dumps({
+                "asset": event.get("asset", {}),
+                "network": event.get("network", {}),
+                "tags": event.get("tags", []),
+            }),
+        }
+        result = await self.iris_adapter.create_alert(payload)
+        iris_alert_id = (result.get("data") or {}).get("alert_id")
+
+        if event_id:
+            synthetic_key = f"wazuh_alert_{event_id}"
+            self.repo.upsert_incident(
+                incident_key=synthetic_key,
+                severity=event.get("severity") or "medium",
+                status="open",
+                iris_case_id=str(iris_alert_id) if iris_alert_id else None,
+            )
+            self.repo.add_event(
+                incident_key=synthetic_key,
+                event_id=event_id,
+                source="wazuh",
+                event_type=event.get("event_type") or "wazuh",
+                raw_payload=event,
+                decision_trace={"iris_alert_id": iris_alert_id, "action": "created_iris_alert"},
+            )
+
+        return {"iris_alert_id": iris_alert_id, "event_id": event_id}
+
     async def sync_wazuh_alerts(
         self,
         query: str = "soc_mvp_test=true OR event_type:*",
         limit: int = 50,
         minutes: int = 120,
+        min_severity: str | None = None,
     ) -> dict[str, Any]:
         raw = await self.wazuh_adapter.search_alerts(query=query, limit=limit, minutes=minutes)
         hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
 
+        # Resolve minimum severity: param > policy > default "medium"
+        policy = self.repo.get_policy()
+        effective_min = (min_severity or policy.get("sync", {}).get("min_severity", "medium")).lower()
+        min_order = _SEVERITY_ORDER.get(effective_min, 2)
+
         processed = 0
         ingested = 0
         skipped_existing = 0
+        skipped_filtered = 0
         failed = 0
         errors: list[str] = []
         created_incidents: list[str] = []
@@ -572,6 +639,11 @@ class MvpService:
             if event_id and self.repo.has_event("wazuh", event_id):
                 skipped_existing += 1
                 continue
+            # Severity filter — skip alerts below minimum threshold
+            event_order = _SEVERITY_ORDER.get((event.get("severity") or "low").lower(), 1)
+            if event_order < min_order:
+                skipped_filtered += 1
+                continue
             try:
                 if event.get("event_type") in {"ioc_dns", "ioc_ips"}:
                     ioc_evaluated += 1
@@ -598,11 +670,11 @@ class MvpService:
                     else:
                         ioc_rejected += 1
                 else:
-                    result = await self.ingest_incident(event)
+                    result = await self.ingest_wazuh_alert_to_iris(event)
                     ingested += 1
-                    incident_key = str(result.get("incident_key", ""))
-                    if incident_key:
-                        created_incidents.append(incident_key)
+                    iris_alert_id = result.get("iris_alert_id")
+                    if iris_alert_id:
+                        created_incidents.append(str(iris_alert_id))
             except Exception as exc:
                 failed += 1
                 errors.append(f"{event_id or 'unknown_event'}: {exc}")
@@ -611,14 +683,16 @@ class MvpService:
             "query": query,
             "window_minutes": minutes,
             "limit": limit,
+            "min_severity_applied": effective_min,
             "processed": processed,
             "ingested": ingested,
             "skipped_existing": skipped_existing,
+            "skipped_filtered": skipped_filtered,
             "failed": failed,
             "ioc_evaluated": ioc_evaluated,
             "ioc_matched": ioc_matched,
            "ioc_rejected": ioc_rejected,
-            "incident_keys": created_incidents,
+            "iris_alert_ids": created_incidents,
             "errors": errors[:10],
             "total_hits": (raw.get("hits", {}).get("total", {}) if isinstance(raw, dict) else {}),
         }
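The re-run safety promised in the commit message rests on the `has_event`/`add_event` pair above: each ingested event is remembered by `source + event_id`, so a second sync over the same window skips everything. An in-memory sketch of that contract — `SeenEvents` and `sync_once` are illustrative stand-ins for the persistent repository and the sync loop:

```python
# Minimal model of the dedup: events are keyed by (source, event_id).
class SeenEvents:
    def __init__(self) -> None:
        self._seen: set[tuple[str, str]] = set()

    def has_event(self, source: str, event_id: str) -> bool:
        return (source, event_id) in self._seen

    def add_event(self, source: str, event_id: str) -> None:
        self._seen.add((source, event_id))


def sync_once(repo: SeenEvents, event_ids: list[str]) -> dict:
    """One sync pass: ingest unseen events, skip the rest (severity filter omitted)."""
    ingested, skipped_existing = 0, 0
    for eid in event_ids:
        if repo.has_event("wazuh", eid):
            skipped_existing += 1
            continue
        repo.add_event("wazuh", eid)
        ingested += 1
    return {"ingested": ingested, "skipped_existing": skipped_existing}


repo = SeenEvents()
print(sync_once(repo, ["e1", "e2"]))   # {'ingested': 2, 'skipped_existing': 0}
print(sync_once(repo, ["e1", "e2"]))   # {'ingested': 0, 'skipped_existing': 2}
```

Note one ordering consequence in the real loop: the dedup check runs before the severity filter, so an alert skipped for low severity is not recorded and will be re-examined (and re-filtered, or forwarded under a lowered threshold) on later runs.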