ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

a little int runner cleanup (#342)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

authored by

marshmallow
autofix-ci[bot]
and committed by
GitHub
038edd83 f953d9cb

+963 -930
+6 -15
wire/cli/src/tracing_setup.rs
··· 4 4 use std::{ 5 5 collections::VecDeque, 6 6 io::{self, Stderr, Write, stderr}, 7 - sync::TryLockError, 8 7 }; 9 8 10 9 use clap_verbosity_flag::{LogLevel, Verbosity}; ··· 56 55 57 56 impl Write for NonClobberingWriter { 58 57 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 59 - match STDIN_CLOBBER_LOCK.clone().try_lock() { 60 - Ok(_) => { 61 - self.dump_previous().map(|()| 0)?; 58 + if let 1.. = STDIN_CLOBBER_LOCK.available_permits() { 59 + self.dump_previous().map(|()| 0)?; 62 60 63 - self.stderr.write(buf) 64 - } 65 - Err(e) => match e { 66 - TryLockError::Poisoned(_) => { 67 - panic!("Internal stdout clobber lock is posioned. Please create an issue."); 68 - } 69 - TryLockError::WouldBlock => { 70 - self.queue.push_front(buf.to_vec()); 61 + self.stderr.write(buf) 62 + } else { 63 + self.queue.push_front(buf.to_vec()); 71 64 72 - Ok(buf.len()) 73 - } 74 - }, 65 + Ok(buf.len()) 75 66 } 76 67 } 77 68
+4 -2
wire/lib/src/commands/common.rs
··· 37 37 .target 38 38 .create_ssh_opts(context.modifiers, false)?, 39 39 )]), 40 - )?; 40 + ) 41 + .await?; 41 42 42 43 child 43 44 .wait_till_success() ··· 95 96 let child = run_command( 96 97 &CommandArguments::new(command_string, modifiers) 97 98 .mode(crate::commands::ChildOutputMode::Nix), 98 - )?; 99 + ) 100 + .await?; 99 101 100 102 child 101 103 .wait_till_success()
-891
wire/lib/src/commands/interactive.rs
··· 1 - // SPDX-License-Identifier: AGPL-3.0-or-later 2 - // Copyright 2024-2025 wire Contributors 3 - 4 - use aho_corasick::{AhoCorasick, PatternID}; 5 - use itertools::Itertools; 6 - use nix::sys::termios::{LocalFlags, SetArg, Termios, tcgetattr, tcsetattr}; 7 - use nix::{ 8 - poll::{PollFd, PollFlags, PollTimeout, poll}, 9 - unistd::{pipe as posix_pipe, read as posix_read, write as posix_write}, 10 - }; 11 - use portable_pty::{CommandBuilder, NativePtySystem, PtyPair, PtySize}; 12 - use rand::distr::Alphabetic; 13 - use std::collections::VecDeque; 14 - use std::sync::mpsc::{self, Sender}; 15 - use std::sync::{Condvar, LazyLock, Mutex}; 16 - use std::thread::JoinHandle; 17 - use std::{ 18 - io::{Read, Write}, 19 - os::fd::{AsFd, OwnedFd}, 20 - sync::Arc, 21 - }; 22 - use tracing::instrument; 23 - use tracing::{Span, debug, error, trace, warn}; 24 - 25 - use crate::commands::CommandArguments; 26 - use crate::commands::interactive_logbuffer::LogBuffer; 27 - use crate::errors::CommandError; 28 - use crate::{STDIN_CLOBBER_LOCK, SubCommandModifiers}; 29 - use crate::{ 30 - commands::{ChildOutputMode, WireCommandChip}, 31 - errors::HiveLibError, 32 - hive::node::Target, 33 - }; 34 - 35 - type MasterWriter = Box<dyn Write + Send>; 36 - type MasterReader = Box<dyn Read + Send>; 37 - type Child = Box<dyn portable_pty::Child + Send + Sync>; 38 - 39 - pub(crate) struct InteractiveChildChip { 40 - child: Child, 41 - 42 - cancel_stdin_pipe_w: OwnedFd, 43 - write_stdin_pipe_w: OwnedFd, 44 - 45 - stderr_collection: Arc<Mutex<VecDeque<String>>>, 46 - stdout_collection: Arc<Mutex<VecDeque<String>>>, 47 - 48 - original_command: String, 49 - 50 - completion_status: Arc<CompletionStatus>, 51 - stdout_handle: JoinHandle<Result<(), CommandError>>, 52 - } 53 - 54 - struct StdinTermiosAttrGuard(Termios); 55 - 56 - struct CompletionStatus { 57 - completed: Mutex<bool>, 58 - success: Mutex<Option<bool>>, 59 - condvar: Condvar, 60 - } 61 - 62 - struct WatchStdoutArguments { 63 - began_tx: Sender<()>, 64 - reader: MasterReader, 65 - succeed_needle: Arc<Vec<u8>>, 66 - failed_needle: Arc<Vec<u8>>, 67 - start_needle: Arc<Vec<u8>>, 68 - output_mode: ChildOutputMode, 69 - stderr_collection: Arc<Mutex<VecDeque<String>>>, 70 - stdout_collection: Arc<Mutex<VecDeque<String>>>, 71 - completion_status: Arc<CompletionStatus>, 72 - span: Span, 73 - log_stdout: bool, 74 - } 75 - 76 - #[derive(Debug)] 77 - enum SearchFindings { 78 - None, 79 - Started, 80 - Terminate, 81 - } 82 - 83 - /// the underlying command began 84 - const THREAD_BEGAN_SIGNAL: &[u8; 1] = b"b"; 85 - const THREAD_QUIT_SIGNAL: &[u8; 1] = b"q"; 86 - 87 - static STARTED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(0)); 88 - static SUCCEEDED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(1)); 89 - static FAILED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(2)); 90 - 91 - /// substitutes STDOUT with #$line. stdout is far less common than stderr. 92 - const IO_SUBS: &str = "1> >(while IFS= read -r line; do echo \"#$line\"; done)"; 93 - 94 - fn create_ending_segment<S: AsRef<str>>( 95 - arguments: &CommandArguments<'_, S>, 96 - needles: Needles, 97 - ) -> String { 98 - let (succeed_needle, failed_needle, start_needle) = needles; 99 - 100 - format!( 101 - "echo -e '{succeed}' || echo '{failed}'", 102 - succeed = if matches!(arguments.output_mode, ChildOutputMode::Interactive) { 103 - format!( 104 - "{start}\\n{succeed}", 105 - start = String::from_utf8_lossy(&start_needle), 106 - succeed = String::from_utf8_lossy(&succeed_needle) 107 - ) 108 - } else { 109 - String::from_utf8_lossy(&succeed_needle).to_string() 110 - }, 111 - failed = String::from_utf8_lossy(&failed_needle) 112 - ) 113 - } 114 - 115 - fn create_starting_segment<S: AsRef<str>>( 116 - arguments: &CommandArguments<'_, S>, 117 - start_needle: &Arc<Vec<u8>>, 118 - ) -> String { 119 - if matches!(arguments.output_mode, ChildOutputMode::Interactive) { 120 - String::new() 121 - } else { 122 - format!( 123 - "echo '{start}' && ", 124 - start = String::from_utf8_lossy(start_needle) 125 - ) 126 - } 127 - } 128 - 129 - #[instrument(skip_all, name = "run-int", fields(elevated = %arguments.is_elevated()))] 130 - pub(crate) fn interactive_command_with_env<S: AsRef<str>>( 131 - arguments: &CommandArguments<S>, 132 - envs: std::collections::HashMap<String, String>, 133 - ) -> Result<InteractiveChildChip, HiveLibError> { 134 - print_authenticate_warning(arguments)?; 135 - 136 - let (succeed_needle, failed_needle, start_needle) = create_needles(); 137 - 138 - let pty_system = NativePtySystem::default(); 139 - let pty_pair = portable_pty::PtySystem::openpty(&pty_system, PtySize::default()).unwrap(); 140 - setup_master(&pty_pair)?; 141 - 142 - let command_string = &format!( 143 - "{starting}{command} {flags} {IO_SUBS} && {ending}", 144 - command = arguments.command_string.as_ref(), 145 - flags = match arguments.output_mode { 146 - ChildOutputMode::Nix => "--log-format internal-json", 147 - ChildOutputMode::Generic | ChildOutputMode::Interactive => "", 148 - }, 149 - starting = create_starting_segment(arguments, &start_needle), 150 - ending = create_ending_segment( 151 - arguments, 152 - ( 153 - succeed_needle.clone(), 154 - failed_needle.clone(), 155 - start_needle.clone() 156 - ) 157 - ) 158 - ); 159 - 160 - debug!("{command_string}"); 161 - 162 - let mut command = build_command(arguments, command_string)?; 163 - 164 - // give command all env vars 165 - for (key, value) in envs { 166 - command.env(key, value); 167 - } 168 - 169 - let clobber_guard = STDIN_CLOBBER_LOCK.lock().unwrap(); 170 - let _guard = StdinTermiosAttrGuard::new().map_err(HiveLibError::CommandError)?; 171 - let child = pty_pair 172 - .slave 173 - .spawn_command(command) 174 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 175 - 176 - // Release any handles owned by the slave: we don't need it now 177 - // that we've spawned the child. 178 - drop(pty_pair.slave); 179 - 180 - let reader = pty_pair 181 - .master 182 - .try_clone_reader() 183 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 184 - let master_writer = pty_pair 185 - .master 186 - .take_writer() 187 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 188 - 189 - let stderr_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 190 - let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 191 - let (began_tx, began_rx) = mpsc::channel::<()>(); 192 - let completion_status = Arc::new(CompletionStatus::new()); 193 - 194 - let stdout_handle = { 195 - let arguments = WatchStdoutArguments { 196 - began_tx, 197 - reader, 198 - succeed_needle: succeed_needle.clone(), 199 - failed_needle: failed_needle.clone(), 200 - start_needle: start_needle.clone(), 201 - output_mode: arguments.output_mode, 202 - stderr_collection: stderr_collection.clone(), 203 - stdout_collection: stdout_collection.clone(), 204 - completion_status: completion_status.clone(), 205 - span: Span::current(), 206 - log_stdout: arguments.log_stdout, 207 - }; 208 - 209 - std::thread::spawn(move || dynamic_watch_sudo_stdout(arguments)) 210 - }; 211 - 212 - let (write_stdin_pipe_r, write_stdin_pipe_w) = 213 - posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 214 - let (cancel_stdin_pipe_r, cancel_stdin_pipe_w) = 215 - posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 216 - 217 - std::thread::spawn(move || { 218 - watch_stdin_from_user( 219 - &cancel_stdin_pipe_r, 220 - master_writer, 221 - &write_stdin_pipe_r, 222 - Span::current(), 223 - ) 224 - }); 225 - 226 - debug!("Setup threads"); 227 - 228 - let () = began_rx 229 - .recv() 230 - .map_err(|x| HiveLibError::CommandError(CommandError::RecvError(x)))?; 231 - 232 - drop(clobber_guard); 233 - 234 - if arguments.keep_stdin_open { 235 - trace!("Sending THREAD_BEGAN_SIGNAL"); 236 - 237 - posix_write(&cancel_stdin_pipe_w, THREAD_BEGAN_SIGNAL) 238 - .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 239 - } else { 240 - trace!("Sending THREAD_QUIT_SIGNAL"); 241 - 242 - posix_write(&cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL) 243 - .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 244 - } 245 - 246 - Ok(InteractiveChildChip { 247 - child, 248 - cancel_stdin_pipe_w, 249 - write_stdin_pipe_w, 250 - stderr_collection, 251 - stdout_collection, 252 - original_command: arguments.command_string.as_ref().to_string(), 253 - completion_status, 254 - stdout_handle, 255 - }) 256 - } 257 - 258 - fn print_authenticate_warning<S: AsRef<str>>( 259 - arguments: &CommandArguments<S>, 260 - ) -> Result<(), HiveLibError> { 261 - if !arguments.is_elevated() { 262 - return Ok(()); 263 - } 264 - 265 - eprintln!( 266 - "{} | Authenticate for \"sudo {}\":", 267 - arguments 268 - .target 269 - .map_or(Ok("localhost (!)".to_string()), |target| Ok(format!( 270 - "{}@{}:{}", 271 - target.user, 272 - target.get_preferred_host()?, 273 - target.port 274 - )))?, 275 - arguments.command_string.as_ref() 276 - ); 277 - 278 - Ok(()) 279 - } 280 - 281 - type Needles = (Arc<Vec<u8>>, Arc<Vec<u8>>, Arc<Vec<u8>>); 282 - 283 - fn create_needles() -> Needles { 284 - let tmp_prefix = rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 5); 285 - 286 - ( 287 - Arc::new(format!("{tmp_prefix}_W_Q").as_bytes().to_vec()), 288 - Arc::new(format!("{tmp_prefix}_W_F").as_bytes().to_vec()), 289 - Arc::new(format!("{tmp_prefix}_W_S").as_bytes().to_vec()), 290 - ) 291 - } 292 - 293 - fn setup_master(pty_pair: &PtyPair) -> Result<(), HiveLibError> { 294 - if let Some(fd) = pty_pair.master.as_raw_fd() { 295 - // convert raw fd to a BorrowedFd 296 - // safe as `fd` is dropped well before `pty_pair.master` 297 - let fd = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(fd) }; 298 - let mut termios = 299 - tcgetattr(fd).map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 300 - 301 - termios.local_flags &= !LocalFlags::ECHO; 302 - // Key agent does not work well without canonical mode 303 - termios.local_flags &= !LocalFlags::ICANON; 304 - // Actually quit 305 - termios.local_flags &= !LocalFlags::ISIG; 306 - 307 - tcsetattr(fd, SetArg::TCSANOW, &termios) 308 - .map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 309 - } 310 - 311 - Ok(()) 312 - } 313 - 314 - fn build_command<S: AsRef<str>>( 315 - arguments: &CommandArguments<'_, S>, 316 - command_string: &String, 317 - ) -> Result<CommandBuilder, HiveLibError> { 318 - let mut command = if let Some(target) = arguments.target { 319 - let mut command = create_sync_ssh_command(target, arguments.modifiers)?; 320 - 321 - // force ssh to use our pesudo terminal 322 - command.arg("-tt"); 323 - 324 - command 325 - } else { 326 - let mut command = portable_pty::CommandBuilder::new("sh"); 327 - 328 - command.arg("-c"); 329 - 330 - command 331 - }; 332 - 333 - if let Some(escalation_command) = &arguments.privilege_escalation_command { 334 - command.arg(format!("{escalation_command} sh -c '{command_string}'")); 335 - } else { 336 - command.arg(command_string); 337 - } 338 - 339 - Ok(command) 340 - } 341 - 342 - impl CompletionStatus { 343 - const fn new() -> Self { 344 - CompletionStatus { 345 - completed: Mutex::new(false), 346 - success: Mutex::new(None), 347 - condvar: Condvar::new(), 348 - } 349 - } 350 - 351 - fn mark_completed(&self, was_successful: bool) { 352 - let mut completed = self.completed.lock().unwrap(); 353 - let mut success = self.success.lock().unwrap(); 354 - 355 - *completed = true; 356 - *success = Some(was_successful); 357 - 358 - self.condvar.notify_all(); 359 - } 360 - 361 - fn wait(&self) -> Option<bool> { 362 - let mut completed = self.completed.lock().unwrap(); 363 - 364 - while !*completed { 365 - completed = self.condvar.wait(completed).unwrap(); 366 - } 367 - 368 - *self.success.lock().unwrap() 369 - } 370 - } 371 - 372 - impl WireCommandChip for InteractiveChildChip { 373 - type ExitStatus = (portable_pty::ExitStatus, String); 374 - 375 - #[instrument(skip_all)] 376 - async fn wait_till_success(mut self) -> Result<Self::ExitStatus, CommandError> { 377 - drop(self.write_stdin_pipe_w); 378 - 379 - let exit_status = tokio::task::spawn_blocking(move || self.child.wait()) 380 - .await 381 - .map_err(CommandError::JoinError)? 382 - .map_err(CommandError::WaitForStatus)?; 383 - 384 - debug!("exit_status: {exit_status:?}"); 385 - 386 - self.stdout_handle 387 - .join() 388 - .map_err(|_| CommandError::ThreadPanic)??; 389 - let success = self.completion_status.wait(); 390 - let _ = posix_write(&self.cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL); 391 - 392 - if let Some(true) = success { 393 - let logs = self 394 - .stdout_collection 395 - .lock() 396 - .unwrap() 397 - .iter() 398 - .rev() 399 - .map(|x| x.trim()) 400 - .join("\n"); 401 - 402 - return Ok((exit_status, logs)); 403 - } 404 - 405 - debug!("child did not succeed"); 406 - 407 - let logs = self 408 - .stderr_collection 409 - .lock() 410 - .unwrap() 411 - .iter() 412 - .rev() 413 - .join("\n"); 414 - 415 - Err(CommandError::CommandFailed { 416 - command_ran: self.original_command, 417 - logs, 418 - code: format!("code {}", exit_status.exit_code()), 419 - reason: match success { 420 - Some(_) => "marked-unsuccessful", 421 - None => "child-crashed-before-succeeding", 422 - }, 423 - }) 424 - } 425 - 426 - async fn write_stdin(&mut self, data: Vec<u8>) -> Result<(), HiveLibError> { 427 - trace!("Writing {} bytes to stdin", data.len()); 428 - 429 - posix_write(&self.write_stdin_pipe_w, &data) 430 - .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 431 - 432 - Ok(()) 433 - } 434 - } 435 - 436 - impl StdinTermiosAttrGuard { 437 - fn new() -> Result<Self, CommandError> { 438 - let stdin = std::io::stdin(); 439 - let stdin_fd = stdin.as_fd(); 440 - 441 - let mut termios = tcgetattr(stdin_fd).map_err(CommandError::TermAttrs)?; 442 - let original_termios = termios.clone(); 443 - 444 - termios.local_flags &= !(LocalFlags::ECHO | LocalFlags::ICANON); 445 - tcsetattr(stdin_fd, SetArg::TCSANOW, &termios).map_err(CommandError::TermAttrs)?; 446 - 447 - Ok(StdinTermiosAttrGuard(original_termios)) 448 - } 449 - } 450 - 451 - impl Drop for StdinTermiosAttrGuard { 452 - fn drop(&mut self) { 453 - let stdin = std::io::stdin(); 454 - let stdin_fd = stdin.as_fd(); 455 - 456 - let _ = tcsetattr(stdin_fd, SetArg::TCSANOW, &self.0); 457 - } 458 - } 459 - 460 - fn create_sync_ssh_command( 461 - target: &Target, 462 - modifiers: SubCommandModifiers, 463 - ) -> Result<portable_pty::CommandBuilder, HiveLibError> { 464 - let mut command = portable_pty::CommandBuilder::new("ssh"); 465 - command.args(target.create_ssh_args(modifiers, false, false)?); 466 - command.arg(target.get_preferred_host()?.to_string()); 467 - Ok(command) 468 - } 469 - 470 - #[instrument(skip_all, name = "log", parent = arguments.span)] 471 - fn dynamic_watch_sudo_stdout(arguments: WatchStdoutArguments) -> Result<(), CommandError> { 472 - let WatchStdoutArguments { 473 - began_tx, 474 - mut reader, 475 - succeed_needle, 476 - failed_needle, 477 - start_needle, 478 - output_mode, 479 - stdout_collection, 480 - stderr_collection, 481 - completion_status, 482 - log_stdout, 483 - .. 484 - } = arguments; 485 - 486 - let aho_corasick = AhoCorasick::builder() 487 - .ascii_case_insensitive(false) 488 - .match_kind(aho_corasick::MatchKind::LeftmostFirst) 489 - .build([ 490 - start_needle.as_ref(), 491 - succeed_needle.as_ref(), 492 - failed_needle.as_ref(), 493 - ]) 494 - .unwrap(); 495 - 496 - let mut buffer = [0u8; 1024]; 497 - let mut stderr = std::io::stderr(); 498 - let mut began = false; 499 - let mut log_buffer = LogBuffer::new(); 500 - let mut raw_mode_buffer = Vec::new(); 501 - let mut belled = false; 502 - 503 - 'outer: loop { 504 - match reader.read(&mut buffer) { 505 - Ok(0) => break 'outer, 506 - Ok(n) => { 507 - if !began { 508 - let findings = handle_rawmode_data( 509 - &mut stderr, 510 - &buffer, 511 - n, 512 - &mut raw_mode_buffer, 513 - &aho_corasick, 514 - &completion_status, 515 - &began_tx, 516 - )?; 517 - 518 - match findings { 519 - SearchFindings::Terminate => break 'outer, 520 - SearchFindings::Started => { 521 - began = true; 522 - continue; 523 - } 524 - SearchFindings::None => {} 525 - } 526 - 527 - if belled { 528 - continue; 529 - } 530 - 531 - stderr 532 - .write(b"\x07") 533 - .map_err(CommandError::WritingClientStderr)?; 534 - stderr.flush().map_err(CommandError::WritingClientStderr)?; 535 - 536 - belled = true; 537 - 538 - continue; 539 - } 540 - 541 - log_buffer.process_slice(&buffer[..n]); 542 - 543 - while let Some(mut line) = log_buffer.next_line() { 544 - let findings = 545 - search_string(&aho_corasick, &line, &completion_status, &began_tx); 546 - 547 - match findings { 548 - SearchFindings::Terminate => break 'outer, 549 - SearchFindings::Started => { 550 - began = true; 551 - continue; 552 - } 553 - SearchFindings::None => {} 554 - } 555 - 556 - handle_normal_data( 557 - &stderr_collection, 558 - &stdout_collection, 559 - &mut line, 560 - log_stdout, 561 - output_mode, 562 - ); 563 - } 564 - } 565 - Err(e) => { 566 - eprintln!("Error reading from PTY: {e}"); 567 - break; 568 - } 569 - } 570 - } 571 - 572 - let _ = began_tx.send(()); 573 - 574 - // failsafe if there were errors or the reader stopped 575 - if !*completion_status.completed.lock().unwrap() { 576 - completion_status.mark_completed(false); 577 - } 578 - 579 - debug!("stdout: goodbye"); 580 - 581 - Ok(()) 582 - } 583 - 584 - fn handle_normal_data( 585 - stderr_collection: &Arc<Mutex<VecDeque<String>>>, 586 - stdout_collection: &Arc<Mutex<VecDeque<String>>>, 587 - line: &mut [u8], 588 - log_stdout: bool, 589 - output_mode: ChildOutputMode, 590 - ) { 591 - if line.starts_with(b"#") { 592 - let stripped = &mut line[1..]; 593 - 594 - if log_stdout { 595 - output_mode.trace_slice(stripped); 596 - } 597 - 598 - let mut queue = stdout_collection.lock().unwrap(); 599 - queue.push_front(String::from_utf8_lossy(stripped).to_string()); 600 - return; 601 - } 602 - 603 - let log = output_mode.trace_slice(line); 604 - 605 - if let Some(error_msg) = log { 606 - let mut queue = stderr_collection.lock().unwrap(); 607 - 608 - // add at most 20 message to the front, drop the rest. 609 - queue.push_front(error_msg); 610 - queue.truncate(20); 611 - } 612 - } 613 - 614 - fn handle_rawmode_data<W: std::io::Write>( 615 - stderr: &mut W, 616 - buffer: &[u8], 617 - n: usize, 618 - raw_mode_buffer: &mut Vec<u8>, 619 - aho_corasick: &AhoCorasick, 620 - completion_status: &CompletionStatus, 621 - began_tx: &Sender<()>, 622 - ) -> Result<SearchFindings, CommandError> { 623 - raw_mode_buffer.extend_from_slice(&buffer[..n]); 624 - 625 - let findings = search_string(aho_corasick, raw_mode_buffer, completion_status, began_tx); 626 - 627 - if !matches!(findings, SearchFindings::None) { 628 - return Ok(findings); 629 - } 630 - 631 - stderr 632 - .write_all(&buffer[..n]) 633 - .map_err(CommandError::WritingClientStderr)?; 634 - 635 - stderr.flush().map_err(CommandError::WritingClientStderr)?; 636 - 637 - Ok(findings) 638 - } 639 - 640 - /// returns true if the command is considered stopped 641 - fn search_string( 642 - aho_corasick: &AhoCorasick, 643 - haystack: &[u8], 644 - completion_status: &CompletionStatus, 645 - began_tx: &Sender<()>, 646 - ) -> SearchFindings { 647 - let searched = aho_corasick 648 - .find_iter(haystack) 649 - .map(|x| x.pattern()) 650 - .collect::<Vec<_>>(); 651 - 652 - let started = if searched.contains(&STARTED_PATTERN) { 653 - debug!("start needle was found, switching mode..."); 654 - let _ = began_tx.send(()); 655 - true 656 - } else { 657 - false 658 - }; 659 - 660 - let succeeded = if searched.contains(&SUCCEEDED_PATTERN) { 661 - debug!("succeed needle was found, marking child as succeeding."); 662 - completion_status.mark_completed(true); 663 - true 664 - } else { 665 - false 666 - }; 667 - 668 - let failed = if searched.contains(&FAILED_PATTERN) { 669 - debug!("failed needle was found, elevated child did not succeed."); 670 - completion_status.mark_completed(false); 671 - true 672 - } else { 673 - false 674 - }; 675 - 676 - if succeeded || failed { 677 - return SearchFindings::Terminate; 678 - } 679 - 680 - if started { 681 - return SearchFindings::Started; 682 - } 683 - 684 - SearchFindings::None 685 - } 686 - 687 - /// Exits on any data written to `cancel_pipe_r` 688 - #[instrument(skip_all, level = "trace", parent = span)] 689 - fn watch_stdin_from_user( 690 - cancel_pipe_r: &OwnedFd, 691 - mut master_writer: MasterWriter, 692 - write_pipe_r: &OwnedFd, 693 - span: Span, 694 - ) -> Result<(), CommandError> { 695 - const WRITER_POSITION: usize = 0; 696 - const SIGNAL_POSITION: usize = 1; 697 - const USER_POSITION: usize = 2; 698 - 699 - let mut buffer = [0u8; 1024]; 700 - let stdin = std::io::stdin(); 701 - let mut cancel_pipe_buf = [0u8; 1]; 702 - 703 - let user_stdin_fd = std::os::fd::AsFd::as_fd(&stdin); 704 - let cancel_pipe_r_fd = cancel_pipe_r.as_fd(); 705 - 706 - let mut all_fds = vec![ 707 - PollFd::new(write_pipe_r.as_fd(), PollFlags::POLLIN), 708 - PollFd::new(cancel_pipe_r.as_fd(), PollFlags::POLLIN), 709 - PollFd::new(user_stdin_fd, PollFlags::POLLIN), 710 - ]; 711 - 712 - loop { 713 - match poll(&mut all_fds, PollTimeout::NONE) { 714 - Ok(0) => {} // timeout, impossible 715 - Ok(_) => { 716 - // The user stdin pipe can be removed 717 - if all_fds.get(USER_POSITION).is_some() 718 - && let Some(events) = all_fds[USER_POSITION].revents() 719 - && events.contains(PollFlags::POLLIN) 720 - { 721 - trace!("Got stdin from user..."); 722 - let n = 723 - posix_read(user_stdin_fd, &mut buffer).map_err(CommandError::PosixPipe)?; 724 - master_writer 725 - .write_all(&buffer[..n]) 726 - .map_err(CommandError::WritingMasterStdout)?; 727 - master_writer 728 - .flush() 729 - .map_err(CommandError::WritingMasterStdout)?; 730 - } 731 - 732 - if let Some(events) = all_fds[WRITER_POSITION].revents() 733 - && events.contains(PollFlags::POLLIN) 734 - { 735 - trace!("Got stdin from writer..."); 736 - let n = 737 - posix_read(write_pipe_r, &mut buffer).map_err(CommandError::PosixPipe)?; 738 - master_writer 739 - .write_all(&buffer[..n]) 740 - .map_err(CommandError::WritingMasterStdout)?; 741 - master_writer 742 - .flush() 743 - .map_err(CommandError::WritingMasterStdout)?; 744 - } 745 - 746 - if let Some(events) = all_fds[SIGNAL_POSITION].revents() 747 - && events.contains(PollFlags::POLLIN) 748 - { 749 - let n = posix_read(cancel_pipe_r_fd, &mut cancel_pipe_buf) 750 - .map_err(CommandError::PosixPipe)?; 751 - let message = &cancel_pipe_buf[..n]; 752 - 753 - trace!("Got byte from signal pipe: {message:?}"); 754 - 755 - if message == THREAD_QUIT_SIGNAL { 756 - return Ok(()); 757 - } 758 - 759 - if message == THREAD_BEGAN_SIGNAL { 760 - all_fds.remove(USER_POSITION); 761 - } 762 - } 763 - } 764 - Err(e) => { 765 - error!("Poll error: {e}"); 766 - break; 767 - } 768 - } 769 - } 770 - 771 - debug!("stdin_thread: goodbye"); 772 - Ok(()) 773 - } 774 - 775 - #[cfg(test)] 776 - mod tests { 777 - use super::*; 778 - use std::{assert_matches::assert_matches, sync::mpsc::TryRecvError}; 779 - 780 - #[test] 781 - fn test_rawmode_data() { 782 - let aho_corasick = AhoCorasick::builder() 783 - .ascii_case_insensitive(false) 784 - .match_kind(aho_corasick::MatchKind::LeftmostFirst) 785 - .build(["START_NEEDLE", "SUCCEEDED_NEEDLE", "FAILED_NEEDLE"]) 786 - .unwrap(); 787 - let mut stderr = vec![]; 788 - let (began_tx, began_rx) = mpsc::channel::<()>(); 789 - let completion_status = CompletionStatus::new(); 790 - 791 - // each "Bla" is 4 bytes. 792 - let buffer = "bla bla bla START_NEEDLE bla bla bla".as_bytes(); 793 - let mut raw_mode_buffer = vec![]; 794 - 795 - // handle 1 "bla" 796 - assert_matches!( 797 - handle_rawmode_data( 798 - &mut stderr, 799 - buffer, 800 - 4, 801 - &mut raw_mode_buffer, 802 - &aho_corasick, 803 - &completion_status, 804 - &began_tx 805 - ), 806 - Ok(SearchFindings::None) 807 - ); 808 - assert_eq!(raw_mode_buffer, b"bla "); 809 - assert_matches!(began_rx.try_recv(), Err(TryRecvError::Empty)); 810 - assert!(!*completion_status.completed.lock().unwrap()); 811 - 812 - let buffer = &buffer[4..]; 813 - 814 - // handle 2 "bla"'s and half a "START_NEEDLE" 815 - let n = 4 + 4 + 6; 816 - assert_matches!( 817 - handle_rawmode_data( 818 - &mut stderr, 819 - buffer, 820 - n, 821 - &mut raw_mode_buffer, 822 - &aho_corasick, 823 - &completion_status, 824 - &began_tx 825 - ), 826 - Ok(SearchFindings::None) 827 - ); 828 - assert_matches!(began_rx.try_recv(), Err(TryRecvError::Empty)); 829 - assert_eq!(raw_mode_buffer, b"bla bla bla START_"); 830 - assert!(!*completion_status.completed.lock().unwrap()); 831 - 832 - let buffer = &buffer[n..]; 833 - 834 - // handle rest of the data 835 - let n = buffer.len(); 836 - assert_matches!( 837 - handle_rawmode_data( 838 - &mut stderr, 839 - buffer, 840 - n, 841 - &mut raw_mode_buffer, 842 - &aho_corasick, 843 - &completion_status, 844 - &began_tx 845 - ), 846 - Ok(SearchFindings::Started) 847 - ); 848 - assert_matches!(began_rx.try_recv(), Ok(())); 849 - assert_eq!(raw_mode_buffer, b"bla bla bla START_NEEDLE bla bla bla"); 850 - assert!(!*completion_status.completed.lock().unwrap()); 851 - 852 - // test failed needle 853 - let buffer = "bla FAILED_NEEDLE bla".as_bytes(); 854 - let mut raw_mode_buffer = vec![]; 855 - 856 - let n = buffer.len(); 857 - assert_matches!( 858 - handle_rawmode_data( 859 - &mut stderr, 860 - buffer, 861 - n, 862 - &mut raw_mode_buffer, 863 - &aho_corasick, 864 - &completion_status, 865 - &began_tx 866 - ), 867 - Ok(SearchFindings::Terminate) 868 - ); 869 - assert_matches!(*completion_status.success.lock().unwrap(), Some(false)); 870 - 871 - // test succeed needle 872 - let buffer = "bla SUCCEEDED_NEEDLE bla".as_bytes(); 873 - let mut raw_mode_buffer = vec![]; 874 - let completion_status = CompletionStatus::new(); 875 - 876 - let n = buffer.len(); 877 - assert_matches!( 878 - handle_rawmode_data( 879 - &mut stderr, 880 - buffer, 881 - n, 882 - &mut raw_mode_buffer, 883 - &aho_corasick, 884 - &completion_status, 885 - &began_tx 886 - ), 887 - Ok(SearchFindings::Terminate) 888 - ); 889 - assert_matches!(*completion_status.success.lock().unwrap(), Some(true)); 890 - } 891 - }
wire/lib/src/commands/interactive_logbuffer.rs wire/lib/src/commands/pty/logbuffer.rs
+9 -10
wire/lib/src/commands/mod.rs
··· 1 1 // SPDX-License-Identifier: AGPL-3.0-or-later 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 + use crate::commands::pty::{InteractiveChildChip, interactive_command_with_env}; 4 5 use std::{collections::HashMap, str::from_utf8, sync::LazyLock}; 5 6 6 7 use aho_corasick::AhoCorasick; ··· 12 13 13 14 use crate::{ 14 15 SubCommandModifiers, 15 - commands::{ 16 - interactive::{InteractiveChildChip, interactive_command_with_env}, 17 - noninteractive::{NonInteractiveChildChip, non_interactive_command_with_env}, 18 - }, 16 + commands::noninteractive::{NonInteractiveChildChip, non_interactive_command_with_env}, 19 17 errors::{CommandError, HiveLibError}, 20 18 hive::node::{Node, Target}, 21 19 }; 22 20 23 21 pub(crate) mod common; 24 - pub(crate) mod interactive; 25 - pub(crate) mod interactive_logbuffer; 26 22 pub(crate) mod noninteractive; 23 + pub(crate) mod pty; 27 24 28 25 #[derive(Copy, Clone, Debug)] 29 26 pub(crate) enum ChildOutputMode { ··· 101 98 } 102 99 } 103 100 104 - pub(crate) fn run_command<S: AsRef<str>>( 101 + pub(crate) async fn run_command<S: AsRef<str>>( 105 102 arguments: &CommandArguments<'_, S>, 106 103 ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { 107 - run_command_with_env(arguments, HashMap::new()) 104 + run_command_with_env(arguments, HashMap::new()).await 108 105 } 109 106 110 - pub(crate) fn run_command_with_env<S: AsRef<str>>( 107 + pub(crate) async fn run_command_with_env<S: AsRef<str>>( 111 108 arguments: &CommandArguments<'_, S>, 112 109 envs: HashMap<String, String>, 113 110 ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { ··· 118 115 )?)); 119 116 } 120 117 121 - Ok(Either::Left(interactive_command_with_env(arguments, envs)?)) 118 + Ok(Either::Left( 119 + interactive_command_with_env(arguments, envs).await?, 120 + )) 122 121 } 123 122 124 123 pub(crate) trait WireCommandChip {
+102
wire/lib/src/commands/pty/input.rs
··· 1 + // SPDX-License-Identifier: AGPL-3.0-or-later 2 + // Copyright 2024-2025 wire Contributors 3 + 4 + use std::os::fd::{AsFd, OwnedFd}; 5 + 6 + use nix::{ 7 + poll::{PollFd, PollFlags, PollTimeout, poll}, 8 + unistd::read, 9 + }; 10 + use tracing::{Span, debug, error, instrument, trace}; 11 + 12 + use crate::{ 13 + commands::pty::{MasterWriter, THREAD_BEGAN_SIGNAL, THREAD_QUIT_SIGNAL}, 14 + errors::CommandError, 15 + }; 16 + 17 + /// Exits on any data written to `cancel_pipe_r` 18 + /// A pipe is used to cancel the function. 19 + #[instrument(skip_all, level = "trace", parent = span)] 20 + pub(super) fn watch_stdin_from_user( 21 + cancel_pipe_r: &OwnedFd, 22 + mut master_writer: MasterWriter, 23 + write_pipe_r: &OwnedFd, 24 + span: Span, 25 + ) -> Result<(), CommandError> { 26 + const WRITER_POSITION: usize = 0; 27 + const SIGNAL_POSITION: usize = 1; 28 + const USER_POSITION: usize = 2; 29 + 30 + let mut buffer = [0u8; 1024]; 31 + let stdin = std::io::stdin(); 32 + let mut cancel_pipe_buf = [0u8; 1]; 33 + 34 + let user_stdin_fd = stdin.as_fd(); 35 + let cancel_pipe_r_fd = cancel_pipe_r.as_fd(); 36 + 37 + let mut all_fds = vec![ 38 + PollFd::new(write_pipe_r.as_fd(), PollFlags::POLLIN), 39 + PollFd::new(cancel_pipe_r.as_fd(), PollFlags::POLLIN), 40 + PollFd::new(user_stdin_fd, PollFlags::POLLIN), 41 + ]; 42 + 43 + loop { 44 + match poll(&mut all_fds, PollTimeout::NONE) { 45 + Ok(0) => {} // timeout, impossible 46 + Ok(_) => { 47 + // The user stdin pipe can be removed 48 + if all_fds.get(USER_POSITION).is_some() 49 + && let Some(events) = all_fds[USER_POSITION].revents() 50 + && events.contains(PollFlags::POLLIN) 51 + { 52 + trace!("Got stdin from user..."); 53 + let n = read(user_stdin_fd, &mut buffer).map_err(CommandError::PosixPipe)?; 54 + master_writer 55 + .write_all(&buffer[..n]) 56 + .map_err(CommandError::WritingMasterStdout)?; 57 + master_writer 58 + .flush() 59 + .map_err(CommandError::WritingMasterStdout)?; 60 + } 61 + 62 + if let Some(events) = all_fds[WRITER_POSITION].revents() 63 + && events.contains(PollFlags::POLLIN) 64 + { 65 + trace!("Got stdin from writer..."); 66 + let n = read(write_pipe_r, &mut buffer).map_err(CommandError::PosixPipe)?; 67 + master_writer 68 + .write_all(&buffer[..n]) 69 + .map_err(CommandError::WritingMasterStdout)?; 70 + master_writer 71 + .flush() 72 + .map_err(CommandError::WritingMasterStdout)?; 73 + } 74 + 75 + if let Some(events) = all_fds[SIGNAL_POSITION].revents() 76 + && events.contains(PollFlags::POLLIN) 77 + { 78 + let n = read(cancel_pipe_r_fd, &mut cancel_pipe_buf) 79 + .map_err(CommandError::PosixPipe)?; 80 + let message = &cancel_pipe_buf[..n]; 81 + 82 + trace!("Got byte from signal pipe: {message:?}"); 83 + 84 + if message == THREAD_QUIT_SIGNAL { 85 + return Ok(()); 86 + } 87 + 88 + if message == THREAD_BEGAN_SIGNAL { 89 + all_fds.remove(USER_POSITION); 90 + } 91 + } 92 + } 93 + Err(e) => { 94 + error!("Poll error: {e}"); 95 + break; 96 + } 97 + } 98 + } 99 + 100 + debug!("stdin_thread: goodbye"); 101 + Ok(()) 102 + }
+560
wire/lib/src/commands/pty/mod.rs
··· 1 + // SPDX-License-Identifier: AGPL-3.0-or-later 2 + // Copyright 2024-2025 wire Contributors 3 + 4 + use crate::commands::pty::output::{WatchStdoutArguments, handle_pty_stdout}; 5 + use aho_corasick::PatternID; 6 + use itertools::Itertools; 7 + use nix::sys::termios::{LocalFlags, SetArg, Termios, tcgetattr, tcsetattr}; 8 + use nix::unistd::pipe; 9 + use nix::unistd::write as posix_write; 10 + use portable_pty::{CommandBuilder, NativePtySystem, PtyPair, PtySize}; 11 + use rand::distr::Alphabetic; 12 + use std::collections::VecDeque; 13 + use std::sync::{LazyLock, Mutex}; 14 + use std::{ 15 + io::{Read, Write}, 16 + os::fd::{AsFd, OwnedFd}, 17 + sync::Arc, 18 + }; 19 + use tokio::sync::{oneshot, watch}; 20 + use tracing::instrument; 21 + use tracing::{Span, debug, trace}; 22 + 23 + use crate::commands::CommandArguments; 24 + use crate::commands::pty::input::watch_stdin_from_user; 25 + use crate::errors::CommandError; 26 + use crate::{STDIN_CLOBBER_LOCK, SubCommandModifiers}; 27 + use crate::{ 28 + commands::{ChildOutputMode, WireCommandChip}, 29 + errors::HiveLibError, 30 + hive::node::Target, 31 + }; 32 + 33 + mod input; 34 + mod logbuffer; 35 + mod output; 36 + 37 + type MasterWriter = Box<dyn Write + Send>; 38 + type MasterReader = Box<dyn Read + Send>; 39 + 40 + /// the underlying command began 41 + const THREAD_BEGAN_SIGNAL: &[u8; 1] = b"b"; 42 + const THREAD_QUIT_SIGNAL: &[u8; 1] = b"q"; 43 + 44 + type Child = Box<dyn portable_pty::Child + Send + Sync>; 45 + 46 + pub(crate) struct InteractiveChildChip { 47 + child: Child, 48 + 49 + cancel_stdin_pipe_w: OwnedFd, 50 + write_stdin_pipe_w: OwnedFd, 51 + 52 + stderr_collection: Arc<Mutex<VecDeque<String>>>, 53 + stdout_collection: Arc<Mutex<VecDeque<String>>>, 54 + 55 + original_command: String, 56 + 57 + status_receiver: watch::Receiver<Status>, 58 + stdout_handle: tokio::task::JoinHandle<Result<(), CommandError>>, 59 + } 60 + 61 + /// sets and reverts terminal options (the terminal user interaction is performed) 62 + /// reverts data when dropped 63 + struct StdinTermiosAttrGuard(Termios); 64 + 65 + #[derive(Debug)] 66 + enum Status { 67 + Running, 68 + Done { success: bool }, 69 + } 70 + 71 + #[derive(Debug)] 72 + enum SearchFindings { 73 + None, 74 + Started, 75 + Terminate, 76 + } 77 + 78 + static STARTED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(0)); 79 + static SUCCEEDED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(1)); 80 + static FAILED_PATTERN: LazyLock<PatternID> = LazyLock::new(|| PatternID::must(2)); 81 + 82 + /// substitutes STDOUT with #$line. stdout is far less common than stderr. 83 + const IO_SUBS: &str = "1> >(while IFS= read -r line; do echo \"#$line\"; done)"; 84 + 85 + fn create_ending_segment<S: AsRef<str>>( 86 + arguments: &CommandArguments<'_, S>, 87 + needles: &Needles, 88 + ) -> String { 89 + let Needles { 90 + succeed, 91 + fail, 92 + start, 93 + } = needles; 94 + 95 + format!( 96 + "echo -e '{succeed}' || echo '{failed}'", 97 + succeed = if matches!(arguments.output_mode, ChildOutputMode::Interactive) { 98 + format!( 99 + "{start}\\n{succeed}", 100 + start = String::from_utf8_lossy(start), 101 + succeed = String::from_utf8_lossy(succeed) 102 + ) 103 + } else { 104 + String::from_utf8_lossy(succeed).to_string() 105 + }, 106 + failed = String::from_utf8_lossy(fail) 107 + ) 108 + } 109 + 110 + fn create_starting_segment<S: AsRef<str>>( 111 + arguments: &CommandArguments<'_, S>, 112 + start_needle: &Arc<Vec<u8>>, 113 + ) -> String { 114 + if matches!(arguments.output_mode, ChildOutputMode::Interactive) { 115 + String::new() 116 + } else { 117 + format!( 118 + "echo '{start}' && ", 119 + start = String::from_utf8_lossy(start_needle) 120 + ) 121 + } 122 + } 123 + 124 + #[instrument(skip_all, name = "run-int", fields(elevated = %arguments.is_elevated()))] 125 + pub(crate) async fn interactive_command_with_env<S: AsRef<str>>( 126 + arguments: &CommandArguments<'_, S>, 127 + envs: std::collections::HashMap<String, String>, 128 + ) -> Result<InteractiveChildChip, HiveLibError> { 129 + print_authenticate_warning(arguments)?; 130 + 131 + let needles = create_needles(); 132 + let pty_system = NativePtySystem::default(); 133 + let pty_pair = portable_pty::PtySystem::openpty(&pty_system, PtySize::default()).unwrap(); 134 + setup_master(&pty_pair)?; 135 + 136 + let command_string = &format!( 137 + "{starting}{command} {flags} {IO_SUBS} && {ending}", 138 + command = arguments.command_string.as_ref(), 139 + flags = match arguments.output_mode { 140 + ChildOutputMode::Nix => "--log-format internal-json", 141 + ChildOutputMode::Generic | ChildOutputMode::Interactive => "", 142 + }, 143 + starting = create_starting_segment(arguments, &needles.start), 144 + ending = create_ending_segment(arguments, &needles) 145 + ); 146 + 147 + debug!("{command_string}"); 148 + 149 + let mut command = build_command(arguments, command_string)?; 150 + 151 + // give command all env vars 152 + for (key, value) in envs { 153 + command.env(key, value); 154 + } 155 + 156 + let clobber_guard = STDIN_CLOBBER_LOCK.acquire().await.unwrap(); 157 + let _guard = StdinTermiosAttrGuard::new().map_err(HiveLibError::CommandError)?; 158 + let child = pty_pair 159 + .slave 160 + .spawn_command(command) 161 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 162 + 163 + // Release any handles owned by the slave: we don't need it now 164 + // that we've spawned the child. 165 + drop(pty_pair.slave); 166 + 167 + let reader = pty_pair 168 + .master 169 + .try_clone_reader() 170 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 171 + let master_writer = pty_pair 172 + .master 173 + .take_writer() 174 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 175 + 176 + let stderr_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 177 + let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 178 + let (began_tx, began_rx) = oneshot::channel::<()>(); 179 + let (status_sender, status_receiver) = watch::channel(Status::Running); 180 + 181 + let stdout_handle = { 182 + let arguments = WatchStdoutArguments { 183 + began_tx, 184 + reader, 185 + needles, 186 + output_mode: arguments.output_mode, 187 + stderr_collection: stderr_collection.clone(), 188 + stdout_collection: stdout_collection.clone(), 189 + span: Span::current(), 190 + log_stdout: arguments.log_stdout, 191 + status_sender, 192 + }; 193 + 194 + tokio::task::spawn_blocking(move || handle_pty_stdout(arguments)) 195 + }; 196 + 197 + let (write_stdin_pipe_r, write_stdin_pipe_w) = 198 + pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 199 + let (cancel_stdin_pipe_r, cancel_stdin_pipe_w) = 200 + pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 201 + 202 + tokio::task::spawn_blocking(move || { 203 + watch_stdin_from_user( 204 + &cancel_stdin_pipe_r, 205 + master_writer, 206 + &write_stdin_pipe_r, 207 + Span::current(), 208 + ) 209 + }); 210 + 211 + debug!("Setup threads"); 212 + 213 + let () = began_rx 214 + .await 215 + .map_err(|x| HiveLibError::CommandError(CommandError::OneshotRecvError(x)))?; 216 + 217 + drop(clobber_guard); 218 + 219 + if arguments.keep_stdin_open { 220 + trace!("Sending THREAD_BEGAN_SIGNAL"); 221 + 222 + posix_write(&cancel_stdin_pipe_w, THREAD_BEGAN_SIGNAL) 223 + .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 224 + } else { 225 + trace!("Sending THREAD_QUIT_SIGNAL"); 226 + 227 + posix_write(&cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL) 228 + .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 229 + } 230 + 231 + Ok(InteractiveChildChip { 232 + child, 233 + cancel_stdin_pipe_w, 234 + write_stdin_pipe_w, 235 + stderr_collection, 236 + stdout_collection, 237 + original_command: arguments.command_string.as_ref().to_string(), 238 + status_receiver, 239 + stdout_handle, 240 + }) 241 + } 242 + 243 + fn print_authenticate_warning<S: AsRef<str>>( 244 + arguments: &CommandArguments<S>, 245 + ) -> Result<(), HiveLibError> { 246 + if !arguments.is_elevated() { 247 + return Ok(()); 248 + } 249 + 250 + eprintln!( 251 + "{} | Authenticate for \"sudo {}\":", 252 + arguments 253 + .target 254 + .map_or(Ok("localhost (!)".to_string()), |target| Ok(format!( 255 + "{}@{}:{}", 256 + target.user, 257 + target.get_preferred_host()?, 258 + target.port 259 + )))?, 260 + arguments.command_string.as_ref() 261 + ); 262 + 263 + Ok(()) 264 + } 265 + 266 + struct Needles { 267 + succeed: Arc<Vec<u8>>, 268 + fail: Arc<Vec<u8>>, 269 + start: Arc<Vec<u8>>, 270 + } 271 + 272 + fn create_needles() -> Needles { 273 + let tmp_prefix = rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 5); 274 + 275 + Needles { 276 + succeed: Arc::new(format!("{tmp_prefix}_W_Q").as_bytes().to_vec()), 277 + fail: Arc::new(format!("{tmp_prefix}_W_F").as_bytes().to_vec()), 278 + start: Arc::new(format!("{tmp_prefix}_W_S").as_bytes().to_vec()), 279 + } 280 + } 281 + 282 + fn setup_master(pty_pair: &PtyPair) -> Result<(), HiveLibError> { 283 + if let Some(fd) = pty_pair.master.as_raw_fd() { 284 + // convert raw fd to a BorrowedFd 285 + // safe as `fd` is dropped well before `pty_pair.master` 286 + let fd = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(fd) }; 287 + let mut termios = 288 + tcgetattr(fd).map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 289 + 290 + termios.local_flags &= !LocalFlags::ECHO; 291 + // Key agent does not work well without canonical mode 292 + termios.local_flags &= !LocalFlags::ICANON; 293 + // Actually quit 294 + termios.local_flags &= !LocalFlags::ISIG; 295 + 296 + tcsetattr(fd, SetArg::TCSANOW, &termios) 297 + .map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 298 + } 299 + 300 + Ok(()) 301 + } 302 + 303 + fn build_command<S: AsRef<str>>( 304 + arguments: &CommandArguments<'_, S>, 305 + command_string: &String, 306 + ) -> Result<CommandBuilder, HiveLibError> { 307 + let mut command = if let Some(target) = arguments.target { 308 + let mut command = create_int_ssh_command(target, arguments.modifiers)?; 309 + 310 + // force ssh to use our pesudo terminal 311 + command.arg("-tt"); 312 + 313 + command 314 + } else { 315 + let mut command = portable_pty::CommandBuilder::new("sh"); 316 + 317 + command.arg("-c"); 318 + 319 + command 320 + }; 321 + 322 + if arguments.is_elevated() { 323 + command.arg(format!("sudo -u root -- sh -c '{command_string}'")); 324 + } else { 325 + command.arg(command_string); 326 + } 327 + 328 + Ok(command) 329 + } 330 + 331 + impl WireCommandChip for InteractiveChildChip { 332 + type ExitStatus = (portable_pty::ExitStatus, String); 333 + 334 + #[instrument(skip_all)] 335 + async fn wait_till_success(mut self) -> Result<Self::ExitStatus, CommandError> { 336 + drop(self.write_stdin_pipe_w); 337 + 338 + let exit_status = tokio::task::spawn_blocking(move || self.child.wait()) 339 + .await 340 + .map_err(CommandError::JoinError)? 341 + .map_err(CommandError::WaitForStatus)?; 342 + 343 + debug!("exit_status: {exit_status:?}"); 344 + 345 + self.stdout_handle 346 + .await 347 + .map_err(|_| CommandError::ThreadPanic)??; 348 + 349 + let status = self 350 + .status_receiver 351 + .wait_for(|value| matches!(value, Status::Done { .. })) 352 + .await 353 + .unwrap(); 354 + 355 + let _ = posix_write(&self.cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL); 356 + 357 + if let Status::Done { success: true } = *status { 358 + let logs = self 359 + .stdout_collection 360 + .lock() 361 + .unwrap() 362 + .iter() 363 + .rev() 364 + .map(|x| x.trim()) 365 + .join("\n"); 366 + 367 + return Ok((exit_status, logs)); 368 + } 369 + 370 + debug!("child did not succeed"); 371 + 372 + let logs = self 373 + .stderr_collection 374 + .lock() 375 + .unwrap() 376 + .iter() 377 + .rev() 378 + .join("\n"); 379 + 380 + Err(CommandError::CommandFailed { 381 + command_ran: self.original_command, 382 + logs, 383 + code: format!("code {}", exit_status.exit_code()), 384 + reason: match *status { 385 + Status::Done { .. } => "marked-unsuccessful", 386 + Status::Running => "child-crashed-before-succeeding", 387 + }, 388 + }) 389 + } 390 + 391 + async fn write_stdin(&mut self, data: Vec<u8>) -> Result<(), HiveLibError> { 392 + trace!("Writing {} bytes to stdin", data.len()); 393 + 394 + posix_write(&self.write_stdin_pipe_w, &data) 395 + .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 396 + 397 + Ok(()) 398 + } 399 + } 400 + 401 + impl StdinTermiosAttrGuard { 402 + fn new() -> Result<Self, CommandError> { 403 + let stdin = std::io::stdin(); 404 + let stdin_fd = stdin.as_fd(); 405 + 406 + let mut termios = tcgetattr(stdin_fd).map_err(CommandError::TermAttrs)?; 407 + let original_termios = termios.clone(); 408 + 409 + termios.local_flags &= !(LocalFlags::ECHO | LocalFlags::ICANON); 410 + tcsetattr(stdin_fd, SetArg::TCSANOW, &termios).map_err(CommandError::TermAttrs)?; 411 + 412 + Ok(StdinTermiosAttrGuard(original_termios)) 413 + } 414 + } 415 + 416 + impl Drop for StdinTermiosAttrGuard { 417 + fn drop(&mut self) { 418 + let stdin = std::io::stdin(); 419 + let stdin_fd = stdin.as_fd(); 420 + 421 + let _ = tcsetattr(stdin_fd, SetArg::TCSANOW, &self.0); 422 + } 423 + } 424 + 425 + fn create_int_ssh_command( 426 + target: &Target, 427 + modifiers: SubCommandModifiers, 428 + ) -> Result<portable_pty::CommandBuilder, HiveLibError> { 429 + let mut command = portable_pty::CommandBuilder::new("ssh"); 430 + command.args(target.create_ssh_args(modifiers, false, false)?); 431 + command.arg(target.get_preferred_host()?.to_string()); 432 + Ok(command) 433 + } 434 + 435 + #[cfg(test)] 436 + mod tests { 437 + use aho_corasick::AhoCorasick; 438 + use tokio::sync::oneshot::error::TryRecvError; 439 + 440 + use crate::commands::pty::output::handle_rawmode_data; 441 + 442 + use super::*; 443 + use std::assert_matches::assert_matches; 444 + 445 + #[test] 446 + fn test_rawmode_data() { 447 + let aho_corasick = AhoCorasick::builder() 448 + .ascii_case_insensitive(false) 449 + .match_kind(aho_corasick::MatchKind::LeftmostFirst) 450 + .build(["START_NEEDLE", "SUCCEEDED_NEEDLE", "FAILED_NEEDLE"]) 451 + .unwrap(); 452 + let mut stderr = vec![]; 453 + let (began_tx, mut began_rx) = oneshot::channel::<()>(); 454 + let mut began_tx = Some(began_tx); 455 + let (status_sender, _) = watch::channel(Status::Running); 456 + 457 + // each "Bla" is 4 bytes. 458 + let buffer = "bla bla bla START_NEEDLE bla bla bla".as_bytes(); 459 + let mut raw_mode_buffer = vec![]; 460 + 461 + // handle 1 "bla" 462 + assert_matches!( 463 + handle_rawmode_data( 464 + &mut stderr, 465 + buffer, 466 + 4, 467 + &mut raw_mode_buffer, 468 + &aho_corasick, 469 + &status_sender, 470 + &mut began_tx 471 + ), 472 + Ok(SearchFindings::None) 473 + ); 474 + assert_matches!(began_rx.try_recv(), Err(TryRecvError::Empty)); 475 + assert!(began_tx.is_some()); 476 + assert_eq!(raw_mode_buffer, b"bla "); 477 + assert_matches!(*status_sender.borrow(), Status::Running); 478 + 479 + let buffer = &buffer[4..]; 480 + 481 + // handle 2 "bla"'s and half a "START_NEEDLE" 482 + let n = 4 + 4 + 6; 483 + assert_matches!( 484 + handle_rawmode_data( 485 + &mut stderr, 486 + buffer, 487 + n, 488 + &mut raw_mode_buffer, 489 + &aho_corasick, 490 + &status_sender, 491 + &mut began_tx 492 + ), 493 + Ok(SearchFindings::None) 494 + ); 495 + assert_matches!(began_rx.try_recv(), Err(TryRecvError::Empty)); 496 + assert!(began_tx.is_some()); 497 + assert_matches!(*status_sender.borrow(), Status::Running); 498 + assert_eq!(raw_mode_buffer, b"bla bla bla START_"); 499 + 500 + let buffer = &buffer[n..]; 501 + 502 + // handle rest of the data 503 + let n = buffer.len(); 504 + assert_matches!( 505 + handle_rawmode_data( 506 + &mut stderr, 507 + buffer, 508 + n, 509 + &mut raw_mode_buffer, 510 + &aho_corasick, 511 + &status_sender, 512 + &mut began_tx 513 + ), 514 + Ok(SearchFindings::Started) 515 + ); 516 + assert_matches!(began_rx.try_recv(), Ok(())); 517 + assert_matches!(began_tx, None); 518 + assert_eq!(raw_mode_buffer, b"bla bla bla START_NEEDLE bla bla bla"); 519 + assert_matches!(*status_sender.borrow(), Status::Running); 520 + 521 + // test failed needle 522 + let buffer = "bla FAILED_NEEDLE bla".as_bytes(); 523 + let mut raw_mode_buffer = vec![]; 524 + 525 + let n = buffer.len(); 526 + assert_matches!( 527 + handle_rawmode_data( 528 + &mut stderr, 529 + buffer, 530 + n, 531 + &mut raw_mode_buffer, 532 + &aho_corasick, 533 + &status_sender, 534 + &mut began_tx 535 + ), 536 + Ok(SearchFindings::Terminate) 537 + ); 538 + assert_matches!(*status_sender.borrow(), Status::Done { success: false }); 539 + 540 + // test succeed needle 541 + let buffer = "bla SUCCEEDED_NEEDLE bla".as_bytes(); 542 + let mut raw_mode_buffer = vec![]; 543 + let (status_sender, _) = watch::channel(Status::Running); 544 + 545 + let n = buffer.len(); 546 + assert_matches!( 547 + handle_rawmode_data( 548 + &mut stderr, 549 + buffer, 550 + n, 551 + &mut raw_mode_buffer, 552 + &aho_corasick, 553 + &status_sender, 554 + &mut began_tx 555 + ), 556 + Ok(SearchFindings::Terminate) 557 + ); 558 + assert_matches!(*status_sender.borrow(), Status::Done { success: true }); 559 + } 560 + }
+259
wire/lib/src/commands/pty/output.rs
··· 1 + // SPDX-License-Identifier: AGPL-3.0-or-later 2 + // Copyright 2024-2025 wire Contributors 3 + 4 + use crate::{ 5 + commands::{ 6 + ChildOutputMode, 7 + pty::{ 8 + FAILED_PATTERN, Needles, STARTED_PATTERN, SUCCEEDED_PATTERN, SearchFindings, Status, 9 + logbuffer::LogBuffer, 10 + }, 11 + }, 12 + errors::CommandError, 13 + }; 14 + use aho_corasick::AhoCorasick; 15 + use std::{ 16 + collections::VecDeque, 17 + io::Write, 18 + sync::{Arc, Mutex}, 19 + }; 20 + use tokio::sync::{oneshot, watch}; 21 + use tracing::{Span, debug, instrument}; 22 + 23 + pub(super) struct WatchStdoutArguments { 24 + pub began_tx: oneshot::Sender<()>, 25 + pub reader: super::MasterReader, 26 + pub needles: Needles, 27 + pub output_mode: ChildOutputMode, 28 + pub stderr_collection: Arc<Mutex<VecDeque<String>>>, 29 + pub stdout_collection: Arc<Mutex<VecDeque<String>>>, 30 + pub status_sender: watch::Sender<Status>, 31 + pub span: Span, 32 + pub log_stdout: bool, 33 + } 34 + 35 + /// Handles data from the PTY, and logs or prompts the user depending on the state 36 + /// of the command. 37 + /// 38 + /// Emits a message on the `began_tx` when the command is considered started. 39 + /// 40 + /// Records stderr and stdout when it is considered notable (all stdout, last few stderr messages) 41 + #[instrument(skip_all, name = "log", parent = arguments.span)] 42 + pub(super) fn handle_pty_stdout(arguments: WatchStdoutArguments) -> Result<(), CommandError> { 43 + let WatchStdoutArguments { 44 + began_tx, 45 + mut reader, 46 + needles, 47 + output_mode, 48 + stdout_collection, 49 + stderr_collection, 50 + status_sender, 51 + log_stdout, 52 + .. 53 + } = arguments; 54 + 55 + let aho_corasick = AhoCorasick::builder() 56 + .ascii_case_insensitive(false) 57 + .match_kind(aho_corasick::MatchKind::LeftmostFirst) 58 + .build([ 59 + needles.start.as_ref(), 60 + needles.succeed.as_ref(), 61 + needles.fail.as_ref(), 62 + ]) 63 + .unwrap(); 64 + 65 + let mut buffer = [0u8; 1024]; 66 + let mut stderr = std::io::stderr(); 67 + let mut began = false; 68 + let mut log_buffer = LogBuffer::new(); 69 + let mut raw_mode_buffer = Vec::new(); 70 + let mut belled = false; 71 + let mut began_tx = Some(began_tx); 72 + 73 + 'outer: loop { 74 + match reader.read(&mut buffer) { 75 + Ok(0) => break 'outer, 76 + Ok(n) => { 77 + if !began { 78 + let findings = handle_rawmode_data( 79 + &mut stderr, 80 + &buffer, 81 + n, 82 + &mut raw_mode_buffer, 83 + &aho_corasick, 84 + &status_sender, 85 + &mut began_tx, 86 + )?; 87 + 88 + match findings { 89 + SearchFindings::Terminate => break 'outer, 90 + SearchFindings::Started => { 91 + began = true; 92 + continue; 93 + } 94 + SearchFindings::None => {} 95 + } 96 + 97 + if belled { 98 + continue; 99 + } 100 + 101 + stderr 102 + .write(b"\x07") 103 + .map_err(CommandError::WritingClientStderr)?; 104 + stderr.flush().map_err(CommandError::WritingClientStderr)?; 105 + 106 + belled = true; 107 + 108 + continue; 109 + } 110 + 111 + log_buffer.process_slice(&buffer[..n]); 112 + 113 + while let Some(mut line) = log_buffer.next_line() { 114 + let findings = 115 + search_string(&aho_corasick, &line, &status_sender, &mut began_tx); 116 + 117 + match findings { 118 + SearchFindings::Terminate => break 'outer, 119 + SearchFindings::Started => { 120 + began = true; 121 + continue; 122 + } 123 + SearchFindings::None => {} 124 + } 125 + 126 + handle_normal_data( 127 + &stderr_collection, 128 + &stdout_collection, 129 + &mut line, 130 + log_stdout, 131 + output_mode, 132 + ); 133 + } 134 + } 135 + Err(e) => { 136 + eprintln!("Error reading from PTY: {e}"); 137 + break; 138 + } 139 + } 140 + } 141 + 142 + began_tx.map(|began_tx| began_tx.send(())); 143 + 144 + // failsafe if there were errors or the reader stopped 145 + if matches!(*status_sender.borrow(), Status::Running) { 146 + status_sender.send_replace(Status::Done { success: false }); 147 + } 148 + 149 + debug!("stdout: goodbye"); 150 + 151 + Ok(()) 152 + } 153 + 154 + /// handles raw data, prints to stderr when a prompt is detected 155 + pub(super) fn handle_rawmode_data<W: std::io::Write>( 156 + stderr: &mut W, 157 + buffer: &[u8], 158 + n: usize, 159 + raw_mode_buffer: &mut Vec<u8>, 160 + aho_corasick: &AhoCorasick, 161 + status_sender: &watch::Sender<Status>, 162 + began_tx: &mut Option<oneshot::Sender<()>>, 163 + ) -> Result<SearchFindings, CommandError> { 164 + raw_mode_buffer.extend_from_slice(&buffer[..n]); 165 + 166 + let findings = search_string(aho_corasick, raw_mode_buffer, status_sender, began_tx); 167 + 168 + if !matches!(findings, SearchFindings::None) { 169 + return Ok(findings); 170 + } 171 + 172 + stderr 173 + .write_all(&buffer[..n]) 174 + .map_err(CommandError::WritingClientStderr)?; 175 + 176 + stderr.flush().map_err(CommandError::WritingClientStderr)?; 177 + 178 + Ok(findings) 179 + } 180 + 181 + /// handles data when the command is considered "started", logs and records errors as appropriate 182 + fn handle_normal_data( 183 + stderr_collection: &Arc<Mutex<VecDeque<String>>>, 184 + stdout_collection: &Arc<Mutex<VecDeque<String>>>, 185 + line: &mut [u8], 186 + log_stdout: bool, 187 + output_mode: ChildOutputMode, 188 + ) { 189 + if line.starts_with(b"#") { 190 + let stripped = &mut line[1..]; 191 + 192 + if log_stdout { 193 + output_mode.trace_slice(stripped); 194 + } 195 + 196 + let mut queue = stdout_collection.lock().unwrap(); 197 + queue.push_front(String::from_utf8_lossy(stripped).to_string()); 198 + return; 199 + } 200 + 201 + let log = output_mode.trace_slice(line); 202 + 203 + if let Some(error_msg) = log { 204 + let mut queue = stderr_collection.lock().unwrap(); 205 + 206 + // add at most 20 message to the front, drop the rest. 207 + queue.push_front(error_msg); 208 + queue.truncate(20); 209 + } 210 + } 211 + 212 + /// returns true if the command is considered stopped 213 + fn search_string( 214 + aho_corasick: &AhoCorasick, 215 + haystack: &[u8], 216 + status_sender: &watch::Sender<Status>, 217 + began_tx: &mut Option<oneshot::Sender<()>>, 218 + ) -> SearchFindings { 219 + let searched = aho_corasick 220 + .find_iter(haystack) 221 + .map(|x| x.pattern()) 222 + .collect::<Vec<_>>(); 223 + 224 + let started = if searched.contains(&STARTED_PATTERN) { 225 + debug!("start needle was found, switching mode..."); 226 + if let Some(began_tx) = began_tx.take() { 227 + let _ = began_tx.send(()); 228 + } 229 + true 230 + } else { 231 + false 232 + }; 233 + 234 + let succeeded = if searched.contains(&SUCCEEDED_PATTERN) { 235 + debug!("succeed needle was found, marking child as succeeding."); 236 + status_sender.send_replace(Status::Done { success: true }); 237 + true 238 + } else { 239 + false 240 + }; 241 + 242 + let failed = if searched.contains(&FAILED_PATTERN) { 243 + debug!("failed needle was found, elevated child did not succeed."); 244 + status_sender.send_replace(Status::Done { success: false }); 245 + true 246 + } else { 247 + false 248 + }; 249 + 250 + if succeeded || failed { 251 + return SearchFindings::Terminate; 252 + } 253 + 254 + if started { 255 + return SearchFindings::Started; 256 + } 257 + 258 + SearchFindings::None 259 + }
+7
wire/lib/src/errors.rs
··· 276 276 )] 277 277 #[error("$XDG_RUNTIME_DIR could not be used.")] 278 278 RuntimeDirectoryMissing(#[source] std::env::VarError), 279 + 280 + #[diagnostic( 281 + code(wire::command::OneshotRecvError), 282 + url("{DOCS_URL}#{}", self.code().unwrap()) 283 + )] 284 + #[error("Error waiting for begin message")] 285 + OneshotRecvError(#[source] tokio::sync::oneshot::error::RecvError), 279 286 } 280 287 281 288 #[derive(Debug, Diagnostic, Error)]
+2 -1
wire/lib/src/hive/node.rs
··· 218 218 &CommandArguments::new(command_string, modifiers) 219 219 .log_stdout() 220 220 .mode(crate::commands::ChildOutputMode::Interactive), 221 - )?; 221 + ) 222 + .await?; 222 223 223 224 output.wait_till_success().await.map_err(|source| { 224 225 HiveLibError::NetworkError(NetworkError::HostUnreachable {
+6 -3
wire/lib/src/hive/steps/activate.rs
··· 58 58 Some(&ctx.node.target) 59 59 }) 60 60 .elevated(ctx.node), 61 - )?; 61 + ) 62 + .await?; 62 63 63 64 let _ = child 64 65 .wait_till_success() ··· 113 114 }) 114 115 .elevated(ctx.node) 115 116 .log_stdout(), 116 - )?; 117 + ) 118 + .await?; 117 119 118 120 let result = child.wait_till_success().await; 119 121 ··· 136 138 .log_stdout() 137 139 .on_target(Some(&ctx.node.target)) 138 140 .elevated(ctx.node), 139 - )?; 141 + ) 142 + .await?; 140 143 141 144 // consume result, impossible to know if the machine failed to reboot or we 142 145 // simply disconnected
+2 -1
wire/lib/src/hive/steps/build.rs
··· 47 47 .mode(crate::commands::ChildOutputMode::Nix) 48 48 .log_stdout(), 49 49 std::collections::HashMap::new(), 50 - )? 50 + ) 51 + .await? 51 52 .wait_till_success() 52 53 .await 53 54 .map_err(|source| HiveLibError::NixBuildError {
+2 -1
wire/lib/src/hive/steps/keys.rs
··· 261 261 .elevated(ctx.node) 262 262 .keep_stdin_open() 263 263 .log_stdout(), 264 - )?; 264 + ) 265 + .await?; 265 266 266 267 let mut writer = SimpleLengthDelimWriter::new(async |data| child.write_stdin(data).await); 267 268
+4 -6
wire/lib/src/lib.rs
··· 4 4 #![feature(assert_matches)] 5 5 #![feature(iter_intersperse)] 6 6 7 - use std::{ 8 - io::IsTerminal, 9 - sync::{Arc, LazyLock, Mutex}, 10 - }; 7 + use std::{io::IsTerminal, sync::LazyLock}; 8 + 9 + use tokio::sync::Semaphore; 11 10 12 11 use crate::{errors::HiveLibError, hive::node::Name}; 13 12 ··· 54 53 GetTopLevel(&'a Name), 55 54 } 56 55 57 - pub static STDIN_CLOBBER_LOCK: LazyLock<Arc<Mutex<()>>> = 58 - LazyLock::new(|| Arc::new(Mutex::new(()))); 56 + pub static STDIN_CLOBBER_LOCK: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));