A nightstand noise generator based on M5Stack Atom Echo and integrating with Home Assistant
1//! Cross-task message channels backed by FreeRTOS native queues.
2//!
3//! `std::sync::mpsc` and `crossbeam-channel` both use `pthread_mutex_t`
4//! internally. ESP-IDF's pthread implementation defines
5//! `PTHREAD_MUTEX_INITIALIZER` as `0xffffffff` (a sentinel for lazy init via
6//! `pthread_mutex_init_if_static`), but the `libc` crate that Rust's std is
7//! built against assumes the GNU/newlib value (zeros) and a 40-byte
8//! `pthread_mutex_t`. ESP-IDF's `pthread_mutex_t` is 4 bytes (a pointer to a
9//! dynamically-allocated struct). The mismatch means the lazy init crashes
10//! the moment any `Mutex<Waker>` inside a channel needs to wake a blocked
11//! receiver — which happens on every multi-producer send under contention.
12//!
13//! FreeRTOS queues sidestep this entirely. They're the native primitive on
14//! the ESP32, ISR-safe, support multiple producers and consumers, and don't
15//! touch the pthread layer.
16//!
//! The wrapper here gives us familiar `Sender`/`Receiver` ergonomics. Both
//! ends hold an `Arc<Queue<T>>`, so cloning a `Sender` is cheap. `T` must be
//! `Copy`: FreeRTOS queues copy items byte-for-byte, so non-`Copy` payloads
//! would leak.
20
21use anyhow::{anyhow, Result};
22use esp_idf_svc::hal::task::queue::Queue;
23use std::sync::Arc;
24use std::time::Duration;
25
/// Tick count meaning "wait indefinitely" — the FreeRTOS `portMAX_DELAY`
/// value, i.e. every bit of the 32-bit tick type set.
const PORT_MAX_DELAY: u32 = 0xFFFF_FFFF;
28
29pub struct Sender<T: Copy + Send + Sync + 'static> {
30 queue: Arc<Queue<T>>,
31}
32
33impl<T: Copy + Send + Sync + 'static> Clone for Sender<T> {
34 fn clone(&self) -> Self {
35 Self {
36 queue: self.queue.clone(),
37 }
38 }
39}
40
/// How long [`Sender::send`] waits for a free slot before giving up.
///
/// Half a second: generous for a receiver that is merely busy, yet bounded so
/// a stuck receiver cannot block the sending task indefinitely.
const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(500);
44
45impl<T: Copy + Send + Sync + 'static> Sender<T> {
46 /// Send with a reasonable default timeout. Mirrors `std::sync::mpsc::Sender::send`
47 /// for call-site ergonomics — most call sites use `let _ = tx.send(...)`
48 /// and don't want to think about timeouts.
49 pub fn send(&self, item: T) -> Result<()> {
50 self.send_timeout(item, DEFAULT_SEND_TIMEOUT)
51 }
52
53 /// Send with an explicit timeout. Returns Err on timeout / failure.
54 pub fn send_timeout(&self, item: T, timeout: Duration) -> Result<()> {
55 let ticks = duration_to_ticks(timeout);
56 self.queue
57 .send_back(item, ticks)
58 .map(|_| ())
59 .map_err(|e| anyhow!("queue send failed (full?): {e}"))
60 }
61
62 /// Send without blocking. Returns Err if the queue is full.
63 pub fn try_send(&self, item: T) -> Result<()> {
64 self.queue
65 .send_back(item, 0)
66 .map(|_| ())
67 .map_err(|e| anyhow!("queue try_send failed (full): {e}"))
68 }
69}
70
71pub struct Receiver<T: Copy + Send + Sync + 'static> {
72 queue: Arc<Queue<T>>,
73}
74
75impl<T: Copy + Send + Sync + 'static> Receiver<T> {
76 /// Block forever waiting for a message.
77 pub fn recv(&self) -> Option<T> {
78 self.queue.recv_front(PORT_MAX_DELAY).map(|(t, _)| t)
79 }
80
81 /// Block up to `timeout` waiting for a message. Returns `None` on timeout.
82 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
83 self.queue
84 .recv_front(duration_to_ticks(timeout))
85 .map(|(t, _)| t)
86 }
87
88 /// Non-blocking receive. Returns `None` immediately if empty.
89 pub fn try_recv(&self) -> Option<T> {
90 self.queue.recv_front(0).map(|(t, _)| t)
91 }
92}
93
94/// Build an unbounded-ish channel with a fixed slot count. Pick a count that
95/// comfortably covers your peak in-flight messages — extra slots cost
96/// `count * size_of::<T>()` bytes of static heap.
97pub fn channel<T: Copy + Send + Sync + 'static>(slots: usize) -> (Sender<T>, Receiver<T>) {
98 let q = Arc::new(Queue::<T>::new(slots));
99 (Sender { queue: q.clone() }, Receiver { queue: q })
100}
101
/// Convert a [`Duration`] into a FreeRTOS tick count for a *finite* wait.
///
/// The tick rate is set in sdkconfig; this conversion assumes
/// `CONFIG_FREERTOS_HZ=1000`, i.e. one tick per millisecond.
///
/// * Fractions of a millisecond are rounded **up**, so a non-zero timeout
///   never collapses to `0` ticks (which would turn a blocking call into a
///   non-blocking poll).
/// * The result saturates at `u32::MAX - 1` rather than wrapping. That keeps
///   it distinct from `u32::MAX`, the value `PORT_MAX_DELAY` uses for
///   "block forever".
fn duration_to_ticks(d: Duration) -> u32 {
    const NANOS_PER_TICK: u128 = 1_000_000;
    const MAX_FINITE_TICKS: u32 = u32::MAX - 1;

    // `Duration::as_nanos()` is far below `u128::MAX`, so adding the rounding
    // term cannot overflow.
    let ticks = (d.as_nanos() + (NANOS_PER_TICK - 1)) / NANOS_PER_TICK;

    // Clamped to `MAX_FINITE_TICKS` first, so the narrowing cast is lossless.
    ticks.min(u128::from(MAX_FINITE_TICKS)) as u32
}