ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix bug related to key filtering

+118 -20
+4
CHANGELOG.md
··· 11 11 12 12 - SIGINT signal handling 13 13 14 + ### Changed 15 + 16 + - Fix a bug related to key filtering 17 + 14 18 ## [v1.0.0-beta.0] - 2025-12-02 15 19 16 20 ### Added
+114 -20
wire/lib/src/hive/steps/keys.rs
··· 4 4 use base64::Engine; 5 5 use base64::prelude::BASE64_STANDARD; 6 6 use futures::future::join_all; 7 + use im::Vector; 7 8 use itertools::{Itertools, Position}; 8 9 use owo_colors::OwoColorize; 9 10 use prost::Message; ··· 13 14 use std::env; 14 15 use std::fmt::Display; 15 16 use std::io::Cursor; 17 + use std::iter::Peekable; 16 18 use std::path::PathBuf; 17 19 use std::pin::Pin; 18 20 use std::process::Stdio; 19 21 use std::str::from_utf8; 22 + use std::vec::IntoIter; 20 23 use tokio::io::AsyncReadExt as _; 21 24 use tokio::process::Command; 22 25 use tokio::{fs::File, io::AsyncRead}; ··· 82 85 self.group, 83 86 self.permissions, 84 87 ) 88 + } 89 + } 90 + 91 + #[cfg(test)] 92 + impl Default for Key { 93 + fn default() -> Self { 94 + use im::HashMap; 95 + 96 + Self { 97 + name: "key".into(), 98 + dest_dir: "/somewhere/".into(), 99 + path: "key".into(), 100 + group: "root".into(), 101 + user: "root".into(), 102 + permissions: "0600".into(), 103 + source: Source::String("test key".into()), 104 + upload_at: UploadKeyAt::PreActivation, 105 + environment: HashMap::new(), 106 + } 85 107 } 86 108 } 87 109 ··· 223 245 async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { 224 246 let agent_directory = ctx.state.key_agent_directory.as_ref().unwrap(); 225 247 226 - let futures = ctx 227 - .node 228 - .keys 229 - .iter() 230 - .filter(|key| { 231 - self.filter == UploadKeyAt::NoFilter 232 - || (self.filter != UploadKeyAt::NoFilter && key.upload_at != self.filter) 233 - }) 234 - .map(|key| async move { 235 - process_key(key) 236 - .await 237 - .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) 238 - }); 239 - 240 - let mut keys = join_all(futures) 241 - .await 242 - .into_iter() 243 - .collect::<Result<Vec<_>, HiveLibError>>()? 244 - .into_iter() 245 - .peekable(); 248 + let mut keys = self.select_keys(&ctx.node.keys).await?; 246 249 247 250 if keys.peek().is_none() { 248 251 debug!("Had no keys to push, ending KeyStep early."); ··· 287 290 debug!("status: {status:?}"); 288 291 289 292 Ok(()) 293 + } 294 + } 295 + 296 + impl Keys { 297 + async fn select_keys( 298 + &self, 299 + keys: &Vector<Key>, 300 + ) -> Result<Peekable<IntoIter<(key_agent::keys::KeySpec, std::vec::Vec<u8>)>>, HiveLibError> 301 + { 302 + let futures = keys 303 + .iter() 304 + .filter(|key| self.filter == UploadKeyAt::NoFilter || (key.upload_at == self.filter)) 305 + .map(|key| async move { 306 + process_key(key) 307 + .await 308 + .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) 309 + }); 310 + 311 + Ok(join_all(futures) 312 + .await 313 + .into_iter() 314 + .collect::<Result<Vec<_>, HiveLibError>>()? 315 + .into_iter() 316 + .peekable()) 290 317 } 291 318 } 292 319 ··· 327 354 Ok(()) 328 355 } 329 356 } 357 + 358 + #[cfg(test)] 359 + mod tests { 360 + use im::Vector; 361 + 362 + use crate::hive::steps::keys::{Key, Keys, UploadKeyAt, process_key}; 363 + 364 + fn new_key(upload_at: &UploadKeyAt) -> Key { 365 + Key { 366 + upload_at: upload_at.clone(), 367 + source: super::Source::String(match upload_at { 368 + UploadKeyAt::PreActivation => "pre".into(), 369 + UploadKeyAt::PostActivation => "post".into(), 370 + UploadKeyAt::NoFilter => "none".into(), 371 + }), 372 + ..Default::default() 373 + } 374 + } 375 + 376 + #[tokio::test] 377 + async fn key_filtering() { 378 + let keys = Vector::from(vec![ 379 + new_key(&UploadKeyAt::PreActivation), 380 + new_key(&UploadKeyAt::PostActivation), 381 + new_key(&UploadKeyAt::PreActivation), 382 + new_key(&UploadKeyAt::PostActivation), 383 + ]); 384 + 385 + for (_, buf) in (Keys { 386 + filter: crate::hive::steps::keys::UploadKeyAt::PreActivation, 387 + }) 388 + .select_keys(&keys) 389 + .await 390 + .unwrap() 391 + { 392 + assert_eq!(String::from_utf8_lossy(&buf), "pre"); 393 + } 394 + 395 + for (_, buf) in (Keys { 396 + filter: crate::hive::steps::keys::UploadKeyAt::PostActivation, 397 + }) 398 + .select_keys(&keys) 399 + .await 400 + .unwrap() 401 + { 402 + assert_eq!(String::from_utf8_lossy(&buf), "post"); 403 + } 404 + 405 + // test that NoFilter processes all keys. 406 + let processed_all = 407 + futures::future::join_all(keys.iter().map(async |x| process_key(x).await)) 408 + .await 409 + .iter() 410 + .flatten() 411 + .cloned() 412 + .collect::<Vec<_>>(); 413 + let no_filter = (Keys { 414 + filter: crate::hive::steps::keys::UploadKeyAt::NoFilter, 415 + }) 416 + .select_keys(&keys) 417 + .await 418 + .unwrap() 419 + .collect::<Vec<_>>(); 420 + 421 + assert_eq!(processed_all, no_filter); 422 + } 423 + }