Rockbox open source high quality audio player as a Music Player Daemon
mpris
rockbox
mpd
libadwaita
audio
rust
zig
deno
1"""02 — Now playing (real-time subscriptions).
2
3Opens a WebSocket and prints every track / status / queue change. Ctrl+C exits.
4
5 uv run python examples/02_now_playing.py
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12
13from _client import create_client # type: ignore[import-not-found]
14
15from rockbox_sdk import PlaybackStatus, Playlist, Track
16
17
async def main() -> None:
    """Connect the client, print every event it reports, and wait until cancelled.

    Handlers are registered for track, playback-status, playlist and
    ``ws:error`` events; each one prints a single line. The coroutine then
    blocks on an event that is never set, so it only finishes when its task
    is cancelled (for example by Ctrl+C). The cancellation is absorbed so the
    ``async with`` block exits normally.
    """
    async with create_client() as client:
        await client.connect()

        @client.on_track_changed
        def _announce_track(track: Track) -> None:
            line = f"▶ {track.title} — {track.artist} [{track.album}]"
            print(line)

        @client.on_status_changed
        def _announce_status(raw: int) -> None:
            # Fall back to the raw number when it is not a known PlaybackStatus.
            try:
                status = PlaybackStatus(raw)
            except ValueError:
                label = str(raw)
            else:
                label = status.name
            print(f"◐ {label}")

        @client.on_playlist_changed
        def _announce_queue(q: Playlist) -> None:
            line = f"☰ queue updated — {q.amount} tracks (index {q.index})"
            print(line)

        @client.on("ws:error")
        def _announce_error(err: Exception) -> None:
            line = f"✗ websocket error: {err}"
            print(line)

        print("Listening for events. Press Ctrl+C to exit.")

        # Nothing ever sets this event: waiting on it parks the coroutine
        # until the surrounding task is cancelled.
        never_set = asyncio.Event()
        try:
            await never_set.wait()
        except asyncio.CancelledError:
            pass
45
46
if __name__ == "__main__":
    # Ctrl+C can end the run in two ways. On Python 3.11+ asyncio.run()
    # cancels the main task; main() swallows that CancelledError and returns
    # normally, so no KeyboardInterrupt is raised here. On older versions (or
    # on a second Ctrl+C) KeyboardInterrupt propagates out of asyncio.run().
    # Print the farewell on both paths; any other exception still propagates
    # and skips it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    print("\nDisconnecting...")