Nessuna descrizione

models.py 2.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from datetime import datetime, timezone
  2. from typing import Any, Literal
  3. from pydantic import BaseModel, Field
  def utc_now() -> datetime:
      """Return the current time as a timezone-aware ``datetime`` in UTC."""
      # Passing the tz explicitly yields an aware value (tzinfo=timezone.utc)
      # rather than a naive local-time datetime.
      return datetime.now(timezone.utc)
  class WazuhIngestRequest(BaseModel):
      """Ingest request model whose ``source`` defaults to ``"wazuh"``.

      Every field has a default, so an empty body validates. The alert
      attributes default to ``None`` and ``payload`` to an empty dict.
      """

      source: str = "wazuh"
      rule_id: str | None = None
      alert_id: str | None = None
      # NOTE(review): severity is an int here, whereas
      # ActionCreateIncidentRequest.severity is a str. Confirm where the
      # conversion between the two happens.
      severity: int | None = None
      title: str | None = None
      # default_factory gives each instance its own dict.
      payload: dict[str, Any] = Field(default_factory=dict)
  class ActionCreateIncidentRequest(BaseModel):
      """Request model for creating an incident.

      ``title`` is the only required field. ``severity`` defaults to
      ``"medium"`` and ``source`` to ``"soc-integrator"``.
      """

      title: str
      # Free-form string; this model does not restrict the accepted values.
      severity: str = "medium"
      source: str = "soc-integrator"
      # Optional key, presumably used to suppress duplicate incidents.
      # TODO confirm against the consumer of this model.
      dedupe_key: str | None = None
      # default_factory gives each instance its own dict.
      payload: dict[str, Any] = Field(default_factory=dict)
  class TriggerShuffleRequest(BaseModel):
      """Request model naming a workflow and carrying its execution argument.

      ``workflow_id`` is required. ``execution_argument`` defaults to an
      empty dict.
      """

      workflow_id: str
      # default_factory gives each instance its own dict.
      execution_argument: dict[str, Any] = Field(default_factory=dict)
  class ShuffleProxyRequest(BaseModel):
      """Request model describing a call to be proxied: method, path, params, body.

      ``path`` is required. ``method`` defaults to ``"GET"``; ``params`` and
      ``payload`` default to empty dicts.
      """

      # Plain str: the model itself does not validate the method name.
      method: str = "GET"
      path: str
      # default_factory gives each instance its own dict.
      params: dict[str, Any] = Field(default_factory=dict)
      payload: dict[str, Any] = Field(default_factory=dict)
  class ShuffleLoginRequest(BaseModel):
      """Login request model with two required string fields."""

      username: str
      # NOTE(review): declared as a plain str, so the value is visible in the
      # model's repr and serialized output. Consider pydantic.SecretStr if
      # callers can accommodate the type change.
      password: str
  class MvpIncidentIngestRequest(BaseModel):
      """Incident ingest request model with constrained enumerations.

      Required fields: ``event_id``, ``timestamp``, ``title`` and
      ``description``. ``source``, ``event_type`` and ``severity`` accept
      only the literal values listed on each field. The remaining
      container fields default to empty dicts or an empty list.
      """

      source: Literal["wazuh", "shuffle", "manual"] = "wazuh"
      event_type: Literal[
          "ioc_dns",
          "ioc_ips",
          "vpn_geo_anomaly",
          "auth_anomaly",
          "generic",
      ] = "generic"
      event_id: str
      timestamp: datetime
      severity: Literal["low", "medium", "high", "critical"] = "medium"
      title: str
      description: str
      # default_factory gives each instance its own container.
      asset: dict[str, Any] = Field(default_factory=dict)
      network: dict[str, Any] = Field(default_factory=dict)
      tags: list[str] = Field(default_factory=list)
      risk_context: dict[str, Any] = Field(default_factory=dict)
      raw: dict[str, Any] = Field(default_factory=dict)
      payload: dict[str, Any] = Field(default_factory=dict)
  class MvpIocEvaluateRequest(BaseModel):
      """Request model for evaluating one indicator.

      ``ioc_type`` must be ``"domain"`` or ``"ip"``. ``ioc_value`` is a
      required string, and ``source_event`` defaults to an empty dict.
      """

      ioc_type: Literal["domain", "ip"]
      # Plain str: the model does not check the value against ioc_type.
      ioc_value: str
      # default_factory gives each instance its own dict.
      source_event: dict[str, Any] = Field(default_factory=dict)
  class MvpVpnEvaluateRequest(BaseModel):
      """Request model for evaluating one VPN login event.

      Required fields: ``user``, ``src_ip``, ``country_code``, ``success``
      and ``event_time``. The three context flags default to ``False`` and
      ``event_id`` to ``None``.
      """

      user: str
      # Plain str: the model does not validate the IP address format.
      src_ip: str
      # NOTE(review): presumably an ISO 3166 country code. The format is not
      # enforced here, so confirm with callers.
      country_code: str
      success: bool
      event_time: datetime
      # Context flags supplied by the caller; all default to False.
      is_admin: bool = False
      off_hours: bool = False
      first_seen_country: bool = False
      event_id: str | None = None
  class ApiResponse(BaseModel):
      """Response envelope model whose fields all have defaults.

      A bare ``ApiResponse()`` has ``ok=True``, ``message="ok"``, a
      ``timestamp`` taken from ``utc_now`` at construction, and empty
      ``data``.
      """

      ok: bool = True
      message: str = "ok"
      # The factory runs once per instance, so each response gets its own
      # creation time rather than the module import time.
      timestamp: datetime = Field(default_factory=utc_now)
      # default_factory gives each instance its own dict.
      data: dict[str, Any] = Field(default_factory=dict)