| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082 |
- import asyncio
- import logging
- import os
- import re
- import shlex
- import subprocess
- import uuid
- from collections import deque
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- from fastapi import Depends, FastAPI, File, HTTPException, Request, UploadFile
- from fastapi.responses import FileResponse, Response
- from fastapi.staticfiles import StaticFiles
- from app.adapters.abuseipdb import AbuseIpdbAdapter
- from app.adapters.geoip import GeoIpAdapter
- from app.adapters.iris import IrisAdapter
- from app.adapters.pagerduty import PagerDutyAdapter
- from app.adapters.shuffle import ShuffleAdapter
- from app.adapters.virustotal import VirusTotalAdapter
- from app.adapters.wazuh import WazuhAdapter
- from app.config import settings
- from app.db import init_schema
- from app.models import (
- ActionCreateIncidentRequest,
- ApiResponse,
- CDetectionEvaluateRequest,
- IocEnrichRequest,
- IocEvaluateRequest,
- IrisTicketCreateRequest,
- LogLossCheckRequest,
- LogLossStreamCheck,
- SimLogRunRequest,
- ShuffleLoginRequest,
- ShuffleProxyRequest,
- TriggerShuffleRequest,
- WazuhIngestRequest,
- )
- from app.repositories.mvp_repo import MvpRepository
- from app.routes.mvp import build_mvp_router
- from app.security import require_internal_api_key
- from app.services.mvp_service import MvpService
- from app.services.c_detection_service import CDetectionService
# FastAPI application plus module-level singletons (adapters, repository,
# services). Everything here is constructed once at import time, entirely
# from `settings`, and shared by the request handlers and background loops
# defined below.
app = FastAPI(title=settings.app_name, version="0.1.0")
logger = logging.getLogger(__name__)

# Filesystem locations: bundled UI assets plus the log-simulation scripts
# and the directory where simulation run logs are written.
UI_DIR = Path(__file__).resolve().parent / "ui"
UI_ASSETS_DIR = UI_DIR / "assets"
SIM_SCRIPTS_DIR = Path("/app/scripts")
SIM_RUN_LOGS_DIR = Path("/tmp/soc-integrator-sim-logs")

# Upstream integration adapters (Wazuh manager + indexer, Shuffle SOAR,
# PagerDuty, IRIS case management, VirusTotal / AbuseIPDB enrichment, GeoIP).
wazuh_adapter = WazuhAdapter(
    base_url=settings.wazuh_base_url,
    username=settings.wazuh_username,
    password=settings.wazuh_password,
    indexer_url=settings.wazuh_indexer_url,
    indexer_username=settings.wazuh_indexer_username,
    indexer_password=settings.wazuh_indexer_password,
)
shuffle_adapter = ShuffleAdapter(
    base_url=settings.shuffle_base_url,
    api_key=settings.shuffle_api_key,
)
pagerduty_adapter = PagerDutyAdapter(
    base_url=settings.pagerduty_base_url,
    api_key=settings.pagerduty_api_key,
)
iris_adapter = IrisAdapter(
    base_url=settings.iris_base_url,
    api_key=settings.iris_api_key,
)
virustotal_adapter = VirusTotalAdapter(
    base_url=settings.virustotal_base_url,
    api_key=settings.virustotal_api_key,
)
abuseipdb_adapter = AbuseIpdbAdapter(
    base_url=settings.abuseipdb_base_url,
    api_key=settings.abuseipdb_api_key,
)
geoip_adapter = GeoIpAdapter(
    provider=settings.geoip_provider,
    cache_ttl_seconds=settings.geoip_cache_ttl_seconds,
)

# Repository + services wired from the adapters above; the MVP router is
# mounted with the internal API-key dependency, and the UI assets are served
# statically under /ui/assets.
repo = MvpRepository()
mvp_service = MvpService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    shuffle_adapter=shuffle_adapter,
    iris_adapter=iris_adapter,
    pagerduty_adapter=pagerduty_adapter,
)
c_detection_service = CDetectionService(
    repo=repo,
    geoip_adapter=geoip_adapter,
)
app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
@app.middleware("http")
async def ui_no_cache_middleware(request: Request, call_next):
    """Disable client/proxy caching for the UI page and its static assets."""
    response: Response = await call_next(request)
    path = request.url.path
    if path == "/ui" or path.startswith("/ui/assets/"):
        no_cache_headers = {
            "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
            "Pragma": "no-cache",
            "Expires": "0",
        }
        for header_name, header_value in no_cache_headers.items():
            response.headers[header_name] = header_value
    return response
# Built-in log-loss checks used when a request does not supply its own
# streams: each entry names a telemetry source, the Wazuh query that should
# match its events, and the minimum alert count expected per window.
DEFAULT_LOG_LOSS_STREAMS: list[dict[str, object]] = [
    {
        "name": "fortigate",
        "query": "full_log:fortigate OR full_log:FGT80F OR full_log:FGT60F OR full_log:FGT40F OR full_log:FGT501E",
        "min_count": 1,
    },
    {
        "name": "windows_agent",
        "query": "full_log:windows_agent OR full_log:windows",
        "min_count": 1,
    },
    {
        "name": "vmware",
        "query": "full_log:vmware",
        "min_count": 1,
    },
    {
        "name": "log_monitor",
        "query": "full_log:log_monitor OR rule.id:100411",
        "min_count": 1,
    },
]
async def _execute_log_loss_check(
    req: LogLossCheckRequest,
    create_ticket: bool,
) -> dict[str, object]:
    """Run one log-loss sweep over the requested (or default) streams.

    For each stream, counts matching Wazuh alerts over the last ``req.minutes``
    and marks the stream "loss" when the observed count is below its
    ``min_count``. Per-stream query failures are recorded as "error" items
    instead of aborting the sweep. When ``create_ticket`` is true and at least
    one stream shows loss, an IRIS case is opened, subject to a cooldown
    tracked in ``app.state.log_loss_monitor_state``.

    Returns a dict with ``checked_at``, ``minutes``, ``summary``, ``streams``,
    and a ``ticket`` entry when ticket creation was attempted or skipped.
    """
    minutes = max(1, int(req.minutes))
    # Fall back to the built-in stream definitions when the request has none.
    streams = req.streams or [LogLossStreamCheck(**item) for item in DEFAULT_LOG_LOSS_STREAMS]
    items: list[dict[str, object]] = []
    loss_count = 0
    error_count = 0
    loss_stream_names: list[str] = []
    for stream in streams:
        min_count = max(0, int(stream.min_count))
        try:
            observed = await wazuh_adapter.count_alerts(query=stream.query, minutes=minutes)
            is_loss = observed < min_count
            if is_loss:
                loss_count += 1
                loss_stream_names.append(stream.name)
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": observed,
                    "status": "loss" if is_loss else "ok",
                }
            )
        except Exception as exc:
            # Report the failure per-stream and keep sweeping the rest.
            error_count += 1
            items.append(
                {
                    "name": stream.name,
                    "query": stream.query,
                    "minutes": minutes,
                    "min_count": min_count,
                    "observed_count": None,
                    "status": "error",
                    "error": str(exc),
                }
            )
    summary = {
        "total_streams": len(items),
        "loss_streams": loss_count,
        "error_streams": error_count,
        "all_ok": loss_count == 0 and error_count == 0,
    }
    ticket_data: dict[str, object] | None = None
    if create_ticket and loss_count > 0:
        # Cooldown avoids opening a new ticket on every sweep while the same
        # loss condition persists.
        cooldown = max(0, int(settings.log_loss_monitor_ticket_cooldown_seconds))
        now_ts = datetime.now(timezone.utc).timestamp()
        state = app.state.log_loss_monitor_state
        last_ticket_ts = float(state.get("last_ticket_ts", 0.0) or 0.0)
        in_cooldown = cooldown > 0 and (now_ts - last_ticket_ts) < cooldown
        if not in_cooldown:
            title = f"Log loss detected ({loss_count} stream(s))"
            description = (
                f"Log-loss monitor detected missing telemetry in the last {minutes} minute(s). "
                f"Affected streams: {', '.join(loss_stream_names)}."
            )
            case_payload = {
                "case_name": title,
                "case_description": description,
                "case_customer": settings.iris_default_customer_id,
                "case_soc_id": settings.iris_default_soc_id,
            }
            try:
                iris_result = await iris_adapter.create_case(case_payload)
                # Only start a new cooldown window after a successful ticket.
                state["last_ticket_ts"] = now_ts
                ticket_data = {"created": True, "iris": iris_result}
            except Exception as exc:
                ticket_data = {"created": False, "error": str(exc)}
        else:
            ticket_data = {
                "created": False,
                "skipped": "cooldown_active",
                "cooldown_seconds": cooldown,
                "last_ticket_ts": last_ticket_ts,
            }
    result: dict[str, object] = {
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "minutes": minutes,
        "summary": summary,
        "streams": items,
    }
    if ticket_data is not None:
        result["ticket"] = ticket_data
    return result
