from typing import Any import httpx class ShuffleAdapter: def __init__(self, base_url: str, api_key: str) -> None: self.base_url = base_url.rstrip("/") self.api_key = api_key def _headers(self) -> dict[str, str]: if not self.api_key: return {} return { "Authorization": f"Bearer {self.api_key}", "X-Api-Key": self.api_key, } async def _request( self, method: str, path: str, params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: path = path if path.startswith("/") else f"/{path}" url = f"{self.base_url}{path}" async with httpx.AsyncClient(timeout=25.0) as client: response = await client.request( method=method.upper(), url=url, params=params or None, json=payload or None, headers=self._headers(), ) try: response.raise_for_status() except httpx.HTTPStatusError as exc: detail = response.text.strip() raise RuntimeError( f"Shuffle returned {response.status_code} for {url}. Response: {detail}" ) from exc return response.json() if response.content else {"status_code": response.status_code} async def health(self) -> dict[str, Any]: return await self._request("GET", "/api/v1/_ah/health") async def auth_test(self) -> dict[str, Any]: return await self._request("GET", "/api/v1/users/getinfo") async def list_workflows(self) -> dict[str, Any]: return await self._request("GET", "/api/v1/workflows") async def get_workflow(self, workflow_id: str) -> dict[str, Any]: return await self._request("GET", f"/api/v1/workflows/{workflow_id}") async def trigger_workflow(self, workflow_id: str, payload: dict[str, Any]) -> dict[str, Any]: return await self._request("POST", f"/api/v1/workflows/{workflow_id}/execute", payload=payload) async def list_apps(self) -> dict[str, Any]: return await self._request("GET", "/api/v1/apps") async def proxy( self, method: str, path: str, params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: return await self._request(method=method, path=path, params=params, payload=payload) async def login(self, username: str, password: str) -> dict[str, Any]: return await self._request( "POST", "/api/v1/users/login", payload={"username": username, "password": password}, ) async def generate_apikey_from_login(self, username: str, password: str) -> dict[str, Any]: login_url = f"{self.base_url}/api/v1/users/login" api_key_url = f"{self.base_url}/api/v1/users/generateapikey" async with httpx.AsyncClient(timeout=25.0) as client: login_resp = await client.post(login_url, json={"username": username, "password": password}) login_resp.raise_for_status() generate_resp = await client.get(api_key_url) try: generate_resp.raise_for_status() except httpx.HTTPStatusError as exc: detail = generate_resp.text.strip() raise RuntimeError( f"Shuffle returned {generate_resp.status_code} for {api_key_url}. Response: {detail}" ) from exc return generate_resp.json() if generate_resp.content else {"status_code": generate_resp.status_code}