Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 51 lines 1.4 kB view raw
1"""02 — Now playing (real-time subscriptions). 2 3Opens a WebSocket and prints every track / status / queue change. Ctrl+C exits. 4 5 uv run python examples/02_now_playing.py 6""" 7 8from __future__ import annotations 9 10import asyncio 11import contextlib 12 13from _client import create_client # type: ignore[import-not-found] 14 15from rockbox_sdk import PlaybackStatus, Playlist, Track 16 17 18async def main() -> None: 19 async with create_client() as client: 20 await client.connect() 21 22 @client.on_track_changed 23 def _track(track: Track) -> None: 24 print(f"{track.title}{track.artist} [{track.album}]") 25 26 @client.on_status_changed 27 def _status(raw: int) -> None: 28 try: 29 label = PlaybackStatus(raw).name 30 except ValueError: 31 label = str(raw) 32 print(f"{label}") 33 34 @client.on_playlist_changed 35 def _queue(q: Playlist) -> None: 36 print(f"☰ queue updated — {q.amount} tracks (index {q.index})") 37 38 @client.on("ws:error") 39 def _err(err: Exception) -> None: 40 print(f"✗ websocket error: {err}") 41 42 print("Listening for events. Press Ctrl+C to exit.") 43 with contextlib.suppress(asyncio.CancelledError): 44 await asyncio.Event().wait() 45 46 47if __name__ == "__main__": 48 try: 49 asyncio.run(main()) 50 except KeyboardInterrupt: 51 print("\nDisconnecting...")