暫無描述

main.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. import asyncio
  2. import logging
  3. from datetime import datetime, timezone
  4. from fastapi import Depends, FastAPI, HTTPException
  5. from app.adapters.iris import IrisAdapter
  6. from app.adapters.pagerduty import PagerDutyAdapter
  7. from app.adapters.shuffle import ShuffleAdapter
  8. from app.adapters.wazuh import WazuhAdapter
  9. from app.config import settings
  10. from app.db import init_schema
  11. from app.models import (
  12. ActionCreateIncidentRequest,
  13. ApiResponse,
  14. ShuffleLoginRequest,
  15. ShuffleProxyRequest,
  16. TriggerShuffleRequest,
  17. WazuhIngestRequest,
  18. )
  19. from app.repositories.mvp_repo import MvpRepository
  20. from app.routes.mvp import build_mvp_router
  21. from app.security import require_internal_api_key
  22. from app.services.mvp_service import MvpService
# ASGI application object; the title is taken from settings.
app = FastAPI(title=settings.app_name, version="0.1.0")
logger = logging.getLogger(__name__)

# One adapter instance per external system, each configured from settings.
# The Wazuh adapter receives two sets of connection settings: the base URL and
# the indexer URL, each with its own username/password.
wazuh_adapter = WazuhAdapter(
    base_url=settings.wazuh_base_url,
    username=settings.wazuh_username,
    password=settings.wazuh_password,
    indexer_url=settings.wazuh_indexer_url,
    indexer_username=settings.wazuh_indexer_username,
    indexer_password=settings.wazuh_indexer_password,
)
shuffle_adapter = ShuffleAdapter(
    base_url=settings.shuffle_base_url,
    api_key=settings.shuffle_api_key,
)
pagerduty_adapter = PagerDutyAdapter(
    base_url=settings.pagerduty_base_url,
    api_key=settings.pagerduty_api_key,
)
iris_adapter = IrisAdapter(
    base_url=settings.iris_base_url,
    api_key=settings.iris_api_key,
)

# The MVP service is handed the repository plus all four adapters.
repo = MvpRepository()
mvp_service = MvpService(
    repo=repo,
    wazuh_adapter=wazuh_adapter,
    shuffle_adapter=shuffle_adapter,
    iris_adapter=iris_adapter,
    pagerduty_adapter=pagerduty_adapter,
)

