| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- from datetime import datetime, timezone
- from typing import Any, Literal
- from pydantic import BaseModel, ConfigDict, Field
def utc_now() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
class WazuhIngestRequest(BaseModel):
    # Inbound alert schema. The field descriptions identify it as a
    # Wazuh-style alert (rule ID, original alert ID, numeric severity, title)
    # together with the raw payload from the source.
    # Every field is optional: `source` defaults to "wazuh", `payload` to an
    # empty dict, and the remaining fields to None.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "source": "wazuh",
                "rule_id": "100335",
                "alert_id": "1737301024.88912",
                "severity": 12,
                "title": "VPN success outside Thailand",
                "payload": {"full_log": "soc_mvp_test=true usecase_id=A3-05"},
            }
        }
    )

    source: str = Field(
        description="Alert source name.",
        default="wazuh",
        examples=["wazuh"],
    )
    rule_id: str | None = Field(
        description="Wazuh rule ID.",
        default=None,
        examples=["100335"],
    )
    alert_id: str | None = Field(
        description="Original alert identifier.",
        default=None,
        examples=["1737301024.88912"],
    )
    severity: int | None = Field(
        description="Numeric alert severity.",
        default=None,
        examples=[12],
    )
    title: str | None = Field(
        description="Human-readable alert title.",
        default=None,
        examples=["VPN success outside Thailand"],
    )
    payload: dict[str, Any] = Field(
        description="Raw alert payload from source.",
        default_factory=dict,
        examples=[{"full_log": "soc_mvp_test=true usecase_id=A3-05"}],
    )
class ActionCreateIncidentRequest(BaseModel):
    # Schema for an incident-creation action.
    # `title` is the only required field. `severity` is a free-form string
    # here (default "medium"), `source` defaults to "soc-integrator",
    # `dedupe_key` to None and `payload` to an empty dict.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "title": "Suspicious admin login",
                "severity": "high",
                "source": "soc-integrator",
                "dedupe_key": "vpn-user1-2026-03-03",
                "payload": {"description": "Detected from C1-01", "case_customer": 1},
            }
        }
    )

    title: str = Field(
        description="Incident title for escalation target.",
        examples=["Suspicious admin login"],
    )
    severity: str = Field(
        description="Incident severity level.",
        default="medium",
        examples=["high"],
    )
    source: str = Field(
        description="Producer of the incident.",
        default="soc-integrator",
        examples=["soc-integrator"],
    )
    dedupe_key: str | None = Field(
        description="Stable key to deduplicate incidents.",
        default=None,
        examples=["vpn-user1-2026-03-03"],
    )
    payload: dict[str, Any] = Field(
        description="Additional incident context.",
        default_factory=dict,
        examples=[{"description": "Detected from C1-01", "case_customer": 1}],
    )
class IrisTicketCreateRequest(BaseModel):
    # Schema for creating an IRIS case (ticket).
    # `title` (the IRIS case name) is required; `description` falls back to
    # "Created by soc-integrator". `case_customer` and `case_soc_id` default
    # to None. `payload` carries optional extra IRIS API fields and defaults
    # to an empty dict.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "title": "C1-01 impossible travel",
                "description": "Auto-created from SOC workflow",
                "case_customer": 1,
                "case_soc_id": "soc-prod",
                "payload": {"case_tags": ["appendix_c", "c1-01"]},
            }
        }
    )

    title: str = Field(
        description="IRIS case name.",
        examples=["C1-01 impossible travel"],
    )
    description: str = Field(
        description="IRIS case description.",
        default="Created by soc-integrator",
        examples=["Auto-created from SOC workflow"],
    )
    case_customer: int | None = Field(
        description="IRIS customer ID.",
        default=None,
        examples=[1],
    )
    case_soc_id: str | None = Field(
        description="IRIS SOC ID.",
        default=None,
        examples=["soc-prod"],
    )
    payload: dict[str, Any] = Field(
        description="Optional IRIS API fields merged into request body.",
        default_factory=dict,
        examples=[{"case_tags": ["appendix_c", "c1-01"]}],
    )
