ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor WireCommand trait out

+295 -395
+22 -14
wire/lib/src/commands/common.rs
··· 8 8 9 9 use crate::{ 10 10 EvalGoal, SubCommandModifiers, 11 - commands::{ChildOutputMode, CommandArguments, WireCommand, WireCommandChip, get_command}, 11 + commands::{ 12 + ChildOutputMode, CommandArguments, Either, WireCommandChip, run_command, run_command_with_env 13 + }, 12 14 errors::HiveLibError, 13 15 hive::{ 14 16 HiveLocation, ··· 23 25 modifiers: SubCommandModifiers, 24 26 clobber_lock: Arc<Mutex<()>>, 25 27 ) -> Result<(), HiveLibError> { 26 - let mut command = get_command(None, ChildOutputMode::Nix, modifiers).await?; 27 - 28 28 let command_string = format!( 29 29 "nix --extra-experimental-features nix-command \ 30 30 copy --substitute-on-destination --to ssh://{user}@{host} {path}", ··· 36 36 } 37 37 ); 38 38 39 - let child = command.run_command_with_env( 40 - CommandArguments { 39 + let child = run_command_with_env( 40 + &CommandArguments { 41 + target: None, 42 + output_mode: ChildOutputMode::Nix, 43 + modifiers, 41 44 command_string, 42 45 keep_stdin_open: false, 43 46 elevated: false, ··· 66 69 modifiers: SubCommandModifiers, 67 70 clobber_lock: Arc<Mutex<()>>, 68 71 ) -> Result<String, HiveLibError> { 69 - let mut command = get_command(None, ChildOutputMode::Nix, modifiers).await?; 70 - 71 72 let attribute = match location { 72 73 HiveLocation::Flake(uri) => { 73 74 format!( ··· 101 102 }, 102 103 ); 103 104 104 - let child = command.run_command(CommandArguments { 105 - command_string, 106 - keep_stdin_open: false, 107 - elevated: false, 108 - clobber_lock, 109 - })?; 105 + let child = run_command( 106 + &CommandArguments { 107 + target: None, 108 + output_mode: ChildOutputMode::Nix, 109 + modifiers, 110 + command_string, 111 + keep_stdin_open: false, 112 + elevated: false, 113 + clobber_lock, 114 + } 115 + )?; 110 116 111 117 child 112 118 .wait_till_success() 113 119 .await 114 120 .map_err(|source| HiveLibError::NixEvalError { attribute, source }) 115 - .map(|x| x.get_stdout().clone()) 121 + .map(|x| match x { 122 + Either::Left((_, stdout)) | Either::Right((_, stdout)) => stdout 123 + }) 116 124 }
+126 -148
wire/lib/src/commands/interactive.rs
··· 9 9 use portable_pty::{CommandBuilder, NativePtySystem, PtyPair, PtySize}; 10 10 use rand::distr::Alphabetic; 11 11 use std::collections::VecDeque; 12 - use std::fmt::Debug; 13 12 use std::sync::mpsc::{self, Sender}; 14 13 use std::sync::{Condvar, Mutex}; 15 14 use std::thread::JoinHandle; ··· 27 26 use crate::errors::CommandError; 28 27 use crate::nix_log::{SubcommandLog, get_errorish_message}; 29 28 use crate::{ 30 - commands::{ChildOutputMode, WireCommand, WireCommandChip}, 29 + commands::{ChildOutputMode, WireCommandChip}, 31 30 errors::HiveLibError, 32 31 hive::node::Target, 33 32 }; ··· 36 35 type MasterReader = Box<dyn Read + Send>; 37 36 type Child = Box<dyn portable_pty::Child + Send + Sync>; 38 37 39 - #[derive(Debug)] 40 - pub(crate) struct InteractiveCommand<'t> { 41 - target: Option<&'t Target>, 42 - output_mode: Arc<ChildOutputMode>, 43 - succeed_needle: Arc<String>, 44 - failed_needle: Arc<String>, 45 - start_needle: Arc<String>, 46 - modifiers: SubCommandModifiers, 47 - } 48 - 49 38 pub(crate) struct InteractiveChildChip { 50 39 child: Child, 51 40 ··· 75 64 succeed_needle: Arc<String>, 76 65 failed_needle: Arc<String>, 77 66 start_needle: Arc<String>, 78 - output_mode: Arc<ChildOutputMode>, 67 + output_mode: ChildOutputMode, 79 68 stderr_collection: Arc<Mutex<VecDeque<String>>>, 80 69 stdout_collection: Arc<Mutex<VecDeque<String>>>, 81 70 completion_status: Arc<CompletionStatus>, ··· 89 78 /// substitutes STDOUT with #$line. stdout is far less common than stderr. 90 79 const IO_SUBS: &str = "1> >(while IFS= read -r line; do echo \"#$line\"; done)"; 91 80 92 - impl<'t> WireCommand<'t> for InteractiveCommand<'t> { 93 - type ChildChip = InteractiveChildChip; 81 + #[instrument(level = "trace", skip_all, name = "run", fields(elevated = %arguments.elevated))] 82 + pub(crate) fn interactive_command_with_env<S: AsRef<str>>( 83 + arguments: &CommandArguments<S>, 84 + envs: std::collections::HashMap<String, String>, 85 + ) -> Result<InteractiveChildChip, HiveLibError> { 86 + let (start_needle, succeed_needle, failed_needle) = create_needles(); 94 87 95 - async fn spawn_new( 96 - target: Option<&'t Target>, 97 - output_mode: ChildOutputMode, 98 - modifiers: SubCommandModifiers, 99 - ) -> Result<InteractiveCommand<'t>, HiveLibError> { 100 - let output_mode = Arc::new(output_mode); 101 - let tmp_prefix = rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 5); 102 - let succeed_needle = Arc::new(format!("{tmp_prefix}_WIRE_QUIT")); 103 - let failed_needle = Arc::new(format!("{tmp_prefix}_WIRE_FAIL")); 104 - let start_needle = Arc::new(format!("{tmp_prefix}_WIRE_START")); 88 + if arguments.elevated { 89 + eprintln!( 90 + "Please authenticate for \"sudo {}\"", 91 + arguments.command_string.as_ref(), 92 + ); 93 + } 105 94 106 - Ok(Self { 107 - target, 108 - output_mode, 109 - succeed_needle, 110 - failed_needle, 111 - start_needle, 112 - modifiers, 113 - }) 114 - } 95 + let pty_system = NativePtySystem::default(); 96 + let pty_pair = portable_pty::PtySystem::openpty(&pty_system, PtySize::default()).unwrap(); 97 + setup_master(&pty_pair)?; 115 98 116 - #[instrument(level = "trace", skip_all, name = "run", fields(elevated = %arguments.elevated))] 117 - fn run_command_with_env<S: AsRef<str> + Debug>( 118 - &mut self, 119 - arguments: CommandArguments<S>, 120 - envs: std::collections::HashMap<String, String>, 121 - ) -> Result<Self::ChildChip, HiveLibError> { 122 - if arguments.elevated { 123 - eprintln!( 124 - "Please authenticate for \"sudo {}\"", 125 - arguments.command_string.as_ref(), 126 - ); 99 + let command_string = &format!( 100 + "echo '{start}' && {command} {flags} {IO_SUBS} && echo '{succeed}' || echo '{failed}'", 101 + start = start_needle, 102 + succeed = succeed_needle, 103 + failed = failed_needle, 104 + command = arguments.command_string.as_ref(), 105 + flags = match arguments.output_mode { 106 + ChildOutputMode::Nix => "--log-format internal-json", 107 + ChildOutputMode::Raw => "", 127 108 } 109 + ); 128 110 129 - let pty_system = NativePtySystem::default(); 130 - let pty_pair = portable_pty::PtySystem::openpty(&pty_system, PtySize::default()).unwrap(); 131 - setup_master(&pty_pair)?; 111 + debug!("{command_string}"); 132 112 133 - let command_string = &format!( 134 - "echo '{start}' && {command} {flags} {IO_SUBS} && echo '{succeed}' || echo '{failed}'", 135 - start = self.start_needle, 136 - succeed = self.succeed_needle, 137 - failed = self.failed_needle, 138 - command = arguments.command_string.as_ref(), 139 - flags = match *self.output_mode { 140 - ChildOutputMode::Nix => "--log-format internal-json", 141 - ChildOutputMode::Raw => "", 142 - } 143 - ); 113 + let mut command = build_command(arguments)?; 144 114 145 - debug!("{command_string}"); 115 + // give command all env vars 116 + for (key, value) in envs { 117 + command.env(key, value); 118 + } 146 119 147 - let mut command = build_command(&arguments.command_string, self.target, arguments.elevated, self.modifiers)?; 120 + let clobber_guard = arguments.clobber_lock.lock().unwrap(); 121 + let _guard = StdinTermiosAttrGuard::new().map_err(HiveLibError::CommandError)?; 122 + let child = pty_pair 123 + .slave 124 + .spawn_command(command) 125 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 148 126 149 - // give command all env vars 150 - for (key, value) in envs { 151 - command.env(key, value); 152 - } 127 + // Release any handles owned by the slave: we don't need it now 128 + // that we've spawned the child. 129 + drop(pty_pair.slave); 153 130 154 - let clobber_guard = arguments.clobber_lock.lock().unwrap(); 155 - let _guard = StdinTermiosAttrGuard::new().map_err(HiveLibError::CommandError)?; 156 - let child = pty_pair 157 - .slave 158 - .spawn_command(command) 159 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 131 + let reader = pty_pair 132 + .master 133 + .try_clone_reader() 134 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 135 + let master_writer = pty_pair 136 + .master 137 + .take_writer() 138 + .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 160 139 161 - // Release any handles owned by the slave: we don't need it now 162 - // that we've spawned the child. 163 - drop(pty_pair.slave); 140 + let stderr_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 141 + let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 142 + let (began_tx, began_rx) = mpsc::channel::<()>(); 143 + let completion_status = Arc::new(CompletionStatus::new()); 164 144 165 - let reader = pty_pair 166 - .master 167 - .try_clone_reader() 168 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 169 - let master_writer = pty_pair 170 - .master 171 - .take_writer() 172 - .map_err(|x| HiveLibError::CommandError(CommandError::PortablePty(x)))?; 145 + let stdout_handle = { 146 + let arguments = WatchStdoutArguments { 147 + began_tx, 148 + reader, 149 + succeed_needle: succeed_needle.clone(), 150 + failed_needle: failed_needle.clone(), 151 + start_needle: start_needle.clone(), 152 + output_mode: arguments.output_mode, 153 + stderr_collection: stderr_collection.clone(), 154 + stdout_collection: stdout_collection.clone(), 155 + completion_status: completion_status.clone(), 156 + span: Span::current(), 157 + }; 173 158 174 - let stderr_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 175 - let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 176 - let (began_tx, began_rx) = mpsc::channel::<()>(); 177 - let completion_status = Arc::new(CompletionStatus::new()); 159 + std::thread::spawn(move || dynamic_watch_sudo_stdout(arguments)) 160 + }; 178 161 179 - let stdout_handle = { 180 - let arguments = WatchStdoutArguments { 181 - began_tx, 182 - reader, 183 - succeed_needle: self.succeed_needle.clone(), 184 - failed_needle: self.failed_needle.clone(), 185 - start_needle: self.start_needle.clone(), 186 - output_mode: self.output_mode.clone(), 187 - stderr_collection: stderr_collection.clone(), 188 - stdout_collection: stdout_collection.clone(), 189 - completion_status: completion_status.clone(), 190 - span: Span::current(), 191 - }; 162 + let (write_stdin_pipe_r, write_stdin_pipe_w) = 163 + posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 164 + let (cancel_stdin_pipe_r, cancel_stdin_pipe_w) = 165 + posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 192 166 193 - std::thread::spawn(move || dynamic_watch_sudo_stdout(arguments)) 194 - }; 167 + std::thread::spawn(move || { 168 + watch_stdin_from_user( 169 + &cancel_stdin_pipe_r, 170 + master_writer, 171 + &write_stdin_pipe_r, 172 + Span::current(), 173 + ) 174 + }); 195 175 196 - let (write_stdin_pipe_r, write_stdin_pipe_w) = 197 - posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 198 - let (cancel_stdin_pipe_r, cancel_stdin_pipe_w) = 199 - posix_pipe().map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 176 + info!("Setup threads"); 200 177 201 - std::thread::spawn(move || { 202 - watch_stdin_from_user( 203 - &cancel_stdin_pipe_r, 204 - master_writer, 205 - &write_stdin_pipe_r, 206 - Span::current(), 207 - ) 208 - }); 178 + let () = began_rx 179 + .recv() 180 + .map_err(|x| HiveLibError::CommandError(CommandError::RecvError(x)))?; 209 181 210 - info!("Setup threads"); 182 + drop(clobber_guard); 211 183 212 - let () = began_rx 213 - .recv() 214 - .map_err(|x| HiveLibError::CommandError(CommandError::RecvError(x)))?; 184 + if arguments.keep_stdin_open { 185 + trace!("Sending THREAD_BEGAN_SIGNAL"); 215 186 216 - drop(clobber_guard); 187 + posix_write(&cancel_stdin_pipe_w, THREAD_BEGAN_SIGNAL) 188 + .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 189 + } else { 190 + trace!("Sending THREAD_QUIT_SIGNAL"); 217 191 218 - if arguments.keep_stdin_open { 219 - trace!("Sending THREAD_BEGAN_SIGNAL"); 192 + posix_write(&cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL) 193 + .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 194 + } 220 195 221 - posix_write(&cancel_stdin_pipe_w, THREAD_BEGAN_SIGNAL) 222 - .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 223 - } else { 224 - trace!("Sending THREAD_QUIT_SIGNAL"); 196 + Ok(InteractiveChildChip { 197 + child, 198 + cancel_stdin_pipe_w, 199 + write_stdin_pipe_w, 200 + stderr_collection, 201 + stdout_collection, 202 + original_command: arguments.command_string.as_ref().to_string(), 203 + completion_status, 204 + stdout_handle, 205 + }) 206 + } 225 207 226 - posix_write(&cancel_stdin_pipe_w, THREAD_QUIT_SIGNAL) 227 - .map_err(|x| HiveLibError::CommandError(CommandError::PosixPipe(x)))?; 228 - } 208 + fn create_needles() -> (Arc<String>, Arc<String>, Arc<String>) { 209 + let tmp_prefix = rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 5); 229 210 230 - Ok(InteractiveChildChip { 231 - child, 232 - cancel_stdin_pipe_w, 233 - write_stdin_pipe_w, 234 - stderr_collection, 235 - stdout_collection, 236 - original_command: arguments.command_string.as_ref().to_string(), 237 - completion_status, 238 - stdout_handle, 239 - }) 240 - } 211 + ( 212 + Arc::new(format!("{tmp_prefix}_WIRE_QUIT")), 213 + Arc::new(format!("{tmp_prefix}_WIRE_FAIL")), 214 + Arc::new(format!("{tmp_prefix}_WIRE_START")), 215 + ) 241 216 } 242 217 243 218 fn setup_master(pty_pair: &PtyPair) -> Result<(), HiveLibError> { ··· 245 220 // convert raw fd to a BorrowedFd 246 221 // safe as `fd` is dropped well before `pty_pair.master` 247 222 let fd = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(fd) }; 248 - let mut termios = tcgetattr(fd) 249 - .map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 223 + let mut termios = 224 + tcgetattr(fd).map_err(|x| HiveLibError::CommandError(CommandError::TermAttrs(x)))?; 250 225 251 226 termios.local_flags &= !LocalFlags::ECHO; 252 227 // Key agent does not work well without canonical mode ··· 259 234 } 260 235 261 236 Ok(()) 262 - 263 237 } 264 238 265 - fn build_command<S: AsRef<str>>(original_command: &S, target: Option<&Target>, elevated: bool, modifiers: SubCommandModifiers) -> Result<CommandBuilder, HiveLibError> { 266 - let mut command = if let Some(target) = target { 267 - let mut command = create_sync_ssh_command(target, modifiers)?; 239 + fn build_command<S: AsRef<str>>( 240 + arguments: &CommandArguments<'_, S>, 241 + ) -> Result<CommandBuilder, HiveLibError> { 242 + let mut command = if let Some(target) = arguments.target { 243 + let mut command = create_sync_ssh_command(target, arguments.modifiers)?; 268 244 269 245 // force ssh to use our pesudo terminal 270 246 command.arg("-tt"); ··· 278 254 command 279 255 }; 280 256 281 - if elevated { 282 - command.arg(format!("sudo -u root -- sh -c '{}'", original_command.as_ref())); 257 + if arguments.elevated { 258 + command.arg(format!( 259 + "sudo -u root -- sh -c '{}'", 260 + arguments.command_string.as_ref() 261 + )); 283 262 } else { 284 - command.arg(original_command.as_ref()); 263 + command.arg(arguments.command_string.as_ref()); 285 264 } 286 265 287 266 Ok(command) 288 - 289 267 } 290 268 291 269 impl CompletionStatus {
+21 -72
wire/lib/src/commands/mod.rs
··· 11 11 use crate::{ 12 12 SubCommandModifiers, 13 13 commands::{ 14 - interactive::{InteractiveChildChip, InteractiveCommand}, 15 - noninteractive::{NonInteractiveChildChip, NonInteractiveCommand}, 14 + interactive::{InteractiveChildChip, interactive_command_with_env}, 15 + noninteractive::{NonInteractiveChildChip, non_interactive_command_with_env}, 16 16 }, 17 17 errors::{CommandError, HiveLibError}, 18 18 hive::node::Target, ··· 37 37 } 38 38 39 39 #[derive(Debug)] 40 - pub(crate) struct CommandArguments<S: AsRef<str>> { 40 + pub(crate) struct CommandArguments<'t, S: AsRef<str>> { 41 + pub(crate) modifiers: SubCommandModifiers, 42 + pub(crate) target: Option<&'t Target>, 43 + pub(crate) output_mode: ChildOutputMode, 41 44 pub(crate) command_string: S, 42 45 pub(crate) keep_stdin_open: bool, 43 46 pub(crate) elevated: bool, 44 47 pub(crate) clobber_lock: Arc<Mutex<()>>, 45 48 } 46 49 47 - pub(crate) async fn get_command( 48 - target: Option<&'_ Target>, 49 - output_mode: ChildOutputMode, 50 - modifiers: SubCommandModifiers, 51 - ) -> Result<Either<InteractiveCommand<'_>, NonInteractiveCommand<'_>>, HiveLibError> { 52 - if modifiers.non_interactive { 53 - return Ok(Either::Right( 54 - NonInteractiveCommand::spawn_new(target, output_mode, modifiers).await?, 55 - )); 56 - } 57 - 58 - return Ok(Either::Left( 59 - InteractiveCommand::spawn_new(target, output_mode, modifiers).await?, 60 - )); 50 + pub(crate) fn run_command<S: AsRef<str>>( 51 + arguments: &CommandArguments<'_, S>, 52 + ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { 53 + run_command_with_env(arguments, HashMap::new()) 61 54 } 62 55 63 - pub(crate) trait WireCommand<'target>: Sized { 64 - type ChildChip; 65 - 66 - async fn spawn_new( 67 - target: Option<&'target Target>, 68 - output_mode: ChildOutputMode, 69 - modifiers: SubCommandModifiers, 70 - ) -> Result<Self, HiveLibError>; 71 - 72 - fn run_command<S: AsRef<str> + std::fmt::Debug>( 73 - &mut self, 74 - command_arugments: CommandArguments<S>, 75 - ) -> Result<Self::ChildChip, HiveLibError> { 76 - self.run_command_with_env(command_arugments, std::collections::HashMap::new()) 56 + pub(crate) fn run_command_with_env<S: AsRef<str>>( 57 + arguments: &CommandArguments<'_, S>, 58 + envs: HashMap<String, String>, 59 + ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { 60 + // use the non interactive command runner when forced or when there is simply no reason 61 + // for user input to be taken (local, and not elevated) 62 + if arguments.modifiers.non_interactive || (!arguments.elevated && arguments.target.is_none()) { 63 + return Ok(Either::Right(non_interactive_command_with_env( 64 + arguments, envs, 65 + )?)); 77 66 } 78 67 79 - fn run_command_with_env<S: AsRef<str> + std::fmt::Debug>( 80 - &mut self, 81 - command_arugments: CommandArguments<S>, 82 - args: HashMap<String, String>, 83 - ) -> Result<Self::ChildChip, HiveLibError>; 68 + Ok(Either::Left(interactive_command_with_env(arguments, envs)?)) 84 69 } 85 70 86 71 pub(crate) trait WireCommandChip { ··· 90 75 async fn write_stdin(&mut self, data: Vec<u8>) -> Result<(), HiveLibError>; 91 76 } 92 77 93 - impl WireCommand<'_> for Either<InteractiveCommand<'_>, NonInteractiveCommand<'_>> { 94 - type ChildChip = Either<InteractiveChildChip, NonInteractiveChildChip>; 95 - 96 - /// How'd you get here? 97 - async fn spawn_new( 98 - _target: Option<&'_ Target>, 99 - _output_mode: ChildOutputMode, 100 - _modifiers: SubCommandModifiers, 101 - ) -> Result<Self, HiveLibError> { 102 - unimplemented!() 103 - } 104 - 105 - fn run_command_with_env<S: AsRef<str> + std::fmt::Debug>( 106 - &mut self, 107 - command_arugments: CommandArguments<S>, 108 - envs: HashMap<String, String>, 109 - ) -> Result<Self::ChildChip, HiveLibError> { 110 - match self { 111 - Self::Left(left) => left 112 - .run_command_with_env(command_arugments, envs) 113 - .map(Either::Left), 114 - Self::Right(right) => right 115 - .run_command_with_env(command_arugments, envs) 116 - .map(Either::Right), 117 - } 118 - } 119 - } 120 - 121 78 type ExitStatus = Either<(portable_pty::ExitStatus, String), (std::process::ExitStatus, String)>; 122 79 123 80 impl WireCommandChip for Either<InteractiveChildChip, NonInteractiveChildChip> { ··· 134 91 match self { 135 92 Self::Left(left) => left.wait_till_success().await.map(Either::Left), 136 93 Self::Right(right) => right.wait_till_success().await.map(Either::Right), 137 - } 138 - } 139 - } 140 - 141 - impl ExitStatus { 142 - fn get_stdout(&self) -> &String { 143 - match self { 144 - Self::Left((_, stdout)) | Self::Right((_, stdout)) => stdout, 145 94 } 146 95 } 147 96 }
+69 -94
wire/lib/src/commands/noninteractive.rs
··· 14 14 sync::Mutex, 15 15 task::JoinSet, 16 16 }; 17 - use tracing::{debug, trace}; 17 + use tracing::{debug, instrument, trace}; 18 18 19 19 use crate::{ 20 20 SubCommandModifiers, 21 - commands::{ChildOutputMode, CommandArguments, WireCommand, WireCommandChip}, 21 + commands::{ChildOutputMode, CommandArguments, WireCommandChip}, 22 22 errors::{CommandError, HiveLibError}, 23 23 hive::node::Target, 24 24 nix_log::{SubcommandLog, get_errorish_message}, 25 25 }; 26 26 27 - pub(crate) struct NonInteractiveCommand<'t> { 28 - target: Option<&'t Target>, 29 - output_mode: Arc<ChildOutputMode>, 30 - modifiers: SubCommandModifiers, 31 - } 32 - 33 27 pub(crate) struct NonInteractiveChildChip { 34 28 error_collection: Arc<Mutex<VecDeque<String>>>, 35 29 stdout_collection: Arc<Mutex<VecDeque<String>>>, ··· 39 33 stdin: ChildStdin, 40 34 } 41 35 42 - impl<'t> WireCommand<'t> for NonInteractiveCommand<'t> { 43 - type ChildChip = NonInteractiveChildChip; 36 + #[instrument(level = "trace", skip_all, name = "run", fields(elevated = %arguments.elevated))] 37 + pub(crate) fn non_interactive_command_with_env<S: AsRef<str>>( 38 + arguments: &CommandArguments<S>, 39 + envs: HashMap<String, String>, 40 + ) -> Result<NonInteractiveChildChip, HiveLibError> { 41 + let mut command = if let Some(target) = arguments.target { 42 + create_sync_ssh_command(target, arguments.modifiers)? 43 + } else { 44 + let mut command = Command::new("sh"); 44 45 45 - /// If target is Some, then the command will be ran remotely. 46 - /// Otherwise, the command is ran locally. 47 - async fn spawn_new( 48 - target: Option<&'t Target>, 49 - output_mode: ChildOutputMode, 50 - modifiers: SubCommandModifiers, 51 - ) -> Result<Self, crate::errors::HiveLibError> { 52 - let output_mode = Arc::new(output_mode); 46 + command.arg("-c"); 53 47 54 - Ok(Self { 55 - target, 56 - output_mode, 57 - modifiers, 58 - }) 59 - } 48 + command 49 + }; 60 50 61 - fn run_command_with_env<S: AsRef<str>>( 62 - &mut self, 63 - arguments: CommandArguments<S>, 64 - envs: HashMap<String, String>, 65 - ) -> Result<Self::ChildChip, HiveLibError> { 66 - let mut command = if let Some(target) = self.target { 67 - create_sync_ssh_command(target, self.modifiers)? 68 - } else { 69 - let mut command = Command::new("sh"); 51 + let command_string = format!( 52 + "{command_string}{extra}", 53 + command_string = arguments.command_string.as_ref(), 54 + extra = match arguments.output_mode { 55 + ChildOutputMode::Raw => "", 56 + ChildOutputMode::Nix => " --log-format internal-json", 57 + } 58 + ); 70 59 71 - command.arg("-c"); 60 + let command_string = if arguments.elevated { 61 + format!("sudo -u root -- sh -c '{command_string}'") 62 + } else { 63 + command_string 64 + }; 72 65 73 - command 74 - }; 66 + debug!("{command_string}"); 75 67 76 - let command_string = format!( 77 - "{command_string}{extra}", 78 - command_string = arguments.command_string.as_ref(), 79 - extra = match *self.output_mode { 80 - ChildOutputMode::Raw => "", 81 - ChildOutputMode::Nix => " --log-format internal-json", 82 - } 83 - ); 68 + command.arg(&command_string); 69 + command.stdin(std::process::Stdio::piped()); 70 + command.stderr(std::process::Stdio::piped()); 71 + command.stdout(std::process::Stdio::piped()); 72 + command.kill_on_drop(true); 73 + // command.env_clear(); 74 + command.envs(envs); 84 75 85 - let command_string = if arguments.elevated { 86 - format!("sudo -u root -- sh -c '{command_string}'") 87 - } else { 88 - command_string 89 - }; 76 + let mut child = command.spawn().unwrap(); 77 + let error_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 78 + let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 79 + let stdin = child.stdin.take().unwrap(); 90 80 91 - debug!("{command_string}"); 81 + let stdout_handle = child 82 + .stdout 83 + .take() 84 + .ok_or(HiveLibError::CommandError(CommandError::NoHandle))?; 85 + let stderr_handle = child 86 + .stderr 87 + .take() 88 + .ok_or(HiveLibError::CommandError(CommandError::NoHandle))?; 92 89 93 - command.arg(&command_string); 94 - command.stdin(std::process::Stdio::piped()); 95 - command.stderr(std::process::Stdio::piped()); 96 - command.stdout(std::process::Stdio::piped()); 97 - command.kill_on_drop(true); 98 - // command.env_clear(); 99 - command.envs(envs); 90 + let mut joinset = JoinSet::new(); 91 + let output_mode = Arc::new(arguments.output_mode); 100 92 101 - let mut child = command.spawn().unwrap(); 102 - let error_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 103 - let stdout_collection = Arc::new(Mutex::new(VecDeque::<String>::with_capacity(10))); 104 - let stdin = child.stdin.take().unwrap(); 93 + joinset.spawn(handle_io( 94 + stderr_handle, 95 + output_mode.clone(), 96 + error_collection.clone(), 97 + true, 98 + )); 99 + joinset.spawn(handle_io( 100 + stdout_handle, 101 + output_mode.clone(), 102 + stdout_collection.clone(), 103 + false, 104 + )); 105 105 106 - let stdout_handle = child 107 - .stdout 108 - .take() 109 - .ok_or(HiveLibError::CommandError(CommandError::NoHandle))?; 110 - let stderr_handle = child 111 - .stderr 112 - .take() 113 - .ok_or(HiveLibError::CommandError(CommandError::NoHandle))?; 114 - 115 - let mut joinset = JoinSet::new(); 116 - 117 - joinset.spawn(handle_io( 118 - stderr_handle, 119 - self.output_mode.clone(), 120 - error_collection.clone(), 121 - true, 122 - )); 123 - joinset.spawn(handle_io( 124 - stdout_handle, 125 - self.output_mode.clone(), 126 - stdout_collection.clone(), 127 - false, 128 - )); 129 - 130 - Ok(NonInteractiveChildChip { 131 - error_collection, 132 - stdout_collection, 133 - child, 134 - joinset, 135 - original_command: arguments.command_string.as_ref().to_string(), 136 - stdin, 137 - }) 138 - } 106 + Ok(NonInteractiveChildChip { 107 + error_collection, 108 + stdout_collection, 109 + child, 110 + joinset, 111 + original_command: arguments.command_string.as_ref().to_string(), 112 + stdin, 113 + }) 139 114 } 140 115 141 116 impl WireCommandChip for NonInteractiveChildChip {
+6 -6
wire/lib/src/hive/node.rs
··· 11 11 use tracing::{Level, error, event, instrument, trace}; 12 12 13 13 use crate::SubCommandModifiers; 14 - use crate::commands::{ 15 - ChildOutputMode, CommandArguments, WireCommand, WireCommandChip, get_command, 16 - }; 14 + use crate::commands::{ChildOutputMode, CommandArguments, WireCommandChip, run_command_with_env}; 17 15 use crate::errors::NetworkError; 18 16 use crate::hive::HiveLocation; 19 17 use crate::hive::steps::build::Build; ··· 207 205 self.target.user, host 208 206 ); 209 207 210 - let mut command = get_command(None, ChildOutputMode::Nix, modifiers).await?; 211 - let output = command.run_command_with_env( 212 - CommandArguments { 208 + let output = run_command_with_env( 209 + &CommandArguments { 210 + target: None, 211 + output_mode: ChildOutputMode::Nix, 212 + modifiers, 213 213 command_string, 214 214 keep_stdin_open: false, 215 215 elevated: false,
+20 -27
wire/lib/src/hive/steps/activate.rs
··· 7 7 8 8 use crate::{ 9 9 HiveLibError, 10 - commands::{ChildOutputMode, CommandArguments, WireCommand, WireCommandChip, get_command}, 10 + commands::{ChildOutputMode, CommandArguments, WireCommandChip, run_command}, 11 11 errors::{ActivationError, NetworkError}, 12 12 hive::node::{Context, ExecuteStep, Goal, SwitchToConfigurationGoal, should_apply_locally}, 13 13 }; ··· 47 47 ) -> Result<(), HiveLibError> { 48 48 info!("Setting profiles in anticipation for switch-to-configuration {goal}"); 49 49 50 - let mut command = get_command( 51 - if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) { 50 + let command_string = format!("nix-env -p /nix/var/nix/profiles/system/ --set {built_path}"); 51 + 52 + let child = run_command(&CommandArguments { 53 + target: if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) { 52 54 None 53 55 } else { 54 56 Some(&ctx.node.target) 55 57 }, 56 - ChildOutputMode::Nix, 57 - ctx.modifiers, 58 - ) 59 - .await?; 60 - let command_string = format!("nix-env -p /nix/var/nix/profiles/system/ --set {built_path}"); 61 - 62 - let child = command.run_command(CommandArguments { 58 + output_mode: ChildOutputMode::Nix, 59 + modifiers: ctx.modifiers, 63 60 command_string, 64 61 keep_stdin_open: false, 65 62 elevated: true, ··· 98 95 99 96 info!("Running switch-to-configuration {goal}"); 100 97 101 - let mut command = get_command( 102 - if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) { 103 - None 104 - } else { 105 - Some(&ctx.node.target) 106 - }, 107 - ChildOutputMode::Raw, 108 - ctx.modifiers, 109 - ) 110 - .await?; 111 - 112 98 let command_string = format!( 113 99 "{built_path}/bin/switch-to-configuration {}", 114 100 match goal { ··· 119 105 } 120 106 ); 121 107 122 - let child = command.run_command(CommandArguments { 108 + let child = run_command(&CommandArguments { 109 + target: if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) 110 + { 111 + None 112 + } else { 113 + Some(&ctx.node.target) 114 + }, 115 + output_mode: ChildOutputMode::Raw, 116 + modifiers: ctx.modifiers, 123 117 command_string, 124 118 keep_stdin_open: false, 125 119 elevated: true, ··· 140 134 return Ok(()); 141 135 } 142 136 143 - let mut command = 144 - get_command(Some(&ctx.node.target), ChildOutputMode::Nix, ctx.modifiers) 145 - .await?; 146 - 147 137 warn!("Rebooting {name}!", name = ctx.name); 148 138 149 - let reboot = command.run_command(CommandArguments { 139 + let reboot = run_command(&CommandArguments { 140 + target: Some(&ctx.node.target), 141 + output_mode: ChildOutputMode::Nix, 142 + modifiers: ctx.modifiers, 150 143 command_string: "reboot now", 151 144 keep_stdin_open: false, 152 145 elevated: true,
+23 -22
wire/lib/src/hive/steps/build.rs
··· 8 8 use crate::{ 9 9 HiveLibError, 10 10 commands::{ 11 - ChildOutputMode, CommandArguments, WireCommand, WireCommandChip, 12 - noninteractive::NonInteractiveCommand, 11 + ChildOutputMode, CommandArguments, Either, WireCommandChip, run_command_with_env 13 12 }, 14 13 hive::node::{Context, ExecuteStep, Goal}, 15 14 }; ··· 37 36 build --print-build-logs --no-link --print-out-paths {top_level}" 38 37 ); 39 38 40 - let mut command = NonInteractiveCommand::spawn_new( 41 - if ctx.node.build_remotely { 42 - Some(&ctx.node.target) 43 - } else { 44 - None 45 - }, 46 - ChildOutputMode::Nix, 47 - ctx.modifiers, 48 - ) 49 - .await?; 50 - 51 - let (_, stdout) = command 52 - .run_command(CommandArguments { 39 + let status = run_command_with_env( 40 + &CommandArguments { 41 + target: if ctx.node.build_remotely { 42 + Some(&ctx.node.target) 43 + } else { 44 + None 45 + }, 46 + output_mode: ChildOutputMode::Nix, 47 + modifiers: ctx.modifiers, 53 48 command_string, 54 49 keep_stdin_open: false, 55 50 elevated: false, 56 51 clobber_lock: ctx.clobber_lock.clone(), 57 - })? 58 - .wait_till_success() 59 - .await 60 - .map_err(|source| HiveLibError::NixBuildError { 61 - name: ctx.name.clone(), 62 - source, 63 - })?; 52 + }, 53 + std::collections::HashMap::new(), 54 + )? 55 + .wait_till_success() 56 + .await 57 + .map_err(|source| HiveLibError::NixBuildError { 58 + name: ctx.name.clone(), 59 + source, 60 + })?; 61 + 62 + let stdout = match status { 63 + Either::Left((_, stdout)) | Either::Right((_, stdout)) => stdout 64 + }; 64 65 65 66 info!("Built output: {stdout:?}"); 66 67 ctx.state.build = Some(stdout);
+8 -12
wire/lib/src/hive/steps/keys.rs
··· 24 24 25 25 use crate::HiveLibError; 26 26 use crate::commands::common::push; 27 - use crate::commands::{ 28 - ChildOutputMode, CommandArguments, WireCommand, WireCommandChip, get_command, 29 - }; 27 + use crate::commands::{ChildOutputMode, CommandArguments, WireCommandChip, run_command}; 30 28 use crate::errors::KeyError; 31 29 use crate::hive::node::{ 32 30 Context, ExecuteStep, Goal, Push, SwitchToConfigurationGoal, should_apply_locally, ··· 235 233 return Ok(()); 236 234 } 237 235 238 - let mut command = get_command( 239 - if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) { 236 + let command_string = format!("{agent_directory}/bin/key_agent"); 237 + 238 + let mut child = run_command(&CommandArguments { 239 + target: if should_apply_locally(ctx.node.allow_local_deployment, &ctx.name.to_string()) 240 + { 240 241 None 241 242 } else { 242 243 Some(&ctx.node.target) 243 244 }, 244 - ChildOutputMode::Raw, 245 - ctx.modifiers, 246 - ) 247 - .await?; 248 - let command_string = format!("{agent_directory}/bin/key_agent"); 249 - 250 - let mut child = command.run_command(CommandArguments { 245 + output_mode: ChildOutputMode::Raw, 246 + modifiers: ctx.modifiers, 251 247 command_string, 252 248 keep_stdin_open: true, 253 249 elevated: true,