Rockbox: an open-source, high-quality audio player as a Music Player Daemon
mpris
rockbox
mpd
libadwaita
audio
rust
zig
deno
1"""HTTP and WebSocket transport for GraphQL.
2
3``HttpTransport`` is a thin wrapper around ``httpx.AsyncClient`` — one POST per
4operation, no caching. ``WsTransport`` implements the
5`graphql-transport-ws <https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md>`_
6sub-protocol on top of ``websockets``, with infinite reconnects and exponential
7backoff.
8"""
9
10from __future__ import annotations
11
12import asyncio
13import json
14import logging
15from collections.abc import AsyncIterator, Awaitable, Callable
16from typing import Any
17
18import httpx
19import websockets
20import websockets.exceptions
21
22from .errors import RockboxGraphQLError, RockboxNetworkError
23
# Type-only alias for the WebSocket connection class. Prefer the
# ``websockets.asyncio`` client; releases that predate it only ship the legacy
# client, whose connection class is ``WebSocketClientProtocol``.
try:
    from websockets.asyncio.client import (  # type: ignore[import-not-found]
        ClientConnection as _WsConn,
    )
except ImportError:  # pragma: no cover - very old websockets
    from websockets.client import (  # type: ignore[no-redef]
        WebSocketClientProtocol as _WsConn,
    )