class IocEnrichRequest(BaseModel):
    # Schema for an IOC enrichment request.
    # `ioc_type` is restricted to "domain", "ip", "hash" or "url";
    # `ioc_value` is required. `providers` defaults to a fresh
    # ["virustotal"] list for each instance.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "ioc_type": "ip",
                "ioc_value": "8.8.8.8",
                "providers": ["virustotal", "abuseipdb"],
            }
        }
    )

    ioc_type: Literal["domain", "ip", "hash", "url"] = Field(
        description="Type of IOC.",
        examples=["ip"],
    )
    ioc_value: str = Field(
        description="IOC value to enrich.",
        examples=["8.8.8.8"],
    )
    providers: list[str] = Field(
        description="Enrichment providers to query.",
        default_factory=lambda: ["virustotal"],
        examples=[["virustotal", "abuseipdb"]],
    )
class IocEvaluateRequest(BaseModel):
    # Schema for an IOC evaluation request.
    # Same IOC type set as enrichment ("domain", "ip", "hash", "url"), plus
    # two integer thresholds: `malicious_threshold` (default 1) and
    # `suspicious_threshold` (default 3). No bounds are enforced on the
    # thresholds by this schema.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "ioc_type": "domain",
                "ioc_value": "example.com",
                "providers": ["virustotal"],
                "malicious_threshold": 1,
                "suspicious_threshold": 3,
            }
        }
    )

    ioc_type: Literal["domain", "ip", "hash", "url"] = Field(
        description="Type of IOC.",
        examples=["domain"],
    )
    ioc_value: str = Field(
        description="IOC value to evaluate.",
        examples=["example.com"],
    )
    providers: list[str] = Field(
        description="Evaluation providers.",
        default_factory=lambda: ["virustotal"],
        examples=[["virustotal"]],
    )
    malicious_threshold: int = Field(
        description="Minimum malicious engines to mark matched.",
        default=1,
        examples=[1],
    )
    suspicious_threshold: int = Field(
        description="Minimum suspicious engines to mark matched.",
        default=3,
        examples=[3],
    )
class TriggerShuffleRequest(BaseModel):
    # Schema for triggering a Shuffle workflow.
    # `workflow_id` is required and typed as a plain string (no UUID
    # validation is done here). `execution_argument` defaults to an empty
    # dict.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "workflow_id": "07ecad05-ff68-41cb-888d-96d1a8e8db4b",
                "execution_argument": {"ioc_type": "ip", "ioc_value": "8.8.8.8"},
            }
        }
    )

    workflow_id: str = Field(
        description="Shuffle workflow UUID.",
        examples=["07ecad05-ff68-41cb-888d-96d1a8e8db4b"],
    )
    execution_argument: dict[str, Any] = Field(
        description="Execution argument payload passed to workflow.",
        default_factory=dict,
        examples=[{"ioc_type": "ip", "ioc_value": "8.8.8.8"}],
    )
class ShuffleProxyRequest(BaseModel):
    # Schema describing a request to be proxied to the Shuffle API.
    # `path` is required. `method` is an unconstrained string defaulting to
    # "GET"; `params` and `payload` default to empty dicts.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "method": "POST",
                "path": "/api/v1/workflows",
                "params": {"limit": 50},
                "payload": {"name": "test"},
            }
        }
    )

    method: str = Field(
        description="HTTP method for proxied request.",
        default="GET",
        examples=["POST"],
    )
    path: str = Field(
        description="Shuffle API path (with or without /api prefix).",
        examples=["/api/v1/workflows"],
    )
    params: dict[str, Any] = Field(
        description="Query parameters.",
        default_factory=dict,
        examples=[{"limit": 50}],
    )
    payload: dict[str, Any] = Field(
        description="JSON body for POST/PUT/PATCH.",
        default_factory=dict,
        examples=[{"name": "test"}],
    )
class ShuffleLoginRequest(BaseModel):
    # Schema carrying Shuffle login credentials. Both fields are required.

    model_config = ConfigDict(
        json_schema_extra={"example": {"username": "admin@example.com", "password": "change-me"}}
    )

    username: str = Field(
        description="Shuffle username.",
        examples=["admin@example.com"],
    )
    # repr=False keeps the secret out of repr()/str() of the model, so it does
    # not end up in log lines or tracebacks that print the request object.
    # The attribute itself is still a plain str for callers.
    password: str = Field(
        description="Shuffle password.",
        examples=["change-me"],
        repr=False,
    )
