A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

client/push: Support retrieving results from a PushSession

+40 -19
+40 -19
client/src/push.rs
··· 28 28 use futures::future::join_all; 29 29 use futures::stream::{Stream, TryStreamExt}; 30 30 use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; 31 - use tokio::sync::Mutex; 31 + use tokio::sync::{mpsc, Mutex}; 32 32 use tokio::task::{spawn, JoinHandle}; 33 33 use tokio::time; 34 34 ··· 101 101 pub struct PushSession { 102 102 /// Sender to the batching future. 103 103 sender: channel::Sender<SessionQueueCommand>, 104 + 105 + /// Receiver of results. 106 + result_receiver: mpsc::Receiver<Result<HashMap<StorePath, Result<()>>>>, 104 107 } 105 108 106 109 enum SessionQueueCommand { 107 110 Paths(Vec<StorePath>), 108 111 Flush, 112 + Terminate, 109 113 } 110 114 111 115 enum SessionQueuePoll { 112 116 Paths(Vec<StorePath>), 113 117 Flush, 118 + Terminate, 114 119 Closed, 115 120 TimedOut, 116 121 } ··· 261 266 impl PushSession { 262 267 pub fn with_pusher(pusher: Pusher, config: PushSessionConfig) -> Self { 263 268 let (sender, receiver) = channel::unbounded(); 269 + let (result_sender, result_receiver) = mpsc::channel(1); 264 270 265 271 let known_paths_mutex = Arc::new(Mutex::new(HashSet::new())); 266 272 267 - // FIXME 268 273 spawn(async move { 269 - let pusher = Arc::new(pusher); 270 - loop { 271 - if let Err(e) = Self::worker( 272 - pusher.clone(), 273 - config, 274 - known_paths_mutex.clone(), 275 - receiver.clone(), 276 - ) 277 - .await 278 - { 279 - eprintln!("Worker exited: {:?}", e); 280 - } else { 281 - break; 282 - } 274 + if let Err(e) = Self::worker( 275 + pusher, 276 + config, 277 + known_paths_mutex.clone(), 278 + receiver.clone(), 279 + result_sender.clone(), 280 + ) 281 + .await 282 + { 283 + let _ = result_sender.send(Err(e)).await; 283 284 } 284 285 }); 285 286 286 - Self { sender } 287 + Self { 288 + sender, 289 + result_receiver, 290 + } 287 291 } 288 292 289 293 async fn worker( 290 - pusher: Arc<Pusher>, 294 + pusher: Pusher, 291 295 config: PushSessionConfig, 292 296 known_paths_mutex: Arc<Mutex<HashSet<StorePathHash>>>, 293 297 receiver: channel::Receiver<SessionQueueCommand>, 298 + result_sender: mpsc::Sender<Result<HashMap<StorePath, Result<()>>>>, 294 299 ) -> Result<()> { 295 300 let mut roots = HashSet::new(); 296 301 ··· 304 309 r = receiver.recv() => match r { 305 310 Ok(SessionQueueCommand::Paths(paths)) => SessionQueuePoll::Paths(paths), 306 311 Ok(SessionQueueCommand::Flush) => SessionQueuePoll::Flush, 312 + Ok(SessionQueueCommand::Terminate) => SessionQueuePoll::Terminate, 307 313 _ => SessionQueuePoll::Closed, 308 314 }, 309 315 _ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut, ··· 313 319 SessionQueuePoll::Paths(store_paths) => { 314 320 roots.extend(store_paths.into_iter()); 315 321 } 316 - SessionQueuePoll::Closed => { 322 + SessionQueuePoll::Closed | SessionQueuePoll::Terminate => { 317 323 break true; 318 324 } 319 325 SessionQueuePoll::Flush | SessionQueuePoll::TimedOut => { ··· 359 365 drop(known_paths); 360 366 361 367 if done { 368 + let result = pusher.wait().await; 369 + result_sender.send(Ok(result)).await?; 362 370 return Ok(()); 363 371 } 364 372 } 373 + } 374 + 375 + /// Waits for all workers to terminate, returning all results. 376 + pub async fn wait(mut self) -> Result<HashMap<StorePath, Result<()>>> { 377 + self.flush()?; 378 + 379 + // The worker might have died 380 + let _ = self.sender.send(SessionQueueCommand::Terminate).await; 381 + 382 + self.result_receiver 383 + .recv() 384 + .await 385 + .expect("Nothing in result channel") 365 386 } 366 387 367 388 /// Queues multiple store paths to be pushed.