A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #176 from jzbor/push-from-stdin

attic-client/push: Add flag to read paths from stdin

authored by

Zhaofeng Li and committed by
GitHub
61ebdef2 c4c7341a

+160 -63
+101 -39
client/src/command/push.rs
··· 4 4 use anyhow::{anyhow, Result}; 5 5 use clap::Parser; 6 6 use indicatif::MultiProgress; 7 + use tokio::io::{self, AsyncBufReadExt, BufReader}; 7 8 8 9 use crate::api::ApiClient; 9 - use crate::cache::CacheRef; 10 + use crate::cache::{CacheName, CacheRef, ServerName}; 10 11 use crate::cli::Opts; 11 12 use crate::config::Config; 12 - use crate::push::{PushConfig, Pusher}; 13 + use crate::push::{PushConfig, PushSessionConfig, Pusher}; 13 14 use attic::nix_store::NixStore; 14 15 15 16 /// Push closures to a binary cache. ··· 23 24 24 25 /// The store paths to push. 25 26 paths: Vec<PathBuf>, 27 + 28 + /// Read paths from the standard input. 29 + #[clap(long)] 30 + stdin: bool, 26 31 27 32 /// Push the specified paths only and do not compute closures. 28 33 #[clap(long)] ··· 41 46 force_preamble: bool, 42 47 } 43 48 49 + struct PushContext { 50 + store: Arc<NixStore>, 51 + cache_name: CacheName, 52 + server_name: ServerName, 53 + pusher: Pusher, 54 + no_closure: bool, 55 + ignore_upstream_cache_filter: bool, 56 + } 57 + 58 + impl PushContext { 59 + async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> { 60 + let roots = paths 61 + .into_iter() 62 + .map(|p| self.store.follow_store_path(p)) 63 + .collect::<std::result::Result<Vec<_>, _>>()?; 64 + 65 + let plan = self 66 + .pusher 67 + .plan(roots, self.no_closure, self.ignore_upstream_cache_filter) 68 + .await?; 69 + 70 + if plan.store_path_map.is_empty() { 71 + if plan.num_all_paths == 0 { 72 + eprintln!("🤷 Nothing selected."); 73 + } else { 74 + eprintln!( 75 + "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)", 76 + num_already_cached = plan.num_already_cached, 77 + num_upstream = plan.num_upstream, 78 + ); 79 + } 80 + 81 + return Ok(()); 82 + } else { 83 + eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...", 84 + cache = self.cache_name.as_str(), 85 + server = self.server_name.as_str(), 86 + num_missing_paths = plan.store_path_map.len(), 87 + num_already_cached = plan.num_already_cached, 88 + num_upstream = plan.num_upstream, 89 + ); 90 + } 91 + 92 + for (_, path_info) in plan.store_path_map { 93 + self.pusher.queue(path_info).await?; 94 + } 95 + 96 + let results = self.pusher.wait().await; 97 + results.into_values().collect::<Result<Vec<()>>>()?; 98 + 99 + Ok(()) 100 + } 101 + 102 + async fn push_stdin(self) -> Result<()> { 103 + let session = self.pusher.into_push_session(PushSessionConfig { 104 + no_closure: self.no_closure, 105 + ignore_upstream_cache_filter: self.ignore_upstream_cache_filter, 106 + }); 107 + 108 + let stdin = BufReader::new(io::stdin()); 109 + let mut lines = stdin.lines(); 110 + while let Some(line) = lines.next_line().await? { 111 + let path = self.store.follow_store_path(line)?; 112 + session.queue_many(vec![path])?; 113 + } 114 + 115 + let results = session.wait().await?; 116 + results.into_values().collect::<Result<Vec<()>>>()?; 117 + 118 + Ok(()) 119 + } 120 + } 121 + 44 122 pub async fn run(opts: Opts) -> Result<()> { 45 123 let sub = opts.command.as_push().unwrap(); 46 124 if sub.jobs == 0 { ··· 50 128 let config = Config::load()?; 51 129 52 130 let store = Arc::new(NixStore::connect()?); 53 - let roots = sub 54 - .paths 55 - .clone() 56 - .into_iter() 57 - .map(|p| store.follow_store_path(p)) 58 - .collect::<std::result::Result<Vec<_>, _>>()?; 59 131 60 - let (server_name, server, cache) = config.resolve_cache(&sub.cache)?; 132 + let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?; 61 133 62 134 let mut api = ApiClient::from_server_config(server.clone())?; 63 135 64 136 // Confirm remote cache validity, query cache config 65 - let cache_config = api.get_cache_config(cache).await?; 137 + let cache_config = api.get_cache_config(cache_name).await?; 66 138 67 139 if let Some(api_endpoint) = &cache_config.api_endpoint { 68 140 // Use delegated API endpoint ··· 76 148 77 149 let mp = MultiProgress::new(); 78 150 79 - let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config); 80 - let plan = pusher 81 - .plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter) 82 - .await?; 151 + let pusher = Pusher::new( 152 + store.clone(), 153 + api, 154 + cache_name.to_owned(), 155 + cache_config, 156 + mp, 157 + push_config, 158 + ); 83 159 84 - if plan.store_path_map.is_empty() { 85 - if plan.num_all_paths == 0 { 86 - eprintln!("🤷 Nothing selected."); 87 - } else { 88 - eprintln!( 89 - "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)", 90 - num_already_cached = plan.num_already_cached, 91 - num_upstream = plan.num_upstream, 92 - ); 93 - } 160 + let push_ctx = PushContext { 161 + store, 162 + cache_name: cache_name.clone(), 163 + server_name: server_name.clone(), 164 + pusher, 165 + no_closure: sub.no_closure, 166 + ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter, 167 + }; 94 168 95 - return Ok(()); 169 + if sub.stdin { 170 + push_ctx.push_stdin().await?; 96 171 } else { 97 - eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...", 98 - cache = cache.as_str(), 99 - server = server_name.as_str(), 100 - num_missing_paths = plan.store_path_map.len(), 101 - num_already_cached = plan.num_already_cached, 102 - num_upstream = plan.num_upstream, 103 - ); 172 + push_ctx.push_static(sub.paths.clone()).await?; 104 173 } 105 - 106 - for (_, path_info) in plan.store_path_map { 107 - pusher.queue(path_info).await?; 108 - } 109 - 110 - let results = pusher.wait().await; 111 - results.into_values().collect::<Result<Vec<()>>>()?; 112 174 113 175 Ok(()) 114 176 }
+59 -24
client/src/push.rs
··· 28 28 use futures::future::join_all; 29 29 use futures::stream::{Stream, TryStreamExt}; 30 30 use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; 31 - use tokio::sync::Mutex; 31 + use tokio::sync::{mpsc, Mutex}; 32 32 use tokio::task::{spawn, JoinHandle}; 33 33 use tokio::time; 34 34 ··· 100 100 /// seconds since the last path is queued or it's been 10 seconds in total. 101 101 pub struct PushSession { 102 102 /// Sender to the batching future. 103 - sender: channel::Sender<Vec<StorePath>>, 103 + sender: channel::Sender<SessionQueueCommand>, 104 + 105 + /// Receiver of results. 106 + result_receiver: mpsc::Receiver<Result<HashMap<StorePath, Result<()>>>>, 107 + } 108 + 109 + enum SessionQueueCommand { 110 + Paths(Vec<StorePath>), 111 + Flush, 112 + Terminate, 104 113 } 105 114 106 115 enum SessionQueuePoll { 107 116 Paths(Vec<StorePath>), 117 + Flush, 118 + Terminate, 108 119 Closed, 109 120 TimedOut, 110 121 } ··· 255 266 impl PushSession { 256 267 pub fn with_pusher(pusher: Pusher, config: PushSessionConfig) -> Self { 257 268 let (sender, receiver) = channel::unbounded(); 269 + let (result_sender, result_receiver) = mpsc::channel(1); 258 270 259 271 let known_paths_mutex = Arc::new(Mutex::new(HashSet::new())); 260 272 261 - // FIXME 262 273 spawn(async move { 263 - let pusher = Arc::new(pusher); 264 - loop { 265 - if let Err(e) = Self::worker( 266 - pusher.clone(), 267 - config, 268 - known_paths_mutex.clone(), 269 - receiver.clone(), 270 - ) 271 - .await 272 - { 273 - eprintln!("Worker exited: {:?}", e); 274 - } else { 275 - break; 276 - } 274 + if let Err(e) = Self::worker( 275 + pusher, 276 + config, 277 + known_paths_mutex.clone(), 278 + receiver.clone(), 279 + result_sender.clone(), 280 + ) 281 + .await 282 + { 283 + let _ = result_sender.send(Err(e)).await; 277 284 } 278 285 }); 279 286 280 - Self { sender } 287 + Self { 288 + sender, 289 + result_receiver, 290 + } 281 291 } 282 292 283 293 async fn worker( 284 - pusher: Arc<Pusher>, 294 + pusher: Pusher, 285 295 config: PushSessionConfig, 286 296 known_paths_mutex: Arc<Mutex<HashSet<StorePathHash>>>, 287 - receiver: channel::Receiver<Vec<StorePath>>, 297 + receiver: channel::Receiver<SessionQueueCommand>, 298 + result_sender: mpsc::Sender<Result<HashMap<StorePath, Result<()>>>>, 288 299 ) -> Result<()> { 289 300 let mut roots = HashSet::new(); 290 301 ··· 296 307 loop { 297 308 let poll = tokio::select! { 298 309 r = receiver.recv() => match r { 299 - Ok(paths) => SessionQueuePoll::Paths(paths), 310 + Ok(SessionQueueCommand::Paths(paths)) => SessionQueuePoll::Paths(paths), 311 + Ok(SessionQueueCommand::Flush) => SessionQueuePoll::Flush, 312 + Ok(SessionQueueCommand::Terminate) => SessionQueuePoll::Terminate, 300 313 _ => SessionQueuePoll::Closed, 301 314 }, 302 315 _ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut, ··· 306 319 SessionQueuePoll::Paths(store_paths) => { 307 320 roots.extend(store_paths.into_iter()); 308 321 } 309 - SessionQueuePoll::Closed => { 322 + SessionQueuePoll::Closed | SessionQueuePoll::Terminate => { 310 323 break true; 311 324 } 312 - SessionQueuePoll::TimedOut => { 325 + SessionQueuePoll::Flush | SessionQueuePoll::TimedOut => { 313 326 break false; 314 327 } 315 328 } ··· 352 365 drop(known_paths); 353 366 354 367 if done { 368 + let result = pusher.wait().await; 369 + result_sender.send(Ok(result)).await?; 355 370 return Ok(()); 356 371 } 357 372 } 358 373 } 359 374 375 + /// Waits for all workers to terminate, returning all results. 376 + pub async fn wait(mut self) -> Result<HashMap<StorePath, Result<()>>> { 377 + self.flush()?; 378 + 379 + // The worker might have died 380 + let _ = self.sender.send(SessionQueueCommand::Terminate).await; 381 + 382 + self.result_receiver 383 + .recv() 384 + .await 385 + .expect("Nothing in result channel") 386 + } 387 + 360 388 /// Queues multiple store paths to be pushed. 361 389 pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> { 362 390 self.sender 363 - .send_blocking(store_paths) 391 + .send_blocking(SessionQueueCommand::Paths(store_paths)) 392 + .map_err(|e| anyhow!(e)) 393 + } 394 + 395 + /// Flushes the worker queue. 396 + pub fn flush(&self) -> Result<()> { 397 + self.sender 398 + .send_blocking(SessionQueueCommand::Flush) 364 399 .map_err(|e| anyhow!(e)) 365 400 } 366 401 }