class LogLossStreamCheck(BaseModel):
    # One stream entry for a log-loss check: a logical stream name, the
    # OpenSearch query used to count matching logs, and the minimum number
    # of events expected in the time window (`min_count`, default 1).

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "name": "log_monitor",
                "query": "full_log:log_monitor OR rule.id:100411",
                "min_count": 1,
            }
        }
    )

    name: str = Field(
        description="Logical stream name.",
        examples=["fortigate"],
    )
    query: str = Field(
        description="OpenSearch query to count matching logs.",
        examples=["full_log:*source=fortigate*"],
    )
    min_count: int = Field(
        description="Minimum events expected in time window.",
        default=1,
        examples=[1],
    )
class LogLossCheckRequest(BaseModel):
    # Schema for a log-loss check over a lookback window.
    # `minutes` defaults to 5. `streams` is a list of LogLossStreamCheck
    # entries and defaults to an empty list; per its description, an empty
    # list means the default streams are used (that fallback is not
    # implemented in this schema).

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "minutes": 5,
                "streams": [
                    {"name": "log_monitor", "query": "full_log:log_monitor OR rule.id:100411", "min_count": 1},
                    {
                        "name": "fortigate",
                        "query": "full_log:fortigate OR full_log:FGT80F OR full_log:FGT60F OR full_log:FGT40F OR full_log:FGT501E",
                        "min_count": 1,
                    },
                    {"name": "windows_agent", "query": "full_log:windows_agent OR full_log:windows", "min_count": 1},
                ],
            }
        }
    )

    minutes: int = Field(
        default=5,
        description="Lookback window in minutes.",
        examples=[5],
    )
    # Field-level example added so this field is documented like the other
    # request fields; it reuses the first entry of the model-level example.
    streams: list[LogLossStreamCheck] = Field(
        default_factory=list,
        description="Streams to validate. Empty uses default streams.",
        examples=[
            [{"name": "log_monitor", "query": "full_log:log_monitor OR rule.id:100411", "min_count": 1}],
        ],
    )
class MvpIncidentIngestRequest(BaseModel):
    # Normalized event schema for MVP incident ingestion.
    # Required: `event_id`, `timestamp`, `title`, `description`.
    # `source`, `event_type` and `severity` are closed sets (Literal) with
    # defaults "wazuh", "generic" and "medium". The context containers
    # (`asset`, `network`, `tags`, `risk_context`, `raw`, `payload`) default
    # to empty.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "source": "wazuh",
                "event_type": "vpn_geo_anomaly",
                "event_id": "evt-1737301024",
                "timestamp": "2026-03-03T06:00:00Z",
                "severity": "high",
                "title": "VPN login anomaly",
                "description": "User logged in from unexpected country",
                "asset": {"user": "alice", "hostname": "win-dc01"},
                "network": {"src_ip": "8.8.8.8", "country": "US"},
                "tags": ["vpn", "geo-anomaly"],
                "risk_context": {"admin_account": True},
                "raw": {"full_log": "..."},
                "payload": {"case_customer": 1},
            }
        }
    )

    source: Literal["wazuh", "shuffle", "manual"] = Field(
        description="Source system for the event.",
        default="wazuh",
        examples=["wazuh"],
    )
    event_type: Literal[
        "ioc_dns",
        "ioc_ips",
        "vpn_geo_anomaly",
        "auth_anomaly",
        "c1_impossible_travel",
        "c2_credential_abuse",
        "c3_lateral_movement",
        "generic",
    ] = Field(
        description="Normalized event category.",
        default="generic",
        examples=["vpn_geo_anomaly"],
    )
    event_id: str = Field(
        description="Unique event identifier from source.",
        examples=["evt-1737301024"],
    )
    timestamp: datetime = Field(
        description="Event timestamp in ISO-8601 format.",
        examples=["2026-03-03T06:00:00Z"],
    )
    severity: Literal["low", "medium", "high", "critical"] = Field(
        description="Normalized severity.",
        default="medium",
        examples=["high"],
    )
    title: str = Field(
        description="Short event title.",
        examples=["VPN login anomaly"],
    )
    description: str = Field(
        description="Detailed event description.",
        examples=["User logged in from unexpected country"],
    )
    asset: dict[str, Any] = Field(
        description="Asset and identity context.",
        default_factory=dict,
        examples=[{"user": "alice", "hostname": "win-dc01"}],
    )
    network: dict[str, Any] = Field(
        description="Network context.",
        default_factory=dict,
        examples=[{"src_ip": "8.8.8.8", "country": "US"}],
    )
    tags: list[str] = Field(
        description="Search/filter tags.",
        default_factory=list,
        examples=[["vpn", "geo-anomaly"]],
    )
    risk_context: dict[str, Any] = Field(
        description="Risk indicators used in scoring.",
        default_factory=dict,
        examples=[{"admin_account": True}],
    )
    raw: dict[str, Any] = Field(
        description="Original raw event object.",
        default_factory=dict,
        examples=[{"full_log": "..."}],
    )
    payload: dict[str, Any] = Field(
        description="Additional pipeline payload.",
        default_factory=dict,
        examples=[{"case_customer": 1}],
    )
