| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from datetime import datetime, timezone
- from typing import Any, Literal
- from pydantic import BaseModel, Field
- def utc_now() -> datetime:
- return datetime.now(timezone.utc)
- class WazuhIngestRequest(BaseModel):
- source: str = "wazuh"
- rule_id: str | None = None
- alert_id: str | None = None
- severity: int | None = None
- title: str | None = None
- payload: dict[str, Any] = Field(default_factory=dict)
- class ActionCreateIncidentRequest(BaseModel):
- title: str
- severity: str = "medium"
- source: str = "soc-integrator"
- dedupe_key: str | None = None
- payload: dict[str, Any] = Field(default_factory=dict)
- class IrisTicketCreateRequest(BaseModel):
- title: str
- description: str = "Created by soc-integrator"
- case_customer: int | None = None
- case_soc_id: str | None = None
- payload: dict[str, Any] = Field(default_factory=dict)
- class IocEnrichRequest(BaseModel):
- ioc_type: Literal["domain", "ip", "hash", "url"]
- ioc_value: str
- providers: list[str] = Field(default_factory=lambda: ["virustotal"])
- class IocEvaluateRequest(BaseModel):
- ioc_type: Literal["domain", "ip", "hash", "url"]
- ioc_value: str
- providers: list[str] = Field(default_factory=lambda: ["virustotal"])
- malicious_threshold: int = 1
- suspicious_threshold: int = 3
- class TriggerShuffleRequest(BaseModel):
- workflow_id: str
- execution_argument: dict[str, Any] = Field(default_factory=dict)
- class ShuffleProxyRequest(BaseModel):
- method: str = "GET"
- path: str
- params: dict[str, Any] = Field(default_factory=dict)
- payload: dict[str, Any] = Field(default_factory=dict)
- class ShuffleLoginRequest(BaseModel):
- username: str
- password: str
- class MvpIncidentIngestRequest(BaseModel):
- source: Literal["wazuh", "shuffle", "manual"] = "wazuh"
- event_type: Literal["ioc_dns", "ioc_ips", "vpn_geo_anomaly", "auth_anomaly", "generic"] = "generic"
- event_id: str
- timestamp: datetime
- severity: Literal["low", "medium", "high", "critical"] = "medium"
- title: str
- description: str
- asset: dict[str, Any] = Field(default_factory=dict)
- network: dict[str, Any] = Field(default_factory=dict)
- tags: list[str] = Field(default_factory=list)
- risk_context: dict[str, Any] = Field(default_factory=dict)
- raw: dict[str, Any] = Field(default_factory=dict)
- payload: dict[str, Any] = Field(default_factory=dict)
- class MvpIocEvaluateRequest(BaseModel):
- ioc_type: Literal["domain", "ip"]
- ioc_value: str
- source_event: dict[str, Any] = Field(default_factory=dict)
- class MvpVpnEvaluateRequest(BaseModel):
- user: str
- src_ip: str
- country_code: str
- success: bool
- event_time: datetime
- is_admin: bool = False
- off_hours: bool = False
- first_seen_country: bool = False
- event_id: str | None = None
- class ApiResponse(BaseModel):
- ok: bool = True
- message: str = "ok"
- timestamp: datetime = Field(default_factory=utc_now)
- data: dict[str, Any] = Field(default_factory=dict)
|