async def _log_loss_monitor_loop() -> None:
    """Background task: run the log-loss check forever at a fixed interval.

    Records each iteration's status, result, and timestamps in
    ``app.state.log_loss_monitor_state``; exceptions are captured there and
    logged, never allowed to terminate the loop. Sleeps
    ``settings.log_loss_monitor_interval_seconds`` (floored at 5s) between
    iterations.
    """
    interval = max(5, int(settings.log_loss_monitor_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.log_loss_monitor_state["running"] = True
            app.state.log_loss_monitor_state["last_started_at"] = started_at
            req = LogLossCheckRequest(minutes=max(1, int(settings.log_loss_monitor_window_minutes)))
            result = await _execute_log_loss_check(
                req=req,
                create_ticket=bool(settings.log_loss_monitor_create_iris_ticket),
            )
            app.state.log_loss_monitor_state["last_status"] = "ok"
            app.state.log_loss_monitor_state["last_result"] = result
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "log-loss monitor checked=%s loss=%s errors=%s",
                result.get("summary", {}).get("total_streams", 0),
                result.get("summary", {}).get("loss_streams", 0),
                result.get("summary", {}).get("error_streams", 0),
            )
        except Exception as exc:
            app.state.log_loss_monitor_state["last_status"] = "error"
            app.state.log_loss_monitor_state["last_error"] = str(exc)
            app.state.log_loss_monitor_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("log-loss monitor failed: %s", exc)
        finally:
            # Clear the running flag even when the iteration raised.
            app.state.log_loss_monitor_state["running"] = False
        await asyncio.sleep(interval)
async def _wazuh_auto_sync_loop() -> None:
    """Background task: periodically pull Wazuh alerts into the MVP pipeline.

    Delegates to ``mvp_service.sync_wazuh_alerts`` with the query/limit/window
    from settings, recording per-iteration status and results in
    ``app.state.wazuh_auto_sync_state``. Exceptions are captured and logged;
    the loop never exits on its own. Sleeps
    ``settings.wazuh_auto_sync_interval_seconds`` (floored at 5s) between runs.
    """
    interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
    while True:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            app.state.wazuh_auto_sync_state["running"] = True
            app.state.wazuh_auto_sync_state["last_started_at"] = started_at
            result = await mvp_service.sync_wazuh_alerts(
                query=settings.wazuh_auto_sync_query,
                limit=settings.wazuh_auto_sync_limit,
                minutes=settings.wazuh_auto_sync_minutes,
            )
            app.state.wazuh_auto_sync_state["last_status"] = "ok"
            app.state.wazuh_auto_sync_state["last_result"] = result
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.info(
                "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
                result.get("processed", 0),
                result.get("ingested", 0),
                result.get("skipped_existing", 0),
                result.get("failed", 0),
                result.get("ioc_evaluated", 0),
                result.get("ioc_matched", 0),
                result.get("ioc_rejected", 0),
            )
        except Exception as exc:
            app.state.wazuh_auto_sync_state["last_status"] = "error"
            app.state.wazuh_auto_sync_state["last_error"] = str(exc)
            app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
            logger.exception("wazuh auto-sync failed: %s", exc)
        finally:
            # Clear the running flag even when the iteration raised.
            app.state.wazuh_auto_sync_state["running"] = False
        await asyncio.sleep(interval)
- def _c_match_to_incident_event(match: dict[str, object]) -> dict[str, object]:
- event = dict(match.get("event") or {})
- usecase_id = str(match.get("usecase_id") or "C-unknown")
- section = str(match.get("section") or "c")
- severity = str(match.get("severity") or "medium")
- entity = str(match.get("entity") or "unknown")
- evidence = dict(match.get("evidence") or {})
- source = str(event.get("source") or "wazuh")
- timestamp = str(event.get("timestamp") or datetime.now(timezone.utc).isoformat())
- event_id = str(event.get("event_id") or f"{usecase_id}-{int(datetime.now(timezone.utc).timestamp())}")
- payload = dict(event.get("payload") or {})
- asset = dict(event.get("asset") or {})
- network = dict(event.get("network") or {})
- event_type = "c2_credential_abuse"
- if section == "c1":
- event_type = "c1_impossible_travel"
- elif section == "c3":
- event_type = "c3_lateral_movement"
- title = f"{usecase_id} detection for {entity}"
- description = f"{usecase_id} matched for entity={entity}. evidence={evidence}"
- tags = list(event.get("tags") or [])
- tags.extend(["appendix_c", usecase_id.lower(), section])
- return {
- "source": source,
- "event_type": event_type,
- "event_id": event_id,
- "timestamp": timestamp,
- "severity": severity,
- "title": title,
- "description": description,
- "asset": asset,
- "network": network,
- "tags": sorted(set(tags)),
- "risk_context": {"appendix_c_usecase": usecase_id},
- "raw": event.get("raw") or {},
- "payload": payload,
- }
@app.on_event("startup")
async def startup() -> None:
    """Initialize persistence, per-feature app.state slots, and background tasks.

    Ensures the DB schema and default policy exist, seeds the state dicts that
    the monitor loops and status endpoints read/write, creates the simulation
    log directory, and conditionally starts the Wazuh auto-sync and log-loss
    monitor tasks based on settings flags.
    """
    init_schema()
    repo.ensure_policy()
    # State consumed/updated by _wazuh_auto_sync_loop.
    app.state.wazuh_auto_sync_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
    }
    # State consumed/updated by _log_loss_monitor_loop and
    # _execute_log_loss_check (cooldown timestamp lives here).
    app.state.log_loss_monitor_state = {
        "running": False,
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts": 0.0,
    }
    # Appendix-C detection bookkeeping, keyed per detection for ticket cooldowns.
    app.state.c_detection_state = {
        "last_status": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_error": None,
        "last_result": None,
        "last_ticket_ts_by_key": {},
    }
    app.state.systems_monitor_state = {
        "last_ok_at": {},
    }
    # Registry of simulation subprocess runs (see shutdown for cleanup).
    app.state.sim_runs = {}
    SIM_RUN_LOGS_DIR.mkdir(parents=True, exist_ok=True)
    if settings.wazuh_auto_sync_enabled:
        app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
        logger.info(
            "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
            settings.wazuh_auto_sync_interval_seconds,
            settings.wazuh_auto_sync_limit,
            settings.wazuh_auto_sync_minutes,
            settings.wazuh_auto_sync_query,
        )
    if settings.log_loss_monitor_enabled:
        app.state.log_loss_monitor_task = asyncio.create_task(_log_loss_monitor_loop())
        logger.info(
            "log-loss monitor enabled interval=%ss window=%sm create_iris_ticket=%s cooldown=%ss",
            settings.log_loss_monitor_interval_seconds,
            settings.log_loss_monitor_window_minutes,
            settings.log_loss_monitor_create_iris_ticket,
            settings.log_loss_monitor_ticket_cooldown_seconds,
        )
@app.on_event("shutdown")
async def shutdown() -> None:
    """Cancel background tasks and terminate any live simulation subprocesses.

    Each monitor task is cancelled and awaited so its cleanup runs before the
    event loop closes; CancelledError is expected and suppressed. Simulation
    subprocesses still running (poll() is None) are sent SIGTERM.
    """
    task = getattr(app.state, "wazuh_auto_sync_task", None)
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    ll_task = getattr(app.state, "log_loss_monitor_task", None)
    if ll_task:
        ll_task.cancel()
        try:
            await ll_task
        except asyncio.CancelledError:
            pass
    sim_runs = getattr(app.state, "sim_runs", {})
    for run in sim_runs.values():
        process = run.get("process")
        # NOTE(review): terminate() is fire-and-forget here — no wait()/kill()
        # fallback, so a stubborn child could outlive the service; confirm
        # this best-effort behavior is intended.
        if process and process.poll() is None:
            process.terminate()
@app.get(
    "/ui",
    summary="SOC Integrator UI",
    description="Serve the built-in Alpine.js operations console.",
    include_in_schema=False,
)
async def ui_index() -> FileResponse:
    """Serve the UI entry page, or 404 when the UI bundle was not built."""
    if UI_DIR.exists():
        return FileResponse(UI_DIR / "index.html")
    raise HTTPException(status_code=404, detail="UI is not available in this build")
@app.get(
    "/health",
    response_model=ApiResponse,
    summary="Service health",
    description="Return soc-integrator service identity and configured upstream targets.",
)
async def health() -> ApiResponse:
    """Report service identity and the configured upstream base URLs."""
    upstream_targets = {
        "wazuh": settings.wazuh_base_url,
        "shuffle": settings.shuffle_base_url,
        "pagerduty": settings.pagerduty_base_url,
        "iris": settings.iris_base_url,
    }
    return ApiResponse(
        data={
            "service": settings.app_name,
            "env": settings.app_env,
            "targets": upstream_targets,
        }
    )
@app.post(
    "/ingest/wazuh-alert",
    response_model=ApiResponse,
    summary="Normalize Wazuh alert",
    description="Normalize a raw Wazuh alert payload into the internal ingest shape.",
)
async def ingest_wazuh_alert(payload: WazuhIngestRequest) -> ApiResponse:
    """Project the incoming Wazuh alert onto the internal normalized shape."""
    fields = ("source", "alert_id", "rule_id", "severity", "title", "payload")
    normalized = {name: getattr(payload, name) for name in fields}
    return ApiResponse(data={"normalized": normalized})
@app.post(
    "/action/create-incident",
    response_model=ApiResponse,
    summary="Create PagerDuty incident",
    description="Create an incident in PagerDuty (stub or real integration) from request payload.",
)
async def create_incident(payload: ActionCreateIncidentRequest) -> ApiResponse:
    """Translate the action request into PagerDuty's incident shape and submit it."""
    pd_payload = {
        "title": payload.title,
        "urgency": payload.severity,
        "incident_key": payload.dedupe_key,
        "body": payload.payload,
        "source": payload.source,
    }
    try:
        created = await pagerduty_adapter.create_incident(pd_payload)
    except Exception as exc:
        # Surface any upstream failure as a 502 with the underlying message.
        raise HTTPException(status_code=502, detail=f"PagerDuty call failed: {exc}") from exc
    return ApiResponse(data={"pagerduty": created})
