A nightstand noise generator based on M5Stack Atom Echo and integrating with Home Assistant
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 806 lines 31 kB view raw
//! Network task: WiFi + MQTT, online/offline state machine, gatekeeper for
//! button events.
//!
//! Two threads:
//! - **state thread** owns WiFi + MQTT, processes `NetTaskMsg` from a single
//!   FreeRTOS queue. Routes connection-state changes (Connected/Disconnected)
//!   and outbound publishes (button events, state snapshots).
//! - **forwarder thread** reads `OutboundEvent` from the rest of the firmware
//!   and re-emits as `NetTaskMsg::Outbound` so the state thread sees a
//!   single stream.
//!
//! The MQTT client is constructed with `new_cb`. The callback runs in the
//! ESP-IDF MQTT task and forwards Connected/Disconnected to the state queue
//! and routes inbound commands directly to `audio_tx` — no extra pump
//! thread or borrowed-string copy required.
//!
//! All cross-thread channels are FreeRTOS queues (see `channels.rs`).
//! `std::sync::mpsc` is broken on esp-idf-rs because of the
//! `PTHREAD_MUTEX_INITIALIZER` mismatch between newlib (zeros) and ESP-IDF
//! (`0xffffffff`); see `channels.rs` for the gory details.
//!
//! WiFi auto-reconnects via the default ESP-IDF behavior; the MQTT C client
//! auto-reconnects on its own (default `reconnect_timeout` is non-zero), so
//! we observe `Connected`/`Disconnected` events instead of running explicit
//! retry loops. If the very first WiFi attempt fails, we retry every 60s.
26 27use crate::channels::{channel, Receiver, Sender}; 28use crate::discovery; 29use crate::events::{ 30 AudioCommand, ButtonEvent, LedSignal, NetStatus, OutboundEvent, StateSnapshot, 31}; 32use crate::ota; 33use crate::secrets::CONFIG; 34use crate::state::snap_to_preset_index; 35use anyhow::{anyhow, Result}; 36use esp_idf_svc::eventloop::EspSystemEventLoop; 37use esp_idf_svc::hal::modem::Modem; 38use esp_idf_svc::mqtt::client::{ 39 Details, EspMqttClient, EventPayload, LwtConfiguration, MqttClientConfiguration, QoS, 40}; 41use esp_idf_svc::nvs::{EspNvsPartition, NvsDefault}; 42use esp_idf_svc::wifi::{ 43 BlockingWifi, ClientConfiguration, Configuration, EspWifi, WifiDeviceId, 44}; 45use log::{info, warn}; 46use std::sync::atomic::{AtomicBool, Ordering}; 47use std::sync::Arc; 48use std::thread::{Builder, JoinHandle}; 49use std::time::{Duration, Instant}; 50 51const WIFI_RETRY_INTERVAL: Duration = Duration::from_secs(60); 52const STATE_THREAD_STACK: usize = 16 * 1024; 53const FORWARDER_THREAD_STACK: usize = 4 * 1024; 54 55/// Spawn the network task. Takes ownership of the modem, an NVS partition 56/// clone, and the channel endpoints it'll need for the rest of its life. 57/// The MAC address is read inside the task and logged loudly so first-boot 58/// units show their identity in the serial monitor before any connect. 59pub fn spawn( 60 modem: Modem, 61 sys_loop: EspSystemEventLoop, 62 nvs_partition: EspNvsPartition<NvsDefault>, 63 sw_version: &'static str, 64 audio_tx: Sender<AudioCommand>, 65 led_tx: Sender<LedSignal>, 66 out_rx: Receiver<OutboundEvent>, 67) -> Result<JoinHandle<()>> { 68 let handle = Builder::new() 69 .name("network".into()) 70 .stack_size(STATE_THREAD_STACK) 71 .spawn(move || { 72 if let Err(e) = run( 73 modem, 74 sys_loop, 75 nvs_partition, 76 sw_version, 77 audio_tx, 78 led_tx, 79 out_rx, 80 ) { 81 warn!("network task exiting on error: {e:?}"); 82 } 83 })?; 84 Ok(handle) 85} 86 87/// Unified message type for the network state thread. 
All variants are `Copy` 88/// so they fit in a FreeRTOS queue (which uses byte-copy semantics). 89#[derive(Debug, Clone, Copy)] 90enum NetTaskMsg { 91 Connected, 92 Disconnected, 93 Outbound(OutboundEvent), 94 /// New `latest_version` seen on the shared topic. Cached so that an 95 /// install request later can build the download URL from it. 96 LatestVersion(VersionBuf), 97 /// HA published `install` to `cmd/update`. 98 OtaInstall, 99 /// OTA worker reporting download progress (0..=100). 100 OtaProgress(u8), 101 /// OTA worker finished (true = success, false = failure). On success 102 /// the worker also calls `esp_restart` so we may never observe this 103 /// variant for the success case; on failure it lets us repaint the 104 /// LED and clear the in-progress state. 105 OtaFinished(bool), 106} 107 108/// Stack-allocated, `Copy`-friendly version string. FreeRTOS queues copy by 109/// value, so we can't pass `String`/`heapless::String` (both are non-Copy). 110/// 31 bytes covers any reasonable semver, including pre-release tags. 111#[derive(Debug, Clone, Copy)] 112struct VersionBuf { 113 bytes: [u8; 31], 114 len: u8, 115} 116 117impl VersionBuf { 118 fn from_bytes(b: &[u8]) -> Option<Self> { 119 if b.is_empty() || b.len() > 31 { 120 return None; 121 } 122 // Reject anything that isn't valid UTF-8; saves the as_str caller a 123 // failure mode it can't recover from. 124 std::str::from_utf8(b).ok()?; 125 let mut bytes = [0u8; 31]; 126 bytes[..b.len()].copy_from_slice(b); 127 Some(Self { 128 bytes, 129 len: b.len() as u8, 130 }) 131 } 132 133 fn as_str(&self) -> &str { 134 // SAFETY: from_bytes verified UTF-8 at construction. 
135 unsafe { std::str::from_utf8_unchecked(&self.bytes[..self.len as usize]) } 136 } 137} 138 139fn run( 140 modem: Modem, 141 sys_loop: EspSystemEventLoop, 142 nvs_partition: EspNvsPartition<NvsDefault>, 143 sw_version: &'static str, 144 audio_tx: Sender<AudioCommand>, 145 led_tx: Sender<LedSignal>, 146 out_rx: Receiver<OutboundEvent>, 147) -> Result<()> { 148 info!("network task: starting; broker={}", CONFIG.mqtt_url); 149 if CONFIG.wifi_ssid.is_empty() { 150 warn!("cfg.toml: wifi_ssid is empty; network task exiting"); 151 return Err(anyhow!("wifi_ssid empty")); 152 } 153 if CONFIG.wifi_password.is_empty() { 154 warn!("cfg.toml: wifi_password is empty; network task exiting"); 155 return Err(anyhow!("wifi_password empty")); 156 } 157 if CONFIG.mqtt_url.is_empty() { 158 warn!("cfg.toml: mqtt_url is empty; network task exiting"); 159 return Err(anyhow!("mqtt_url empty")); 160 } 161 162 let _ = led_tx.send(LedSignal::Net(NetStatus::Connecting)); 163 let esp_wifi = EspWifi::new(modem, sys_loop.clone(), Some(nvs_partition)) 164 .map_err(|e| anyhow!("EspWifi::new failed: {e}"))?; 165 let mut wifi = BlockingWifi::wrap(esp_wifi, sys_loop) 166 .map_err(|e| anyhow!("BlockingWifi::wrap failed: {e}"))?; 167 168 let mac = wifi 169 .wifi() 170 .driver() 171 .get_mac(WifiDeviceId::Sta) 172 .map_err(|e| anyhow!("get_mac: {e}"))?; 173 let mac_hex = format_mac(mac); 174 175 let topic_prefix = format!("nightstand/{mac_hex}"); 176 let avail_topic = format!("{topic_prefix}/available"); 177 let state_topic = format!("{topic_prefix}/state"); 178 let button_topic = format!("{topic_prefix}/button"); 179 let cmd_filter = format!("{topic_prefix}/cmd/+"); 180 let cmd_play_topic = format!("{topic_prefix}/cmd/play"); 181 let cmd_volume_topic = format!("{topic_prefix}/cmd/volume"); 182 let cmd_update_topic = format!("{topic_prefix}/cmd/update"); 183 // HA's update entity reads this single JSON-state topic for installed 184 // version, in_progress flag, and update_percentage. 
We keep the file- 185 // path-style suffix `update/state` even though the discovery payload 186 // calls it state_topic — clearer when subscribed via `mosquitto_sub`. 187 let update_state_topic = format!("{topic_prefix}/update/state"); 188 let client_id = format!("nightstand_{mac_hex}"); 189 let hostname = format!("nightstand-{mac_hex}"); 190 191 info!( 192 "device MAC = {mac_hex} → topic prefix {topic_prefix}/, client_id {client_id}, broker {}", 193 CONFIG.mqtt_url 194 ); 195 196 set_hostname(wifi.wifi().sta_netif(), &hostname); 197 wifi.set_configuration(&Configuration::Client(ClientConfiguration { 198 ssid: CONFIG 199 .wifi_ssid 200 .try_into() 201 .map_err(|_| anyhow!("wifi_ssid too long for heapless::String<32>"))?, 202 password: CONFIG 203 .wifi_password 204 .try_into() 205 .map_err(|_| anyhow!("wifi_password too long for heapless::String<64>"))?, 206 // Let the driver auto-detect WPA2/WPA3/etc. based on what the AP 207 // advertises. Forcing WPA2Personal can hang association on WPA3 208 // and WPA2/WPA3-mixed networks. 209 ..Default::default() 210 }))?; 211 212 connect_wifi_with_retry(&mut wifi); 213 214 // Unified queue for the state thread. The MQTT callback pushes 215 // Connected/Disconnected into it; the forwarder thread pushes Outbound. 216 let (msg_tx, msg_rx) = channel::<NetTaskMsg>(32); 217 218 // Spawn the outbound forwarder: pulls OutboundEvent from the outside 219 // world and re-emits as NetTaskMsg::Outbound. 220 let fwd_msg_tx = msg_tx.clone(); 221 let _fwd_handle = Builder::new() 222 .name("net-fwd".into()) 223 .stack_size(FORWARDER_THREAD_STACK) 224 .spawn(move || forward_outbound(out_rx, fwd_msg_tx))?; 225 226 // Build the MQTT callback. 
It runs in the ESP-IDF MQTT task and routes: 227 // Connected/Disconnected → state-thread queue 228 // Received(/cmd/play) → audio_tx::Play/Stop 229 // Received(/cmd/volume) → audio_tx::SetVolumeIndex(snap_to_preset(pct)) 230 // Received(/cmd/update) → state-thread queue (install request) 231 // Received(shared latest)→ state-thread queue (cache new version) 232 let cb_msg_tx = msg_tx.clone(); 233 let cb_audio_tx = audio_tx.clone(); 234 let cb_cmd_play = cmd_play_topic.clone(); 235 let cb_cmd_volume = cmd_volume_topic.clone(); 236 let cb_cmd_update = cmd_update_topic.clone(); 237 let mqtt_lwt_payload = b"offline"; 238 let mqtt_config = MqttClientConfiguration { 239 client_id: Some(&client_id), 240 lwt: Some(LwtConfiguration { 241 topic: &avail_topic, 242 payload: mqtt_lwt_payload, 243 qos: QoS::AtLeastOnce, 244 retain: true, 245 }), 246 keep_alive_interval: Some(Duration::from_secs(60)), 247 ..Default::default() 248 }; 249 let mut client = EspMqttClient::new_cb(CONFIG.mqtt_url, &mqtt_config, move |event| { 250 mqtt_callback( 251 event, 252 &cb_msg_tx, 253 &cb_audio_tx, 254 &cb_cmd_play, 255 &cb_cmd_volume, 256 &cb_cmd_update, 257 ); 258 }) 259 .map_err(|e| anyhow!("EspMqttClient::new_cb: {e}"))?; 260 261 // Keep one Sender alive so the OTA worker can post Progress/Finished 262 // back to this loop without racing the MQTT callback's clone. We 263 // intentionally don't drop the original msg_tx — there's no point 264 // detecting a closed channel from this thread, since this thread is 265 // the only consumer and the only loop body. 266 let msg_tx_for_ota = msg_tx; 267 268 let mut last_snapshot: Option<StateSnapshot> = None; 269 let mut online = false; 270 let boot_at = Instant::now(); 271 // After publishing a button gesture, set this to "now + reset window" so 272 // we know to publish a retained "idle" once the deadline lapses. The 273 // sensor-style button entity in HA needs a stable resting state to show 274 // instead of the "Unknown" of the old event entity. 
275 let mut button_idle_at: Option<Instant> = None; 276 // Most recent `latest_version` seen on the shared topic. None until 277 // we've received our first retained message. The OTA URL is built as 278 // `<ota_url_base>/sound-machine-<latest>.bin` at install time. 279 let mut latest_version: Option<VersionBuf> = None; 280 // Cancel-rollback runs once on the first healthy MQTT connect of a 281 // boot. Set after the call so re-Connecteds are no-ops. 282 let mut have_marked_valid = false; 283 // Guards against a second OTA being kicked off while one is already 284 // running (e.g., HA Install double-click). Cleared on failure; on 285 // success the device reboots and the flag goes away with it. 286 let ota_in_progress = Arc::new(AtomicBool::new(false)); 287 // OTA progress state surfaced into HA's update entity via JSON state. 288 // None when no OTA is running. Updated in 5% steps from the OTA worker. 289 let mut ota_progress: Option<u8> = None; 290 291 info!("network task: entering main loop"); 292 293 loop { 294 // Wait for either a queued message or, if a button reset is pending, 295 // the moment we owe an idle publish. 296 let msg = match button_idle_at { 297 Some(deadline) => { 298 let wait = deadline.saturating_duration_since(Instant::now()); 299 msg_rx.recv_timeout(wait) 300 } 301 None => msg_rx.recv(), 302 }; 303 304 let Some(msg) = msg else { 305 // Timed out -> button idle deadline lapsed. 
306 if online { 307 publish_button_idle(&mut client, &button_topic); 308 } 309 button_idle_at = None; 310 continue; 311 }; 312 313 match msg { 314 NetTaskMsg::Connected => { 315 info!("MQTT connected"); 316 online = true; 317 let _ = led_tx.send(LedSignal::Net(NetStatus::Online)); 318 publish_online_announce( 319 &mut client, 320 &avail_topic, 321 &state_topic, 322 &button_topic, 323 &update_state_topic, 324 &cmd_filter, 325 &mac_hex, 326 sw_version, 327 last_snapshot, 328 ota_progress, 329 boot_at, 330 ); 331 if !have_marked_valid { 332 match ota::mark_app_valid() { 333 Ok(()) => info!("OTA: marked running app as valid (rollback canceled)"), 334 Err(e) => warn!("OTA: mark_app_valid failed: {e}"), 335 } 336 have_marked_valid = true; 337 } 338 } 339 NetTaskMsg::Disconnected => { 340 info!("MQTT disconnected"); 341 online = false; 342 let _ = led_tx.send(LedSignal::Net(NetStatus::Offline)); 343 } 344 NetTaskMsg::Outbound(OutboundEvent::Button(e)) => { 345 handle_outbound_button(e, online, &mut client, &button_topic, &audio_tx); 346 if online { 347 button_idle_at = 348 Some(Instant::now() + Duration::from_millis(BUTTON_IDLE_AFTER_MS)); 349 } 350 } 351 NetTaskMsg::Outbound(OutboundEvent::State(snap)) => { 352 last_snapshot = Some(snap); 353 if online { 354 publish_state(&mut client, &state_topic, snap, boot_at); 355 } 356 } 357 NetTaskMsg::LatestVersion(v) => { 358 info!("OTA: latest_version is now {}", v.as_str()); 359 latest_version = Some(v); 360 } 361 NetTaskMsg::OtaInstall => { 362 if handle_ota_install(latest_version, &led_tx, &ota_in_progress, &msg_tx_for_ota) { 363 ota_progress = Some(0); 364 if online { 365 publish_update_state( 366 &mut client, 367 &update_state_topic, 368 sw_version, 369 ota_progress, 370 ); 371 } 372 } 373 } 374 NetTaskMsg::OtaProgress(pct) => { 375 ota_progress = Some(pct); 376 if online { 377 publish_update_state( 378 &mut client, 379 &update_state_topic, 380 sw_version, 381 ota_progress, 382 ); 383 } 384 } 385 
NetTaskMsg::OtaFinished(success) => { 386 ota_progress = None; 387 ota_in_progress.store(false, Ordering::SeqCst); 388 let _ = led_tx.send(LedSignal::UpdateDone); 389 if !success && online { 390 // Republish a non-progress state JSON so HA stops 391 // showing the progress bar. (On success the device 392 // reboots before reaching this, so success path 393 // mainly exists for symmetry.) 394 publish_update_state( 395 &mut client, 396 &update_state_topic, 397 sw_version, 398 ota_progress, 399 ); 400 } 401 } 402 } 403 } 404} 405 406/// How long the button entity holds a gesture state before snapping back 407/// to "idle". Long enough for HA automations to fire on the transition, 408/// short enough that the device card always feels at-rest. 409const BUTTON_IDLE_AFTER_MS: u64 = 800; 410 411/// Outbound forwarder: blocking-receive `OutboundEvent` from the rest of the 412/// firmware, re-emit as `NetTaskMsg::Outbound` for the state thread. 413fn forward_outbound(out_rx: Receiver<OutboundEvent>, msg_tx: Sender<NetTaskMsg>) { 414 info!("outbound forwarder thread ready"); 415 loop { 416 let Some(ev) = out_rx.recv() else { continue }; 417 info!("forwarder: got OutboundEvent {ev:?}"); 418 if msg_tx.send(NetTaskMsg::Outbound(ev)).is_err() { 419 warn!("outbound forwarder: state-queue send failed; exiting"); 420 return; 421 } 422 } 423} 424 425/// MQTT event handler — runs in the ESP-IDF MQTT task. Must be quick. Uses 426/// `try_send` so it never blocks the MQTT task even if our state queue is 427/// briefly backed up; messages are events, not durable state. 
428fn mqtt_callback( 429 event: esp_idf_svc::mqtt::client::EspMqttEvent<'_>, 430 msg_tx: &Sender<NetTaskMsg>, 431 audio_tx: &Sender<AudioCommand>, 432 cmd_play: &str, 433 cmd_volume: &str, 434 cmd_update: &str, 435) { 436 match event.payload() { 437 EventPayload::Connected(_) => { 438 let _ = msg_tx.try_send(NetTaskMsg::Connected); 439 } 440 EventPayload::Disconnected => { 441 let _ = msg_tx.try_send(NetTaskMsg::Disconnected); 442 } 443 EventPayload::Received { 444 topic, 445 data, 446 details, 447 .. 448 } => { 449 if !matches!(details, Details::Complete) { 450 return; 451 } 452 let Some(topic) = topic else { 453 return; 454 }; 455 if topic == cmd_play { 456 let cmd = match data { 457 b"ON" => Some(AudioCommand::Play), 458 b"OFF" => Some(AudioCommand::Stop), 459 _ => None, 460 }; 461 if let Some(cmd) = cmd { 462 let _ = audio_tx.try_send(cmd); 463 } 464 } else if topic == cmd_volume { 465 if let Ok(s) = std::str::from_utf8(data) { 466 if let Ok(pct) = s.trim().parse::<u32>() { 467 let idx = snap_to_preset_index(pct.min(100) as u8); 468 let _ = audio_tx.try_send(AudioCommand::SetVolumeIndex(idx)); 469 } 470 } 471 } else if topic == cmd_update { 472 if data == b"install" { 473 let _ = msg_tx.try_send(NetTaskMsg::OtaInstall); 474 } 475 } else if topic == discovery::SHARED_LATEST_VERSION_TOPIC { 476 let trimmed = trim_ascii(data); 477 if let Some(v) = VersionBuf::from_bytes(trimmed) { 478 let _ = msg_tx.try_send(NetTaskMsg::LatestVersion(v)); 479 } else { 480 warn!( 481 "shared latest_version: invalid payload (len={}, dropped)", 482 data.len() 483 ); 484 } 485 } 486 } 487 EventPayload::Error(e) => { 488 warn!("MQTT error event: {e:?}"); 489 } 490 _ => {} 491 } 492} 493 494/// Strip leading/trailing ASCII whitespace from a byte slice without 495/// allocating. (`bytes::trim_ascii` is unstable.) 
496fn trim_ascii(b: &[u8]) -> &[u8] { 497 let mut start = 0; 498 let mut end = b.len(); 499 while start < end && b[start].is_ascii_whitespace() { 500 start += 1; 501 } 502 while end > start && b[end - 1].is_ascii_whitespace() { 503 end -= 1; 504 } 505 &b[start..end] 506} 507 508fn connect_wifi_with_retry(wifi: &mut BlockingWifi<EspWifi<'static>>) { 509 loop { 510 match try_connect_wifi(wifi) { 511 Ok(()) => { 512 info!("WiFi up"); 513 return; 514 } 515 Err(e) => { 516 warn!("WiFi connect failed: {e:?}; retrying in {WIFI_RETRY_INTERVAL:?}"); 517 std::thread::sleep(WIFI_RETRY_INTERVAL); 518 } 519 } 520 } 521} 522 523fn try_connect_wifi(wifi: &mut BlockingWifi<EspWifi<'static>>) -> Result<()> { 524 if !wifi.is_started()? { 525 info!("WiFi: starting driver"); 526 wifi.start()?; 527 info!("WiFi: driver started"); 528 } 529 info!("WiFi: associating with AP"); 530 wifi.connect()?; 531 info!("WiFi: associated; waiting for DHCP lease"); 532 wifi.wait_netif_up()?; 533 Ok(()) 534} 535 536fn handle_outbound_button( 537 e: ButtonEvent, 538 online: bool, 539 client: &mut EspMqttClient<'_>, 540 button_topic: &str, 541 audio_tx: &Sender<AudioCommand>, 542) { 543 info!("handle_outbound_button: {e:?} online={online}"); 544 if online { 545 let payload = format!(r#"{{"event_type":"{}"}}"#, e.as_event_type()); 546 match client.publish(button_topic, QoS::AtMostOnce, false, payload.as_bytes()) { 547 Ok(msg_id) => info!("published button event {e:?} (msg_id={msg_id})"), 548 Err(err) => warn!("publish button event failed: {err}"), 549 } 550 } else { 551 // Offline fallback: only short needs translation to a local toggle. 552 // Long already cycles volume locally (coordinator did that). Double 553 // is a pure-MQTT gesture and is a no-op offline per spec. 
554 match e { 555 ButtonEvent::Short => { 556 info!("offline short → AudioCommand::Toggle (local fallback)"); 557 let _ = audio_tx.send(AudioCommand::Toggle); 558 } 559 ButtonEvent::Long => {} // already handled locally 560 ButtonEvent::Double => { 561 info!("offline double → no-op (HA late-night-lights gesture unavailable)"); 562 } 563 } 564 } 565} 566 567#[allow(clippy::too_many_arguments)] 568fn publish_online_announce( 569 client: &mut EspMqttClient<'_>, 570 avail_topic: &str, 571 state_topic: &str, 572 button_topic: &str, 573 update_state_topic: &str, 574 cmd_filter: &str, 575 mac_hex: &str, 576 sw_version: &str, 577 last_snapshot: Option<StateSnapshot>, 578 ota_progress: Option<u8>, 579 boot_at: Instant, 580) { 581 if let Err(e) = client.publish(avail_topic, QoS::AtLeastOnce, true, b"online") { 582 warn!("publish availability=online failed: {e}"); 583 } 584 585 // Tell HA to drop entities we used to ship but no longer do. An empty 586 // retained payload on a discovery topic is the documented "remove this 587 // entity" signal. Add to this list whenever we remove an entity so the 588 // broker stops carrying its retained config. 589 let retired = [ 590 // v0.2.0 dropped the diagnostic RSSI sensor. 591 format!("homeassistant/sensor/nightstand_{mac_hex}/rssi/config"), 592 // v0.2.1 changed the button from `event` (no resting state) to 593 // `sensor` (idle/short/long/double). 594 format!("homeassistant/event/nightstand_{mac_hex}/button/config"), 595 // v0.3.3 split the update entity to a JSON state_topic so we can 596 // surface progress; the old plain-string `update/installed` 597 // retains stale config after the discovery payload changed. 
598 format!("nightstand/{mac_hex}/update/installed"), 599 ]; 600 for topic in &retired { 601 if let Err(e) = client.publish(topic, QoS::AtLeastOnce, true, b"") { 602 warn!("publish retired-entity removal {topic} failed: {e}"); 603 } 604 } 605 606 for entry in discovery::all(mac_hex, sw_version) { 607 if let Err(e) = client.publish( 608 &entry.topic, 609 QoS::AtLeastOnce, 610 true, 611 entry.payload.as_bytes(), 612 ) { 613 warn!("publish discovery {} failed: {e}", entry.topic); 614 } 615 } 616 617 // Seed the button sensor with a retained "idle" so reconnecting clients 618 // (and HA on first discovery) see a stable resting state. 619 publish_button_idle(client, button_topic); 620 621 // Publish our installed firmware version + any in-flight OTA progress 622 // as a single retained JSON to the update entity's state_topic. HA 623 // reads installed_version, in_progress, and update_percentage from it. 624 publish_update_state(client, update_state_topic, sw_version, ota_progress); 625 626 if let Some(snap) = last_snapshot { 627 publish_state(client, state_topic, snap, boot_at); 628 } 629 630 if let Err(e) = client.subscribe(cmd_filter, QoS::AtLeastOnce) { 631 warn!("subscribe {cmd_filter} failed: {e}"); 632 } 633 // Subscribe to the shared latest-version topic. Retained, so the broker 634 // delivers the current value immediately (if any) and we cache it. 635 if let Err(e) = client.subscribe(discovery::SHARED_LATEST_VERSION_TOPIC, QoS::AtLeastOnce) { 636 warn!( 637 "subscribe {} failed: {e}", 638 discovery::SHARED_LATEST_VERSION_TOPIC 639 ); 640 } 641} 642 643/// Publish the JSON `state_topic` for HA's update entity. `installed_version` 644/// is always present; `in_progress` and `update_percentage` are added when 645/// an OTA is mid-download. Retained so HA picks up the current state on 646/// discovery + restart without waiting for the next change. 
647fn publish_update_state( 648 client: &mut EspMqttClient<'_>, 649 update_state_topic: &str, 650 sw_version: &str, 651 ota_progress: Option<u8>, 652) { 653 let payload = match ota_progress { 654 Some(pct) => format!( 655 r#"{{"installed_version":"{sw}","in_progress":true,"update_percentage":{pct}}}"#, 656 sw = sw_version, 657 pct = pct, 658 ), 659 None => format!( 660 r#"{{"installed_version":"{sw}","in_progress":false}}"#, 661 sw = sw_version 662 ), 663 }; 664 if let Err(e) = client.publish(update_state_topic, QoS::AtLeastOnce, true, payload.as_bytes()) 665 { 666 warn!("publish update state failed: {e}"); 667 } 668} 669 670/// Triggered when HA publishes "install" to `cmd/update`. Builds the 671/// firmware URL from the cached latest version and the configured 672/// `ota_url_base`, then spawns a worker thread that does the chunked 673/// download with progress callbacks. Returns `true` if the install was 674/// accepted (so the caller can update its local progress state). 675fn handle_ota_install( 676 latest: Option<VersionBuf>, 677 led_tx: &Sender<LedSignal>, 678 in_progress: &Arc<AtomicBool>, 679 msg_tx: &Sender<NetTaskMsg>, 680) -> bool { 681 let Some(version) = latest else { 682 warn!("OTA install requested but no latest_version cached yet — ignoring"); 683 return false; 684 }; 685 if CONFIG.ota_url_base.is_empty() { 686 warn!("OTA install requested but ota_url_base is empty in cfg.toml — ignoring"); 687 return false; 688 } 689 if in_progress 690 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) 691 .is_err() 692 { 693 warn!("OTA install requested but one is already in progress — ignoring"); 694 return false; 695 } 696 697 let base = CONFIG.ota_url_base.trim_end_matches('/'); 698 let url = format!("{base}/sound-machine-{}.bin", version.as_str()); 699 info!("OTA install: kicking download thread for {url}"); 700 let _ = led_tx.send(LedSignal::Updating); 701 702 let progress_tx = msg_tx.clone(); 703 let finished_tx = msg_tx.clone(); 704 if let 
Err(e) = Builder::new() 705 .name("ota".into()) 706 .stack_size(OTA_THREAD_STACK) 707 .spawn(move || { 708 let result = ota::download_and_install(&url, |pct| { 709 let _ = progress_tx.try_send(NetTaskMsg::OtaProgress(pct)); 710 }); 711 match result { 712 Ok(()) => { 713 // No need to send OtaFinished(true) — we're about to 714 // reboot, the network state thread won't get a chance 715 // to act on it. esp_restart returns `!`. 716 info!("OTA: rebooting into new firmware"); 717 esp_idf_svc::hal::reset::restart(); 718 } 719 Err(e) => { 720 warn!("OTA: download_and_install failed: {e}"); 721 let _ = finished_tx.send(NetTaskMsg::OtaFinished(false)); 722 } 723 } 724 }) 725 { 726 warn!("OTA: failed to spawn worker thread: {e}"); 727 in_progress.store(false, Ordering::SeqCst); 728 return false; 729 } 730 true 731} 732 733const OTA_THREAD_STACK: usize = 12 * 1024; 734 735/// Publish a retained "idle" state to the button topic. Called on connect 736/// and after the BUTTON_IDLE_AFTER_MS window following any gesture. 737fn publish_button_idle(client: &mut EspMqttClient<'_>, button_topic: &str) { 738 if let Err(e) = client.publish( 739 button_topic, 740 QoS::AtLeastOnce, 741 true, 742 br#"{"event_type":"idle"}"#, 743 ) { 744 warn!("publish button idle failed: {e}"); 745 } 746} 747 748fn publish_state( 749 client: &mut EspMqttClient<'_>, 750 state_topic: &str, 751 snap: StateSnapshot, 752 boot_at: Instant, 753) { 754 let playing = if snap.playing { "ON" } else { "OFF" }; 755 let uptime_s = boot_at.elapsed().as_secs(); 756 let payload = format!( 757 r#"{{"playing":"{}","volume":{},"uptime_s":{}}}"#, 758 playing, snap.volume_pct, uptime_s 759 ); 760 if let Err(e) = client.publish(state_topic, QoS::AtLeastOnce, true, payload.as_bytes()) { 761 warn!("publish state failed: {e}"); 762 } 763} 764 765/// Set the netif hostname via the raw C API. The Rust trait method is 766/// crate-private in esp-idf-svc 0.51, so we punch through to the C call. 
767/// Failure is logged and ignored — DHCP-visible hostname is a nicety, not 768/// a correctness requirement. 769fn set_hostname(netif: &esp_idf_svc::netif::EspNetif, hostname: &str) { 770 use esp_idf_svc::handle::RawHandle; 771 use esp_idf_svc::sys::{esp, esp_netif_set_hostname}; 772 use std::ffi::CString; 773 let Ok(cstr) = CString::new(hostname) else { 774 warn!("hostname contains nul byte; skipping set_hostname"); 775 return; 776 }; 777 let res = unsafe { esp!(esp_netif_set_hostname(netif.handle(), cstr.as_ptr())) }; 778 if let Err(e) = res { 779 warn!("set_hostname failed (continuing): {e:?}"); 780 } 781} 782 783/// Format a 6-byte MAC as a 12-char lowercase hex string. 784fn format_mac(mac: [u8; 6]) -> String { 785 let mut s = String::with_capacity(12); 786 for b in mac { 787 use std::fmt::Write; 788 let _ = write!(s, "{:02x}", b); 789 } 790 s 791} 792 793#[cfg(test)] 794mod tests { 795 use super::*; 796 797 #[test] 798 fn format_mac_lowercase_hex() { 799 assert_eq!( 800 format_mac([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]), 801 "aabbccddeeff" 802 ); 803 assert_eq!(format_mac([0, 0, 0, 0, 0, 0]), "000000000000"); 804 assert_eq!(format_mac([0x01, 0x02, 0x03, 0x04, 0x05, 0x06]), "010203040506"); 805 } 806}