Ei kuvausta

c_detection_service.py 19KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. from __future__ import annotations
  2. import ipaddress
  3. import math
  4. from collections import defaultdict, deque
  5. from datetime import datetime, timezone
  6. from typing import Any
  7. from app.adapters.geoip import GeoIpAdapter
  8. from app.config import settings
  9. from app.repositories.mvp_repo import MvpRepository
  10. class CDetectionService:
  11. def __init__(self, repo: MvpRepository, geoip_adapter: GeoIpAdapter) -> None:
  12. self.repo = repo
  13. self.geoip_adapter = geoip_adapter
  14. def _to_dt(self, value: Any) -> datetime:
  15. if isinstance(value, datetime):
  16. return value.astimezone(timezone.utc)
  17. text = str(value or "")
  18. if not text:
  19. return datetime.now(timezone.utc)
  20. return datetime.fromisoformat(text.replace("Z", "+00:00")).astimezone(timezone.utc)
  21. def _to_bool(self, value: Any) -> bool:
  22. if isinstance(value, bool):
  23. return value
  24. return str(value).strip().lower() in {"1", "true", "yes", "y", "success", "ok"}
  25. def _to_int(self, value: Any, default: int = 0) -> int:
  26. try:
  27. return int(value)
  28. except (TypeError, ValueError):
  29. return default
  30. def _is_public_ip(self, value: str) -> bool:
  31. try:
  32. ip = ipaddress.ip_address(value)
  33. except ValueError:
  34. return False
  35. return not (
  36. ip.is_private
  37. or ip.is_loopback
  38. or ip.is_reserved
  39. or ip.is_link_local
  40. or ip.is_multicast
  41. or ip.is_unspecified
  42. )
  43. def _haversine_km(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
  44. radius = 6371.0
  45. d_lat = math.radians(lat2 - lat1)
  46. d_lon = math.radians(lon2 - lon1)
  47. a = (
  48. math.sin(d_lat / 2) ** 2
  49. + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2
  50. )
  51. c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  52. return radius * c
  53. def _selector_enabled(self, selectors: set[str], usecase_id: str, section: str) -> bool:
  54. if not selectors:
  55. return True
  56. usecase = usecase_id.lower()
  57. sec = section.lower()
  58. return usecase in selectors or sec in selectors
  59. async def _resolve_geo(self, event: dict[str, Any]) -> dict[str, Any]:
  60. network = event.get("network", {}) if isinstance(event.get("network"), dict) else {}
  61. src_ip = str(network.get("src_ip") or "").strip()
  62. country = str(network.get("country") or "").strip().upper()
  63. geo = {
  64. "country": country or None,
  65. "lat": network.get("src_lat"),
  66. "lon": network.get("src_lon"),
  67. "src_ip": src_ip,
  68. "geo_source": "event",
  69. "geo_error": None,
  70. }
  71. if geo["country"] and geo["lat"] is not None and geo["lon"] is not None:
  72. return geo
  73. if src_ip and self._is_public_ip(src_ip):
  74. lookup = await self.geoip_adapter.lookup(src_ip)
  75. if lookup.get("ok"):
  76. geo["country"] = geo["country"] or str(lookup.get("country_code") or "").upper() or None
  77. geo["lat"] = lookup.get("lat")
  78. geo["lon"] = lookup.get("lon")
  79. geo["geo_source"] = str(lookup.get("provider") or "geoip")
  80. else:
  81. geo["geo_error"] = lookup.get("error")
  82. return geo
  83. async def evaluate(self, events: list[dict[str, Any]], selectors: list[str] | None = None) -> dict[str, Any]:
  84. selector_set = {str(s).strip().lower() for s in (selectors or []) if str(s).strip()}
  85. sorted_events = sorted(events, key=lambda e: self._to_dt(e.get("timestamp")))
  86. matches: list[dict[str, Any]] = []
  87. triggered_once: set[tuple[str, str]] = set()
  88. c3_user_hosts: dict[str, deque[tuple[datetime, str]]] = defaultdict(deque)
  89. c3_src_targets: dict[str, deque[tuple[datetime, str, int]]] = defaultdict(deque)
  90. for event in sorted_events:
  91. timestamp = self._to_dt(event.get("timestamp"))
  92. asset = event.get("asset", {}) if isinstance(event.get("asset"), dict) else {}
  93. network = event.get("network", {}) if isinstance(event.get("network"), dict) else {}
  94. payload = event.get("payload", {}) if isinstance(event.get("payload"), dict) else {}
  95. user = str(asset.get("user") or payload.get("account") or "").strip().lower()
  96. src_ip = str(network.get("src_ip") or "").strip()
  97. dst_ip = str(network.get("dst_ip") or "").strip()
  98. dst_host = str(network.get("dst_host") or payload.get("dst_host") or dst_ip or "").strip().lower()
  99. dst_port = self._to_int(network.get("dst_port") or payload.get("dst_port"))
  100. event_type = str(event.get("event_type") or "").lower()
  101. action = str(payload.get("event_action") or payload.get("action") or "").lower()
  102. logon_type = str(payload.get("logon_type") or "").lower()
  103. account_type = str(asset.get("account_type") or payload.get("account_type") or "").lower()
  104. is_admin = self._to_bool(asset.get("is_admin") or payload.get("is_admin")) or user in {"admin", "administrator"}
  105. is_service = self._to_bool(asset.get("is_service") or payload.get("is_service")) or user.startswith("svc_")
  106. is_success_login = self._to_bool(payload.get("success")) or action.endswith("success") or "login-success" in action
  107. if not is_success_login and self._to_int(payload.get("event_id")) == 4624:
  108. is_success_login = True
  109. is_priv_esc = (
  110. "privilege" in event_type
  111. or "group_add" in event_type
  112. or self._to_int(payload.get("event_id")) in {4728, 4732}
  113. )
  114. is_sensitive_access = (
  115. "share" in event_type
  116. or "rdp" in event_type
  117. or "smb" in event_type
  118. or self._to_int(payload.get("event_id")) in {5145}
  119. )
  120. # C1-01 Impossible travel
  121. if user and is_success_login and self._selector_enabled(selector_set, "C1-01", "c1"):
  122. geo = await self._resolve_geo(event)
  123. state_key = f"c1:last_login:{user}"
  124. prev = self.repo.get_correlation_state(state_key) or {}
  125. prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None
  126. prev_country = str(prev.get("country") or "").upper()
  127. prev_lat = prev.get("lat")
  128. prev_lon = prev.get("lon")
  129. prev_geo_source = prev.get("geo_source")
  130. if (
  131. prev_ts
  132. and prev_country
  133. and geo.get("country")
  134. and prev_country != geo["country"]
  135. and prev_lat is not None
  136. and prev_lon is not None
  137. and geo.get("lat") is not None
  138. and geo.get("lon") is not None
  139. ):
  140. delta_seconds = max(1.0, (timestamp - prev_ts).total_seconds())
  141. distance_km = self._haversine_km(float(prev_lat), float(prev_lon), float(geo["lat"]), float(geo["lon"]))
  142. speed_kmph = distance_km / (delta_seconds / 3600.0)
  143. if speed_kmph > float(settings.c1_max_travel_speed_kmph):
  144. trigger_key = ("C1-01", user)
  145. if trigger_key not in triggered_once:
  146. triggered_once.add(trigger_key)
  147. matches.append(
  148. {
  149. "usecase_id": "C1-01",
  150. "section": "c1",
  151. "severity": "high",
  152. "entity": user,
  153. "event": event,
  154. "evidence": {
  155. "prev_country": prev_country,
  156. "current_country": geo["country"],
  157. "prev_src_ip": prev.get("src_ip"),
  158. "current_src_ip": src_ip,
  159. "prev_geo_source": prev_geo_source,
  160. "current_geo_source": geo.get("geo_source"),
  161. "distance_km": round(distance_km, 2),
  162. "travel_seconds": int(delta_seconds),
  163. "speed_kmph": round(speed_kmph, 2),
  164. "threshold_kmph": settings.c1_max_travel_speed_kmph,
  165. },
  166. }
  167. )
  168. self.repo.upsert_correlation_state(
  169. state_key,
  170. {
  171. "timestamp": timestamp.isoformat(),
  172. "country": geo.get("country"),
  173. "lat": geo.get("lat"),
  174. "lon": geo.get("lon"),
  175. "src_ip": src_ip,
  176. "geo_source": geo.get("geo_source"),
  177. },
  178. )
  179. # C2-01 privileged off-hours login
  180. if user and is_success_login and is_admin and self._selector_enabled(selector_set, "C2-01", "c2"):
  181. hour = timestamp.hour
  182. if hour >= settings.c2_offhours_start_utc or hour < settings.c2_offhours_end_utc:
  183. trigger_key = ("C2-01", user)
  184. if trigger_key not in triggered_once:
  185. triggered_once.add(trigger_key)
  186. matches.append(
  187. {
  188. "usecase_id": "C2-01",
  189. "section": "c2",
  190. "severity": "high",
  191. "entity": user,
  192. "event": event,
  193. "evidence": {"hour_utc": hour, "offhours_start_utc": settings.c2_offhours_start_utc},
  194. }
  195. )
  196. # C2-02 dormant account activation
  197. if user and is_success_login and self._selector_enabled(selector_set, "C2-02", "c2"):
  198. state_key = f"c2:last_success:{user}"
  199. prev = self.repo.get_correlation_state(state_key) or {}
  200. prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None
  201. dormant_days = 30
  202. if prev_ts:
  203. gap_days = (timestamp - prev_ts).total_seconds() / 86400.0
  204. if gap_days >= dormant_days:
  205. trigger_key = ("C2-02", user)
  206. if trigger_key not in triggered_once:
  207. triggered_once.add(trigger_key)
  208. matches.append(
  209. {
  210. "usecase_id": "C2-02",
  211. "section": "c2",
  212. "severity": "medium",
  213. "entity": user,
  214. "event": event,
  215. "evidence": {"gap_days": round(gap_days, 2), "threshold_days": dormant_days},
  216. }
  217. )
  218. self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip})
  219. # C2-03 service account interactive
  220. if user and is_success_login and self._selector_enabled(selector_set, "C2-03", "c2"):
  221. interactive = logon_type in {"2", "10", "interactive", "remoteinteractive", "rdp", "console"}
  222. if is_service and interactive:
  223. trigger_key = ("C2-03", user)
  224. if trigger_key not in triggered_once:
  225. triggered_once.add(trigger_key)
  226. matches.append(
  227. {
  228. "usecase_id": "C2-03",
  229. "section": "c2",
  230. "severity": "high",
  231. "entity": user,
  232. "event": event,
  233. "evidence": {"account_type": account_type or "service", "logon_type": logon_type},
  234. }
  235. )
  236. # C2-04 rapid privilege escalation then sensitive access
  237. if user and self._selector_enabled(selector_set, "C2-04", "c2"):
  238. state_key = f"c2:last_priv_esc:{user}"
  239. if is_priv_esc:
  240. self.repo.upsert_correlation_state(state_key, {"timestamp": timestamp.isoformat(), "src_ip": src_ip})
  241. elif is_sensitive_access:
  242. prev = self.repo.get_correlation_state(state_key) or {}
  243. prev_ts = self._to_dt(prev.get("timestamp")) if prev.get("timestamp") else None
  244. if prev_ts:
  245. gap_seconds = (timestamp - prev_ts).total_seconds()
  246. if 0 <= gap_seconds <= 900:
  247. trigger_key = ("C2-04", user)
  248. if trigger_key not in triggered_once:
  249. triggered_once.add(trigger_key)
  250. matches.append(
  251. {
  252. "usecase_id": "C2-04",
  253. "section": "c2",
  254. "severity": "high",
  255. "entity": user,
  256. "event": event,
  257. "evidence": {"gap_seconds": int(gap_seconds), "threshold_seconds": 900},
  258. }
  259. )
  260. # C3-01 multi-host auth success
  261. if user and (not is_admin) and is_success_login and dst_host and self._selector_enabled(selector_set, "C3-01", "c3"):
  262. dq = c3_user_hosts[user]
  263. dq.append((timestamp, dst_host))
  264. while dq and (timestamp - dq[0][0]).total_seconds() > 600:
  265. dq.popleft()
  266. unique_hosts = {item[1] for item in dq}
  267. if len(unique_hosts) >= int(settings.c3_host_spread_threshold):
  268. trigger_key = ("C3-01", user)
  269. if trigger_key not in triggered_once:
  270. triggered_once.add(trigger_key)
  271. matches.append(
  272. {
  273. "usecase_id": "C3-01",
  274. "section": "c3",
  275. "severity": "high",
  276. "entity": user,
  277. "event": event,
  278. "evidence": {"unique_hosts": sorted(unique_hosts), "window_seconds": 600},
  279. }
  280. )
  281. # C3-02/C3-04 by source IP bursts
  282. if src_ip and dst_host and (not is_admin) and self._selector_enabled(selector_set, "C3-02", "c3"):
  283. dq = c3_src_targets[src_ip]
  284. dq.append((timestamp, dst_host, dst_port))
  285. while dq and (timestamp - dq[0][0]).total_seconds() > 600:
  286. dq.popleft()
  287. lateral_targets = {host for _, host, port in dq if port in {3389, 445}}
  288. if len(lateral_targets) >= 4:
  289. trigger_key = ("C3-02", src_ip)
  290. if trigger_key not in triggered_once:
  291. triggered_once.add(trigger_key)
  292. matches.append(
  293. {
  294. "usecase_id": "C3-02",
  295. "section": "c3",
  296. "severity": "high",
  297. "entity": src_ip,
  298. "event": event,
  299. "evidence": {"protocol_ports": [3389, 445], "targets": sorted(lateral_targets)},
  300. }
  301. )
  302. if self._selector_enabled(selector_set, "C3-04", "c3"):
  303. port_set = {port for _, _, port in dq if port > 0}
  304. host_set = {host for _, host, _ in dq}
  305. if len(port_set) >= int(settings.c3_scan_port_threshold) or len(host_set) >= int(settings.c3_host_spread_threshold * 2):
  306. trigger_key = ("C3-04", src_ip)
  307. if trigger_key not in triggered_once:
  308. triggered_once.add(trigger_key)
  309. matches.append(
  310. {
  311. "usecase_id": "C3-04",
  312. "section": "c3",
  313. "severity": "medium",
  314. "entity": src_ip,
  315. "event": event,
  316. "evidence": {"unique_ports": len(port_set), "unique_hosts": len(host_set), "window_seconds": 600},
  317. }
  318. )
  319. # C3-03 admin many servers rapidly
  320. if user and is_admin and is_success_login and dst_host and self._selector_enabled(selector_set, "C3-03", "c3"):
  321. dq = c3_user_hosts[f"admin:{user}"]
  322. dq.append((timestamp, dst_host))
  323. while dq and (timestamp - dq[0][0]).total_seconds() > 600:
  324. dq.popleft()
  325. unique_hosts = {item[1] for item in dq}
  326. if len(unique_hosts) >= int(settings.c3_host_spread_threshold):
  327. trigger_key = ("C3-03", user)
  328. if trigger_key not in triggered_once:
  329. triggered_once.add(trigger_key)
  330. matches.append(
  331. {
  332. "usecase_id": "C3-03",
  333. "section": "c3",
  334. "severity": "critical",
  335. "entity": user,
  336. "event": event,
  337. "evidence": {"unique_hosts": sorted(unique_hosts), "is_admin": True, "window_seconds": 600},
  338. }
  339. )
  340. deduped: list[dict[str, Any]] = []
  341. seen: set[tuple[str, str, str]] = set()
  342. for match in matches:
  343. ts = str((match.get("event") or {}).get("timestamp") or "")
  344. key = (str(match.get("usecase_id")), str(match.get("entity")), ts)
  345. if key in seen:
  346. continue
  347. seen.add(key)
  348. deduped.append(match)
  349. summary = {
  350. "processed": len(sorted_events),
  351. "matched": len(deduped),
  352. "by_usecase": dict(
  353. sorted(
  354. ((k, len([m for m in deduped if m.get("usecase_id") == k])) for k in {m.get("usecase_id") for m in deduped}),
  355. key=lambda item: item[0],
  356. )
  357. ),
  358. }
  359. return {"summary": summary, "matches": deduped}