ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle sigint

authored by

marshmallow and committed by
marshmallow
820b77c1 32233255

+103 -1
+24
Cargo.lock
··· 2124 2124 checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" 2125 2125 2126 2126 [[package]] 2127 + name = "signal-hook" 2128 + version = "0.3.18" 2129 + source = "registry+https://github.com/rust-lang/crates.io-index" 2130 + checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" 2131 + dependencies = [ 2132 + "libc", 2133 + "signal-hook-registry", 2134 + ] 2135 + 2136 + [[package]] 2127 2137 name = "signal-hook-registry" 2128 2138 version = "1.4.6" 2129 2139 source = "registry+https://github.com/rust-lang/crates.io-index" 2130 2140 checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" 2131 2141 dependencies = [ 2132 2142 "libc", 2143 + ] 2144 + 2145 + [[package]] 2146 + name = "signal-hook-tokio" 2147 + version = "0.3.1" 2148 + source = "registry+https://github.com/rust-lang/crates.io-index" 2149 + checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" 2150 + dependencies = [ 2151 + "futures-core", 2152 + "libc", 2153 + "signal-hook", 2154 + "tokio", 2133 2155 ] 2134 2156 2135 2157 [[package]] ··· 3207 3229 "owo-colors", 3208 3230 "serde", 3209 3231 "serde_json", 3232 + "signal-hook", 3233 + "signal-hook-tokio", 3210 3234 "thiserror 2.0.17", 3211 3235 "tokio", 3212 3236 "tracing",
+2
wire/cli/Cargo.toml
··· 31 31 clap_complete = { version = "4.5.60", features = ["unstable-dynamic"] } 32 32 nix-compat = { workspace = true } 33 33 owo-colors = { workspace = true } 34 + signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } 35 + signal-hook = "0.3.18"
+3
wire/cli/src/apply.rs
··· 11 11 use std::collections::HashSet; 12 12 use std::io::{Read, stderr}; 13 13 use std::sync::Arc; 14 + use std::sync::atomic::AtomicBool; 14 15 use thiserror::Error; 15 16 use tracing::{Span, error, info}; 16 17 ··· 51 52 // #[instrument(skip_all, fields(goal = %args.goal, on = %args.on.iter().join(", ")))] 52 53 pub async fn apply( 53 54 hive: &mut Hive, 55 + should_shutdown: Arc<AtomicBool>, 54 56 location: HiveLocation, 55 57 args: ApplyArgs, 56 58 mut modifiers: SubCommandModifiers, ··· 121 123 reboot: args.reboot, 122 124 should_apply_locally, 123 125 handle_unreachable: args.handle_unreachable.clone().into(), 126 + should_shutdown: should_shutdown.clone() 124 127 }; 125 128 126 129 GoalExecutor::new(context)
+17 -1
wire/cli/src/main.rs
··· 5 5 #![feature(nonpoison_mutex)] 6 6 7 7 use std::process::Command; 8 + use std::sync::Arc; 9 + use std::sync::atomic::AtomicBool; 8 10 9 11 use crate::cli::Cli; 10 12 use crate::cli::ToSubCommandModifiers; 13 + use crate::sigint::handle_signals; 11 14 use crate::tracing_setup::setup_logging; 12 15 use clap::CommandFactory; 13 16 use clap::Parser; ··· 18 21 use lib::hive::get_hive_location; 19 22 use miette::IntoDiagnostic; 20 23 use miette::Result; 24 + use signal_hook::consts::SIGINT; 25 + use signal_hook_tokio::Signals; 21 26 use tracing::error; 22 27 use tracing::warn; 23 28 ··· 27 32 mod apply; 28 33 mod cli; 29 34 mod tracing_setup; 35 + mod sigint; 30 36 31 37 #[cfg(feature = "dhat-heap")] 32 38 #[global_allocator] ··· 57 63 miette::bail!("Nix is not available on this system."); 58 64 } 59 65 66 + let signals = Signals::new([ 67 + SIGINT, 68 + ]).into_diagnostic()?; 69 + let signals_handle = signals.handle(); 70 + let should_shutdown = Arc::new(AtomicBool::new(false)); 71 + let signals_task = tokio::spawn(handle_signals(signals, should_shutdown.clone())); 72 + 60 73 let location = get_hive_location(args.path, modifiers).await?; 61 74 let cache = InspectionCache::new().await; 62 75 63 76 match args.command { 64 77 cli::Commands::Apply(apply_args) => { 65 78 let mut hive = Hive::new_from_path(&location, cache.clone(), modifiers).await?; 66 - apply::apply(&mut hive, location, apply_args, modifiers).await?; 79 + apply::apply(&mut hive, should_shutdown, location, apply_args, modifiers).await?; 67 80 } 68 81 cli::Commands::Inspect { json, selection } => println!("{}", { 69 82 match selection { ··· 87 100 if let Some(cache) = cache { 88 101 cache.gc().await.into_diagnostic()?; 89 102 } 103 + 104 + signals_handle.close(); 105 + signals_task.await.into_diagnostic()?; 90 106 91 107 Ok(()) 92 108 }
+20
wire/cli/src/sigint.rs
··· 1 + // SPDX-License-Identifier: AGPL-3.0-or-later 2 + // Copyright 2024-2025 wire Contributors 3 + 4 + use std::sync::{Arc, atomic::AtomicBool}; 5 + 6 + use signal_hook::consts::SIGINT; 7 + use signal_hook_tokio::Signals; 8 + 9 + use futures::stream::StreamExt; 10 + use tracing::{info}; 11 + 12 + pub(crate) async fn handle_signals(mut signals: Signals, should_shutdown: Arc<AtomicBool>) { 13 + while let Some(signal) = signals.next().await { 14 + if let SIGINT = signal && !should_shutdown.load(std::sync::atomic::Ordering::Relaxed) { 15 + info!("Received SIGINT, attempting to shut down executor threads."); 16 + should_shutdown.store(true, std::sync::atomic::Ordering::Relaxed); 17 + } 18 + } 19 + } 20 +
+7
wire/lib/src/errors.rs
··· 366 366 )] 367 367 #[error("error encoding length delimited data")] 368 368 Encoding(#[source] std::io::Error), 369 + 370 + #[diagnostic( 371 + code(wire::SIGINT), 372 + url("{DOCS_URL}#{}", self.code().unwrap()) 373 + )] 374 + #[error("SIGINT recieved, shut down")] 375 + Sigint 369 376 }
+30
wire/lib/src/hive/node.rs
··· 8 8 use std::assert_matches::debug_assert_matches; 9 9 use std::fmt::Display; 10 10 use std::sync::Arc; 11 + use std::sync::atomic::AtomicBool; 11 12 use tokio::sync::oneshot; 12 13 use tracing::{Instrument, Level, Span, debug, error, event, instrument, trace}; 13 14 ··· 115 116 reboot: false, 116 117 should_apply_locally: false, 117 118 handle_unreachable: HandleUnreachable::default(), 119 + should_shutdown: Arc::new(AtomicBool::new(false)) 118 120 } 119 121 } 120 122 } ··· 305 307 pub reboot: bool, 306 308 pub should_apply_locally: bool, 307 309 pub handle_unreachable: HandleUnreachable, 310 + pub should_shutdown: Arc<AtomicBool> 308 311 } 309 312 310 313 #[enum_dispatch(ExecuteStep)] ··· 342 345 context: Context<'a>, 343 346 } 344 347 348 + /// returns Err if the application should shut down. 349 + fn app_shutdown_guard(context: &Context) -> Result<(), HiveLibError> { 350 + if context.should_shutdown.load(std::sync::atomic::Ordering::Relaxed) { 351 + return Err(HiveLibError::Sigint) 352 + } 353 + 354 + Ok(()) 355 + } 356 + 345 357 impl<'a> GoalExecutor<'a> { 346 358 #[must_use] 347 359 pub fn new(context: Context<'a>) -> Self { ··· 390 402 391 403 #[instrument(skip_all, fields(node = %self.context.name))] 392 404 pub async fn execute(mut self) -> Result<(), HiveLibError> { 405 + app_shutdown_guard(&self.context)?; 406 + 393 407 let (tx, rx) = oneshot::channel(); 394 408 self.context.state.evaluation_rx = Some(rx); 395 409 ··· 429 443 let length = steps.len(); 430 444 431 445 for (position, step) in steps.iter().enumerate() { 446 + app_shutdown_guard(&self.context)?; 447 + 432 448 event!( 433 449 Level::INFO, 434 450 step = step.to_string(), ··· 791 807 ) 792 808 .unwrap() 793 809 ); 810 + } 811 + 812 + #[tokio::test] 813 + async fn context_quits_sigint() { 814 + let location = location!(get_test_path!()); 815 + let mut node = Node::default(); 816 + 817 + let name = &Name(function_name!().into()); 818 + let context = Context::create_test_context(location, name, &mut node); 819 + context.should_shutdown.store(true, std::sync::atomic::Ordering::Relaxed); 820 + let executor = GoalExecutor::new(context); 821 + let status = executor.execute().await; 822 + 823 + assert_matches!(status, Err(HiveLibError::Sigint)); 794 824 } 795 825 }