| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- from __future__ import annotations
- import ipaddress
- import math
- from collections import defaultdict, deque
- from datetime import datetime, timezone
- from typing import Any
- from app.adapters.geoip import GeoIpAdapter
- from app.config import settings
- from app.repositories.mvp_repo import MvpRepository
class CDetectionService:
    """Correlation engine for the C1/C2/C3 detection use cases.

    Implemented rules:
      * C1-01 impossible travel between successive successful logins
      * C2-01 privileged (admin) login during configured off-hours
      * C2-02 dormant account activation (long gap between successful logins)
      * C2-03 service account with an interactive logon type
      * C2-04 privilege escalation followed quickly by sensitive access
      * C3-01 non-admin user authenticating to many hosts in a short window
      * C3-02 one source IP reaching many hosts on lateral-movement ports
      * C3-03 admin user authenticating to many hosts in a short window
      * C3-04 one source IP touching many ports or hosts (scan-like burst)

    State that must survive between ``evaluate`` calls (last login location,
    last successful login, last privilege escalation) is read and written via
    ``repo.get_correlation_state`` / ``repo.upsert_correlation_state``. The C3
    sliding windows only live for the duration of a single ``evaluate`` call.
    """

    # Thresholds for rules that are not driven by ``settings``. The values are
    # the historical defaults; a subclass may override them.
    C2_DORMANT_DAYS = 30
    C2_PRIV_ESC_WINDOW_SECONDS = 900
    C3_WINDOW_SECONDS = 600
    C3_LATERAL_PORTS = (3389, 445)
    C3_LATERAL_TARGET_THRESHOLD = 4

    def __init__(self, repo: MvpRepository, geoip_adapter: GeoIpAdapter) -> None:
        self.repo = repo
        self.geoip_adapter = geoip_adapter

    def _to_dt(self, value: Any) -> datetime:
        """Coerce *value* to a timezone-aware UTC ``datetime``.

        Accepts ``datetime`` objects or ISO-8601 strings (a ``Z`` suffix is
        accepted). Naive values are interpreted as UTC rather than as the
        host's local time, so results do not depend on the server's TZ.
        Falsy input (``None``, ``""``) yields the current time.

        Raises:
            ValueError: if *value* is a non-empty string that is not ISO-8601.
        """
        if isinstance(value, datetime):
            parsed = value
        else:
            text = str(value or "")
            if not text:
                return datetime.now(timezone.utc)
            parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # datetime.astimezone() would treat a naive value as local time.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def _to_bool(self, value: Any) -> bool:
        """Interpret booleans and common truthy strings ("1", "true", "ok", ...)."""
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in {"1", "true", "yes", "y", "success", "ok"}

    def _to_int(self, value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or *default* when conversion fails."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _is_public_ip(self, value: str) -> bool:
        """True if *value* parses as an IP that is not private/loopback/reserved/link-local/multicast/unspecified."""
        try:
            ip = ipaddress.ip_address(value)
        except ValueError:
            return False
        return not (
            ip.is_private
            or ip.is_loopback
            or ip.is_reserved
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_unspecified
        )

    def _haversine_km(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Great-circle distance in km between two (lat, lon) points given in degrees."""
        radius = 6371.0  # mean Earth radius, km
        d_lat = math.radians(lat2 - lat1)
        d_lon = math.radians(lon2 - lon1)
        a = (
            math.sin(d_lat / 2) ** 2
            + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2
        )
        # Float rounding can push ``a`` marginally outside [0, 1] for
        # (near-)antipodal points; math.sqrt(1 - a) would then raise.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return radius * c

    def _selector_enabled(self, selectors: set[str], usecase_id: str, section: str) -> bool:
        """True when no selectors are given, or the use case id / section is selected (case-insensitive)."""
        if not selectors:
            return True
        return usecase_id.lower() in selectors or section.lower() in selectors

    def _state_timestamp(self, state: dict[str, Any]) -> datetime | None:
        """Parse the ``timestamp`` field of a stored correlation-state record, if present."""
        raw = state.get("timestamp")
        return self._to_dt(raw) if raw else None

    @staticmethod
    def _is_newer(timestamp: datetime, prev_ts: datetime | None) -> bool:
        """True when *timestamp* should replace *prev_ts* in persisted state.

        Prevents a late-arriving, older event from rewinding "last seen" state.
        """
        return prev_ts is None or timestamp >= prev_ts

    @staticmethod
    def _slide_window(window: deque, now: datetime, window_seconds: float) -> None:
        """Pop entries from the left of *window* whose timestamp (item[0]) is more than *window_seconds* before *now*."""
        while window and (now - window[0][0]).total_seconds() > window_seconds:
            window.popleft()

    async def _resolve_geo(self, event: dict[str, Any]) -> dict[str, Any]:
        """Return country/lat/lon for the event's ``network.src_ip``.

        Values already present on the event (``network.country``,
        ``network.src_lat``, ``network.src_lon``) are used as-is when complete.
        Otherwise, for a public source IP, ``geoip_adapter.lookup`` is awaited
        and its ``country_code``/``lat``/``lon``/``provider`` fill the gaps; a
        failed lookup records its ``error`` under ``geo_error``.
        """
        network = event.get("network", {}) if isinstance(event.get("network"), dict) else {}
        src_ip = str(network.get("src_ip") or "").strip()
        country = str(network.get("country") or "").strip().upper()
        geo = {
            "country": country or None,
            "lat": network.get("src_lat"),
            "lon": network.get("src_lon"),
            "src_ip": src_ip,
            "geo_source": "event",
            "geo_error": None,
        }
        if geo["country"] and geo["lat"] is not None and geo["lon"] is not None:
            return geo
        if src_ip and self._is_public_ip(src_ip):
            lookup = await self.geoip_adapter.lookup(src_ip)
            if lookup.get("ok"):
                geo["country"] = geo["country"] or str(lookup.get("country_code") or "").upper() or None
                geo["lat"] = lookup.get("lat")
                geo["lon"] = lookup.get("lon")
                geo["geo_source"] = str(lookup.get("provider") or "geoip")
            else:
                geo["geo_error"] = lookup.get("error")
        return geo

    async def evaluate(self, events: list[dict[str, Any]], selectors: list[str] | None = None) -> dict[str, Any]:
        """Run the enabled detection rules over *events* in timestamp order.

        Args:
            events: event dicts; the fields read are ``timestamp``,
                ``event_type`` and the nested ``asset``/``network``/``payload``
                dicts (missing or non-dict nested values are treated as empty).
            selectors: optional use case ids (e.g. ``"C2-01"``) and/or section
                names (e.g. ``"c3"``), case-insensitive. ``None``/empty
                enables every rule.

        Returns:
            ``{"summary": {"processed", "matched", "by_usecase"}, "matches": [...]}``
            where each match carries ``usecase_id``, ``section``, ``severity``,
            ``entity``, the triggering ``event`` and rule-specific ``evidence``.
            Each (use case, entity) pair is reported at most once per call.
        """
        selector_set = {str(s).strip().lower() for s in (selectors or []) if str(s).strip()}

        def enabled(usecase_id: str, section: str) -> bool:
            return self._selector_enabled(selector_set, usecase_id, section)

        # Parse each timestamp once so the sort order and the value used by the
        # rules agree (events without a timestamp fall back to "now").
        timed_events = sorted(
            ((self._to_dt(event.get("timestamp")), event) for event in events),
            key=lambda pair: pair[0],
        )

        matches: list[dict[str, Any]] = []
        # (usecase_id, entity) pairs that already fired in this call. Because
        # every match goes through ``emit``, ``matches`` never holds duplicates.
        triggered_once: set[tuple[str, str]] = set()

        def emit(
            usecase_id: str,
            section: str,
            severity: str,
            entity: str,
            event: dict[str, Any],
            evidence: dict[str, Any],
        ) -> None:
            trigger_key = (usecase_id, entity)
            if trigger_key in triggered_once:
                return
            triggered_once.add(trigger_key)
            matches.append(
                {
                    "usecase_id": usecase_id,
                    "section": section,
                    "severity": severity,
                    "entity": entity,
                    "event": event,
                    "evidence": evidence,
                }
            )

        window = self.C3_WINDOW_SECONDS
        lateral_ports = set(self.C3_LATERAL_PORTS)
        c3_user_hosts: dict[str, deque[tuple[datetime, str]]] = defaultdict(deque)
        c3_admin_hosts: dict[str, deque[tuple[datetime, str]]] = defaultdict(deque)
        c3_src_targets: dict[str, deque[tuple[datetime, str, int]]] = defaultdict(deque)

        for timestamp, event in timed_events:
            asset = event.get("asset", {}) if isinstance(event.get("asset"), dict) else {}
            network = event.get("network", {}) if isinstance(event.get("network"), dict) else {}
            payload = event.get("payload", {}) if isinstance(event.get("payload"), dict) else {}
            user = str(asset.get("user") or payload.get("account") or "").strip().lower()
            src_ip = str(network.get("src_ip") or "").strip()
            dst_ip = str(network.get("dst_ip") or "").strip()
            dst_host = str(network.get("dst_host") or payload.get("dst_host") or dst_ip or "").strip().lower()
            dst_port = self._to_int(network.get("dst_port") or payload.get("dst_port"))
            event_type = str(event.get("event_type") or "").lower()
            action = str(payload.get("event_action") or payload.get("action") or "").lower()
            logon_type = str(payload.get("logon_type") or "").lower()
            account_type = str(asset.get("account_type") or payload.get("account_type") or "").lower()
            event_id = self._to_int(payload.get("event_id"))
            is_admin = self._to_bool(asset.get("is_admin") or payload.get("is_admin")) or user in {"admin", "administrator"}
            is_service = self._to_bool(asset.get("is_service") or payload.get("is_service")) or user.startswith("svc_")
            is_success_login = (
                self._to_bool(payload.get("success"))
                or action.endswith("success")
                or "login-success" in action
                or event_id == 4624
            )
            is_priv_esc = "privilege" in event_type or "group_add" in event_type or event_id in {4728, 4732}
            is_sensitive_access = (
                "share" in event_type or "rdp" in event_type or "smb" in event_type or event_id == 5145
            )

            # C1-01 impossible travel
            if user and is_success_login and enabled("C1-01", "c1"):
                geo = await self._resolve_geo(event)
                state_key = f"c1:last_login:{user}"
                prev = self.repo.get_correlation_state(state_key) or {}
                prev_ts = self._state_timestamp(prev)
                prev_country = str(prev.get("country") or "").upper()
                prev_lat = prev.get("lat")
                prev_lon = prev.get("lon")
                if (
                    prev_ts is not None
                    and prev_country
                    and geo.get("country")
                    and prev_country != geo["country"]
                    and prev_lat is not None
                    and prev_lon is not None
                    and geo.get("lat") is not None
                    and geo.get("lon") is not None
                ):
                    # Travel time is |Δt|: a stored login that is newer than this
                    # (late-arriving) event must not yield a negative interval.
                    delta_seconds = max(1.0, abs((timestamp - prev_ts).total_seconds()))
                    distance_km = self._haversine_km(float(prev_lat), float(prev_lon), float(geo["lat"]), float(geo["lon"]))
                    speed_kmph = distance_km / (delta_seconds / 3600.0)
                    if speed_kmph > float(settings.c1_max_travel_speed_kmph):
                        emit(
                            "C1-01",
                            "c1",
                            "high",
                            user,
                            event,
                            {
                                "prev_country": prev_country,
                                "current_country": geo["country"],
                                "prev_src_ip": prev.get("src_ip"),
                                "current_src_ip": src_ip,
                                "prev_geo_source": prev.get("geo_source"),
                                "current_geo_source": geo.get("geo_source"),
                                "distance_km": round(distance_km, 2),
                                "travel_seconds": int(delta_seconds),
                                "speed_kmph": round(speed_kmph, 2),
                                "threshold_kmph": settings.c1_max_travel_speed_kmph,
                            },
                        )
                if self._is_newer(timestamp, prev_ts):
                    self.repo.upsert_correlation_state(
                        state_key,
                        {
                            "timestamp": timestamp.isoformat(),
                            "country": geo.get("country"),
                            "lat": geo.get("lat"),
                            "lon": geo.get("lon"),
                            "src_ip": src_ip,
                            "geo_source": geo.get("geo_source"),
                        },
                    )

            # C2-01 privileged off-hours login
            if user and is_success_login and is_admin and enabled("C2-01", "c2"):
                hour = timestamp.hour
                start = settings.c2_offhours_start_utc
                end = settings.c2_offhours_end_utc
                if start < end:
                    # Window inside a single day, e.g. 0 -> 6.
                    off_hours = start <= hour < end
                else:
                    # Window wraps midnight, e.g. 20 -> 6.
                    off_hours = hour >= start or hour < end
                if off_hours:
                    emit(
                        "C2-01",
                        "c2",
                        "high",
                        user,
                        event,
                        {"hour_utc": hour, "offhours_start_utc": start, "offhours_end_utc": end},
                    )

            # C2-02 dormant account activation
            if user and is_success_login and enabled("C2-02", "c2"):
                state_key = f"c2:last_success:{user}"
                prev_ts = self._state_timestamp(self.repo.get_correlation_state(state_key) or {})
                if prev_ts is not None:
                    gap_days = (timestamp - prev_ts).total_seconds() / 86400.0
                    if gap_days >= self.C2_DORMANT_DAYS:
                        emit(
                            "C2-02",
                            "c2",
                            "medium",
                            user,
                            event,
                            {"gap_days": round(gap_days, 2), "threshold_days": self.C2_DORMANT_DAYS},
                        )
                if self._is_newer(timestamp, prev_ts):
                    self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip})

            # C2-03 service account interactive logon
            if user and is_success_login and enabled("C2-03", "c2"):
                interactive = logon_type in {"2", "10", "interactive", "remoteinteractive", "rdp", "console"}
                if is_service and interactive:
                    emit(
                        "C2-03",
                        "c2",
                        "high",
                        user,
                        event,
                        {"account_type": account_type or "service", "logon_type": logon_type},
                    )

            # C2-04 rapid privilege escalation then sensitive access
            if user and enabled("C2-04", "c2"):
                state_key = f"c2:last_priv_esc:{user}"
                if is_priv_esc:
                    prev_ts = self._state_timestamp(self.repo.get_correlation_state(state_key) or {})
                    if self._is_newer(timestamp, prev_ts):
                        self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip})
                elif is_sensitive_access:
                    prev_ts = self._state_timestamp(self.repo.get_correlation_state(state_key) or {})
                    if prev_ts is not None:
                        gap_seconds = (timestamp - prev_ts).total_seconds()
                        if 0 <= gap_seconds <= self.C2_PRIV_ESC_WINDOW_SECONDS:
                            emit(
                                "C2-04",
                                "c2",
                                "high",
                                user,
                                event,
                                {"gap_seconds": int(gap_seconds), "threshold_seconds": self.C2_PRIV_ESC_WINDOW_SECONDS},
                            )

            # C3-01 non-admin multi-host auth success
            if user and not is_admin and is_success_login and dst_host and enabled("C3-01", "c3"):
                user_hosts = c3_user_hosts[user]
                user_hosts.append((timestamp, dst_host))
                self._slide_window(user_hosts, timestamp, window)
                unique_hosts = {host for _, host in user_hosts}
                if len(unique_hosts) >= int(settings.c3_host_spread_threshold):
                    emit(
                        "C3-01",
                        "c3",
                        "high",
                        user,
                        event,
                        {"unique_hosts": sorted(unique_hosts), "window_seconds": window},
                    )

            # C3-02 / C3-04 bursts from one source IP. They share a window, but
            # each rule is gated by its own selector.
            lateral_enabled = enabled("C3-02", "c3")
            scan_enabled = enabled("C3-04", "c3")
            if src_ip and dst_host and not is_admin and (lateral_enabled or scan_enabled):
                targets = c3_src_targets[src_ip]
                targets.append((timestamp, dst_host, dst_port))
                self._slide_window(targets, timestamp, window)
                if lateral_enabled:
                    lateral_targets = {host for _, host, port in targets if port in lateral_ports}
                    if len(lateral_targets) >= self.C3_LATERAL_TARGET_THRESHOLD:
                        emit(
                            "C3-02",
                            "c3",
                            "high",
                            src_ip,
                            event,
                            {"protocol_ports": list(self.C3_LATERAL_PORTS), "targets": sorted(lateral_targets)},
                        )
                if scan_enabled:
                    port_set = {port for _, _, port in targets if port > 0}
                    host_set = {host for _, host, _ in targets}
                    # Convert before doubling: a string setting must not be repeated ("3" * 2 == "33").
                    host_threshold = 2 * int(settings.c3_host_spread_threshold)
                    if len(port_set) >= int(settings.c3_scan_port_threshold) or len(host_set) >= host_threshold:
                        emit(
                            "C3-04",
                            "c3",
                            "medium",
                            src_ip,
                            event,
                            {"unique_ports": len(port_set), "unique_hosts": len(host_set), "window_seconds": window},
                        )

            # C3-03 admin authenticating to many servers rapidly
            if user and is_admin and is_success_login and dst_host and enabled("C3-03", "c3"):
                admin_hosts = c3_admin_hosts[user]
                admin_hosts.append((timestamp, dst_host))
                self._slide_window(admin_hosts, timestamp, window)
                unique_hosts = {host for _, host in admin_hosts}
                if len(unique_hosts) >= int(settings.c3_host_spread_threshold):
                    emit(
                        "C3-03",
                        "c3",
                        "critical",
                        user,
                        event,
                        {"unique_hosts": sorted(unique_hosts), "is_admin": True, "window_seconds": window},
                    )

        by_usecase: defaultdict[str, int] = defaultdict(int)
        for match in matches:
            by_usecase[match["usecase_id"]] += 1
        summary = {
            "processed": len(timed_events),
            "matched": len(matches),
            "by_usecase": dict(sorted(by_usecase.items())),
        }
        return {"summary": summary, "matches": matches}
|