# main.py (~74 KB) — SOC Integrator FastAPI service entry point.
# (Header restored from scrape residue; original hosted page had no description.)
  1. import asyncio
  2. import logging
  3. import os
  4. import re
  5. import shlex
  6. import subprocess
  7. import uuid
  8. from collections import deque
  9. from datetime import datetime, timedelta, timezone
  10. from pathlib import Path
  11. from fastapi import Depends, FastAPI, File, HTTPException, Request, UploadFile
  12. from fastapi.responses import FileResponse, Response
  13. from fastapi.staticfiles import StaticFiles
  14. from app.adapters.abuseipdb import AbuseIpdbAdapter
  15. from app.adapters.geoip import GeoIpAdapter
  16. from app.adapters.iris import IrisAdapter
  17. from app.adapters.pagerduty import PagerDutyAdapter
  18. from app.adapters.shuffle import ShuffleAdapter
  19. from app.adapters.virustotal import VirusTotalAdapter
  20. from app.adapters.wazuh import WazuhAdapter
  21. from app.config import settings
  22. from app.db import init_schema
  23. from app.models import (
  24. ActionCreateIncidentRequest,
  25. ApiResponse,
  26. CDetectionEvaluateRequest,
  27. IocEnrichRequest,
  28. IocEvaluateRequest,
  29. IrisTicketCreateRequest,
  30. LogLossCheckRequest,
  31. LogLossStreamCheck,
  32. SimLogRunRequest,
  33. ShuffleLoginRequest,
  34. ShuffleProxyRequest,
  35. TriggerShuffleRequest,
  36. WazuhIngestRequest,
  37. )
  38. from app.repositories.mvp_repo import MvpRepository
  39. from app.routes.mvp import build_mvp_router
  40. from app.security import require_internal_api_key
  41. from app.services.mvp_service import MvpService
  42. from app.services.c_detection_service import CDetectionService
# --- application wiring (runs at import time) ------------------------------
# The FastAPI app, all upstream adapters, the repository, and the service
# layers are constructed once here from values on `settings`.
app = FastAPI(title=settings.app_name, version="0.1.0")
logger = logging.getLogger(__name__)
# Location of the bundled single-page UI and its static assets.
UI_DIR = Path(__file__).resolve().parent / "ui"
UI_ASSETS_DIR = UI_DIR / "assets"
# Container paths used by the simulation endpoints — presumably only valid
# inside the Docker image; TODO confirm for bare-metal runs.
SIM_SCRIPTS_DIR = Path("/app/scripts")
SIM_RUN_LOGS_DIR = Path("/tmp/soc-integrator-sim-logs")
# Upstream integration adapters. Only configuration is passed here; any
# network I/O happens later in the adapters — verify in the adapter modules.
wazuh_adapter = WazuhAdapter(
    base_url=settings.wazuh_base_url,
    username=settings.wazuh_username,
    password=settings.wazuh_password,
    indexer_url=settings.wazuh_indexer_url,
    indexer_username=settings.wazuh_indexer_username,
    indexer_password=settings.wazuh_indexer_password,
)
shuffle_adapter = ShuffleAdapter(
    base_url=settings.shuffle_base_url,
    api_key=settings.shuffle_api_key,
)
pagerduty_adapter = PagerDutyAdapter(
    base_url=settings.pagerduty_base_url,
    api_key=settings.pagerduty_api_key,
)
iris_adapter = IrisAdapter(
    base_url=settings.iris_base_url,
    api_key=settings.iris_api_key,
)
virustotal_adapter = VirusTotalAdapter(
    base_url=settings.virustotal_base_url,
    api_key=settings.virustotal_api_key,
)
abuseipdb_adapter = AbuseIpdbAdapter(
    base_url=settings.abuseipdb_base_url,
    api_key=settings.abuseipdb_api_key,
)
geoip_adapter = GeoIpAdapter(
    provider=settings.geoip_provider,
    cache_ttl_seconds=settings.geoip_cache_ttl_seconds,
)
# Shared repository plus the two service layers built on top of it.
repo = MvpRepository()
mvp_service = MvpService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    shuffle_adapter=shuffle_adapter,
    iris_adapter=iris_adapter,
    pagerduty_adapter=pagerduty_adapter,
)
c_detection_service = CDetectionService(
    repo=repo,
    geoip_adapter=geoip_adapter,
)
# Mount the MVP API routes (guarded by the internal API key) and the UI assets.
app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
  95. @app.middleware("http")
  96. async def ui_no_cache_middleware(request: Request, call_next):
  97. response: Response = await call_next(request)
  98. if request.url.path == "/ui" or request.url.path.startswith("/ui/assets/"):
  99. response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
  100. response.headers["Pragma"] = "no-cache"
  101. response.headers["Expires"] = "0"
  102. return response
# Default streams checked by the log-loss monitor when a request does not
# supply its own list. Each entry mirrors the LogLossStreamCheck model: a
# display name, a Wazuh query, and the minimum hit count expected per window.
DEFAULT_LOG_LOSS_STREAMS: list[dict[str, object]] = [
    {
        # FortiGate firewall telemetry (matches several appliance model tags).
        "name": "fortigate",
        "query": "full_log:fortigate OR full_log:FGT80F OR full_log:FGT60F OR full_log:FGT40F OR full_log:FGT501E",
        "min_count": 1,
    },
    {
        "name": "windows_agent",
        "query": "full_log:windows_agent OR full_log:windows",
        "min_count": 1,
    },
    {
        "name": "vmware",
        "query": "full_log:vmware",
        "min_count": 1,
    },
    {
        # Custom log-monitor feed; rule.id 100411 — TODO confirm rule mapping.
        "name": "log_monitor",
        "query": "full_log:log_monitor OR rule.id:100411",
        "min_count": 1,
    },
]
async def _execute_log_loss_check(
    req: LogLossCheckRequest,
    create_ticket: bool,
) -> dict[str, object]:
    """Run a log-loss check across the requested (or default) streams.

    For each stream, count matching Wazuh alerts over the last ``req.minutes``
    minutes and flag the stream as ``loss`` when the observed count falls
    below its minimum. When ``create_ticket`` is true and at least one stream
    shows loss, attempt to open an IRIS case, subject to a cooldown tracked in
    ``app.state.log_loss_monitor_state``.

    Returns a report dict with ``checked_at``, ``minutes``, ``summary``,
    ``streams``, and — when ticket creation was attempted or skipped — a
    ``ticket`` entry.
    """
    minutes = max(1, int(req.minutes))
    # Fall back to the built-in stream definitions when the caller gave none.
    streams = req.streams or [LogLossStreamCheck(**item) for item in DEFAULT_LOG_LOSS_STREAMS]
    items: list[dict[str, object]] = []
    loss_count = 0
    error_count = 0
    loss_stream_names: list[str] = []
    for stream in streams:
        min_count = max(0, int(stream.min_count))
        try:
            observed = await wazuh_adapter.count_alerts(query=stream.query, minutes=minutes)
            is_loss = observed < min_count
            if is_loss:
                loss_count += 1
                loss_stream_names.append(stream.name)
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": observed,
                    "status": "loss" if is_loss else "ok",
                }
            )
        except Exception as exc:
            # A failed query is reported as its own status, distinct from loss.
            error_count += 1
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": None,
                    "status": "error",
                    "error": str(exc),
                }
            )
    summary = {
        "total_streams": len(items),
        "loss_streams": loss_count,
        "error_streams": error_count,
        "all_ok": loss_count == 0 and error_count == 0,
    }
    ticket_data: dict[str, object] | None = None
    if create_ticket and loss_count > 0:
        # Ticket creation is rate-limited by a wall-clock cooldown shared via
        # app.state so repeated monitor cycles do not flood IRIS with cases.
        cooldown = max(0, int(settings.log_loss_monitor_ticket_cooldown_seconds))
        now_ts = datetime.now(timezone.utc).timestamp()
        state = app.state.log_loss_monitor_state
        last_ticket_ts = float(state.get("last_ticket_ts", 0.0) or 0.0)
        in_cooldown = cooldown > 0 and (now_ts - last_ticket_ts) < cooldown
        if not in_cooldown:
            title = f"Log loss detected ({loss_count} stream(s))"
            description = (
                f"Log-loss monitor detected missing telemetry in the last {minutes} minute(s). "
                f"Affected streams: {', '.join(loss_stream_names)}."
            )
            case_payload = {
                "case_name": title,
                "case_description": description,
                "case_customer": settings.iris_default_customer_id,
                "case_soc_id": settings.iris_default_soc_id,
            }
            try:
                iris_result = await iris_adapter.create_case(case_payload)
                # Only a successful case creation advances the cooldown clock.
                state["last_ticket_ts"] = now_ts
                ticket_data = {"created": True, "iris": iris_result}
            except Exception as exc:
                ticket_data = {"created": False, "error": str(exc)}
        else:
            ticket_data = {
                "created": False,
                "skipped": "cooldown_active",
                "cooldown_seconds": cooldown,
                "last_ticket_ts": last_ticket_ts,
            }
    result: dict[str, object] = {
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "minutes": minutes,
        "summary": summary,
        "streams": items,
    }
    if ticket_data is not None:
        result["ticket"] = ticket_data
    return result
