Nav apraksta

models.py 3.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from datetime import datetime, timezone
  2. from typing import Any, Literal
  3. from pydantic import BaseModel, Field
  4. def utc_now() -> datetime:
  5. return datetime.now(timezone.utc)
  6. class WazuhIngestRequest(BaseModel):
  7. source: str = "wazuh"
  8. rule_id: str | None = None
  9. alert_id: str | None = None
  10. severity: int | None = None
  11. title: str | None = None
  12. payload: dict[str, Any] = Field(default_factory=dict)
  13. class ActionCreateIncidentRequest(BaseModel):
  14. title: str
  15. severity: str = "medium"
  16. source: str = "soc-integrator"
  17. dedupe_key: str | None = None
  18. payload: dict[str, Any] = Field(default_factory=dict)
  19. class IrisTicketCreateRequest(BaseModel):
  20. title: str
  21. description: str = "Created by soc-integrator"
  22. case_customer: int | None = None
  23. case_soc_id: str | None = None
  24. payload: dict[str, Any] = Field(default_factory=dict)
  25. class IocEnrichRequest(BaseModel):
  26. ioc_type: Literal["domain", "ip", "hash", "url"]
  27. ioc_value: str
  28. providers: list[str] = Field(default_factory=lambda: ["virustotal"])
  29. class IocEvaluateRequest(BaseModel):
  30. ioc_type: Literal["domain", "ip", "hash", "url"]
  31. ioc_value: str
  32. providers: list[str] = Field(default_factory=lambda: ["virustotal"])
  33. malicious_threshold: int = 1
  34. suspicious_threshold: int = 3
  35. class TriggerShuffleRequest(BaseModel):
  36. workflow_id: str
  37. execution_argument: dict[str, Any] = Field(default_factory=dict)
  38. class ShuffleProxyRequest(BaseModel):
  39. method: str = "GET"
  40. path: str
  41. params: dict[str, Any] = Field(default_factory=dict)
  42. payload: dict[str, Any] = Field(default_factory=dict)
  43. class ShuffleLoginRequest(BaseModel):
  44. username: str
  45. password: str
  46. class MvpIncidentIngestRequest(BaseModel):
  47. source: Literal["wazuh", "shuffle", "manual"] = "wazuh"
  48. event_type: Literal["ioc_dns", "ioc_ips", "vpn_geo_anomaly", "auth_anomaly", "generic"] = "generic"
  49. event_id: str
  50. timestamp: datetime
  51. severity: Literal["low", "medium", "high", "critical"] = "medium"
  52. title: str
  53. description: str
  54. asset: dict[str, Any] = Field(default_factory=dict)
  55. network: dict[str, Any] = Field(default_factory=dict)
  56. tags: list[str] = Field(default_factory=list)
  57. risk_context: dict[str, Any] = Field(default_factory=dict)
  58. raw: dict[str, Any] = Field(default_factory=dict)
  59. payload: dict[str, Any] = Field(default_factory=dict)
  60. class MvpIocEvaluateRequest(BaseModel):
  61. ioc_type: Literal["domain", "ip"]
  62. ioc_value: str
  63. source_event: dict[str, Any] = Field(default_factory=dict)
  64. class MvpVpnEvaluateRequest(BaseModel):
  65. user: str
  66. src_ip: str
  67. country_code: str
  68. success: bool
  69. event_time: datetime
  70. is_admin: bool = False
  71. off_hours: bool = False
  72. first_seen_country: bool = False
  73. event_id: str | None = None
  74. class ApiResponse(BaseModel):
  75. ok: bool = True
  76. message: str = "ok"
  77. timestamp: datetime = Field(default_factory=utc_now)
  78. data: dict[str, Any] = Field(default_factory=dict)