@app.post(
    "/action/trigger-shuffle",
    response_model=ApiResponse,
    summary="Trigger Shuffle workflow",
    description="Execute a Shuffle workflow by ID with execution_argument payload.",
)
async def trigger_shuffle(payload: TriggerShuffleRequest) -> ApiResponse:
    """Kick off the requested Shuffle workflow and relay its response."""
    try:
        execution = await shuffle_adapter.trigger_workflow(
            workflow_id=payload.workflow_id,
            payload=payload.execution_argument,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": execution})
@app.get(
    "/shuffle/health",
    response_model=ApiResponse,
    summary="Shuffle health",
    description="Check Shuffle backend health endpoint through adapter connectivity.",
)
async def shuffle_health() -> ApiResponse:
    """Probe the Shuffle backend's health endpoint via the adapter."""
    try:
        upstream = await shuffle_adapter.health()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": upstream})
@app.get(
    "/shuffle/auth-test",
    response_model=ApiResponse,
    summary="Shuffle auth test",
    description="Validate Shuffle API key authentication.",
)
async def shuffle_auth_test() -> ApiResponse:
    """Verify the configured Shuffle API key is accepted upstream."""
    try:
        upstream = await shuffle_adapter.auth_test()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": upstream})
@app.post(
    "/shuffle/login",
    response_model=ApiResponse,
    summary="Shuffle login",
    description="Login to Shuffle with username/password and return auth response.",
)
async def shuffle_login(payload: ShuffleLoginRequest) -> ApiResponse:
    """Authenticate against Shuffle with the supplied credentials."""
    try:
        auth_response = await shuffle_adapter.login(payload.username, payload.password)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": auth_response})
@app.post(
    "/shuffle/generate-apikey",
    response_model=ApiResponse,
    summary="Generate Shuffle API key",
    description="Login using provided or configured credentials and generate a Shuffle API key.",
)
async def shuffle_generate_apikey(payload: ShuffleLoginRequest | None = None) -> ApiResponse:
    """Generate a Shuffle API key, preferring request credentials over settings."""
    if payload:
        username, password = payload.username, payload.password
    else:
        username, password = settings.shuffle_username, settings.shuffle_password
    if not (username and password):
        raise HTTPException(
            status_code=400,
            detail="Missing shuffle credentials. Provide username/password in body or set SHUFFLE_USERNAME and SHUFFLE_PASSWORD.",
        )
    try:
        generated = await shuffle_adapter.generate_apikey_from_login(username, password)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": generated})
@app.get(
    "/shuffle/workflows",
    response_model=ApiResponse,
    summary="List Shuffle workflows",
    description="List available workflows in Shuffle using configured API key.",
)
async def shuffle_workflows() -> ApiResponse:
    """Fetch the workflow list from Shuffle."""
    try:
        workflows = await shuffle_adapter.list_workflows()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": workflows})
@app.get(
    "/shuffle/workflows/{workflow_id}",
    response_model=ApiResponse,
    summary="Get Shuffle workflow",
    description="Get a single Shuffle workflow definition by workflow ID.",
)
async def shuffle_workflow(workflow_id: str) -> ApiResponse:
    """Fetch one workflow definition by its ID."""
    try:
        workflow = await shuffle_adapter.get_workflow(workflow_id)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": workflow})
@app.post(
    "/shuffle/workflows/{workflow_id}/execute",
    response_model=ApiResponse,
    summary="Execute Shuffle workflow",
    description="Execute a specific Shuffle workflow with custom JSON payload.",
)
async def shuffle_workflow_execute(
    workflow_id: str, payload: dict[str, object]
) -> ApiResponse:
    """Execute the given workflow with an arbitrary JSON payload."""
    try:
        execution = await shuffle_adapter.trigger_workflow(workflow_id=workflow_id, payload=payload)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": execution})
@app.get(
    "/shuffle/apps",
    response_model=ApiResponse,
    summary="List Shuffle apps",
    description="List installed/available Shuffle apps from app API.",
)
async def shuffle_apps() -> ApiResponse:
    """Fetch the installed/available app list from Shuffle."""
    try:
        apps = await shuffle_adapter.list_apps()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": apps})
@app.post(
    "/shuffle/proxy",
    response_model=ApiResponse,
    summary="Proxy request to Shuffle API",
    description="Forward arbitrary HTTP request to Shuffle API path via configured credentials.",
)
async def shuffle_proxy(payload: ShuffleProxyRequest) -> ApiResponse:
    """Forward a request to the Shuffle API, normalizing non-/api/ paths."""
    raw_path = payload.path
    if raw_path.startswith("/api/"):
        path = raw_path
    else:
        # Anything not already under /api/ is assumed to be a v1 suffix.
        path = f"/api/v1/{raw_path.lstrip('/')}"
    try:
        proxied = await shuffle_adapter.proxy(
            method=payload.method,
            path=path,
            params=payload.params,
            payload=payload.payload,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
    return ApiResponse(data={"shuffle": proxied})
@app.post(
    "/action/create-iris-case",
    response_model=ApiResponse,
    summary="Create IRIS case (action)",
    description="Create an IRIS case using action payload fields and defaults.",
)
async def create_iris_case(payload: ActionCreateIncidentRequest) -> ApiResponse:
    """Create an IRIS case from an action payload, filling defaults from settings."""
    extra = payload.payload
    # IRIS v2 expects case_name, case_description, case_customer, case_soc_id.
    case_payload = {
        "case_name": payload.title,
        "case_description": extra.get("description", "Created by soc-integrator"),
        "case_customer": extra.get("case_customer", settings.iris_default_customer_id),
        "case_soc_id": extra.get("case_soc_id", settings.iris_default_soc_id),
    }
    try:
        created_case = await iris_adapter.create_case(case_payload)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
    return ApiResponse(data={"iris": created_case})
@app.post(
    "/iris/tickets",
    response_model=ApiResponse,
    summary="Create IRIS ticket",
    description="Create an IRIS case/ticket directly using ticket request model.",
)
async def iris_create_ticket(payload: IrisTicketCreateRequest) -> ApiResponse:
    """Create an IRIS case from the ticket model; extra payload keys override."""
    case_payload: dict[str, object] = {
        "case_name": payload.title,
        "case_description": payload.description,
        "case_customer": payload.case_customer or settings.iris_default_customer_id,
        "case_soc_id": payload.case_soc_id or settings.iris_default_soc_id,
    }
    extra = payload.payload
    if extra:
        case_payload.update(extra)
    try:
        created = await iris_adapter.create_case(case_payload)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
    return ApiResponse(data={"iris": created})
@app.get(
    "/iris/tickets",
    response_model=ApiResponse,
    summary="List IRIS tickets",
    description="List IRIS cases with pagination, using v2 or legacy fallback endpoint.",
)
async def iris_list_tickets(limit: int = 50, offset: int = 0) -> ApiResponse:
    """List IRIS cases for the requested pagination window."""
    try:
        cases = await iris_adapter.list_cases(limit=limit, offset=offset)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
    return ApiResponse(data={"iris": cases})
- def _build_vt_ioc_result(
- vt: dict[str, object],
- ioc_type: str,
- ioc_value: str,
- malicious_threshold: int,
- suspicious_threshold: int,
- ) -> tuple[dict[str, object], bool, str, float]:
- stats = (
- (((vt.get("data") or {}).get("attributes") or {}).get("last_analysis_stats"))
- if isinstance(vt, dict)
- else None
- ) or {}
- malicious = int(stats.get("malicious", 0) or 0)
- suspicious = int(stats.get("suspicious", 0) or 0)
- harmless = int(stats.get("harmless", 0) or 0)
- undetected = int(stats.get("undetected", 0) or 0)
- total = malicious + suspicious + harmless + undetected
- confidence = 0.0 if total == 0 else round(((malicious + (0.5 * suspicious)) / total), 4)
- matched = (malicious >= malicious_threshold) or (suspicious >= suspicious_threshold)
- severity = "low"
- if malicious >= 5 or suspicious >= 10:
- severity = "critical"
- elif malicious >= 2 or suspicious >= 5:
- severity = "high"
- elif malicious >= 1 or suspicious >= 1:
- severity = "medium"
- reason = (
- f"virustotal_stats malicious={malicious} suspicious={suspicious} "
- f"thresholds(malicious>={malicious_threshold}, suspicious>={suspicious_threshold})"
- )
- result: dict[str, object] = {
- "ioc_type": ioc_type,
- "ioc_value": ioc_value,
- "matched": matched,
- "severity": severity,
- "confidence": confidence,
- "reason": reason,
- "providers": {
- "virustotal": {
- "stats": stats,
- }
- },
- "raw": {
- "virustotal": vt,
- },
- }
- return result, matched, severity, confidence
- def _build_abuseipdb_ioc_result(
- abuse: dict[str, object],
- ioc_value: str,
- confidence_threshold: int = 50,
- ) -> tuple[dict[str, object], bool, str, float]:
- data = ((abuse.get("data") if isinstance(abuse, dict) else None) or {}) if isinstance(abuse, dict) else {}
- score = int(data.get("abuseConfidenceScore", 0) or 0)
- total_reports = int(data.get("totalReports", 0) or 0)
- matched = score >= confidence_threshold
- severity = "low"
- if score >= 90:
- severity = "critical"
- elif score >= 70:
- severity = "high"
- elif score >= 30:
- severity = "medium"
- confidence = round(score / 100.0, 4)
- reason = f"abuseipdb score={score} totalReports={total_reports} threshold>={confidence_threshold}"
- result: dict[str, object] = {
- "ioc_type": "ip",
- "ioc_value": ioc_value,
- "matched": matched,
- "severity": severity,
- "confidence": confidence,
- "reason": reason,
- "providers": {"abuseipdb": {"score": score, "totalReports": total_reports, "raw": abuse}},
- }
- return result, matched, severity, confidence
- def _extract_first_array(payload: object) -> list[object]:
- if isinstance(payload, list):
- return payload
- if not isinstance(payload, dict):
- return []
- preferred_keys = [
- "items",
- "results",
- "workflows",
- "apps",
- "affected_items",
- "data",
- ]
- for key in preferred_keys:
- value = payload.get(key)
- if isinstance(value, list):
- return value
- for value in payload.values():
- extracted = _extract_first_array(value)
- if extracted:
- return extracted
- return []
# Whitelist mapping API-facing simulator names to the shell scripts that may
# be launched via the /sim/logs/* endpoints; only these keys are accepted,
# which prevents arbitrary script execution through the API.
SIM_SCRIPT_MAP: dict[str, str] = {
    "fortigate": "send-wazuh-fortigate-test-events.sh",
    "endpoint": "send-wazuh-endpoint-agent-test-events.sh",
    "cisco": "send-wazuh-cisco-test-events.sh",
    "proposal_required": "send-wazuh-proposal-required-events.sh",
    "proposal_appendix_b": "send-wazuh-proposal-appendix-b-events.sh",
    "proposal_appendix_c": "send-wazuh-proposal-appendix-c-events.sh",
    "wazuh_test": "send-wazuh-test-events.sh",
}
def _build_sim_command(payload: SimLogRunRequest) -> list[str]:
    """Build the argv list used to launch a whitelisted simulator script.

    All scripts take ``target count delay`` positionals; the endpoint script
    additionally takes a ``scenario`` argument between target and count.
    ``--forever`` is appended when requested. Count is clamped to >= 1 and
    delay to >= 0.0.
    """
    script_path = SIM_SCRIPTS_DIR / SIM_SCRIPT_MAP[payload.script]
    count = max(1, int(payload.count))
    delay = max(0.0, float(payload.delay_seconds))
    argv: list[str] = ["/bin/bash", str(script_path), payload.target or "all"]
    if payload.script == "endpoint":
        # The endpoint agent script has an extra scenario positional.
        argv.append(payload.scenario or "all")
    argv.extend([str(count), str(delay)])
    if payload.forever:
        argv.append("--forever")
    return argv
- def _serialize_sim_run(run_id: str, run: dict[str, object]) -> dict[str, object]:
- process = run.get("process")
- poll_code = process.poll() if process else None
- return_code = run.get("return_code")
- if poll_code is not None and return_code is None:
- run["return_code"] = poll_code
- return_code = poll_code
- return {
- "run_id": run_id,
- "script": run.get("script"),
- "target": run.get("target"),
- "scenario": run.get("scenario"),
- "count": run.get("count"),
- "delay_seconds": run.get("delay_seconds"),
- "forever": run.get("forever"),
- "pid": run.get("pid"),
- "cmd": run.get("cmd"),
- "started_at": run.get("started_at"),
- "stopped_at": run.get("stopped_at"),
- "running": bool(process and process.poll() is None),
- "return_code": return_code,
- "log_file": run.get("log_file"),
- }
- def _tail_log_lines(path: Path, limit: int = 200) -> list[str]:
- line_limit = max(1, min(int(limit), 1000))
- lines: deque[str] = deque(maxlen=line_limit)
- try:
- with path.open("r", encoding="utf-8", errors="replace") as handle:
- for line in handle:
- lines.append(line.rstrip("\n"))
- except FileNotFoundError:
- return []
- return list(lines)
- def _safe_query_token(value: object) -> str | None:
- text = str(value or "").strip()
- if not text:
- return None
- if not re.fullmatch(r"[A-Za-z0-9_.:-]+", text):
- return None
- return text
- def _parse_iso_datetime(value: object) -> datetime | None:
- text = str(value or "").strip()
- if not text:
- return None
- if text.endswith("Z"):
- text = text[:-1] + "+00:00"
- try:
- parsed = datetime.fromisoformat(text)
- except ValueError:
- return None
- if parsed.tzinfo is None:
- parsed = parsed.replace(tzinfo=timezone.utc)
- return parsed.astimezone(timezone.utc)
def _sim_wazuh_query_clauses(run: dict[str, object]) -> list[str]:
    """Build Wazuh indexer query clauses matching events from a simulator run.

    The clause list always starts with the shared ``soc_mvp_test`` marker
    clause and is then narrowed by script type and (when not ``all``) by a
    sanitized target token. The run's ``scenario`` is read for parity with
    the run record but is not yet used for filtering. Clauses are intended
    to be AND-combined by the caller.

    Note: previously the ``proposal_*``/``wazuh_test`` branch appended a
    clause identical to the base marker clause, and the fallback branch
    appended a strict subset of it; those redundant duplicates are removed
    (queries are logically unchanged).
    """
    script = str(run.get("script") or "").strip().lower()
    target = str(run.get("target") or "all").strip()
    # Scenario is intentionally unused for now.
    _ = str(run.get("scenario") or "all").strip().lower()
    target_token = _safe_query_token(target)
    has_target = bool(target_token) and target_token.lower() != "all"
    clauses: list[str] = ["(full_log:*soc_mvp_test=true* OR data.soc_mvp_test:true)"]
    if script == "fortigate":
        clauses.append(
            "(full_log:*fortigate* OR full_log:*FGT80F* OR full_log:*FGT60F* OR full_log:*FGT40F* "
            "OR full_log:*FGT501E* OR data.vendor:fortinet OR data.product:fortigate OR data.source:fortigate)"
        )
        if has_target:
            clauses.append(f"(full_log:*{target_token}* OR data.model:{target_token})")
    elif script == "endpoint":
        if has_target:
            per_platform = {
                "windows": (
                    "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                    "OR data.source:windows OR data.source:windows_agent OR data.platform:windows)"
                ),
                "mac": (
                    "(full_log:*source=mac* OR full_log:*source=mac_agent* "
                    "OR data.source:mac OR data.source:mac_agent OR data.platform:mac)"
                ),
                "linux": (
                    "(full_log:*source=linux* OR full_log:*source=linux_agent* "
                    "OR data.source:linux OR data.source:linux_agent OR data.platform:linux)"
                ),
            }
            lowered = target_token.lower()
            # Normalize aliases, then fall back to a raw full_log match for
            # unrecognized targets.
            platform = {"win": "windows", "macos": "mac"}.get(lowered, lowered)
            clauses.append(per_platform.get(platform, f"full_log:*{target_token}*"))
        else:
            clauses.append(
                "(full_log:*source=windows* OR full_log:*source=windows_agent* "
                "OR full_log:*source=mac* OR full_log:*source=mac_agent* "
                "OR full_log:*source=linux* OR full_log:*source=linux_agent* "
                "OR data.source:windows OR data.source:windows_agent "
                "OR data.source:mac OR data.source:mac_agent "
                "OR data.source:linux OR data.source:linux_agent)"
            )
    elif script == "cisco":
        clauses.append("(full_log:*cisco* OR data.vendor:cisco)")
        if has_target:
            clauses.append(f"full_log:*{target_token}*")
    elif script in {"proposal_required", "proposal_appendix_b", "proposal_appendix_c", "wazuh_test"}:
        # Base marker clause already covers these scripts; only a target
        # narrows the query further.
        if has_target:
            clauses.append(f"full_log:*{target_token}*")
    # Unknown scripts fall through with just the shared marker clause.
    return clauses
- def _extract_wazuh_hits(payload: object) -> list[dict[str, object]]:
- if not isinstance(payload, dict):
- return []
- hits_root = payload.get("hits")
- if not isinstance(hits_root, dict):
- return []
- hits = hits_root.get("hits")
- if not isinstance(hits, list):
- return []
- result: list[dict[str, object]] = []
- for hit in hits:
- if isinstance(hit, dict):
- result.append(hit)
- return result
- def _extract_wazuh_event_item(hit: dict[str, object], include_raw: bool) -> dict[str, object]:
- source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
- source = source if isinstance(source, dict) else {}
- agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
- agent = agent if isinstance(agent, dict) else {}
- decoder = source.get("decoder") if isinstance(source.get("decoder"), dict) else {}
- decoder = decoder if isinstance(decoder, dict) else {}
- data = source.get("data") if isinstance(source.get("data"), dict) else {}
- data = data if isinstance(data, dict) else {}
- rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
- rule = rule if isinstance(rule, dict) else {}
- item: dict[str, object] = {
- "@timestamp": source.get("@timestamp") or source.get("timestamp"),
- "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
- "agent_name": agent.get("name"),
- "agent_id": agent.get("id"),
- "decoder_name": decoder.get("name"),
- "source": data.get("source"),
- "event_type": data.get("event_type"),
- "severity": data.get("severity"),
- "rule_id": rule.get("id"),
- "rule_description": rule.get("description"),
- "full_log": source.get("full_log"),
- }
- if include_raw:
- item["raw"] = source
- return item
- def _extract_wazuh_rule_item(hit: dict[str, object], include_raw: bool) -> dict[str, object] | None:
- source = hit.get("_source") if isinstance(hit.get("_source"), dict) else {}
- source = source if isinstance(source, dict) else {}
- rule = source.get("rule") if isinstance(source.get("rule"), dict) else {}
- rule = rule if isinstance(rule, dict) else {}
- rule_id = rule.get("id")
- if rule_id in {None, ""}:
- return None
- agent = source.get("agent") if isinstance(source.get("agent"), dict) else {}
- agent = agent if isinstance(agent, dict) else {}
- data = source.get("data") if isinstance(source.get("data"), dict) else {}
- data = data if isinstance(data, dict) else {}
- item: dict[str, object] = {
- "@timestamp": source.get("@timestamp") or source.get("timestamp"),
- "rule_id": rule_id,
- "rule_level": rule.get("level"),
- "rule_description": rule.get("description"),
- "rule_firedtimes": rule.get("firedtimes"),
- "event_id": data.get("event_id") or source.get("id") or hit.get("_id"),
- "agent_name": agent.get("name"),
- "full_log": source.get("full_log"),
- }
- if include_raw:
- item["raw"] = source
- return item
@app.post(
    "/ioc/enrich",
    response_model=ApiResponse,
    summary="IOC enrich",
    description="Fetch enrichment data for IOC from selected providers without final verdict scoring.",
)
async def ioc_enrich(payload: IocEnrichRequest) -> ApiResponse:
    """Enrich an IOC via the requested providers and persist a trace record.

    Provider failures are traced to the repository and surfaced as HTTP 502;
    AbuseIPDB is skipped (not failed) for non-IP IOC types. The failure-trace
    block, previously duplicated verbatim per provider, is factored into a
    local helper, and provider results are written into one dict instead of
    rebuilding it by merge.
    """
    providers = [p.lower().strip() for p in payload.providers]
    provider_results: dict[str, object] = {}
    result: dict[str, object] = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "providers_requested": providers,
        "providers": provider_results,
    }

    def _trace_failure(exc: Exception) -> None:
        # Persist the failed attempt before converting it to an HTTP error.
        repo.add_ioc_trace(
            action="enrich",
            ioc_type=payload.ioc_type,
            ioc_value=payload.ioc_value,
            providers=providers,
            request_payload=payload.model_dump(mode="json"),
            response_payload={},
            error=str(exc),
        )

    if "virustotal" in providers:
        try:
            provider_results["virustotal"] = await virustotal_adapter.enrich_ioc(
                payload.ioc_type, payload.ioc_value
            )
        except Exception as exc:
            _trace_failure(exc)
            raise HTTPException(status_code=502, detail=f"VirusTotal call failed: {exc}") from exc
    if "abuseipdb" in providers:
        if payload.ioc_type != "ip":
            provider_results["abuseipdb"] = {
                "skipped": "AbuseIPDB currently supports ioc_type='ip' only"
            }
        else:
            try:
                provider_results["abuseipdb"] = await abuseipdb_adapter.check_ip(payload.ioc_value)
            except Exception as exc:
                _trace_failure(exc)
                raise HTTPException(status_code=502, detail=f"AbuseIPDB call failed: {exc}") from exc
    repo.add_ioc_trace(
        action="enrich",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=result,
    )
    return ApiResponse(data={"ioc": result})
