| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
from __future__ import annotations

import base64
from typing import Any
from urllib.parse import quote

import httpx
class VirusTotalAdapter:
    """Small async client for a VirusTotal-style REST API.

    Every public coroutine opens a short-lived ``httpx.AsyncClient``, sends a
    single request carrying the ``x-apikey`` header, and returns the decoded
    JSON body (or ``{"status_code": ...}`` when the body is empty).
    """

    # Characters left unescaped inside one URL path segment: the RFC 3986
    # sub-delimiters plus ":" and "@". Anything that could change the shape of
    # the URL ("/", "?", "#", "%", whitespace, ...) gets percent-encoded, while
    # ordinary domains, IPv4/IPv6 addresses, hashes and analysis ids pass
    # through unchanged.
    _SEGMENT_SAFE_CHARS = "!$&'()*+,;=:@"

    # Collection endpoint for each supported IOC type.
    _IOC_ENDPOINTS = {
        "domain": "/domains",
        "ip": "/ip_addresses",
        "hash": "/files",
        "url": "/urls",
    }

    def __init__(self, base_url: str, api_key: str) -> None:
        """Store the API root (without trailing slashes) and the API key."""
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key

    def _headers(self) -> dict[str, str]:
        """Return the authentication header, or no headers if no key is set."""
        return {"x-apikey": self.api_key} if self.api_key else {}

    def _require_api_key(self) -> None:
        """Raise ``RuntimeError`` when no API key has been configured."""
        if not self.api_key:
            raise RuntimeError("VirusTotal API key is not configured")

    def _quote_segment(self, value: str) -> str:
        """Percent-encode *value* so it stays a single URL path segment."""
        return quote(value, safe=self._SEGMENT_SAFE_CHARS)

    def _build_path(self, ioc_type: str, ioc_value: str) -> str:
        """Return the API path for looking up one indicator of compromise.

        Args:
            ioc_type: One of ``"domain"``, ``"ip"``, ``"hash"`` or ``"url"``.
            ioc_value: The indicator itself; surrounding whitespace is ignored.

        Returns:
            A path beginning with ``/`` to append to ``base_url``.

        Raises:
            ValueError: If ``ioc_type`` is not supported, or ``ioc_value`` is
                empty after stripping whitespace.
        """
        endpoint = self._IOC_ENDPOINTS.get(ioc_type)
        if endpoint is None:
            raise ValueError(f"Unsupported IOC type: {ioc_type}")

        value = ioc_value.strip()
        if not value:
            # An empty identifier would address the bare collection endpoint
            # rather than a specific object.
            raise ValueError(f"Empty IOC value for IOC type: {ioc_type}")

        if ioc_type == "url":
            # VT URL ID is urlsafe base64(url) without trailing "="
            raw = base64.urlsafe_b64encode(value.encode("utf-8"))
            segment = raw.decode("ascii").rstrip("=")
        else:
            # Escape the value so characters such as "/", "?" or "#" cannot
            # redirect the request to a different endpoint.
            segment = self._quote_segment(value)
        return f"{endpoint}/{segment}"

    async def _request(
        self,
        method: str,
        path: str,
        *,
        timeout: float,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Send one authenticated request and decode the response.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Path (starting with ``/``) appended to ``base_url``.
            timeout: Timeout in seconds handed to ``httpx.AsyncClient``.
            **kwargs: Extra keyword arguments forwarded to
                ``AsyncClient.request`` (for example ``files``).

        Returns:
            The parsed JSON body, or ``{"status_code": <code>}`` when the
            response has no content.

        Raises:
            RuntimeError: If ``raise_for_status()`` reports an HTTP error; the
                message carries the status code, URL and response text.
        """
        url = f"{self.base_url}{path}"
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.request(
                method, url, headers=self._headers(), **kwargs
            )

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            detail = response.text.strip()
            raise RuntimeError(
                f"VirusTotal returned {response.status_code} for {url}. Response: {detail}"
            ) from exc
        return response.json() if response.content else {"status_code": response.status_code}

    async def enrich_ioc(self, ioc_type: str, ioc_value: str) -> dict[str, Any]:
        """Fetch the report for a domain, IP address, file hash or URL.

        Raises:
            RuntimeError: If no API key is configured or the API answers with
                an HTTP error.
            ValueError: If the IOC type is unsupported or the value is empty.
        """
        # Check the key first so a misconfigured adapter is reported before
        # any complaint about the IOC itself.
        self._require_api_key()
        path = self._build_path(ioc_type, ioc_value)
        return await self._request("GET", path, timeout=20.0)

    async def upload_file(self, filename: str, content: bytes) -> dict[str, Any]:
        """Upload *content* as a multipart ``file`` field to ``/files``.

        Raises:
            RuntimeError: If no API key is configured or the API answers with
                an HTTP error.
        """
        self._require_api_key()
        files = {"file": (filename, content)}
        # Uploads get a longer timeout than the read-only lookups.
        return await self._request("POST", "/files", timeout=60.0, files=files)

    async def get_analysis(self, analysis_id: str) -> dict[str, Any]:
        """Fetch the analysis object identified by *analysis_id*.

        Raises:
            RuntimeError: If no API key is configured or the API answers with
                an HTTP error.
        """
        self._require_api_key()
        path = f"/analyses/{self._quote_segment(analysis_id)}"
        return await self._request("GET", path, timeout=20.0)
|