A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

client/push: Support flushing a PushSession

+19 -5
+19 -5
client/src/push.rs
··· 100 100 /// seconds since the last path is queued or it's been 10 seconds in total. 101 101 pub struct PushSession { 102 102 /// Sender to the batching future. 103 - sender: channel::Sender<Vec<StorePath>>, 103 + sender: channel::Sender<SessionQueueCommand>, 104 + } 105 + 106 + enum SessionQueueCommand { 107 + Paths(Vec<StorePath>), 108 + Flush, 104 109 } 105 110 106 111 enum SessionQueuePoll { 107 112 Paths(Vec<StorePath>), 113 + Flush, 108 114 Closed, 109 115 TimedOut, 110 116 } ··· 284 290 pusher: Arc<Pusher>, 285 291 config: PushSessionConfig, 286 292 known_paths_mutex: Arc<Mutex<HashSet<StorePathHash>>>, 287 - receiver: channel::Receiver<Vec<StorePath>>, 293 + receiver: channel::Receiver<SessionQueueCommand>, 288 294 ) -> Result<()> { 289 295 let mut roots = HashSet::new(); 290 296 ··· 296 302 loop { 297 303 let poll = tokio::select! { 298 304 r = receiver.recv() => match r { 299 - Ok(paths) => SessionQueuePoll::Paths(paths), 305 + Ok(SessionQueueCommand::Paths(paths)) => SessionQueuePoll::Paths(paths), 306 + Ok(SessionQueueCommand::Flush) => SessionQueuePoll::Flush, 300 307 _ => SessionQueuePoll::Closed, 301 308 }, 302 309 _ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut, ··· 309 316 SessionQueuePoll::Closed => { 310 317 break true; 311 318 } 312 - SessionQueuePoll::TimedOut => { 319 + SessionQueuePoll::Flush | SessionQueuePoll::TimedOut => { 313 320 break false; 314 321 } 315 322 } ··· 360 367 /// Queues multiple store paths to be pushed. 361 368 pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> { 362 369 self.sender 363 - .send_blocking(store_paths) 370 + .send_blocking(SessionQueueCommand::Paths(store_paths)) 371 + .map_err(|e| anyhow!(e)) 372 + } 373 + 374 + /// Flushes the worker queue. 375 + pub fn flush(&self) -> Result<()> { 376 + self.sender 377 + .send_blocking(SessionQueueCommand::Flush) 364 378 .map_err(|e| anyhow!(e)) 365 379 } 366 380 }