@app.post(
    "/ioc/evaluate",
    response_model=ApiResponse,
    summary="IOC evaluate",
    description="Evaluate IOC against selected intelligence providers and return matched/severity/confidence.",
)
async def ioc_evaluate(payload: IocEvaluateRequest) -> ApiResponse:
    """Run supported providers, aggregate a verdict, and persist a trace.

    Per-provider failures are collected (not fatal) unless every provider
    fails, in which case the failure is traced and surfaced as HTTP 502.
    """
    providers = [p.lower().strip() for p in payload.providers]
    requested = [p for p in providers if p in {"virustotal", "abuseipdb"}]
    if not requested:
        raise HTTPException(status_code=400, detail="No supported provider requested. Use ['virustotal'] or ['abuseipdb'].")
    per_provider: dict[str, dict[str, object]] = {}
    errors: dict[str, str] = {}
    if "virustotal" in requested:
        try:
            vt_raw = await virustotal_adapter.enrich_ioc(payload.ioc_type, payload.ioc_value)
            per_provider["virustotal"] = _build_vt_ioc_result(
                vt=vt_raw,
                ioc_type=payload.ioc_type,
                ioc_value=payload.ioc_value,
                malicious_threshold=payload.malicious_threshold,
                suspicious_threshold=payload.suspicious_threshold,
            )[0]
        except Exception as exc:
            errors["virustotal"] = str(exc)
    if "abuseipdb" in requested:
        if payload.ioc_type != "ip":
            errors["abuseipdb"] = "AbuseIPDB supports ioc_type='ip' only"
        else:
            try:
                abuse_raw = await abuseipdb_adapter.check_ip(payload.ioc_value)
                per_provider["abuseipdb"] = _build_abuseipdb_ioc_result(
                    abuse=abuse_raw,
                    ioc_value=payload.ioc_value,
                    confidence_threshold=50,
                )[0]
            except Exception as exc:
                errors["abuseipdb"] = str(exc)
    if not per_provider:
        repo.add_ioc_trace(
            action="evaluate",
            ioc_type=payload.ioc_type,
            ioc_value=payload.ioc_value,
            providers=requested,
            request_payload=payload.model_dump(mode="json"),
            response_payload={},
            error=str(errors),
        )
        raise HTTPException(status_code=502, detail=f"Provider evaluation failed: {errors}")
    # Aggregate: matched if any provider matched; take the max confidence
    # and the highest-ranked severity across providers.
    severity_rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}
    verdicts = list(per_provider.values())
    matched = any(bool(verdict.get("matched")) for verdict in verdicts)
    confidence = max(float(verdict.get("confidence", 0.0) or 0.0) for verdict in verdicts)
    severity = max(
        (str(verdict.get("severity", "low")) for verdict in verdicts),
        key=lambda name: severity_rank.get(name, 1),
    )
    reason_parts = [f"{name}:{verdict.get('reason','')}" for name, verdict in per_provider.items()]
    if errors:
        reason_parts.append(f"errors={errors}")
    ioc_result = {
        "ioc_type": payload.ioc_type,
        "ioc_value": payload.ioc_value,
        "matched": matched,
        "severity": severity,
        "confidence": round(confidence, 4),
        "reason": " | ".join(reason_parts),
        "providers": per_provider,
    }
    repo.add_ioc_trace(
        action="evaluate",
        ioc_type=payload.ioc_type,
        ioc_value=payload.ioc_value,
        providers=providers,
        request_payload=payload.model_dump(mode="json"),
        response_payload=ioc_result,
        matched=matched,
        severity=severity,
        confidence=float(ioc_result["confidence"]),
    )
    return ApiResponse(data={"ioc": ioc_result})