async def _log_loss_monitor_loop() -> None:
    """Background task: run the log-loss check forever on a fixed interval.

    Progress and results are recorded in ``app.state.log_loss_monitor_state``
    so they can be inspected externally. Exceptions are logged and never
    terminate the loop; the task ends only when cancelled at shutdown.
    """
    # Clamp to a 5-second floor so misconfiguration cannot cause a busy loop.
    interval = max(5, int(settings.log_loss_monitor_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.log_loss_monitor_state["running"] = True
            app.state.log_loss_monitor_state["last_started_at"] = started_at
            req = LogLossCheckRequest(minutes=max(1, int(settings.log_loss_monitor_window_minutes)))
            result = await _execute_log_loss_check(
                req=req,
                create_ticket=bool(settings.log_loss_monitor_create_iris_ticket),
            )
            app.state.log_loss_monitor_state["last_status"] = "ok"
            app.state.log_loss_monitor_state["last_result"] = result
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "log-loss monitor checked=%s loss=%s errors=%s",
                result.get("summary", {}).get("total_streams", 0),
                result.get("summary", {}).get("loss_streams", 0),
                result.get("summary", {}).get("error_streams", 0),
            )
        except Exception as exc:
            app.state.log_loss_monitor_state["last_status"] = "error"
            app.state.log_loss_monitor_state["last_error"] = str(exc)
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("log-loss monitor failed: %s", exc)
        finally:
            # Always clear the running flag, even on failure, before sleeping.
            app.state.log_loss_monitor_state["running"] = False
        await asyncio.sleep(interval)
async def _wazuh_auto_sync_loop() -> None:
    """Background task: periodically pull Wazuh alerts through the MVP service.

    Each cycle calls ``mvp_service.sync_wazuh_alerts`` with the configured
    query, limit, and time window, and records status/result/error snapshots
    in ``app.state.wazuh_auto_sync_state``. Errors are logged and the loop
    continues; the task ends only when cancelled at shutdown.
    """
    # Clamp to a 5-second floor so misconfiguration cannot cause a busy loop.
    interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.wazuh_auto_sync_state["running"] = True
            app.state.wazuh_auto_sync_state["last_started_at"] = started_at
            result = await mvp_service.sync_wazuh_alerts(
                query=settings.wazuh_auto_sync_query,
                limit=settings.wazuh_auto_sync_limit,
                minutes=settings.wazuh_auto_sync_minutes,
            )
            app.state.wazuh_auto_sync_state["last_status"] = "ok"
            app.state.wazuh_auto_sync_state["last_result"] = result
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
                result.get("processed", 0),
                result.get("ingested", 0),
                result.get("skipped_existing", 0),
                result.get("failed", 0),
                result.get("ioc_evaluated", 0),
                result.get("ioc_matched", 0),
                result.get("ioc_rejected", 0),
            )
        except Exception as exc:
            app.state.wazuh_auto_sync_state["last_status"] = "error"
            app.state.wazuh_auto_sync_state["last_error"] = str(exc)
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("wazuh auto-sync failed: %s", exc)
        finally:
            # Always clear the running flag, even on failure, before sleeping.
            app.state.wazuh_auto_sync_state["running"] = False
        await asyncio.sleep(interval)
  275. def _c_match_to_incident_event(match: dict[str, object]) -> dict[str, object]:
  276. event = dict(match.get("event") or {})
  277. usecase_id = str(match.get("usecase_id") or "C-unknown")
  278. section = str(match.get("section") or "c")
  279. severity = str(match.get("severity") or "medium")
  280. entity = str(match.get("entity") or "unknown")
  281. evidence = dict(match.get("evidence") or {})
  282. source = str(event.get("source") or "wazuh")
  283. timestamp = str(event.get("timestamp") or datetime.now(timezone.utc).isoformat())
  284. event_id = str(event.get("event_id") or f"{usecase_id}-{int(datetime.now(timezone.utc).timestamp())}")
  285. payload = dict(event.get("payload") or {})
  286. asset = dict(event.get("asset") or {})
  287. network = dict(event.get("network") or {})
  288. event_type = "c2_credential_abuse"
  289. if section == "c1":
  290. event_type = "c1_impossible_travel"
  291. elif section == "c3":
  292. event_type = "c3_lateral_movement"
  293. title = f"{usecase_id} detection for {entity}"
  294. description = f"{usecase_id} matched for entity={entity}. evidence={evidence}"
  295. tags = list(event.get("tags") or [])
  296. tags.extend(["appendix_c", usecase_id.lower(), section])
  297. return {
  298. "source": source,
  299. "event_type": event_type,
  300. "event_id": event_id,
  301. "timestamp": timestamp,
  302. "severity": severity,
  303. "title": title,
  304. "description": description,
  305. "asset": asset,
  306. "network": network,
  307. "tags": sorted(set(tags)),
  308. "risk_context": {"appendix_c_usecase": usecase_id},
  309. "raw": event.get("raw") or {},
  310. "payload": payload,
  311. }
