tum 1 month ago
parent
commit
f14e344bb5

+ 14 - 0
README.md

@@ -62,6 +62,7 @@ Main sections:
62 62
 
63 63
 - Legacy integration APIs (`/wazuh/*`, `/shuffle/*`, `/action/*`)
64 64
 - MVP orchestration APIs (`/mvp/*`)
65
+- Wazuh-to-MVP sync API (`/wazuh/sync-to-mvp`)
65 66
 
66 67
 ### MVP endpoints
67 68
 
@@ -118,6 +119,19 @@ Scenarios:
118 119
 
119 120
 See `scripts/README.md` for details.
120 121
 
122
+Sync Wazuh alerts from indexer into MVP pipeline:
123
+
124
+```bash
125
+curl -X POST "http://localhost:8088/wazuh/sync-to-mvp?limit=50&minutes=120&q=*" \
126
+  -H 'X-Internal-API-Key: dev-internal-key'
127
+```
128
+
129
+Notes:
130
+
131
+- This sync reads from `wazuh-alerts-*` in Wazuh indexer.
132
+- Re-running sync is safe; dedupe is applied by `source + event_id`.
133
+- Your `send-wazuh-test-events.sh` traffic appears only after Wazuh rules generate alerts.
134
+
121 135
 ## Logs
122 136
 
123 137
 All logs (non-follow):

+ 3 - 0
soc-integrator/.env

@@ -11,6 +11,9 @@ SOC_INTEGRATOR_DB_PASSWORD=soc_integrator_password
11 11
 WAZUH_BASE_URL=https://wazuh.manager:55000
12 12
 WAZUH_USERNAME=wazuh-wui
13 13
 WAZUH_PASSWORD=MyS3cr37P450r.*-
14
+WAZUH_INDEXER_URL=https://wazuh.indexer:9200
15
+WAZUH_INDEXER_USERNAME=admin
16
+WAZUH_INDEXER_PASSWORD=SecretPassword
14 17
 
15 18
 SHUFFLE_BASE_URL=http://shuffle-backend:5001
16 19
 SHUFFLE_API_KEY=95286d1a-1d02-4dd6-8de1-9832e326871f

+ 3 - 0
soc-integrator/.env.example

@@ -11,6 +11,9 @@ SOC_INTEGRATOR_DB_PASSWORD=soc_integrator_password
11 11
 WAZUH_BASE_URL=https://wazuh.manager:55000
12 12
 WAZUH_USERNAME=wazuh-wui
13 13
 WAZUH_PASSWORD=MyS3cr37P450r.*-
14
+WAZUH_INDEXER_URL=https://wazuh.indexer:9200
15
+WAZUH_INDEXER_USERNAME=admin
16
+WAZUH_INDEXER_PASSWORD=SecretPassword
14 17
 
15 18
 SHUFFLE_BASE_URL=http://shuffle-backend:5001
16 19
 SHUFFLE_API_KEY=

+ 50 - 1
soc-integrator/app/adapters/wazuh.py

@@ -4,10 +4,21 @@ import httpx
4 4
 
5 5
 
6 6
 class WazuhAdapter:
7
-    def __init__(self, base_url: str, username: str, password: str) -> None:
7
+    def __init__(
8
+        self,
9
+        base_url: str,
10
+        username: str,
11
+        password: str,
12
+        indexer_url: str | None = None,
13
+        indexer_username: str | None = None,
14
+        indexer_password: str | None = None,
15
+    ) -> None:
8 16
         self.base_url = base_url.rstrip("/")
9 17
         self.username = username
10 18
         self.password = password
19
+        self.indexer_url = (indexer_url or "").rstrip("/")
20
+        self.indexer_username = indexer_username
21
+        self.indexer_password = indexer_password
11 22
 
12 23
     async def _authenticate(self, client: httpx.AsyncClient) -> str:
13 24
         auth_url = f"{self.base_url}/security/user/authenticate?raw=true"
