//! Cross-task message channels backed by FreeRTOS native queues.
//!
//! `std::sync::mpsc` and `crossbeam-channel` both use `pthread_mutex_t`
//! internally. ESP-IDF's pthread implementation defines
//! `PTHREAD_MUTEX_INITIALIZER` as `0xffffffff` (a sentinel for lazy init via
//! `pthread_mutex_init_if_static`), but the `libc` crate that Rust's std is
//! built against assumes the GNU/newlib value (zeros) and a 40-byte
//! `pthread_mutex_t`. ESP-IDF's `pthread_mutex_t` is 4 bytes (a pointer to a
//! dynamically-allocated struct). The mismatch means the lazy init crashes
//! the moment any `Mutex` inside a channel needs to wake a blocked
//! receiver — which happens on every multi-producer send under contention.
//!
//! FreeRTOS queues sidestep this entirely. They're the native primitive on
//! the ESP32, ISR-safe, support multiple producers and consumers, and don't
//! touch the pthread layer.
//!
//! The wrapper here gives us familiar `Sender`/`Receiver` ergonomics. Both
//! ends hold an `Arc<Queue<T>>`; cloning a `Sender` is cheap; `T` must be
//! `Copy` (FreeRTOS queues are byte-copy semantics, so non-`Copy` payloads
//! would leak).

use anyhow::{anyhow, Result};
use esp_idf_svc::hal::task::queue::Queue;
use std::sync::Arc;
use std::time::Duration;

/// Block forever — FreeRTOS `portMAX_DELAY`.
const PORT_MAX_DELAY: u32 = u32::MAX;

/// Default send timeout — long enough that any sane receiver should drain in
/// time, short enough that a wedged receiver doesn't hang the sender forever.
const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(500);

/// Producer half of a channel created by [`channel`].
///
/// Every clone refers to the same underlying queue.
pub struct Sender<T> {
    queue: Arc<Queue<T>>,
}

impl<T> Clone for Sender<T> {
    /// Returns another handle to the same queue (a reference-count bump; no
    /// queue storage is copied).
    fn clone(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
        }
    }
}

impl<T: Copy> Sender<T> {
    /// Send with a reasonable default timeout. Mirrors `std::sync::mpsc::Sender::send`
    /// for call-site ergonomics — most call sites use `let _ = tx.send(...)`
    /// and don't want to think about timeouts.
    ///
    /// # Errors
    ///
    /// Returns an error if the item could not be enqueued within
    /// `DEFAULT_SEND_TIMEOUT`.
    pub fn send(&self, item: T) -> Result<()> {
        self.send_timeout(item, DEFAULT_SEND_TIMEOUT)
    }

    /// Send with an explicit timeout.
    ///
    /// The timeout is converted to ticks by [`duration_to_ticks`], so it is
    /// rounded up to a whole tick and is always finite.
    ///
    /// # Errors
    ///
    /// Returns an error on timeout or any other failure reported by the queue.
    pub fn send_timeout(&self, item: T, timeout: Duration) -> Result<()> {
        let ticks = duration_to_ticks(timeout);
        self.queue
            .send_back(item, ticks)
            // The success payload is a flag reported by the HAL; callers of
            // this wrapper have no use for it.
            .map(|_| ())
            .map_err(|e| anyhow!("queue send failed (full?): {e}"))
    }

    /// Send without blocking.
    ///
    /// # Errors
    ///
    /// Returns an error if the queue is full.
    pub fn try_send(&self, item: T) -> Result<()> {
        self.queue
            .send_back(item, 0)
            .map(|_| ())
            .map_err(|e| anyhow!("queue try_send failed (full): {e}"))
    }
}

/// Consumer half of a channel created by [`channel`].
pub struct Receiver<T> {
    queue: Arc<Queue<T>>,
}

impl<T: Copy> Receiver<T> {
    /// Block waiting for a message, using `PORT_MAX_DELAY` as the timeout.
    ///
    /// Returns `None` only if the underlying receive comes back without an
    /// item. NOTE(review): FreeRTOS treats `portMAX_DELAY` as "wait
    /// indefinitely" only when `INCLUDE_vTaskSuspend` is enabled — confirm
    /// against the project's FreeRTOS configuration.
    pub fn recv(&self) -> Option<T> {
        // The second tuple element is a flag reported by the HAL; it is not
        // needed here.
        self.queue.recv_front(PORT_MAX_DELAY).map(|(item, _)| item)
    }

    /// Block up to `timeout` waiting for a message. Returns `None` on timeout.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
        self.queue
            .recv_front(duration_to_ticks(timeout))
            .map(|(item, _)| item)
    }

    /// Non-blocking receive. Returns `None` immediately if empty.
    pub fn try_recv(&self) -> Option<T> {
        self.queue.recv_front(0).map(|(item, _)| item)
    }
}

/// Build a bounded channel with a fixed slot count.
///
/// Pick a count that comfortably covers your peak in-flight messages — once
/// every slot is occupied, sends wait (or fail) until the receiver drains
/// one. Extra slots cost `count * size_of::<T>()` bytes of heap.
///
/// A `slots` value of 0 is treated as 1: FreeRTOS does not accept a
/// zero-length queue, so passing 0 through would fail at creation time.
pub fn channel<T: Copy>(slots: usize) -> (Sender<T>, Receiver<T>) {
    let queue = Arc::new(Queue::<T>::new(slots.max(1)));
    (
        Sender {
            queue: Arc::clone(&queue),
        },
        Receiver { queue },
    )
}

/// Convert a [`Duration`] into a FreeRTOS tick count.
///
/// * Partial ticks round **up**, so a positive sub-millisecond timeout still
///   waits one tick instead of degrading into a non-blocking poll.
/// * The result saturates at `PORT_MAX_DELAY - 1`, so a finite timeout can
///   never be mistaken for "block forever".
fn duration_to_ticks(d: Duration) -> u32 {
    // FreeRTOS tick rate is set in sdkconfig (we have CONFIG_FREERTOS_HZ=1000),
    // so 1 ms = 1 tick.
    const NANOS_PER_MILLI: u32 = 1_000_000;
    const MAX_FINITE_TICKS: u32 = PORT_MAX_DELAY - 1;

    let whole_millis = d.as_millis();
    // `as_millis` truncates; add one tick when there is a fractional
    // millisecond left over. `whole_millis` is far below `u128::MAX`, so the
    // increment cannot overflow.
    let ticks = if d.subsec_nanos() % NANOS_PER_MILLI == 0 {
        whole_millis
    } else {
        whole_millis + 1
    };

    // Saturate on overflow rather than wrapping.
    u32::try_from(ticks).map_or(MAX_FINITE_TICKS, |t| t.min(MAX_FINITE_TICKS))
}