@app.post(
    "/ioc/upload-file",
    response_model=ApiResponse,
    summary="Upload file to VirusTotal",
    description="Upload a file sample to VirusTotal and return upload/analysis identifiers.",
)
async def ioc_upload_file(file: UploadFile = File(...)) -> ApiResponse:
    """Forward an uploaded sample to VirusTotal, tracing success or failure."""
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    # Shared trace fields for both the failure and success records.
    trace_kwargs = dict(
        action="upload_file",
        ioc_type="hash",
        ioc_value=file.filename or "<unknown>",
        providers=["virustotal"],
        request_payload={"filename": file.filename, "size": len(content)},
    )
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        repo.add_ioc_trace(**trace_kwargs, response_payload={}, error=str(exc))
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    repo.add_ioc_trace(
        **trace_kwargs,
        response_payload=vt_upload if isinstance(vt_upload, dict) else {"raw": str(vt_upload)},
    )
    return ApiResponse(data={"virustotal": vt_upload})
@app.get(
    "/ioc/analysis/{analysis_id}",
    response_model=ApiResponse,
    summary="Get VirusTotal analysis",
    description="Fetch analysis status/details from VirusTotal by analysis ID.",
)
async def ioc_get_analysis(analysis_id: str) -> ApiResponse:
    """Fetch a VirusTotal analysis by id, tracing the attempt either way."""
    # Shared trace fields for both the failure and success records.
    trace_kwargs = dict(
        action="analysis",
        ioc_type="hash",
        ioc_value=analysis_id,
        providers=["virustotal"],
        request_payload={"analysis_id": analysis_id},
    )
    try:
        vt_analysis = await virustotal_adapter.get_analysis(analysis_id)
    except Exception as exc:
        repo.add_ioc_trace(**trace_kwargs, response_payload={}, error=str(exc))
        raise HTTPException(status_code=502, detail=f"VirusTotal analysis fetch failed: {exc}") from exc
    repo.add_ioc_trace(
        **trace_kwargs,
        response_payload=vt_analysis if isinstance(vt_analysis, dict) else {"raw": str(vt_analysis)},
    )
    return ApiResponse(data={"virustotal": vt_analysis})