@@ -80,3 +91,41 @@ class WazuhAdapter:
80 91
         async with httpx.AsyncClient(verify=False, timeout=20.0) as client:
81 92
             token = await self._authenticate(client)
82 93
             return await self._get_with_bearer(client, token, "/manager/logs", params=params)
94
+
95
+    async def search_alerts(
96
+        self,
97
+        query: str,
98
+        limit: int = 50,
99
+        minutes: int = 120,
100
+    ) -> dict[str, Any]:
101
+        if not self.indexer_url:
102
+            raise RuntimeError("Wazuh indexer URL is not configured.")
103
+
104
+        body: dict[str, Any] = {
105
+            "size": limit,
106
+            "sort": [{"@timestamp": {"order": "desc"}}],
107
+            "query": {
108
+                "bool": {
109
+                    "must": [{"query_string": {"query": query}}],
110
+                    "filter": [
111
+                        {
112
+                            "range": {
113
+                                "@timestamp": {
114
+                                    "gte": f"now-{minutes}m",
115
+                                    "lte": "now",
116
+                                }
117
+                            }
118
+                        }
119
+                    ],
120
+                }
121
+            },
122
+        }
123
+
124
+        async with httpx.AsyncClient(
125
+            verify=False,
126
+            timeout=20.0,
127
+            auth=(self.indexer_username, self.indexer_password),
128
+        ) as client:
129
+            response = await client.post(f"{self.indexer_url}/wazuh-alerts-*/_search", json=body)
130
+            response.raise_for_status()
131
+            return response.json()

+ 3 - 0
soc-integrator/app/config.py

@@ -18,6 +18,9 @@ class Settings(BaseSettings):
18 18
     wazuh_base_url: str = "https://wazuh.manager:55000"
19 19
     wazuh_username: str = "wazuh-wui"
20 20
     wazuh_password: str = "MyS3cr37P450r.*-"
21
+    wazuh_indexer_url: str = "https://wazuh.indexer:9200"
22
+    wazuh_indexer_username: str = "admin"
23
+    wazuh_indexer_password: str = "SecretPassword"
21 24
 
22 25
     shuffle_base_url: str = "http://shuffle-backend:5001"
23 26
     shuffle_api_key: str = ""

+ 3 - 0
soc-integrator/app/db.py

@@ -81,5 +81,8 @@ def init_schema() -> None:
81 81
             "CREATE INDEX IF NOT EXISTS idx_incident_events_incident_key_created_at ON incident_events(incident_key, created_at DESC);"
82 82
         )
83 83
         cur.execute(
84
+            "CREATE INDEX IF NOT EXISTS idx_incident_events_source_event_id ON incident_events(source, event_id);"
85
+        )
86
+        cur.execute(
84 87
             "CREATE INDEX IF NOT EXISTS idx_escalation_audit_incident_key_attempted_at ON escalation_audit(incident_key, attempted_at DESC);"
85 88
         )

+ 17 - 1
soc-integrator/app/main.py

@@ -1,4 +1,4 @@
1
-from fastapi import FastAPI, HTTPException
1
+from fastapi import Depends, FastAPI, HTTPException
2 2
 
3 3
 from app.adapters.iris import IrisAdapter
4 4
 from app.adapters.pagerduty import PagerDutyAdapter
@@ -25,6 +25,9 @@ wazuh_adapter = WazuhAdapter(
25 25
     base_url=settings.wazuh_base_url,
26 26
     username=settings.wazuh_username,
27 27
     password=settings.wazuh_password,
28
+    indexer_url=settings.wazuh_indexer_url,
29
+    indexer_username=settings.wazuh_indexer_username,
30
+    indexer_password=settings.wazuh_indexer_password,
28 31
 )
