from __future__ import annotations import ipaddress import math from collections import defaultdict, deque from datetime import datetime, timezone from typing import Any from app.adapters.geoip import GeoIpAdapter from app.config import settings from app.repositories.mvp_repo import MvpRepository class CDetectionService: def __init__(self, repo: MvpRepository, geoip_adapter: GeoIpAdapter) -> None: self.repo = repo self.geoip_adapter = geoip_adapter def _to_dt(self, value: Any) -> datetime: if isinstance(value, datetime): return value.astimezone(timezone.utc) text = str(value or "") if not text: return datetime.now(timezone.utc) return datetime.fromisoformat(text.replace("Z", "+00:00")).astimezone(timezone.utc) def _to_bool(self, value: Any) -> bool: if isinstance(value, bool): return value return str(value).strip().lower() in {"1", "true", "yes", "y", "success", "ok"} def _to_int(self, value: Any, default: int = 0) -> int: try: return int(value) except (TypeError, ValueError): return default def _is_public_ip(self, value: str) -> bool: try: ip = ipaddress.ip_address(value) except ValueError: return False return not ( ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local or ip.is_multicast or ip.is_unspecified ) def _haversine_km(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: radius = 6371.0 d_lat = math.radians(lat2 - lat1) d_lon = math.radians(lon2 - lon1) a = ( math.sin(d_lat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2 ) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return radius * c def _selector_enabled(self, selectors: set[str], usecase_id: str, section: str) -> bool: if not selectors: return True usecase = usecase_id.lower() sec = section.lower() return usecase in selectors or sec in selectors async def _resolve_geo(self, event: dict[str, Any]) -> dict[str, Any]: network = event.get("network", {}) if isinstance(event.get("network"), dict) else {} src_ip = str(network.get("src_ip") or "").strip() country = str(network.get("country") or "").strip().upper() geo = { "country": country or None, "lat": network.get("src_lat"), "lon": network.get("src_lon"), "src_ip": src_ip, "geo_source": "event", "geo_error": None, } if geo["country"] and geo["lat"] is not None and geo["lon"] is not None: return geo if src_ip and self._is_public_ip(src_ip): lookup = await self.geoip_adapter.lookup(src_ip) if lookup.get("ok"): geo["country"] = geo["country"] or str(lookup.get("country_code") or "").upper() or None geo["lat"] = lookup.get("lat") geo["lon"] = lookup.get("lon") geo["geo_source"] = str(lookup.get("provider") or "geoip") else: geo["geo_error"] = lookup.get("error") return geo async def evaluate(self, events: list[dict[str, Any]], selectors: list[str] | None = None) -> dict[str, Any]: selector_set = {str(s).strip().lower() for s in (selectors or []) if str(s).strip()} sorted_events = sorted(events, key=lambda e: self._to_dt(e.get("timestamp"))) matches: list[dict[str, Any]] = [] triggered_once: set[tuple[str, str]] = set() c3_user_hosts: dict[str, deque[tuple[datetime, str]]] = defaultdict(deque) c3_src_targets: dict[str, deque[tuple[datetime, str, int]]] = defaultdict(deque) for event in sorted_events: timestamp = self._to_dt(event.get("timestamp")) asset = event.get("asset", {}) if isinstance(event.get("asset"), dict) else {} network = event.get("network", {}) if isinstance(event.get("network"), dict) else {} payload = event.get("payload", {}) if isinstance(event.get("payload"), dict) else {} user = str(asset.get("user") or payload.get("account") or "").strip().lower() src_ip = str(network.get("src_ip") or "").strip() dst_ip = str(network.get("dst_ip") or "").strip() dst_host = str(network.get("dst_host") or payload.get("dst_host") or dst_ip or "").strip().lower() dst_port = self._to_int(network.get("dst_port") or payload.get("dst_port")) event_type = str(event.get("event_type") or "").lower() action = str(payload.get("event_action") or payload.get("action") or "").lower() logon_type = str(payload.get("logon_type") or "").lower() account_type = str(asset.get("account_type") or payload.get("account_type") or "").lower() is_admin = self._to_bool(asset.get("is_admin") or payload.get("is_admin")) or user in {"admin", "administrator"} is_service = self._to_bool(asset.get("is_service") or payload.get("is_service")) or user.startswith("svc_") is_success_login = self._to_bool(payload.get("success")) or action.endswith("success") or "login-success" in action if not is_success_login and self._to_int(payload.get("event_id")) == 4624: is_success_login = True is_priv_esc = ( "privilege" in event_type or "group_add" in event_type or self._to_int(payload.get("event_id")) in {4728, 4732} ) is_sensitive_access = ( "share" in event_type or "rdp" in event_type or "smb" in event_type or self._to_int(payload.get("event_id")) in {5145} ) # C1-01 Impossible travel if user and is_success_login and self._selector_enabled(selector_set, "C1-01", "c1"): geo = await self._resolve_geo(event) state_key = f"c1:last_login:{user}" prev = self.repo.get_correlation_state(state_key) or {} prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None prev_country = str(prev.get("country") or "").upper() prev_lat = prev.get("lat") prev_lon = prev.get("lon") prev_geo_source = prev.get("geo_source") if ( prev_ts and prev_country and geo.get("country") and prev_country != geo["country"] and prev_lat is not None and prev_lon is not None and geo.get("lat") is not None and geo.get("lon") is not None ): delta_seconds = max(1.0, (timestamp - prev_ts).total_seconds()) distance_km = self._haversine_km(float(prev_lat), float(prev_lon), float(geo["lat"]), float(geo["lon"])) speed_kmph = distance_km / (delta_seconds / 3600.0) if speed_kmph > float(settings.c1_max_travel_speed_kmph): trigger_key = ("C1-01", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C1-01", "section": "c1", "severity": "high", "entity": user, "event": event, "evidence": { "prev_country": prev_country, "current_country": geo["country"], "prev_src_ip": prev.get("src_ip"), "current_src_ip": src_ip, "prev_geo_source": prev_geo_source, "current_geo_source": geo.get("geo_source"), "distance_km": round(distance_km, 2), "travel_seconds": int(delta_seconds), "speed_kmph": round(speed_kmph, 2), "threshold_kmph": settings.c1_max_travel_speed_kmph, }, } ) self.repo.upsert_correlation_state( state_key, { "timestamp": timestamp.isoformat(), "country": geo.get("country"), "lat": geo.get("lat"), "lon": geo.get("lon"), "src_ip": src_ip, "geo_source": geo.get("geo_source"), }, ) # C2-01 privileged off-hours login if user and is_success_login and is_admin and self._selector_enabled(selector_set, "C2-01", "c2"): hour = timestamp.hour if hour >= settings.c2_offhours_start_utc or hour < settings.c2_offhours_end_utc: trigger_key = ("C2-01", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C2-01", "section": "c2", "severity": "high", "entity": user, "event": event, "evidence": {"hour_utc": hour, "offhours_start_utc": settings.c2_offhours_start_utc}, } ) # C2-02 dormant account activation if user and is_success_login and self._selector_enabled(selector_set, "C2-02", "c2"): state_key = f"c2:last_success:{user}" prev = self.repo.get_correlation_state(state_key) or {} prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None dormant_days = 30 if prev_ts: gap_days = (timestamp - prev_ts).total_seconds() / 86400.0 if gap_days >= dormant_days: trigger_key = ("C2-02", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C2-02", "section": "c2", "severity": "medium", "entity": user, "event": event, "evidence": {"gap_days": round(gap_days, 2), "threshold_days": dormant_days}, } ) self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip}) # C2-03 service account interactive if user and is_success_login and self._selector_enabled(selector_set, "C2-03", "c2"): interactive = logon_type in {"2", "10", "interactive", "remoteinteractive", "rdp", "console"} if is_service and interactive: trigger_key = ("C2-03", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C2-03", "section": "c2", "severity": "high", "entity": user, "event": event, "evidence": {"account_type": account_type or "service", "logon_type": logon_type}, } ) # C2-04 rapid privilege escalation then sensitive access if user and self._selector_enabled(selector_set, "C2-04", "c2"): state_key = f"c2:last_priv_esc:{user}" if is_priv_esc: self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip}) elif is_sensitive_access: prev = self.repo.get_correlation_state(state_key) or {} prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None if prev_ts: gap_seconds = (timestamp - prev_ts).total_seconds() if 0 <= gap_seconds <= 900: trigger_key = ("C2-04", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C2-04", "section": "c2", "severity": "high", "entity": user, "event": event, "evidence": {"gap_seconds": int(gap_seconds), "threshold_seconds": 900}, } ) # C3-01 multi-host auth success if user and (not is_admin) and is_success_login and dst_host and self._selector_enabled(selector_set, "C3-01", "c3"): dq = c3_user_hosts[user] dq.append((timestamp, dst_host)) while dq and (timestamp - dq[0][0]).total_seconds() > 600: dq.popleft() unique_hosts = {item[1] for item in dq} if len(unique_hosts) >= int(settings.c3_host_spread_threshold): trigger_key = ("C3-01", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C3-01", "section": "c3", "severity": "high", "entity": user, "event": event, "evidence": {"unique_hosts": sorted(unique_hosts), "window_seconds": 600}, } ) # C3-02/C3-04 by source IP bursts if src_ip and dst_host and (not is_admin) and self._selector_enabled(selector_set, "C3-02", "c3"): dq = c3_src_targets[src_ip] dq.append((timestamp, dst_host, dst_port)) while dq and (timestamp - dq[0][0]).total_seconds() > 600: dq.popleft() lateral_targets = {host for _, host, port in dq if port in {3389, 445}} if len(lateral_targets) >= 4: trigger_key = ("C3-02", src_ip) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C3-02", "section": "c3", "severity": "high", "entity": src_ip, "event": event, "evidence": {"protocol_ports": [3389, 445], "targets": sorted(lateral_targets)}, } ) if self._selector_enabled(selector_set, "C3-04", "c3"): port_set = {port for _, _, port in dq if port > 0} host_set = {host for _, host, _ in dq} if len(port_set) >= int(settings.c3_scan_port_threshold) or len(host_set) >= int(settings.c3_host_spread_threshold * 2): trigger_key = ("C3-04", src_ip) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C3-04", "section": "c3", "severity": "medium", "entity": src_ip, "event": event, "evidence": {"unique_ports": len(port_set), "unique_hosts": len(host_set), "window_seconds": 600}, } ) # C3-03 admin many servers rapidly if user and is_admin and is_success_login and dst_host and self._selector_enabled(selector_set, "C3-03", "c3"): dq = c3_user_hosts[f"admin:{user}"] dq.append((timestamp, dst_host)) while dq and (timestamp - dq[0][0]).total_seconds() > 600: dq.popleft() unique_hosts = {item[1] for item in dq} if len(unique_hosts) >= int(settings.c3_host_spread_threshold): trigger_key = ("C3-03", user) if trigger_key not in triggered_once: triggered_once.add(trigger_key) matches.append( { "usecase_id": "C3-03", "section": "c3", "severity": "critical", "entity": user, "event": event, "evidence": {"unique_hosts": sorted(unique_hosts), "is_admin": True, "window_seconds": 600}, } ) deduped: list[dict[str, Any]] = [] seen: set[tuple[str, str, str]] = set() for match in matches: ts = str((match.get("event") or {}).get("timestamp") or "") key = (str(match.get("usecase_id")), str(match.get("entity")), ts) if key in seen: continue seen.add(key) deduped.append(match) summary = { "processed": len(sorted_events), "matched": len(deduped), "by_usecase": dict( sorted( ((k, len([m for m in deduped if m.get("usecase_id") == k])) for k in {m.get("usecase_id") for m in deduped}), key=lambda item: item[0], ) ), } return {"summary": summary, "matches": deduped}