ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

re: refactor how nodes are executed (#400)

Signed-off-by: marshmallow <github@althaea.zone>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

authored by

marshmallow
autofix-ci[bot]
and committed by
GitHub
6abbf025 ad6fac18

+1307 -1025
+5
CHANGELOG.md
··· 17 17 18 18 - The domain for documentation to be `wire.forall.systems`. The previous URL 19 19 will continue to be available but may redirect in the future. 20 + - Refactored node execution to be in two distinct phases, "planning" and 21 + "execution". Previously, picking what steps would be run was done on the fly 22 + during execution. 23 + - Cases where there are no keys to deploy, such as having 0 keys or filtered 24 + keys, the "Key" step will not be planned when it previously would have. 20 25 21 26 ### Fixed 22 27
+16 -19
crates/cli/src/apply.rs
··· 11 11 use std::sync::atomic::AtomicBool; 12 12 use thiserror::Error; 13 13 use tracing::{error, info}; 14 - use wire_core::hive::node::{Context, GoalExecutor, Name, Node, Objective, StepState}; 14 + use wire_core::hive::executor::execute; 15 + use wire_core::hive::node::{Name, Node}; 16 + use wire_core::hive::plan::{Goal, plan_for_node}; 15 17 use wire_core::hive::{Hive, HiveLocation}; 16 18 use wire_core::status::STATUS; 17 19 use wire_core::{SubCommandModifiers, errors::HiveLibError}; ··· 96 98 97 99 pub async fn apply<F>( 98 100 hive: &mut Hive, 99 - should_shutdown: Arc<AtomicBool>, 101 + should_quit: Arc<AtomicBool>, 100 102 location: HiveLocation, 101 103 args: CommonVerbArgs, 102 104 partition: Partitions, 103 - make_objective: F, 105 + make_goal: F, 104 106 mut modifiers: SubCommandModifiers, 105 107 ) -> Result<()> 106 108 where 107 - F: Fn(&Name, &Node) -> Objective, 109 + F: Fn(&Name, &Node) -> Goal, 108 110 { 109 111 let location = Arc::new(location); 110 112 ··· 142 144 .iter_mut() 143 145 .filter(|(name, _)| partitioned_names.contains(name)) 144 146 .map(|(name, node)| { 145 - info!("Resolved {:?} to include {}", args.on, name); 147 + let goal = make_goal(name, node); 146 148 147 - let objective = make_objective(name, node); 148 - 149 - let context = Context { 149 + let plan = plan_for_node( 150 150 node, 151 - name, 152 - objective, 153 - state: StepState::default(), 154 - hive_location: location.clone(), 155 - modifiers, 156 - should_quit: should_shutdown.clone(), 157 - }; 158 - 159 - GoalExecutor::new(context) 160 - .execute() 161 - .map(move |result| (name, result)) 151 + name.clone(), 152 + &goal, 153 + location.clone(), 154 + &modifiers, 155 + should_quit.clone(), 156 + ); 157 + execute(plan).map(move |result| (name, result)) 162 158 }) 163 159 .peekable(); 164 160 ··· 168 164 169 165 let futures = futures::stream::iter(set).buffer_unordered(args.parallel); 170 166 let result = futures.collect::<Vec<_>>().await; 167 + 171 168 let (successful, errors): (Vec<_>, Vec<_>) = 172 169 result 173 170 .into_iter()
+3 -1
crates/cli/src/cli.rs
··· 11 11 use tokio::runtime::Handle; 12 12 use wire_core::SubCommandModifiers; 13 13 use wire_core::commands::common::get_hive_node_names; 14 - use wire_core::hive::node::{Goal as HiveGoal, HandleUnreachable, Name, SwitchToConfigurationGoal}; 14 + use wire_core::hive::node::{ 15 + ApplyGoal as HiveGoal, HandleUnreachable, Name, SwitchToConfigurationGoal, 16 + }; 15 17 use wire_core::hive::{Hive, get_hive_location}; 16 18 17 19 use std::io::IsTerminal;
+6 -5
crates/cli/src/main.rs
··· 28 28 use wire_core::commands::common::get_hive_node_names; 29 29 use wire_core::hive::Hive; 30 30 use wire_core::hive::get_hive_location; 31 - use wire_core::hive::node::ApplyObjective; 32 - use wire_core::hive::node::Objective; 33 31 use wire_core::hive::node::should_apply_locally; 32 + use wire_core::hive::plan::ApplyGoalArgs; 33 + use wire_core::hive::plan::Goal; 34 34 35 35 #[macro_use] 36 36 extern crate enum_display_derive; ··· 86 86 match args.command { 87 87 cli::Commands::Apply(apply_args) => { 88 88 let mut hive = Hive::new_from_path(&location, cache.clone(), modifiers).await?; 89 - let goal: wire_core::hive::node::Goal = apply_args.goal.clone().try_into().unwrap(); 89 + let goal = apply_args.goal.clone().try_into().unwrap(); 90 90 91 91 // Respect user's --always-build-local arg 92 92 hive.force_always_local(apply_args.always_build_local)?; ··· 98 98 apply_args.common, 99 99 Partitions::default(), 100 100 |name, node| { 101 - Objective::Apply(ApplyObjective { 101 + Goal::Apply(ApplyGoalArgs { 102 102 goal, 103 103 no_keys: apply_args.no_keys, 104 104 reboot: apply_args.reboot, ··· 108 108 &name.0, 109 109 ), 110 110 handle_unreachable: apply_args.handle_unreachable.clone().into(), 111 + host_platform: node.host_platform.clone(), 111 112 }) 112 113 }, 113 114 modifiers, ··· 123 124 location, 124 125 build_args.common, 125 126 build_args.partition.unwrap_or_default(), 126 - |_name, _node| Objective::BuildLocally, 127 + |_name, _node| Goal::Build, 127 128 modifiers, 128 129 ) 129 130 .await?;
+13 -11
crates/core/src/commands/common.rs
··· 14 14 errors::{CommandError, HiveInitialisationError, HiveLibError}, 15 15 hive::{ 16 16 HiveLocation, 17 - node::{Context, Objective, Push}, 17 + node::{Context, Push, SharedTarget}, 18 18 }, 19 19 }; 20 20 ··· 28 28 } 29 29 } 30 30 31 - pub async fn push(context: &Context<'_>, push: Push<'_>) -> Result<(), HiveLibError> { 31 + pub async fn push( 32 + context: &Context, 33 + target: &SharedTarget, 34 + push: Push<'_>, 35 + substitute_on_destination: bool, 36 + ) -> Result<(), HiveLibError> { 37 + let target = target.0.read().await; 38 + 32 39 let mut command_string = CommandStringBuilder::nix(); 33 40 34 41 command_string.args(&["--extra-experimental-features", "nix-command", "copy"]); 35 - if let Objective::Apply(apply_objective) = context.objective { 36 - command_string.opt_arg( 37 - apply_objective.substitute_on_destination, 38 - "--substitute-on-destination", 39 - ); 40 - } 42 + command_string.opt_arg(substitute_on_destination, "--substitute-on-destination"); 41 43 command_string.arg("--to"); 42 44 command_string.args(&[ 43 45 format!( 44 46 "ssh://{user}@{host}", 45 - user = context.node.target.user, 46 - host = context.node.target.get_preferred_host()?, 47 + user = target.user, 48 + host = target.get_preferred_host()?, 47 49 ), 48 50 match push { 49 51 Push::Derivation(drv) => format!("{drv} --derivation"), ··· 56 58 .mode(crate::commands::ChildOutputMode::Nix), 57 59 HashMap::from([( 58 60 "NIX_SSHOPTS".into(), 59 - context.node.target.create_ssh_opts(context.modifiers)?, 61 + target.create_ssh_opts(context.modifiers)?, 60 62 )]), 61 63 ) 62 64 .await?;
+20 -15
crates/core/src/commands/mod.rs
··· 1 1 // SPDX-License-Identifier: AGPL-3.0-or-later 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 - use crate::commands::pty::{InteractiveChildChip, interactive_command_with_env}; 5 - use std::{collections::HashMap, str::from_utf8, sync::LazyLock}; 4 + use crate::{ 5 + commands::pty::{InteractiveChildChip, interactive_command_with_env}, 6 + hive::node::SharedTarget, 7 + }; 8 + use std::{ 9 + collections::HashMap, 10 + str::from_utf8, 11 + sync::{Arc, LazyLock}, 12 + }; 6 13 7 14 use aho_corasick::AhoCorasick; 8 15 use gjson::Value; ··· 15 22 SubCommandModifiers, 16 23 commands::noninteractive::{NonInteractiveChildChip, non_interactive_command_with_env}, 17 24 errors::{CommandError, HiveLibError}, 18 - hive::node::{Node, Target}, 19 25 }; 20 26 21 27 pub(crate) mod builder; ··· 37 43 } 38 44 39 45 #[derive(Debug)] 40 - pub(crate) struct CommandArguments<'t, S: AsRef<str>> { 46 + pub(crate) struct CommandArguments<S: AsRef<str>> { 41 47 modifiers: SubCommandModifiers, 42 - target: Option<&'t Target>, 48 + target: Option<SharedTarget>, 43 49 output_mode: ChildOutputMode, 44 50 command_string: S, 45 51 keep_stdin_open: bool, ··· 55 61 .unwrap() 56 62 }); 57 63 58 - impl<'a, S: AsRef<str>> CommandArguments<'a, S> { 64 + impl<S: AsRef<str>> CommandArguments<S> { 59 65 pub(crate) const fn new(command_string: S, modifiers: SubCommandModifiers) -> Self { 60 66 Self { 61 67 command_string, ··· 68 74 } 69 75 } 70 76 71 - pub(crate) const fn execute_on_remote(mut self, target: Option<&'a Target>) -> Self { 77 + pub(crate) fn execute_on_remote(mut self, target: Option<SharedTarget>) -> Self { 72 78 self.target = target; 73 79 self 74 80 } ··· 83 89 self 84 90 } 85 91 86 - pub(crate) fn elevated(mut self, node: &Node) -> Self { 87 - self.privilege_escalation_command = 88 - Some(node.privilege_escalation_command.iter().join(" ")); 92 + pub(crate) fn privileged(mut self, escalation_command: &[Arc<str>]) -> Self { 93 + self.privilege_escalation_command = Some(escalation_command.iter().join(" ")); 89 94 self 90 95 } 91 96 ··· 100 105 } 101 106 102 107 pub(crate) async fn run_command<S: AsRef<str>>( 103 - arguments: &CommandArguments<'_, S>, 108 + arguments: &CommandArguments<S>, 104 109 ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { 105 110 run_command_with_env(arguments, HashMap::new()).await 106 111 } 107 112 108 113 pub(crate) async fn run_command_with_env<S: AsRef<str>>( 109 - arguments: &CommandArguments<'_, S>, 114 + arguments: &CommandArguments<S>, 110 115 envs: HashMap<String, String>, 111 116 ) -> Result<Either<InteractiveChildChip, NonInteractiveChildChip>, HiveLibError> { 112 117 // use the non interactive command runner when forced ··· 114 119 if arguments.modifiers.non_interactive 115 120 || (arguments.target.is_none() && !arguments.is_elevated()) 116 121 { 117 - return Ok(Either::Right(non_interactive_command_with_env( 118 - arguments, envs, 119 - )?)); 122 + return Ok(Either::Right( 123 + non_interactive_command_with_env(arguments, envs).await?, 124 + )); 120 125 } 121 126 122 127 Ok(Either::Left(
+8 -7
crates/core/src/commands/noninteractive.rs
··· 11 11 SubCommandModifiers, 12 12 commands::{ChildOutputMode, CommandArguments, WireCommandChip}, 13 13 errors::{CommandError, HiveLibError}, 14 - hive::node::Target, 14 + hive::node::SharedTarget, 15 15 }; 16 16 use itertools::Itertools; 17 17 use tokio::{ ··· 32 32 } 33 33 34 34 #[instrument(skip_all, name = "run", fields(elevated = %arguments.is_elevated()))] 35 - pub(crate) fn non_interactive_command_with_env<S: AsRef<str>>( 35 + pub(crate) async fn non_interactive_command_with_env<S: AsRef<str>>( 36 36 arguments: &CommandArguments<S>, 37 37 envs: HashMap<String, String>, 38 38 ) -> Result<NonInteractiveChildChip, HiveLibError> { 39 - let mut command = if let Some(target) = arguments.target { 40 - create_sync_ssh_command(target, arguments.modifiers)? 39 + let mut command = if let Some(ref target) = arguments.target { 40 + create_sync_ssh_command(target, arguments.modifiers).await? 41 41 } else { 42 42 let mut command = Command::new("sh"); 43 43 ··· 187 187 debug!("io_handler: goodbye!"); 188 188 } 189 189 190 - fn create_sync_ssh_command( 191 - target: &Target, 190 + async fn create_sync_ssh_command( 191 + target: &SharedTarget, 192 192 modifiers: SubCommandModifiers, 193 193 ) -> Result<Command, HiveLibError> { 194 + let target = target.0.read().await; 194 195 let mut command = Command::new("ssh"); 195 - command.args(target.create_ssh_args(modifiers, true)?); 196 + command.args(target.create_ssh_args(modifiers)?); 196 197 command.arg(target.get_preferred_host()?.to_string()); 197 198 198 199 Ok(command)
+29 -23
crates/core/src/commands/pty/mod.rs
··· 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 4 use crate::commands::pty::output::{WatchStdoutArguments, handle_pty_stdout}; 5 + use crate::hive::node::SharedTarget; 5 6 use crate::status::STATUS; 6 7 use aho_corasick::PatternID; 7 8 use itertools::Itertools; ··· 29 30 use crate::{ 30 31 commands::{ChildOutputMode, WireCommandChip}, 31 32 errors::HiveLibError, 32 - hive::node::Target, 33 33 }; 34 34 35 35 mod input; ··· 85 85 const IO_SUBS: &str = "1> >(while IFS= read -r line; do echo \"#$line\"; done)"; 86 86 87 87 fn create_ending_segment<S: AsRef<str>>( 88 - arguments: &CommandArguments<'_, S>, 88 + arguments: &CommandArguments<S>, 89 89 needles: &Needles, 90 90 ) -> String { 91 91 let Needles { ··· 110 110 } 111 111 112 112 fn create_starting_segment<S: AsRef<str>>( 113 - arguments: &CommandArguments<'_, S>, 113 + arguments: &CommandArguments<S>, 114 114 start_needle: &Arc<Vec<u8>>, 115 115 ) -> String { 116 116 if matches!(arguments.output_mode, ChildOutputMode::Interactive) { ··· 125 125 126 126 #[instrument(skip_all, name = "run-int", fields(elevated = %arguments.is_elevated(), mode = ?arguments.output_mode))] 127 127 pub(crate) async fn interactive_command_with_env<S: AsRef<str>>( 128 - arguments: &CommandArguments<'_, S>, 128 + arguments: &CommandArguments<S>, 129 129 envs: std::collections::HashMap<String, String>, 130 130 ) -> Result<InteractiveChildChip, HiveLibError> { 131 - print_authenticate_warning(arguments)?; 131 + print_authenticate_warning(arguments).await?; 132 132 133 133 let needles = create_needles(); 134 134 let pty_system = NativePtySystem::default(); ··· 148 148 149 149 debug!("{command_string}"); 150 150 151 - let mut command = build_command(arguments, command_string)?; 151 + let mut command = build_command(arguments, command_string).await?; 152 152 153 153 // give command all env vars 154 154 for (key, value) in envs { ··· 242 242 }) 243 243 } 244 244 245 - fn print_authenticate_warning<S: AsRef<str>>( 245 + async fn print_authenticate_warning<S: AsRef<str>>( 246 246 arguments: &CommandArguments<S>, 247 247 ) -> Result<(), HiveLibError> { 248 248 if !arguments.is_elevated() { 249 249 return Ok(()); 250 250 } 251 251 252 + let target_display = if let Some(ref target) = arguments.target { 253 + let target = target.0.read().await; 254 + 255 + format!( 256 + "{}@{}:{}", 257 + target.user, 258 + target.get_preferred_host()?, 259 + target.port 260 + ) 261 + } else { 262 + "localhost (!)".to_string() 263 + }; 264 + 252 265 let _ = STATUS.lock().write_above_status( 253 266 &format!( 254 - "{} | Authenticate for \"sudo {}\":\n", 255 - arguments 256 - .target 257 - .map_or(Ok("localhost (!)".to_string()), |target| Ok(format!( 258 - "{}@{}:{}", 259 - target.user, 260 - target.get_preferred_host()?, 261 - target.port 262 - )))?, 267 + "{target_display} | Authenticate for \"sudo {}\":\n", 263 268 arguments.command_string.as_ref() 264 269 ) 265 270 .into_bytes(), ··· 306 311 Ok(()) 307 312 } 308 313 309 - fn build_command<S: AsRef<str>>( 310 - arguments: &CommandArguments<'_, S>, 314 + async fn build_command<S: AsRef<str>>( 315 + arguments: &CommandArguments<S>, 311 316 command_string: &String, 312 317 ) -> Result<CommandBuilder, HiveLibError> { 313 - let mut command = if let Some(target) = arguments.target { 314 - let mut command = create_int_ssh_command(target, arguments.modifiers)?; 318 + let mut command = if let Some(ref target) = arguments.target { 319 + let mut command = create_int_ssh_command(target, arguments.modifiers).await?; 315 320 316 321 // force ssh to use our pseudo terminal 317 322 command.arg("-tt"); ··· 428 433 } 429 434 } 430 435 431 - fn create_int_ssh_command( 432 - target: &Target, 436 + async fn create_int_ssh_command( 437 + target: &SharedTarget, 433 438 modifiers: SubCommandModifiers, 434 439 ) -> Result<portable_pty::CommandBuilder, HiveLibError> { 440 + let target = target.0.read().await; 435 441 let mut command = portable_pty::CommandBuilder::new("ssh"); 436 - command.args(target.create_ssh_args(modifiers, false)?); 442 + command.args(target.create_ssh_args(modifiers)?); 437 443 command.arg(target.get_preferred_host()?.to_string()); 438 444 Ok(command) 439 445 }
+167
crates/core/src/hive/executor.rs
··· 1 + use crate::hive::node::Step; 2 + use std::{assert_matches::debug_assert_matches, sync::Arc}; 3 + 4 + use tracing::{Instrument, Span, debug, error, event, instrument}; 5 + 6 + use crate::{ 7 + EvalGoal, SubCommandModifiers, 8 + commands::common::evaluate_hive_attribute, 9 + errors::HiveLibError, 10 + hive::{ 11 + HiveLocation, 12 + node::{Context, Derivation, ExecuteStep, Name}, 13 + plan::NodePlan, 14 + }, 15 + status::STATUS, 16 + }; 17 + 18 + /// returns Err if the application should shut down. 19 + fn app_shutdown_guard(context: &Context) -> Result<(), HiveLibError> { 20 + if context 21 + .should_quit 22 + .load(std::sync::atomic::Ordering::Relaxed) 23 + { 24 + return Err(HiveLibError::Sigint); 25 + } 26 + 27 + Ok(()) 28 + } 29 + 30 + /// Task that evaluates the node. 31 + #[instrument(skip_all, name = "eval")] 32 + async fn evaluate_task( 33 + tx: tokio::sync::oneshot::Sender<Result<Derivation, HiveLibError>>, 34 + hive_location: Arc<HiveLocation>, 35 + name: Name, 36 + modifiers: SubCommandModifiers, 37 + ) { 38 + let output = evaluate_hive_attribute(&hive_location, &EvalGoal::GetTopLevel(&name), modifiers) 39 + .await 40 + .and_then(|output| { 41 + serde_json::from_str(&output).map_err(|e| { 42 + HiveLibError::HiveInitialisationError( 43 + crate::errors::HiveInitialisationError::ParseEvaluateError(e), 44 + ) 45 + }) 46 + }); 47 + 48 + debug!(output = ?output, done = true); 49 + 50 + let _ = tx.send(output); 51 + } 52 + 53 + /// Iterates and executes the steps in the plan. 54 + /// Performs some optimisations such as greedily executing evaluation before 55 + /// other steps independent of evaluation's result. 56 + #[instrument(skip_all, fields(node = %plan.context.name))] 57 + pub async fn execute(mut plan: NodePlan) -> Result<(), HiveLibError> { 58 + app_shutdown_guard(&plan.context)?; 59 + 60 + let (tx, rx) = tokio::sync::oneshot::channel(); 61 + plan.context.state.evaluation_rx = Some(rx); 62 + 63 + // The name of this span should never be changed without updating 64 + // `wire/cli/tracing_setup.rs` 65 + debug_assert_matches!(Span::current().metadata().unwrap().name(), "execute"); 66 + // This span should always have a `node` field by the same file 67 + debug_assert!( 68 + Span::current() 69 + .metadata() 70 + .unwrap() 71 + .fields() 72 + .field("node") 73 + .is_some() 74 + ); 75 + 76 + if plan.greedy_evaluate { 77 + tokio::spawn( 78 + evaluate_task( 79 + tx, 80 + plan.context.hive_location.clone(), 81 + plan.context.name.clone(), 82 + plan.context.modifiers, 83 + ) 84 + .in_current_span(), 85 + ); 86 + } 87 + 88 + let length = plan.steps.len(); 89 + 90 + for (position, step) in plan.steps.iter().enumerate() { 91 + app_shutdown_guard(&plan.context)?; 92 + 93 + event!( 94 + tracing::Level::INFO, 95 + step = step.to_string(), 96 + progress = format!("{}/{length}", position + 1) 97 + ); 98 + 99 + STATUS 100 + .lock() 101 + .set_node_step(&plan.context.name, step.to_string()); 102 + 103 + if let Err(err) = step.execute(&mut plan.context).await.inspect_err(|_| { 104 + error!("Failed to execute `{step}`"); 105 + }) { 106 + if matches!(step, Step::Ping(..)) && plan.ignore_failed_ping { 107 + return Ok(()); 108 + } 109 + 110 + STATUS.lock().mark_node_failed(&plan.context.name); 111 + 112 + return Err(err); 113 + } 114 + } 115 + 116 + STATUS.lock().mark_node_succeeded(&plan.context.name); 117 + 118 + Ok(()) 119 + } 120 + 121 + #[cfg(test)] 122 + mod tests { 123 + use crate::{ 124 + SubCommandModifiers, 125 + errors::HiveLibError, 126 + function_name, get_test_path, 127 + hive::{ 128 + executor::execute, 129 + node::{ApplyGoal, HandleUnreachable, Name, Node, SwitchToConfigurationGoal}, 130 + plan::{ApplyGoalArgs, Goal, plan_for_node}, 131 + }, 132 + location, 133 + }; 134 + use std::{assert_matches::assert_matches, path::PathBuf}; 135 + use std::{ 136 + env, 137 + sync::{Arc, atomic::AtomicBool}, 138 + }; 139 + 140 + #[tokio::test] 141 + async fn plan_executor_quits_sigint() { 142 + let location = location!(get_test_path!()); 143 + let node = Node::default(); 144 + let name = &Name(function_name!().into()); 145 + let should_quit = Arc::new(AtomicBool::new(true)); 146 + let plan = plan_for_node( 147 + &node.clone(), 148 + name.clone(), 149 + &Goal::Apply(ApplyGoalArgs { 150 + goal: ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), 151 + should_apply_locally: true, 152 + no_keys: true, 153 + substitute_on_destination: true, 154 + reboot: false, 155 + host_platform: "x86_64-linux".into(), 156 + handle_unreachable: HandleUnreachable::default(), 157 + }), 158 + location.clone().into(), 159 + &SubCommandModifiers::default(), 160 + should_quit.clone(), 161 + ); 162 + 163 + let status = execute(plan).await; 164 + 165 + assert_matches!(status, Err(HiveLibError::Sigint)); 166 + } 167 + }
+18 -15
crates/core/src/hive/mod.rs
··· 22 22 use crate::commands::{CommandArguments, Either, WireCommandChip, run_command}; 23 23 use crate::errors::{HiveInitialisationError, HiveLocationError}; 24 24 use crate::{EvalGoal, HiveLibError, SubCommandModifiers}; 25 + pub mod executor; 25 26 pub mod node; 27 + pub mod plan; 26 28 pub mod steps; 27 29 28 30 #[derive(Serialize, Deserialize, Debug, PartialEq)] ··· 180 182 } 181 183 } 182 184 183 - #[derive(Debug, PartialEq, Eq, Deserialize)] 185 + #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] 184 186 pub struct FlakePrefetch { 185 187 pub(crate) hash: String, 186 188 #[serde(rename = "storePath")] 187 189 pub(crate) store_path: String, 188 190 } 189 191 190 - #[derive(Debug, PartialEq, Eq)] 192 + #[derive(Debug, Clone, PartialEq, Eq)] 191 193 pub enum HiveLocation { 192 194 HiveNix(PathBuf), 193 195 Flake { ··· 288 290 289 291 #[cfg(test)] 290 292 mod tests { 291 - use im::vector; 292 - 293 293 use crate::{ 294 294 errors::CommandError, 295 295 get_test_path, ··· 346 346 347 347 let node = Node { 348 348 target: node::Target::from_host("name"), 349 - keys: vector![Key { 350 - name: "different-than-a".into(), 351 - dest_dir: "/run/keys/".into(), 352 - path: "/run/keys/different-than-a".into(), 353 - group: "root".into(), 354 - user: "root".into(), 355 - permissions: "0600".into(), 356 - source: Source::String("hi".into()), 357 - upload_at: UploadKeyAt::PreActivation, 358 - environment: im::HashMap::new() 359 - }], 349 + keys: vec![ 350 + Key { 351 + name: "different-than-a".into(), 352 + dest_dir: "/run/keys/".into(), 353 + path: "/run/keys/different-than-a".into(), 354 + group: "root".into(), 355 + user: "root".into(), 356 + permissions: "0600".into(), 357 + source: Source::String("hi".into()), 358 + upload_at: UploadKeyAt::PreActivation, 359 + environment: im::HashMap::new(), 360 + } 361 + .into(), 362 + ], 360 363 build_remotely: true, 361 364 ..Default::default() 362 365 };
+115 -585
crates/core/src/hive/node.rs
··· 5 5 use enum_dispatch::enum_dispatch; 6 6 use gethostname::gethostname; 7 7 use serde::{Deserialize, Serialize}; 8 - use std::assert_matches::debug_assert_matches; 9 8 use std::fmt::Display; 10 9 use std::sync::Arc; 11 10 use std::sync::atomic::AtomicBool; 12 - use tokio::sync::oneshot; 13 - use tracing::{Instrument, Level, Span, debug, error, event, instrument, trace}; 11 + use tokio::sync::{RwLock, oneshot}; 12 + use tracing::instrument; 14 13 15 14 use crate::commands::builder::CommandStringBuilder; 16 - use crate::commands::common::evaluate_hive_attribute; 17 15 use crate::commands::{CommandArguments, WireCommandChip, run_command}; 18 16 use crate::errors::NetworkError; 19 17 use crate::hive::HiveLocation; 20 18 use crate::hive::steps::build::Build; 21 - use crate::hive::steps::cleanup::CleanUp; 22 19 use crate::hive::steps::evaluate::Evaluate; 23 - use crate::hive::steps::keys::{Key, Keys, PushKeyAgent, UploadKeyAt}; 20 + use crate::hive::steps::keys::{Key, Keys, PushKeyAgent}; 24 21 use crate::hive::steps::ping::Ping; 25 22 use crate::hive::steps::push::{PushBuildOutput, PushEvaluatedOutput}; 26 - use crate::status::STATUS; 27 - use crate::{EvalGoal, StrictHostKeyChecking, SubCommandModifiers}; 23 + use crate::{StrictHostKeyChecking, SubCommandModifiers}; 28 24 29 25 use super::HiveLibError; 30 26 use super::steps::activate::SwitchToConfiguration; ··· 44 40 current_host: usize, 45 41 } 46 42 43 + #[derive(Clone, Debug)] 44 + pub struct SharedTarget(pub Arc<RwLock<Target>>); 45 + 46 + // Hack specifically for testing if two steps that have the same shared target 47 + // are equal 48 + #[cfg(test)] 49 + impl PartialEq for SharedTarget { 50 + fn eq(&self, other: &Self) -> bool { 51 + let self_guard = self 52 + .0 53 + .try_read() 54 + .expect("failed to target read in test context"); 55 + let other_guard = other 56 + .0 57 + .try_read() 58 + .expect("failed to target read in test context"); 59 + 60 + *self_guard == *other_guard 61 + } 62 + } 63 + 47 64 impl Target { 48 65 #[instrument(ret(level = tracing::Level::DEBUG), skip_all)] 49 66 pub fn create_ssh_opts(&self, modifiers: SubCommandModifiers) -> Result<String, HiveLibError> { 50 - self.create_ssh_args(modifiers, false).map(|x| x.join(" ")) 67 + self.create_ssh_args(modifiers).map(|x| x.join(" ")) 51 68 } 52 69 53 70 #[instrument(ret(level = tracing::Level::DEBUG))] 54 71 pub fn create_ssh_args( 55 72 &self, 56 73 modifiers: SubCommandModifiers, 57 - non_interactive_forced: bool, 58 74 ) -> Result<Vec<String>, HiveLibError> { 59 75 let mut vector = vec![ 60 76 "-l".to_string(), ··· 81 97 82 98 Ok(vector) 83 99 } 100 + 101 + /// Tests the connection to a node 102 + pub async fn ping(&self, modifiers: SubCommandModifiers) -> Result<(), HiveLibError> { 103 + let host = self.get_preferred_host()?; 104 + 105 + let mut command_string = CommandStringBuilder::new("ssh"); 106 + command_string.arg(format!("{}@{host}", self.user)); 107 + command_string.arg(self.create_ssh_opts(modifiers)?); 108 + command_string.arg("exit"); 109 + 110 + let output = run_command( 111 + &CommandArguments::new(command_string, modifiers) 112 + .log_stdout() 113 + .mode(crate::commands::ChildOutputMode::Interactive), 114 + ) 115 + .await?; 116 + 117 + output.wait_till_success().await.map_err(|source| { 118 + HiveLibError::NetworkError(NetworkError::HostUnreachable { 119 + host: host.to_string(), 120 + source, 121 + }) 122 + })?; 123 + 124 + Ok(()) 125 + } 84 126 } 85 127 86 128 #[cfg(test)] ··· 91 133 user: "root".into(), 92 134 port: 22, 93 135 current_host: 0, 94 - } 95 - } 96 - } 97 - 98 - #[cfg(test)] 99 - impl<'a> Context<'a> { 100 - fn create_test_context( 101 - hive_location: HiveLocation, 102 - name: &'a Name, 103 - node: &'a mut Node, 104 - ) -> Self { 105 - Context { 106 - name, 107 - node, 108 - hive_location: Arc::new(hive_location), 109 - modifiers: SubCommandModifiers::default(), 110 - objective: Objective::Apply(ApplyObjective { 111 - goal: Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), 112 - no_keys: false, 113 - reboot: false, 114 - should_apply_locally: false, 115 - substitute_on_destination: false, 116 - handle_unreachable: HandleUnreachable::default(), 117 - }), 118 - state: StepState::default(), 119 - should_quit: Arc::new(AtomicBool::new(false)), 120 136 } 121 137 } 122 138 } ··· 171 187 pub tags: im::HashSet<String>, 172 188 173 189 #[serde(rename(deserialize = "_keys", serialize = "keys"))] 174 - pub keys: im::Vector<Key>, 190 + pub keys: Vec<Arc<Key>>, 175 191 176 192 #[serde(rename(deserialize = "_hostPlatform", serialize = "host_platform"))] 177 193 pub host_platform: Arc<str>, ··· 180 196 deserialize = "privilegeEscalationCommand", 181 197 serialize = "privilege_escalation_command" 182 198 ))] 183 - pub privilege_escalation_command: im::Vector<Arc<str>>, 199 + pub privilege_escalation_command: Arc<Vec<Arc<str>>>, 184 200 } 185 201 186 202 #[cfg(test)] ··· 188 204 fn default() -> Self { 189 205 Node { 190 206 target: Target::default(), 191 - keys: im::Vector::new(), 207 + keys: Vec::new(), 192 208 tags: im::HashSet::new(), 193 209 privilege_escalation_command: vec!["sudo".into(), "--".into()].into(), 194 210 allow_local_deployment: true, ··· 207 223 ..Default::default() 208 224 } 209 225 } 210 - 211 - /// Tests the connection to a node 212 - pub async fn ping(&self, modifiers: SubCommandModifiers) -> Result<(), HiveLibError> { 213 - let host = self.target.get_preferred_host()?; 214 - 215 - let mut command_string = CommandStringBuilder::new("ssh"); 216 - command_string.arg(format!("{}@{host}", self.target.user)); 217 - command_string.arg(self.target.create_ssh_opts(modifiers)?); 218 - command_string.arg("exit"); 219 - 220 - let output = run_command( 221 - &CommandArguments::new(command_string, modifiers) 222 - .log_stdout() 223 - .mode(crate::commands::ChildOutputMode::Interactive), 224 - ) 225 - .await?; 226 - 227 - output.wait_till_success().await.map_err(|source| { 228 - HiveLibError::NetworkError(NetworkError::HostUnreachable { 229 - host: host.to_string(), 230 - source, 231 - }) 232 - })?; 233 - 234 - Ok(()) 235 - } 236 226 } 237 227 238 228 #[must_use] ··· 255 245 } 256 246 } 257 247 258 - #[derive(derive_more::Display, Debug, Clone, Copy)] 248 + #[derive(derive_more::Display, Debug, Clone, Copy, PartialEq, Eq)] 259 249 pub enum SwitchToConfigurationGoal { 260 250 Switch, 261 251 Boot, ··· 263 253 DryActivate, 264 254 } 265 255 266 - #[derive(derive_more::Display, Clone, Copy)] 267 - pub enum Goal { 256 + #[derive(derive_more::Display, Debug, Clone, Copy)] 257 + pub enum ApplyGoal { 268 258 SwitchToConfiguration(SwitchToConfigurationGoal), 269 259 Build, 270 260 Push, 271 261 Keys, 272 262 } 273 263 274 - // TODO: Get rid of this allow and resolve it 275 - #[allow(clippy::struct_excessive_bools)] 276 - #[derive(Clone, Copy)] 277 - pub struct ApplyObjective { 278 - pub goal: Goal, 279 - pub no_keys: bool, 280 - pub reboot: bool, 281 - pub should_apply_locally: bool, 282 - pub substitute_on_destination: bool, 283 - pub handle_unreachable: HandleUnreachable, 284 - } 285 - 286 - #[derive(Clone, Copy)] 287 - pub enum Objective { 288 - Apply(ApplyObjective), 289 - BuildLocally, 290 - } 291 - 292 264 #[enum_dispatch] 293 265 pub(crate) trait ExecuteStep: Send + Sync + Display + std::fmt::Debug { 294 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError>; 295 - 296 - fn should_execute(&self, context: &Context) -> bool; 266 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError>; 297 267 } 298 268 299 269 // may include other options such as FailAll in the future ··· 313 283 pub key_agent_directory: Option<String>, 314 284 } 315 285 316 - pub struct Context<'a> { 317 - pub name: &'a Name, 318 - pub node: &'a mut Node, 286 + pub struct Context { 319 287 pub hive_location: Arc<HiveLocation>, 320 288 pub modifiers: SubCommandModifiers, 321 289 pub state: StepState, 322 290 pub should_quit: Arc<AtomicBool>, 323 - pub objective: Objective, 291 + pub name: Name, 324 292 } 325 293 326 294 #[enum_dispatch(ExecuteStep)] 327 - #[derive(Debug, PartialEq)] 328 - enum Step { 295 + #[derive(Debug)] 296 + #[cfg_attr(test, derive(PartialEq))] 297 + pub enum Step { 329 298 Ping, 330 299 PushKeyAgent, 331 300 Keys, ··· 334 303 Build, 335 304 PushBuildOutput, 336 305 SwitchToConfiguration, 337 - CleanUp, 338 306 } 339 307 340 308 impl Display for Step { ··· 348 316 Self::Build(step) => step.fmt(f), 349 317 Self::PushBuildOutput(step) => step.fmt(f), 350 318 Self::SwitchToConfiguration(step) => step.fmt(f), 351 - Self::CleanUp(step) => step.fmt(f), 352 319 } 353 320 } 354 321 } 355 322 356 - pub struct GoalExecutor<'a> { 357 - steps: Vec<Step>, 358 - context: Context<'a>, 359 - } 360 - 361 - /// returns Err if the application should shut down. 362 - fn app_shutdown_guard(context: &Context) -> Result<(), HiveLibError> { 363 - if context 364 - .should_quit 365 - .load(std::sync::atomic::Ordering::Relaxed) 366 - { 367 - return Err(HiveLibError::Sigint); 368 - } 369 - 370 - Ok(()) 371 - } 372 - 373 - impl<'a> GoalExecutor<'a> { 374 - #[must_use] 375 - pub fn new(context: Context<'a>) -> Self { 376 - Self { 377 - steps: vec![ 378 - Step::Ping(Ping), 379 - Step::PushKeyAgent(PushKeyAgent), 380 - Step::Keys(Keys { 381 - filter: UploadKeyAt::NoFilter, 382 - }), 383 - Step::Keys(Keys { 384 - filter: UploadKeyAt::PreActivation, 385 - }), 386 - Step::Evaluate(super::steps::evaluate::Evaluate), 387 - Step::PushEvaluatedOutput(super::steps::push::PushEvaluatedOutput), 388 - Step::Build(super::steps::build::Build), 389 - Step::PushBuildOutput(super::steps::push::PushBuildOutput), 390 - Step::SwitchToConfiguration(SwitchToConfiguration), 391 - Step::Keys(Keys { 392 - filter: UploadKeyAt::PostActivation, 393 - }), 394 - ], 395 - context, 396 - } 397 - } 398 - 399 - #[instrument(skip_all, name = "eval")] 400 - async fn evaluate_task( 401 - tx: oneshot::Sender<Result<Derivation, HiveLibError>>, 402 - hive_location: Arc<HiveLocation>, 403 - name: Name, 404 - modifiers: SubCommandModifiers, 405 - ) { 406 - let output = 407 - evaluate_hive_attribute(&hive_location, &EvalGoal::GetTopLevel(&name), modifiers) 408 - .await 409 - .map(|output| { 410 - serde_json::from_str::<Derivation>(&output).expect("failed to parse derivation") 411 - }); 412 - 413 - debug!(output = ?output, done = true); 414 - 415 - let _ = tx.send(output); 416 - } 417 - 418 - #[instrument(skip_all, fields(node = %self.context.name))] 419 - pub async fn execute(mut self) -> Result<(), HiveLibError> { 420 - app_shutdown_guard(&self.context)?; 421 - 422 - let (tx, rx) = oneshot::channel(); 423 - self.context.state.evaluation_rx = Some(rx); 424 - 425 - // The name of this span should never be changed without updating 426 - // `wire/cli/tracing_setup.rs` 427 - debug_assert_matches!(Span::current().metadata().unwrap().name(), "execute"); 428 - // This span should always have a `node` field by the same file 429 - debug_assert!( 430 - Span::current() 431 - .metadata() 432 - .unwrap() 433 - .fields() 434 - .field("node") 435 - .is_some() 436 - ); 437 - 438 - let spawn_evaluator = match self.context.objective { 439 - Objective::Apply(apply_objective) => !matches!(apply_objective.goal, Goal::Keys), 440 - Objective::BuildLocally => true, 441 - }; 442 - 443 - if spawn_evaluator { 444 - tokio::spawn( 445 - GoalExecutor::evaluate_task( 446 - tx, 447 - self.context.hive_location.clone(), 448 - self.context.name.clone(), 449 - self.context.modifiers, 450 - ) 451 - .in_current_span(), 452 - ); 453 - } 454 - 455 - let steps = self 456 - .steps 457 - .iter() 458 - .filter(|step| step.should_execute(&self.context)) 459 - .inspect(|step| { 460 - trace!("Will execute step `{step}` for {}", self.context.name); 461 - }) 462 - .collect::<Vec<_>>(); 463 - let length = steps.len(); 464 - 465 - for (position, step) in steps.iter().enumerate() { 466 - app_shutdown_guard(&self.context)?; 467 - 468 - event!( 469 - Level::INFO, 470 - step = step.to_string(), 471 - progress = format!("{}/{length}", position + 1) 472 - ); 473 - 474 - STATUS 475 - .lock() 476 - .set_node_step(self.context.name, step.to_string()); 477 - 478 - if let Err(err) = step.execute(&mut self.context).await.inspect_err(|_| { 479 - error!("Failed to execute `{step}`"); 480 - }) { 481 - // discard error from cleanup 482 - let _ = CleanUp.execute(&mut self.context).await; 483 - 484 - if let Objective::Apply(apply_objective) = self.context.objective 485 - && matches!(step, Step::Ping(..)) 486 - && matches!( 487 - apply_objective.handle_unreachable, 488 - HandleUnreachable::Ignore, 489 - ) 490 - { 491 - return Ok(()); 492 - } 493 - 494 - STATUS.lock().mark_node_failed(self.context.name); 495 - 496 - return Err(err); 497 - } 498 - } 499 - 500 - STATUS.lock().mark_node_succeeded(self.context.name); 501 - 502 - Ok(()) 503 - } 504 - } 505 - 506 323 #[cfg(test)] 507 324 mod tests { 508 325 use rand::distr::Alphabetic; 509 326 510 327 use super::*; 511 - use crate::{ 512 - function_name, get_test_path, 513 - hive::{Hive, get_hive_location}, 514 - location, 515 - }; 516 - use std::{assert_matches::assert_matches, path::PathBuf}; 517 - use std::{collections::HashMap, env}; 518 - 519 - fn get_steps(goal_executor: GoalExecutor) -> std::vec::Vec<Step> { 520 - goal_executor 521 - .steps 522 - .into_iter() 523 - .filter(|step| step.should_execute(&goal_executor.context)) 524 - .collect::<Vec<_>>() 525 - } 526 - 527 - #[tokio::test] 528 - #[cfg_attr(feature = "no_web_tests", ignore)] 529 - async fn default_values_match() { 530 - let mut path = get_test_path!(); 531 - 532 - let location = 533 - get_hive_location(path.display().to_string(), SubCommandModifiers::default()) 534 - .await 535 - .unwrap(); 536 - let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) 537 - .await 538 - .unwrap(); 539 - 540 - let node = Node::default(); 541 - 542 - let mut nodes = HashMap::new(); 543 - nodes.insert(Name("NAME".into()), node); 544 - 545 - path.push("hive.nix"); 546 - 547 - assert_eq!( 548 - hive, 549 - Hive { 550 - nodes, 551 - schema: Hive::SCHEMA_VERSION 552 - } 553 - ); 554 - } 555 - 556 - #[tokio::test] 557 - async fn order_build_locally() { 558 - let location = location!(get_test_path!()); 559 - let mut node = Node { 560 - build_remotely: false, 561 - ..Default::default() 562 - }; 563 - let name = &Name(function_name!().into()); 564 - let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); 565 - let steps = get_steps(executor); 566 - 567 - assert_eq!( 568 - steps, 569 - vec![ 570 - Ping.into(), 571 - PushKeyAgent.into(), 572 - Keys { 573 - filter: UploadKeyAt::PreActivation 574 - } 575 - .into(), 576 - crate::hive::steps::evaluate::Evaluate.into(), 577 - crate::hive::steps::build::Build.into(), 578 - crate::hive::steps::push::PushBuildOutput.into(), 579 - SwitchToConfiguration.into(), 580 - Keys { 581 - filter: UploadKeyAt::PostActivation 582 - } 583 - .into(), 584 - ] 585 - ); 586 - } 587 - 588 - #[tokio::test] 589 - async fn order_keys_only() { 590 - let location = location!(get_test_path!()); 591 - let mut node = Node::default(); 592 - let name = &Name(function_name!().into()); 593 - let mut context = Context::create_test_context(location, name, &mut node); 594 - 595 - let Objective::Apply(ref mut apply_objective) = context.objective else { 596 - unreachable!() 597 - }; 598 - 599 - apply_objective.goal = Goal::Keys; 600 - 601 - let executor = GoalExecutor::new(context); 602 - let steps = get_steps(executor); 603 - 604 - assert_eq!( 605 - steps, 606 - vec![ 607 - Ping.into(), 608 - PushKeyAgent.into(), 609 - Keys { 610 - filter: UploadKeyAt::NoFilter 611 - } 612 - .into(), 613 - ] 614 - ); 615 - } 616 - 617 - #[tokio::test] 618 - async fn order_build() { 619 - let location = location!(get_test_path!()); 620 - let mut node = Node::default(); 621 - let name = &Name(function_name!().into()); 622 - let mut context = Context::create_test_context(location, name, &mut node); 623 - 624 - let Objective::Apply(ref mut apply_objective) = context.objective else { 625 - unreachable!() 626 - }; 627 - apply_objective.goal = Goal::Build; 628 - 629 - let executor = GoalExecutor::new(context); 630 - let steps = get_steps(executor); 631 - 632 - assert_eq!( 633 - steps, 634 - vec![ 635 - Ping.into(), 636 - crate::hive::steps::evaluate::Evaluate.into(), 637 - crate::hive::steps::build::Build.into(), 638 - crate::hive::steps::push::PushBuildOutput.into(), 639 - ] 640 - ); 641 - } 642 - 643 - #[tokio::test] 644 - async fn order_push_only() { 645 - let location = location!(get_test_path!()); 646 - let mut node = Node::default(); 647 - let name = &Name(function_name!().into()); 648 - let mut context = Context::create_test_context(location, name, &mut node); 649 - 650 - let Objective::Apply(ref mut apply_objective) = context.objective else { 651 - unreachable!() 652 - }; 653 - apply_objective.goal = Goal::Push; 654 - 655 - let executor = GoalExecutor::new(context); 656 - let steps = get_steps(executor); 657 - 658 - assert_eq!( 659 - steps, 660 - vec![ 661 - Ping.into(), 662 - crate::hive::steps::evaluate::Evaluate.into(), 663 - crate::hive::steps::push::PushEvaluatedOutput.into(), 664 - ] 665 - ); 666 - } 667 - 668 - #[tokio::test] 669 - async fn order_remote_build() { 670 - let location = location!(get_test_path!()); 671 - let mut node = Node { 672 - build_remotely: true, 673 - ..Default::default() 674 - }; 675 - 676 - let name = &Name(function_name!().into()); 677 - let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); 678 - let steps = get_steps(executor); 679 - 680 - assert_eq!( 681 - steps, 682 - vec![ 683 - Ping.into(), 684 - PushKeyAgent.into(), 685 - Keys { 686 - filter: UploadKeyAt::PreActivation 687 - } 688 - .into(), 689 - crate::hive::steps::evaluate::Evaluate.into(), 690 - crate::hive::steps::push::PushEvaluatedOutput.into(), 691 - crate::hive::steps::build::Build.into(), 692 - SwitchToConfiguration.into(), 693 - Keys { 694 - filter: UploadKeyAt::PostActivation 695 - } 696 - .into(), 697 - ] 698 - ); 699 - } 700 - 701 - #[tokio::test] 702 - async fn order_nokeys() { 703 - let location = location!(get_test_path!()); 704 - let mut node = Node::default(); 705 - 706 - let name = &Name(function_name!().into()); 707 - let mut context = Context::create_test_context(location, name, &mut node); 708 - 709 - let Objective::Apply(ref mut apply_objective) = context.objective else { 710 - unreachable!() 711 - }; 712 - apply_objective.no_keys = true; 713 - 714 - let executor = GoalExecutor::new(context); 715 - let steps = get_steps(executor); 716 - 717 - assert_eq!( 718 - steps, 719 - vec![ 720 - Ping.into(), 721 - crate::hive::steps::evaluate::Evaluate.into(), 722 - crate::hive::steps::build::Build.into(), 723 - crate::hive::steps::push::PushBuildOutput.into(), 724 - SwitchToConfiguration.into(), 725 - ] 726 - ); 727 - } 728 - 729 - #[tokio::test] 730 - async fn order_should_apply_locally() { 731 - let location = location!(get_test_path!()); 732 - let mut node = Node::default(); 733 - 734 - let name = &Name(function_name!().into()); 735 - let mut context = Context::create_test_context(location, name, &mut node); 736 - 737 - let Objective::Apply(ref mut apply_objective) = context.objective else { 738 - unreachable!() 739 - }; 740 - apply_objective.no_keys = true; 741 - apply_objective.should_apply_locally = true; 742 - 743 - let executor = GoalExecutor::new(context); 744 - let steps = get_steps(executor); 745 - 746 - assert_eq!( 747 - steps, 748 - vec![ 749 - crate::hive::steps::evaluate::Evaluate.into(), 750 - crate::hive::steps::build::Build.into(), 751 - SwitchToConfiguration.into(), 752 - ] 753 - ); 754 - } 755 - 756 - #[tokio::test] 757 - async fn order_build_only() { 758 - let location = location!(get_test_path!()); 759 - let mut node = Node::default(); 760 - 761 - let name = &Name(function_name!().into()); 762 - let mut context = Context::create_test_context(location, name, &mut node); 763 - 764 - context.objective = Objective::BuildLocally; 765 - 766 - let executor = GoalExecutor::new(context); 767 - let steps = get_steps(executor); 768 - 769 - assert_eq!( 770 - steps, 771 - vec![ 772 - crate::hive::steps::evaluate::Evaluate.into(), 773 - crate::hive::steps::build::Build.into() 774 - ] 775 - ); 776 - } 777 - 778 - #[test] 779 - fn target_fails_increments() { 780 - let mut target = Target::from_host("localhost"); 781 - 782 - assert_eq!(target.current_host, 0); 783 - 784 - for i in 0..100 { 785 - target.host_failed(); 786 - assert_eq!(target.current_host, i + 1); 787 - } 788 - } 789 - 790 - #[test] 791 - fn get_preferred_host_fails() { 792 - let mut target = Target { 793 - hosts: vec![ 794 - "un.reachable.1".into(), 795 - "un.reachable.2".into(), 796 - "un.reachable.3".into(), 797 - "un.reachable.4".into(), 798 - "un.reachable.5".into(), 799 - ], 800 - ..Default::default() 801 - }; 802 - 803 - assert_ne!( 804 - target.get_preferred_host().unwrap().to_string(), 805 - "un.reachable.5" 806 - ); 807 - 808 - for i in 1..=5 { 809 - assert_eq!( 810 - target.get_preferred_host().unwrap().to_string(), 811 - format!("un.reachable.{i}") 812 - ); 813 - target.host_failed(); 814 - } 815 - 816 - for _ in 0..5 { 817 - assert_matches!( 818 - target.get_preferred_host(), 819 - Err(HiveLibError::NetworkError(NetworkError::HostsExhausted)) 820 - ); 821 - } 822 - } 328 + use std::{assert_matches::assert_matches, env}; 823 329 824 330 #[test] 825 331 fn test_ssh_opts() { ··· 850 356 "KbdInteractiveAuthentication=no".to_string(), 851 357 ]; 852 358 853 - assert_eq!( 854 - target.create_ssh_args(subcommand_modifiers, false).unwrap(), 855 - args 856 - ); 359 + assert_eq!(target.create_ssh_args(subcommand_modifiers).unwrap(), args); 857 360 assert_eq!( 858 361 target.create_ssh_opts(subcommand_modifiers).unwrap(), 859 362 args.join(" ") 860 363 ); 861 364 862 365 assert_eq!( 863 - target.create_ssh_args(subcommand_modifiers, false).unwrap(), 366 + target.create_ssh_args(subcommand_modifiers).unwrap(), 864 367 [ 865 368 "-l".to_string(), 866 369 target.user.to_string(), ··· 876 379 ); 877 380 878 381 assert_eq!( 879 - target.create_ssh_args(subcommand_modifiers, true).unwrap(), 382 + target.create_ssh_args(subcommand_modifiers).unwrap(), 880 383 [ 881 384 "-l".to_string(), 882 385 target.user.to_string(), ··· 893 396 894 397 // forced non interactive is the same as --non-interactive 895 398 assert_eq!( 896 - target.create_ssh_args(subcommand_modifiers, true).unwrap(), 399 + target.create_ssh_args(subcommand_modifiers).unwrap(), 897 400 target 898 - .create_ssh_args( 899 - SubCommandModifiers { 900 - non_interactive: true, 901 - ..Default::default() 902 - }, 903 - false 904 - ) 401 + .create_ssh_args(SubCommandModifiers { 402 + non_interactive: true, 403 + ..Default::default() 404 + }) 905 405 .unwrap() 906 406 ); 907 407 } 908 408 909 - #[tokio::test] 910 - async fn context_quits_sigint() { 911 - let location = location!(get_test_path!()); 912 - let mut node = Node::default(); 409 + #[test] 410 + fn target_fails_increments() { 411 + let mut target = Target::from_host("localhost"); 412 + 413 + assert_eq!(target.current_host, 0); 414 + 415 + for i in 0..100 { 416 + target.host_failed(); 417 + assert_eq!(target.current_host, i + 1); 418 + } 419 + } 420 + 421 + #[test] 422 + fn get_preferred_host_fails() { 423 + let mut target = Target { 424 + hosts: vec![ 425 + "un.reachable.1".into(), 426 + "un.reachable.2".into(), 427 + "un.reachable.3".into(), 428 + "un.reachable.4".into(), 429 + "un.reachable.5".into(), 430 + ], 431 + ..Default::default() 432 + }; 433 + 434 + assert_ne!( 435 + target.get_preferred_host().unwrap().to_string(), 436 + "un.reachable.5" 437 + ); 913 438 914 - let name = &Name(function_name!().into()); 915 - let context = Context::create_test_context(location, name, &mut node); 916 - context 917 - .should_quit 918 - .store(true, std::sync::atomic::Ordering::Relaxed); 919 - let executor = GoalExecutor::new(context); 920 - let status = executor.execute().await; 439 + for i in 1..=5 { 440 + assert_eq!( 441 + target.get_preferred_host().unwrap().to_string(), 442 + format!("un.reachable.{i}") 443 + ); 444 + target.host_failed(); 445 + } 921 446 922 - assert_matches!(status, Err(HiveLibError::Sigint)); 447 + for _ in 0..5 { 448 + assert_matches!( 449 + target.get_preferred_host(), 450 + Err(HiveLibError::NetworkError(NetworkError::HostsExhausted)) 451 + ); 452 + } 923 453 } 924 454 }
+735
crates/core/src/hive/plan.rs
··· 1 + use std::sync::{Arc, atomic::AtomicBool}; 2 + 3 + use tokio::sync::RwLock; 4 + 5 + use crate::{ 6 + SubCommandModifiers, 7 + hive::{ 8 + HiveLocation, 9 + node::{ 10 + ApplyGoal, Context, HandleUnreachable, Name, Node, SharedTarget, Step, StepState, 11 + SwitchToConfigurationGoal, 12 + }, 13 + steps::{ 14 + activate::SwitchToConfiguration, 15 + build::Build, 16 + evaluate::Evaluate, 17 + keys::{Keys, PushKeyAgent, UploadKeyAt}, 18 + ping::Ping, 19 + push::{PushBuildOutput, PushEvaluatedOutput}, 20 + }, 21 + }, 22 + }; 23 + 24 + pub struct NodePlan { 25 + pub context: Context, 26 + pub steps: Vec<Step>, 27 + pub greedy_evaluate: bool, 28 + pub ignore_failed_ping: bool, 29 + } 30 + 31 + #[allow(clippy::struct_excessive_bools)] 32 + pub struct ApplyGoalArgs { 33 + pub goal: ApplyGoal, 34 + pub should_apply_locally: bool, 35 + pub no_keys: bool, 36 + pub substitute_on_destination: bool, 37 + pub reboot: bool, 38 + pub host_platform: Arc<str>, 39 + pub handle_unreachable: HandleUnreachable, 40 + } 41 + 42 + pub enum Goal { 43 + Apply(ApplyGoalArgs), 44 + Build, 45 + } 46 + 47 + fn apply_plan_keys( 48 + args: &ApplyGoalArgs, 49 + node: &Node, 50 + target: &SharedTarget, 51 + ) -> (Vec<Step>, Vec<Step>) { 52 + let ApplyGoalArgs { 53 + goal, 54 + substitute_on_destination, 55 + should_apply_locally, 56 + host_platform, 57 + .. 58 + } = args; 59 + let mut front_steps = Vec::new(); 60 + let mut end_steps = Vec::new(); 61 + 62 + let (pre_keys, post_keys) = match goal { 63 + ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) => node 64 + .keys 65 + .clone() 66 + .into_iter() 67 + .partition(|x| matches!(x.upload_at, UploadKeyAt::PreActivation)), 68 + ApplyGoal::Keys => (node.keys.clone(), Vec::new()), 69 + ApplyGoal::Build | ApplyGoal::Push | ApplyGoal::SwitchToConfiguration(_) => { 70 + unreachable!("apply_plan_keys called with non-key goal: {:?}", goal) 71 + } 72 + }; 73 + 74 + // only push key agent if there are any keys at all 75 + if !pre_keys.is_empty() || !post_keys.is_empty() { 76 + front_steps.push(Step::PushKeyAgent(PushKeyAgent { 77 + substitute_on_destination: *substitute_on_destination, 78 + host_platform: host_platform.clone(), 79 + target: if *should_apply_locally { 80 + None 81 + } else { 82 + Some(target.clone()) 83 + }, 84 + })); 85 + } 86 + 87 + if !pre_keys.is_empty() { 88 + front_steps.push(Step::Keys(Keys { 89 + keys: pre_keys, 90 + target: if *should_apply_locally { 91 + None 92 + } else { 93 + Some(target.clone()) 94 + }, 95 + privilege_escalation_command: node.privilege_escalation_command.clone(), 96 + })); 97 + } 98 + 99 + if !post_keys.is_empty() { 100 + end_steps.push(Step::Keys(Keys { 101 + keys: post_keys, 102 + target: if *should_apply_locally { 103 + None 104 + } else { 105 + Some(target.clone()) 106 + }, 107 + privilege_escalation_command: node.privilege_escalation_command.clone(), 108 + })); 109 + } 110 + 111 + (front_steps, end_steps) 112 + } 113 + 114 + fn apply_plan( 115 + args: &ApplyGoalArgs, 116 + node: &Node, 117 + name: &Name, 118 + modifiers: SubCommandModifiers, 119 + hive_location: Arc<HiveLocation>, 120 + should_quit: Arc<AtomicBool>, 121 + ) -> NodePlan { 122 + let ApplyGoalArgs { 123 + goal, 124 + should_apply_locally, 125 + no_keys, 126 + substitute_on_destination, 127 + reboot, 128 + handle_unreachable, 129 + .. 130 + } = args; 131 + 132 + let mut steps: Vec<Step> = Vec::new(); 133 + let mut end: Vec<Step> = Vec::new(); 134 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 135 + 136 + if !*should_apply_locally { 137 + steps.push(Step::Ping(Ping { 138 + target: target.clone(), 139 + })); 140 + } 141 + 142 + if !*no_keys 143 + && matches!( 144 + &goal, 145 + ApplyGoal::Keys | ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) 146 + ) 147 + { 148 + let (pre, post) = apply_plan_keys(args, node, &target); 149 + steps.extend(pre); 150 + end.extend(post); 151 + } 152 + 153 + if !matches!(goal, ApplyGoal::Keys) { 154 + steps.push(Step::Evaluate(Evaluate)); 155 + } 156 + 157 + if !matches!(goal, ApplyGoal::Keys) 158 + && !should_apply_locally 159 + && (node.build_remotely || matches!(goal, ApplyGoal::Push)) 160 + { 161 + steps.push(Step::PushEvaluatedOutput(PushEvaluatedOutput { 162 + substitute_on_destination: *substitute_on_destination, 163 + target: target.clone(), 164 + })); 165 + } 166 + 167 + if !matches!(goal, ApplyGoal::Keys | ApplyGoal::Push) { 168 + steps.push(Step::Build(Build { 169 + target: if node.build_remotely && !*should_apply_locally { 170 + Some(target.clone()) 171 + } else { 172 + None 173 + }, 174 + })); 175 + } 176 + 177 + if !node.build_remotely 178 + && !should_apply_locally 179 + && !matches!(goal, ApplyGoal::Keys | ApplyGoal::Push) 180 + { 181 + steps.push(Step::PushBuildOutput(PushBuildOutput { 182 + substitute_on_destination: *substitute_on_destination, 183 + target: target.clone(), 184 + })); 185 + } 186 + 187 + if let ApplyGoal::SwitchToConfiguration(goal) = goal { 188 + steps.push(Step::SwitchToConfiguration(SwitchToConfiguration { 189 + goal: *goal, 190 + reboot: *reboot, 191 + target: if *should_apply_locally { 192 + None 193 + } else { 194 + Some(target.clone()) 195 + }, 196 + privilege_escalation_command: node.privilege_escalation_command.clone(), 197 + })); 198 + } 199 + 200 + steps.extend(end); 201 + 202 + NodePlan { 203 + context: Context { 204 + state: StepState::default(), 205 + name: name.clone(), 206 + hive_location, 207 + modifiers, 208 + should_quit, 209 + }, 210 + steps, 211 + greedy_evaluate: !matches!(&goal, ApplyGoal::Keys), 212 + ignore_failed_ping: matches!(handle_unreachable, HandleUnreachable::Ignore), 213 + } 214 + } 215 + 216 + #[allow(clippy::too_many_lines)] 217 + pub fn plan_for_node( 218 + node: &Node, 219 + name: Name, 220 + goal: &'_ Goal, 221 + hive_location: Arc<HiveLocation>, 222 + modifiers: &SubCommandModifiers, 223 + should_quit: Arc<AtomicBool>, 224 + ) -> NodePlan { 225 + match goal { 226 + Goal::Build => NodePlan { 227 + context: Context { 228 + state: StepState::default(), 229 + modifiers: *modifiers, 230 + hive_location, 231 + should_quit, 232 + name, 233 + }, 234 + steps: vec![ 235 + Step::Evaluate(Evaluate), 236 + Step::Build(Build { target: None }), 237 + ], 238 + greedy_evaluate: true, 239 + ignore_failed_ping: false, 240 + }, 241 + Goal::Apply(args) => apply_plan(args, node, &name, *modifiers, hive_location, should_quit), 242 + } 243 + } 244 + 245 + #[cfg(test)] 246 + mod tests { 247 + use tokio::sync::RwLock; 248 + 249 + use crate::{ 250 + SubCommandModifiers, function_name, get_test_path, 251 + hive::{ 252 + node::{ 253 + ApplyGoal, HandleUnreachable, Name, Node, SharedTarget, Step, 254 + SwitchToConfigurationGoal, 255 + }, 256 + plan::{ApplyGoalArgs, Goal, plan_for_node}, 257 + steps::{ 258 + activate::SwitchToConfiguration, 259 + build::Build, 260 + evaluate::Evaluate, 261 + keys::{Key, Keys, PushKeyAgent, Source, UploadKeyAt}, 262 + ping::Ping, 263 + push::PushEvaluatedOutput, 264 + }, 265 + }, 266 + location, 267 + }; 268 + use std::path::PathBuf; 269 + use std::{ 270 + env, 271 + sync::{Arc, atomic::AtomicBool}, 272 + }; 273 + 274 + fn new_key(upload_at: &UploadKeyAt) -> Key { 275 + Key { 276 + upload_at: upload_at.clone(), 277 + source: Source::String(match upload_at { 278 + UploadKeyAt::PreActivation => "pre".into(), 279 + UploadKeyAt::PostActivation => "post".into(), 280 + UploadKeyAt::NoFilter => "none".into(), 281 + }), 282 + ..Default::default() 283 + } 284 + } 285 + 286 + #[tokio::test] 287 + async fn order_build() { 288 + let location = location!(get_test_path!()); 289 + let node = Node { 290 + build_remotely: false, 291 + ..Default::default() 292 + }; 293 + let name = &Name(function_name!().into()); 294 + let should_quit = Arc::new(AtomicBool::new(false)); 295 + let plan = plan_for_node( 296 + &node, 297 + name.clone(), 298 + &Goal::Build, 299 + location.clone().into(), 300 + &SubCommandModifiers::default(), 301 + should_quit.clone(), 302 + ); 303 + 304 + assert_eq!( 305 + plan.steps, 306 + vec![ 307 + Evaluate.into(), 308 + Build { target: None }.into() // TODO: this was previously used in an old test, may lose 309 + // coverage by deleting it. 310 + // Ping { }.into(), 311 + // PushKeyAgent { host_platform: "x86_64-linux".into(), substitute_on_destination: true, target: Target::default() }.into(), 312 + // Keys { .. }.into(), 313 + // crate::hive::steps::evaluate::Evaluate.into(), 314 + // crate::hive::steps::build::Build { .. }.into(), 315 + // crate::hive::steps::push::PushBuildOutput { .. }.into(), 316 + // SwitchToConfiguration { .. }.into(), 317 + // Keys { 318 + // filter: UploadKeyAt::PostActivation 319 + // } 320 + // .into(), 321 + ] 322 + ); 323 + } 324 + 325 + #[tokio::test] 326 + async fn order_apply_build() { 327 + let location = location!(get_test_path!()); 328 + let node = Node { 329 + build_remotely: true, 330 + ..Default::default() 331 + }; 332 + let name = &Name(function_name!().into()); 333 + let should_quit = Arc::new(AtomicBool::new(false)); 334 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 335 + let plan = plan_for_node( 336 + &node, 337 + name.clone(), 338 + &Goal::Apply(ApplyGoalArgs { 339 + goal: ApplyGoal::Build, 340 + should_apply_locally: false, 341 + no_keys: true, 342 + substitute_on_destination: true, 343 + reboot: false, 344 + host_platform: "x86_64-linux".into(), 345 + handle_unreachable: HandleUnreachable::default(), 346 + }), 347 + location.clone().into(), 348 + &SubCommandModifiers::default(), 349 + should_quit.clone(), 350 + ); 351 + 352 + assert_eq!( 353 + plan.steps, 354 + vec![ 355 + Ping { 356 + target: target.clone() 357 + } 358 + .into(), 359 + crate::hive::steps::evaluate::Evaluate.into(), 360 + crate::hive::steps::push::PushEvaluatedOutput { 361 + substitute_on_destination: true, 362 + target: target.clone() 363 + } 364 + .into(), 365 + crate::hive::steps::build::Build { 366 + target: Some(target.clone()) 367 + } 368 + .into(), 369 + ] 370 + ); 371 + 372 + let node = Node { 373 + build_remotely: false, 374 + ..Default::default() 375 + }; 376 + let plan = plan_for_node( 377 + &node, 378 + name.clone(), 379 + &Goal::Apply(ApplyGoalArgs { 380 + goal: ApplyGoal::Build, 381 + should_apply_locally: false, 382 + no_keys: true, 383 + substitute_on_destination: true, 384 + reboot: false, 385 + host_platform: "x86_64-linux".into(), 386 + handle_unreachable: HandleUnreachable::default(), 387 + }), 388 + location.clone().into(), 389 + &SubCommandModifiers::default(), 390 + should_quit.clone(), 391 + ); 392 + 393 + assert_eq!( 394 + plan.steps, 395 + vec![ 396 + Ping { 397 + target: target.clone() 398 + } 399 + .into(), 400 + crate::hive::steps::evaluate::Evaluate.into(), 401 + crate::hive::steps::build::Build { target: None }.into(), 402 + crate::hive::steps::push::PushBuildOutput { 403 + substitute_on_destination: true, 404 + target: target.clone() 405 + } 406 + .into(), 407 + ] 408 + ); 409 + } 410 + 411 + #[tokio::test] 412 + async fn order_keys_only() { 413 + let location = location!(get_test_path!()); 414 + let node = Node { 415 + keys: vec![ 416 + new_key(&UploadKeyAt::PreActivation).into(), 417 + new_key(&UploadKeyAt::PostActivation).into(), 418 + new_key(&UploadKeyAt::PreActivation).into(), 419 + new_key(&UploadKeyAt::PostActivation).into(), 420 + ], 421 + ..Default::default() 422 + }; 423 + let name = &Name(function_name!().into()); 424 + let should_quit = Arc::new(AtomicBool::new(false)); 425 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 426 + let plan_apply_keys = plan_for_node( 427 + &node.clone(), 428 + name.clone(), 429 + &Goal::Apply(ApplyGoalArgs { 430 + goal: ApplyGoal::Keys, 431 + should_apply_locally: false, 432 + no_keys: false, 433 + substitute_on_destination: true, 434 + reboot: false, 435 + host_platform: "x86_64-linux".into(), 436 + handle_unreachable: HandleUnreachable::default(), 437 + }), 438 + location.clone().into(), 439 + &SubCommandModifiers::default(), 440 + should_quit.clone(), 441 + ); 442 + 443 + assert_eq!( 444 + plan_apply_keys.steps, 445 + vec![ 446 + Ping { 447 + target: target.clone() 448 + } 449 + .into(), 450 + PushKeyAgent { 451 + substitute_on_destination: true, 452 + target: Some(target.clone()), 453 + host_platform: node.host_platform.clone() 454 + } 455 + .into(), 456 + Keys { 457 + target: Some(target.clone()), 458 + // test that all keys are included 459 + keys: node.keys.clone(), 460 + privilege_escalation_command: node.privilege_escalation_command.clone() 461 + } 462 + .into(), 463 + ] 464 + ); 465 + } 466 + 467 + #[tokio::test] 468 + async fn order_key_split() { 469 + let location = location!(get_test_path!()); 470 + let node = Node { 471 + keys: vec![ 472 + new_key(&UploadKeyAt::PreActivation).into(), 473 + new_key(&UploadKeyAt::PostActivation).into(), 474 + new_key(&UploadKeyAt::PreActivation).into(), 475 + new_key(&UploadKeyAt::PostActivation).into(), 476 + ], 477 + ..Default::default() 478 + }; 479 + let name = &Name(function_name!().into()); 480 + let should_quit = Arc::new(AtomicBool::new(false)); 481 + 482 + // Test that keys are split by their `upload_at`, also tests that key 483 + // step's `target` abides by should_apply_locally 484 + let plan_activate_with_keys = plan_for_node( 485 + &node, 486 + name.clone(), 487 + &Goal::Apply(ApplyGoalArgs { 488 + goal: ApplyGoal::SwitchToConfiguration( 489 + crate::hive::node::SwitchToConfigurationGoal::Switch, 490 + ), 491 + should_apply_locally: true, 492 + no_keys: false, 493 + substitute_on_destination: true, 494 + reboot: false, 495 + host_platform: "x86_64-linux".into(), 496 + handle_unreachable: HandleUnreachable::default(), 497 + }), 498 + location.clone().into(), 499 + &SubCommandModifiers::default(), 500 + should_quit.clone(), 501 + ); 502 + 503 + assert_eq!( 504 + plan_activate_with_keys 505 + .steps 506 + .into_iter() 507 + .filter(|x| matches!( 508 + x, 509 + Step::Keys(Keys { .. }) | Step::PushKeyAgent(PushKeyAgent { .. }) 510 + )) 511 + .collect::<Vec<Step>>(), 512 + vec![ 513 + PushKeyAgent { 514 + substitute_on_destination: true, 515 + target: None, 516 + host_platform: node.host_platform.clone() 517 + } 518 + .into(), 519 + Keys { 520 + target: None, 521 + keys: node 522 + .keys 523 + .iter() 524 + .filter(|key| matches!(key.upload_at, UploadKeyAt::PreActivation)) 525 + .cloned() 526 + .collect::<Vec<_>>(), 527 + privilege_escalation_command: node.privilege_escalation_command.clone() 528 + } 529 + .into(), 530 + Keys { 531 + target: None, 532 + keys: node 533 + .keys 534 + .iter() 535 + .filter(|key| matches!(key.upload_at, UploadKeyAt::PostActivation)) 536 + .cloned() 537 + .collect::<Vec<_>>(), 538 + privilege_escalation_command: node.privilege_escalation_command.clone() 539 + } 540 + .into(), 541 + ] 542 + ); 543 + } 544 + 545 + #[tokio::test] 546 + async fn order_push_only() { 547 + let location = location!(get_test_path!()); 548 + let node = Node::default(); 549 + let name = &Name(function_name!().into()); 550 + let should_quit = Arc::new(AtomicBool::new(false)); 551 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 552 + let plan = plan_for_node( 553 + &node.clone(), 554 + name.clone(), 555 + &Goal::Apply(ApplyGoalArgs { 556 + goal: ApplyGoal::Push, 557 + should_apply_locally: false, 558 + no_keys: false, 559 + substitute_on_destination: true, 560 + reboot: false, 561 + host_platform: "x86_64-linux".into(), 562 + handle_unreachable: HandleUnreachable::default(), 563 + }), 564 + location.clone().into(), 565 + &SubCommandModifiers::default(), 566 + should_quit.clone(), 567 + ); 568 + 569 + assert_eq!( 570 + plan.steps, 571 + vec![ 572 + Ping { 573 + target: target.clone() 574 + } 575 + .into(), 576 + Evaluate.into(), 577 + PushEvaluatedOutput { 578 + substitute_on_destination: true, 579 + target: target.clone() 580 + } 581 + .into() 582 + ] 583 + ); 584 + } 585 + 586 + #[tokio::test] 587 + async fn order_remote_build() { 588 + let location = location!(get_test_path!()); 589 + let node = Node { 590 + build_remotely: true, 591 + ..Default::default() 592 + }; 593 + let name = &Name(function_name!().into()); 594 + let should_quit = Arc::new(AtomicBool::new(false)); 595 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 596 + let plan = plan_for_node( 597 + &node.clone(), 598 + name.clone(), 599 + &Goal::Apply(ApplyGoalArgs { 600 + goal: ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), 601 + should_apply_locally: false, 602 + no_keys: false, 603 + substitute_on_destination: true, 604 + reboot: false, 605 + host_platform: "x86_64-linux".into(), 606 + handle_unreachable: HandleUnreachable::default(), 607 + }), 608 + location.clone().into(), 609 + &SubCommandModifiers::default(), 610 + should_quit.clone(), 611 + ); 612 + 613 + assert_eq!( 614 + plan.steps, 615 + vec![ 616 + Ping { 617 + target: target.clone() 618 + } 619 + .into(), 620 + Evaluate.into(), 621 + PushEvaluatedOutput { 622 + substitute_on_destination: true, 623 + target: target.clone() 624 + } 625 + .into(), 626 + Build { 627 + target: Some(target.clone()) 628 + } 629 + .into(), 630 + SwitchToConfiguration { 631 + goal: SwitchToConfigurationGoal::Switch, 632 + reboot: false, 633 + target: Some(target.clone()), 634 + privilege_escalation_command: node.privilege_escalation_command, 635 + } 636 + .into(), 637 + ] 638 + ); 639 + } 640 + 641 + #[tokio::test] 642 + async fn order_nokeys() { 643 + let location = location!(get_test_path!()); 644 + let node = Node { 645 + keys: vec![Key::default().into(), Key::default().into()], 646 + build_remotely: true, 647 + ..Default::default() 648 + }; 649 + let name = &Name(function_name!().into()); 650 + let should_quit = Arc::new(AtomicBool::new(false)); 651 + let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); 652 + let plan = plan_for_node( 653 + &node.clone(), 654 + name.clone(), 655 + &Goal::Apply(ApplyGoalArgs { 656 + goal: ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), 657 + should_apply_locally: false, 658 + no_keys: true, 659 + substitute_on_destination: true, 660 + reboot: false, 661 + host_platform: "x86_64-linux".into(), 662 + handle_unreachable: HandleUnreachable::default(), 663 + }), 664 + location.clone().into(), 665 + &SubCommandModifiers::default(), 666 + should_quit.clone(), 667 + ); 668 + 669 + assert_eq!( 670 + plan.steps, 671 + vec![ 672 + Ping { 673 + target: target.clone() 674 + } 675 + .into(), 676 + Evaluate.into(), 677 + PushEvaluatedOutput { 678 + substitute_on_destination: true, 679 + target: target.clone() 680 + } 681 + .into(), 682 + Build { 683 + target: Some(target.clone()) 684 + } 685 + .into(), 686 + SwitchToConfiguration { 687 + goal: SwitchToConfigurationGoal::Switch, 688 + reboot: false, 689 + target: Some(target.clone()), 690 + privilege_escalation_command: node.privilege_escalation_command, 691 + } 692 + .into(), 693 + ] 694 + ); 695 + } 696 + 697 + #[tokio::test] 698 + async fn order_should_apply_locally() { 699 + let location = location!(get_test_path!()); 700 + let node = Node::default(); 701 + let name = &Name(function_name!().into()); 702 + let should_quit = Arc::new(AtomicBool::new(false)); 703 + let plan = plan_for_node( 704 + &node.clone(), 705 + name.clone(), 706 + &Goal::Apply(ApplyGoalArgs { 707 + goal: ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), 708 + should_apply_locally: true, 709 + no_keys: true, 710 + substitute_on_destination: true, 711 + reboot: false, 712 + host_platform: "x86_64-linux".into(), 713 + handle_unreachable: HandleUnreachable::default(), 714 + }), 715 + location.clone().into(), 716 + &SubCommandModifiers::default(), 717 + should_quit.clone(), 718 + ); 719 + 720 + assert_eq!( 721 + plan.steps, 722 + vec![ 723 + Evaluate.into(), 724 + Build { target: None }.into(), 725 + SwitchToConfiguration { 726 + goal: SwitchToConfigurationGoal::Switch, 727 + reboot: false, 728 + target: None, 729 + privilege_escalation_command: node.privilege_escalation_command, 730 + } 731 + .into(), 732 + ] 733 + ); 734 + } 735 + }
+79 -85
crates/core/src/hive/steps/activate.rs
··· 1 1 // SPDX-License-Identifier: AGPL-3.0-or-later 2 2 // Copyright 2024-2025 wire Contributors 3 3 4 - use std::fmt::Display; 4 + use std::{fmt::Display, sync::Arc}; 5 5 6 6 use tracing::{error, info, instrument, warn}; 7 7 ··· 9 9 HiveLibError, 10 10 commands::{CommandArguments, WireCommandChip, builder::CommandStringBuilder, run_command}, 11 11 errors::{ActivationError, NetworkError}, 12 - hive::node::{Context, ExecuteStep, Goal, Objective, SwitchToConfigurationGoal}, 12 + hive::node::{Context, ExecuteStep, SharedTarget, SwitchToConfigurationGoal}, 13 13 }; 14 14 15 - #[derive(Debug, PartialEq)] 16 - pub struct SwitchToConfiguration; 15 + #[derive(Debug)] 16 + #[cfg_attr(test, derive(PartialEq))] 17 + pub struct SwitchToConfiguration { 18 + pub goal: SwitchToConfigurationGoal, 19 + pub reboot: bool, 20 + pub target: Option<SharedTarget>, 21 + pub privilege_escalation_command: Arc<Vec<Arc<str>>>, 22 + } 17 23 18 24 impl Display for SwitchToConfiguration { 19 25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ··· 21 27 } 22 28 } 23 29 24 - async fn wait_for_ping(ctx: &Context<'_>) -> Result<(), HiveLibError> { 25 - let host = ctx.node.target.get_preferred_host()?; 26 - let mut result = ctx.node.ping(ctx.modifiers).await; 30 + async fn wait_for_ping(target: &SharedTarget, ctx: &Context) -> Result<(), HiveLibError> { 31 + let target = target.0.read().await; 32 + let host = target.get_preferred_host()?; 27 33 28 - for num in 0..2 { 34 + for num in 0..3 { 29 35 warn!("Trying to ping {host} (attempt {}/3)", num + 1); 30 36 31 - result = ctx.node.ping(ctx.modifiers).await; 37 + let result = target.ping(ctx.modifiers).await; 32 38 33 39 if result.is_ok() { 34 40 info!("Regained connection to {} via {host}", ctx.name); 35 41 36 - break; 42 + return Ok(()); 37 43 } 38 44 } 39 45 40 - result 46 + Err(HiveLibError::NetworkError(NetworkError::HostsExhausted)) 41 47 } 42 48 43 - async fn set_profile( 44 - goal: SwitchToConfigurationGoal, 45 - built_path: &String, 46 - ctx: &Context<'_>, 47 - ) -> Result<(), HiveLibError> { 48 - info!("Setting profiles in anticipation for switch-to-configuration {goal}"); 49 - 50 - let mut command_string = CommandStringBuilder::new("nix-env"); 51 - command_string.args(&["-p", "/nix/var/nix/profiles/system", "--set"]); 52 - command_string.arg(built_path); 49 + impl SwitchToConfiguration { 50 + async fn set_profile(&self, built_path: &String, ctx: &Context) -> Result<(), HiveLibError> { 51 + info!( 52 + "Setting profiles in anticipation for switch-to-configuration {}", 53 + self.goal 54 + ); 53 55 54 - let Objective::Apply(apply_objective) = ctx.objective else { 55 - unreachable!() 56 - }; 56 + let mut command_string = CommandStringBuilder::new("nix-env"); 57 + command_string.args(&["-p", "/nix/var/nix/profiles/system", "--set"]); 58 + command_string.arg(built_path); 57 59 58 - let child = run_command( 59 - &CommandArguments::new(command_string, ctx.modifiers) 60 - .mode(crate::commands::ChildOutputMode::Nix) 61 - .execute_on_remote(if apply_objective.should_apply_locally { 62 - None 63 - } else { 64 - Some(&ctx.node.target) 65 - }) 66 - .elevated(ctx.node), 67 - ) 68 - .await?; 60 + let child = run_command( 61 + &CommandArguments::new(command_string, ctx.modifiers) 62 + .mode(crate::commands::ChildOutputMode::Nix) 63 + .execute_on_remote(self.target.clone()) 64 + .privileged(&self.privilege_escalation_command), 65 + ) 66 + .await?; 69 67 70 - let _ = child 71 - .wait_till_success() 72 - .await 73 - .map_err(HiveLibError::CommandError)?; 68 + let _ = child 69 + .wait_till_success() 70 + .await 71 + .map_err(HiveLibError::CommandError)?; 74 72 75 - info!("Set system profile"); 73 + info!("Set system profile"); 76 74 77 - Ok(()) 75 + Ok(()) 76 + } 78 77 } 79 78 80 79 impl ExecuteStep for SwitchToConfiguration { 81 - fn should_execute(&self, ctx: &Context) -> bool { 82 - let Objective::Apply(apply_objective) = ctx.objective else { 83 - return false; 84 - }; 85 - 86 - matches!(apply_objective.goal, Goal::SwitchToConfiguration(..)) 87 - } 88 - 89 80 #[allow(clippy::too_many_lines)] 90 81 #[instrument(skip_all, name = "activate")] 91 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 82 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 92 83 let built_path = ctx.state.build.as_ref().unwrap(); 93 84 94 - let Objective::Apply(apply_objective) = ctx.objective else { 95 - unreachable!() 96 - }; 97 - 98 - let Goal::SwitchToConfiguration(goal) = &apply_objective.goal else { 99 - unreachable!("Cannot reach as guarded by should_execute") 100 - }; 101 - 102 85 if matches!( 103 - goal, 86 + self.goal, 104 87 // switch profile if switch or boot 105 88 // https://github.com/NixOS/nixpkgs/blob/a2c92aa34735a04010671e3378e2aa2d109b2a72/pkgs/by-name/ni/nixos-rebuild-ng/src/nixos_rebuild/services.py#L224 106 89 SwitchToConfigurationGoal::Switch | SwitchToConfigurationGoal::Boot 107 90 ) { 108 - set_profile(*goal, built_path, ctx).await?; 91 + self.set_profile(built_path, ctx).await?; 109 92 } 110 93 111 - info!("Running switch-to-configuration {goal}"); 94 + info!("Running switch-to-configuration {}", self.goal); 112 95 113 96 let mut command_string = 114 97 CommandStringBuilder::new(format!("{built_path}/bin/switch-to-configuration")); 115 - command_string.arg(match goal { 98 + command_string.arg(match self.goal { 116 99 SwitchToConfigurationGoal::Switch => "switch", 117 100 SwitchToConfigurationGoal::Boot => "boot", 118 101 SwitchToConfigurationGoal::Test => "test", ··· 121 104 122 105 let child = run_command( 123 106 &CommandArguments::new(command_string, ctx.modifiers) 124 - .execute_on_remote(if apply_objective.should_apply_locally { 125 - None 126 - } else { 127 - Some(&ctx.node.target) 128 - }) 129 - .elevated(ctx.node) 107 + .execute_on_remote(self.target.clone()) 108 + .privileged(&self.privilege_escalation_command) 130 109 .log_stdout(), 131 110 ) 132 111 .await?; ··· 135 114 136 115 match result { 137 116 Ok(_) => { 138 - if !apply_objective.reboot { 117 + if !self.reboot { 139 118 return Ok(()); 140 119 } 141 120 142 - if apply_objective.should_apply_locally { 121 + let Some(ref target) = self.target else { 143 122 error!("Refusing to reboot local machine!"); 144 123 145 124 return Ok(()); 146 - } 125 + }; 147 126 148 127 warn!("Rebooting {name}!", name = ctx.name); 149 128 150 129 let reboot = run_command( 151 130 &CommandArguments::new("reboot now", ctx.modifiers) 152 131 .log_stdout() 153 - .execute_on_remote(Some(&ctx.node.target)) 154 - .elevated(ctx.node), 132 + .execute_on_remote(Some(target.clone())) 133 + .privileged(&self.privilege_escalation_command), 155 134 ) 156 135 .await?; 157 136 ··· 164 143 165 144 info!("Rebooted {name}, waiting to reconnect...", name = ctx.name); 166 145 167 - if wait_for_ping(ctx).await.is_ok() { 146 + if wait_for_ping(target, ctx).await.is_ok() { 168 147 return Ok(()); 169 148 } 149 + 150 + let target = target.0.read().await; 170 151 171 152 error!( 172 153 "Failed to get regain connection to {name} via {host} after reboot.", 173 154 name = ctx.name, 174 - host = ctx.node.target.get_preferred_host()? 155 + host = target.get_preferred_host()? 175 156 ); 176 157 177 158 return Err(HiveLibError::NetworkError( 178 159 NetworkError::HostUnreachableAfterReboot( 179 - ctx.node.target.get_preferred_host()?.to_string(), 160 + target.get_preferred_host()?.to_string(), 180 161 ), 181 162 )); 182 163 } ··· 188 169 189 170 // Bail if the command couldn't of broken the system 190 171 // and don't try to regain connection to localhost 191 - if matches!(goal, SwitchToConfigurationGoal::DryActivate) 192 - || apply_objective.should_apply_locally 193 - { 172 + let Some(target) = self 173 + .target 174 + .as_ref() 175 + .filter(|_| !matches!(self.goal, SwitchToConfigurationGoal::DryActivate)) 176 + else { 194 177 return Err(HiveLibError::ActivationError( 195 - ActivationError::SwitchToConfigurationError(*goal, ctx.name.clone(), error), 178 + ActivationError::SwitchToConfigurationError( 179 + self.goal, 180 + ctx.name.clone(), 181 + error, 182 + ), 196 183 )); 197 - } 184 + }; 198 185 199 - if wait_for_ping(ctx).await.is_ok() { 186 + if wait_for_ping(target, ctx).await.is_ok() { 200 187 return Err(HiveLibError::ActivationError( 201 - ActivationError::SwitchToConfigurationError(*goal, ctx.name.clone(), error), 188 + ActivationError::SwitchToConfigurationError( 189 + self.goal, 190 + ctx.name.clone(), 191 + error, 192 + ), 202 193 )); 203 194 } 195 + 196 + let target = target.0.read().await; 204 197 205 198 error!( 206 199 "Failed to get regain connection to {name} via {host} after {goal} activation.", 207 200 name = ctx.name, 208 - host = ctx.node.target.get_preferred_host()? 201 + host = target.get_preferred_host()?, 202 + goal = self.goal 209 203 ); 210 204 211 205 return Err(HiveLibError::NetworkError( 212 206 NetworkError::HostUnreachableAfterReboot( 213 - ctx.node.target.get_preferred_host()?.to_string(), 207 + target.get_preferred_host()?.to_string(), 214 208 ), 215 209 )); 216 210 }
+8 -23
crates/core/src/hive/steps/build.rs
··· 11 11 CommandArguments, Either, WireCommandChip, builder::CommandStringBuilder, 12 12 run_command_with_env, 13 13 }, 14 - hive::node::{Context, ExecuteStep, Goal, Objective}, 14 + hive::node::{Context, ExecuteStep, SharedTarget}, 15 15 }; 16 16 17 - #[derive(Debug, PartialEq)] 18 - pub struct Build; 17 + #[derive(Debug)] 18 + #[cfg_attr(test, derive(PartialEq))] 19 + pub struct Build { 20 + pub(crate) target: Option<SharedTarget>, 21 + } 19 22 20 23 impl Display for Build { 21 24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ··· 24 27 } 25 28 26 29 impl ExecuteStep for Build { 27 - fn should_execute(&self, ctx: &Context) -> bool { 28 - match ctx.objective { 29 - Objective::Apply(apply_objective) => { 30 - !matches!(apply_objective.goal, Goal::Keys | Goal::Push) 31 - } 32 - Objective::BuildLocally => true, 33 - } 34 - } 35 - 36 30 #[instrument(skip_all, name = "build")] 37 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 31 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 38 32 let top_level = ctx.state.evaluation.as_ref().unwrap(); 39 33 40 34 let mut command_string = CommandStringBuilder::nix(); ··· 51 45 let status = run_command_with_env( 52 46 &CommandArguments::new(command_string, ctx.modifiers) 53 47 // build remotely if asked for AND we arent applying locally 54 - .execute_on_remote( 55 - if ctx.node.build_remotely 56 - && let Objective::Apply(apply_objective) = ctx.objective 57 - && !apply_objective.should_apply_locally 58 - { 59 - Some(&ctx.node.target) 60 - } else { 61 - None 62 - }, 63 - ) 48 + .execute_on_remote(self.target.clone()) 64 49 .mode(crate::commands::ChildOutputMode::Nix) 65 50 .log_stdout(), 66 51 std::collections::HashMap::new(),
-28
crates/core/src/hive/steps/cleanup.rs
··· 1 - // SPDX-License-Identifier: AGPL-3.0-or-later 2 - // Copyright 2024-2025 wire Contributors 3 - 4 - use std::fmt::Display; 5 - 6 - use crate::{ 7 - errors::HiveLibError, 8 - hive::node::{Context, ExecuteStep}, 9 - }; 10 - 11 - #[derive(PartialEq, Debug)] 12 - pub(crate) struct CleanUp; 13 - 14 - impl Display for CleanUp { 15 - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 16 - write!(f, "Clean up") 17 - } 18 - } 19 - 20 - impl ExecuteStep for CleanUp { 21 - fn should_execute(&self, _ctx: &Context) -> bool { 22 - false 23 - } 24 - 25 - async fn execute(&self, _ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 26 - Ok(()) 27 - } 28 - }
+2 -9
crates/core/src/hive/steps/evaluate.rs
··· 7 7 8 8 use crate::{ 9 9 HiveLibError, 10 - hive::node::{Context, ExecuteStep, Goal, Objective}, 10 + hive::node::{Context, ExecuteStep}, 11 11 }; 12 12 13 13 #[derive(Debug, PartialEq)] ··· 20 20 } 21 21 22 22 impl ExecuteStep for Evaluate { 23 - fn should_execute(&self, ctx: &Context) -> bool { 24 - match ctx.objective { 25 - Objective::Apply(apply_objective) => !matches!(apply_objective.goal, Goal::Keys), 26 - Objective::BuildLocally => true, 27 - } 28 - } 29 - 30 23 #[instrument(skip_all, name = "eval")] 31 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 24 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 32 25 let rx = ctx.state.evaluation_rx.take().unwrap(); 33 26 34 27 ctx.state.evaluation = Some(rx.await.unwrap()?);
+36 -139
crates/core/src/hive/steps/keys.rs
··· 4 4 use base64::Engine; 5 5 use base64::prelude::BASE64_STANDARD; 6 6 use futures::future::join_all; 7 - use im::Vector; 8 7 use itertools::{Itertools, Position}; 9 8 use owo_colors::OwoColorize; 10 9 use prost::Message; ··· 19 18 use std::pin::Pin; 20 19 use std::process::Stdio; 21 20 use std::str::from_utf8; 21 + use std::sync::Arc; 22 22 use std::vec::IntoIter; 23 23 use tokio::io::AsyncReadExt as _; 24 24 use tokio::process::Command; ··· 31 31 use crate::commands::common::push; 32 32 use crate::commands::{CommandArguments, WireCommandChip, run_command}; 33 33 use crate::errors::KeyError; 34 - use crate::hive::node::{Context, ExecuteStep, Goal, Objective, Push, SwitchToConfigurationGoal}; 34 + use crate::hive::node::{Context, ExecuteStep, Push, SharedTarget}; 35 35 36 36 #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)] 37 37 #[serde(tag = "t", content = "c")] ··· 179 179 )) 180 180 } 181 181 182 - #[derive(Debug, PartialEq)] 182 + #[derive(Debug)] 183 + #[cfg_attr(test, derive(PartialEq))] 183 184 pub struct Keys { 184 - pub filter: UploadKeyAt, 185 + pub keys: Vec<Arc<Key>>, 186 + pub target: Option<SharedTarget>, 187 + pub privilege_escalation_command: Arc<Vec<Arc<str>>>, 188 + } 189 + 190 + #[derive(Debug)] 191 + #[cfg_attr(test, derive(PartialEq))] 192 + pub struct PushKeyAgent { 193 + pub substitute_on_destination: bool, 194 + pub host_platform: Arc<str>, 195 + pub target: Option<SharedTarget>, 185 196 } 186 - #[derive(Debug, PartialEq)] 187 - pub struct PushKeyAgent; 188 197 189 198 impl Display for Keys { 190 199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 191 - write!(f, "Upload key @ {:?}", self.filter) 200 + write!(f, "Upload {} key(s)", self.keys.len()) 192 201 } 193 202 } 194 203 ··· 225 234 } 226 235 227 236 impl ExecuteStep for Keys { 228 - fn should_execute(&self, ctx: &Context) -> bool { 229 - let Objective::Apply(apply_objective) = ctx.objective else { 230 - return false; 231 - }; 232 - 233 - if apply_objective.no_keys { 234 - return false; 235 - } 236 - 237 - // should execute if no filter, and the goal is keys. 238 - // otherwise, only execute if the goal is switch and non-nofilter 239 - matches!( 240 - (&self.filter, &apply_objective.goal), 241 - (UploadKeyAt::NoFilter, Goal::Keys) 242 - | ( 243 - UploadKeyAt::PreActivation | UploadKeyAt::PostActivation, 244 - Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) 245 - ) 246 - ) 247 - } 248 - 249 237 #[instrument(skip_all, name = "keys")] 250 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 238 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 251 239 let agent_directory = ctx.state.key_agent_directory.as_ref().unwrap(); 252 240 253 - let mut keys = self.select_keys(&ctx.node.keys).await?; 241 + let mut keys = self.select_keys(&self.keys).await?; 254 242 255 243 if keys.peek().is_none() { 256 244 debug!("Had no keys to push, ending KeyStep early."); ··· 259 247 260 248 let command_string = 261 249 CommandStringBuilder::new(format!("{agent_directory}/bin/wire-key-agent")); 262 - 263 - let Objective::Apply(apply_objective) = ctx.objective else { 264 - unreachable!() 265 - }; 266 250 267 251 let mut child = run_command( 268 252 &CommandArguments::new(command_string, ctx.modifiers) 269 - .execute_on_remote(if apply_objective.should_apply_locally { 270 - None 271 - } else { 272 - Some(&ctx.node.target) 273 - }) 274 - .elevated(ctx.node) 253 + .execute_on_remote(self.target.clone()) 254 + .privileged(&self.privilege_escalation_command) 275 255 .keep_stdin_open() 276 256 .log_stdout(), 277 257 ) ··· 306 286 impl Keys { 307 287 async fn select_keys( 308 288 &self, 309 - keys: &Vector<Key>, 289 + keys: &[Arc<Key>], 310 290 ) -> Result<Peekable<IntoIter<(wire_key_agent::keys::KeySpec, std::vec::Vec<u8>)>>, HiveLibError> 311 291 { 312 - let futures = keys 313 - .iter() 314 - .filter(|key| self.filter == UploadKeyAt::NoFilter || (key.upload_at == self.filter)) 315 - .map(|key| async move { 316 - process_key(key) 317 - .await 318 - .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) 319 - }); 292 + let futures = keys.iter().map(|key| async move { 293 + process_key(key) 294 + .await 295 + .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) 296 + }); 320 297 321 298 Ok(join_all(futures) 322 299 .await ··· 328 305 } 329 306 330 307 impl ExecuteStep for PushKeyAgent { 331 - fn should_execute(&self, ctx: &Context) -> bool { 332 - let Objective::Apply(apply_objective) = ctx.objective else { 333 - return false; 334 - }; 335 - 336 - if apply_objective.no_keys { 337 - return false; 338 - } 339 - 340 - matches!( 341 - &apply_objective.goal, 342 - Goal::Keys | Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) 343 - ) 344 - } 345 - 346 308 #[instrument(skip_all, name = "push_agent")] 347 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 309 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 348 310 let arg_name = format!( 349 311 "WIRE_KEY_AGENT_{platform}", 350 - platform = ctx.node.host_platform.replace('-', "_") 312 + platform = self.host_platform.replace('-', "_") 351 313 ); 352 314 353 315 let agent_directory = match env::var_os(&arg_name) { ··· 359 321 ), 360 322 }; 361 323 362 - let Objective::Apply(apply_objective) = ctx.objective else { 363 - unreachable!() 364 - }; 365 - 366 - if !apply_objective.should_apply_locally { 367 - push(ctx, Push::Path(&agent_directory)).await?; 324 + if let Some(ref target) = self.target { 325 + push( 326 + ctx, 327 + target, 328 + Push::Path(&agent_directory), 329 + self.substitute_on_destination, 330 + ) 331 + .await?; 368 332 } 369 333 370 334 ctx.state.key_agent_directory = Some(agent_directory); ··· 372 336 Ok(()) 373 337 } 374 338 } 375 - 376 - #[cfg(test)] 377 - mod tests { 378 - use im::Vector; 379 - 380 - use crate::hive::steps::keys::{Key, Keys, UploadKeyAt, process_key}; 381 - 382 - fn new_key(upload_at: &UploadKeyAt) -> Key { 383 - Key { 384 - upload_at: upload_at.clone(), 385 - source: super::Source::String(match upload_at { 386 - UploadKeyAt::PreActivation => "pre".into(), 387 - UploadKeyAt::PostActivation => "post".into(), 388 - UploadKeyAt::NoFilter => "none".into(), 389 - }), 390 - ..Default::default() 391 - } 392 - } 393 - 394 - #[tokio::test] 395 - async fn key_filtering() { 396 - let keys = Vector::from(vec![ 397 - new_key(&UploadKeyAt::PreActivation), 398 - new_key(&UploadKeyAt::PostActivation), 399 - new_key(&UploadKeyAt::PreActivation), 400 - new_key(&UploadKeyAt::PostActivation), 401 - ]); 402 - 403 - for (_, buf) in (Keys { 404 - filter: crate::hive::steps::keys::UploadKeyAt::PreActivation, 405 - }) 406 - .select_keys(&keys) 407 - .await 408 - .unwrap() 409 - { 410 - assert_eq!(String::from_utf8_lossy(&buf), "pre"); 411 - } 412 - 413 - for (_, buf) in (Keys { 414 - filter: crate::hive::steps::keys::UploadKeyAt::PostActivation, 415 - }) 416 - .select_keys(&keys) 417 - .await 418 - .unwrap() 419 - { 420 - assert_eq!(String::from_utf8_lossy(&buf), "post"); 421 - } 422 - 423 - // test that NoFilter processes all keys. 424 - let processed_all = 425 - futures::future::join_all(keys.iter().map(async |x| process_key(x).await)) 426 - .await 427 - .iter() 428 - .flatten() 429 - .cloned() 430 - .collect::<Vec<_>>(); 431 - let no_filter = (Keys { 432 - filter: crate::hive::steps::keys::UploadKeyAt::NoFilter, 433 - }) 434 - .select_keys(&keys) 435 - .await 436 - .unwrap() 437 - .collect::<Vec<_>>(); 438 - 439 - assert_eq!(processed_all, no_filter); 440 - } 441 - }
-1
crates/core/src/hive/steps/mod.rs
··· 3 3 4 4 pub mod activate; 5 5 pub mod build; 6 - pub mod cleanup; 7 6 pub mod evaluate; 8 7 pub mod keys; 9 8 pub mod ping;
+17 -17
crates/core/src/hive/steps/ping.rs
··· 7 7 8 8 use crate::{ 9 9 HiveLibError, 10 - hive::node::{Context, ExecuteStep, Objective}, 10 + hive::node::{Context, ExecuteStep, SharedTarget}, 11 11 }; 12 12 13 - #[derive(Debug, PartialEq)] 14 - pub struct Ping; 13 + #[derive(Debug)] 14 + #[cfg_attr(test, derive(PartialEq))] 15 + pub struct Ping { 16 + pub target: SharedTarget, 17 + } 15 18 16 19 impl Display for Ping { 17 20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ··· 20 23 } 21 24 22 25 impl ExecuteStep for Ping { 23 - fn should_execute(&self, ctx: &Context) -> bool { 24 - let Objective::Apply(apply_objective) = ctx.objective else { 25 - return false; 26 - }; 27 - 28 - !apply_objective.should_apply_locally 29 - } 30 - 31 26 #[instrument(skip_all, name = "ping")] 32 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 27 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 33 28 loop { 29 + let target = self.target.0.read().await; 30 + 34 31 event!( 35 32 Level::INFO, 36 33 status = "attempting", 37 - host = ctx.node.target.get_preferred_host()?.to_string() 34 + host = target.get_preferred_host()?.to_string() 38 35 ); 39 36 40 - if ctx.node.ping(ctx.modifiers).await.is_ok() { 37 + if target.ping(ctx.modifiers).await.is_ok() { 41 38 event!( 42 39 Level::INFO, 43 40 status = "success", 44 - host = ctx.node.target.get_preferred_host()?.to_string() 41 + host = target.get_preferred_host()?.to_string() 45 42 ); 46 43 return Ok(()); 47 44 } ··· 50 47 event!( 51 48 Level::WARN, 52 49 status = "failed to ping", 53 - host = ctx.node.target.get_preferred_host()?.to_string() 50 + host = target.get_preferred_host()?.to_string() 54 51 ); 55 - ctx.node.target.host_failed(); 52 + 53 + drop(target); 54 + 55 + self.target.0.write().await.host_failed(); 56 56 } 57 57 } 58 58 }
+30 -42
crates/core/src/hive/steps/push.rs
··· 8 8 use crate::{ 9 9 HiveLibError, 10 10 commands::common::push, 11 - hive::node::{Context, ExecuteStep, Goal, Objective}, 11 + hive::node::{Context, ExecuteStep, SharedTarget}, 12 12 }; 13 13 14 - #[derive(Debug, PartialEq)] 15 - pub struct PushEvaluatedOutput; 16 - #[derive(Debug, PartialEq)] 17 - pub struct PushBuildOutput; 14 + #[derive(Debug)] 15 + #[cfg_attr(test, derive(PartialEq))] 16 + pub struct PushEvaluatedOutput { 17 + pub substitute_on_destination: bool, 18 + pub target: SharedTarget, 19 + } 20 + 21 + #[derive(Debug)] 22 + #[cfg_attr(test, derive(PartialEq))] 23 + pub struct PushBuildOutput { 24 + pub substitute_on_destination: bool, 25 + pub target: SharedTarget, 26 + } 18 27 19 28 impl Display for PushEvaluatedOutput { 20 29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ··· 29 38 } 30 39 31 40 impl ExecuteStep for PushEvaluatedOutput { 32 - fn should_execute(&self, ctx: &Context) -> bool { 33 - let Objective::Apply(apply_objective) = ctx.objective else { 34 - return false; 35 - }; 36 - 37 - !matches!(apply_objective.goal, Goal::Keys) 38 - && !apply_objective.should_apply_locally 39 - && (ctx.node.build_remotely | matches!(apply_objective.goal, Goal::Push)) 40 - } 41 - 42 41 #[instrument(skip_all, name = "push_eval")] 43 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 42 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 44 43 let top_level = ctx.state.evaluation.as_ref().unwrap(); 45 44 46 - push(ctx, crate::hive::node::Push::Derivation(top_level)).await?; 45 + push( 46 + ctx, 47 + &self.target, 48 + crate::hive::node::Push::Derivation(top_level), 49 + self.substitute_on_destination, 50 + ) 51 + .await?; 47 52 48 53 Ok(()) 49 54 } 50 55 } 51 56 52 57 impl ExecuteStep for PushBuildOutput { 53 - fn should_execute(&self, ctx: &Context) -> bool { 54 - let Objective::Apply(apply_objective) = ctx.objective else { 55 - return false; 56 - }; 57 - 58 - if matches!(apply_objective.goal, Goal::Keys | Goal::Push) { 59 - // skip if we are not building 60 - return false; 61 - } 62 - 63 - if ctx.node.build_remotely { 64 - // skip if we are building remotely 65 - return false; 66 - } 67 - 68 - if apply_objective.should_apply_locally { 69 - // skip step if we are applying locally 70 - return false; 71 - } 72 - 73 - true 74 - } 75 - 76 58 #[instrument(skip_all, name = "push_build")] 77 - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 59 + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { 78 60 let built_path = ctx.state.build.as_ref().unwrap(); 79 61 80 - push(ctx, crate::hive::node::Push::Path(built_path)).await?; 62 + push( 63 + ctx, 64 + &self.target, 65 + crate::hive::node::Push::Path(built_path), 66 + self.substitute_on_destination, 67 + ) 68 + .await?; 81 69 82 70 Ok(()) 83 71 }