29 32
 shuffle_adapter = ShuffleAdapter(
30 33
     base_url=settings.shuffle_base_url,
@@ -297,3 +300,16 @@ async def wazuh_manager_logs(
297 300
     except Exception as exc:
298 301
         raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
299 302
     return ApiResponse(data={"wazuh": result})
303
+
304
+
305
+@app.post("/wazuh/sync-to-mvp", response_model=ApiResponse, dependencies=[Depends(require_internal_api_key)])
306
+async def wazuh_sync_to_mvp(
307
+    limit: int = 50,
308
+    minutes: int = 120,
309
+    q: str = "soc_mvp_test=true OR event_type:*",
310
+) -> ApiResponse:
311
+    try:
312
+        result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
313
+    except Exception as exc:
314
+        raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
315
+    return ApiResponse(data={"sync": result})

+ 8 - 0
soc-integrator/app/repositories/mvp_repo.py

@@ -37,6 +37,14 @@ DEFAULT_POLICY: dict[str, Any] = {
37 37
 
38 38
 
39 39
 class MvpRepository:
40
+    def has_event(self, source: str, event_id: str) -> bool:
41
+        with get_conn() as conn, conn.cursor() as cur:
42
+            cur.execute(
43
+                "SELECT 1 FROM incident_events WHERE source = %s AND event_id = %s LIMIT 1",
44
+                (source, event_id),
45
+            )
46
+            return cur.fetchone() is not None
47
+
40 48
     def ensure_policy(self) -> None:
41 49
         with get_conn() as conn, conn.cursor() as cur:
42 50
             cur.execute("SELECT id FROM policy_config WHERE id = 1")

+ 134 - 0
soc-integrator/app/services/mvp_service.py

@@ -1,6 +1,7 @@
1 1
 from __future__ import annotations
2 2
 
3 3
 import hashlib
4
+import re
4 5
 import time
5 6
 from datetime import datetime, timezone
6 7
 from typing import Any
@@ -112,6 +113,93 @@ class MvpService:
112 113
             return str(data.get("case_id"))
113 114
         return None
114 115
 
116
+    def _parse_kv_pairs(self, text: str) -> dict[str, str]:
117
+        pattern = r"([A-Za-z0-9_]+)=('(?:[^']*)'|\"(?:[^\"]*)\"|[^\\s]+)"
118
+        out: dict[str, str] = {}
119
+        for key, raw in re.findall(pattern, text):
120
+            value = raw.strip().strip("'").strip('"')
121
+            out[key] = value
122
+        return out
123
+
124
+    def _severity_from_rule_level(self, rule_level: Any) -> str:
125
+        try:
126
+            level = int(rule_level)
127
+        except (TypeError, ValueError):
128
+            return "medium"
129
+        if level >= 12:
130
+            return "critical"
131
+        if level >= 8:
132
+            return "high"
133
+        if level >= 4:
134
+            return "medium"
135
+        return "low"
136
+
137
+    def _event_type_from_text(self, text: str, parsed: dict[str, str]) -> str:
138
+        explicit = parsed.get("event_type")
139
+        if explicit:
140
+            return explicit
141
+        lowered = text.lower()
142
+        if "vpn" in lowered and ("geo" in lowered or "country" in lowered):
143
+            return "vpn_geo_anomaly"
144
+        if "domain" in lowered or "dns" in lowered:
145
+            return "ioc_dns"
146
+        if "c2" in lowered or "ips" in lowered or "ip " in lowered:
147
+            return "ioc_ips"
148
+        if "auth" in lowered and "fail" in lowered:
149
+            return "auth_anomaly"
150
+        return "generic"
151
+
152
+    def _normalize_wazuh_hit(self, hit: dict[str, Any]) -> dict[str, Any]:
153
+        src = hit.get("_source", {})
154
+        full_log = str(src.get("full_log", ""))
155
+        parsed = self._parse_kv_pairs(full_log)
156
+        event_id = str(parsed.get("event_id") or src.get("id") or hit.get("_id") or f"wazuh-{int(time.time())}")
157
+        timestamp = (
158
+            src.get("@timestamp")
159
+            or src.get("timestamp")
160
+            or datetime.now(timezone.utc).isoformat()
161
+        )
162
+        rule = src.get("rule", {}) if isinstance(src.get("rule"), dict) else {}
163
+        rule_desc = str(rule.get("description") or "")
164
+        event_type = self._event_type_from_text(full_log, parsed)
165
+        severity = str(parsed.get("severity", "")).lower() or self._severity_from_rule_level(rule.get("level"))
166
+
167
+        src_ip = parsed.get("src_ip")
168
+        dst_ip = parsed.get("dst_ip")
169
+        domain = parsed.get("query") or parsed.get("domain")
170
+        country = parsed.get("country")
171
+        user = parsed.get("user") or (src.get("agent", {}) or {}).get("name")
172
+
173
+        title = rule_desc or f"Wazuh alert {rule.get('id', '')}".strip()
174
+        description = full_log or rule_desc or "Wazuh alert"
175
+
176
+        return {
177
+            "source": "wazuh",
178
+            "event_type": event_type,
179
+            "event_id": event_id,
180
+            "timestamp": timestamp,
181
+            "severity": severity if severity in {"low", "medium", "high", "critical"} else "medium",
182
+            "title": title,
183
+            "description": description,
184
+            "asset": {
185
+                "user": user,
186
+                "hostname": (src.get("agent", {}) or {}).get("name"),
187
+                "agent_id": (src.get("agent", {}) or {}).get("id"),
188
+            },
189
+            "network": {
190
+                "src_ip": src_ip,
191
+                "dst_ip": dst_ip,
192
+                "domain": domain,
193
+                "country": country,
194
+            },
195
+            "tags": ["wazuh", event_type, f"rule_{rule.get('id', 'unknown')}"],
196
+            "risk_context": {
197
+                "outside_thailand": bool(country and str(country).upper() != "TH"),
198
+            },
199
+            "raw": src,
200
+            "payload": {},
201
+        }
202
+
115 203
     async def ingest_incident(self, event: dict[str, Any]) -> dict[str, Any]:
116 204
         policy = self.repo.get_policy()
117 205
         incident_key = self._incident_key(event)
@@ -310,6 +398,52 @@ class MvpService:
310 398
             "escalation_stub_sent": ingest_result.get("escalation_stub_sent", False),
311 399
         }
312 400
 
401
+    async def sync_wazuh_alerts(
402
+        self,
403
+        query: str = "soc_mvp_test=true OR event_type:*",
404
+        limit: int = 50,
405
+        minutes: int = 120,
406
+    ) -> dict[str, Any]:
407
+        raw = await self.wazuh_adapter.search_alerts(query=query, limit=limit, minutes=minutes)
408
+        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
409
+
410
+        processed = 0
411
+        ingested = 0
412
+        skipped_existing = 0
413
+        failed = 0
414
+        errors: list[str] = []
415
+        created_incidents: list[str] = []
416
+
417
+        for hit in hits:
418
+            processed += 1
419
+            event = self._normalize_wazuh_hit(hit)
420
+            event_id = str(event.get("event_id", "")).strip()
421
+            if event_id and self.repo.has_event("wazuh", event_id):
422
+                skipped_existing += 1
423
+                continue
424
+            try:
425
+                result = await self.ingest_incident(event)
426
+                ingested += 1
427
+                incident_key = str(result.get("incident_key", ""))
428
+                if incident_key:
429
+                    created_incidents.append(incident_key)
430
+            except Exception as exc:
431
+                failed += 1
432
+                errors.append(f"{event_id or 'unknown_event'}: {exc}")
433
+
434
+        return {
435
+            "query": query,
436
+            "window_minutes": minutes,
437
+            "limit": limit,
438
+            "processed": processed,
439
+            "ingested": ingested,
440
+            "skipped_existing": skipped_existing,
441
+            "failed": failed,
442
+            "incident_keys": created_incidents,
443
+            "errors": errors[:10],
444
+            "total_hits": (raw.get("hits", {}).get("total", {}) if isinstance(raw, dict) else {}),
445
+        }
446
+
313 447
     async def dependency_health(self) -> dict[str, Any]:
314 448
         out: dict[str, Any] = {}
315 449