@app.on_event("startup")
async def startup() -> None:
    """Initialize persistence, seed per-feature state dicts, and start the
    optional background monitor loops.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI releases
    in favor of lifespan handlers — consider migrating when convenient.
    """
    init_schema()
    repo.ensure_policy()
    # Mutable status snapshots written by the background loops and read by
    # status endpoints/UI.
    app.state.wazuh_auto_sync_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
    }
    app.state.log_loss_monitor_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        # Wall-clock timestamp of the last IRIS ticket; drives the cooldown
        # in _execute_log_loss_check.
        "last_ticket_ts": 0.0,
    }
    app.state.c_detection_state = {
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts_by_key": {},
    }
    app.state.systems_monitor_state = {
        "last_ok_at": {},
    }
    # Registry of simulator subprocesses started via the sim endpoints.
    app.state.sim_runs = {}
    SIM_RUN_LOGS_DIR.mkdir(parents=True, exist_ok=True)
    # Background loops are opt-in via settings; task handles are kept on
    # app.state so shutdown() can cancel them.
    if settings.wazuh_auto_sync_enabled:
        app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
        logger.info(
            "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
            settings.wazuh_auto_sync_interval_seconds,
            settings.wazuh_auto_sync_limit,
            settings.wazuh_auto_sync_minutes,
            settings.wazuh_auto_sync_query,
        )
    if settings.log_loss_monitor_enabled:
        app.state.log_loss_monitor_task = asyncio.create_task(_log_loss_monitor_loop())
        logger.info(
            "log-loss monitor enabled interval=%ss window=%sm create_iris_ticket=%s cooldown=%ss",
            settings.log_loss_monitor_interval_seconds,
            settings.log_loss_monitor_window_minutes,
            settings.log_loss_monitor_create_iris_ticket,
            settings.log_loss_monitor_ticket_cooldown_seconds,
        )
  364. @app.on_event("shutdown")
  365. async def shutdown() -> None:
  366. task = getattr(app.state, "wazuh_auto_sync_task", None)
  367. if task:
  368. task.cancel()
  369. try:
  370. await task
  371. except asyncio.CancelledError:
  372. pass
  373. ll_task = getattr(app.state, "log_loss_monitor_task", None)
  374. if ll_task:
  375. ll_task.cancel()
  376. try:
  377. await ll_task
  378. except asyncio.CancelledError:
  379. pass
  380. sim_runs = getattr(app.state, "sim_runs", {})
  381. for run in sim_runs.values():
  382. process = run.get("process")
  383. if process and process.poll() is None:
  384. process.terminate()
  385. @app.get(
  386. "/ui",
  387. summary="SOC Integrator UI",
  388. description="Serve the built-in Alpine.js operations console.",
  389. include_in_schema=False,
  390. )
  391. async def ui_index() -> FileResponse:
  392. if not UI_DIR.exists():
  393. raise HTTPException(status_code=404, detail="UI is not available in this build")
  394. return FileResponse(UI_DIR / "index.html")
  395. @app.get(
  396. "/health",
  397. response_model=ApiResponse,
  398. summary="Service health",
  399. description="Return soc-integrator service identity and configured upstream targets.",
  400. )
  401. async def health() -> ApiResponse:
  402. return ApiResponse(
  403. data={
  404. "service": settings.app_name,
  405. "env": settings.app_env,
  406. "targets": {
  407. "wazuh": settings.wazuh_base_url,
  408. "shuffle": settings.shuffle_base_url,
  409. "pagerduty": settings.pagerduty_base_url,
  410. "iris": settings.iris_base_url,
  411. },
  412. }
  413. )
  414. @app.post(
  415. "/ingest/wazuh-alert",
  416. response_model=ApiResponse,
  417. summary="Normalize Wazuh alert",
  418. description="Normalize a raw Wazuh alert payload into the internal ingest shape.",
  419. )
  420. async def ingest_wazuh_alert(payload: WazuhIngestRequest) -> ApiResponse:
  421. normalized = {
  422. "source": payload.source,
  423. "alert_id": payload.alert_id,
  424. "rule_id": payload.rule_id,
  425. "severity": payload.severity,
  426. "title": payload.title,
  427. "payload": payload.payload,
  428. }
  429. return ApiResponse(data={"normalized": normalized})
  430. @app.post(
  431. "/action/create-incident",
  432. response_model=ApiResponse,
  433. summary="Create PagerDuty incident",
  434. description="Create an incident in PagerDuty (stub or real integration) from request payload.",
  435. )
  436. async def create_incident(payload: ActionCreateIncidentRequest) -> ApiResponse:
  437. incident_payload = {
  438. "title": payload.title,
  439. "urgency": payload.severity,
  440. "incident_key": payload.dedupe_key,
  441. "body": payload.payload,
  442. "source": payload.source,
  443. }
  444. try:
  445. pd_result = await pagerduty_adapter.create_incident(incident_payload)
  446. except Exception as exc:
  447. raise HTTPException(status_code=502, detail=f"PagerDuty call failed: {exc}") from exc
  448. return ApiResponse(data={"pagerduty": pd_result})
  449. @app.post(
  450. "/action/trigger-shuffle",
  451. response_model=ApiResponse,
  452. summary="Trigger Shuffle workflow",
  453. description="Execute a Shuffle workflow by ID with execution_argument payload.",
  454. )
  455. async def trigger_shuffle(payload: TriggerShuffleRequest) -> ApiResponse:
  456. try:
  457. shuffle_result = await shuffle_adapter.trigger_workflow(
  458. workflow_id=payload.workflow_id,
  459. payload=payload.execution_argument,
  460. )
  461. except Exception as exc:
  462. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  463. return ApiResponse(data={"shuffle": shuffle_result})
  464. @app.get(
  465. "/shuffle/health",
  466. response_model=ApiResponse,
  467. summary="Shuffle health",
  468. description="Check Shuffle backend health endpoint through adapter connectivity.",
  469. )
  470. async def shuffle_health() -> ApiResponse:
  471. try:
  472. result = await shuffle_adapter.health()
  473. except Exception as exc:
  474. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  475. return ApiResponse(data={"shuffle": result})
  476. @app.get(
  477. "/shuffle/auth-test",
  478. response_model=ApiResponse,
  479. summary="Shuffle auth test",
  480. description="Validate Shuffle API key authentication.",
  481. )
  482. async def shuffle_auth_test() -> ApiResponse:
  483. try:
  484. result = await shuffle_adapter.auth_test()
  485. except Exception as exc:
  486. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  487. return ApiResponse(data={"shuffle": result})
  488. @app.post(
  489. "/shuffle/login",
  490. response_model=ApiResponse,
  491. summary="Shuffle login",
  492. description="Login to Shuffle with username/password and return auth response.",
  493. )
  494. async def shuffle_login(payload: ShuffleLoginRequest) -> ApiResponse:
  495. try:
  496. result = await shuffle_adapter.login(payload.username, payload.password)
  497. except Exception as exc:
  498. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  499. return ApiResponse(data={"shuffle": result})
  500. @app.post(
  501. "/shuffle/generate-apikey",
  502. response_model=ApiResponse,
  503. summary="Generate Shuffle API key",
  504. description="Login using provided or configured credentials and generate a Shuffle API key.",
  505. )
  506. async def shuffle_generate_apikey(payload: ShuffleLoginRequest | None = None) -> ApiResponse:
  507. username = payload.username if payload else settings.shuffle_username
  508. password = payload.password if payload else settings.shuffle_password
  509. if not username or not password:
  510. raise HTTPException(
  511. status_code=400,
  512. detail="Missing shuffle credentials. Provide username/password in body or set SHUFFLE_USERNAME and SHUFFLE_PASSWORD.",
  513. )
  514. try:
  515. result = await shuffle_adapter.generate_apikey_from_login(username, password)
  516. except Exception as exc:
  517. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  518. return ApiResponse(data={"shuffle": result})
  519. @app.get(
  520. "/shuffle/workflows",
  521. response_model=ApiResponse,
  522. summary="List Shuffle workflows",
  523. description="List available workflows in Shuffle using configured API key.",
  524. )
  525. async def shuffle_workflows() -> ApiResponse:
  526. try:
  527. result = await shuffle_adapter.list_workflows()
  528. except Exception as exc:
  529. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  530. return ApiResponse(data={"shuffle": result})
  531. @app.get(
  532. "/shuffle/workflows/{workflow_id}",
  533. response_model=ApiResponse,
  534. summary="Get Shuffle workflow",
  535. description="Get a single Shuffle workflow definition by workflow ID.",
  536. )
  537. async def shuffle_workflow(workflow_id: str) -> ApiResponse:
  538. try:
  539. result = await shuffle_adapter.get_workflow(workflow_id)
  540. except Exception as exc:
  541. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  542. return ApiResponse(data={"shuffle": result})
  543. @app.post(
  544. "/shuffle/workflows/{workflow_id}/execute",
  545. response_model=ApiResponse,
  546. summary="Execute Shuffle workflow",
  547. description="Execute a specific Shuffle workflow with custom JSON payload.",
  548. )
  549. async def shuffle_workflow_execute(
  550. workflow_id: str, payload: dict[str, object]
  551. ) -> ApiResponse:
  552. try:
  553. result = await shuffle_adapter.trigger_workflow(workflow_id=workflow_id, payload=payload)
  554. except Exception as exc:
  555. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  556. return ApiResponse(data={"shuffle": result})
  557. @app.get(
  558. "/shuffle/apps",
  559. response_model=ApiResponse,
  560. summary="List Shuffle apps",
  561. description="List installed/available Shuffle apps from app API.",
  562. )
  563. async def shuffle_apps() -> ApiResponse:
  564. try:
  565. result = await shuffle_adapter.list_apps()
  566. except Exception as exc:
  567. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  568. return ApiResponse(data={"shuffle": result})
  569. @app.post(
  570. "/shuffle/proxy",
  571. response_model=ApiResponse,
  572. summary="Proxy request to Shuffle API",
  573. description="Forward arbitrary HTTP request to Shuffle API path via configured credentials.",
  574. )
  575. async def shuffle_proxy(payload: ShuffleProxyRequest) -> ApiResponse:
  576. path = payload.path if payload.path.startswith("/api/") else f"/api/v1/{payload.path.lstrip('/')}"
  577. try:
  578. result = await shuffle_adapter.proxy(
  579. method=payload.method,
  580. path=path,
  581. params=payload.params,
  582. payload=payload.payload,
  583. )
  584. except Exception as exc:
  585. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  586. return ApiResponse(data={"shuffle": result})
  587. @app.post(
  588. "/action/create-iris-case",
  589. response_model=ApiResponse,
  590. summary="Create IRIS case (action)",
  591. description="Create an IRIS case using action payload fields and defaults.",
  592. )
  593. async def create_iris_case(payload: ActionCreateIncidentRequest) -> ApiResponse:
  594. # IRIS v2 expects case_name, case_description, case_customer, case_soc_id.
  595. case_payload = {
  596. "case_name": payload.title,
  597. "case_description": payload.payload.get("description", "Created by soc-integrator"),
  598. "case_customer": payload.payload.get("case_customer", settings.iris_default_customer_id),
  599. "case_soc_id": payload.payload.get("case_soc_id", settings.iris_default_soc_id),
  600. }
  601. try:
  602. iris_result = await iris_adapter.create_case(case_payload)
  603. except Exception as exc:
  604. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  605. return ApiResponse(data={"iris": iris_result})
  606. @app.post(
  607. "/iris/tickets",
  608. response_model=ApiResponse,
  609. summary="Create IRIS ticket",
  610. description="Create an IRIS case/ticket directly using ticket request model.",
  611. )
  612. async def iris_create_ticket(payload: IrisTicketCreateRequest) -> ApiResponse:
  613. case_payload = {
  614. "case_name": payload.title,
  615. "case_description": payload.description,
  616. "case_customer": payload.case_customer or settings.iris_default_customer_id,
  617. "case_soc_id": payload.case_soc_id or settings.iris_default_soc_id,
  618. }
  619. if payload.payload:
  620. case_payload.update(payload.payload)
  621. try:
  622. iris_result = await iris_adapter.create_case(case_payload)
  623. except Exception as exc:
  624. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  625. return ApiResponse(data={"iris": iris_result})
  626. @app.get(
  627. "/iris/tickets",
  628. response_model=ApiResponse,
  629. summary="List IRIS tickets",
  630. description="List IRIS cases with pagination, using v2 or legacy fallback endpoint.",
  631. )
  632. async def iris_list_tickets(limit: int = 50, offset: int = 0) -> ApiResponse:
  633. try:
  634. iris_result = await iris_adapter.list_cases(limit=limit, offset=offset)
  635. except Exception as exc:
  636. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  637. return ApiResponse(data={"iris": iris_result})
  638. def _build_vt_ioc_result(
  639. vt: dict[str, object],
  640. ioc_type: str,
  641. ioc_value: str,
  642. malicious_threshold: int,
  643. suspicious_threshold: int,
  644. ) -> tuple[dict[str, object], bool, str, float]:
  645. stats = (
  646. (((vt.get("data") or {}).get("attributes") or {}).get("last_analysis_stats"))
  647. if isinstance(vt, dict)
  648. else None
  649. ) or {}
  650. malicious = int(stats.get("malicious", 0) or 0)
  651. suspicious = int(stats.get("suspicious", 0) or 0)
  652. harmless = int(stats.get("harmless", 0) or 0)
  653. undetected = int(stats.get("undetected", 0) or 0)
  654. total = malicious + suspicious + harmless + undetected
  655. confidence = 0.0 if total == 0 else round(((malicious + (0.5 * suspicious)) / total), 4)
  656. matched = (malicious >= malicious_threshold) or (suspicious >= suspicious_threshold)
  657. severity = "low"
  658. if malicious >= 5 or suspicious >= 10:
  659. severity = "critical"
  660. elif malicious >= 2 or suspicious >= 5:
  661. severity = "high"
  662. elif malicious >= 1 or suspicious >= 1:
  663. severity = "medium"
  664. reason = (
  665. f"virustotal_stats malicious={malicious} suspicious={suspicious} "
  666. f"thresholds(malicious>={malicious_threshold}, suspicious>={suspicious_threshold})"
  667. )
  668. result: dict[str, object] = {
  669. "ioc_type": ioc_type,
  670. "ioc_value": ioc_value,
  671. "matched": matched,
  672. "severity": severity,
  673. "confidence": confidence,
  674. "reason": reason,
  675. "providers": {
  676. "virustotal": {
  677. "stats": stats,
  678. }
  679. },
  680. "raw": {
  681. "virustotal": vt,
  682. },
  683. }
  684. return result, matched, severity, confidence
  685. def _build_abuseipdb_ioc_result(
  686. abuse: dict[str, object],
  687. ioc_value: str,
  688. confidence_threshold: int = 50,
  689. ) -> tuple[dict[str, object], bool, str, float]:
  690. data = ((abuse.get("data") if isinstance(abuse, dict) else None) or {}) if isinstance(abuse, dict) else {}
  691. score = int(data.get("abuseConfidenceScore", 0) or 0)
  692. total_reports = int(data.get("totalReports", 0) or 0)
  693. matched = score >= confidence_threshold
  694. severity = "low"
  695. if score >= 90:
  696. severity = "critical"
  697. elif score >= 70:
  698. severity = "high"
  699. elif score >= 30:
  700. severity = "medium"
  701. confidence = round(score / 100.0, 4)
  702. reason = f"abuseipdb score={score} totalReports={total_reports} threshold>={confidence_threshold}"
  703. result: dict[str, object] = {
  704. "ioc_type": "ip",
  705. "ioc_value": ioc_value,
  706. "matched": matched,
  707. "severity": severity,
  708. "confidence": confidence,
  709. "reason": reason,
  710. "providers": {"abuseipdb": {"score": score, "totalReports": total_reports, "raw": abuse}},
  711. }
  712. return result, matched, severity, confidence
  713. def _extract_first_array(payload: object) -> list[object]:
  714. if isinstance(payload, list):
  715. return payload
  716. if not isinstance(payload, dict):
  717. return []
  718. preferred_keys = [
  719. "items",
  720. "results",
  721. "workflows",
  722. "apps",
  723. "affected_items",
  724. "data",
  725. ]
  726. for key in preferred_keys:
  727. value = payload.get(key)
  728. if isinstance(value, list):
  729. return value
  730. for value in payload.values():
  731. extracted = _extract_first_array(value)
  732. if extracted:
  733. return extracted
  734. return []