class MvpIocEvaluateRequest(BaseModel):
    # Schema for the MVP IOC evaluation workflow.
    # Only "domain" and "ip" IOC types are accepted here. `source_event` is
    # an optional free-form dict (default empty) described as the source
    # event context used for incident linking.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "ioc_type": "ip",
                "ioc_value": "1.1.1.1",
                "source_event": {"event_id": "evt-1", "asset": {"hostname": "fw01"}},
            }
        }
    )

    ioc_type: Literal["domain", "ip"] = Field(
        description="IOC type for MVP workflow.",
        examples=["ip"],
    )
    ioc_value: str = Field(
        description="IOC value to evaluate.",
        examples=["1.1.1.1"],
    )
    source_event: dict[str, Any] = Field(
        description="Source event context used for incident linking.",
        default_factory=dict,
        examples=[{"event_id": "evt-1", "asset": {"hostname": "fw01"}}],
    )
class MvpVpnEvaluateRequest(BaseModel):
    # Schema for evaluating a single VPN authentication event.
    # Required: `user`, `src_ip`, `country_code`, `success`, `event_time`.
    # The boolean risk flags (`is_admin`, `off_hours`, `first_seen_country`)
    # default to False and `event_id` defaults to None. `src_ip` and
    # `country_code` are plain strings; no format validation is done here.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "user": "alice",
                "src_ip": "8.8.8.8",
                "country_code": "US",
                "success": True,
                "event_time": "2026-03-03T06:00:00Z",
                "is_admin": False,
                "off_hours": True,
                "first_seen_country": True,
                "event_id": "vpn-1737301024",
            }
        }
    )

    user: str = Field(
        description="Authenticated username.",
        examples=["alice"],
    )
    src_ip: str = Field(
        description="Source public IP address.",
        examples=["8.8.8.8"],
    )
    country_code: str = Field(
        description="ISO country code of source IP.",
        examples=["US"],
    )
    success: bool = Field(
        description="Whether VPN login succeeded.",
        examples=[True],
    )
    event_time: datetime = Field(
        description="Authentication timestamp.",
        examples=["2026-03-03T06:00:00Z"],
    )
    is_admin: bool = Field(
        description="User is privileged/admin account.",
        default=False,
        examples=[False],
    )
    off_hours: bool = Field(
        description="Event happened outside business hours.",
        default=False,
        examples=[True],
    )
    first_seen_country: bool = Field(
        description="First observed country for user.",
        default=False,
        examples=[True],
    )
    event_id: str | None = Field(
        description="Optional event ID from upstream.",
        default=None,
        examples=["vpn-1737301024"],
    )
class CDetectionEvaluateRequest(BaseModel):
    # Schema for running a detection evaluation over recent events.
    # All fields have defaults: a 30-minute lookback, the query
    # "soc_mvp_test=true", no selectors, dry_run off, and a fetch limit of
    # 200. No bounds are enforced on `minutes` or `limit` by this schema.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "minutes": 30,
                "query": "soc_mvp_test=true",
                "selectors": ["c1", "c3"],
                "dry_run": True,
                "limit": 200,
            }
        }
    )

    minutes: int = Field(
        description="Lookback window in minutes.",
        default=30,
        examples=[30],
    )
    query: str = Field(
        description="OpenSearch query for candidate events.",
        default="soc_mvp_test=true",
        examples=["soc_mvp_test=true"],
    )
    selectors: list[str] = Field(
        description="Optional use-case filters (c1/c2/c3 or Cx-yy).",
        default_factory=list,
        examples=[["c1", "c3"]],
    )
    dry_run: bool = Field(
        description="If true, evaluate only and do not create incidents/tickets.",
        default=False,
        examples=[True],
    )
    limit: int = Field(
        description="Maximum events fetched from Wazuh indexer.",
        default=200,
        examples=[200],
    )
