lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

yet another attempt to spread backfill resyncs

phil 08ad6919 99ef9b2e

+524 -81
+9 -2
src/main.rs
··· 10 10 use lightrail::error::{Error, Result}; 11 11 use lightrail::identity; 12 12 use lightrail::storage; 13 + use lightrail::sync::discovery_queue::DiscoveryQueue; 13 14 use lightrail::sync::{self, backfill, firehose, resync}; 14 15 use lightrail::util::TokenExt; 15 16 ··· 159 160 std::sync::Mutex::new(resync::dispatcher::DispatcherSnapshot::default()), 160 161 ); 161 162 163 + let discovery_queue = std::sync::Arc::new(DiscoveryQueue::new(4096, 36)); 164 + 162 165 let mut tasks: JoinSet<Result<()>> = JoinSet::new(); 163 166 164 167 tasks.spawn({ ··· 189 192 let client = client.clone(); 190 193 let host = subscribe_host.clone(); 191 194 let resolver = resolver.clone(); 195 + let discovery_queue = discovery_queue.clone(); 192 196 async move { 193 197 if backfill::run( 194 198 host, ··· 197 201 token.clone(), 198 202 resolver, 199 203 false, 200 - args.crawl_qps, 204 + discovery_queue, 201 205 ) 202 206 .await 203 207 .inspect(|_| info!("backfill done.")) ··· 216 220 let client = client.clone(); 217 221 let resolver = resolver.clone(); 218 222 let dispatcher_state = dispatcher_state.clone(); 223 + let discovery_queue = discovery_queue.clone(); 219 224 async move { 220 225 resync::dispatcher::run(resync::DispatcherConfig { 221 226 resolver, ··· 227 232 token, 228 233 force_get_repo: args.heavy, 229 234 state: dispatcher_state, 235 + discovery_queue, 230 236 }) 231 237 .await 232 238 .inspect(|_| info!("resync done.")) ··· 284 290 let client = client.clone(); 285 291 let host = subscribe_host.clone(); 286 292 let resolver = resolver.clone(); 293 + let discovery_queue = discovery_queue.clone(); 287 294 async move { 288 295 sync::deep_crawl::run( 289 296 host, ··· 292 299 args.max_deep_crawl_workers, 293 300 token, 294 301 resolver, 295 - args.crawl_qps, 302 + discovery_queue, 296 303 ) 297 304 .await 298 305 .inspect(|_| info!("deep crawl done."))
+28 -60
src/sync/backfill.rs
··· 6 6 //! (any state) are skipped — the dispatcher's retry mechanism handles repos 7 7 //! that need re-syncing. 8 8 9 - use std::collections::HashMap; 10 - use std::num::NonZeroU32; 11 9 use std::sync::Arc; 12 10 13 11 use jacquard_api::com_atproto::sync::list_repos::ListRepos; 14 12 use jacquard_common::{ 15 13 error::ClientErrorKind, 16 14 types::string::Did, 17 - url::Host, 15 + url::{Host, Url}, 18 16 {IntoStatic, xrpc::XrpcExt}, 19 17 }; 20 - use reqwest::Url; 21 18 use tokio::time::Duration; 22 19 use tokio_util::sync::CancellationToken; 23 20 use tracing::{error, info, trace, warn}; ··· 29 26 storage::{ 30 27 self, DbRef, 31 28 backfill_progress::{BackfillProgress, get, set}, 32 - resync_queue::ResyncItem, 33 29 }, 30 + sync::discovery_queue::{DiscoveryItem, DiscoveryQueue}, 34 31 util::TokenExt, 35 32 }; 36 33 use std::sync::atomic::Ordering; ··· 41 38 const RETRY_DELAY_SECS: u64 = 10; 42 39 /// Maximum consecutive transient failures before giving up on this host. 43 40 const MAX_PAGE_FAILURES: u32 = 3; 44 - /// Typical number of requests required to complete one repo resync 45 - const REQUESTS_PER_RESYNC: u64 = 2; 46 41 47 42 /// Walk `listRepos` on `host` and enqueue new repos for resync. 48 43 /// ··· 57 52 token: CancellationToken, 58 53 resolver: Arc<Resolver>, 59 54 validate: bool, 60 - crawl_qps: NonZeroU32, 55 + discovery_queue: Arc<DiscoveryQueue>, 61 56 ) -> Result<bool> { 62 57 let base: jacquard_common::url::Url = format!("https://{host}") 63 58 .parse() ··· 78 73 ); 79 74 80 75 let mut total_queued: u64 = 0; 81 - // Per-host staggering: track last-scheduled timestamp so that items for the 82 - // same host are spread across time at 1/crawl_qps intervals. 83 - // This prevents the timestamp-ordered queue from bunching all items for 84 - // a popular host together. 85 - let host_interval = Duration::from_secs(1) / crawl_qps.get(); 86 - let mut host_schedule: HashMap<Arc<Url>, SystemTime> = HashMap::new(); 87 76 88 77 loop { 89 78 if token.is_cancelled() { ··· 97 86 }; 98 87 99 88 let page_len = dids.len(); 100 - let now = SystemTime::now(); 101 89 102 90 // For untrusted hosts (deep crawl), filter DIDs to those whose 103 91 // resolved PDS actually matches this host. ··· 130 118 }; 131 119 132 120 let progress_cursor = next_cursor.clone().unwrap_or_default(); 133 - let (page_queued, schedule_back) = { 121 + let (page_queued, new_items) = { 134 122 let db = db.clone(); 135 123 let host = host.clone(); 136 - // Move the schedule into the blocking task so timestamps are 137 - // only advanced for DIDs that are actually newly inserted. 138 - let schedule = std::mem::take(&mut host_schedule); 139 124 tokio::task::spawn_blocking(move || { 140 - store_page( 141 - &db, 142 - &host, 143 - dids_with_hosts, 144 - progress_cursor, 145 - now, 146 - host_interval, 147 - schedule, 148 - ) 125 + persist_page(&db, &host, dids_with_hosts, progress_cursor) 149 126 }) 150 127 .await?? 151 128 }; 152 - host_schedule = schedule_back; 129 + 130 + // Push newly-discovered repos into the in-memory discovery queue. 131 + // Each push may block if the queue is full (backpressure). 132 + for (did, pds) in new_items { 133 + let Some(_) = token 134 + .run(discovery_queue.push(DiscoveryItem { did, pds })) 135 + .await 136 + else { 137 + return Ok(false); 138 + }; 139 + } 153 140 154 141 total_queued += page_queued; 155 142 ··· 181 168 &host_owned, 182 169 &BackfillProgress { 183 170 cursor: "".to_string(), 184 - completed_at: Some(crate::util::to_millis(now).to_string()), 171 + completed_at: Some(crate::util::to_millis(SystemTime::now()).to_string()), 185 172 }, 186 173 ) 187 174 }) ··· 305 292 // Storage commit 306 293 // --------------------------------------------------------------------------- 307 294 308 - /// Enqueue newly-seen DIDs and persist the backfill cursor in one blocking task. 295 + type DidWithPds = (Did<'static>, Arc<Url>); 296 + 297 + /// Insert newly-seen DIDs into the repo table and persist the backfill cursor. 309 298 /// 310 - /// Timestamps are staggered per PDS host so that the timestamp-ordered queue 311 - /// naturally interleaves items across hosts. Only newly-inserted DIDs advance 312 - /// the per-host schedule — already-known repos are skipped without leaving gaps. 313 - /// 314 - /// Returns `(count_inserted, updated_host_schedule)`. 315 - fn store_page( 299 + /// Returns `(count_inserted, new_items)` — the caller pushes `new_items` into 300 + /// the in-memory discovery queue (async, with backpressure). 301 + fn persist_page( 316 302 db: &DbRef, 317 303 host: &Host, 318 - items: Vec<(Did<'static>, Arc<Url>)>, 304 + items: Vec<DidWithPds>, 319 305 progress_cursor: String, 320 - now: SystemTime, 321 - interval: Duration, 322 - mut host_schedule: HashMap<Arc<Url>, SystemTime>, 323 - ) -> Result<(u64, HashMap<Arc<Url>, SystemTime>)> { 324 - let mut count: u64 = 0; 325 - let meta_interval = interval * REQUESTS_PER_RESYNC as u32; 306 + ) -> Result<(u64, Vec<DidWithPds>)> { 307 + let mut new_items: Vec<DidWithPds> = Vec::new(); 326 308 for (did, pds) in items { 327 309 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 328 310 if newly_inserted { 329 - let last = host_schedule.get(&pds).copied().unwrap_or(now); 330 - let when = if last >= now { 331 - last + meta_interval 332 - } else { 333 - now 334 - }; 335 - host_schedule.insert(pds, when); 336 - let item = ResyncItem { 337 - did, 338 - retry_count: 0, 339 - retry_reason: "backfill".to_string(), 340 - commit_cbor: vec![], 341 - }; 342 - storage::resync_queue::enqueue(db, when, &item)?; 343 311 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 344 - count += 1; 312 + new_items.push((did, pds)); 345 313 } 346 314 } 347 315 // Persist progress before advancing so a crash during the next ··· 354 322 completed_at: None, 355 323 }, 356 324 )?; 357 - Ok((count, host_schedule)) 325 + Ok((new_items.len() as u64, new_items)) 358 326 }
+7 -15
src/sync/deep_crawl.rs
··· 5 5 //! have already had a completed backfill are skipped — new repos on those PDSes 6 6 //! arrive via the firehose. 7 7 8 - use std::num::NonZeroU32; 9 8 use std::sync::Arc; 10 9 11 10 use jacquard_api::com_atproto::sync::list_hosts::ListHosts; ··· 19 18 http::ThrottledClient, 20 19 identity::Resolver, 21 20 storage::{DbRef, backfill_progress, list_hosts_cursor}, 22 - sync::backfill, 21 + sync::{backfill, discovery_queue::DiscoveryQueue}, 23 22 util::TokenExt, 24 23 }; 25 24 ··· 40 39 max_workers: usize, 41 40 token: CancellationToken, 42 41 resolver: Arc<Resolver>, 43 - crawl_qps: NonZeroU32, 42 + discovery_queue: Arc<DiscoveryQueue>, 44 43 ) -> Result<()> { 45 44 info!(relay = %relay_host, "deep crawl started"); 46 45 ··· 57 56 max_workers, 58 57 token.clone(), 59 58 resolver.clone(), 60 - crawl_qps, 59 + discovery_queue.clone(), 61 60 ) 62 61 .await?; 63 62 ··· 88 87 max_workers: usize, 89 88 token: CancellationToken, 90 89 resolver: Arc<Resolver>, 91 - crawl_qps: NonZeroU32, 90 + discovery_queue: Arc<DiscoveryQueue>, 92 91 ) -> Result<()> { 93 92 let base: jacquard_common::url::Url = format!("https://{relay_host}") 94 93 .parse() ··· 166 165 let child = token.child_token(); 167 166 let host2 = pds_host.clone(); 168 167 let resolver2 = resolver.clone(); 168 + let dq2 = discovery_queue.clone(); 169 169 workers.spawn(async move { 170 - let outcome = backfill::run( 171 - host2.clone(), 172 - db2, 173 - client2, 174 - child, 175 - resolver2, 176 - true, 177 - crawl_qps, 178 - ) 179 - .await; 170 + let outcome = 171 + backfill::run(host2.clone(), db2, client2, child, resolver2, true, dq2).await; 180 172 (host2, outcome) 181 173 }); 182 174 metrics::gauge!("lightrail_deep_crawl_workers").set(workers.len() as f64);
+412
src/sync/discovery_queue.rs
··· 1 + //! Bounded, host-fair in-memory queue for discovery-sourced resync work. 2 + //! 3 + //! Backfill and deep-crawl tasks push newly-discovered repos here instead of 4 + //! the on-disk resync queue. The dispatcher pops items round-robin across PDS 5 + //! hosts, achieving fair throughput distribution without timestamp stagger 6 + //! arithmetic. 7 + //! 8 + //! The queue is bounded in two dimensions: 9 + //! - **Global capacity**: total items across all hosts; prevents unbounded 10 + //! memory growth. 11 + //! - **Per-host limit**: items for a single PDS; prevents one large PDS from 12 + //! monopolising the queue and starving other hosts. 13 + //! 14 + //! When either limit is reached, producers block on [`DiscoveryQueue::push`] 15 + //! until the dispatcher drains items, applying natural backpressure to crawl 16 + //! tasks. 17 + //! 18 + //! On process restart, queued items are lost. This is acceptable because 19 + //! backfill/deep-crawl cursors are persisted — the gap self-heals on the next 20 + //! crawl pass. 21 + 22 + use std::collections::{HashMap, HashSet, VecDeque}; 23 + use std::pin::pin; 24 + use std::sync::atomic::{AtomicUsize, Ordering}; 25 + use std::sync::{Arc, Mutex}; 26 + 27 + use jacquard_common::types::string::Did; 28 + use jacquard_common::url::Url; 29 + 30 + /// A newly-discovered repo awaiting its initial resync. 31 + pub struct DiscoveryItem { 32 + pub did: Did<'static>, 33 + pub pds: Arc<Url>, 34 + } 35 + 36 + pub struct DiscoveryQueue { 37 + inner: Mutex<Inner>, 38 + /// Wakes producers when space becomes available (after a pop). 39 + space_available: tokio::sync::Notify, 40 + /// Wakes the dispatcher when items arrive. 41 + item_available: tokio::sync::Notify, 42 + /// Maximum total items across all host queues. 43 + capacity: usize, 44 + /// Maximum items for a single PDS host. 45 + per_host_limit: usize, 46 + /// Total items, readable without the lock (kept in sync under the lock). 47 + len: AtomicUsize, 48 + } 49 + 50 + struct Inner { 51 + /// Ordered list of PDS hosts for round-robin traversal. 52 + hosts: Vec<Arc<Url>>, 53 + /// Per-host FIFO queues of DIDs awaiting resync. 54 + queues: HashMap<Arc<Url>, VecDeque<Did<'static>>>, 55 + /// Round-robin cursor: index into `hosts` for the next pop. 56 + cursor: usize, 57 + /// Total items across all queues. 58 + len: usize, 59 + } 60 + 61 + impl Inner { 62 + fn can_push(&self, pds: &Arc<Url>, capacity: usize, per_host_limit: usize) -> bool { 63 + if self.len >= capacity { 64 + return false; 65 + } 66 + let host_len = self.queues.get(pds).map_or(0, |q| q.len()); 67 + host_len < per_host_limit 68 + } 69 + 70 + fn insert(&mut self, item: DiscoveryItem) { 71 + if !self.queues.contains_key(&item.pds) { 72 + self.hosts.push(item.pds.clone()); 73 + self.queues.insert(item.pds.clone(), VecDeque::new()); 74 + } 75 + self.queues.get_mut(&item.pds).unwrap().push_back(item.did); 76 + self.len += 1; 77 + } 78 + } 79 + 80 + impl DiscoveryQueue { 81 + pub fn new(capacity: usize, per_host_limit: usize) -> Self { 82 + Self { 83 + inner: Mutex::new(Inner { 84 + hosts: Vec::new(), 85 + queues: HashMap::new(), 86 + cursor: 0, 87 + len: 0, 88 + }), 89 + space_available: tokio::sync::Notify::new(), 90 + item_available: tokio::sync::Notify::new(), 91 + capacity, 92 + per_host_limit, 93 + len: AtomicUsize::new(0), 94 + } 95 + } 96 + 97 + /// Push an item, blocking if the global capacity or per-host limit is 98 + /// reached. 99 + /// 100 + /// Callers should `tokio::select!` this against their cancellation token 101 + /// for clean shutdown. 102 + pub async fn push(&self, item: DiscoveryItem) { 103 + loop { 104 + // Register for wake-up BEFORE checking, so a pop between check 105 + // and await still wakes us. 106 + let mut notified = pin!(self.space_available.notified()); 107 + notified.as_mut().enable(); 108 + 109 + { 110 + let mut inner = self.inner.lock().unwrap(); 111 + if inner.can_push(&item.pds, self.capacity, self.per_host_limit) { 112 + inner.insert(item); 113 + self.len.store(inner.len, Ordering::Relaxed); 114 + drop(inner); 115 + self.item_available.notify_one(); 116 + return; 117 + } 118 + } 119 + 120 + notified.await; 121 + } 122 + } 123 + 124 + /// Pop up to `n` items round-robin across hosts. 125 + /// 126 + /// Hosts in `skip_hosts` are left in the queue (e.g. cooling after 429). 127 + /// Returns immediately with whatever is available (may be empty). 128 + pub fn pop_batch(&self, n: usize, skip_hosts: &HashSet<Arc<Url>>) -> Vec<DiscoveryItem> { 129 + let mut inner = self.inner.lock().unwrap(); 130 + let mut result = Vec::with_capacity(n); 131 + 132 + if inner.hosts.is_empty() || n == 0 { 133 + return result; 134 + } 135 + 136 + let mut consecutive_skips = 0; 137 + while result.len() < n && consecutive_skips <= inner.hosts.len() { 138 + if inner.hosts.is_empty() { 139 + break; 140 + } 141 + let idx = inner.cursor % inner.hosts.len(); 142 + let host = inner.hosts[idx].clone(); 143 + 144 + if skip_hosts.contains(&host) { 145 + inner.cursor = idx + 1; 146 + consecutive_skips += 1; 147 + continue; 148 + } 149 + 150 + let queue = inner 151 + .queues 152 + .get_mut(&host) 153 + .expect("host in ring must have a queue"); 154 + let did = queue.pop_front().expect("host in ring must have items"); 155 + let host_empty = queue.is_empty(); 156 + inner.len -= 1; 157 + consecutive_skips = 0; 158 + 159 + if host_empty { 160 + inner.hosts.remove(idx); 161 + inner.queues.remove(&host); 162 + // Don't advance cursor — the next host slid into this index. 163 + if !inner.hosts.is_empty() { 164 + inner.cursor = idx % inner.hosts.len(); 165 + } 166 + } else { 167 + inner.cursor = idx + 1; 168 + } 169 + 170 + result.push(DiscoveryItem { did, pds: host }); 171 + } 172 + 173 + let popped = result.len(); 174 + self.len.store(inner.len, Ordering::Relaxed); 175 + drop(inner); 176 + 177 + if popped > 0 { 178 + self.space_available.notify_waiters(); 179 + } 180 + result 181 + } 182 + 183 + /// Number of items across all host queues. 184 + pub fn len(&self) -> usize { 185 + self.len.load(Ordering::Relaxed) 186 + } 187 + 188 + pub fn is_empty(&self) -> bool { 189 + self.len() == 0 190 + } 191 + 192 + /// Returns a future that completes when an item is pushed. 193 + /// 194 + /// Use in `tokio::select!` alongside the idle-poll timer so the dispatcher 195 + /// wakes immediately when discovery work arrives instead of waiting for 196 + /// the next poll cycle. 197 + pub fn notified(&self) -> tokio::sync::futures::Notified<'_> { 198 + self.item_available.notified() 199 + } 200 + } 201 + 202 + #[cfg(test)] 203 + mod tests { 204 + use super::*; 205 + 206 + fn url(s: &str) -> Arc<Url> { 207 + Arc::new(s.parse().unwrap()) 208 + } 209 + 210 + fn did(s: &str) -> Did<'static> { 211 + Did::new_owned(s).unwrap() 212 + } 213 + 214 + fn item(did_str: &str, pds_str: &str) -> DiscoveryItem { 215 + DiscoveryItem { 216 + did: did(did_str), 217 + pds: url(pds_str), 218 + } 219 + } 220 + 221 + #[tokio::test] 222 + async fn push_and_pop_single() { 223 + let q = DiscoveryQueue::new(10, 10); 224 + q.push(item("did:web:alice.test", "https://pds1.test")) 225 + .await; 226 + assert_eq!(q.len(), 1); 227 + 228 + let batch = q.pop_batch(10, &HashSet::new()); 229 + assert_eq!(batch.len(), 1); 230 + assert_eq!(batch[0].did.as_str(), "did:web:alice.test"); 231 + assert_eq!(q.len(), 0); 232 + } 233 + 234 + #[tokio::test] 235 + async fn round_robin_across_hosts() { 236 + let q = DiscoveryQueue::new(100, 100); 237 + // Two items on pds1, two on pds2. 238 + q.push(item("did:web:a1.test", "https://pds1.test")).await; 239 + q.push(item("did:web:a2.test", "https://pds1.test")).await; 240 + q.push(item("did:web:b1.test", "https://pds2.test")).await; 241 + q.push(item("did:web:b2.test", "https://pds2.test")).await; 242 + 243 + let batch = q.pop_batch(4, &HashSet::new()); 244 + assert_eq!(batch.len(), 4); 245 + // Should alternate: pds1, pds2, pds1, pds2. 246 + assert_eq!(batch[0].did.as_str(), "did:web:a1.test"); 247 + assert_eq!(batch[1].did.as_str(), "did:web:b1.test"); 248 + assert_eq!(batch[2].did.as_str(), "did:web:a2.test"); 249 + assert_eq!(batch[3].did.as_str(), "did:web:b2.test"); 250 + } 251 + 252 + #[tokio::test] 253 + async fn skip_hosts_leaves_items_in_queue() { 254 + let q = DiscoveryQueue::new(100, 100); 255 + let pds1 = url("https://pds1.test"); 256 + q.push(item("did:web:a1.test", "https://pds1.test")).await; 257 + q.push(item("did:web:b1.test", "https://pds2.test")).await; 258 + 259 + let skip = HashSet::from([pds1]); 260 + let batch = q.pop_batch(10, &skip); 261 + assert_eq!(batch.len(), 1); 262 + assert_eq!(batch[0].did.as_str(), "did:web:b1.test"); 263 + // pds1 item is still queued. 264 + assert_eq!(q.len(), 1); 265 + } 266 + 267 + #[tokio::test] 268 + async fn pop_batch_respects_limit() { 269 + let q = DiscoveryQueue::new(100, 100); 270 + for i in 0..10 { 271 + q.push(item(&format!("did:web:user{i}.test"), "https://pds1.test")) 272 + .await; 273 + } 274 + 275 + let batch = q.pop_batch(3, &HashSet::new()); 276 + assert_eq!(batch.len(), 3); 277 + assert_eq!(q.len(), 7); 278 + } 279 + 280 + #[tokio::test] 281 + async fn empty_pop_returns_empty() { 282 + let q = DiscoveryQueue::new(10, 10); 283 + let batch = q.pop_batch(10, &HashSet::new()); 284 + assert!(batch.is_empty()); 285 + } 286 + 287 + /// Helper: yield to the runtime repeatedly so a spawned task has a 288 + /// chance to make progress. If a push would complete, it will within 289 + /// a few yields; if it's genuinely blocked, it stays blocked. 290 + async fn yield_to_runtime() { 291 + for _ in 0..5 { 292 + tokio::task::yield_now().await; 293 + } 294 + } 295 + 296 + #[tokio::test] 297 + async fn global_backpressure_blocks_producer() { 298 + let q = Arc::new(DiscoveryQueue::new(2, 10)); 299 + q.push(item("did:web:a.test", "https://pds1.test")).await; 300 + q.push(item("did:web:b.test", "https://pds2.test")).await; 301 + assert_eq!(q.len(), 2); 302 + 303 + // Third push should block (global capacity 2). 304 + let q2 = q.clone(); 305 + let handle = tokio::spawn(async move { 306 + q2.push(item("did:web:c.test", "https://pds3.test")).await; 307 + }); 308 + 309 + yield_to_runtime().await; 310 + assert!(!handle.is_finished()); 311 + 312 + // Pop one to free capacity — push should complete. 313 + q.pop_batch(1, &HashSet::new()); 314 + yield_to_runtime().await; 315 + assert!(handle.is_finished()); 316 + handle.await.expect("push task shouldn't panic"); 317 + assert_eq!(q.len(), 2); 318 + } 319 + 320 + #[tokio::test] 321 + async fn per_host_limit_blocks_producer() { 322 + let q = Arc::new(DiscoveryQueue::new(100, 2)); 323 + // Fill host pds1 to its per-host limit of 2. 324 + q.push(item("did:web:a.test", "https://pds1.test")).await; 325 + q.push(item("did:web:b.test", "https://pds1.test")).await; 326 + 327 + // Third push to same host should block despite global capacity. 328 + let q2 = q.clone(); 329 + let handle = tokio::spawn(async move { 330 + q2.push(item("did:web:c.test", "https://pds1.test")).await; 331 + }); 332 + 333 + yield_to_runtime().await; 334 + assert!(!handle.is_finished()); 335 + 336 + // A different host should still be pushable. 337 + q.push(item("did:web:d.test", "https://pds2.test")).await; 338 + assert_eq!(q.len(), 3); 339 + 340 + // Pop one from pds1 to unblock the waiting producer. 341 + let batch = q.pop_batch(1, &HashSet::new()); 342 + assert_eq!(batch[0].pds.as_str(), "https://pds1.test/"); 343 + yield_to_runtime().await; 344 + assert!(handle.is_finished()); 345 + handle.await.expect("push task shouldn't panic"); 346 + assert_eq!(q.len(), 3); 347 + } 348 + 349 + #[tokio::test] 350 + async fn per_host_limit_prevents_monopolisation() { 351 + let q = DiscoveryQueue::new(100, 3); 352 + // Push 3 items for pds1 (at limit), then 3 for pds2. 353 + for i in 0..3 { 354 + q.push(item(&format!("did:web:a{i}.test"), "https://pds1.test")) 355 + .await; 356 + } 357 + for i in 0..3 { 358 + q.push(item(&format!("did:web:b{i}.test"), "https://pds2.test")) 359 + .await; 360 + } 361 + assert_eq!(q.len(), 6); 362 + 363 + // Pop 6: should interleave thanks to round-robin. 364 + let batch = q.pop_batch(6, &HashSet::new()); 365 + assert_eq!(batch.len(), 6); 366 + assert_eq!(batch[0].pds.as_str(), "https://pds1.test/"); 367 + assert_eq!(batch[1].pds.as_str(), "https://pds2.test/"); 368 + assert_eq!(batch[2].pds.as_str(), "https://pds1.test/"); 369 + assert_eq!(batch[3].pds.as_str(), "https://pds2.test/"); 370 + } 371 + 372 + #[tokio::test] 373 + async fn cursor_persists_across_batches() { 374 + let q = DiscoveryQueue::new(100, 100); 375 + // 3 hosts, 1 item each. 376 + q.push(item("did:web:a.test", "https://pds1.test")).await; 377 + q.push(item("did:web:b.test", "https://pds2.test")).await; 378 + q.push(item("did:web:c.test", "https://pds3.test")).await; 379 + // Add second item to each. 380 + q.push(item("did:web:a2.test", "https://pds1.test")).await; 381 + q.push(item("did:web:b2.test", "https://pds2.test")).await; 382 + q.push(item("did:web:c2.test", "https://pds3.test")).await; 383 + 384 + // Pop 2: should get pds1, pds2. Cursor now at pds3. 385 + let batch1 = q.pop_batch(2, &HashSet::new()); 386 + assert_eq!(batch1[0].did.as_str(), "did:web:a.test"); 387 + assert_eq!(batch1[1].did.as_str(), "did:web:b.test"); 388 + 389 + // Pop 2 more: cursor resumes at pds3, then wraps to pds1. 390 + let batch2 = q.pop_batch(2, &HashSet::new()); 391 + assert_eq!(batch2[0].did.as_str(), "did:web:c.test"); 392 + assert_eq!(batch2[1].did.as_str(), "did:web:a2.test"); 393 + } 394 + 395 + #[tokio::test] 396 + async fn host_removed_when_drained() { 397 + let q = DiscoveryQueue::new(100, 100); 398 + q.push(item("did:web:a.test", "https://pds1.test")).await; 399 + q.push(item("did:web:b.test", "https://pds2.test")).await; 400 + 401 + // Drain pds1 by skipping pds2. 402 + let pds2 = url("https://pds2.test"); 403 + let batch = q.pop_batch(10, &HashSet::from([pds2])); 404 + assert_eq!(batch.len(), 1); 405 + 406 + // Now un-skip pds2 and pop. 407 + let batch = q.pop_batch(10, &HashSet::new()); 408 + assert_eq!(batch.len(), 1); 409 + assert_eq!(batch[0].did.as_str(), "did:web:b.test"); 410 + assert!(q.is_empty()); 411 + } 412 + }
+1
src/sync/mod.rs
··· 1 1 pub mod backfill; 2 2 pub mod deep_crawl; 3 + pub mod discovery_queue; 3 4 pub mod firehose; 4 5 pub mod resync;
+67 -4
src/sync/resync/dispatcher.rs
··· 29 29 repo::{AccountStatus, RepoInfo, RepoState}, 30 30 resync_queue::ResyncItem, 31 31 }; 32 + use crate::sync::discovery_queue::DiscoveryQueue; 32 33 use crate::util::TokenExt; 33 34 34 35 /// Point-in-time view of the dispatcher's operational state, published for ··· 43 44 pub hosts: Vec<HostEntry>, 44 45 /// Total number of worker tasks in the JoinSet. 45 46 pub worker_count: usize, 47 + /// Number of items in the in-memory discovery queue. 48 + pub discovery_queue_depth: usize, 46 49 } 47 50 48 51 #[derive(Clone, Serialize)] ··· 85 88 pub token: tokio_util::sync::CancellationToken, 86 89 pub force_get_repo: bool, 87 90 pub state: DispatcherState, 91 + pub discovery_queue: Arc<DiscoveryQueue>, 88 92 } 89 93 90 94 /// Run the resync dispatcher until the future is cancelled. ··· 103 107 token, 104 108 force_get_repo, 105 109 state, 110 + discovery_queue, 106 111 }: DispatcherConfig, 107 112 ) -> Result<()> { 108 113 let mut busy: HashSet<Did<'static>> = HashSet::new(); ··· 210 215 } 211 216 } 212 217 218 + // Disk queue exhausted — fill remaining slots from the memory 219 + // discovery queue (backfill / deep-crawl items). 220 + if workers.len() < max_concurrent { 221 + let now_inst = Instant::now(); 222 + let cooling_set: HashSet<Arc<Url>> = cooling_hosts 223 + .iter() 224 + .filter(|&(_, &until)| now_inst < until) 225 + .map(|(url, _)| Arc::clone(url)) 226 + .collect(); 227 + let batch = discovery_queue.pop_batch(max_concurrent - workers.len(), &cooling_set); 228 + for dq_item in batch { 229 + if busy.contains(&dq_item.did) { 230 + continue; // already being resynced via disk queue 231 + } 232 + let did = dq_item.did.clone(); 233 + // Transition to Resyncing so the firehose buffers commits. 234 + if let Err(e) = 235 + transition_state(db.clone(), did.clone(), RepoState::Resyncing, None).await 236 + { 237 + warn!(did = %did, error = %e, 238 + "failed to transition discovery item to Resyncing; skipping"); 239 + continue; 240 + } 241 + 242 + busy.insert(did.clone()); 243 + let item = ResyncItem { 244 + did: dq_item.did, 245 + retry_count: 0, 246 + retry_reason: String::new(), 247 + commit_cbor: vec![], 248 + }; 249 + let worker_stuff = AppStuff { 250 + resolver: resolver.clone(), 251 + client: client.clone(), 252 + db: db.clone(), 253 + token: token.clone(), 254 + }; 255 + let handle = workers.spawn(async move { 256 + run_worker( 257 + item, 258 + worker_stuff, 259 + describe_timeout, 260 + get_repo_timeout, 261 + force_get_repo, 262 + ) 263 + .await 264 + }); 265 + task_dids.insert(handle.id(), did.clone()); 266 + metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 267 + trace!(did = %did, running = workers.len(), "spawned resync worker (discovery)"); 268 + } 269 + } 270 + 213 271 if workers.is_empty() { 214 - // Nothing running, nothing claimable right now. Yield before polling. 215 - if !token.sleep(IDLE_POLL).await { 216 - return Ok(()); 217 - }; 272 + // Nothing running, nothing claimable right now. Wait for new 273 + // items in the discovery queue or the next poll interval. 274 + let notified = discovery_queue.notified(); 275 + tokio::select! { 276 + _ = token.cancelled() => return Ok(()), 277 + _ = tokio::time::sleep(IDLE_POLL) => {}, 278 + _ = notified => {}, 279 + } 218 280 continue; 219 281 } 220 282 ··· 339 401 entries 340 402 }, 341 403 worker_count: workers.len(), 404 + discovery_queue_depth: discovery_queue.len(), 342 405 }; 343 406 // Unwrap is safe: we never poison the lock (no panics while holding it). 344 407 *state.lock().unwrap() = snap;