A fork of attic, a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

client: Implement watch-store

+560 -172
+89
Cargo.lock
··· 194 194 "humantime", 195 195 "indicatif", 196 196 "lazy_static", 197 + "notify", 197 198 "regex", 198 199 "reqwest", 199 200 "serde", ··· 1535 1536 ] 1536 1537 1537 1538 [[package]] 1539 + name = "filetime" 1540 + version = "0.2.19" 1541 + source = "registry+https://github.com/rust-lang/crates.io-index" 1542 + checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9" 1543 + dependencies = [ 1544 + "cfg-if", 1545 + "libc", 1546 + "redox_syscall", 1547 + "windows-sys 0.42.0", 1548 + ] 1549 + 1550 + [[package]] 1538 1551 name = "flate2" 1539 1552 version = "1.0.25" 1540 1553 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2024 2037 ] 2025 2038 2026 2039 [[package]] 2040 + name = "inotify" 2041 + version = "0.9.6" 2042 + source = "registry+https://github.com/rust-lang/crates.io-index" 2043 + checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" 2044 + dependencies = [ 2045 + "bitflags", 2046 + "inotify-sys", 2047 + "libc", 2048 + ] 2049 + 2050 + [[package]] 2051 + name = "inotify-sys" 2052 + version = "0.1.5" 2053 + source = "registry+https://github.com/rust-lang/crates.io-index" 2054 + checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" 2055 + dependencies = [ 2056 + "libc", 2057 + ] 2058 + 2059 + [[package]] 2027 2060 name = "instant" 2028 2061 version = "0.1.12" 2029 2062 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2132 2165 ] 2133 2166 2134 2167 [[package]] 2168 + name = "kqueue" 2169 + version = "1.0.7" 2170 + source = "registry+https://github.com/rust-lang/crates.io-index" 2171 + checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" 2172 + dependencies = [ 2173 + "kqueue-sys", 2174 + "libc", 2175 + ] 2176 + 2177 + [[package]] 2178 + name = "kqueue-sys" 2179 + version = "1.0.3" 2180 + source = "registry+https://github.com/rust-lang/crates.io-index" 2181 + checksum = 
"8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" 2182 + dependencies = [ 2183 + "bitflags", 2184 + "libc", 2185 + ] 2186 + 2187 + [[package]] 2135 2188 name = "lazy_static" 2136 2189 version = "1.4.0" 2137 2190 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2306 2359 dependencies = [ 2307 2360 "memchr", 2308 2361 "minimal-lexical", 2362 + ] 2363 + 2364 + [[package]] 2365 + name = "notify" 2366 + version = "5.1.0" 2367 + source = "registry+https://github.com/rust-lang/crates.io-index" 2368 + checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9" 2369 + dependencies = [ 2370 + "bitflags", 2371 + "filetime", 2372 + "inotify", 2373 + "kqueue", 2374 + "libc", 2375 + "mio", 2376 + "walkdir", 2377 + "windows-sys 0.42.0", 2309 2378 ] 2310 2379 2311 2380 [[package]] ··· 2994 3063 version = "1.0.12" 2995 3064 source = "registry+https://github.com/rust-lang/crates.io-index" 2996 3065 checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" 3066 + 3067 + [[package]] 3068 + name = "same-file" 3069 + version = "1.0.6" 3070 + source = "registry+https://github.com/rust-lang/crates.io-index" 3071 + checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" 3072 + dependencies = [ 3073 + "winapi-util", 3074 + ] 2997 3075 2998 3076 [[package]] 2999 3077 name = "schannel" ··· 4105 4183 version = "0.9.4" 4106 4184 source = "registry+https://github.com/rust-lang/crates.io-index" 4107 4185 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" 4186 + 4187 + [[package]] 4188 + name = "walkdir" 4189 + version = "2.3.2" 4190 + source = "registry+https://github.com/rust-lang/crates.io-index" 4191 + checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" 4192 + dependencies = [ 4193 + "same-file", 4194 + "winapi", 4195 + "winapi-util", 4196 + ] 4108 4197 4109 4198 [[package]] 4110 4199 name = "want"
+1
client/Cargo.toml
··· 24 24 humantime = "2.1.0" 25 25 indicatif = "0.17.2" 26 26 lazy_static = "1.4.0" 27 + notify = { version = "5.1.0", default-features = false, features = ["macos_kqueue"] } 27 28 regex = "1.7.0" 28 29 reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] } 29 30 serde = { version = "1.0.151", features = ["derive"] }
+3
client/src/cli.rs
··· 12 12 use crate::command::login::{self, Login}; 13 13 use crate::command::push::{self, Push}; 14 14 use crate::command::r#use::{self, Use}; 15 + use crate::command::watch_store::{self, WatchStore}; 15 16 16 17 /// Attic binary cache client. 17 18 #[derive(Debug, Parser)] ··· 28 29 Use(Use), 29 30 Push(Push), 30 31 Cache(Cache), 32 + WatchStore(WatchStore), 31 33 32 34 #[clap(hide = true)] 33 35 GetClosure(GetClosure), ··· 53 55 Command::Use(_) => r#use::run(opts).await, 54 56 Command::Push(_) => push::run(opts).await, 55 57 Command::Cache(_) => cache::run(opts).await, 58 + Command::WatchStore(_) => watch_store::run(opts).await, 56 59 Command::GetClosure(_) => get_closure::run(opts).await, 57 60 } 58 61 }
+1
client/src/command/mod.rs
··· 3 3 pub mod login; 4 4 pub mod push; 5 5 pub mod r#use; 6 + pub mod watch_store;
+21 -137
client/src/command/push.rs
··· 1 - use std::collections::{HashMap, HashSet}; 2 - use std::cmp; 3 1 use std::path::PathBuf; 4 2 use std::sync::Arc; 5 3 6 4 use anyhow::{anyhow, Result}; 7 5 use clap::Parser; 8 - use futures::future::join_all; 9 6 use indicatif::MultiProgress; 10 7 11 8 use crate::api::ApiClient; 12 - use crate::cache::{CacheName, CacheRef}; 9 + use crate::cache::CacheRef; 13 10 use crate::cli::Opts; 14 11 use crate::config::Config; 15 12 use crate::push::{Pusher, PushConfig}; 16 - use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; 13 + use attic::nix_store::NixStore; 17 14 18 15 /// Push closures to a binary cache. 19 16 #[derive(Debug, Parser)] ··· 41 38 force_preamble: bool, 42 39 } 43 40 44 - struct PushPlan { 45 - /// Store paths to push. 46 - store_path_map: HashMap<StorePathHash, ValidPathInfo>, 47 - 48 - /// The number of paths in the original full closure. 49 - num_all_paths: usize, 50 - 51 - /// Number of paths that have been filtered out because they are already cached. 52 - num_already_cached: usize, 53 - 54 - /// Number of paths that have been filtered out because they are signed by an upstream cache. 
55 - num_upstream: usize, 56 - } 57 - 58 41 pub async fn run(opts: Opts) -> Result<()> { 59 42 let sub = opts.command.as_push().unwrap(); 60 43 if sub.jobs == 0 { ··· 74 57 let (server_name, server, cache) = config.resolve_cache(&sub.cache)?; 75 58 76 59 let mut api = ApiClient::from_server_config(server.clone())?; 77 - let plan = PushPlan::plan( 78 - store.clone(), 79 - &mut api, 80 - cache, 81 - roots, 82 - sub.no_closure, 83 - sub.ignore_upstream_cache_filter, 84 - ) 85 - .await?; 60 + 61 + // Confirm remote cache validity, query cache config 62 + let cache_config = api.get_cache_config(cache).await?; 63 + 64 + if let Some(api_endpoint) = &cache_config.api_endpoint { 65 + // Use delegated API endpoint 66 + api.set_endpoint(api_endpoint)?; 67 + } 68 + 69 + let push_config = PushConfig { 70 + num_workers: sub.jobs, 71 + force_preamble: sub.force_preamble, 72 + }; 73 + 74 + let mp = MultiProgress::new(); 75 + 76 + let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config); 77 + let plan = pusher.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter).await?; 86 78 87 79 if plan.store_path_map.is_empty() { 88 80 if plan.num_all_paths == 0 { ··· 106 98 ); 107 99 } 108 100 109 - let push_config = PushConfig { 110 - num_workers: cmp::min(sub.jobs, plan.store_path_map.len()), 111 - force_preamble: sub.force_preamble, 112 - }; 113 - 114 - let mp = MultiProgress::new(); 115 - 116 - let pusher = Pusher::new(store, api, cache.to_owned(), mp, push_config); 117 101 for (_, path_info) in plan.store_path_map { 118 - pusher.push(path_info).await?; 102 + pusher.queue(path_info).await?; 119 103 } 120 104 121 105 let results = pusher.wait().await; ··· 123 107 124 108 Ok(()) 125 109 } 126 - 127 - impl PushPlan { 128 - /// Creates a plan. 
129 - async fn plan( 130 - store: Arc<NixStore>, 131 - api: &mut ApiClient, 132 - cache: &CacheName, 133 - roots: Vec<StorePath>, 134 - no_closure: bool, 135 - ignore_upstream_filter: bool, 136 - ) -> Result<Self> { 137 - // Compute closure 138 - let closure = if no_closure { 139 - roots 140 - } else { 141 - store 142 - .compute_fs_closure_multi(roots, false, false, false) 143 - .await? 144 - }; 145 - 146 - let mut store_path_map: HashMap<StorePathHash, ValidPathInfo> = { 147 - let futures = closure 148 - .iter() 149 - .map(|path| { 150 - let store = store.clone(); 151 - let path = path.clone(); 152 - let path_hash = path.to_hash(); 153 - 154 - async move { 155 - let path_info = store.query_path_info(path).await?; 156 - Ok((path_hash, path_info)) 157 - } 158 - }) 159 - .collect::<Vec<_>>(); 160 - 161 - join_all(futures).await.into_iter().collect::<Result<_>>()? 162 - }; 163 - 164 - let num_all_paths = store_path_map.len(); 165 - if store_path_map.is_empty() { 166 - return Ok(Self { 167 - store_path_map, 168 - num_all_paths, 169 - num_already_cached: 0, 170 - num_upstream: 0, 171 - }); 172 - } 173 - 174 - // Confirm remote cache validity, query cache config 175 - let cache_config = api.get_cache_config(cache).await?; 176 - 177 - if let Some(api_endpoint) = &cache_config.api_endpoint { 178 - // Use delegated API endpoint 179 - api.set_endpoint(api_endpoint)?; 180 - } 181 - 182 - if !ignore_upstream_filter { 183 - // Filter out paths signed by upstream caches 184 - let upstream_cache_key_names = 185 - cache_config.upstream_cache_key_names.unwrap_or_default(); 186 - store_path_map.retain(|_, pi| { 187 - for sig in &pi.sigs { 188 - if let Some((name, _)) = sig.split_once(':') { 189 - if upstream_cache_key_names.iter().any(|u| name == u) { 190 - return false; 191 - } 192 - } 193 - } 194 - 195 - true 196 - }); 197 - } 198 - 199 - let num_filtered_paths = store_path_map.len(); 200 - if store_path_map.is_empty() { 201 - return Ok(Self { 202 - store_path_map, 203 - 
num_all_paths, 204 - num_already_cached: 0, 205 - num_upstream: num_all_paths - num_filtered_paths, 206 - }); 207 - } 208 - 209 - // Query missing paths 210 - let missing_path_hashes: HashSet<StorePathHash> = { 211 - let store_path_hashes = store_path_map.keys().map(|sph| sph.to_owned()).collect(); 212 - let res = api.get_missing_paths(cache, store_path_hashes).await?; 213 - res.missing_paths.into_iter().collect() 214 - }; 215 - store_path_map.retain(|sph, _| missing_path_hashes.contains(sph)); 216 - let num_missing_paths = store_path_map.len(); 217 - 218 - Ok(Self { 219 - store_path_map, 220 - num_all_paths, 221 - num_already_cached: num_filtered_paths - num_missing_paths, 222 - num_upstream: num_all_paths - num_filtered_paths, 223 - }) 224 - } 225 - }
+114
client/src/command/watch_store.rs
··· 1 + use std::path::{Path, PathBuf}; 2 + use std::sync::Arc; 3 + 4 + use anyhow::{anyhow, Result}; 5 + use clap::Parser; 6 + use indicatif::MultiProgress; 7 + use notify::{RecursiveMode, Watcher, EventKind}; 8 + 9 + use crate::api::ApiClient; 10 + use crate::cache::CacheRef; 11 + use crate::cli::Opts; 12 + use crate::config::Config; 13 + use crate::push::{Pusher, PushConfig, PushSessionConfig}; 14 + use attic::nix_store::{NixStore, StorePath}; 15 + 16 + /// Watch the Nix Store for new paths and upload them to a binary cache. 17 + #[derive(Debug, Parser)] 18 + pub struct WatchStore { 19 + /// The cache to push to. 20 + cache: CacheRef, 21 + 22 + /// Push the new paths only and do not compute closures. 23 + #[clap(long)] 24 + no_closure: bool, 25 + 26 + /// Ignore the upstream cache filter. 27 + #[clap(long)] 28 + ignore_upstream_cache_filter: bool, 29 + 30 + /// The maximum number of parallel upload processes. 31 + #[clap(short = 'j', long, default_value = "5")] 32 + jobs: usize, 33 + 34 + /// Always send the upload info as part of the payload. 
35 + #[clap(long, hide = true)] 36 + force_preamble: bool, 37 + } 38 + 39 + pub async fn run(opts: Opts) -> Result<()> { 40 + let sub = opts.command.as_watch_store().unwrap(); 41 + if sub.jobs == 0 { 42 + return Err(anyhow!("The number of jobs cannot be 0")); 43 + } 44 + 45 + let config = Config::load()?; 46 + 47 + let store = Arc::new(NixStore::connect()?); 48 + let store_dir = store.store_dir().to_owned(); 49 + 50 + let (server_name, server, cache) = config.resolve_cache(&sub.cache)?; 51 + let mut api = ApiClient::from_server_config(server.clone())?; 52 + 53 + // Confirm remote cache validity, query cache config 54 + let cache_config = api.get_cache_config(cache).await?; 55 + 56 + if let Some(api_endpoint) = &cache_config.api_endpoint { 57 + // Use delegated API endpoint 58 + api.set_endpoint(api_endpoint)?; 59 + } 60 + 61 + let push_config = PushConfig { 62 + num_workers: sub.jobs, 63 + force_preamble: sub.force_preamble, 64 + }; 65 + 66 + let push_session_config = PushSessionConfig { 67 + no_closure: sub.no_closure, 68 + ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter, 69 + }; 70 + 71 + let mp = MultiProgress::new(); 72 + let session = Pusher::new(store.clone(), api, cache.to_owned(), cache_config, mp, push_config) 73 + .into_push_session(push_session_config); 74 + 75 + let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| { 76 + match res { 77 + Ok(event) => { 78 + // We watch the removals of lock files which signify 79 + // store paths becoming valid 80 + if let EventKind::Remove(_) = event.kind { 81 + let paths = event.paths 82 + .iter() 83 + .filter_map(|p| { 84 + let base = strip_lock_file(&p)?; 85 + store.parse_store_path(base).ok() 86 + }) 87 + .collect::<Vec<StorePath>>(); 88 + 89 + if !paths.is_empty() { 90 + session.queue_many(paths).unwrap(); 91 + } 92 + } 93 + } 94 + Err(e) => eprintln!("Error during watch: {:?}", e), 95 + } 96 + })?; 97 + 98 + watcher.watch(&store_dir, RecursiveMode::NonRecursive)?; 
99 + 100 + eprintln!("👀 Pushing new store paths to \"{cache}\" on \"{server}\"", 101 + cache = cache.as_str(), 102 + server = server_name.as_str(), 103 + ); 104 + 105 + loop { 106 + } 107 + } 108 + 109 + fn strip_lock_file(p: &Path) -> Option<PathBuf> { 110 + p.to_str() 111 + .and_then(|p| p.strip_suffix(".lock")) 112 + .filter(|t| !t.ends_with(".drv")) 113 + .map(PathBuf::from) 114 + }
+331 -35
client/src/push.rs
··· 1 1 //! Store path uploader. 2 2 //! 3 - //! Multiple workers are spawned to upload store paths concurrently. 3 + //! There are two APIs: `Pusher` and `PushSession`. 4 + //! 5 + //! A `Pusher` simply dispatches `ValidPathInfo`s for workers to push. Use this 6 + //! when you know all store paths to push beforehand. The push plan (closure, missing 7 + //! paths, all path metadata) should be computed prior to pushing. 8 + //! 9 + //! A `PushSession`, on the other hand, accepts a stream of `StorePath`s and 10 + //! takes care of retrieving the closure and path metadata. It automatically 11 + //! batches expensive operations (closure computation, querying missing paths). 12 + //! Use this when the list of store paths is streamed from some external 13 + //! source (e.g., FS watcher, Unix Domain Socket) and a push plan cannot be 14 + //! created statically. 4 15 //! 5 16 //! TODO: Refactor out progress reporting and support a simple output style without progress bars 6 17 7 - use std::collections::HashMap; 18 + use std::collections::{HashMap, HashSet}; 8 19 use std::fmt::Write; 9 20 use std::pin::Pin; 10 21 use std::sync::Arc; ··· 18 29 use futures::future::join_all; 19 30 use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; 20 31 use tokio::task::{JoinHandle, spawn}; 32 + use tokio::time; 33 + use tokio::sync::Mutex; 21 34 35 + use attic::api::v1::cache_config::CacheConfig; 22 36 use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; 23 37 use attic::cache::CacheName; 24 38 use attic::error::AtticResult; 25 - use attic::nix_store::{NixStore, StorePath, ValidPathInfo}; 39 + use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; 26 40 use crate::api::ApiClient; 27 41 28 42 type JobSender = channel::Sender<ValidPathInfo>; ··· 38 52 pub force_preamble: bool, 39 53 } 40 54 55 + /// Configuration for a push session. 
56 + #[derive(Clone, Copy, Debug)] 57 + pub struct PushSessionConfig { 58 + /// Push the specified paths only and do not compute closures. 59 + pub no_closure: bool, 60 + 61 + /// Ignore the upstream cache filter. 62 + pub ignore_upstream_cache_filter: bool, 63 + } 64 + 41 65 /// A handle to push store paths to a cache. 42 66 /// 43 67 /// The caller is responsible for computing closures and 44 68 /// checking for paths that already exist on the remote 45 69 /// cache. 46 70 pub struct Pusher { 71 + api: ApiClient, 72 + store: Arc<NixStore>, 73 + cache: CacheName, 74 + cache_config: CacheConfig, 47 75 workers: Vec<JoinHandle<HashMap<StorePath, Result<()>>>>, 48 76 sender: JobSender, 49 77 } 50 78 79 + /// A wrapper over a `Pusher` that accepts a stream of `StorePath`s. 80 + /// 81 + /// Unlike a `Pusher`, a `PushSession` takes a stream of `StorePath`s 82 + /// instead of `ValidPathInfo`s, taking care of retrieving the closure 83 + /// and path metadata. 84 + /// 85 + /// This is useful when the list of store paths is streamed from some 86 + /// external source (e.g., FS watcher, Unix Domain Socket) and a push 87 + /// plan cannot be computed statically. 88 + /// 89 + /// ## Batching 90 + /// 91 + /// Many store paths can be built in a short period of time, with each 92 + /// having a big closure. It can be very inefficient if we were to compute 93 + /// closure and query for missing paths for each individual path. This is 94 + /// especially true if we have a lot of remote builders (e.g., `attic watch-store` 95 + /// running alongside a beefy Hydra instance). 96 + /// 97 + /// `PushSession` batches operations in order to minimize the number of 98 + /// closure computations and API calls. It also remembers which paths already 99 + /// exist on the remote cache. By default, it submits a batch if it's been 2 100 + /// seconds since the last path is queued or it's been 10 seconds in total. 101 + pub struct PushSession { 102 + /// Sender to the batching future. 
103 + sender: channel::Sender<Vec<StorePath>>, 104 + } 105 + 106 + enum SessionQueuePoll { 107 + Paths(Vec<StorePath>), 108 + Closed, 109 + TimedOut, 110 + } 111 + 112 + #[derive(Debug)] 113 + pub struct PushPlan { 114 + /// Store paths to push. 115 + pub store_path_map: HashMap<StorePathHash, ValidPathInfo>, 116 + 117 + /// The number of paths in the original full closure. 118 + pub num_all_paths: usize, 119 + 120 + /// Number of paths that have been filtered out because they are already cached. 121 + pub num_already_cached: usize, 122 + 123 + /// Number of paths that have been filtered out because they are signed by an upstream cache. 124 + pub num_upstream: usize, 125 + } 126 + 51 127 /// Wrapper to update a progress bar as a NAR is streamed. 52 128 struct NarStreamProgress<S> { 53 129 stream: S, ··· 55 131 } 56 132 57 133 impl Pusher { 58 - pub fn new(store: Arc<NixStore>, api: ApiClient, cache: CacheName, mp: MultiProgress, config: PushConfig) -> Self { 134 + pub fn new(store: Arc<NixStore>, api: ApiClient, cache: CacheName, cache_config: CacheConfig, mp: MultiProgress, config: PushConfig) -> Self { 59 135 let (sender, receiver) = channel::unbounded(); 60 136 let mut workers = Vec::new(); 61 137 62 138 for _ in 0..config.num_workers { 63 - workers.push(spawn(worker( 139 + workers.push(spawn(Self::worker( 64 140 receiver.clone(), 65 141 store.clone(), 66 142 api.clone(), ··· 70 146 ))); 71 147 } 72 148 73 - Self { workers, sender } 149 + Self { api, store, cache, cache_config, workers, sender } 74 150 } 75 151 76 - /// Sends a path to be pushed. 77 - pub async fn push(&self, path_info: ValidPathInfo) -> Result<()> { 152 + /// Queues a store path to be pushed. 153 + pub async fn queue(&self, path_info: ValidPathInfo) -> Result<()> { 78 154 self.sender.send(path_info).await 79 155 .map_err(|e| anyhow!(e)) 80 156 } ··· 96 172 97 173 results 98 174 } 175 + 176 + /// Creates a push plan. 
177 + pub async fn plan(&self, roots: Vec<StorePath>, no_closure: bool, ignore_upstream_filter: bool) -> Result<PushPlan> { 178 + PushPlan::plan( 179 + self.store.clone(), 180 + &self.api, 181 + &self.cache, 182 + &self.cache_config, 183 + roots, 184 + no_closure, 185 + ignore_upstream_filter, 186 + ).await 187 + } 188 + 189 + /// Converts the pusher into a `PushSession`. 190 + /// 191 + /// This is useful when the list of store paths is streamed from some 192 + /// external source (e.g., FS watcher, Unix Domain Socket) and a push 193 + /// plan cannot be computed statically. 194 + pub fn into_push_session(self, config: PushSessionConfig) -> PushSession { 195 + PushSession::with_pusher(self, config) 196 + } 197 + 198 + async fn worker( 199 + receiver: JobReceiver, 200 + store: Arc<NixStore>, 201 + api: ApiClient, 202 + cache: CacheName, 203 + mp: MultiProgress, 204 + config: PushConfig, 205 + ) -> HashMap<StorePath, Result<()>> { 206 + let mut results = HashMap::new(); 207 + 208 + loop { 209 + let path_info = match receiver.recv().await { 210 + Ok(path_info) => path_info, 211 + Err(_) => { 212 + // channel is closed - we are done 213 + break; 214 + } 215 + }; 216 + 217 + let store_path = path_info.path.clone(); 218 + 219 + let r = upload_path( 220 + path_info, 221 + store.clone(), 222 + api.clone(), 223 + &cache, 224 + mp.clone(), 225 + config.force_preamble, 226 + ).await; 227 + 228 + results.insert(store_path, r); 229 + } 230 + 231 + results 232 + } 99 233 } 100 234 101 - async fn worker( 102 - receiver: JobReceiver, 103 - store: Arc<NixStore>, 104 - api: ApiClient, 105 - cache: CacheName, 106 - mp: MultiProgress, 107 - config: PushConfig, 108 - ) -> HashMap<StorePath, Result<()>> { 109 - let mut results = HashMap::new(); 235 + impl PushSession { 236 + pub fn with_pusher(pusher: Pusher, config: PushSessionConfig) -> Self { 237 + let (sender, receiver) = channel::unbounded(); 238 + 239 + let known_paths_mutex = Arc::new(Mutex::new(HashSet::new())); 110 240 111 - 
loop { 112 - let path_info = match receiver.recv().await { 113 - Ok(path_info) => path_info, 114 - Err(_) => { 115 - // channel is closed - we are done 116 - break; 241 + // FIXME 242 + spawn(async move { 243 + let pusher = Arc::new(pusher); 244 + loop { 245 + if let Err(e) = Self::worker( 246 + pusher.clone(), 247 + config.clone(), 248 + known_paths_mutex.clone(), 249 + receiver.clone(), 250 + ).await { 251 + eprintln!("Worker exited: {:?}", e); 252 + } else { 253 + break; 254 + } 117 255 } 256 + }); 257 + 258 + Self { 259 + sender, 260 + } 261 + } 262 + 263 + async fn worker( 264 + pusher: Arc<Pusher>, 265 + config: PushSessionConfig, 266 + known_paths_mutex: Arc<Mutex<HashSet<StorePathHash>>>, 267 + receiver: channel::Receiver<Vec<StorePath>>, 268 + ) -> Result<()> { 269 + let mut roots = HashSet::new(); 270 + 271 + loop { 272 + // Get outstanding paths in queue 273 + let done = tokio::select! { 274 + // 2 seconds since last queued path 275 + done = async { 276 + loop { 277 + let poll = tokio::select! 
{ 278 + r = receiver.recv() => match r { 279 + Ok(paths) => SessionQueuePoll::Paths(paths), 280 + _ => SessionQueuePoll::Closed, 281 + }, 282 + _ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut, 283 + }; 284 + 285 + match poll { 286 + SessionQueuePoll::Paths(store_paths) => { 287 + roots.extend(store_paths.into_iter()); 288 + } 289 + SessionQueuePoll::Closed => { 290 + break true; 291 + } 292 + SessionQueuePoll::TimedOut => { 293 + break false; 294 + } 295 + } 296 + } 297 + } => done, 298 + 299 + // 10 seconds 300 + _ = time::sleep(Duration::from_secs(10)) => { 301 + false 302 + }, 303 + }; 304 + 305 + // Compute push plan 306 + let roots_vec: Vec<StorePath> = { 307 + let known_paths = known_paths_mutex.lock().await; 308 + roots.drain() 309 + .filter(|root| !known_paths.contains(&root.to_hash())) 310 + .collect() 311 + }; 312 + 313 + let mut plan = pusher.plan(roots_vec, config.no_closure, config.ignore_upstream_cache_filter).await?; 314 + 315 + let mut known_paths = known_paths_mutex.lock().await; 316 + plan.store_path_map 317 + .retain(|sph, _| !known_paths.contains(&sph)); 318 + 319 + // Push everything 320 + for (store_path_hash, path_info) in plan.store_path_map.into_iter() { 321 + pusher.queue(path_info).await?; 322 + known_paths.insert(store_path_hash); 323 + } 324 + 325 + drop(known_paths); 326 + 327 + if done { 328 + return Ok(()); 329 + } 330 + } 331 + } 332 + 333 + /// Queues multiple store paths to be pushed. 334 + pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> { 335 + self.sender.send_blocking(store_paths) 336 + .map_err(|e| anyhow!(e)) 337 + } 338 + } 339 + 340 + impl PushPlan { 341 + /// Creates a plan. 
342 + async fn plan( 343 + store: Arc<NixStore>, 344 + api: &ApiClient, 345 + cache: &CacheName, 346 + cache_config: &CacheConfig, 347 + roots: Vec<StorePath>, 348 + no_closure: bool, 349 + ignore_upstream_filter: bool, 350 + ) -> Result<Self> { 351 + // Compute closure 352 + let closure = if no_closure { 353 + roots 354 + } else { 355 + store 356 + .compute_fs_closure_multi(roots, false, false, false) 357 + .await? 118 358 }; 119 359 120 - let store_path = path_info.path.clone(); 360 + let mut store_path_map: HashMap<StorePathHash, ValidPathInfo> = { 361 + let futures = closure 362 + .iter() 363 + .map(|path| { 364 + let store = store.clone(); 365 + let path = path.clone(); 366 + let path_hash = path.to_hash(); 367 + 368 + async move { 369 + let path_info = store.query_path_info(path).await?; 370 + Ok((path_hash, path_info)) 371 + } 372 + }) 373 + .collect::<Vec<_>>(); 374 + 375 + join_all(futures).await.into_iter().collect::<Result<_>>()? 376 + }; 377 + 378 + let num_all_paths = store_path_map.len(); 379 + if store_path_map.is_empty() { 380 + return Ok(Self { 381 + store_path_map, 382 + num_all_paths, 383 + num_already_cached: 0, 384 + num_upstream: 0, 385 + }); 386 + } 387 + 388 + if !ignore_upstream_filter { 389 + // Filter out paths signed by upstream caches 390 + let upstream_cache_key_names = 391 + cache_config.upstream_cache_key_names.as_ref().map_or([].as_slice(), |v| v.as_slice()); 392 + store_path_map.retain(|_, pi| { 393 + for sig in &pi.sigs { 394 + if let Some((name, _)) = sig.split_once(':') { 395 + if upstream_cache_key_names.iter().any(|u| name == u) { 396 + return false; 397 + } 398 + } 399 + } 400 + 401 + true 402 + }); 403 + } 404 + 405 + let num_filtered_paths = store_path_map.len(); 406 + if store_path_map.is_empty() { 407 + return Ok(Self { 408 + store_path_map, 409 + num_all_paths, 410 + num_already_cached: 0, 411 + num_upstream: num_all_paths - num_filtered_paths, 412 + }); 413 + } 121 414 122 - let r = upload_path( 123 - path_info, 124 - 
store.clone(), 125 - api.clone(), 126 - &cache, 127 - mp.clone(), 128 - config.force_preamble, 129 - ).await; 415 + // Query missing paths 416 + let missing_path_hashes: HashSet<StorePathHash> = { 417 + let store_path_hashes = store_path_map.keys().map(|sph| sph.to_owned()).collect(); 418 + let res = api.get_missing_paths(cache, store_path_hashes).await?; 419 + res.missing_paths.into_iter().collect() 420 + }; 421 + store_path_map.retain(|sph, _| missing_path_hashes.contains(sph)); 422 + let num_missing_paths = store_path_map.len(); 130 423 131 - results.insert(store_path, r); 424 + Ok(Self { 425 + store_path_map, 426 + num_all_paths, 427 + num_already_cached: num_filtered_paths - num_missing_paths, 428 + num_upstream: num_all_paths - num_filtered_paths, 429 + }) 132 430 } 133 - 134 - results 135 431 } 136 432 137 433 /// Uploads a single path to a cache.