lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

enqueue resync at half qps limit

since we need two requests per resync typically

phil ea66d22c 7723c291

+25 -14
+25 -14
src/sync/backfill.rs
··· 17 17 url::Host, 18 18 {IntoStatic, xrpc::XrpcExt}, 19 19 }; 20 + use reqwest::Url; 20 21 use tokio::time::Duration; 21 22 use tokio_util::sync::CancellationToken; 22 - use tracing::{info, trace, warn}; 23 + use tracing::{error, info, trace, warn}; 23 24 24 25 use crate::{ 25 26 error::Result, ··· 39 40 const RETRY_DELAY_SECS: u64 = 10; 40 41 /// Maximum consecutive transient failures before giving up on this host. 41 42 const MAX_PAGE_FAILURES: u32 = 3; 43 + /// Typical number of requests required to complete one repo resync 44 + const REQUESTS_PER_RESYNC: u64 = 2; 42 45 43 46 /// Walk the full `listRepos` feed for `host`, enqueuing newly discovered repos. 44 47 /// ··· 81 84 ); 82 85 83 86 let mut total_queued: u64 = 0; 84 - // Per-host staggering: track last-scheduled timestamp (in seconds, with 85 - // fractional precision via a separate sub-second counter) so that items 86 - // for the same host are spread across time at 1/crawl_qps intervals. 87 + // Per-host staggering: track last-scheduled timestamp so that items for the 88 + // same host are spread across time at 1/crawl_qps intervals. 87 89 // This prevents the timestamp-ordered queue from bunching all items for 88 90 // a popular host together. 89 - let host_interval_ms: u64 = 1000 / crawl_qps.get() as u64; 90 - let mut host_schedule: HashMap<String, u64> = HashMap::new(); // host → last scheduled ts (seconds) 91 + let host_interval_ms = 1000 / crawl_qps.get() as u64; 92 + let mut host_schedule: HashMap<Arc<Url>, u64> = HashMap::new(); // host → last scheduled ts (seconds) 91 93 92 94 loop { 93 95 if token.is_cancelled() { ··· 102 104 103 105 let page_len = dids.len(); 104 106 let now = unix_now_ms(); 105 - let host_str = host.to_string(); 106 107 107 108 // For untrusted hosts (deep crawl), filter DIDs to those whose 108 109 // resolved PDS actually matches this host. ··· 114 115 115 116 // Resolve each DID's actual PDS host for per-host stagger. 116 117 // Cache hits are free; misses fall back to the listed host. 117 - let dids_with_hosts: Vec<(Did<'static>, String)> = { 118 + // 119 + // TODO: ...this is basically redundant with validate_dids now? 120 + let dids_with_hosts: Vec<(Did<'static>, Arc<Url>)> = { 118 121 let mut out = Vec::with_capacity(dids.len()); 119 122 for did in dids { 120 123 let pds_host = match resolver.resolve(&did).await { 121 - Ok(resolved) => resolved.pds.host_str().unwrap_or(&host_str).to_string(), 122 - Err(_) => host_str.clone(), 124 + Ok(resolved) => resolved.pds.clone(), 125 + Err(e) => { 126 + error!(did = %did, error = %e, "failed to resolve host for validated did; not enqueuing resync"); 127 + continue; 128 + } 123 129 }; 124 130 out.push((did, pds_host)); 125 131 } ··· 315 321 fn store_page( 316 322 db: &DbRef, 317 323 host: &Host, 318 - items: Vec<(Did<'static>, String)>, 324 + items: Vec<(Did<'static>, Arc<Url>)>, 319 325 progress_cursor: String, 320 326 now: u64, 321 327 interval_ms: u64, 322 - mut host_schedule: HashMap<String, u64>, 323 - ) -> Result<(u64, HashMap<String, u64>)> { 328 + mut host_schedule: HashMap<Arc<Url>, u64>, 329 + ) -> Result<(u64, HashMap<Arc<Url>, u64>)> { 324 330 let mut count: u64 = 0; 331 + let meta_interval = interval_ms * REQUESTS_PER_RESYNC; 325 332 for (did, pds) in items { 326 333 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 327 334 if newly_inserted { 328 335 let last = host_schedule.get(&pds).copied().unwrap_or(now); 329 - let ts = if last >= now { last + interval_ms } else { now }; 336 + let ts = if last >= now { 337 + last + meta_interval 338 + } else { 339 + now 340 + }; 330 341 host_schedule.insert(pds, ts); 331 342 let item = ResyncItem { 332 343 did,