# Maps the public simulation-script identifier (as accepted by the sim-log run
# API) to the shell script filename that emits the corresponding test events.
# Scripts are resolved relative to SIM_SCRIPTS_DIR in _build_sim_command.
SIM_SCRIPT_MAP: dict[str, str] = {
    "fortigate": "send-wazuh-fortigate-test-events.sh",
    "endpoint": "send-wazuh-endpoint-agent-test-events.sh",
    "cisco": "send-wazuh-cisco-test-events.sh",
    "proposal_required": "send-wazuh-proposal-required-events.sh",
    "proposal_appendix_b": "send-wazuh-proposal-appendix-b-events.sh",
    "proposal_appendix_c": "send-wazuh-proposal-appendix-c-events.sh",
    "wazuh_test": "send-wazuh-test-events.sh",
}
  744. def _build_sim_command(payload: SimLogRunRequest) -> list[str]:
  745. script_name = SIM_SCRIPT_MAP[payload.script]
  746. script_path = SIM_SCRIPTS_DIR / script_name
  747. count = max(1, int(payload.count))
  748. delay = max(0.0, float(payload.delay_seconds))
  749. if payload.script == "endpoint":
  750. cmd = [
  751. "/bin/bash",
  752. str(script_path),
  753. payload.target or "all",
  754. payload.scenario or "all",
  755. str(count),
  756. str(delay),
  757. ]
  758. else:
  759. cmd = [
  760. "/bin/bash",
  761. str(script_path),
  762. payload.target or "all",
  763. str(count),
  764. str(delay),
  765. ]
  766. if payload.forever:
  767. cmd.append("--forever")
  768. return cmd
  769. def _serialize_sim_run(run_id: str, run: dict[str, object]) -> dict[str, object]:
  770. process = run.get("process")
  771. poll_code = process.poll() if process else None
  772. return_code = run.get("return_code")
  773. if poll_code is not None and return_code is None:
  774. run["return_code"] = poll_code
  775. return_code = poll_code
  776. return {
  777. "run_id": run_id,
  778. "script": run.get("script"),
  779. "target": run.get("target"),
  780. "scenario": run.get("scenario"),
  781. "count": run.get("count"),
  782. "delay_seconds": run.get("delay_seconds"),
  783. "forever": run.get("forever"),
  784. "pid": run.get("pid"),
  785. "cmd": run.get("cmd"),
  786. "started_at": run.get("started_at"),
  787. "stopped_at": run.get("stopped_at"),
  788. "running": bool(process and process.poll() is None),
  789. "return_code": return_code,
  790. "log_file": run.get("log_file"),
  791. }
  792. def _tail_log_lines(path: Path, limit: int = 200) -> list[str]:
  793. line_limit = max(1, min(int(limit), 1000))
  794. lines: deque[str] = deque(maxlen=line_limit)
  795. try:
  796. with path.open("r", encoding="utf-8", errors="replace") as handle:
  797. for line in handle:
  798. lines.append(line.rstrip("\n"))
  799. except FileNotFoundError:
  800. return []
  801. return list(lines)
  802. def _safe_query_token(value: object) -> str | None:
  803. text = str(value or "").strip()
  804. if not text:
  805. return None
  806. if not re.fullmatch(r"[A-Za-z0-9_.:-]+", text):
  807. return None
  808. return text
  809. def _parse_iso_datetime(value: object) -> datetime | None:
  810. text = str(value or "").strip()
  811. if not text:
  812. return None
  813. if text.endswith("Z"):
  814. text = text[:-1] + "+00:00"
  815. try:
  816. parsed = datetime.fromisoformat(text)
  817. except ValueError:
  818. return None
  819. if parsed.tzinfo is None:
  820. parsed = parsed.replace(tzinfo=timezone.utc)
  821. return parsed.astimezone(timezone.utc)
def _sim_wazuh_query_clauses(run: dict[str, object]) -> list[str]:
    """Build per-script Wazuh/indexer query clauses for one simulation run.

    Every result is anchored on the soc_mvp_test marker so only simulated
    events match; script-specific clauses then narrow by vendor/platform and,
    when the target passes token validation, by target substring.

    Args:
        run: Tracked simulation-run record; reads "script", "target", "scenario".

    Returns:
        List of Lucene-style clause strings for the caller to combine.
    """
    script = str(run.get("script") or "").strip().lower()
    target = str(run.get("target") or "all").strip()
    scenario = str(run.get("scenario") or "all").strip().lower()
    # Targets failing _safe_query_token validation come back as None and are
    # never interpolated into the query, preventing query-syntax injection.
    target_token = _safe_query_token(target)
    _ = scenario  # scenario is currently not used for clause building
    clauses: list[str] = ["(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)"]
    if script == "fortigate":
        clauses.append(
            "(full_log:*fortigate* OR full_log:*FGT80F* OR full_log:*FGT60F* OR full_log:*FGT40F* "
            "OR full_log:*FGT501E* OR data.vendor:fortinet OR data.product:fortigate OR data.source:fortigate)"
        )
        if target_token and target_token.lower() != "all":
            clauses.append(f"(full_log:*{target_token}* OR data.model:{target_token})")
    elif script == "endpoint":
        if target_token and target_token.lower() != "all":
            lowered = target_token.lower()
            if lowered in {"windows", "win"}:
                clauses.append(
                    "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                    "OR data.source:windows OR data.source:windows_agent OR data.platform:windows)"
                )
            elif lowered in {"mac", "macos"}:
                clauses.append(
                    "(full_log:*source=mac* OR full_log:*source=mac_agent* "
                    "OR data.source:mac OR data.source:mac_agent OR data.platform:mac)"
                )
            elif lowered == "linux":
                clauses.append(
                    "(full_log:*source=linux* OR full_log:*source=linux_agent* "
                    "OR data.source:linux OR data.source:linux_agent OR data.platform:linux)"
                )
            else:
                # Unknown platform token: fall back to a raw substring match.
                clauses.append(f"full_log:*{target_token}*")
        else:
            # No specific target: match any of the three supported platforms.
            clauses.append(
                "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                "OR full_log:*source=mac* OR full_log:*source=mac_agent* "
                "OR full_log:*source=linux* OR full_log:*source=linux_agent* "
                "OR data.source:windows OR data.source:windows_agent "
                "OR data.source:mac OR data.source:mac_agent "
                "OR data.source:linux OR data.source:linux_agent)"
            )
    elif script == "cisco":
        clauses.append("(full_log:*cisco* OR data.vendor:cisco)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    elif script in {"proposal_required", "proposal_appendix_b", "proposal_appendix_c", "wazuh_test"}:
        # NOTE(review): this duplicates the base soc_mvp_test clause already in
        # the list — harmless when clauses are ANDed, but redundant; confirm
        # intent before tightening.
        clauses.append("(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)")
        if target_token and target_token.lower() != "all":
            clauses.append(f"full_log:*{target_token}*")
    else:
        # Unknown script id: keep only the plain marker as an extra filter.
        clauses.append("full_log:*soc_mvp_test=true*")
    return clauses
  876. def _extract_wazuh_hits(payload: object) -> list[dict[str, object]]:
  877. if not isinstance(payload, dict):
  878. return []
  879. hits_root = payload.get("hits")
  880. if not isinstance(hits_root, dict):
  881. return []
  882. hits = hits_root.get("hits")
  883. if not isinstance(hits, list):
  884. return []
  885. result: list[dict[str, object]] = []
  886. for hit in hits:
  887. if isinstance(hit, dict):
  888. result.append(hit)
  889. return result
  890. def _extract_wazuh_event_item(hit: dict[str, object], include_raw: bool) -> dict[str, object]:
  891. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  892. source = source if isinstance(source, dict) else {}
  893. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  894. agent = agent if isinstance(agent, dict) else {}
  895. decoder = source.get("decoder") if isinstance(source.get("decoder"), dict) else {}
  896. decoder = decoder if isinstance(decoder, dict) else {}
  897. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  898. data = data if isinstance(data, dict) else {}
  899. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  900. rule = rule if isinstance(rule, dict) else {}
  901. item: dict[str, object] = {
  902. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  903. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  904. "agent_name": agent.get("name"),
  905. "agent_id": agent.get("id"),
  906. "decoder_name": decoder.get("name"),
  907. "source": data.get("source"),
  908. "event_type": data.get("event_type"),
  909. "severity": data.get("severity"),
  910. "rule_id": rule.get("id"),
  911. "rule_description": rule.get("description"),
  912. "full_log": source.get("full_log"),
  913. }
  914. if include_raw:
  915. item["raw"] = source
  916. return item
  917. def _extract_wazuh_rule_item(hit: dict[str, object], include_raw: bool) -> dict[str, object] | None:
  918. source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
  919. source = source if isinstance(source, dict) else {}
  920. rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
  921. rule = rule if isinstance(rule, dict) else {}
  922. rule_id = rule.get("id")
  923. if rule_id in {None, ""}:
  924. return None
  925. agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
  926. agent = agent if isinstance(agent, dict) else {}
  927. data = source.get("data") if isinstance(source.get("data"), dict) else {}
  928. data = data if isinstance(data, dict) else {}
  929. item: dict[str, object] = {
  930. "@timestamp": source.get("@timestamp") or source.get("timestamp"),
  931. "rule_id": rule_id,
  932. "rule_level": rule.get("level"),
  933. "rule_description": rule.get("description"),
  934. "rule_firedtimes": rule.get("firedtimes"),
  935. "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
  936. "agent_name": agent.get("name"),
  937. "full_log": source.get("full_log"),
  938. }
  939. if include_raw:
  940. item["raw"] = source
  941. return item
@app.post(
    "/ioc/enrich",
    response_model=ApiResponse,
    summary="IOC enrich",
    description="Fetch enrichment data for IOC from selected providers without final verdict scoring.",
)
async def ioc_enrich(payload: IocEnrichRequest) -> ApiResponse:
    """Fetch raw enrichment data for an IOC from the requested providers.

    Unlike /ioc/evaluate, no verdict (matched/severity/confidence) is computed;
    the raw provider responses are collected under result["providers"]. Every
    attempt — success or failure — is recorded via repo.add_ioc_trace.

    Raises:
        HTTPException: 502 when a provider call fails.
    """
    providers = [p.lower().strip() for p in payload.providers]
    result: dict[str, object] = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "providers_requested": providers,
        "providers": {},
    }
    if "virustotal" in providers:
        try:
            vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            result["providers"] = {**(result.get("providers") or {}), "virustotal": vt}
        except Exception as exc:
            # Persist the failed attempt before surfacing the 502.
            repo.add_ioc_trace(
                action="enrich",
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                providers=providers,
                request_payload=payload.model_dump(mode="json"),
                response_payload={},
                error=str(exc),
            )
            raise HTTPException(status_code=502, detail=f"VirusTotal call failed: {exc}") from exc
    if "abuseipdb" in providers:
        if payload.ioc_type != "ip":
            # AbuseIPDB only checks IPs; record the skip instead of failing.
            result["providers"] = {
                **(result.get("providers") or {}),
                "abuseipdb": {"skipped": "AbuseIPDB currently supports ioc_type='ip' only"},
            }
        else:
            try:
                abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
                result["providers"] = {**(result.get("providers") or {}), "abuseipdb": abuse}
            except Exception as exc:
                repo.add_ioc_trace(
                    action="enrich",
                    ioc_type=payload.ioc_type,
                    ioc_value=payload.ioc_value,
                    providers=providers,
                    request_payload=payload.model_dump(mode="json"),
                    response_payload={},
                    error=str(exc),
                )
                raise HTTPException(status_code=502, detail=f"AbuseIPDB call failed: {exc}") from exc
    # Success trace carrying the combined provider payloads.
    repo.add_ioc_trace(
        action="enrich",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=result,
    )
    return ApiResponse(data={"ioc": result})
