Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 331 lines 11 kB view raw
1"""HTTP and WebSocket transport for GraphQL. 2 3``HttpTransport`` is a thin wrapper around ``httpx.AsyncClient`` — one POST per 4operation, no caching. ``WsTransport`` implements the 5`graphql-transport-ws <https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md>`_ 6sub-protocol on top of ``websockets``, with infinite reconnects and exponential 7backoff. 8""" 9 10from __future__ import annotations 11 12import asyncio 13import json 14import logging 15from collections.abc import AsyncIterator, Awaitable, Callable 16from typing import Any 17 18import httpx 19import websockets 20import websockets.exceptions 21 22from .errors import RockboxGraphQLError, RockboxNetworkError 23 24# Type-only — works for websockets >= 11 25try: 26 from websockets.asyncio.client import ( 27 ClientConnection as _WsConn, # type: ignore[import-not-found] 28 ) 29except ImportError: # pragma: no cover - very old websockets 30 from websockets.client import _WsConn as _WsConn # type: ignore[no-redef] 31 32logger = logging.getLogger(__name__) 33 34 35# --------------------------------------------------------------------------- 36# HTTP transport 37# --------------------------------------------------------------------------- 38 39 40class HttpTransport: 41 """Async GraphQL-over-HTTP transport. 42 43 The client is created lazily on first use and reused for the lifetime of 44 the transport. Call :meth:`aclose` (or use the parent client's context 45 manager) to release the underlying connection pool. 46 """ 47 48 def __init__(self, url: str, *, timeout: float = 30.0) -> None: 49 self.url = url 50 self._timeout = timeout 51 self._client: httpx.AsyncClient | None = None 52 53 def _ensure_client(self) -> httpx.AsyncClient: 54 if self._client is None: 55 self._client = httpx.AsyncClient(timeout=self._timeout) 56 return self._client 57 58 async def execute( 59 self, 60 query: str, 61 variables: dict[str, Any] | None = None, 62 ) -> Any: 63 """Execute a single GraphQL operation and return the ``data`` payload.""" 64 client = self._ensure_client() 65 try: 66 response = await client.post( 67 self.url, 68 json={"query": query, "variables": variables or {}}, 69 headers={"Content-Type": "application/json", "Accept": "application/json"}, 70 ) 71 except httpx.HTTPError as err: 72 raise RockboxNetworkError(f"Failed to reach Rockbox at {self.url}", err) from err 73 74 if response.status_code >= 400: 75 raise RockboxNetworkError( 76 f"HTTP {response.status_code} {response.reason_phrase}" 77 ) 78 79 try: 80 payload = response.json() 81 except json.JSONDecodeError as err: 82 raise RockboxNetworkError("Server returned non-JSON response", err) from err 83 84 errors = payload.get("errors") or [] 85 if errors: 86 raise RockboxGraphQLError(errors) 87 return payload.get("data") or {} 88 89 async def aclose(self) -> None: 90 if self._client is not None: 91 await self._client.aclose() 92 self._client = None 93 94 95# --------------------------------------------------------------------------- 96# WebSocket transport — graphql-transport-ws subprotocol 97# --------------------------------------------------------------------------- 98 99 100SubscriptionSink = Callable[[dict[str, Any]], Awaitable[None] | None] 101ErrorSink = Callable[[Exception], Awaitable[None] | None] 102 103 104def _is_closed(ws: Any) -> bool: 105 """Compatibility shim — both legacy (.closed) and asyncio (.state) APIs.""" 106 if hasattr(ws, "state"): 107 # websockets >= 13: state is an enum, OPEN/CONNECTING are not closed 108 from websockets.protocol import State 109 110 return ws.state not in (State.OPEN, State.CONNECTING) 111 return bool(getattr(ws, "closed", False)) 112 113 114class WsTransport: 115 """Lazy, auto-reconnecting GraphQL subscription client. 116 117 The connection is established on the first ``subscribe`` call and shared 118 across all active subscriptions. On disconnect the transport reconnects 119 with exponential backoff and re-issues every still-active subscription. 120 """ 121 122 PROTOCOL = "graphql-transport-ws" 123 124 def __init__( 125 self, 126 url: str, 127 *, 128 on_open: Callable[[], Awaitable[None] | None] | None = None, 129 on_close: Callable[[], Awaitable[None] | None] | None = None, 130 on_error: ErrorSink | None = None, 131 ) -> None: 132 self.url = url 133 self._on_open = on_open 134 self._on_close = on_close 135 self._on_error = on_error 136 137 self._ws: Any | None = None 138 self._reader_task: asyncio.Task[None] | None = None 139 self._connect_lock = asyncio.Lock() 140 self._next_id = 0 141 self._subs: dict[str, _Subscription] = {} 142 self._closed = False 143 144 async def subscribe( 145 self, 146 query: str, 147 variables: dict[str, Any] | None, 148 on_next: SubscriptionSink, 149 on_error: ErrorSink | None = None, 150 ) -> Callable[[], Awaitable[None]]: 151 """Start a subscription and return a coroutine that cancels it.""" 152 await self._ensure_connected() 153 154 sub_id = str(self._next_id) 155 self._next_id += 1 156 157 sub = _Subscription( 158 id=sub_id, 159 query=query, 160 variables=variables or {}, 161 on_next=on_next, 162 on_error=on_error or self._on_error, 163 ) 164 self._subs[sub_id] = sub 165 await self._send_subscribe(sub) 166 167 async def unsubscribe() -> None: 168 self._subs.pop(sub_id, None) 169 if self._ws is not None and not _is_closed(self._ws): 170 try: 171 await self._ws.send(json.dumps({"id": sub_id, "type": "complete"})) 172 except Exception: # noqa: BLE001 173 pass 174 175 return unsubscribe 176 177 async def aclose(self) -> None: 178 self._closed = True 179 self._subs.clear() 180 if self._reader_task is not None: 181 self._reader_task.cancel() 182 try: 183 await self._reader_task 184 except (asyncio.CancelledError, Exception): # noqa: BLE001 185 pass 186 self._reader_task = None 187 if self._ws is not None: 188 try: 189 await self._ws.close() 190 except Exception: # noqa: BLE001 191 pass 192 self._ws = None 193 194 # ---- internals ------------------------------------------------------ 195 196 async def _ensure_connected(self) -> None: 197 if self._closed: 198 raise RuntimeError("WsTransport has been closed") 199 async with self._connect_lock: 200 if self._ws is not None and not _is_closed(self._ws): 201 return 202 await self._connect_once() 203 204 async def _connect_once(self) -> None: 205 try: 206 ws = await websockets.connect(self.url, subprotocols=[self.PROTOCOL]) 207 except Exception as err: # noqa: BLE001 208 raise RockboxNetworkError(f"Failed to open WebSocket to {self.url}", err) from err 209 210 await ws.send(json.dumps({"type": "connection_init"})) 211 ack = json.loads(await ws.recv()) 212 if ack.get("type") != "connection_ack": 213 await ws.close() 214 raise RockboxNetworkError(f"WebSocket handshake failed: {ack!r}") 215 216 self._ws = ws 217 await self._fire(self._on_open) 218 self._reader_task = asyncio.create_task(self._reader_loop(ws)) 219 220 async def _reader_loop(self, ws: Any) -> None: 221 try: 222 async for raw in ws: 223 try: 224 msg = json.loads(raw) 225 except json.JSONDecodeError: 226 continue 227 await self._dispatch(msg) 228 except (websockets.exceptions.ConnectionClosed, ConnectionError) as err: 229 if self._closed: 230 return 231 await self._fire(self._on_close) 232 await self._reconnect_with_backoff(err) 233 except Exception as err: # noqa: BLE001 234 await self._fire_error(err) 235 236 async def _dispatch(self, msg: dict[str, Any]) -> None: 237 msg_type = msg.get("type") 238 sub_id = msg.get("id") 239 sub = self._subs.get(sub_id) if sub_id is not None else None 240 241 if msg_type == "next" and sub is not None: 242 payload = msg.get("payload") or {} 243 try: 244 result = sub.on_next(payload) 245 if asyncio.iscoroutine(result): 246 await result 247 except Exception as err: # noqa: BLE001 248 if sub.on_error is not None: 249 await self._fire_error(err, sub.on_error) 250 elif msg_type == "error" and sub is not None: 251 err = RockboxGraphQLError(msg.get("payload") or []) 252 if sub.on_error is not None: 253 await self._fire_error(err, sub.on_error) 254 elif msg_type == "complete" and sub_id is not None: 255 self._subs.pop(sub_id, None) 256 257 async def _send_subscribe(self, sub: _Subscription) -> None: 258 assert self._ws is not None 259 await self._ws.send( 260 json.dumps( 261 { 262 "id": sub.id, 263 "type": "subscribe", 264 "payload": {"query": sub.query, "variables": sub.variables}, 265 } 266 ) 267 ) 268 269 async def _reconnect_with_backoff(self, _err: Exception) -> None: 270 attempt = 0 271 while not self._closed: 272 delay = min(2**attempt, 30) 273 await asyncio.sleep(delay) 274 attempt += 1 275 try: 276 await self._connect_once() 277 except Exception as err: # noqa: BLE001 278 await self._fire_error(err) 279 continue 280 # Resubscribe everything we still care about 281 for sub in list(self._subs.values()): 282 try: 283 await self._send_subscribe(sub) 284 except Exception as err: # noqa: BLE001 285 await self._fire_error(err) 286 return 287 288 @staticmethod 289 async def _fire(cb: Callable[[], Awaitable[None] | None] | None) -> None: 290 if cb is None: 291 return 292 try: 293 res = cb() 294 if asyncio.iscoroutine(res): 295 await res 296 except Exception: # noqa: BLE001 297 logger.exception("event callback raised") 298 299 async def _fire_error(self, err: Exception, sink: ErrorSink | None = None) -> None: 300 target = sink or self._on_error 301 if target is None: 302 logger.warning("ws transport error: %s", err) 303 return 304 try: 305 res = target(err) 306 if asyncio.iscoroutine(res): 307 await res 308 except Exception: # noqa: BLE001 309 logger.exception("error callback raised") 310 311 312class _Subscription: 313 __slots__ = ("id", "query", "variables", "on_next", "on_error") 314 315 def __init__( 316 self, 317 id: str, 318 query: str, 319 variables: dict[str, Any], 320 on_next: SubscriptionSink, 321 on_error: ErrorSink | None, 322 ) -> None: 323 self.id = id 324 self.query = query 325 self.variables = variables 326 self.on_next = on_next 327 self.on_error = on_error 328 329 330# Re-exported for type checkers — kept out of the public package surface 331__all__ = ["HttpTransport", "WsTransport", "AsyncIterator"]