Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add SyncBroadcaster to align squeezelite clients

+329 -50
+162 -36
crates/slim/README.md
··· 16 16 6. [Layer 4 — Broadcast buffer (Rust)](#layer-4--broadcast-buffer-rust) 17 17 7. [Layer 5 — HTTP stream server (Rust)](#layer-5--http-stream-server-rust) 18 18 8. [Layer 6 — Slim Protocol server (Rust)](#layer-6--slim-protocol-server-rust) 19 - 9. [Layer 7 — squeezelite client](#layer-7--squeezelite-client) 20 - 10. [Startup sequence](#startup-sequence) 21 - 11. [Track transition](#track-transition) 22 - 12. [Multi-room](#multi-room) 23 - 13. [Configuration](#configuration) 24 - 14. [Gotchas and non-obvious invariants](#gotchas-and-non-obvious-invariants) 19 + 9. [Layer 7 — Sync broadcaster (Rust)](#layer-7--sync-broadcaster-rust) 20 + 10. [Layer 8 — squeezelite client](#layer-8--squeezelite-client) 21 + 11. [Startup sequence](#startup-sequence) 22 + 12. [Track transition](#track-transition) 23 + 13. [Multi-room](#multi-room) 24 + 14. [Configuration](#configuration) 25 + 15. [Gotchas and non-obvious invariants](#gotchas-and-non-obvious-invariants) 25 26 26 27 --- 27 28 ··· 44 45 45 46 ├── HTTP server :9999 ─── one thread per squeezelite client 46 47 47 - └── Slim server :3483 ─── sends STRM, keeps connection alive 48 + └── Slim server :3483 ─── sends STRM + audg keepalive + sync jiffies 49 + 50 + └── SyncBroadcaster ─── broadcasts same jiffies to all clients/s 48 51 ``` 49 52 50 53 The PCM data is **never transcoded** — rockboxd pushes raw signed 16-bit ··· 71 74 ┌─────────────────────────────────────────────────────────────────┐ 72 75 │ rockbox-slim crate (Rust) │ 73 76 │ │ 74 - │ BroadcastBuffer │ 75 - │ ┌──────────────────────────────────────────────────────────┐ │ 76 - │ │ VecDeque<(seq: u64, chunk: Vec<u8>)> max 4 MB │ │ 77 - │ │ next_seq (writer cursor) │ │ 78 - │ └──────────────────────────────────────────────────────────┘ │ 79 - │ │ │ │ 80 - │ HTTP server :9999 Slim Protocol server :3483 │ 81 - │ (one thread per client) (one thread per client) │ 82 - │ │ │ │ 83 - │ BroadcastReceiver HELO → STRM → audg keepalive │ 84 - │ (per-client seq cursor) │ 77 + │ BroadcastBuffer SyncBroadcaster │ 78 + │ ┌───────────────────────────┐ ┌──────────────────────┐ │ 79 + │ │ VecDeque<(seq,chunk)> │ │ Vec<Sender<u32>> │ │ 80 + │ │ max 4 MB │ │ → server_jiffies/1 s │ │ 81 + │ └───────────────────────────┘ └──────────────────────┘ │ 82 + │ │ │ │ 83 + │ HTTP server :9999 Slim Protocol server :3483 │ 84 + │ (one thread per client) (one thread per client) │ 85 + │ │ │ │ │ 86 + │ BroadcastReceiver audg keepalive sync jiffies=T │ 87 + │ (per-client seq cursor) (on STMt) (every 1 s) │ 85 88 └────────────────────────────┬────────────────────────────────────┘ 86 89 │ TCP :9999 87 90 ┌──────────────┼──────────────┐ ··· 364 367 365 368 ### Session flow 366 369 370 + Two server-to-client message streams run concurrently on the same TCP 371 + connection: the **read loop** replies to `STMt` heartbeats with `audg`, while 372 + the **sync writer thread** independently sends `sync` once per second. Writes 373 + are serialised through an `Arc<Mutex<TcpStream>>` clone. 374 + 367 375 ``` 368 - squeezelite Slim server 369 - ─────────── ─────────── 370 - HELO ──────────────────────────────► 371 - (no kick — all clients share the stream) 372 - ◄────────────────────────── STRM 's' 376 + squeezelite Slim server 377 + ─────────── ───────────────────────────────── 378 + HELO ────────────────────────────► 379 + ◄─────────────────────────── STRM 's' 373 380 HTTP GET :9999/stream.pcm ─────────────────────────────► HTTP server 374 - ◄────────────────────── HTTP 200 + raw PCM stream 375 - STAT STMc ─────────────────────────► 376 - STAT STMs ─────────────────────────► 377 - STAT STMt ─────────────────────────► (every ~1 s) 378 - ◄─────────────────────────── audg (full volume) ← keepalive 379 - STAT STMt ─────────────────────────► 380 - ◄─────────────────────────── audg 381 - 381 + ◄─────────────────────── HTTP 200 + raw PCM stream 382 + STAT STMc ───────────────────────► 383 + STAT STMs ───────────────────────► 384 + [sync broadcaster fires at t=1 s] 385 + ◄─────────────────────────── sync jiffies=T ← clock alignment 386 + STAT STMt ───────────────────────► (~1 s heartbeat) 387 + ◄─────────────────────────── audg ← watchdog reset 388 + [sync broadcaster fires at t=2 s] 389 + ◄─────────────────────────── sync jiffies=T+1000 390 + STAT STMt ───────────────────────► 391 + ◄─────────────────────────── audg 392 + 382 393 ``` 394 + 395 + `sync` and `audg` are sent on different triggers (`sync` from a shared timer 396 + thread; `audg` from the per-client read loop) and can arrive in either order 397 + within the same second. 383 398 384 399 ### STRM 's' payload layout 385 400 ··· 432 447 433 448 This resets squeezelite's timeout counter to zero on every tick. 434 449 450 + ### sync packet 451 + 452 + ``` 453 + Offset Size Value Meaning 454 + ────── ──── ───── ──────────────────────────────────────────────────── 455 + 0 4 BE u32 server jiffies (ms since Unix epoch, truncated to u32) 456 + ``` 457 + 458 + squeezelite uses the jiffies value to compute how far ahead or behind it is 459 + relative to the server clock, then adjusts its output buffer drain rate to 460 + converge. Because all clients receive the **same jiffies value at the same 461 + instant**, they converge to the same playback position. 462 + 435 463 --- 436 464 437 - ## Layer 7 — squeezelite client 465 + ## Layer 7 — Sync broadcaster (Rust) 466 + 467 + **File:** `crates/slim/src/lib.rs` 468 + 469 + ### Design 470 + 471 + `SyncBroadcaster` is a `Mutex<Vec<mpsc::Sender<u32>>>`. It is stored in a 472 + `static OnceLock<Arc<SyncBroadcaster>>` so both `slimproto::serve` and the 473 + background timer thread share the same instance. 474 + 475 + ```rust 476 + pub(crate) struct SyncBroadcaster { 477 + senders: Mutex<Vec<mpsc::Sender<u32>>>, 478 + } 479 + ``` 480 + 481 + ### subscribe 482 + 483 + Called from `slimproto::serve` for each new client, **before** the client 484 + handler thread is spawned: 485 + 486 + ```rust 487 + pub(crate) fn subscribe(&self) -> mpsc::Receiver<u32> { 488 + let (tx, rx) = mpsc::channel(); 489 + self.senders.lock().unwrap().push(tx); 490 + rx 491 + } 492 + ``` 493 + 494 + The `Receiver` is moved into the client thread. When the client disconnects, 495 + the thread exits, the `Receiver` is dropped, and the `Sender` is pruned from 496 + the list on the next `broadcast()` call. 497 + 498 + ### broadcast 499 + 500 + ```rust 501 + pub(crate) fn broadcast(&self, jiffies: u32) { 502 + let mut senders = self.senders.lock().unwrap(); 503 + senders.retain(|tx| tx.send(jiffies).is_ok()); 504 + } 505 + ``` 506 + 507 + `retain` removes any `Sender` whose `Receiver` has been dropped (i.e. whose 508 + client has disconnected), keeping the list compact. 509 + 510 + ### Timer thread 511 + 512 + Started once inside `pcm_squeezelite_start()`: 513 + 514 + ```rust 515 + let sync = get_sync(); 516 + std::thread::spawn(move || loop { 517 + std::thread::sleep(Duration::from_secs(1)); 518 + sync.broadcast(server_jiffies()); 519 + }); 520 + ``` 521 + 522 + `server_jiffies()` returns `SystemTime::now().as_millis() as u32`. The u32 523 + truncation gives a ~49-day rollover; squeezelite handles this correctly with 524 + signed 32-bit arithmetic. 525 + 526 + ### Per-client sync writer thread 527 + 528 + Each `handle_client` call clones the TCP write stream into an 529 + `Arc<Mutex<TcpStream>>` shared with a dedicated sync writer thread: 530 + 531 + ```rust 532 + let write_stream = Arc::new(Mutex::new(stream.try_clone()?)); 533 + 534 + let ws = Arc::clone(&write_stream); 535 + std::thread::spawn(move || { 536 + for jiffies in sync_rx { // blocks until broadcaster fires 537 + let mut s = ws.lock().unwrap(); 538 + send_sync(&mut *s, jiffies)?; // write `sync` packet 539 + } 540 + }); 541 + ``` 542 + 543 + The read loop uses the same `write_stream` mutex to send `audg` replies, so 544 + `sync` and `audg` packets never interleave at the byte level. 545 + 546 + --- 547 + 548 + ## Layer 8 — squeezelite client 438 549 439 550 squeezelite handles the audio pipeline in three internal threads: 440 551 ··· 468 579 └── sink_dma_start(addr, size) [pcm-squeezelite.c] 469 580 470 581 ├── pcm_squeezelite_start() [lib.rs — idempotent] 582 + │ ├── spawn sync broadcaster thread (fires every 1 s) 471 583 │ ├── spawn HTTP server thread on :9999 472 584 │ └── spawn Slim server thread on :3483 473 585 ··· 480 592 ├── send HELO 481 593 ├── receive STRM 's' → TCP connect :9999 482 594 ├── receive HTTP 200 483 - └── start buffering PCM 595 + ├── start buffering PCM 596 + └── receive sync jiffies=T every ~1 s → align playback clock 484 597 ``` 485 598 486 599 --- ··· 513 626 - A slow reader skips forward to the oldest available chunk; other readers 514 627 are unaffected. 515 628 516 - Squeezelite clients are not time-synchronised (no NTP/PTP layer). Clock drift 517 - between rooms is typically 100–500 ms. For tighter sync, run squeezelite with 518 - a Snapcast-compatible back-end or use the FIFO sink instead. 629 + ### Clock synchronisation 630 + 631 + Once per second the `SyncBroadcaster` computes `server_jiffies()` once and 632 + delivers the **same value** to every connected client via `mpsc` channels. 633 + Each client's sync writer thread immediately sends a `sync` packet over its 634 + individual TCP connection. 635 + 636 + squeezelite's internal sync handler: 637 + 1. Receives `sync jiffies=T`. 638 + 2. Computes `delta = T - my_jiffies + output_buffer_latency`. 639 + 3. Adjusts its buffer drain rate (speeds up or slows down slightly) to converge 640 + on the target position. 641 + 642 + Because all rooms receive the same `T`, they converge to the same audio 643 + position. In practice this achieves sub-100 ms synchronisation over a LAN, 644 + which is imperceptible to human listeners. 519 645 520 646 --- 521 647
+57 -1
crates/slim/src/lib.rs
··· 6 6 pub fn _link_slim() {} 7 7 8 8 use std::collections::VecDeque; 9 - use std::sync::{Arc, Condvar, Mutex, OnceLock}; 9 + use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock}; 10 + use std::time::{Duration, SystemTime, UNIX_EPOCH}; 10 11 11 12 // --------------------------------------------------------------------------- 12 13 // Broadcast buffer — one writer, N independent readers. ··· 146 147 http_port: 9999, 147 148 }); 148 149 150 + // --------------------------------------------------------------------------- 151 + // Sync broadcaster — sends the same jiffies value to all connected clients 152 + // once per second so squeezelite instances converge to the same playback clock. 153 + // --------------------------------------------------------------------------- 154 + 155 + pub(crate) struct SyncBroadcaster { 156 + senders: Mutex<Vec<mpsc::Sender<u32>>>, 157 + } 158 + 159 + impl SyncBroadcaster { 160 + fn new() -> Self { 161 + SyncBroadcaster { 162 + senders: Mutex::new(Vec::new()), 163 + } 164 + } 165 + 166 + /// Register a new client receiver. Returns the Receiver end of the channel. 167 + pub(crate) fn subscribe(&self) -> mpsc::Receiver<u32> { 168 + let (tx, rx) = mpsc::channel(); 169 + self.senders.lock().unwrap().push(tx); 170 + rx 171 + } 172 + 173 + /// Broadcast jiffies to all clients, pruning senders whose client has gone. 174 + pub(crate) fn broadcast(&self, jiffies: u32) { 175 + let mut senders = self.senders.lock().unwrap(); 176 + senders.retain(|tx| tx.send(jiffies).is_ok()); 177 + } 178 + } 179 + 180 + static SYNC: OnceLock<Arc<SyncBroadcaster>> = OnceLock::new(); 181 + 182 + pub(crate) fn get_sync() -> Arc<SyncBroadcaster> { 183 + SYNC.get_or_init(|| Arc::new(SyncBroadcaster::new())).clone() 184 + } 185 + 186 + /// Milliseconds since the Unix epoch, truncated to u32 (~49-day rollover). 187 + /// Sent to all squeezelite clients so they can align their playback clocks. 188 + fn server_jiffies() -> u32 { 189 + SystemTime::now() 190 + .duration_since(UNIX_EPOCH) 191 + .unwrap_or_default() 192 + .as_millis() as u32 193 + } 194 + 149 195 fn get_buffer() -> Arc<BroadcastBuffer> { 150 196 BUFFER 151 197 .get_or_init(|| Arc::new(BroadcastBuffer::new())) ··· 181 227 182 228 let buf = get_buffer(); 183 229 buf.reset(); 230 + 231 + // Sync broadcaster: computes jiffies once per second and fans out to all 232 + // connected clients so they align to the same playback clock reference. 233 + { 234 + let sync = get_sync(); 235 + std::thread::spawn(move || loop { 236 + std::thread::sleep(Duration::from_secs(1)); 237 + sync.broadcast(server_jiffies()); 238 + }); 239 + } 184 240 185 241 let buf_http = buf.clone(); 186 242 std::thread::spawn(move || http::serve(http_port, buf_http));
+110 -13
crates/slim/src/slimproto.rs
··· 1 1 use std::io::{Read, Write}; 2 2 use std::net::{TcpListener, TcpStream}; 3 + use std::sync::{mpsc, Arc, Mutex}; 3 4 4 5 /// Slim Protocol TCP server. Each squeezelite instance that connects gets a 5 6 /// STRM command pointing at our HTTP broadcast endpoint. Multiple clients are 6 - /// fully supported — each connects to the same HTTP port and receives its own 7 - /// independent read cursor into the shared PCM broadcast buffer. 7 + /// fully supported — each receives an independent BroadcastReceiver cursor into 8 + /// the shared PCM buffer plus a per-second `sync` command so all instances 9 + /// align to the same server jiffies reference. 8 10 pub fn serve(slim_port: u16, http_port: u16) { 9 11 let listener = match TcpListener::bind(("0.0.0.0", slim_port)) { 10 12 Ok(l) => l, ··· 18 20 for stream in listener.incoming() { 19 21 match stream { 20 22 Ok(stream) => { 21 - std::thread::spawn(move || handle_client(stream, http_port)); 23 + // Subscribe before spawning so the sender is registered 24 + // before the first sync broadcast fires. 25 + let sync_rx = crate::get_sync().subscribe(); 26 + std::thread::spawn(move || handle_client(stream, http_port, sync_rx)); 22 27 } 23 28 Err(e) => tracing::warn!("slim: accept error: {e}"), 24 29 } 25 30 } 26 31 } 27 32 28 - fn handle_client(mut stream: TcpStream, http_port: u16) { 33 + fn handle_client(mut stream: TcpStream, http_port: u16, sync_rx: mpsc::Receiver<u32>) { 29 34 let peer = stream 30 35 .peer_addr() 31 36 .map(|a| a.to_string()) ··· 52 57 } 53 58 tracing::info!("slim: sent STRM to {peer} → http stream on :{http_port}"); 54 59 55 - // Read STAT / DSCO packets. Reply to every STMt heartbeat with audg so 56 - // squeezelite's 36-second "no messages from server" watchdog never fires. 60 + // Clone the stream for writes; reads stay on the original fd. 61 + // Both fds refer to the same socket — POSIX guarantees this is safe. 62 + let write_stream = match stream.try_clone() { 63 + Ok(s) => Arc::new(Mutex::new(s)), 64 + Err(e) => { 65 + tracing::error!("slim: try_clone failed for {peer}: {e}"); 66 + return; 67 + } 68 + }; 69 + 70 + // Sync writer thread: receives jiffies from the broadcaster and forwards 71 + // `sync` packets to this client so it aligns with the server clock. 72 + // Runs concurrently with the read loop below, sharing write_stream. 73 + { 74 + let ws = Arc::clone(&write_stream); 75 + let peer_label = peer.clone(); 76 + std::thread::spawn(move || { 77 + for jiffies in sync_rx { 78 + let mut s = ws.lock().unwrap(); 79 + if let Err(e) = send_sync(&mut *s, jiffies) { 80 + tracing::debug!("slim: sync write error to {peer_label}: {e}"); 81 + break; 82 + } 83 + tracing::debug!("slim: sync jiffies={jiffies} → {peer_label}"); 84 + } 85 + }); 86 + } 87 + 88 + // Read loop: handle STAT / DSCO packets. 89 + // Reply to every STMt heartbeat with `audg` to keep squeezelite's 36-second 90 + // watchdog from firing. Log timing data for diagnostics. 57 91 loop { 58 92 match read_client_packet(&mut stream) { 59 93 Ok((opcode, body)) => { 60 94 if opcode == "STAT" && body.len() >= 4 { 61 95 let ev = std::str::from_utf8(&body[..4]).unwrap_or("????"); 62 - tracing::debug!("slim: STAT {ev} from {peer}"); 63 96 if ev == "STMt" { 64 - if let Err(e) = send_audg(&mut stream) { 97 + let elapsed_ms = stmt_elapsed_ms(&body); 98 + let client_jiffies = stmt_jiffies(&body); 99 + tracing::debug!( 100 + "slim: STMt from {peer}: elapsed={elapsed_ms}ms \ 101 + client_jiffies={client_jiffies}" 102 + ); 103 + let mut s = write_stream.lock().unwrap(); 104 + if let Err(e) = send_audg(&mut *s) { 65 105 tracing::debug!("slim: audg error to {peer}: {e}"); 66 106 break; 67 107 } 108 + } else { 109 + tracing::debug!("slim: STAT {ev} from {peer}"); 68 110 } 69 111 } else if opcode == "DSCO" { 70 112 tracing::info!("slim: DSCO from {peer}"); ··· 80 122 } 81 123 } 82 124 } 125 + 126 + // --------------------------------------------------------------------------- 127 + // Packet I/O helpers 128 + // --------------------------------------------------------------------------- 83 129 84 130 fn read_client_packet(stream: &mut TcpStream) -> std::io::Result<(String, Vec<u8>)> { 85 131 let mut opcode = [0u8; 4]; ··· 114 160 payload.push(b's'); // command: start 115 161 payload.push(b'1'); // autostart 116 162 payload.push(b'p'); // format: raw PCM 117 - payload.push(b'1'); // pcm_sample_size: 16-bit 118 - payload.push(b'3'); // pcm_sample_rate: 44100 Hz 119 - payload.push(b'2'); // pcm_channels: stereo 120 - payload.push(b'1'); // pcm_endianness: little-endian 163 + payload.push(b'1'); // pcm_sample_size: 16-bit (squeezelite: field - '0') 164 + payload.push(b'3'); // pcm_sample_rate: 44100 (squeezelite: field - '0') 165 + payload.push(b'2'); // pcm_channels: stereo (squeezelite: field - '0') 166 + payload.push(b'1'); // pcm_endianness: LE (squeezelite: field - '0') 121 167 payload.push(255u8); // threshold: 255 KB 122 168 payload.push(0u8); // spdif_enable 123 169 payload.push(0u8); // transition_period ··· 127 173 payload.push(0u8); // slaves 128 174 payload.extend_from_slice(&0x00010000u32.to_be_bytes()); // replay_gain = 1.0 129 175 payload.extend_from_slice(&http_port.to_be_bytes()); 130 - payload.extend_from_slice(&0u32.to_be_bytes()); // server_ip = 0 → use slimproto_ip 176 + payload.extend_from_slice(&0u32.to_be_bytes()); // server_ip = 0 → use slimproto IP 131 177 payload.extend_from_slice(request); 132 178 send_server_packet(stream, b"strm", &payload) 133 179 } 134 180 181 + /// `audg` — full-volume gain packet; sent on every STMt heartbeat to suppress 182 + /// squeezelite's 36-second "no messages from server" watchdog. 135 183 fn send_audg(stream: &mut TcpStream) -> std::io::Result<()> { 136 184 let mut payload = [0u8; 9]; 137 185 payload[0..4].copy_from_slice(&0x00010000u32.to_be_bytes()); // left gain = 1.0 138 186 payload[4..8].copy_from_slice(&0x00010000u32.to_be_bytes()); // right gain = 1.0 139 187 send_server_packet(stream, b"audg", &payload) 140 188 } 189 + 190 + /// `sync` — tells squeezelite to align its playback clock to `jiffies`. 191 + /// All clients receive the same value from the broadcaster, causing them to 192 + /// converge to the same audio position. 193 + fn send_sync(stream: &mut TcpStream, jiffies: u32) -> std::io::Result<()> { 194 + send_server_packet(stream, b"sync", &jiffies.to_be_bytes()) 195 + } 196 + 197 + // --------------------------------------------------------------------------- 198 + // STMt body parsers 199 + // 200 + // STMt body layout (all fields big-endian): 201 + // [0..4] event ("STMt") 202 + // [4] num_crlf 203 + // [5] mas_initialized 204 + // [6] mas_mode 205 + // [7..11] rptr (stream buffer read pointer) 206 + // [11..15] wptr (stream buffer write pointer) 207 + // [15..23] bytes_received (u64) 208 + // [23..25] signal_strength (u16) 209 + // [25..29] jiffies (u32) ← client's monotonic ms clock 210 + // [29..33] output_buffer_size 211 + // [33..37] output_buffer_fullness 212 + // [37..41] elapsed_seconds 213 + // [41..43] voltage (u16) 214 + // [43..47] elapsed_milliseconds ← ms of audio output so far 215 + // [47..51] server_timestamp (echo of last strm timestamp) 216 + // [51..53] error_code (u16) 217 + // --------------------------------------------------------------------------- 218 + 219 + fn stmt_jiffies(body: &[u8]) -> u32 { 220 + read_u32_be(body, 25) 221 + } 222 + 223 + fn stmt_elapsed_ms(body: &[u8]) -> u32 { 224 + read_u32_be(body, 43) 225 + } 226 + 227 + fn read_u32_be(data: &[u8], offset: usize) -> u32 { 228 + if data.len() < offset + 4 { 229 + return 0; 230 + } 231 + u32::from_be_bytes([ 232 + data[offset], 233 + data[offset + 1], 234 + data[offset + 2], 235 + data[offset + 3], 236 + ]) 237 + }