| 123456789101112131415161718 |
- from typing import Any
- import httpx
- class PagerDutyAdapter:
- def __init__(self, base_url: str, api_key: str) -> None:
- self.base_url = base_url.rstrip("/")
- self.api_key = api_key
- async def create_incident(self, payload: dict[str, Any]) -> dict[str, Any]:
- url = f"{self.base_url}/incidents"
- headers = {"Authorization": f"Token token={self.api_key}"} if self.api_key else {}
- async with httpx.AsyncClient(timeout=20.0) as client:
- response = await client.post(url, json=payload, headers=headers)
- response.raise_for_status()
- return response.json() if response.content else {"status_code": response.status_code}
|