31
32logger = logging.getLogger(__name__)
33
34
35# ---------------------------------------------------------------------------
36# HTTP transport
37# ---------------------------------------------------------------------------
38
39
class HttpTransport:
    """Async GraphQL-over-HTTP transport.

    Every :meth:`execute` call issues exactly one POST request; nothing is
    cached. The ``httpx.AsyncClient`` is created lazily on first use and
    reused for the lifetime of the transport. Call :meth:`aclose` (or use the
    parent client's context manager) to release the underlying connection
    pool.
    """

    def __init__(self, url: str, *, timeout: float = 30.0) -> None:
        self.url = url
        self._timeout = timeout
        # Created on demand by ``_ensure_client``; reset to ``None`` by ``aclose``.
        self._client: httpx.AsyncClient | None = None

    def _ensure_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it on first use."""
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=self._timeout)
        return self._client

    async def execute(
        self,
        query: str,
        variables: dict[str, Any] | None = None,
    ) -> Any:
        """Execute a single GraphQL operation and return the ``data`` payload.

        Args:
            query: GraphQL document to send.
            variables: Operation variables; ``None`` is sent as ``{}``.

        Returns:
            The ``data`` member of the response, or ``{}`` when it is missing
            or null.

        Raises:
            RockboxNetworkError: The request failed, the server answered with
                an HTTP status of 400 or above, or the body was not a JSON
                object.
            RockboxGraphQLError: The response carried a non-empty ``errors``
                list.
        """
        client = self._ensure_client()
        try:
            response = await client.post(
                self.url,
                json={"query": query, "variables": variables or {}},
                headers={"Content-Type": "application/json", "Accept": "application/json"},
            )
        except httpx.HTTPError as err:
            raise RockboxNetworkError(f"Failed to reach Rockbox at {self.url}", err) from err

        if response.status_code >= 400:
            raise RockboxNetworkError(
                f"HTTP {response.status_code} {response.reason_phrase}"
            )

        try:
            payload = response.json()
        except ValueError as err:
            # ``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            # ``ValueError`` subclasses, so malformed JSON and undecodable
            # bytes end up on the same path.
            raise RockboxNetworkError("Server returned non-JSON response", err) from err

        # Valid JSON that is not an object (a list, a string, null, ...) has
        # no ``errors``/``data`` members to read.
        if not isinstance(payload, dict):
            raise RockboxNetworkError(
                f"Server returned unexpected JSON payload of type {type(payload).__name__}"
            )

        errors = payload.get("errors") or []
        if errors:
            raise RockboxGraphQLError(errors)
        return payload.get("data") or {}

    async def aclose(self) -> None:
        """Close the HTTP client if one was created; safe to call repeatedly."""
        # Detach first so the transport is reset even if closing raises.
        client, self._client = self._client, None
        if client is not None:
            await client.aclose()
93
94
95# ---------------------------------------------------------------------------
96# WebSocket transport — graphql-transport-ws subprotocol
97# ---------------------------------------------------------------------------
98
99
100SubscriptionSink = Callable[[dict[str, Any]], Awaitable[None] | None]
101ErrorSink = Callable[[Exception], Awaitable[None] | None]
102
103
def _is_closed(ws: Any) -> bool:
    """Tell whether *ws* is closed, for both legacy and asyncio connection objects.

    Objects exposing a ``state`` attribute are considered closed unless that
    state is ``OPEN`` or ``CONNECTING``. Objects without it are judged by the
    truthiness of their ``closed`` attribute, which defaults to not closed
    when absent.
    """
    if not hasattr(ws, "state"):
        # Legacy API: a plain ``closed`` flag.
        return bool(getattr(ws, "closed", False))

    # Imported lazily so the enum is only needed when ``state`` is present.
    from websockets.protocol import State

    live_states = (State.OPEN, State.CONNECTING)
    return ws.state not in live_states
112
113
class WsTransport:
    """Lazy, auto-reconnecting GraphQL subscription client.

    Speaks the ``graphql-transport-ws`` sub-protocol. The connection is
    established on the first :meth:`subscribe` call and shared across all
    active subscriptions. When the socket goes away — whether it fails or the
    peer closes it cleanly — the transport reconnects with exponential
    backoff and re-issues every subscription that is still registered.
    """

    PROTOCOL = "graphql-transport-ws"

    # Reconnect delays are 1, 2, 4, ... seconds, never longer than the cap.
    # The exponent is clamped too so the power does not grow without bound.
    _MAX_BACKOFF_EXPONENT = 5
    _MAX_BACKOFF_SECONDS = 30

    def __init__(
        self,
        url: str,
        *,
        on_open: Callable[[], Awaitable[None] | None] | None = None,
        on_close: Callable[[], Awaitable[None] | None] | None = None,
        on_error: ErrorSink | None = None,
    ) -> None:
        self.url = url
        self._on_open = on_open
        self._on_close = on_close
        self._on_error = on_error

        self._ws: Any | None = None
        self._reader_task: asyncio.Task[None] | None = None
        self._connect_lock = asyncio.Lock()
        self._next_id = 0
        # Active subscriptions keyed by their protocol id.
        self._subs: dict[str, _Subscription] = {}
        self._closed = False

    async def subscribe(
        self,
        query: str,
        variables: dict[str, Any] | None,
        on_next: SubscriptionSink,
        on_error: ErrorSink | None = None,
    ) -> Callable[[], Awaitable[None]]:
        """Start a subscription and return a coroutine function that cancels it.

        Args:
            query: GraphQL subscription document.
            variables: Operation variables; ``None`` is sent as ``{}``.
            on_next: Called (and awaited if it returns a coroutine) with the
                payload of every ``next`` message.
            on_error: Called with errors for this subscription; falls back to
                the transport-wide ``on_error``.

        Raises:
            RuntimeError: The transport has been closed.
            RockboxNetworkError: The connection could not be established.
        """
        await self._ensure_connected()

        sub_id = str(self._next_id)
        self._next_id += 1

        sub = _Subscription(
            id=sub_id,
            query=query,
            variables=variables or {},
            on_next=on_next,
            on_error=on_error or self._on_error,
        )
        self._subs[sub_id] = sub
        try:
            await self._send_subscribe(sub)
        except BaseException:
            # The caller never receives an unsubscribe handle on failure, so
            # the entry must not linger and be re-issued on a later reconnect.
            self._subs.pop(sub_id, None)
            raise

        async def unsubscribe() -> None:
            self._subs.pop(sub_id, None)
            if self._ws is not None and not _is_closed(self._ws):
                try:
                    await self._ws.send(json.dumps({"id": sub_id, "type": "complete"}))
                except Exception:  # noqa: BLE001
                    # Best effort: the subscription is already forgotten locally.
                    logger.debug(
                        "could not send complete for subscription %s",
                        sub_id,
                        exc_info=True,
                    )

        return unsubscribe

    async def aclose(self) -> None:
        """Stop the reader, drop all subscriptions and close the socket."""
        self._closed = True
        self._subs.clear()
        if self._reader_task is not None:
            self._reader_task.cancel()
            try:
                await self._reader_task
            except (asyncio.CancelledError, Exception):  # noqa: BLE001
                pass
            self._reader_task = None
        if self._ws is not None:
            await self._close_quietly(self._ws)
            self._ws = None

    # ---- internals ------------------------------------------------------

    async def _ensure_connected(self) -> None:
        """Open the connection unless a live one already exists."""
        if self._closed:
            raise RuntimeError("WsTransport has been closed")
        async with self._connect_lock:
            if self._ws is not None and not _is_closed(self._ws):
                return
            await self._connect_once()

    async def _connect_once(self) -> None:
        """Open a socket, perform the init/ack handshake and start the reader.

        Raises:
            RockboxNetworkError: The socket could not be opened or the
                handshake did not complete with ``connection_ack``.
        """
        try:
            ws = await websockets.connect(self.url, subprotocols=[self.PROTOCOL])
        except Exception as err:  # noqa: BLE001
            raise RockboxNetworkError(f"Failed to open WebSocket to {self.url}", err) from err

        try:
            await ws.send(json.dumps({"type": "connection_init"}))
            ack = json.loads(await ws.recv())
        except asyncio.CancelledError:
            # Do not leak the freshly opened socket when cancelled mid-handshake.
            await self._close_quietly(ws)
            raise
        except Exception as err:  # noqa: BLE001
            await self._close_quietly(ws)
            raise RockboxNetworkError(
                f"WebSocket handshake with {self.url} failed", err
            ) from err

        if not isinstance(ack, dict) or ack.get("type") != "connection_ack":
            await self._close_quietly(ws)
            raise RockboxNetworkError(f"WebSocket handshake failed: {ack!r}")

        self._ws = ws
        await self._fire(self._on_open)
        self._reader_task = asyncio.create_task(self._reader_loop(ws))

    async def _reader_loop(self, ws: Any) -> None:
        """Dispatch incoming frames until the socket closes, then reconnect."""
        cause: Exception | None = None
        try:
            async for raw in ws:
                try:
                    msg = json.loads(raw)
                except ValueError:
                    # Covers malformed JSON and undecodable bytes alike.
                    continue
                if not isinstance(msg, dict):
                    # Protocol messages are JSON objects; ignore anything else
                    # instead of letting it take the reader down.
                    continue
                await self._dispatch(msg)
        except (websockets.exceptions.ConnectionClosed, ConnectionError) as err:
            cause = err
        except Exception as err:  # noqa: BLE001
            await self._fire_error(err)
            return

        # Reached after a connection error and also when iteration simply
        # ends, which is how a normal close by the peer shows up.
        if self._closed:
            return
        await self._fire(self._on_close)
        await self._reconnect_with_backoff(cause)

    async def _dispatch(self, msg: dict[str, Any]) -> None:
        """Route one decoded protocol message to the matching subscription."""
        msg_type = msg.get("type")

        if msg_type == "ping":
            await self._send_pong()
            return

        sub_id = msg.get("id")
        # Ids issued by ``subscribe`` are always strings; anything else cannot
        # match (and might not even be hashable).
        sub = self._subs.get(sub_id) if isinstance(sub_id, str) else None
        if sub is None:
            return

        if msg_type == "next":
            payload = msg.get("payload") or {}
            try:
                result = sub.on_next(payload)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as err:  # noqa: BLE001
                if sub.on_error is not None:
                    await self._fire_error(err, sub.on_error)
        elif msg_type == "error":
            err = RockboxGraphQLError(msg.get("payload") or [])
            if sub.on_error is not None:
                await self._fire_error(err, sub.on_error)
        elif msg_type == "complete":
            self._subs.pop(sub_id, None)

    async def _send_pong(self) -> None:
        """Answer a protocol-level ``ping``; failures are logged, not raised."""
        ws = self._ws
        if ws is None:
            return
        try:
            await ws.send(json.dumps({"type": "pong"}))
        except Exception:  # noqa: BLE001
            # A dying connection is noticed by the reader loop on its own.
            logger.debug("could not answer ping", exc_info=True)

    async def _send_subscribe(self, sub: _Subscription) -> None:
        """Send the ``subscribe`` message for *sub* on the current socket."""
        ws = self._ws
        if ws is None:
            # Explicit check instead of ``assert``, which ``-O`` strips.
            raise RockboxNetworkError("WebSocket is not connected")
        await ws.send(
            json.dumps(
                {
                    "id": sub.id,
                    "type": "subscribe",
                    "payload": {"query": sub.query, "variables": sub.variables},
                }
            )
        )

    async def _reconnect_with_backoff(self, _err: Exception | None) -> None:
        """Retry connecting until it succeeds or the transport is closed.

        ``_err`` is the exception that ended the previous connection, or
        ``None`` after a clean close; it is currently unused.
        """
        attempt = 0
        while not self._closed:
            exponent = min(attempt, self._MAX_BACKOFF_EXPONENT)
            delay = min(2**exponent, self._MAX_BACKOFF_SECONDS)
            await asyncio.sleep(delay)
            attempt += 1
            try:
                await self._connect_once()
            except Exception as err:  # noqa: BLE001
                await self._fire_error(err)
                continue
            # Resubscribe everything we still care about
            for sub in list(self._subs.values()):
                try:
                    await self._send_subscribe(sub)
                except Exception as err:  # noqa: BLE001
                    await self._fire_error(err)
            return

    @staticmethod
    async def _close_quietly(ws: Any) -> None:
        """Close *ws*, logging instead of raising if that fails."""
        try:
            await ws.close()
        except Exception:  # noqa: BLE001
            logger.debug("ignoring error while closing WebSocket", exc_info=True)

    @staticmethod
    async def _fire(cb: Callable[[], Awaitable[None] | None] | None) -> None:
        """Invoke a lifecycle callback, awaiting it if it returns a coroutine."""
        if cb is None:
            return
        try:
            res = cb()
            if asyncio.iscoroutine(res):
                await res
        except Exception:  # noqa: BLE001
            logger.exception("event callback raised")

    async def _fire_error(self, err: Exception, sink: ErrorSink | None = None) -> None:
        """Report *err* to *sink*, else the transport-wide handler, else the log."""
        target = sink or self._on_error
        if target is None:
            logger.warning("ws transport error: %s", err)
            return
        try:
            res = target(err)
            if asyncio.iscoroutine(res):
                await res
        except Exception:  # noqa: BLE001
            logger.exception("error callback raised")
310
311
class _Subscription:
    """Record of one registered subscription.

    Stores the operation id, the query text and its variables, plus the
    callbacks supplied for delivered payloads and for errors.
    """

    __slots__ = (
        "id",
        "query",
        "variables",
        "on_next",
        "on_error",
    )

    def __init__(
        self,
        id: str,  # noqa: A002 - keyword name is part of the constructor interface
        query: str,
        variables: dict[str, Any],
        on_next: SubscriptionSink,
        on_error: ErrorSink | None,
    ) -> None:
        self.id, self.query, self.variables = id, query, variables
        self.on_next, self.on_error = on_next, on_error
328
329
# Re-exported for type checkers — kept out of the public package surface.
# ``AsyncIterator`` is the ``collections.abc`` import at the top of this
# module, not something defined here.
__all__ = [
    "HttpTransport",
    "WsTransport",
    "AsyncIterator",
]