@app.post(
    "/ioc/evaluate-file",
    response_model=ApiResponse,
    summary="Evaluate uploaded file IOC",
    description="Upload a file, poll analysis completion, fetch final file report, and return IOC verdict.",
)
async def ioc_evaluate_file(
    file: UploadFile = File(...),
    malicious_threshold: int = 1,
    suspicious_threshold: int = 3,
    poll_timeout_seconds: int = 30,
    poll_interval_seconds: int = 2,
) -> ApiResponse:
    """Upload a sample to VirusTotal and evaluate its final file report.

    Flow: upload -> poll the analysis until its status is "completed" (or
    the timeout elapses) -> fetch the full file report by sha256 -> score it
    via ``_build_vt_ioc_result`` -> persist an IOC trace and return the
    verdict together with the raw upload/analysis payloads.

    Raises:
        HTTPException: 400 for an empty upload; 502 when a VirusTotal call
            fails or its response is missing the analysis id / file hash.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    try:
        vt_upload = await virustotal_adapter.upload_file(file.filename or "upload.bin", content)
    except Exception as exc:
        # Record the failed attempt before surfacing it as a gateway error.
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=file.filename or "<unknown>",
            providers=["virustotal"],
            request_payload={"filename": file.filename, "size": len(content)},
            response_payload={},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal upload failed: {exc}") from exc
    # The upload response carries the analysis id under data.id.
    analysis_id = (
        (((vt_upload.get("data") or {}).get("id")) if isinstance(vt_upload, dict) else None)
        or ""
    )
    if not analysis_id:
        raise HTTPException(status_code=502, detail="VirusTotal upload response missing analysis ID")
    # Clamp polling knobs to sane minimums (at least one poll, 1s interval).
    timeout = max(1, poll_timeout_seconds)
    interval = max(1, poll_interval_seconds)
    elapsed = 0
    analysis: dict[str, object] = {}
    while elapsed <= timeout:
        analysis = await virustotal_adapter.get_analysis(analysis_id)
        status = (
            (((analysis.get("data") or {}).get("attributes") or {}).get("status"))
            if isinstance(analysis, dict)
            else None
        )
        if status == "completed":
            break
        await asyncio.sleep(interval)
        elapsed += interval
    # The analysis meta block carries the file hashes once VirusTotal has
    # processed the upload; without a sha256 we cannot fetch the report.
    sha256 = (
        (((analysis.get("meta") or {}).get("file_info") or {}).get("sha256"))
        if isinstance(analysis, dict)
        else None
    )
    if not sha256:
        raise HTTPException(status_code=502, detail="VirusTotal analysis did not return file hash yet")
    try:
        vt_file = await virustotal_adapter.enrich_ioc("hash", str(sha256))
    except Exception as exc:
        repo.add_ioc_trace(
            action="evaluate_file",
            ioc_type="hash",
            ioc_value=str(sha256),
            providers=["virustotal"],
            request_payload={"filename": file.filename, "analysis_id": analysis_id},
            response_payload={"upload": vt_upload, "analysis": analysis},
            error=str(exc),
        )
        raise HTTPException(status_code=502, detail=f"VirusTotal report fetch failed: {exc}") from exc
    # Score the final file report with the shared VT verdict builder.
    ioc_result, matched, severity, confidence = _build_vt_ioc_result(
        vt=vt_file,
        ioc_type="hash",
        ioc_value=str(sha256),
        malicious_threshold=malicious_threshold,
        suspicious_threshold=suspicious_threshold,
    )
    ioc_result["analysis_id"] = analysis_id
    ioc_result["filename"] = file.filename
    repo.add_ioc_trace(
        action="evaluate_file",
        ioc_type="hash",
        ioc_value=str(sha256),
        providers=["virustotal"],
        request_payload={"filename": file.filename, "analysis_id": analysis_id},
        response_payload={
            "upload": vt_upload,
            "analysis": analysis,
            "ioc": ioc_result,
        },
        matched=matched,
        severity=severity,
        confidence=confidence,
    )
    return ApiResponse(data={"ioc": ioc_result, "analysis": analysis, "upload": vt_upload})
@app.get(
    "/ioc/history",
    response_model=ApiResponse,
    summary="IOC trace history",
    description="List recent IOC enrichment/evaluation trace records stored in database.",
)
async def ioc_history(limit: int = 50, offset: int = 0) -> ApiResponse:
    """Return paginated IOC trace rows from the repository."""
    trace_items = repo.list_ioc_trace(limit=limit, offset=offset)
    return ApiResponse(data={"items": trace_items})
@app.get(
    "/geoip/{ip}",
    response_model=ApiResponse,
    summary="GeoIP lookup",
    description="Lookup geolocation for a public IP address using configured GeoIP provider.",
)
async def geoip_lookup(ip: str) -> ApiResponse:
    """Resolve *ip* through the configured GeoIP adapter.

    Raises:
        HTTPException: 502 when the provider call fails. Every other adapter
            endpoint in this module maps failures to 502; previously this one
            let the exception escape as an unhandled 500.
    """
    try:
        result = await geoip_adapter.lookup(ip)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"GeoIP call failed: {exc}") from exc
    return ApiResponse(data={"geoip": result})
@app.get(
    "/sync/wazuh-version",
    response_model=ApiResponse,
    summary="Wazuh version",
    description="Get Wazuh API/manager version information through adapter.",
)
async def sync_wazuh_version() -> ApiResponse:
    """Return Wazuh version info; adapter failures become HTTP 502."""
    try:
        version_info = await wazuh_adapter.get_version()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
    return ApiResponse(data={"wazuh": version_info})
@app.get(
    "/wazuh/auth-test",
    response_model=ApiResponse,
    summary="Wazuh auth test",
    description="Validate Wazuh API authentication using configured credentials.",
)
async def wazuh_auth_test() -> ApiResponse:
    """Verify Wazuh API credentials; adapter failures become HTTP 502."""
    try:
        auth_result = await wazuh_adapter.auth_test()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh auth failed: {exc}") from exc
    return ApiResponse(data={"wazuh": auth_result})
@app.get(
    "/wazuh/manager-info",
    response_model=ApiResponse,
    summary="Wazuh manager info",
    description="Return manager information from Wazuh API.",
)
async def wazuh_manager_info() -> ApiResponse:
    """Fetch manager details; adapter failures become HTTP 502."""
    try:
        manager_info = await wazuh_adapter.get_manager_info()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
    return ApiResponse(data={"wazuh": manager_info})
@app.get(
    "/wazuh/agents",
    response_model=ApiResponse,
    summary="List Wazuh agents",
    description="List registered Wazuh agents with pagination and optional field selection.",
)
async def wazuh_agents(
    limit: int = 50,
    offset: int = 0,
    select: str | None = None,
) -> ApiResponse:
    """List registered agents; adapter failures become HTTP 502."""
    try:
        agents_payload = await wazuh_adapter.list_agents(
            limit=limit, offset=offset, select=select
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
    return ApiResponse(data={"wazuh": agents_payload})
@app.get(
    "/wazuh/alerts",
    response_model=ApiResponse,
    summary="List Wazuh alerts",
    description="List alert-like entries from manager logs API for current Wazuh build.",
)
async def wazuh_alerts(
    limit: int = 50,
    offset: int = 0,
    q: str | None = None,
    sort: str | None = None,
) -> ApiResponse:
    """List alert-like entries; adapter failures become HTTP 502.

    In this Wazuh build, API alerts are exposed via manager logs, so this
    endpoint proxies the same adapter call as /wazuh/manager-logs.
    """
    try:
        alerts_payload = await wazuh_adapter.list_manager_logs(
            limit=limit, offset=offset, q=q, sort=sort
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
    return ApiResponse(data={"wazuh": alerts_payload})
@app.get(
    "/wazuh/manager-logs",
    response_model=ApiResponse,
    summary="List Wazuh manager logs",
    description="Query manager logs endpoint with pagination and optional q/sort filters.",
)
async def wazuh_manager_logs(
    limit: int = 50,
    offset: int = 0,
    q: str | None = None,
    sort: str | None = None,
) -> ApiResponse:
    """Query manager logs; adapter failures become HTTP 502."""
    try:
        logs_payload = await wazuh_adapter.list_manager_logs(
            limit=limit, offset=offset, q=q, sort=sort
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
    return ApiResponse(data={"wazuh": logs_payload})
@app.post(
    "/wazuh/sync-to-mvp",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Sync Wazuh to MVP",
    description="Fetch Wazuh alerts from indexer and pass them through MVP ingest/evaluation logic.",
)
async def wazuh_sync_to_mvp(
    limit: int = 50,
    minutes: int = 120,
    q: str = "soc_mvp_test=true OR event_type:*",
) -> ApiResponse:
    """Pull recent Wazuh alerts through the MVP pipeline; failures become 502."""
    try:
        sync_result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
    return ApiResponse(data={"sync": sync_result})
@app.get(
    "/wazuh/auto-sync/status",
    response_model=ApiResponse,
    summary="Wazuh auto-sync status",
    description="Show auto-sync enablement, settings, task runtime state, and last sync result.",
)
async def wazuh_auto_sync_status() -> ApiResponse:
    """Report auto-sync config, background-task liveness, and last state."""
    task = getattr(app.state, "wazuh_auto_sync_task", None)
    sync_settings = {
        "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
        "limit": settings.wazuh_auto_sync_limit,
        "minutes": settings.wazuh_auto_sync_minutes,
        "query": settings.wazuh_auto_sync_query,
    }
    return ApiResponse(
        data={
            "enabled": settings.wazuh_auto_sync_enabled,
            "task_running": bool(task and not task.done()),
            "settings": sync_settings,
            "state": getattr(app.state, "wazuh_auto_sync_state", {}),
        }
    )
@app.get(
    "/monitor/systems",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Systems monitor overview",
    description="Unified monitoring snapshot for Wazuh, Shuffle, IRIS, and PagerDuty with pipeline KPIs and recent records.",
)
async def monitor_systems(
    minutes: int = 60,
    limit: int = 20,
    include_raw: bool = False,
) -> ApiResponse:
    """Build one monitoring snapshot across the integrated systems.

    Combines dependency health, per-system recent records (best-effort;
    fetch errors degrade the card instead of failing the request), and
    pipeline KPI counters computed from persisted records in the lookback
    window. Last-OK timestamps are cached on ``app.state`` between calls.
    """
    window_minutes = max(1, minutes)
    row_limit = max(1, limit)
    now = datetime.now(timezone.utc)
    since = now - timedelta(minutes=window_minutes)
    now_iso = now.isoformat()
    dependencies = await mvp_service.dependency_health()
    # Cross-request cache of when each system was last seen healthy.
    monitor_state = getattr(app.state, "systems_monitor_state", {"last_ok_at": {}})
    last_ok_at_by_key = monitor_state.setdefault("last_ok_at", {})
    # KPI counters from persisted database records in the selected lookback window.
    alerts_ingested = repo.count_incident_events_since(since=since, source="wazuh")
    detections_matched = repo.count_c_detection_events_since(since=since)
    iris_tickets_created = repo.count_incidents_with_iris_since(since=since)
    pagerduty_escalations_sent = repo.count_escalations_since(since=since, success=True)
    pagerduty_escalations_failed = repo.count_escalations_since(since=since, success=False)
    # Recent rows per system are fetched best-effort; errors are kept and
    # reported on the card rather than aborting the snapshot.
    wazuh_recent: list[object] = []
    wazuh_recent_error: str | None = None
    try:
        wazuh_resp = await wazuh_adapter.list_manager_logs(limit=row_limit, offset=0, q=None, sort=None)
        wazuh_recent = _extract_first_array(wazuh_resp)[:row_limit]
    except Exception as exc:
        wazuh_recent_error = str(exc)
    shuffle_recent: list[object] = []
    shuffle_recent_error: str | None = None
    try:
        workflows_resp = await shuffle_adapter.list_workflows()
        workflows = _extract_first_array(workflows_resp)
        for item in workflows[:row_limit]:
            if isinstance(item, dict):
                # Reduce workflow dicts to the fields the UI cards need.
                shuffle_recent.append(
                    {
                        "id": item.get("id") or item.get("workflow_id"),
                        "name": item.get("name") or item.get("workflow", {}).get("name"),
                        "status": item.get("status"),
                    }
                )
            else:
                shuffle_recent.append(item)
    except Exception as exc:
        shuffle_recent_error = str(exc)
    iris_recent: list[object] = []
    iris_recent_error: str | None = None
    try:
        iris_resp = await iris_adapter.list_cases(limit=row_limit, offset=0)
        iris_recent = _extract_first_array(iris_resp)[:row_limit]
    except Exception as exc:
        iris_recent_error = str(exc)
    pagerduty_recent = repo.list_recent_escalations(limit=row_limit)

    def build_card(
        label: str,
        dependency_key: str,
        recent: list[object],
        kpis: dict[str, object],
        extra_error: str | None = None,
    ) -> dict[str, object]:
        """Assemble one system card from dependency health + recent rows.

        Side effect: refreshes the cached last-OK timestamp for *label*
        when the dependency reports "up". A healthy dependency with a
        failed recent-rows fetch is reported as "degraded".
        """
        dep = dependencies.get(dependency_key, {})
        dep_status = str(dep.get("status") or "down")
        status = "ok" if dep_status == "up" else "down"
        if dep_status == "up":
            last_ok_at_by_key[label] = now_iso
        error_parts: list[str] = []
        if dep.get("error"):
            error_parts.append(str(dep.get("error")))
        if extra_error:
            error_parts.append(extra_error)
        if dep_status == "up" and extra_error:
            status = "degraded"
        card: dict[str, object] = {
            "status": status,
            "latency_ms": dep.get("latency_ms"),
            "last_ok_at": last_ok_at_by_key.get(label),
            "last_error": " | ".join(error_parts) if error_parts else None,
            "kpis": kpis,
            "recent": recent,
        }
        if include_raw:
            card["raw"] = dep.get("details")
        return card

    cards = {
        "wazuh": build_card(
            label="wazuh",
            dependency_key="wazuh",
            recent=wazuh_recent,
            extra_error=wazuh_recent_error,
            kpis={
                "alerts_ingested": alerts_ingested,
                "recent_rows": len(wazuh_recent),
            },
        ),
        "shuffle": build_card(
            label="shuffle",
            dependency_key="shuffle",
            recent=shuffle_recent,
            extra_error=shuffle_recent_error,
            kpis={
                "recent_workflows": len(shuffle_recent),
            },
        ),
        "iris": build_card(
            label="iris",
            dependency_key="iris",
            recent=iris_recent,
            extra_error=iris_recent_error,
            kpis={
                "tickets_created": iris_tickets_created,
                "recent_rows": len(iris_recent),
            },
        ),
        "pagerduty": build_card(
            label="pagerduty",
            dependency_key="pagerduty_stub",
            recent=pagerduty_recent,
            kpis={
                "escalations_sent": pagerduty_escalations_sent,
                "escalations_failed": pagerduty_escalations_failed,
            },
        ),
    }
    # Persist the refreshed last-OK cache for subsequent snapshots.
    app.state.systems_monitor_state = monitor_state
    return ApiResponse(
        data={
            "generated_at": now_iso,
            "window_minutes": window_minutes,
            "cards": cards,
            "pipeline": {
                "alerts_ingested": alerts_ingested,
                "detections_matched": detections_matched,
                "iris_tickets_created": iris_tickets_created,
                "pagerduty_escalations_sent": pagerduty_escalations_sent,
                "pagerduty_escalations_failed": pagerduty_escalations_failed,
            },
        }
    )
@app.get(
    "/sim/logs/runs",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="List simulator runs",
    description="List active and recent simulator script runs started from soc-integrator.",
)
async def sim_logs_runs() -> ApiResponse:
    """List known simulator runs, newest first, stamping stop times lazily."""
    sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    items: list[dict[str, object]] = []
    for run_id, run in sim_runs.items():
        serialized = _serialize_sim_run(run_id, run)
        # Backfill stopped_at the first time a finished run is observed.
        if not serialized["running"] and not run.get("stopped_at"):
            stopped_at = datetime.now(timezone.utc).isoformat()
            run["stopped_at"] = stopped_at
            serialized["stopped_at"] = stopped_at
        items.append(serialized)
    items.sort(key=lambda entry: str(entry.get("started_at") or ""), reverse=True)
    return ApiResponse(data={"items": items})
@app.post(
    "/sim/logs/start",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Start simulator logs script",
    description="Start a whitelisted simulator script in background and return run metadata.",
)
async def sim_logs_start(payload: SimLogRunRequest) -> ApiResponse:
    """Launch a whitelisted simulator script as a detached background process.

    The child's stdout/stderr are redirected to a per-run log file; the run
    record is stored on ``app.state.sim_runs`` keyed by a fresh run id.

    Raises:
        HTTPException: 400 when the script file is missing in the container;
            502 when the process cannot be started.
    """
    script_name = SIM_SCRIPT_MAP[payload.script]
    script_path = SIM_SCRIPTS_DIR / script_name
    if not script_path.exists():
        raise HTTPException(status_code=400, detail=f"Simulator script not found in container: {script_name}")
    cmd = _build_sim_command(payload)
    env = dict(os.environ)
    env.setdefault("WAZUH_SYSLOG_HOST", "wazuh.manager")
    env.setdefault("WAZUH_SYSLOG_PORT", "514")
    run_id = str(uuid.uuid4())
    log_file = SIM_RUN_LOGS_DIR / f"{run_id}.log"
    try:
        # The context manager closes the parent's handle; the child keeps its
        # own duplicated descriptor, so logging continues after we return.
        # (Previously the handle was closed once in `except` and again in
        # `finally` — a redundant double-close that `with` makes impossible.)
        with log_file.open("ab") as log_handle:
            process = subprocess.Popen(
                cmd,
                cwd=str(SIM_SCRIPTS_DIR),
                env=env,
                stdout=log_handle,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Failed to start simulator: {exc}") from exc
    sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    sim_runs[run_id] = {
        "script": payload.script,
        "target": payload.target,
        "scenario": payload.scenario,
        "count": payload.count,
        "delay_seconds": payload.delay_seconds,
        "forever": payload.forever,
        "pid": process.pid,
        "cmd": " ".join(shlex.quote(part) for part in cmd),
        "started_at": datetime.now(timezone.utc).isoformat(),
        "stopped_at": None,
        "return_code": None,
        "log_file": str(log_file),
        "process": process,
    }
    app.state.sim_runs = sim_runs
    return ApiResponse(data={"run": _serialize_sim_run(run_id, sim_runs[run_id])})
@app.post(
    "/sim/logs/stop/{run_id}",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Stop simulator run",
    description="Stop a running simulator script by run_id.",
)
async def sim_logs_stop(run_id: str) -> ApiResponse:
    """Terminate a simulator run by id (SIGTERM, SIGKILL after 3s)."""
    sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    run = sim_runs.get(run_id)
    if not run:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
    process = run.get("process")
    if process and process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Graceful shutdown timed out; force-kill.
            process.kill()
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"Failed to stop run: {exc}") from exc
    run["stopped_at"] = datetime.now(timezone.utc).isoformat()
    return ApiResponse(data={"run": _serialize_sim_run(run_id, run)})
@app.post(
    "/sim/logs/stop-running",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Stop all running simulator runs",
    description="Stop all currently running simulator scripts (including forever mode).",
)
async def sim_logs_stop_running() -> ApiResponse:
    """Stop every still-running simulator process (including forever mode).

    Returns counts of stopped vs. already-finished runs plus serialized
    metadata for each run stopped here.

    Raises:
        HTTPException 502: a process could not be stopped.
    """
    sim_runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    stopped: list[dict[str, object]] = []
    already_stopped = 0
    for run_id, run in sim_runs.items():
        process = run.get("process")
        if process and process.poll() is None:
            try:
                process.terminate()
                process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                # Escalate to SIGKILL, then reap so the child doesn't stay
                # a zombie (the original never wait()ed after kill()).
                process.kill()
                process.wait()
            except Exception as exc:
                raise HTTPException(status_code=502, detail=f"Failed to stop run {run_id}: {exc}") from exc
            # Record exit status alongside the stop timestamp.
            run["return_code"] = process.poll()
            run["stopped_at"] = datetime.now(timezone.utc).isoformat()
            stopped.append(_serialize_sim_run(run_id, run))
        else:
            already_stopped += 1
    return ApiResponse(
        data={
            "stopped_count": len(stopped),
            "already_stopped_count": already_stopped,
            "runs": stopped,
        }
    )
@app.get(
    "/sim/logs/output/{run_id}",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Get simulator run output",
    description="Return tailed output lines from simulator run log file.",
)
async def sim_logs_output(run_id: str, limit: int = 200) -> ApiResponse:
    """Tail the captured stdout/stderr of a simulator run.

    Raises:
        HTTPException 404: unknown run id, or no log file recorded for it.
    """
    runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    entry = runs.get(run_id)
    if entry is None:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
    raw_path = entry.get("log_file")
    if not raw_path:
        raise HTTPException(status_code=404, detail=f"No log file for run: {run_id}")
    tail_path = Path(str(raw_path))
    tail = _tail_log_lines(tail_path, limit=limit)
    proc = entry.get("process")
    is_running = bool(proc and proc.poll() is None)
    result = {
        "run_id": run_id,
        "running": is_running,
        "line_count": len(tail),
        "lines": tail,
        "text": "\n".join(tail),
        "log_file": str(tail_path),
    }
    return ApiResponse(data=result)
@app.get(
    "/sim/logs/wazuh-latest/{run_id}",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Get latest Wazuh logs/rules for simulator run",
    description="Return latest Wazuh event logs and matched rules correlated to a simulator run.",
)
async def sim_logs_wazuh_latest(
    run_id: str,
    limit: int = 50,
    minutes: int = 15,
    include_raw: bool = False,
) -> ApiResponse:
    """Return latest Wazuh events and matched rules next to the run metadata.

    The search is deliberately unfiltered ("*") with a lookback of at least
    24 hours, emulating a Discover-style "latest records" view.

    Raises:
        HTTPException 404: unknown run id.
        HTTPException 502: Wazuh search failed.
    """
    runs: dict[str, dict[str, object]] = getattr(app.state, "sim_runs", {})
    entry = runs.get(run_id)
    if entry is None:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
    wanted_minutes = max(1, int(minutes))
    lookback = max(1440, wanted_minutes)  # never narrower than 24h
    capped_limit = min(max(1, int(limit)), 200)
    search_text = "*"
    try:
        raw = await wazuh_adapter.search_alerts(
            query=search_text,
            limit=capped_limit,
            minutes=lookback,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Wazuh search failed: {exc}") from exc
    hits = _extract_wazuh_hits(raw)
    events: list[dict[str, object]] = []
    rules: list[dict[str, object]] = []
    # Single pass over the hits: collect every event, keep only hits that
    # yielded a rule item.
    for hit in hits:
        events.append(_extract_wazuh_event_item(hit, include_raw=include_raw))
        rule = _extract_wazuh_rule_item(hit, include_raw=include_raw)
        if rule:
            rules.append(rule)
    return ApiResponse(
        data={
            "run": _serialize_sim_run(run_id, entry),
            "query": {
                "effective_minutes": lookback,
                "text": search_text,
                "limit": capped_limit,
            },
            "events": events,
            "rules": rules,
            "totals": {
                "events": len(events),
                "rules": len(rules),
            },
        }
    )
@app.post(
    "/monitor/log-loss/check",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Check log loss",
    description="Check expected telemetry streams for missing logs in a configurable lookback window.",
)
async def monitor_log_loss_check(
    payload: LogLossCheckRequest | None = None,
    create_ticket: bool = False,
) -> ApiResponse:
    """Run the log-loss check, defaulting the request when no body was sent."""
    request = LogLossCheckRequest() if payload is None else payload
    outcome = await _execute_log_loss_check(req=request, create_ticket=create_ticket)
    return ApiResponse(data=outcome)
@app.post(
    "/monitor/c-detections/evaluate",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="Evaluate Appendix C detections",
    description="Evaluate C1-C3 detection rules on recent events, optionally creating incidents/tickets.",
)
async def monitor_c_detections_evaluate(payload: CDetectionEvaluateRequest) -> ApiResponse:
    """Evaluate Appendix C (C1-C3) detection rules over recent Wazuh events.

    Pipeline: fetch alerts via ``wazuh_adapter`` -> normalize hits ->
    ``c_detection_service.evaluate`` -> persist each match via ``repo`` ->
    optionally open an incident per match (skipped on dry_run or when the
    per-(usecase, entity) ticket cooldown is active). Runtime status is
    mirrored into ``app.state.c_detection_state`` on both success and error.

    Raises:
        HTTPException 400: C detection disabled by configuration.
        HTTPException 502: any failure during evaluation/persistence.
    """
    if not settings.c_detection_enabled:
        raise HTTPException(status_code=400, detail="C detection is disabled by configuration")
    started_at = datetime.now(timezone.utc).isoformat()
    app.state.c_detection_state["last_started_at"] = started_at
    try:
        raw = await wazuh_adapter.search_alerts(
            query=payload.query,
            limit=max(1, payload.limit),
            minutes=max(1, payload.minutes),
        )
        # Defensive unwrap of the OpenSearch-style envelope: tolerate a
        # non-dict response and missing/None "hits" layers.
        hits = (raw.get("hits", {}) or {}).get("hits", []) if isinstance(raw, dict) else []
        normalized = [mvp_service.normalize_wazuh_hit(hit) for hit in hits]
        evaluated = await c_detection_service.evaluate(normalized, selectors=payload.selectors)
        records: list[dict[str, object]] = []
        for match in evaluated.get("matches", []):
            usecase_id = str(match.get("usecase_id") or "")
            entity = str(match.get("entity") or "unknown")
            severity = str(match.get("severity") or "medium")
            evidence = dict(match.get("evidence") or {})
            event_ref = {
                "event_id": ((match.get("event") or {}).get("event_id")),
                "timestamp": ((match.get("event") or {}).get("timestamp")),
                "source": ((match.get("event") or {}).get("source")),
            }
            # Cooldown suppresses duplicate incident creation for the same
            # (usecase, entity) pair inside the configured window; the match
            # itself is still persisted below.
            in_cooldown = repo.is_c_detection_in_cooldown(
                usecase_id=usecase_id,
                entity=entity,
                cooldown_seconds=int(settings.c_detection_ticket_cooldown_seconds),
            )
            incident_key: str | None = None
            # Persist the match first so it is recorded even if incident
            # creation below fails.
            event_row = repo.add_c_detection_event(
                usecase_id=usecase_id,
                entity=entity,
                severity=severity,
                evidence=evidence,
                event_ref=event_ref,
                incident_key=None,
            )
            if (not payload.dry_run) and settings.c_detection_create_iris_ticket and not in_cooldown:
                incident_event = _c_match_to_incident_event(match)
                ingest = await mvp_service.ingest_incident(incident_event)
                # Collapse empty-string incident keys to None.
                incident_key = str(ingest.get("incident_key") or "") or None
                repo.update_c_detection_incident(int(event_row["id"]), incident_key)
            records.append(
                {
                    "id": event_row["id"],
                    "usecase_id": usecase_id,
                    "entity": entity,
                    "severity": severity,
                    "incident_key": incident_key,
                    "cooldown_active": in_cooldown,
                    "evidence": evidence,
                }
            )
        result = {
            "query": payload.query,
            "minutes": max(1, payload.minutes),
            "selectors": payload.selectors,
            "dry_run": payload.dry_run,
            "summary": evaluated.get("summary", {}),
            "matches": records,
            "total_hits": len(hits),
        }
        # Mirror success state for the /monitor/c-detections/state endpoint.
        app.state.c_detection_state["last_status"] = "ok"
        app.state.c_detection_state["last_result"] = result
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        return ApiResponse(data=result)
    except Exception as exc:
        # Record the failure before converting it into a 502.
        app.state.c_detection_state["last_status"] = "error"
        app.state.c_detection_state["last_error"] = str(exc)
        app.state.c_detection_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
        raise HTTPException(status_code=502, detail=f"C detection evaluation failed: {exc}") from exc
@app.get(
    "/monitor/c-detections/history",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="C detection history",
    description="List persisted C1-C3 detection matches, including evidence and linked incident keys.",
)
async def monitor_c_detections_history(
    limit: int = 50,
    offset: int = 0,
    usecase_id: str | None = None,
) -> ApiResponse:
    """Page through persisted C1-C3 detection matches."""
    items = repo.list_c_detection_events(limit=limit, offset=offset, usecase_id=usecase_id)
    page = {
        "items": items,
        "limit": max(1, limit),
        "offset": max(0, offset),
        "usecase_id": usecase_id,
    }
    return ApiResponse(data=page)
@app.get(
    "/monitor/c-detections/state",
    response_model=ApiResponse,
    dependencies=[Depends(require_internal_api_key)],
    summary="C detection state",
    description="Return Appendix C detection settings and last evaluation runtime state.",
)
async def monitor_c_detections_state() -> ApiResponse:
    """Expose Appendix C detection configuration plus last-run runtime state."""
    tunables = {
        "window_minutes": settings.c_detection_window_minutes,
        "c1_max_travel_speed_kmph": settings.c1_max_travel_speed_kmph,
        "c2_offhours_start_utc": settings.c2_offhours_start_utc,
        "c2_offhours_end_utc": settings.c2_offhours_end_utc,
        "c3_host_spread_threshold": settings.c3_host_spread_threshold,
        "c3_scan_port_threshold": settings.c3_scan_port_threshold,
        "create_iris_ticket": settings.c_detection_create_iris_ticket,
        "ticket_cooldown_seconds": settings.c_detection_ticket_cooldown_seconds,
    }
    runtime = getattr(app.state, "c_detection_state", {})
    return ApiResponse(
        data={
            "enabled": settings.c_detection_enabled,
            "settings": tunables,
            "state": runtime,
        }
    )
|