from __future__ import annotations import base64 from typing import Any import httpx class VirusTotalAdapter: def __init__(self, base_url: str, api_key: str) -> None: self.base_url = base_url.rstrip("/") self.api_key = api_key def _headers(self) -> dict[str, str]: return {"x-apikey": self.api_key} if self.api_key else {} def _build_path(self, ioc_type: str, ioc_value: str) -> str: value = ioc_value.strip() if ioc_type == "domain": return f"/domains/{value}" if ioc_type == "ip": return f"/ip_addresses/{value}" if ioc_type == "hash": return f"/files/{value}" if ioc_type == "url": # VT URL ID is urlsafe base64(url) without trailing "=" encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("utf-8").rstrip("=") return f"/urls/{encoded}" raise ValueError(f"Unsupported IOC type: {ioc_type}") async def enrich_ioc(self, ioc_type: str, ioc_value: str) -> dict[str, Any]: if not self.api_key: raise RuntimeError("VirusTotal API key is not configured") path = self._build_path(ioc_type, ioc_value) url = f"{self.base_url}{path}" headers = self._headers() async with httpx.AsyncClient(timeout=20.0) as client: response = await client.get(url, headers=headers) try: response.raise_for_status() except httpx.HTTPStatusError as exc: detail = response.text.strip() raise RuntimeError( f"VirusTotal returned {response.status_code} for {url}. Response: {detail}" ) from exc return response.json() if response.content else {"status_code": response.status_code} async def upload_file(self, filename: str, content: bytes) -> dict[str, Any]: if not self.api_key: raise RuntimeError("VirusTotal API key is not configured") url = f"{self.base_url}/files" headers = self._headers() files = {"file": (filename, content)} async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, headers=headers, files=files) try: response.raise_for_status() except httpx.HTTPStatusError as exc: detail = response.text.strip() raise RuntimeError( f"VirusTotal returned {response.status_code} for {url}. Response: {detail}" ) from exc return response.json() if response.content else {"status_code": response.status_code} async def get_analysis(self, analysis_id: str) -> dict[str, Any]: if not self.api_key: raise RuntimeError("VirusTotal API key is not configured") url = f"{self.base_url}/analyses/{analysis_id}" headers = self._headers() async with httpx.AsyncClient(timeout=20.0) as client: response = await client.get(url, headers=headers) try: response.raise_for_status() except httpx.HTTPStatusError as exc: detail = response.text.strip() raise RuntimeError( f"VirusTotal returned {response.status_code} for {url}. Response: {detail}" ) from exc return response.json() if response.content else {"status_code": response.status_code}