Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 124 lines 4.0 kB view raw
1"""Lightweight async event emitter used by ``RockboxClient``. 2 3Listeners may be sync (``def``) or async (``async def``); the emitter awaits 4the latter and calls the former synchronously. There is no Node-style 5``EventEmitter`` dependency. 6""" 7 8from __future__ import annotations 9 10import asyncio 11import inspect 12from collections import defaultdict 13from collections.abc import Awaitable, Callable 14from typing import Any, TypeAlias 15 16# Public event names emitted by the client. 17EventName: TypeAlias = str 18 19#: Event names emitted by the SDK. Importing as constants keeps editors honest. 20TRACK_CHANGED: EventName = "track:changed" 21"""Fires whenever the currently playing track changes. Payload: ``Track``.""" 22 23STATUS_CHANGED: EventName = "status:changed" 24"""Fires when playback status changes. Payload: ``int`` (raw firmware status).""" 25 26PLAYLIST_CHANGED: EventName = "playlist:changed" 27"""Fires when the active playlist changes. Payload: ``Playlist``.""" 28 29WS_OPEN: EventName = "ws:open" 30"""WebSocket connection opened. No payload.""" 31 32WS_CLOSE: EventName = "ws:close" 33"""WebSocket connection closed. No payload.""" 34 35WS_ERROR: EventName = "ws:error" 36"""WebSocket or subscription error. Payload: ``Exception``.""" 37 38 39Listener: TypeAlias = Callable[..., Any] | Callable[..., Awaitable[Any]] 40 41 42class EventEmitter: 43 """Async-aware event emitter. 44 45 Usage: 46 emitter = EventEmitter() 47 emitter.on("track:changed", lambda t: print(t.title)) 48 49 @emitter.on("track:changed") 50 async def on_track(track): 51 ... 52 """ 53 54 def __init__(self) -> None: 55 self._listeners: dict[str, list[Listener]] = defaultdict(list) 56 57 def on(self, event: str, listener: Listener | None = None) -> Any: 58 """Register a listener. Usable as a decorator when ``listener`` is omitted.""" 59 if listener is None: 60 61 def decorator(fn: Listener) -> Listener: 62 self._listeners[event].append(fn) 63 return fn 64 65 return decorator 66 self._listeners[event].append(listener) 67 return self 68 69 def once(self, event: str, listener: Listener | None = None) -> Any: 70 """Register a listener that fires at most once.""" 71 72 def wrap(fn: Listener) -> Listener: 73 async def wrapper(*args: Any, **kwargs: Any) -> None: 74 self.off(event, wrapper) 75 result = fn(*args, **kwargs) 76 if inspect.isawaitable(result): 77 await result 78 79 self._listeners[event].append(wrapper) 80 return fn 81 82 if listener is None: 83 return wrap 84 wrap(listener) 85 return self 86 87 def off(self, event: str, listener: Listener) -> EventEmitter: 88 """Remove a listener. No-op if it isn't registered.""" 89 try: 90 self._listeners[event].remove(listener) 91 except ValueError: 92 pass 93 return self 94 95 def remove_all_listeners(self, event: str | None = None) -> EventEmitter: 96 """Drop every listener, or every listener for ``event`` if specified.""" 97 if event is None: 98 self._listeners.clear() 99 else: 100 self._listeners.pop(event, None) 101 return self 102 103 async def emit(self, event: str, *args: Any) -> None: 104 """Notify every listener for ``event``. 105 106 Async listeners are awaited concurrently. Exceptions in any listener are 107 re-raised after every other listener has been scheduled — they do not 108 prevent other listeners from running. 109 """ 110 listeners = list(self._listeners.get(event, ())) 111 if not listeners: 112 return 113 114 coros: list[Awaitable[Any]] = [] 115 for fn in listeners: 116 try: 117 result = fn(*args) 118 except Exception: 119 continue 120 if inspect.isawaitable(result): 121 coros.append(result) 122 123 if coros: 124 await asyncio.gather(*coros, return_exceptions=True)