class SimLogRunRequest(BaseModel):
    # Schema for starting a log-simulator run.
    # `script` is required and must be one of the whitelisted keys in the
    # Literal below. `target` and `scenario` default to "all", `count` to 1,
    # `delay_seconds` to 0.3 and `forever` to False.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "script": "fortigate",
                "target": "all",
                "scenario": "all",
                "count": 1,
                "delay_seconds": 0.3,
                "forever": False,
            }
        }
    )

    script: Literal[
        "fortigate",
        "endpoint",
        "cisco",
        "proposal_required",
        "proposal_appendix_b",
        "proposal_appendix_c",
        "wazuh_test",
    ] = Field(
        description="Whitelisted simulator script key.",
        examples=["fortigate"],
    )
    target: str = Field(
        description="Primary selector/model/platform for the script.",
        default="all",
        examples=["all"],
    )
    scenario: str = Field(
        description="Secondary selector used by endpoint simulator.",
        default="all",
        examples=["process"],
    )
    count: int = Field(
        description="Number of iterations for non-forever mode.",
        default=1,
        examples=[1],
    )
    delay_seconds: float = Field(
        description="Delay between iterations.",
        default=0.3,
        examples=[0.3],
    )
    forever: bool = Field(
        description="Run continuously until stopped.",
        default=False,
        examples=[False],
    )
class IrisAlertCreateRequest(BaseModel):
    # Schema for creating an IRIS alert.
    # `title` is the only required field. `severity_id` defaults to 3 and
    # `status_id` to 2; `source` defaults to "soc-integrator". `customer_id`
    # defaults to None (per its description, a configured value is used
    # instead; that fallback is not implemented in this schema).
    # `payload` defaults to an empty dict.

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "title": "Suspicious login detected",
                "description": "Multiple failed logins followed by success from unusual IP",
                "severity_id": 4,
                "status_id": 2,
                "source": "wazuh",
                "source_ref": "wazuh-alert-12345",
                "customer_id": 1,
                "payload": {"alert_tags": "brute-force,authentication"},
            }
        }
    )

    title: str = Field(
        description="Alert title.",
        examples=["Suspicious login detected"],
    )
    # `description`, `customer_id` and `payload` previously had no field-level
    # examples; they now mirror the model-level example so every field is
    # documented the same way as in the other request models.
    description: str = Field(
        default="Created by soc-integrator",
        description="Alert description.",
        examples=["Multiple failed logins followed by success from unusual IP"],
    )
    severity_id: int = Field(
        default=3,
        description="IRIS severity ID (1=Info,2=Low,3=Medium,4=High,5=Critical).",
        examples=[4],
    )
    status_id: int = Field(
        default=2,
        description="IRIS alert status ID.",
        examples=[2],
    )
    source: str = Field(
        default="soc-integrator",
        description="Alert source name.",
        examples=["wazuh"],
    )
    source_ref: str | None = Field(
        default=None,
        description="Source-system reference ID.",
        examples=["wazuh-alert-12345"],
    )
    customer_id: int | None = Field(
        default=None,
        description="IRIS customer ID (defaults to configured value).",
        examples=[1],
    )
    payload: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional IRIS alert fields merged into the request.",
        examples=[{"alert_tags": "brute-force,authentication"}],
    )
class ApiResponse(BaseModel):
    # Generic response envelope: a success flag, a short message, a timestamp
    # and a free-form data dict. All fields have defaults; `timestamp` is
    # produced by utc_now() for each instance and `data` starts as a fresh
    # empty dict.

    ok: bool = Field(default=True)
    message: str = Field(default="ok")
    timestamp: datetime = Field(default_factory=utc_now)
    data: dict[str, Any] = Field(default_factory=dict)
|