@app.post(
    "/ioc/evaluate",
    response_model=ApiResponse,
    summary="IOC evaluate",
    description="Evaluate IOC against selected intelligence providers and return matched/severity/confidence.",
)
async def ioc_evaluate(payload: IocEvaluateRequest) -> ApiResponse:
    """Evaluate an IOC against supported providers and aggregate a verdict.

    Per-provider results are built via _build_vt_ioc_result /
    _build_abuseipdb_ioc_result; the aggregate takes matched=any, the maximum
    confidence, and the highest-ranked severity. Partial provider failures are
    tolerated as long as at least one provider produced a result.

    Raises:
        HTTPException: 400 when no supported provider is requested; 502 when
            every requested provider fails.
    """
    providers = [p.lower().strip() for p in payload.providers]
    supported = {"virustotal", "abuseipdb"}
    requested = [p for p in providers if p in supported]
    if not requested:
        raise HTTPException(status_code=400, detail="No supported provider requested. Use ['virustotal'] or ['abuseipdb'].")
    per_provider: dict[str, dict[str, object]] = {}
    errors: dict[str, str] = {}
    if "virustotal" in requested:
        try:
            vt = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            vt_result, _, _, _ = _build_vt_ioc_result(
                vt=vt,
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                malicious_threshold=payload.malicious_threshold,
                suspicious_threshold=payload.suspicious_threshold,
            )
            per_provider["virustotal"] = vt_result
        except Exception as exc:
            # Collected rather than raised: other providers may still succeed.
            errors["virustotal"] = str(exc)
    if "abuseipdb" in requested:
        if payload.ioc_type != "ip":
            errors["abuseipdb"] = "AbuseIPDB supports ioc_type='ip' only"
        else:
            try:
                abuse = await abuseipdb_adapter.check_ip(payload.ioc_value)
                abuse_result, _, _, _ = _build_abuseipdb_ioc_result(
                    abuse=abuse,
                    ioc_value=payload.ioc_value,
                    confidence_threshold=50,
                )
                per_provider["abuseipdb"] = abuse_result
            except Exception as exc:
                errors["abuseipdb"] = str(exc)
    if not per_provider:
        # All requested providers failed: trace the failure and return 502.
        repo.add_ioc_trace(
            action="evaluate",
            ioc_type=payload.ioc_type,
            ioc_value=payload.ioc_value,
            providers=requested,
            request_payload=payload.model_dump(mode="json"),
            response_payload={},
            error=str(errors),
        )
        raise HTTPException(status_code=502, detail=f"Provider evaluation failed: {errors}")
    # aggregate decision (max confidence/severity, matched if any provider matched)
    order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
    matched = any(bool(r.get("matched")) for r in per_provider.values())
    confidence = max(float(r.get("confidence", 0.0) or 0.0) for r in per_provider.values())
    severity = max((str(r.get("severity", "low")) for r in per_provider.values()), key=lambda x: order.get(x, 1))
    reason_parts = [f"{name}:{res.get('reason','')}" for name, res in per_provider.items()]
    if errors:
        reason_parts.append(f"errors={errors}")
    ioc_result = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "matched": matched,
        "severity": severity,
        "confidence": round(confidence, 4),
        "reason": " | ".join(reason_parts),
        "providers": per_provider,
    }
    # NOTE(review): the failure trace above uses `requested` while this success
    # trace uses the full `providers` list (which may include unsupported
    # entries) — confirm which is intended.
    repo.add_ioc_trace(
        action="evaluate",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=ioc_result,
        matched=matched,
        severity=severity,
        confidence=float(ioc_result["confidence"]),
    )
    return ApiResponse(data={"ioc": ioc_result})
  1082. @app.post(
  1083. "/ioc/upload-file",
  1084. response_model=ApiResponse,
  1085. summary="Upload file to VirusTotal",
  1086. description="Upload a file sample to VirusTotal and return upload/analysis identifiers.",
  1087. )
  1088. async def ioc_upload_file(file: UploadFile = File(...)) -> ApiResponse:
  1089. content = await file.read()
  1090. if not content:
  1091. raise HTTPException(status_code=400, detail="Uploaded file is empty")
  1092. try:
  1093. vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
  1094. except Exception as exc:
  1095. repo.add_ioc_trace(
  1096. action="upload_file",
  1097. ioc_type="hash",
  1098. ioc_value=file.filename or "<unknown>",
  1099. providers=["virustotal"],
  1100. request_payload={"filename": file.filename, "size": len(content)},
  1101. response_payload={},
  1102. error=str(exc),
  1103. )
  1104. raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
  1105. repo.add_ioc_trace(
  1106. action="upload_file",
  1107. ioc_type="hash",
  1108. ioc_value=file.filename or "<unknown>",
  1109. providers=["virustotal"],
  1110. request_payload={"filename": file.filename, "size": len(content)},
  1111. response_payload=vt_upload if isinstance(vt_upload, dict) else {"raw": str(vt_upload)},
  1112. )
  1113. return ApiResponse(data={"virustotal": vt_upload})
  1114. @app.get(
  1115. "/ioc/analysis/{analysis_id}",
  1116. response_model=ApiResponse,
  1117. summary="Get VirusTotal analysis",
  1118. description="Fetch analysis status/details from VirusTotal by analysis ID.",
  1119. )
  1120. async def ioc_get_analysis(analysis_id: str) -> ApiResponse:
  1121. try:
  1122. vt_analysis = await virustotal_adapter.get_analysis(analysis_id)
  1123. except Exception as exc:
  1124. repo.add_ioc_trace(
  1125. action="analysis",
  1126. ioc_type="hash",
  1127. ioc_value=analysis_id,
  1128. providers=["virustotal"],
  1129. request_payload={"analysis_id": analysis_id},
  1130. response_payload={},
  1131. error=str(exc),
  1132. )
  1133. raise HTTPException(status_code=502, detail=f"VirusTotal analysis fetch failed: {exc}") from exc
  1134. repo.add_ioc_trace(
  1135. action="analysis",
  1136. ioc_type="hash",
  1137. ioc_value=analysis_id,
  1138. providers=["virustotal"],
  1139. request_payload={"analysis_id": analysis_id},
  1140. response_payload=vt_analysis if isinstance(vt_analysis, dict) else {"raw": str(vt_analysis)},
  1141. )
  1142. return ApiResponse(data={"virustotal": vt_analysis})
