">
@@ -50,10 +58,73 @@ mvp_service = MvpService(
50 58
 app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
51 59
 
52 60
 
61
+async def _wazuh_auto_sync_loop() -> None:
62
+    interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
63
+    while True:
64
+        started_at = datetime.now(timezone.utc).isoformat()
65
+        try:
66
+            app.state.wazuh_auto_sync_state["running"] = True
67
+            app.state.wazuh_auto_sync_state["last_started_at"] = started_at
68
+            result = await mvp_service.sync_wazuh_alerts(
69
+                query=settings.wazuh_auto_sync_query,
70
+                limit=settings.wazuh_auto_sync_limit,
71
+                minutes=settings.wazuh_auto_sync_minutes,
72
+            )
73
+            app.state.wazuh_auto_sync_state["last_status"] = "ok"
74
+            app.state.wazuh_auto_sync_state["last_result"] = result
75
+            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
76
+            logger.info(
77
+                "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
78
+                result.get("processed", 0),
79
+                result.get("ingested", 0),
80
+                result.get("skipped_existing", 0),
81
+                result.get("failed", 0),
82
+                result.get("ioc_evaluated", 0),
83
+                result.get("ioc_matched", 0),
84
+                result.get("ioc_rejected", 0),
85
+            )
86
+        except Exception as exc:
87
+            app.state.wazuh_auto_sync_state["last_status"] = "error"
88
+            app.state.wazuh_auto_sync_state["last_error"] = str(exc)
89
+            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
90
+            logger.exception("wazuh auto-sync failed: %s", exc)
91
+        finally:
92
+            app.state.wazuh_auto_sync_state["running"] = False
93
+        await asyncio.sleep(interval)
94
+
95
+
53 96
 @app.on_event("startup")
54 97
 async def startup() -> None:
55 98
     init_schema()
56 99
     repo.ensure_policy()
100
+    app.state.wazuh_auto_sync_state = {
101
+        "running": False,
102
+        "last_status": None,
103
+        "last_started_at": None,
104
+        "last_finished_at": None,
105
+        "last_error": None,
106
+        "last_result": None,
107
+    }
108
+    if settings.wazuh_auto_sync_enabled:
109
+        app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
110
+        logger.info(
111
+            "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
112
+            settings.wazuh_auto_sync_interval_seconds,
113
+            settings.wazuh_auto_sync_limit,
114
+            settings.wazuh_auto_sync_minutes,
115
+            settings.wazuh_auto_sync_query,
116
+        )
117
+
118
+
119
+@app.on_event("shutdown")
120
+async def shutdown() -> None:
121
+    task = getattr(app.state, "wazuh_auto_sync_task", None)
122
+    if task:
123
+        task.cancel()
124
+        try:
125
+            await task
126
+        except asyncio.CancelledError:
127
+            pass
57 128
 
58 129
 
59 130
 @app.get("/health", response_model=ApiResponse)
@@ -297,3 +368,35 @@ async def wazuh_manager_logs(
297 368
     except Exception as exc:
298 369
         raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
299 370
     return ApiResponse(data={"wazuh": result})
371
+
372
+
373
+@app.post("/wazuh/sync-to-mvp", response_model=ApiResponse, dependencies=[Depends(require_internal_api_key)])
374
+async def wazuh_sync_to_mvp(
375
+    limit: int = 50,
376
+    minutes: int = 120,
377
+    q: str = "soc_mvp_test=true OR event_type:*",
378
+) -> ApiResponse:
379
+    try:
380
+        result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
381
+    except Exception as exc:
382
+        raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
383
+    return ApiResponse(data={"sync": result})
384
+
385
+
386
+@app.get("/wazuh/auto-sync/status", response_model=ApiResponse)
387
+async def wazuh_auto_sync_status() -> ApiResponse:
388
+    state = getattr(app.state, "wazuh_auto_sync_state", {})
389
+    task = getattr(app.state, "wazuh_auto_sync_task", None)
390
+    return ApiResponse(
391
+        data={
392
+            "enabled": settings.wazuh_auto_sync_enabled,
393
+            "task_running": bool(task and not task.done()),
394
+            "settings": {
395
+                "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
396
+                "limit": settings.wazuh_auto_sync_limit,
397
+                "minutes": settings.wazuh_auto_sync_minutes,
398
+                "query": settings.wazuh_auto_sync_query,
399
+            },
400
+            "state": state,
401
+        }
402
+    )

+ 8 - 0
soc-integrator/app/repositories/mvp_repo.py

@@ -37,6 +37,14 @@ DEFAULT_POLICY: dict[str, Any] = {
37 37
 
38 38
 
39 39
 class MvpRepository:
40
+    def has_event(self, source: str, event_id: str) -> bool:
41
+        with get_conn() as conn, conn.cursor() as cur:
42
+            cur.execute(
43
+                "SELECT 1 FROM incident_events WHERE source = %s AND event_id = %s LIMIT 1",
44
+                (source, event_id),
45
+            )
46
+            return cur.fetchone() is not None
47
+
40 48
     def ensure_policy(self) -> None:
41 49
         with get_conn() as conn, conn.cursor() as cur:
42 50
             cur.execute("SELECT id FROM policy_config WHERE id = 1")

+ 254 - 8
soc-integrator/app/services/mvp_service.py

@@ -1,6 +1,8 @@
1 1
 from __future__ import annotations
2 2
 
3 3
 import hashlib
4
+import logging
5
+import re
4 6
 import time
5 7
 from datetime import datetime, timezone
6 8
 from typing import Any
@@ -14,6 +16,8 @@ from app.adapters.wazuh import WazuhAdapter
14 16
 from app.config import settings
15 17
 from app.repositories.mvp_repo import MvpRepository
16 18
 
19
+logger = logging.getLogger(__name__)
20
+
17 21
 
18 22
 class MvpService:
19 23
     def __init__(
@@ -112,6 +116,146 @@ class MvpService:
112 116
             return str(data.get("case_id"))
113 117
         return None
114 118
 
119
+    def _parse_kv_pairs(self, text: str) -> dict[str, str]:
120
+        pattern = r"([A-Za-z0-9_]+)=('(?:[^']*)'|\"(?:[^\"]*)\"|[^\s]+)"
121
+        out: dict[str, str] = {}
122
+        for key, raw in re.findall(pattern, text):
123
+            value = raw.strip().strip("'").strip('"')
124
+            out[key] = value
125
+        return out
126
+
127
+    def _severity_from_rule_level(self, rule_level: Any) -> str:
128
+        try:
129
+            level = int(rule_level)
130
+        except (TypeError, ValueError):
131
+            return "medium"
132
+        if level >= 12:
133
+            return "critical"
134
+        if level >= 8:
135
+            return "high"
136
+        if level >= 4:
137
+            return "medium"
138
+        return "low"
139
+
140
+    def _event_type_from_text(self, text: str, parsed: dict[str, str]) -> str:
141
+        explicit = parsed.get("event_type")
142
+        if explicit:
143
+            return explicit
144
+        lowered = text.lower()
145
+        if "vpn" in lowered and ("geo" in lowered or "country" in lowered):
146
+            return "vpn_geo_anomaly"
147
+        if "domain" in lowered or "dns" in lowered:
148
+            return "ioc_dns"
149
+        if "c2" in lowered or "ips" in lowered or "ip " in lowered:
150
+            return "ioc_ips"
151
+        if "auth" in lowered and "fail" in lowered:
152
+            return "auth_anomaly"
153
+        return "generic"
154
+
155
+    def _normalize_wazuh_hit(self, hit: dict[str, Any]) -> dict[str, Any]:
156
+        src = hit.get("_source", {})
157
+        full_log = str(src.get("full_log", ""))
158
+        parsed = self._parse_kv_pairs(full_log)
159
+        event_id = str(parsed.get("event_id") or src.get("id") or hit.get("_id") or f"wazuh-{int(time.time())}")
160
+        timestamp = (
161
+            src.get("@timestamp")
162
+            or src.get("timestamp")
163
+            or datetime.now(timezone.utc).isoformat()
164
+        )
165
+        rule = src.get("rule", {}) if isinstance(src.get("rule"), dict) else {}
166
+        rule_desc = str(rule.get("description") or "")
167
+        event_type = self._event_type_from_text(full_log, parsed)
168
+        severity = str(parsed.get("severity", "")).lower() or self._severity_from_rule_level(rule.get("level"))
169
+
170
+        src_ip = parsed.get("src_ip")
171
+        dst_ip = parsed.get("dst_ip")
172
+        domain = parsed.get("query") or parsed.get("domain")
173
+        country = parsed.get("country")
174
+        user = parsed.get("user") or (src.get("agent", {}) or {}).get("name")
175
+
176
+        title = rule_desc or f"Wazuh alert {rule.get('id', '')}".strip()
177
+        description = full_log or rule_desc or "Wazuh alert"
178
+
179
+        return {
180
+            "source": "wazuh",
181
+            "event_type": event_type,
182
+            "event_id": event_id,
183
+            "timestamp": timestamp,
184
+            "severity": severity if severity in {"low", "medium", "high", "critical"} else "medium",
185
+            "title": title,
186
+            "description": description,
187
+            "asset": {
188
+                "user": user,
189
+                "hostname": (src.get("agent", {}) or {}).get("name"),
190
+                "agent_id": (src.get("agent", {}) or {}).get("id"),
191
+            },
192
+            "network": {
193
+                "src_ip": src_ip,
194
+                "dst_ip": dst_ip,
195
+                "domain": domain,
196
+                "country": country,
197
+            },
198
+            "tags": ["wazuh", event_type, f"rule_{rule.get('id', 'unknown')}"],
199
+            "risk_context": {
200
+                "outside_thailand": bool(country and str(country).upper() != "TH"),
201
+            },
202
+            "raw": src,
203
+            "payload": {},
204
+        }
205
+
206
+    def _to_float(self, value: Any, default: float = 0.0) -> float:
207
+        try:
208
+            return float(value)
209
+        except (TypeError, ValueError):
210
+            return default
211
+
212
+    def _severity_from_confidence(self, confidence: float) -> str:
213
+        if confidence >= 0.9:
214
+            return "high"
215
+        if confidence >= 0.7:
216
+            return "medium"
217
+        return "low"
218
+
219
+    def _extract_shuffle_verdict(self, shuffle_result: dict[str, Any] | None) -> dict[str, Any]:
220
+        if not isinstance(shuffle_result, dict):
221
+            return {
222
+                "matched": False,
223
+                "confidence": 0.0,
224
+                "severity": "low",
225
+                "evidence": "",
226
+                "iocs": [],
227
+                "reason": "no_shuffle_result",
228
+            }
229
+
230
+        flat = dict(shuffle_result)
231
+        nested = shuffle_result.get("result")
232
+        if isinstance(nested, dict):
233
+            merged = dict(nested)
234
+            merged.update(flat)
235
+            flat = merged
236
+
237
+        confidence = self._to_float(flat.get("confidence"), 0.0)
238
+        matched_raw = flat.get("matched")
239
+        if isinstance(matched_raw, bool):
240
+            matched = matched_raw
241
+            reason = "shuffle_explicit"
242
+        else:
243
+            matched = confidence >= 0.7
244
+            reason = "confidence_threshold_fallback"
245
+
246
+        severity_raw = str(flat.get("severity", "")).lower()
247
+        severity = severity_raw if severity_raw in {"low", "medium", "high", "critical"} else self._severity_from_confidence(confidence)
248
+
249
+        return {
250
+            "matched": matched,
251
+            "confidence": confidence,
252
+            "severity": severity,
253
+            "evidence": str(flat.get("evidence", "")),
254
+            "iocs": flat.get("iocs", []),
255
+            "reason": reason,
256
+            "raw": shuffle_result,
257
+        }
258
+
115 259
     async def ingest_incident(self, event: dict[str, Any]) -> dict[str, Any]:
116 260
         policy = self.repo.get_policy()
117 261
         incident_key = self._incident_key(event)
@@ -233,27 +377,48 @@ class MvpService:
233 377
         policy = self.repo.get_policy()
234 378
         workflow_id = str(policy.get("shuffle", {}).get("ioc_workflow_id", "")).strip()
235 379
 
236
-        matched = True
237
-        confidence = 0.7
238 380
         shuffle_result: dict[str, Any] | None = None
239 381
 
240 382
         if workflow_id:
241 383
             shuffle_result = await self.shuffle_adapter.trigger_workflow(workflow_id, payload)
384
+        verdict = self._extract_shuffle_verdict(shuffle_result)
385
+        matched = bool(verdict["matched"])
386
+        confidence = self._to_float(verdict["confidence"], 0.0)
387
+
388
+        logger.info(
389
+            "ioc evaluation workflow_id=%s matched=%s confidence=%.2f",
390
+            workflow_id or "<none>",
391
+            matched,
392
+            confidence,
393
+        )
242 394
 
243 395
         if matched:
396
+            src_event = payload.get("source_event", {})
397
+            event_id = src_event.get("event_id") or f"ioc-{int(time.time())}"
398
+            if not isinstance(event_id, str):
399
+                event_id = str(event_id)
400
+
401
+            description = f"IOC evaluation result confidence={confidence:.2f}"
402
+            evidence = str(verdict.get("evidence", "")).strip()
403
+            if evidence:
404
+                description = f"{description} evidence={evidence[:180]}"
405
+
244 406
             event = {
245 407
                 "source": "shuffle",
246 408
                 "event_type": "ioc_dns" if payload.get("ioc_type") == "domain" else "ioc_ips",
247
-                "event_id": payload.get("source_event", {}).get("event_id") or f"ioc-{int(time.time())}",
409
+                "event_id": event_id,
248 410
                 "timestamp": datetime.now(timezone.utc).isoformat(),
249
-                "severity": "medium",
411
+                "severity": verdict["severity"],
250 412
                 "title": f"IOC match: {payload.get('ioc_value', 'unknown')}",
251
-                "description": "IOC evaluation result",
252
-                "asset": payload.get("source_event", {}).get("asset", {}),
253
-                "network": payload.get("source_event", {}).get("network", {}),
413
+                "description": description,
414
+                "asset": src_event.get("asset", {}),
415
+                "network": src_event.get("network", {}),
254 416
                 "tags": ["ioc", str(payload.get("ioc_type", "unknown"))],
255 417
                 "risk_context": {},
256
-                "raw": payload,
418
+                "raw": {
419
+                    "payload": payload,
420
+                    "shuffle": verdict.get("raw"),
421
+                },
257 422
                 "payload": {},
258 423
             }
259 424
             ingest_result = await self.ingest_incident(event)
@@ -263,6 +428,10 @@ class MvpService:
263 428
         return {
264 429
             "matched": matched,
265 430
             "confidence": confidence,
431
+            "severity": verdict["severity"],
432
+            "evidence": verdict["evidence"],
433
+            "iocs": verdict["iocs"],
434
+            "decision_source": verdict["reason"],
266 435
             "shuffle": shuffle_result,
267 436
             "result": ingest_result,
268 437
         }
@@ -310,6 +479,83 @@ class MvpService:
310 479
             "escalation_stub_sent": ingest_result.get("escalation_stub_sent", False),
311 480
         }
312 481
 
482
+    async def sync_wazuh_alerts(
483
+        self,
484
+        query: str = "soc_mvp_test=true OR event_type:*",
485
+        limit: int = 50,
486
+        minutes: int = 120,
487
+    ) -> dict[str, Any]:
488
+        raw = await self.wazuh_adapter.search_alerts(query=query, limit=limit, minutes=minutes)
489
+        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
490
+
491
+        processed = 0
492
+        ingested = 0
493
+        skipped_existing = 0
494
+        failed = 0
495
+        errors: list[str] = []
496
+        created_incidents: list[str] = []
497
+        ioc_evaluated = 0
498
+        ioc_matched = 0
499
+        ioc_rejected = 0
500
+
501
+        for hit in hits:
502
+            processed += 1
503
+            event = self._normalize_wazuh_hit(hit)
504
+            event_id = str(event.get("event_id", "")).strip()
505
+            if event_id and self.repo.has_event("wazuh", event_id):
506
+                skipped_existing += 1
507
+                continue
508
+            try:
509
+                if event.get("event_type") in {"ioc_dns", "ioc_ips"}:
510
+                    ioc_evaluated += 1
511
+                    payload = {
512
+                        "ioc_type": "domain" if event.get("event_type") == "ioc_dns" else "ip",
513
+                        "ioc_value": (event.get("network", {}) or {}).get("domain")
514
+                        or (event.get("network", {}) or {}).get("dst_ip")
515
+                        or (event.get("network", {}) or {}).get("src_ip")
516
+                        or "unknown",
517
+                        "source_event": {
518
+                            "event_id": event.get("event_id"),
519
+                            "asset": event.get("asset", {}),
520
+                            "network": event.get("network", {}),
521
+                            "raw": event.get("raw", {}),
522
+                        },
523
+                    }
524
+                    ioc_result = await self.evaluate_ioc(payload)
525
+                    if ioc_result.get("matched"):
526
+                        ioc_matched += 1
527
+                        ingested += 1
528
+                        incident_key = str((ioc_result.get("result", {}) or {}).get("incident_key", ""))
529
+                        if incident_key:
530
+                            created_incidents.append(incident_key)
531
+                    else:
532
+                        ioc_rejected += 1
533
+                else:
534
+                    result = await self.ingest_incident(event)
535
+                    ingested += 1
536
+                    incident_key = str(result.get("incident_key", ""))
537
+                    if incident_key:
538
+                        created_incidents.append(incident_key)
539
+            except Exception as exc:
540
+                failed += 1
541
+                errors.append(f"{event_id or 'unknown_event'}: {exc}")
542
+
543
+        return {
544
+            "query": query,
545
+            "window_minutes": minutes,
546
+            "limit": limit,
547
+            "processed": processed,
548
+            "ingested": ingested,
549
+            "skipped_existing": skipped_existing,
550
+            "failed": failed,
551
+            "ioc_evaluated": ioc_evaluated,
552
+            "ioc_matched": ioc_matched,
553
+            "ioc_rejected": ioc_rejected,
554
+            "incident_keys": created_incidents,
555
+            "errors": errors[:10],
556
+            "total_hits": (raw.get("hits", {}).get("total", {}) if isinstance(raw, dict) else {}),
557
+        }
558
+
313 559
     async def dependency_health(self) -> dict[str, Any]:
314 560
         out: dict[str, Any] = {}
315 561
 

+ 7 - 0
wazuh-docker/single-node/config/wazuh_cluster/local_rules.xml

@@ -0,0 +1,7 @@
1
+<group name="soc_mvp_test,">
2
+  <rule id="100200" level="10">
3
+    <match>soc_mvp_test=true</match>
4
+    <description>SOC MVP synthetic test event detected</description>
5
+    <group>soc_mvp_test,syslog,</group>
6
+  </rule>
7
+</group>

+ 6 - 0
wazuh-docker/single-node/config/wazuh_cluster/wazuh_manager.conf

@@ -30,6 +30,12 @@
30 30
     <protocol>tcp</protocol>
31 31
     <queue_size>131072</queue_size>
32 32
   </remote>
33
+  <remote>
34
+    <connection>syslog</connection>
35
+    <port>514</port>
36
+    <protocol>udp</protocol>
37
+    <allowed-ips>0.0.0.0/0</allowed-ips>
38
+  </remote>
33 39
 
34 40
   <!-- Policy monitoring -->
35 41
   <rootcheck>

+ 2 - 1
wazuh-docker/single-node/docker-compose.yml

@@ -41,7 +41,8 @@ services:
41 41
       - ./config/wazuh_indexer_ssl_certs/root-ca-manager.pem:/etc/ssl/root-ca.pem
42 42
       - ./config/wazuh_indexer_ssl_certs/wazuh.manager.pem:/etc/ssl/filebeat.pem
43 43
       - ./config/wazuh_indexer_ssl_certs/wazuh.manager-key.pem:/etc/ssl/filebeat.key
44
-      - ./config/wazuh_cluster/wazuh_manager.conf:/wazuh-config-mount/etc/ossec.conf
44
+      - ./config/wazuh_cluster/wazuh_manager.conf:/var/ossec/etc/ossec.conf
45
+      - ./config/wazuh_cluster/local_rules.xml:/var/ossec/etc/rules/local_rules.xml
45 46
 
46 47
   wazuh.indexer:
47 48
     image: wazuh/wazuh-indexer:4.14.3

tum/soc - Gogs: Simplico Git Service

1 次代碼提交 (main)

作者 SHA1 備註 提交日期
  tum 0de071e7c9 soc update 3 周之前