| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- from typing import Any
- import httpx
- class ShuffleAdapter:
- def __init__(self, base_url: str, api_key: str) -> None:
- self.base_url = base_url.rstrip("/")
- self.api_key = api_key
- def _headers(self) -> dict[str, str]:
- if not self.api_key:
- return {}
- return {
- "Authorization": f"Bearer {self.api_key}",
- "X-Api-Key": self.api_key,
- }
- async def _request(
- self,
- method: str,
- path: str,
- params: dict[str, Any] | None = None,
- payload: dict[str, Any] | None = None,
- ) -> dict[str, Any]:
- path = path if path.startswith("/") else f"/{path}"
- url = f"{self.base_url}{path}"
- async with httpx.AsyncClient(timeout=25.0) as client:
- response = await client.request(
- method=method.upper(),
- url=url,
- params=params or None,
- json=payload or None,
- headers=self._headers(),
- )
- try:
- response.raise_for_status()
- except httpx.HTTPStatusError as exc:
- detail = response.text.strip()
- raise RuntimeError(
- f"Shuffle returned {response.status_code} for {url}. Response: {detail}"
- ) from exc
- return response.json() if response.content else {"status_code": response.status_code}
- async def health(self) -> dict[str, Any]:
- return await self._request("GET", "/api/v1/_ah/health")
- async def auth_test(self) -> dict[str, Any]:
- return await self._request("GET", "/api/v1/users/getinfo")
- async def list_workflows(self) -> dict[str, Any]:
- return await self._request("GET", "/api/v1/workflows")
- async def get_workflow(self, workflow_id: str) -> dict[str, Any]:
- return await self._request("GET", f"/api/v1/workflows/{workflow_id}")
- async def trigger_workflow(self, workflow_id: str, payload: dict[str, Any]) -> dict[str, Any]:
- return await self._request("POST", f"/api/v1/workflows/{workflow_id}/execute", payload=payload)
- async def list_apps(self) -> dict[str, Any]:
- return await self._request("GET", "/api/v1/apps")
- async def proxy(
- self,
- method: str,
- path: str,
- params: dict[str, Any] | None = None,
- payload: dict[str, Any] | None = None,
- ) -> dict[str, Any]:
- return await self._request(method=method, path=path, params=params, payload=payload)
- async def login(self, username: str, password: str) -> dict[str, Any]:
- return await self._request(
- "POST",
- "/api/v1/users/login",
- payload={"username": username, "password": password},
- )
- async def generate_apikey_from_login(self, username: str, password: str) -> dict[str, Any]:
- login_url = f"{self.base_url}/api/v1/users/login"
- api_key_url = f"{self.base_url}/api/v1/users/generateapikey"
- async with httpx.AsyncClient(timeout=25.0) as client:
- login_resp = await client.post(login_url, json={"username": username, "password": password})
- login_resp.raise_for_status()
- generate_resp = await client.get(api_key_url)
- try:
- generate_resp.raise_for_status()
- except httpx.HTTPStatusError as exc:
- detail = generate_resp.text.strip()
- raise RuntimeError(
- f"Shuffle returned {generate_resp.status_code} for {api_key_url}. Response: {detail}"
- ) from exc
- return generate_resp.json() if generate_resp.content else {"status_code": generate_resp.status_code}
|