ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

channel-based status updates (#410)

Signed-off-by: marshmallow <git@althaea.zone>

authored by

marshmallow and committed by
GitHub
bf3e31bb d438a4e3

+198 -141
+4
CHANGELOG.md
··· 7 7 8 8 ## [Unreleased] - yyyy-mm-dd 9 9 10 + ### Fixed 11 + 12 + - Status bar is cleaned every time after execution is completed. 13 + 10 14 ## [v1.2.0] - 2026-03-18 11 15 12 16 ### Added
+10 -8
crates/cli/src/apply.rs
··· 6 6 use miette::{Diagnostic, IntoDiagnostic, Result}; 7 7 use std::any::Any; 8 8 use std::collections::HashSet; 9 - use std::io::{Read, stderr}; 9 + use std::io::Read; 10 10 use std::sync::Arc; 11 11 use std::sync::atomic::AtomicBool; 12 12 use thiserror::Error; ··· 15 15 use wire_core::hive::node::{Name, Node}; 16 16 use wire_core::hive::plan::{Goal, plan_for_node}; 17 17 use wire_core::hive::{Hive, HiveLocation}; 18 - use wire_core::status::STATUS; 18 + use wire_core::status::{UI_SENDER, UiMessage}; 19 19 use wire_core::{SubCommandModifiers, errors::HiveLibError}; 20 20 21 21 use crate::cli::{ApplyTarget, CommonVerbArgs, Partitions}; ··· 132 132 ); 133 133 } 134 134 135 - STATUS 136 - .lock() 137 - .add_many(&partitioned_names.iter().collect::<Vec<_>>()); 135 + if let Some(tx) = UI_SENDER.get() { 136 + let _ = tx.send(UiMessage::AddMany(partitioned_names.clone())); 137 + } 138 138 139 139 let mut set = hive 140 140 .nodes ··· 178 178 ); 179 179 } 180 180 181 - if !errors.is_empty() { 182 - // clear the status bar if we are about to print error messages 183 - STATUS.lock().clear(&mut stderr()); 181 + // clear the status bar at the end of execution. 182 + if let Some(tx) = UI_SENDER.get() { 183 + let _ = tx.send(UiMessage::Clear); 184 + } 184 185 186 + if !errors.is_empty() { 185 187 return Err(NodeErrors( 186 188 errors 187 189 .into_iter()
+19 -65
crates/cli/src/tracing_setup.rs
··· 1 1 // SPDX-License-Identifier: AGPL-3.0-or-later 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 - use std::{ 5 - collections::VecDeque, 6 - io::{self, Stderr, Write, stderr}, 7 - time::Duration, 8 - }; 4 + use std::io::{self, Write}; 9 5 10 6 use clap_verbosity_flag::{LogLevel, Verbosity}; 11 7 use owo_colors::{OwoColorize, Stream, Style}; 8 + use tokio::sync::mpsc; 12 9 use tracing::{Level, Subscriber}; 13 10 use tracing_log::AsTrace; 14 11 use tracing_subscriber::{ ··· 22 19 registry::LookupSpan, 23 20 util::SubscriberInitExt, 24 21 }; 25 - use wire_core::{STDIN_CLOBBER_LOCK, status::STATUS}; 22 + use wire_core::status::{UI_SENDER, UiMessage}; 26 23 27 - /// The non-clobbering writer ensures that log lines are held while interactive 28 - /// prompts are shown to the user. If logs where shown, they would "clobber" the 29 - /// sudo / ssh prompt. 30 - /// 31 - /// Additionally, the `STDIN_CLOBBER_LOCK` is used to ensure that no two 32 - /// interactive prompts are shown at the same time. 33 - struct NonClobberingWriter { 34 - queue: VecDeque<Vec<u8>>, 35 - stderr: Stderr, 36 - } 24 + /// Forwards log lines to the UI worker over `UI_SENDER`. 25 + struct NonClobberingWriter; 37 26 38 27 impl NonClobberingWriter { 39 - fn new() -> Self { 40 - NonClobberingWriter { 41 - queue: VecDeque::with_capacity(100), 42 - stderr: stderr(), 43 - } 44 - } 45 - 46 - /// expects the caller to write the status line 47 - fn dump_previous(&mut self) -> Result<(), io::Error> { 48 - STATUS.lock().clear(&mut self.stderr); 49 - 50 - for buf in self.queue.iter().rev() { 51 - self.stderr.write(buf).map(|_| ())?; 52 - } 53 - 54 - Ok(()) 28 + const fn new() -> Self { 29 + NonClobberingWriter 55 30 } 56 31 } 57 32 58 33 impl Write for NonClobberingWriter { 59 - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 60 - if let 1.. = STDIN_CLOBBER_LOCK.available_permits() { 61 - self.dump_previous().map(|()| 0)?; 62 - 63 - STATUS.lock().write_above_status(buf, &mut self.stderr) 64 - } else { 65 - self.queue.push_front(buf.to_vec()); 66 - 67 - Ok(buf.len()) 34 + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 35 + if let Some(tx) = UI_SENDER.get() { 36 + let _ = tx.send(UiMessage::LogLine(buf.to_vec())); 68 37 } 38 + 39 + Ok(buf.len()) 69 40 } 70 41 71 - fn flush(&mut self) -> std::io::Result<()> { 72 - self.stderr.flush() 42 + fn flush(&mut self) -> io::Result<()> { 43 + Ok(()) 73 44 } 74 45 } 75 46 ··· 231 202 } 232 203 } 233 204 234 - async fn status_tick_worker() { 235 - let mut interval = tokio::time::interval(Duration::from_secs(1)); 236 - let mut stderr = stderr(); 237 - 238 - loop { 239 - interval.tick().await; 240 - 241 - if STDIN_CLOBBER_LOCK.available_permits() < 1 { 242 - continue; 243 - } 244 - 245 - let mut status = STATUS.lock(); 246 - 247 - status.clear(&mut stderr); 248 - status.write_status(&mut stderr); 249 - } 250 - } 251 - 252 205 /// Set up logging for the application 253 206 /// Uses `WireFieldFormat` if -v was never passed 254 207 pub fn setup_logging<L: LogLevel>(verbosity: &Verbosity<L>, show_progress: bool) { 255 208 let filter = verbosity.log_level_filter().as_trace(); 256 209 let registry = tracing_subscriber::registry(); 257 210 258 - STATUS.lock().show_progress(show_progress); 211 + let (tx, rx) = mpsc::unbounded_channel(); 212 + UI_SENDER 213 + .set(tx) 214 + .expect("expected setup_logging to the first and only .set() of `UI_SENDER`"); 259 215 260 216 // spawn worker to tick the status bar 261 - if show_progress { 262 - tokio::spawn(status_tick_worker()); 263 - } 217 + tokio::spawn(wire_core::status::status_tick_worker(rx, show_progress)); 264 218 265 219 if verbosity.is_present() { 266 220 let layer = tracing_subscriber::fmt::layer()
+10 -10
crates/core/src/commands/pty/mod.rs
··· 3 3 4 4 use crate::commands::pty::output::{WatchStdoutArguments, handle_pty_stdout}; 5 5 use crate::hive::node::SharedTarget; 6 - use crate::status::STATUS; 6 + use crate::status::{UI_SENDER, UiMessage}; 7 7 use aho_corasick::PatternID; 8 8 use itertools::Itertools; 9 9 use nix::sys::termios::{LocalFlags, SetArg, Termios, tcgetattr, tcsetattr}; ··· 12 12 use portable_pty::{CommandBuilder, NativePtySystem, PtyPair, PtySize}; 13 13 use rand::distr::Alphabetic; 14 14 use std::collections::VecDeque; 15 - use std::io::stderr; 16 15 use std::sync::{LazyLock, Mutex}; 17 16 use std::{ 18 17 io::{Read, Write}, ··· 262 261 "localhost (!)".to_string() 263 262 }; 264 263 265 - let _ = STATUS.lock().write_above_status( 266 - &format!( 267 - "{target_display} | Authenticate for \"sudo {}\":\n", 268 - arguments.command_string.as_ref() 269 - ) 270 - .into_bytes(), 271 - &mut stderr(), 272 - ); 264 + if let Some(tx) = UI_SENDER.get() { 265 + let _ = tx.send(UiMessage::LogLine( 266 + format!( 267 + "{target_display} | Authenticate for \"sudo {}\":\n", 268 + arguments.command_string.as_ref() 269 + ) 270 + .into_bytes(), 271 + )); 272 + } 273 273 274 274 Ok(()) 275 275 }
+22 -7
crates/core/src/hive/executor.rs
··· 1 - use crate::hive::node::Step; 1 + use crate::{ 2 + hive::node::Step, 3 + status::{NodeStatus, UI_SENDER, UiMessage}, 4 + }; 2 5 use std::{assert_matches::debug_assert_matches, sync::Arc}; 3 6 4 7 use tracing::{Instrument, Span, debug, error, event, instrument}; ··· 12 15 node::{Context, Derivation, ExecuteStep, Name}, 13 16 plan::NodePlan, 14 17 }, 15 - status::STATUS, 16 18 }; 17 19 18 20 /// returns Err if the application should shut down. ··· 96 98 progress = format!("{}/{length}", position + 1) 97 99 ); 98 100 99 - STATUS 100 - .lock() 101 - .set_node_step(&plan.context.name, step.to_string()); 101 + if let Some(tx) = UI_SENDER.get() { 102 + let _ = tx.send(UiMessage::SetStatus( 103 + plan.context.name.clone(), 104 + NodeStatus::Running(step.to_string()), 105 + )); 106 + } 102 107 103 108 if let Err(err) = step.execute(&mut plan.context).await.inspect_err(|_| { 104 109 error!("Failed to execute `{step}`"); ··· 107 112 return Ok(()); 108 113 } 109 114 110 - STATUS.lock().mark_node_failed(&plan.context.name); 115 + if let Some(tx) = UI_SENDER.get() { 116 + let _ = tx.send(UiMessage::SetStatus( 117 + plan.context.name.clone(), 118 + NodeStatus::Failed, 119 + )); 120 + } 111 121 112 122 return Err(err); 113 123 } 114 124 } 115 125 116 - STATUS.lock().mark_node_succeeded(&plan.context.name); 126 + if let Some(tx) = UI_SENDER.get() { 127 + let _ = tx.send(UiMessage::SetStatus( 128 + plan.context.name.clone(), 129 + NodeStatus::Succeeded, 130 + )); 131 + } 117 132 118 133 Ok(()) 119 134 }
+33 -8
crates/core/src/lib.rs
··· 6 6 #![feature(sync_nonpoison)] 7 7 #![feature(nonpoison_mutex)] 8 8 9 - use std::{ 10 - io::{IsTerminal, stderr}, 11 - sync::LazyLock, 12 - }; 9 + use std::{io::IsTerminal, sync::LazyLock}; 13 10 14 - use tokio::sync::{AcquireError, Semaphore, SemaphorePermit}; 11 + use tokio::sync::{AcquireError, Semaphore, SemaphorePermit, mpsc::UnboundedSender, oneshot}; 15 12 16 - use crate::{errors::HiveLibError, hive::node::Name, status::STATUS}; 13 + use crate::{ 14 + errors::HiveLibError, 15 + hive::node::Name, 16 + status::{UI_SENDER, UiMessage}, 17 + }; 17 18 18 19 pub mod cache; 19 20 pub mod commands; ··· 63 64 64 65 pub static STDIN_CLOBBER_LOCK: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); 65 66 66 - pub async fn acquire_stdin_lock<'a>() -> Result<SemaphorePermit<'a>, AcquireError> { 67 + /// `SemaphorePermit` that sends a `UiMessage::Release` on drop 68 + pub struct ClobberGuard<'a>( 69 + #[allow(unused)] SemaphorePermit<'a>, 70 + Option<&'a UnboundedSender<UiMessage>>, 71 + ); 72 + 73 + impl Drop for ClobberGuard<'_> { 74 + fn drop(&mut self) { 75 + if let Some(tx) = self.1 { 76 + let _ = tx.send(UiMessage::Release); 77 + } 78 + } 79 + } 80 + 81 + pub async fn acquire_stdin_lock<'a>() -> Result<ClobberGuard<'a>, AcquireError> { 67 82 let result = STDIN_CLOBBER_LOCK.acquire().await?; 68 - STATUS.lock().wipe_out(&mut stderr()); 83 + let (sender, rx) = oneshot::channel(); 84 + let tx = UI_SENDER.get(); 85 + 86 + if let Some(tx) = tx { 87 + let _ = tx.send(UiMessage::Takeover(sender)); 88 + 89 + // wait until takeover is confirmed 90 + let _ = rx.await; 91 + } 92 + 93 + let result = ClobberGuard(result, tx); 69 94 70 95 Ok(result) 71 96 }
+100 -43
crates/core/src/status.rs
··· 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 4 use owo_colors::OwoColorize; 5 - use std::{fmt::Write, time::Instant}; 5 + use std::{ 6 + collections::VecDeque, 7 + fmt::Write, 8 + sync::OnceLock, 9 + time::{Duration, Instant}, 10 + }; 6 11 use termion::{clear, cursor}; 12 + use tokio::sync::{ 13 + mpsc::{self, UnboundedReceiver}, 14 + oneshot, 15 + }; 7 16 8 - use crate::{STDIN_CLOBBER_LOCK, hive::node::Name}; 17 + use crate::hive::node::Name; 9 18 10 - use std::{ 11 - collections::HashMap, 12 - sync::{LazyLock, nonpoison::Mutex}, 13 - }; 19 + use std::collections::HashMap; 14 20 15 21 #[derive(Default)] 16 22 pub enum NodeStatus { ··· 21 27 Failed, 22 28 } 23 29 30 + pub enum UiMessage { 31 + /// Initialise the status bar with many nodes at once. 32 + AddMany(Vec<Name>), 33 + SetStatus(Name, NodeStatus), 34 + /// Takeover the terminal, blocking new messages from being printed until 35 + /// `Release` is sent. 36 + /// 37 + /// Once the takeover request is completed, the oneshot channel will be 38 + /// consumed. 39 + Takeover(oneshot::Sender<()>), 40 + /// Indicate that the takeover is no longer necessary 41 + Release, 42 + /// Clear the status line, mostly for when the program is about to end 43 + Clear, 44 + /// Writes above the status line 45 + LogLine(Vec<u8>), 46 + } 47 + 24 48 pub struct Status { 25 49 statuses: HashMap<String, NodeStatus>, 26 50 began: Instant, 27 51 show_progress: bool, 28 52 } 29 53 30 - /// global status used for the progress bar in the cli crate 31 - pub static STATUS: LazyLock<Mutex<Status>> = LazyLock::new(|| Mutex::new(Status::new())); 54 + pub static UI_SENDER: OnceLock<mpsc::UnboundedSender<UiMessage>> = OnceLock::new(); 32 55 33 56 impl Status { 34 57 fn new() -> Self { ··· 41 64 42 65 pub const fn show_progress(&mut self, show_progress: bool) { 43 66 self.show_progress = show_progress; 44 - } 45 - 46 - pub fn add_many(&mut self, names: &[&Name]) { 47 - self.statuses.extend( 48 - names 49 - .iter() 50 - .map(|name| (name.0.to_string(), NodeStatus::Pending)), 51 - ); 52 - } 53 - 54 - pub fn set_node_step(&mut self, node: &Name, step: String) { 55 - self.statuses 56 - .insert(node.0.to_string(), NodeStatus::Running(step)); 57 - } 58 - 59 - pub fn mark_node_failed(&mut self, node: &Name) { 60 - self.statuses.insert(node.0.to_string(), NodeStatus::Failed); 61 - } 62 - 63 - pub fn mark_node_succeeded(&mut self, node: &Name) { 64 - self.statuses 65 - .insert(node.0.to_string(), NodeStatus::Succeeded); 66 67 } 67 68 68 69 #[must_use] ··· 153 154 let _ = write!(writer, "{}", self.get_msg()); 154 155 } 155 156 } 157 + } 156 158 157 - pub fn write_above_status<T: std::io::Write>( 158 - &mut self, 159 - buf: &[u8], 160 - writer: &mut T, 161 - ) -> std::io::Result<usize> { 162 - if STDIN_CLOBBER_LOCK.available_permits() != 1 { 163 - // skip 164 - return Ok(0); 165 - } 159 + pub async fn status_tick_worker(mut rx: UnboundedReceiver<UiMessage>, show_progress: bool) { 160 + let mut status = Status::new(); 161 + 162 + status.show_progress(show_progress); 163 + 164 + let mut ticker = tokio::time::interval(Duration::from_secs(1)); 165 + let mut stderr = std::io::stderr(); 166 + let mut log_queue: VecDeque<Vec<u8>> = VecDeque::with_capacity(100); 167 + 168 + // A single boolean represents the "taken over" state, where stdin is being 169 + // accepted from the user. A "depth" is not used as it is expected the 170 + // callers of `Takeover` respect the Semaphore. 171 + // 172 + // If there was ever multiple take overs at once (unlikely), this code would 173 + // need to be updated to track multiple takeovers at once. 174 + let mut taken_over = false; 175 + 176 + loop { 177 + tokio::select! { 178 + Some(msg) = rx.recv() => { 179 + match msg { 180 + UiMessage::AddMany(names) => { 181 + status.statuses.extend( 182 + names 183 + .iter() 184 + .map(|name| (name.0.to_string(), NodeStatus::Pending)), 185 + ); 186 + }, 187 + UiMessage::SetStatus(name, value) => { 188 + status.statuses.insert(name.0.to_string(), value); 189 + }, 190 + UiMessage::Takeover(tx) => { 191 + taken_over = true; 192 + status.wipe_out(&mut stderr); 193 + let _ = tx.send(()); 194 + }, 195 + UiMessage::Release => { 196 + taken_over = false; 197 + for buf in log_queue.drain(..) { 198 + let _ = std::io::Write::write_all(&mut stderr, &buf); 199 + } 200 + status.write_status(&mut stderr); 201 + }, 202 + UiMessage::Clear => { 203 + status.clear(&mut stderr); 204 + }, 205 + UiMessage::LogLine(line) => { 206 + if taken_over { 207 + log_queue.push_back(line); 208 + } else { 209 + status.clear(&mut stderr); 210 + for buf in log_queue.drain(..) { 211 + let _ = std::io::Write::write_all(&mut stderr, &buf); 212 + } 213 + let _ = std::io::Write::write_all(&mut stderr, &line); 214 + status.write_status(&mut stderr); 215 + } 216 + }, 217 + } 218 + } 166 219 167 - self.clear(writer); 168 - let written = writer.write(buf)?; 169 - self.write_status(writer); 220 + _ = ticker.tick() => { 221 + if taken_over { 222 + continue; 223 + } 170 224 171 - Ok(written) 225 + status.clear(&mut stderr); 226 + status.write_status(&mut stderr); 227 + } 228 + } 172 229 } 173 230 }
index.scip

This is a binary file and will not be displayed.