Geen omschrijving

virustotal.py 3.4KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from __future__ import annotations
  2. import base64
  3. from typing import Any
  4. import httpx
  5. class VirusTotalAdapter:
  6. def __init__(self, base_url: str, api_key: str) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.api_key = api_key
  9. def _headers(self) -> dict[str, str]:
  10. return {"x-apikey": self.api_key} if self.api_key else {}
  11. def _build_path(self, ioc_type: str, ioc_value: str) -> str:
  12. value = ioc_value.strip()
  13. if ioc_type == "domain":
  14. return f"/domains/{value}"
  15. if ioc_type == "ip":
  16. return f"/ip_addresses/{value}"
  17. if ioc_type == "hash":
  18. return f"/files/{value}"
  19. if ioc_type == "url":
  20. # VT URL ID is urlsafe base64(url) without trailing "="
  21. encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("utf-8").rstrip("=")
  22. return f"/urls/{encoded}"
  23. raise ValueError(f"Unsupported IOC type: {ioc_type}")
  24. async def enrich_ioc(self, ioc_type: str, ioc_value: str) -> dict[str, Any]:
  25. if not self.api_key:
  26. raise RuntimeError("VirusTotal API key is not configured")
  27. path = self._build_path(ioc_type, ioc_value)
  28. url = f"{self.base_url}{path}"
  29. headers = self._headers()
  30. async with httpx.AsyncClient(timeout=20.0) as client:
  31. response = await client.get(url, headers=headers)
  32. try:
  33. response.raise_for_status()
  34. except httpx.HTTPStatusError as exc:
  35. detail = response.text.strip()
  36. raise RuntimeError(
  37. f"VirusTotal returned {response.status_code} for {url}. Response: {detail}"
  38. ) from exc
  39. return response.json() if response.content else {"status_code": response.status_code}
  40. async def upload_file(self, filename: str, content: bytes) -> dict[str, Any]:
  41. if not self.api_key:
  42. raise RuntimeError("VirusTotal API key is not configured")
  43. url = f"{self.base_url}/files"
  44. headers = self._headers()
  45. files = {"file": (filename, content)}
  46. async with httpx.AsyncClient(timeout=60.0) as client:
  47. response = await client.post(url, headers=headers, files=files)
  48. try:
  49. response.raise_for_status()
  50. except httpx.HTTPStatusError as exc:
  51. detail = response.text.strip()
  52. raise RuntimeError(
  53. f"VirusTotal returned {response.status_code} for {url}. Response: {detail}"
  54. ) from exc
  55. return response.json() if response.content else {"status_code": response.status_code}
  56. async def get_analysis(self, analysis_id: str) -> dict[str, Any]:
  57. if not self.api_key:
  58. raise RuntimeError("VirusTotal API key is not configured")
  59. url = f"{self.base_url}/analyses/{analysis_id}"
  60. headers = self._headers()
  61. async with httpx.AsyncClient(timeout=20.0) as client:
  62. response = await client.get(url, headers=headers)
  63. try:
  64. response.raise_for_status()
  65. except httpx.HTTPStatusError as exc:
  66. detail = response.text.strip()
  67. raise RuntimeError(
  68. f"VirusTotal returned {response.status_code} for {url}. Response: {detail}"
  69. ) from exc
  70. return response.json() if response.content else {"status_code": response.status_code}