@app.post(
    "/ioc/evaluate-file",
    response_model=ApiResponse,
    summary="Evaluate uploaded file IOC",
    description="Upload a file, poll analysis completion, fetch final file report, and return IOC verdict.",
)
async def ioc_evaluate_file(
    file: UploadFile = File(...),
    malicious_threshold: int = 1,
    suspicious_threshold: int = 3,
    poll_timeout_seconds: int = 30,
    poll_interval_seconds: int = 2,
) -> ApiResponse:
    """Upload a file to VirusTotal and return a scored IOC verdict.

    Flow: upload the sample, poll the analysis until it reports "completed"
    (bounded by poll_timeout_seconds), fetch the file report by sha256, and
    score it via _build_vt_ioc_result. Failures and the final verdict are
    both recorded via repo.add_ioc_trace.

    Raises:
        HTTPException: 400 for an empty upload; 502 on provider failures or
            when no file hash is available within the polling budget.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=file.filename or "<unknown>",
            providers=["virustotal"],
            request_payload={"filename": file.filename, "size": len(content)},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    # The upload response carries the analysis identifier under data.id.
    analysis_id = (
        (((vt_upload.get("data") or {}).get("id")) if isinstance(vt_upload, dict) else None)
        or ""
    )
    if not analysis_id:
        raise HTTPException(status_code=502, detail="VirusTotal upload response missing analysis ID")
    timeout = max(1, poll_timeout_seconds)
    interval = max(1, poll_interval_seconds)
    elapsed = 0
    analysis: dict[str, object] = {}
    # Poll until the analysis is "completed" or the timeout budget is spent.
    while elapsed <= timeout:
        analysis = await virustotal_adapter.get_analysis(analysis_id)
        status = (
            (((analysis.get("data") or {}).get("attributes") or {}).get("status"))
            if isinstance(analysis, dict)
            else None
        )
        if status == "completed":
            break
        await asyncio.sleep(interval)
        elapsed += interval
    # meta.file_info.sha256 identifies the sample for the report fetch.
    sha256 = (
        (((analysis.get("meta") or {}).get("file_info") or {}).get("sha256"))
        if isinstance(analysis, dict)
        else None
    )
    if not sha256:
        raise HTTPException(status_code=502, detail="VirusTotal analysis did not return file hash yet")
    try:
        vt_file = await virustotal_adapter.enrich_ioc("hash", str(sha256))
    except Exception as exc:
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=str(sha256),
            providers=["virustotal"],
            request_payload={"filename": file.filename, "analysis_id": analysis_id},
            response_payload={"upload": vt_upload, "analysis": analysis},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal report fetch failed: {exc}") from exc
    ioc_result, matched, severity, confidence = _build_vt_ioc_result(
        vt=vt_file,
        ioc_type="hash",
        ioc_value=str(sha256),
        malicious_threshold=malicious_threshold,
        suspicious_threshold=suspicious_threshold,
    )
    # Attach provenance so the caller can correlate verdict and upload.
    ioc_result["analysis_id"] = analysis_id
    ioc_result["filename"] = file.filename
    repo.add_ioc_trace(
        action="evaluate_file",
        ioc_type="hash",
        ioc_value=str(sha256),
        providers=["virustotal"],
        request_payload={"filename": file.filename, "analysis_id": analysis_id},
        response_payload={
            "upload": vt_upload,
            "analysis": analysis,
            "ioc": ioc_result,
        },
        matched=matched,
        severity=severity,
        confidence=confidence,
    )
    return ApiResponse(data={"ioc": ioc_result, "analysis": analysis, "upload": vt_upload})
  1238. @app.get(
  1239. "/ioc/history",
  1240. response_model=ApiResponse,
  1241. summary="IOC trace history",
  1242. description="List recent IOC enrichment/evaluation trace records stored in database.",
  1243. )
  1244. async def ioc_history(limit: int = 50, offset: int = 0) -> ApiResponse:
  1245. return ApiResponse(data={"items": repo.list_ioc_trace(limit=limit, offset=offset)})
  1246. @app.get(
  1247. "/geoip/{ip}",
  1248. response_model=ApiResponse,
  1249. summary="GeoIP lookup",
  1250. description="Lookup geolocation for a public IP address using configured GeoIP provider.",
  1251. )
  1252. async def geoip_lookup(ip: str) -> ApiResponse:
  1253. result = await geoip_adapter.lookup(ip)
  1254. return ApiResponse(data={"geoip": result})
  1255. @app.get(
  1256. "/sync/wazuh-version",
  1257. response_model=ApiResponse,
  1258. summary="Wazuh version",
  1259. description="Get Wazuh API/manager version information through adapter.",
  1260. )
  1261. async def sync_wazuh_version() -> ApiResponse:
  1262. try:
  1263. wazuh_result = await wazuh_adapter.get_version()
  1264. except Exception as exc:
  1265. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1266. return ApiResponse(data={"wazuh": wazuh_result})
  1267. @app.get(
  1268. "/wazuh/auth-test",
  1269. response_model=ApiResponse,
  1270. summary="Wazuh auth test",
  1271. description="Validate Wazuh API authentication using configured credentials.",
  1272. )
  1273. async def wazuh_auth_test() -> ApiResponse:
  1274. try:
  1275. result = await wazuh_adapter.auth_test()
  1276. except Exception as exc:
  1277. raise HTTPException(status_code=502, detail=f"Wazuh auth failed: {exc}") from exc
  1278. return ApiResponse(data={"wazuh": result})
  1279. @app.get(
  1280. "/wazuh/manager-info",
  1281. response_model=ApiResponse,
  1282. summary="Wazuh manager info",
  1283. description="Return manager information from Wazuh API.",
  1284. )
  1285. async def wazuh_manager_info() -> ApiResponse:
  1286. try:
  1287. result = await wazuh_adapter.get_manager_info()
  1288. except Exception as exc:
  1289. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1290. return ApiResponse(data={"wazuh": result})
  1291. @app.get(
  1292. "/wazuh/agents",
  1293. response_model=ApiResponse,
  1294. summary="List Wazuh agents",
  1295. description="List registered Wazuh agents with pagination and optional field selection.",
  1296. )
  1297. async def wazuh_agents(
  1298. limit: int = 50,
  1299. offset: int = 0,
  1300. select: str | None = None,
  1301. ) -> ApiResponse:
  1302. try:
  1303. result = await wazuh_adapter.list_agents(limit=limit, offset=offset, select=select)
  1304. except Exception as exc:
  1305. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1306. return ApiResponse(data={"wazuh": result})
  1307. @app.get(
  1308. "/wazuh/alerts",
  1309. response_model=ApiResponse,
  1310. summary="List Wazuh alerts",
  1311. description="List alert-like entries from manager logs API for current Wazuh build.",
  1312. )
  1313. async def wazuh_alerts(
  1314. limit: int = 50,
  1315. offset: int = 0,
  1316. q: str | None = None,
  1317. sort: str | None = None,
  1318. ) -> ApiResponse:
  1319. try:
  1320. # In this Wazuh build, API alerts are exposed via manager logs.
  1321. result = await wazuh_adapter.list_manager_logs(
  1322. limit=limit, offset=offset, q=q, sort=sort
  1323. )
  1324. except Exception as exc:
  1325. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1326. return ApiResponse(data={"wazuh": result})
  1327. @app.get(
  1328. "/wazuh/manager-logs",
  1329. response_model=ApiResponse,
  1330. summary="List Wazuh manager logs",
  1331. description="Query manager logs endpoint with pagination and optional q/sort filters.",
  1332. )
  1333. async def wazuh_manager_logs(
  1334. limit: int = 50,
  1335. offset: int = 0,
  1336. q: str | None = None,
  1337. sort: str | None = None,
  1338. ) -> ApiResponse:
  1339. try:
  1340. result = await wazuh_adapter.list_manager_logs(
  1341. limit=limit, offset=offset, q=q, sort=sort
  1342. )
  1343. except Exception as exc:
  1344. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  1345. return ApiResponse(data={"wazuh": result})
  1346. @app.post(
  1347. "/wazuh/sync-to-mvp",
  1348. response_model=ApiResponse,
  1349. dependencies=[Depends(require_internal_api_key)],
  1350. summary="Sync Wazuh to MVP",
  1351. description="Fetch Wazuh alerts from indexer and pass them through MVP ingest/evaluation logic.",
  1352. )
  1353. async def wazuh_sync_to_mvp(
  1354. limit: int = 50,
  1355. minutes: int = 120,
  1356. q: str = "soc_mvp_test=true OR event_type:*",
  1357. ) -> ApiResponse:
  1358. try:
  1359. result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
  1360. except Exception as exc:
  1361. raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
  1362. return ApiResponse(data={"sync": result})
  1363. @app.get(
  1364. "/wazuh/auto-sync/status",
  1365. response_model=ApiResponse,
  1366. summary="Wazuh auto-sync status",
  1367. description="Show auto-sync enablement, settings, task runtime state, and last sync result.",
  1368. )
  1369. async def wazuh_auto_sync_status() -> ApiResponse:
  1370. state = getattr(app.state, "wazuh_auto_sync_state", {})
  1371. task = getattr(app.state, "wazuh_auto_sync_task", None)
  1372. return ApiResponse(
  1373. data={
  1374. "enabled": settings.wazuh_auto_sync_enabled,
  1375. "task_running": bool(task and not task.done()),
  1376. "settings": {
  1377. "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
  1378. "limit": settings.wazuh_auto_sync_limit,
  1379. "minutes": settings.wazuh_auto_sync_minutes,
  1380. "query": settings.wazuh_auto_sync_query,
  1381. },
  1382. "state": state,
  1383. }
  1384. )
@app.get(
    "/monitor/systems",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Systems monitor overview",
    description="Unified monitoring snapshot for Wazuh, Shuffle, IRIS, and PagerDuty with pipeline KPIs and recent records.",
)
async def monitor_systems(
    minutes: int = 60,
    limit: int = 20,
    include_raw: bool = False,
) -> ApiResponse:
    """Build a unified monitoring snapshot across Wazuh, Shuffle, IRIS, and PagerDuty.

    Each system gets a "card" containing health status, latency, last-seen-up
    timestamp, a short list of recent records, and per-system KPIs; a
    cross-system pipeline KPI summary is returned alongside the cards.

    Args:
        minutes: Lookback window (minutes) for KPI counters; clamped to >= 1.
        limit: Max recent rows fetched/shown per system; clamped to >= 1.
        include_raw: When True, attach the raw dependency detail payload to each card.
    """
    window_minutes = max(1, minutes)
    row_limit = max(1, limit)
    now = datetime.now(timezone.utc)
    since = now - timedelta(minutes=window_minutes)
    now_iso = now.isoformat()
    dependencies = await mvp_service.dependency_health()
    # "Last seen up" timestamps persist across requests via app.state.
    monitor_state = getattr(app.state, "systems_monitor_state", {"last_ok_at": {}})
    last_ok_at_by_key = monitor_state.setdefault("last_ok_at", {})
    # KPI counters from persisted database records in the selected lookback window.
    alerts_ingested = repo.count_incident_events_since(since=since, source="wazuh")
    detections_matched = repo.count_c_detection_events_since(since=since)
    iris_tickets_created = repo.count_incidents_with_iris_since(since=since)
    pagerduty_escalations_sent = repo.count_escalations_since(since=since, success=True)
    pagerduty_escalations_failed = repo.count_escalations_since(since=since, success=False)
    # Recent-record fetches are best-effort: a failure is recorded on the card
    # (degrading it) instead of failing the whole endpoint.
    wazuh_recent: list[object] = []
    wazuh_recent_error: str | None = None
    try:
        wazuh_resp = await wazuh_adapter.list_manager_logs(limit=row_limit, offset=0, q=None, sort=None)
        wazuh_recent = _extract_first_array(wazuh_resp)[:row_limit]
    except Exception as exc:
        wazuh_recent_error = str(exc)
    shuffle_recent: list[object] = []
    shuffle_recent_error: str | None = None
    try:
        workflows_resp = await shuffle_adapter.list_workflows()
        workflows = _extract_first_array(workflows_resp)
        for item in workflows[:row_limit]:
            if isinstance(item, dict):
                # Project only the fields the overview needs; tolerate both
                # flat and nested ("workflow") shapes for id/name.
                shuffle_recent.append(
                    {
                        "id": item.get("id") or item.get("workflow_id"),
                        "name": item.get("name") or item.get("workflow", {}).get("name"),
                        "status": item.get("status"),
                    }
                )
            else:
                shuffle_recent.append(item)
    except Exception as exc:
        shuffle_recent_error = str(exc)
    iris_recent: list[object] = []
    iris_recent_error: str | None = None
    try:
        iris_resp = await iris_adapter.list_cases(limit=row_limit, offset=0)
        iris_recent = _extract_first_array(iris_resp)[:row_limit]
    except Exception as exc:
        iris_recent_error = str(exc)
    pagerduty_recent = repo.list_recent_escalations(limit=row_limit)

    def build_card(
        label: str,
        dependency_key: str,
        recent: list[object],
        kpis: dict[str, object],
        extra_error: str | None = None,
    ) -> dict[str, object]:
        """Assemble one system card from dependency health plus recent rows.

        Status is "ok" when the dependency is up, "degraded" when it is up but
        the recent-rows fetch failed (extra_error set), and "down" otherwise.
        Also refreshes last_ok_at_by_key[label] whenever the dependency is up.
        """
        dep = dependencies.get(dependency_key, {})
        dep_status = str(dep.get("status") or "down")
        status = "ok" if dep_status == "up" else "down"
        if dep_status == "up":
            last_ok_at_by_key[label] = now_iso
        error_parts: list[str] = []
        if dep.get("error"):
            error_parts.append(str(dep.get("error")))
        if extra_error:
            error_parts.append(extra_error)
        if dep_status == "up" and extra_error:
            status = "degraded"
        card: dict[str, object] = {
            "status": status,
            "latency_ms": dep.get("latency_ms"),
            "last_ok_at": last_ok_at_by_key.get(label),
            "last_error": " | ".join(error_parts) if error_parts else None,
            "kpis": kpis,
            "recent": recent,
        }
        if include_raw:
            card["raw"] = dep.get("details")
        return card

    cards = {
        "wazuh": build_card(
            label="wazuh",
            dependency_key="wazuh",
            recent=wazuh_recent,
            extra_error=wazuh_recent_error,
            kpis={
                "alerts_ingested": alerts_ingested,
                "recent_rows": len(wazuh_recent),
            },
        ),
        "shuffle": build_card(
            label="shuffle",
            dependency_key="shuffle",
            recent=shuffle_recent,
            extra_error=shuffle_recent_error,
            kpis={
                "recent_workflows": len(shuffle_recent),
            },
        ),
        "iris": build_card(
            label="iris",
            dependency_key="iris",
            recent=iris_recent,
            extra_error=iris_recent_error,
            kpis={
                "tickets_created": iris_tickets_created,
                "recent_rows": len(iris_recent),
            },
        ),
        "pagerduty": build_card(
            label="pagerduty",
            dependency_key="pagerduty_stub",
            recent=pagerduty_recent,
            kpis={
                "escalations_sent": pagerduty_escalations_sent,
                "escalations_failed": pagerduty_escalations_failed,
            },
        ),
    }
    # Persist refreshed last_ok_at timestamps for subsequent requests.
    app.state.systems_monitor_state = monitor_state
    return ApiResponse(
        data={
            "generated_at": now_iso,
            "window_minutes": window_minutes,
            "cards": cards,
            "pipeline": {
                "alerts_ingested": alerts_ingested,
                "detections_matched": detections_matched,
                "iris_tickets_created": iris_tickets_created,
                "pagerduty_escalations_sent": pagerduty_escalations_sent,
                "pagerduty_escalations_failed": pagerduty_escalations_failed,
            },
        }
    )
  1529. @app.get(
  1530. "/sim/logs/runs",
  1531. response_model=ApiResponse,
  1532. dependencies=[Depends(require_internal_api_key)],
  1533. summary="List simulator runs",
  1534. description="List active and recent simulator script runs started from soc-integrator.",
  1535. )
  1536. async def sim_logs_runs() -> ApiResponse:
  1537. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1538. items: list[dict[str, object]] = []
  1539. for run_id, run in sim_runs.items():
  1540. serialized = _serialize_sim_run(run_id, run)
  1541. if (not serialized["running"]) and not run.get("stopped_at"):
  1542. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1543. serialized["stopped_at"] = run["stopped_at"]
  1544. items.append(serialized)
  1545. items.sort(key=lambda x: str(x.get("started_at") or ""), reverse=True)
  1546. return ApiResponse(data={"items": items})
  1547. @app.post(
  1548. "/sim/logs/start",
  1549. response_model=ApiResponse,
  1550. dependencies=[Depends(require_internal_api_key)],
  1551. summary="Start simulator logs script",
  1552. description="Start a whitelisted simulator script in background and return run metadata.",
  1553. )
  1554. async def sim_logs_start(payload: SimLogRunRequest) -> ApiResponse:
  1555. script_name = SIM_SCRIPT_MAP[payload.script]
  1556. script_path = SIM_SCRIPTS_DIR / script_name
  1557. if not script_path.exists():
  1558. raise HTTPException(status_code=400, detail=f"Simulator script not found in container: {script_name}")
  1559. cmd = _build_sim_command(payload)
  1560. env = dict(os.environ)
  1561. env.setdefault("WAZUH_SYSLOG_HOST", "wazuh.manager")
  1562. env.setdefault("WAZUH_SYSLOG_PORT", "514")
  1563. run_id = str(uuid.uuid4())
  1564. log_file = SIM_RUN_LOGS_DIR / f"{run_id}.log"
  1565. log_handle = None
  1566. try:
  1567. log_handle = log_file.open("ab")
  1568. process = subprocess.Popen(
  1569. cmd,
  1570. cwd=str(SIM_SCRIPTS_DIR),
  1571. env=env,
  1572. stdout=log_handle,
  1573. stderr=subprocess.STDOUT,
  1574. start_new_session=True,
  1575. )
  1576. except Exception as exc:
  1577. if log_handle:
  1578. try:
  1579. log_handle.close()
  1580. except Exception:
  1581. pass
  1582. raise HTTPException(status_code=502, detail=f"Failed to start simulator: {exc}") from exc
  1583. finally:
  1584. if log_handle:
  1585. log_handle.close()
  1586. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1587. sim_runs[run_id] = {
  1588. "script": payload.script,
  1589. "target": payload.target,
  1590. "scenario": payload.scenario,
  1591. "count": payload.count,
  1592. "delay_seconds": payload.delay_seconds,
  1593. "forever": payload.forever,
  1594. "pid": process.pid,
  1595. "cmd": " ".join(shlex.quote(part) for part in cmd),
  1596. "started_at": datetime.now(timezone.utc).isoformat(),
  1597. "stopped_at": None,
  1598. "return_code": None,
  1599. "log_file": str(log_file),
  1600. "process": process,
  1601. }
  1602. app.state.sim_runs = sim_runs
  1603. return ApiResponse(data={"run": _serialize_sim_run(run_id, sim_runs[run_id])})
  1604. @app.post(
  1605. "/sim/logs/stop/{run_id}",
  1606. response_model=ApiResponse,
  1607. dependencies=[Depends(require_internal_api_key)],
  1608. summary="Stop simulator run",
  1609. description="Stop a running simulator script by run_id.",
  1610. )
  1611. async def sim_logs_stop(run_id: str) -> ApiResponse:
  1612. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1613. run = sim_runs.get(run_id)
  1614. if not run:
  1615. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1616. process = run.get("process")
  1617. if process and process.poll() is None:
  1618. try:
  1619. process.terminate()
  1620. process.wait(timeout=3)
  1621. except subprocess.TimeoutExpired:
  1622. process.kill()
  1623. except Exception as exc:
  1624. raise HTTPException(status_code=502, detail=f"Failed to stop run: {exc}") from exc
  1625. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1626. return ApiResponse(data={"run": _serialize_sim_run(run_id, run)})
  1627. @app.post(
  1628. "/sim/logs/stop-running",
  1629. response_model=ApiResponse,
  1630. dependencies=[Depends(require_internal_api_key)],
  1631. summary="Stop all running simulator runs",
  1632. description="Stop all currently running simulator scripts (including forever mode).",
  1633. )
  1634. async def sim_logs_stop_running() -> ApiResponse:
  1635. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1636. stopped: list[dict[str, object]] = []
  1637. already_stopped = 0
  1638. for run_id, run in sim_runs.items():
  1639. process = run.get("process")
  1640. if process and process.poll() is None:
  1641. try:
  1642. process.terminate()
  1643. process.wait(timeout=3)
  1644. except subprocess.TimeoutExpired:
  1645. process.kill()
  1646. except Exception as exc:
  1647. raise HTTPException(status_code=502, detail=f"Failed to stop run {run_id}: {exc}") from exc
  1648. run["stopped_at"] = datetime.now(timezone.utc).isoformat()
  1649. stopped.append(_serialize_sim_run(run_id, run))
  1650. else:
  1651. already_stopped += 1
  1652. return ApiResponse(
  1653. data={
  1654. "stopped_count": len(stopped),
  1655. "already_stopped_count": already_stopped,
  1656. "runs": stopped,
  1657. }
  1658. )
  1659. @app.get(
  1660. "/sim/logs/output/{run_id}",
  1661. response_model=ApiResponse,
  1662. dependencies=[Depends(require_internal_api_key)],
  1663. summary="Get simulator run output",
  1664. description="Return tailed output lines from simulator run log file.",
  1665. )
  1666. async def sim_logs_output(run_id: str, limit: int = 200) -> ApiResponse:
  1667. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1668. run = sim_runs.get(run_id)
  1669. if not run:
  1670. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1671. log_file_path = run.get("log_file")
  1672. if not log_file_path:
  1673. raise HTTPException(status_code=404, detail=f"No log file for run: {run_id}")
  1674. log_file = Path(str(log_file_path))
  1675. lines = _tail_log_lines(log_file, limit=limit)
  1676. process = run.get("process")
  1677. running = bool(process and process.poll() is None)
  1678. return ApiResponse(
  1679. data={
  1680. "run_id": run_id,
  1681. "running": running,
  1682. "line_count": len(lines),
  1683. "lines": lines,
  1684. "text": "\n".join(lines),
  1685. "log_file": str(log_file),
  1686. }
  1687. )
  1688. @app.get(
  1689. "/sim/logs/wazuh-latest/{run_id}",
  1690. response_model=ApiResponse,
  1691. dependencies=[Depends(require_internal_api_key)],
  1692. summary="Get latest Wazuh logs/rules for simulator run",
  1693. description="Return latest Wazuh event logs and matched rules correlated to a simulator run.",
  1694. )
  1695. async def sim_logs_wazuh_latest(
  1696. run_id: str,
  1697. limit: int = 50,
  1698. minutes: int = 15,
  1699. include_raw: bool = False,
  1700. ) -> ApiResponse:
  1701. sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
  1702. run = sim_runs.get(run_id)
  1703. if not run:
  1704. raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
  1705. requested_minutes = max(1, int(minutes))
  1706. # Keep query unfiltered and use a wide lookback to emulate Discover "latest records".
  1707. effective_minutes = max(1440, requested_minutes)
  1708. query_limit = max(1, min(int(limit), 200))
  1709. query_text = "*"
  1710. try:
  1711. raw = await wazuh_adapter.search_alerts(
  1712. query=query_text,
  1713. limit=query_limit,
  1714. minutes=effective_minutes,
  1715. )
  1716. except Exception as exc:
  1717. raise HTTPException(status_code=502, detail=f"Wazuh search failed: {exc}") from exc
  1718. hits = _extract_wazuh_hits(raw)
  1719. events = [_extract_wazuh_event_item(hit, include_raw=include_raw) for hit in hits]
  1720. rules: list[dict[str, object]] = []
  1721. for hit in hits:
  1722. rule_item = _extract_wazuh_rule_item(hit, include_raw=include_raw)
  1723. if rule_item:
  1724. rules.append(rule_item)
  1725. return ApiResponse(
  1726. data={
  1727. "run": _serialize_sim_run(run_id, run),
  1728. "query": {
  1729. "effective_minutes": effective_minutes,
  1730. "text": query_text,
  1731. "limit": query_limit,
  1732. },
  1733. "events": events,
  1734. "rules": rules,
  1735. "totals": {
  1736. "events": len(events),
  1737. "rules": len(rules),
  1738. },
  1739. }
  1740. )
  1741. @app.post(
  1742. "/monitor/log-loss/check",
  1743. response_model=ApiResponse,
  1744. dependencies=[Depends(require_internal_api_key)],
  1745. summary="Check log loss",
  1746. description="Check expected telemetry streams for missing logs in a configurable lookback window.",
  1747. )
  1748. async def monitor_log_loss_check(
  1749. payload: LogLossCheckRequest | None = None,
  1750. create_ticket: bool = False,
  1751. ) -> ApiResponse:
  1752. req = payload or LogLossCheckRequest()
  1753. result = await _execute_log_loss_check(req=req, create_ticket=create_ticket)
  1754. return ApiResponse(data=result)
@app.post(
    "/monitor/c-detections/evaluate",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Evaluate Appendix C detections",
    description="Evaluate C1-C3 detection rules on recent events, optionally creating incidents/tickets.",
)
async def monitor_c_detections_evaluate(payload: CDetectionEvaluateRequest) -> ApiResponse:
    """Run the C1-C3 detection rules over recent Wazuh alerts.

    Pipeline: search alerts -> normalize hits -> evaluate rules -> persist each
    match -> (optionally) ingest an incident per match, subject to a
    per-(usecase, entity) cooldown. Runtime status is mirrored into
    ``app.state.c_detection_state`` for the /monitor/c-detections/state endpoint.

    Raises:
        HTTPException 400: C detection is disabled by configuration.
        HTTPException 502: any failure during search/evaluation/persistence.
    """
    if not settings.c_detection_enabled:
        raise HTTPException(status_code=400, detail="C detection is disabled by configuration")
    started_at = datetime.now(timezone.utc).isoformat()
    app.state.c_detection_state["last_started_at"] = started_at
    try:
        raw = await wazuh_adapter.search_alerts(
            query=payload.query,
            limit=max(1, payload.limit),
            minutes=max(1, payload.minutes),
        )
        # Defensive unwrap of the OpenSearch-style response envelope.
        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
        normalized = [mvp_service.normalize_wazuh_hit(hit) for hit in hits]
        evaluated = await c_detection_service.evaluate(normalized, selectors=payload.selectors)
        records: list[dict[str, object]] = []
        for match in evaluated.get("matches", []):
            usecase_id = str(match.get("usecase_id") or "")
            entity = str(match.get("entity") or "unknown")
            severity = str(match.get("severity") or "medium")
            evidence = dict(match.get("evidence") or {})
            # Minimal reference back to the triggering event.
            event_ref = {
                "event_id": ((match.get("event") or {}).get("event_id")),
                "timestamp": ((match.get("event") or {}).get("timestamp")),
                "source": ((match.get("event") or {}).get("source")),
            }
            # Cooldown suppresses repeat ticket creation for the same
            # (usecase, entity) pair; the match itself is still persisted.
            in_cooldown = repo.is_c_detection_in_cooldown(
                usecase_id=usecase_id,
                entity=entity,
                cooldown_seconds=int(settings.c_detection_ticket_cooldown_seconds),
            )
            incident_key: str | None = None
            # Persist the detection first so it is recorded even when no
            # incident is created afterwards.
            event_row = repo.add_c_detection_event(
                usecase_id=usecase_id,
                entity=entity,
                severity=severity,
                evidence=evidence,
                event_ref=event_ref,
                incident_key=None,
            )
            if (not payload.dry_run) and settings.c_detection_create_iris_ticket and not in_cooldown:
                incident_event = _c_match_to_incident_event(match)
                ingest = await mvp_service.ingest_incident(incident_event)
                # Empty-string incident keys are normalized to None.
                incident_key = str(ingest.get("incident_key") or "") or None
                # Back-fill the stored detection row with the incident link.
                repo.update_c_detection_incident(int(event_row["id"]), incident_key)
            records.append(
                {
                    "id": event_row["id"],
                    "usecase_id": usecase_id,
                    "entity": entity,
                    "severity": severity,
                    "incident_key": incident_key,
                    "cooldown_active": in_cooldown,
                    "evidence": evidence,
                }
            )
        result = {
            "query": payload.query,
            "minutes": max(1, payload.minutes),
            "selectors": payload.selectors,
            "dry_run": payload.dry_run,
            "summary": evaluated.get("summary", {}),
            "matches": records,
            "total_hits": len(hits),
        }
        app.state.c_detection_state["last_status"] = "ok"
        app.state.c_detection_state["last_result"] = result
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        return ApiResponse(data=result)
    except Exception as exc:
        # Record the failure for the /state endpoint before surfacing it.
        app.state.c_detection_state["last_status"] = "error"
        app.state.c_detection_state["last_error"] = str(exc)
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        raise HTTPException(status_code=502, detail=f"C detection evaluation failed: {exc}") from exc
  1835. @app.get(
  1836. "/monitor/c-detections/history",
  1837. response_model=ApiResponse,
  1838. dependencies=[Depends(require_internal_api_key)],
  1839. summary="C detection history",
  1840. description="List persisted C1-C3 detection matches, including evidence and linked incident keys.",
  1841. )
  1842. async def monitor_c_detections_history(
  1843. limit: int = 50,
  1844. offset: int = 0,
  1845. usecase_id: str | None = None,
  1846. ) -> ApiResponse:
  1847. rows = repo.list_c_detection_events(limit=limit, offset=offset, usecase_id=usecase_id)
  1848. return ApiResponse(data={"items": rows, "limit": max(1, limit), "offset": max(0, offset), "usecase_id": usecase_id})
  1849. @app.get(
  1850. "/monitor/c-detections/state",
  1851. response_model=ApiResponse,
  1852. dependencies=[Depends(require_internal_api_key)],
  1853. summary="C detection state",
  1854. description="Return Appendix C detection settings and last evaluation runtime state.",
  1855. )
  1856. async def monitor_c_detections_state() -> ApiResponse:
  1857. return ApiResponse(
  1858. data={
  1859. "enabled": settings.c_detection_enabled,
  1860. "settings": {
  1861. "window_minutes": settings.c_detection_window_minutes,
  1862. "c1_max_travel_speed_kmph": settings.c1_max_travel_speed_kmph,
  1863. "c2_offhours_start_utc": settings.c2_offhours_start_utc,
  1864. "c2_offhours_end_utc": settings.c2_offhours_end_utc,
  1865. "c3_host_spread_threshold": settings.c3_host_spread_threshold,
  1866. "c3_scan_port_threshold": settings.c3_scan_port_threshold,
  1867. "create_iris_ticket": settings.c_detection_create_iris_ticket,
  1868. "ticket_cooldown_seconds": settings.c_detection_ticket_cooldown_seconds,
  1869. },
  1870. "state": getattr(app.state, "c_detection_state", {}),
  1871. }
  1872. )