# Mount the MVP routes. The router factory receives the service and the
# internal-API-key dependency.
# NOTE(review): presumably the dependency guards the MVP routes -- confirm in
# app.routes.mvp.
app.include_router(build_mvp_router(mvp_service, require_internal_api_key))
  54. async def _wazuh_auto_sync_loop() -> None:
  55. interval = max(5, int(settings.wazuh_auto_sync_interval_seconds))
  56. while True:
  57. started_at = datetime.now(timezone.utc).isoformat()
  58. try:
  59. app.state.wazuh_auto_sync_state["running"] = True
  60. app.state.wazuh_auto_sync_state["last_started_at"] = started_at
  61. result = await mvp_service.sync_wazuh_alerts(
  62. query=settings.wazuh_auto_sync_query,
  63. limit=settings.wazuh_auto_sync_limit,
  64. minutes=settings.wazuh_auto_sync_minutes,
  65. )
  66. app.state.wazuh_auto_sync_state["last_status"] = "ok"
  67. app.state.wazuh_auto_sync_state["last_result"] = result
  68. app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  69. logger.info(
  70. "wazuh auto-sync processed=%s ingested=%s skipped=%s failed=%s ioc_evaluated=%s ioc_matched=%s ioc_rejected=%s",
  71. result.get("processed", 0),
  72. result.get("ingested", 0),
  73. result.get("skipped_existing", 0),
  74. result.get("failed", 0),
  75. result.get("ioc_evaluated", 0),
  76. result.get("ioc_matched", 0),
  77. result.get("ioc_rejected", 0),
  78. )
  79. except Exception as exc:
  80. app.state.wazuh_auto_sync_state["last_status"] = "error"
  81. app.state.wazuh_auto_sync_state["last_error"] = str(exc)
  82. app.state.wazuh_auto_sync_state["last_finished_at"] = datetime.now(timezone.utc).isoformat()
  83. logger.exception("wazuh auto-sync failed: %s", exc)
  84. finally:
  85. app.state.wazuh_auto_sync_state["running"] = False
  86. await asyncio.sleep(interval)
  87. @app.on_event("startup")
  88. async def startup() -> None:
  89. init_schema()
  90. repo.ensure_policy()
  91. app.state.wazuh_auto_sync_state = {
  92. "running": False,
  93. "last_status": None,
  94. "last_started_at": None,
  95. "last_finished_at": None,
  96. "last_error": None,
  97. "last_result": None,
  98. }
  99. if settings.wazuh_auto_sync_enabled:
  100. app.state.wazuh_auto_sync_task = asyncio.create_task(_wazuh_auto_sync_loop())
  101. logger.info(
  102. "wazuh auto-sync enabled interval=%ss limit=%s minutes=%s query=%s",
  103. settings.wazuh_auto_sync_interval_seconds,
  104. settings.wazuh_auto_sync_limit,
  105. settings.wazuh_auto_sync_minutes,
  106. settings.wazuh_auto_sync_query,
  107. )
  108. @app.on_event("shutdown")
  109. async def shutdown() -> None:
  110. task = getattr(app.state, "wazuh_auto_sync_task", None)
  111. if task:
  112. task.cancel()
  113. try:
  114. await task
  115. except asyncio.CancelledError:
  116. pass
  117. @app.get("/health", response_model=ApiResponse)
  118. async def health() -> ApiResponse:
  119. return ApiResponse(
  120. data={
  121. "service": settings.app_name,
  122. "env": settings.app_env,
  123. "targets": {
  124. "wazuh": settings.wazuh_base_url,
  125. "shuffle": settings.shuffle_base_url,
  126. "pagerduty": settings.pagerduty_base_url,
  127. "iris": settings.iris_base_url,
  128. },
  129. }
  130. )
  131. @app.post("/ingest/wazuh-alert", response_model=ApiResponse)
  132. async def ingest_wazuh_alert(payload: WazuhIngestRequest) -> ApiResponse:
  133. normalized = {
  134. "source": payload.source,
  135. "alert_id": payload.alert_id,
  136. "rule_id": payload.rule_id,
  137. "severity": payload.severity,
  138. "title": payload.title,
  139. "payload": payload.payload,
  140. }
  141. return ApiResponse(data={"normalized": normalized})
  142. @app.post("/action/create-incident", response_model=ApiResponse)
  143. async def create_incident(payload: ActionCreateIncidentRequest) -> ApiResponse:
  144. incident_payload = {
  145. "title": payload.title,
  146. "urgency": payload.severity,
  147. "incident_key": payload.dedupe_key,
  148. "body": payload.payload,
  149. "source": payload.source,
  150. }
  151. try:
  152. pd_result = await pagerduty_adapter.create_incident(incident_payload)
  153. except Exception as exc:
  154. raise HTTPException(status_code=502, detail=f"PagerDuty call failed: {exc}") from exc
  155. return ApiResponse(data={"pagerduty": pd_result})
  156. @app.post("/action/trigger-shuffle", response_model=ApiResponse)
  157. async def trigger_shuffle(payload: TriggerShuffleRequest) -> ApiResponse:
  158. try:
  159. shuffle_result = await shuffle_adapter.trigger_workflow(
  160. workflow_id=payload.workflow_id,
  161. payload=payload.execution_argument,
  162. )
  163. except Exception as exc:
  164. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  165. return ApiResponse(data={"shuffle": shuffle_result})
  166. @app.get("/shuffle/health", response_model=ApiResponse)
  167. async def shuffle_health() -> ApiResponse:
  168. try:
  169. result = await shuffle_adapter.health()
  170. except Exception as exc:
  171. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  172. return ApiResponse(data={"shuffle": result})
  173. @app.get("/shuffle/auth-test", response_model=ApiResponse)
  174. async def shuffle_auth_test() -> ApiResponse:
  175. try:
  176. result = await shuffle_adapter.auth_test()
  177. except Exception as exc:
  178. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  179. return ApiResponse(data={"shuffle": result})
  180. @app.post("/shuffle/login", response_model=ApiResponse)
  181. async def shuffle_login(payload: ShuffleLoginRequest) -> ApiResponse:
  182. try:
  183. result = await shuffle_adapter.login(payload.username, payload.password)
  184. except Exception as exc:
  185. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  186. return ApiResponse(data={"shuffle": result})
  187. @app.post("/shuffle/generate-apikey", response_model=ApiResponse)
  188. async def shuffle_generate_apikey(payload: ShuffleLoginRequest | None = None) -> ApiResponse:
  189. username = payload.username if payload else settings.shuffle_username
  190. password = payload.password if payload else settings.shuffle_password
  191. if not username or not password:
  192. raise HTTPException(
  193. status_code=400,
  194. detail="Missing shuffle credentials. Provide username/password in body or set SHUFFLE_USERNAME and SHUFFLE_PASSWORD.",
  195. )
  196. try:
  197. result = await shuffle_adapter.generate_apikey_from_login(username, password)
  198. except Exception as exc:
  199. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  200. return ApiResponse(data={"shuffle": result})
  201. @app.get("/shuffle/workflows", response_model=ApiResponse)
  202. async def shuffle_workflows() -> ApiResponse:
  203. try:
  204. result = await shuffle_adapter.list_workflows()
  205. except Exception as exc:
  206. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  207. return ApiResponse(data={"shuffle": result})
  208. @app.get("/shuffle/workflows/{workflow_id}", response_model=ApiResponse)
  209. async def shuffle_workflow(workflow_id: str) -> ApiResponse:
  210. try:
  211. result = await shuffle_adapter.get_workflow(workflow_id)
  212. except Exception as exc:
  213. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  214. return ApiResponse(data={"shuffle": result})
  215. @app.post("/shuffle/workflows/{workflow_id}/execute", response_model=ApiResponse)
  216. async def shuffle_workflow_execute(
  217. workflow_id: str, payload: dict[str, object]
  218. ) -> ApiResponse:
  219. try:
  220. result = await shuffle_adapter.trigger_workflow(workflow_id=workflow_id, payload=payload)
  221. except Exception as exc:
  222. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  223. return ApiResponse(data={"shuffle": result})
  224. @app.get("/shuffle/apps", response_model=ApiResponse)
  225. async def shuffle_apps() -> ApiResponse:
  226. try:
  227. result = await shuffle_adapter.list_apps()
  228. except Exception as exc:
  229. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  230. return ApiResponse(data={"shuffle": result})
  231. @app.post("/shuffle/proxy", response_model=ApiResponse)
  232. async def shuffle_proxy(payload: ShuffleProxyRequest) -> ApiResponse:
  233. path = payload.path if payload.path.startswith("/api/") else f"/api/v1/{payload.path.lstrip('/')}"
  234. try:
  235. result = await shuffle_adapter.proxy(
  236. method=payload.method,
  237. path=path,
  238. params=payload.params,
  239. payload=payload.payload,
  240. )
  241. except Exception as exc:
  242. raise HTTPException(status_code=502, detail=f"Shuffle call failed: {exc}") from exc
  243. return ApiResponse(data={"shuffle": result})
  244. @app.post("/action/create-iris-case", response_model=ApiResponse)
  245. async def create_iris_case(payload: ActionCreateIncidentRequest) -> ApiResponse:
  246. # IRIS v2 expects case_name, case_description, case_customer, case_soc_id.
  247. case_payload = {
  248. "case_name": payload.title,
  249. "case_description": payload.payload.get("description", "Created by soc-integrator"),
  250. "case_customer": payload.payload.get("case_customer", settings.iris_default_customer_id),
  251. "case_soc_id": payload.payload.get("case_soc_id", settings.iris_default_soc_id),
  252. }
  253. try:
  254. iris_result = await iris_adapter.create_case(case_payload)
  255. except Exception as exc:
  256. raise HTTPException(status_code=502, detail=f"IRIS call failed: {exc}") from exc
  257. return ApiResponse(data={"iris": iris_result})
  258. @app.get("/sync/wazuh-version", response_model=ApiResponse)
  259. async def sync_wazuh_version() -> ApiResponse:
  260. try:
  261. wazuh_result = await wazuh_adapter.get_version()
  262. except Exception as exc:
  263. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  264. return ApiResponse(data={"wazuh": wazuh_result})
  265. @app.get("/wazuh/auth-test", response_model=ApiResponse)
  266. async def wazuh_auth_test() -> ApiResponse:
  267. try:
  268. result = await wazuh_adapter.auth_test()
  269. except Exception as exc:
  270. raise HTTPException(status_code=502, detail=f"Wazuh auth failed: {exc}") from exc
  271. return ApiResponse(data={"wazuh": result})
  272. @app.get("/wazuh/manager-info", response_model=ApiResponse)
  273. async def wazuh_manager_info() -> ApiResponse:
  274. try:
  275. result = await wazuh_adapter.get_manager_info()
  276. except Exception as exc:
  277. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  278. return ApiResponse(data={"wazuh": result})
  279. @app.get("/wazuh/agents", response_model=ApiResponse)
  280. async def wazuh_agents(
  281. limit: int = 50,
  282. offset: int = 0,
  283. select: str | None = None,
  284. ) -> ApiResponse:
  285. try:
  286. result = await wazuh_adapter.list_agents(limit=limit, offset=offset, select=select)
  287. except Exception as exc:
  288. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  289. return ApiResponse(data={"wazuh": result})
  290. @app.get("/wazuh/alerts", response_model=ApiResponse)
  291. async def wazuh_alerts(
  292. limit: int = 50,
  293. offset: int = 0,
  294. q: str | None = None,
  295. sort: str | None = None,
  296. ) -> ApiResponse:
  297. try:
  298. # In this Wazuh build, API alerts are exposed via manager logs.
  299. result = await wazuh_adapter.list_manager_logs(
  300. limit=limit, offset=offset, q=q, sort=sort
  301. )
  302. except Exception as exc:
  303. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  304. return ApiResponse(data={"wazuh": result})
  305. @app.get("/wazuh/manager-logs", response_model=ApiResponse)
  306. async def wazuh_manager_logs(
  307. limit: int = 50,
  308. offset: int = 0,
  309. q: str | None = None,
  310. sort: str | None = None,
  311. ) -> ApiResponse:
  312. try:
  313. result = await wazuh_adapter.list_manager_logs(
  314. limit=limit, offset=offset, q=q, sort=sort
  315. )
  316. except Exception as exc:
  317. raise HTTPException(status_code=502, detail=f"Wazuh call failed: {exc}") from exc
  318. return ApiResponse(data={"wazuh": result})
  319. @app.post("/wazuh/sync-to-mvp", response_model=ApiResponse, dependencies=[Depends(require_internal_api_key)])
  320. async def wazuh_sync_to_mvp(
  321. limit: int = 50,
  322. minutes: int = 120,
  323. q: str = "soc_mvp_test=true OR event_type:*",
  324. ) -> ApiResponse:
  325. try:
  326. result = await mvp_service.sync_wazuh_alerts(query=q, limit=limit, minutes=minutes)
  327. except Exception as exc:
  328. raise HTTPException(status_code=502, detail=f"Wazuh sync failed: {exc}") from exc
  329. return ApiResponse(data={"sync": result})
  330. @app.get("/wazuh/auto-sync/status", response_model=ApiResponse)
  331. async def wazuh_auto_sync_status() -> ApiResponse:
  332. state = getattr(app.state, "wazuh_auto_sync_state", {})
  333. task = getattr(app.state, "wazuh_auto_sync_task", None)
  334. return ApiResponse(
  335. data={
  336. "enabled": settings.wazuh_auto_sync_enabled,
  337. "task_running": bool(task and not task.done()),
  338. "settings": {
  339. "interval_seconds": settings.wazuh_auto_sync_interval_seconds,
  340. "limit": settings.wazuh_auto_sync_limit,
  341. "minutes": settings.wazuh_auto_sync_minutes,
  342. "query": settings.wazuh_auto_sync_query,
  343. },
  344. "state": state,
  345. }
  346. )