Rockbox open source high quality audio player as a Music Player Daemon
mpris
rockbox
mpd
libadwaita
audio
rust
zig
deno
1"""Lightweight async event emitter used by ``RockboxClient``.
2
3Listeners may be sync (``def``) or async (``async def``); the emitter awaits
4the latter and calls the former synchronously. There is no Node-style
5``EventEmitter`` dependency.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import inspect
12from collections import defaultdict
13from collections.abc import Awaitable, Callable
14from typing import Any, TypeAlias
15
16# Public event names emitted by the client.
17EventName: TypeAlias = str
18
19#: Event names emitted by the SDK. Importing as constants keeps editors honest.
20TRACK_CHANGED: EventName = "track:changed"
21"""Fires whenever the currently playing track changes. Payload: ``Track``."""
22
23STATUS_CHANGED: EventName = "status:changed"
24"""Fires when playback status changes. Payload: ``int`` (raw firmware status)."""
25
26PLAYLIST_CHANGED: EventName = "playlist:changed"
27"""Fires when the active playlist changes. Payload: ``Playlist``."""
28
29WS_OPEN: EventName = "ws:open"
30"""WebSocket connection opened. No payload."""
31
32WS_CLOSE: EventName = "ws:close"
33"""WebSocket connection closed. No payload."""
34
35WS_ERROR: EventName = "ws:error"
36"""WebSocket or subscription error. Payload: ``Exception``."""
37
38
39Listener: TypeAlias = Callable[..., Any] | Callable[..., Awaitable[Any]]
40
41
42class EventEmitter:
43 """Async-aware event emitter.
44
45 Usage:
46 emitter = EventEmitter()
47 emitter.on("track:changed", lambda t: print(t.title))
48
49 @emitter.on("track:changed")
50 async def on_track(track):
51 ...
52 """
53
54 def __init__(self) -> None:
55 self._listeners: dict[str, list[Listener]] = defaultdict(list)
56
57 def on(self, event: str, listener: Listener | None = None) -> Any:
58 """Register a listener. Usable as a decorator when ``listener`` is omitted."""
59 if listener is None:
60
61 def decorator(fn: Listener) -> Listener:
62 self._listeners[event].append(fn)
63 return fn
64
65 return decorator
66 self._listeners[event].append(listener)
67 return self
68
69 def once(self, event: str, listener: Listener | None = None) -> Any:
70 """Register a listener that fires at most once."""
71
72 def wrap(fn: Listener) -> Listener:
73 async def wrapper(*args: Any, **kwargs: Any) -> None:
74 self.off(event, wrapper)
75 result = fn(*args, **kwargs)
76 if inspect.isawaitable(result):
77 await result
78
79 self._listeners[event].append(wrapper)
80 return fn
81
82 if listener is None:
83 return wrap
84 wrap(listener)
85 return self
86
87 def off(self, event: str, listener: Listener) -> EventEmitter:
88 """Remove a listener. No-op if it isn't registered."""
89 try:
90 self._listeners[event].remove(listener)
91 except ValueError:
92 pass
93 return self
94
95 def remove_all_listeners(self, event: str | None = None) -> EventEmitter:
96 """Drop every listener, or every listener for ``event`` if specified."""
97 if event is None:
98 self._listeners.clear()
99 else:
100 self._listeners.pop(event, None)
101 return self
102
103 async def emit(self, event: str, *args: Any) -> None:
104 """Notify every listener for ``event``.
105
106 Async listeners are awaited concurrently. Exceptions in any listener are
107 re-raised after every other listener has been scheduled — they do not
108 prevent other listeners from running.
109 """
110 listeners = list(self._listeners.get(event, ()))
111 if not listeners:
112 return
113
114 coros: list[Awaitable[Any]] = []
115 for fn in listeners:
116 try:
117 result = fn(*args)
118 except Exception:
119 continue
120 if inspect.isawaitable(result):
121 coros.append(result)
122
123 if coros:
124 await asyncio.gather(*coros, return_exceptions=True)