A nightstand noise generator based on M5Stack Atom Echo and integrating with Home Assistant
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 106 lines 4.2 kB view raw
1//! Cross-task message channels backed by FreeRTOS native queues. 2//! 3//! `std::sync::mpsc` and `crossbeam-channel` both use `pthread_mutex_t` 4//! internally. ESP-IDF's pthread implementation defines 5//! `PTHREAD_MUTEX_INITIALIZER` as `0xffffffff` (a sentinel for lazy init via 6//! `pthread_mutex_init_if_static`), but the `libc` crate that Rust's std is 7//! built against assumes the GNU/newlib value (zeros) and a 40-byte 8//! `pthread_mutex_t`. ESP-IDF's `pthread_mutex_t` is 4 bytes (a pointer to a 9//! dynamically-allocated struct). The mismatch means the lazy init crashes 10//! the moment any `Mutex<Waker>` inside a channel needs to wake a blocked 11//! receiver — which happens on every multi-producer send under contention. 12//! 13//! FreeRTOS queues sidestep this entirely. They're the native primitive on 14//! the ESP32, ISR-safe, support multiple producers and consumers, and don't 15//! touch the pthread layer. 16//! 17//! The wrapper here gives us familiar `Sender`/`Receiver` ergonomics. Both 18//! ends are `Arc<Queue<T>>`; cloning is cheap; `T` must be `Copy` (FreeRTOS 19//! queues are byte-copy semantics, so non-`Copy` payloads would leak). 20 21use anyhow::{anyhow, Result}; 22use esp_idf_svc::hal::task::queue::Queue; 23use std::sync::Arc; 24use std::time::Duration; 25 26/// Block forever — FreeRTOS `portMAX_DELAY`. 27const PORT_MAX_DELAY: u32 = u32::MAX; 28 29pub struct Sender<T: Copy + Send + Sync + 'static> { 30 queue: Arc<Queue<T>>, 31} 32 33impl<T: Copy + Send + Sync + 'static> Clone for Sender<T> { 34 fn clone(&self) -> Self { 35 Self { 36 queue: self.queue.clone(), 37 } 38 } 39} 40 41/// Default send timeout — long enough that any sane receiver should drain in 42/// time, short enough that a wedged receiver doesn't hang the sender forever. 43const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(500); 44 45impl<T: Copy + Send + Sync + 'static> Sender<T> { 46 /// Send with a reasonable default timeout. Mirrors `std::sync::mpsc::Sender::send` 47 /// for call-site ergonomics — most call sites use `let _ = tx.send(...)` 48 /// and don't want to think about timeouts. 49 pub fn send(&self, item: T) -> Result<()> { 50 self.send_timeout(item, DEFAULT_SEND_TIMEOUT) 51 } 52 53 /// Send with an explicit timeout. Returns Err on timeout / failure. 54 pub fn send_timeout(&self, item: T, timeout: Duration) -> Result<()> { 55 let ticks = duration_to_ticks(timeout); 56 self.queue 57 .send_back(item, ticks) 58 .map(|_| ()) 59 .map_err(|e| anyhow!("queue send failed (full?): {e}")) 60 } 61 62 /// Send without blocking. Returns Err if the queue is full. 63 pub fn try_send(&self, item: T) -> Result<()> { 64 self.queue 65 .send_back(item, 0) 66 .map(|_| ()) 67 .map_err(|e| anyhow!("queue try_send failed (full): {e}")) 68 } 69} 70 71pub struct Receiver<T: Copy + Send + Sync + 'static> { 72 queue: Arc<Queue<T>>, 73} 74 75impl<T: Copy + Send + Sync + 'static> Receiver<T> { 76 /// Block forever waiting for a message. 77 pub fn recv(&self) -> Option<T> { 78 self.queue.recv_front(PORT_MAX_DELAY).map(|(t, _)| t) 79 } 80 81 /// Block up to `timeout` waiting for a message. Returns `None` on timeout. 82 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> { 83 self.queue 84 .recv_front(duration_to_ticks(timeout)) 85 .map(|(t, _)| t) 86 } 87 88 /// Non-blocking receive. Returns `None` immediately if empty. 89 pub fn try_recv(&self) -> Option<T> { 90 self.queue.recv_front(0).map(|(t, _)| t) 91 } 92} 93 94/// Build an unbounded-ish channel with a fixed slot count. Pick a count that 95/// comfortably covers your peak in-flight messages — extra slots cost 96/// `count * size_of::<T>()` bytes of static heap. 97pub fn channel<T: Copy + Send + Sync + 'static>(slots: usize) -> (Sender<T>, Receiver<T>) { 98 let q = Arc::new(Queue::<T>::new(slots)); 99 (Sender { queue: q.clone() }, Receiver { queue: q }) 100} 101 102fn duration_to_ticks(d: Duration) -> u32 { 103 // FreeRTOS tick rate is set in sdkconfig (we have CONFIG_FREERTOS_HZ=1000), 104 // so 1 ms = 1 tick. Saturate on overflow rather than wrapping. 105 d.as_millis().min(u32::MAX as u128 - 1) as u32 106}