lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

deep crawl: check DID->host before queing

so we don't queue work from PDSs that might not be authoritative over their claimed repos.

phil ccbc2e53 9f309918

+326 -153
+16
src/identity.rs
··· 189 189 } 190 190 } 191 191 192 + /// Resolve without touching the cache. Used by the backfill validation path 193 + /// to avoid populating the cache with entries that may be rejected. 194 + pub async fn resolve_no_cache( 195 + &self, 196 + did: &Did<'_>, 197 + ) -> Result<(Url, PublicKey<'static>), IdentityError> { 198 + self.resolve_uncached(did).await 199 + } 200 + 201 + /// Insert a pre-resolved identity into the cache. 202 + /// Called after backfill validation accepts a DID, to warm the cache for 203 + /// the resync worker that will pick it up shortly. 204 + pub fn insert_resolved(&self, did: Did<'static>, pds: Url, pubkey: PublicKey<'static>) { 205 + self.cache.insert(did, pds, pubkey); 206 + } 207 + 192 208 /// Evict `did` from the identity cache. 193 209 /// 194 210 /// Called when a `#identity` firehose event is received, after all
+11 -2
src/main.rs
··· 157 157 let client = client.clone(); 158 158 let host = subscribe_host.clone(); 159 159 async move { 160 - if backfill::run(host, db, client, token.clone()).await? { 160 + if backfill::run(host, db, client, token.clone(), None).await? { 161 161 info!("backfill task idling"); 162 162 token.cancelled().await; 163 163 } ··· 197 197 let db = db.clone(); 198 198 let client = client.clone(); 199 199 let host = subscribe_host.clone(); 200 + let resolver = resolver.clone(); 200 201 async move { 201 - sync::deep_crawl::run(host, db, client, args.max_deep_crawl_workers, token).await 202 + sync::deep_crawl::run( 203 + host, 204 + db, 205 + client, 206 + args.max_deep_crawl_workers, 207 + token, 208 + resolver, 209 + ) 210 + .await 202 211 } 203 212 }); 204 213 }
+192 -90
src/sync/backfill.rs
··· 6 6 //! (any state) are skipped — the dispatcher's retry mechanism handles repos 7 7 //! that need re-syncing. 8 8 9 + use std::sync::Arc; 10 + 9 11 use jacquard_api::com_atproto::sync::list_repos::ListRepos; 10 - use jacquard_common::url::Host; 11 - use jacquard_common::{IntoStatic, xrpc::XrpcExt}; 12 + use jacquard_common::{ 13 + error::ClientErrorKind, 14 + types::string::Did, 15 + url::Host, 16 + {IntoStatic, xrpc::XrpcExt}, 17 + }; 18 + use tokio_util::sync::CancellationToken; 12 19 use tracing::{info, trace, warn}; 13 20 14 - use crate::error::Result; 15 - use crate::http::ThrottledClient; 16 - use crate::storage::{ 17 - self, DbRef, 18 - backfill_progress::{BackfillProgress, get, set}, 19 - resync_queue::ResyncItem, 21 + use crate::{ 22 + error::Result, 23 + http::ThrottledClient, 24 + identity::Resolver, 25 + storage::{ 26 + self, DbRef, 27 + backfill_progress::{BackfillProgress, get, set}, 28 + resync_queue::ResyncItem, 29 + }, 30 + util::unix_now, 20 31 }; 21 - use crate::util::unix_now; 22 32 23 33 const PAGE_LIMIT: i64 = 500; 24 - /// Delay between retry attempts after a failed page request. 34 + /// Delay between retry attempts after a transient page failure. 25 35 const RETRY_DELAY_SECS: u64 = 10; 36 + /// Maximum consecutive transient failures before giving up on this host. 37 + const MAX_PAGE_FAILURES: u32 = 3; 26 38 27 39 /// Walk the full `listRepos` feed for `host`, enqueuing newly discovered repos. 28 40 /// 29 - /// Resumes from the last persisted cursor on restart. Retries indefinitely on 30 - /// transient network errors. Returns `Ok(())` when the full walk completes. 41 + /// Resumes from the last persisted cursor on restart. 4xx errors are treated 42 + /// as fatal and abort the walk immediately. Transient errors are retried up to 43 + /// `MAX_PAGE_FAILURES` times before giving up. Returns `Ok(true)` when the 44 + /// full walk completes, `Ok(false)` if cancelled or the host gives up. 31 45 pub async fn run( 32 46 host: Host, 33 47 db: DbRef, 34 48 client: ThrottledClient, 35 - token: tokio_util::sync::CancellationToken, 49 + token: CancellationToken, 50 + resolver: Option<Arc<Resolver>>, 36 51 ) -> Result<bool> { 37 52 let base: jacquard_common::url::Url = format!("https://{host}") 38 53 .parse() ··· 44 59 let host = host.clone(); 45 60 tokio::task::spawn_blocking(move || get(&db, &host)).await?? 46 61 } 47 - .and_then(|p| { 48 - if p.cursor.is_empty() { 49 - None 50 - } else { 51 - Some(p.cursor) 52 - } 53 - }); 62 + .map(|p| p.cursor); 54 63 55 64 info!( 56 65 host = %host, ··· 65 74 return Ok(false); 66 75 } 67 76 68 - // Scope req so the borrow of `cursor` is released before we reassign it. 69 - // Extract owned Strings immediately so `resp` can be dropped in the loop. 70 - let (dids, next_cursor) = { 71 - let req = ListRepos { 72 - cursor: cursor.as_deref().map(Into::into), 73 - limit: Some(PAGE_LIMIT), 77 + let (dids, next_cursor) = 78 + match fetch_page(&host, &base, cursor.as_deref(), &client, &token).await { 79 + Some(page) => page, 80 + None => return Ok(false), 74 81 }; 75 - loop { 76 - let resp = match client.xrpc(base.clone()).send(&req).await { 77 - Ok(r) => r, 78 - Err(e) => { 79 - warn!(error = %e, host = %host, "listRepos send failed; retrying in {RETRY_DELAY_SECS}s"); 80 - tokio::select! { 81 - _ = token.cancelled() => return Ok(false), 82 - _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 83 - } 84 - continue; 85 - } 86 - }; 87 - match resp.parse() { 88 - Ok(out) => { 89 - let next = out.cursor.as_deref().map(str::to_owned); 90 - let dids = out 91 - .repos 92 - .into_iter() 93 - .map(|r| r.did.into_static()) 94 - .collect::<Vec<_>>(); 95 - break (dids, next); 96 - } 97 - Err(e) => { 98 - warn!(error = %e, host = %host, "listRepos parse failed; retrying in {RETRY_DELAY_SECS}s"); 99 - tokio::select! { 100 - _ = token.cancelled() => return Ok(false), 101 - _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 102 - } 103 - } 104 - } 105 - } 106 - }; 107 82 108 83 let page_len = dids.len(); 109 84 let now = unix_now(); 110 85 111 - // Process all page entries and persist progress in a single blocking 112 - // task so we don't interleave fjall I/O with the async runtime. 113 - let page_queued: u64 = { 86 + let dids = match &resolver { 87 + Some(r) => validate_dids(dids, r, &host).await, 88 + None => dids, 89 + }; 90 + 91 + let progress_cursor = next_cursor.clone().unwrap_or_default(); 92 + let page_queued = { 114 93 let db = db.clone(); 115 - let progress_cursor = next_cursor.clone().unwrap_or_default(); 116 94 let host = host.clone(); 117 - tokio::task::spawn_blocking(move || { 118 - let mut count: u64 = 0; 119 - for did in dids { 120 - let newly_inserted = storage::repo::ensure_repo(&db, &did)?; 121 - if newly_inserted { 122 - let item = ResyncItem { 123 - did, 124 - retry_count: 0, 125 - retry_reason: "backfill".to_string(), 126 - commit_cbor: vec![], 127 - }; 128 - storage::resync_queue::enqueue(&db, now, &item)?; 129 - count += 1; 130 - } 131 - } 132 - // Persist progress before advancing so a crash during the next 133 - // page restarts from here, not the beginning. 134 - set( 135 - &db, 136 - &host, 137 - &BackfillProgress { 138 - cursor: progress_cursor, 139 - completed_at: None, 140 - }, 141 - )?; 142 - Ok::<u64, crate::error::Error>(count) 143 - }) 144 - .await?? 95 + tokio::task::spawn_blocking(move || store_page(&db, &host, dids, progress_cursor, now)) 96 + .await?? 145 97 }; 146 98 147 99 total_queued += page_queued; ··· 177 129 } 178 130 } 179 131 } 132 + 133 + // --------------------------------------------------------------------------- 134 + // Page fetch 135 + // --------------------------------------------------------------------------- 136 + 137 + /// Fetch one `listRepos` page with retry logic. 138 + /// 139 + /// Returns `None` if the host gives a 4xx, exceeds `MAX_PAGE_FAILURES` 140 + /// transient errors, or the token is cancelled during a retry sleep. 141 + async fn fetch_page( 142 + host: &Host, 143 + base: &jacquard_common::url::Url, 144 + cursor: Option<&str>, 145 + client: &ThrottledClient, 146 + token: &CancellationToken, 147 + ) -> Option<(Vec<Did<'static>>, Option<String>)> { 148 + let req = ListRepos { 149 + cursor: cursor.map(Into::into), 150 + limit: Some(PAGE_LIMIT), 151 + }; 152 + let mut failures: u32 = 0; 153 + loop { 154 + let result = match client.xrpc(base.clone()).send(&req).await { 155 + Err(e) => { 156 + let is_client_err = matches!( 157 + e.kind(), 158 + ClientErrorKind::Http { status } if status.is_client_error() 159 + ); 160 + if is_client_err { 161 + warn!(error = %e, host = %host, 162 + "listRepos failed with client error; giving up on this host"); 163 + return None; 164 + } 165 + warn!(error = %e, host = %host, "listRepos request failed"); 166 + None 167 + } 168 + Ok(resp) => match resp.parse() { 169 + Ok(out) => { 170 + let next = out.cursor.as_deref().map(str::to_owned); 171 + let dids = out 172 + .repos 173 + .into_iter() 174 + .map(|r| r.did.into_static()) 175 + .collect::<Vec<_>>(); 176 + Some((dids, next)) 177 + } 178 + Err(e) => { 179 + warn!(error = %e, host = %host, "listRepos response parse failed"); 180 + None 181 + } 182 + }, 183 + }; 184 + 185 + match result { 186 + Some(page) => return Some(page), 187 + None => { 188 + failures += 1; 189 + if failures >= MAX_PAGE_FAILURES { 190 + warn!(host = %host, failures, 191 + "listRepos page failed {MAX_PAGE_FAILURES} times; giving up on this host"); 192 + return None; 193 + } 194 + tokio::select! { 195 + _ = token.cancelled() => return None, 196 + _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 197 + } 198 + } 199 + } 200 + } 201 + } 202 + 203 + // --------------------------------------------------------------------------- 204 + // DID validation 205 + // --------------------------------------------------------------------------- 206 + 207 + /// Filter `dids` to those whose resolved PDS endpoint matches `host`. 208 + /// 209 + /// Bypasses the identity cache to avoid populating it with entries that may 210 + /// be rejected. Accepted DIDs are inserted into the cache explicitly so the 211 + /// resync worker gets a warm hit when it picks them up shortly after. 212 + async fn validate_dids( 213 + dids: Vec<Did<'static>>, 214 + resolver: &Resolver, 215 + host: &Host, 216 + ) -> Vec<Did<'static>> { 217 + let host_str = host.to_string(); 218 + let mut valid = Vec::with_capacity(dids.len()); 219 + for did in dids { 220 + match resolver.resolve_no_cache(&did).await { 221 + Ok((pds_url, pubkey)) if pds_url.host_str() == Some(host_str.as_str()) => { 222 + resolver.insert_resolved(did.clone().into_static(), pds_url, pubkey); 223 + valid.push(did); 224 + } 225 + Ok((pds_url, _)) => { 226 + metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "pds_mismatch") 227 + .increment(1); 228 + trace!(did = %did, resolved_pds = %pds_url, expected = %host, 229 + "DID resolves to different PDS; skipping"); 230 + } 231 + Err(e) => { 232 + metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "resolution_failed") 233 + .increment(1); 234 + trace!(did = %did, error = %e, 235 + "DID resolution failed during backfill validation; skipping"); 236 + } 237 + } 238 + } 239 + valid 240 + } 241 + 242 + // --------------------------------------------------------------------------- 243 + // Storage commit 244 + // --------------------------------------------------------------------------- 245 + 246 + /// Enqueue newly-seen DIDs and persist the backfill cursor in one blocking task. 247 + /// 248 + /// Returns the number of DIDs that were newly inserted into the resync queue. 249 + fn store_page( 250 + db: &DbRef, 251 + host: &Host, 252 + dids: Vec<Did<'static>>, 253 + progress_cursor: String, 254 + now: u64, 255 + ) -> Result<u64> { 256 + let mut count: u64 = 0; 257 + for did in dids { 258 + let newly_inserted = storage::repo::ensure_repo(db, &did)?; 259 + if newly_inserted { 260 + let item = ResyncItem { 261 + did, 262 + retry_count: 0, 263 + retry_reason: "backfill".to_string(), 264 + commit_cbor: vec![], 265 + }; 266 + storage::resync_queue::enqueue(db, now, &item)?; 267 + count += 1; 268 + } 269 + } 270 + // Persist progress before advancing so a crash during the next 271 + // page restarts from here, not the beginning. 272 + set( 273 + db, 274 + host, 275 + &BackfillProgress { 276 + cursor: progress_cursor, 277 + completed_at: None, 278 + }, 279 + )?; 280 + Ok(count) 281 + }
+107 -61
src/sync/deep_crawl.rs
··· 5 5 //! have already had a completed backfill are skipped — new repos on those PDSes 6 6 //! arrive via the firehose. 7 7 8 + use std::sync::Arc; 9 + 8 10 use jacquard_api::com_atproto::sync::list_hosts::ListHosts; 9 11 use jacquard_common::{url::Host, xrpc::XrpcExt}; 10 12 use tokio::task::JoinSet; 11 13 use tokio_util::sync::CancellationToken; 12 14 use tracing::{info, warn}; 13 15 14 - use crate::error::Result; 15 - use crate::http::ThrottledClient; 16 - use crate::storage::{DbRef, backfill_progress, list_hosts_cursor}; 17 - use crate::sync::backfill; 16 + use crate::{ 17 + error::Result, 18 + http::ThrottledClient, 19 + identity::Resolver, 20 + storage::{DbRef, backfill_progress, list_hosts_cursor}, 21 + sync::backfill, 22 + }; 18 23 19 24 const PAGE_LIMIT: i64 = 500; 20 25 /// Delay between retry attempts after a failed page request. 21 26 const RETRY_DELAY_SECS: u64 = 10; 27 + /// Maximum number of consecutive page failures before abandoning the pass. 28 + const MAX_PAGE_FAILURES: u32 = 3; 22 29 /// How long to wait between full passes through all listHosts pages. 23 - const REPOLL_SECS: u64 = 30 * 60; 30 + const REPOLL_SECS: u64 = 20 * 60 * 60; 24 31 25 32 /// Discover PDS hosts via `listHosts` on `relay_host` and crawl each one, 26 - /// re-polling every 30 minutes after each full pass. 33 + /// re-polling every 20 hours after each full pass. 27 34 pub async fn run( 28 35 relay_host: Host, 29 36 db: DbRef, 30 37 client: ThrottledClient, 31 38 max_workers: usize, 32 39 token: CancellationToken, 40 + resolver: Arc<Resolver>, 33 41 ) -> Result<()> { 34 42 info!(relay = %relay_host, "deep crawl started"); 35 43 ··· 39 47 } 40 48 41 49 info!(relay = %relay_host, "deep crawl pass started"); 42 - do_pass( 50 + crawl_pass( 43 51 relay_host.clone(), 44 52 db.clone(), 45 53 client.clone(), 46 54 max_workers, 47 55 token.clone(), 56 + resolver.clone(), 48 57 ) 49 58 .await?; 50 59 ··· 55 64 info!( 56 65 relay = %relay_host, 57 66 repoll_secs = REPOLL_SECS, 58 - "deep crawl pass complete; re-polling in 30 minutes" 67 + "deep crawl pass complete; re-polling in 20 hours" 59 68 ); 60 69 61 70 tokio::select! { ··· 65 74 } 66 75 } 67 76 68 - async fn do_pass( 77 + /// Page through the full `listHosts` feed once, spawning a `backfill::run` 78 + /// worker for each PDS host not yet crawled. Workers run concurrently up to 79 + /// `max_workers`; the cursor is persisted after each page so a restart resumes 80 + /// mid-pass rather than from the beginning. 81 + async fn crawl_pass( 69 82 relay_host: Host, 70 83 db: DbRef, 71 84 client: ThrottledClient, 72 85 max_workers: usize, 73 86 token: CancellationToken, 87 + resolver: Arc<Resolver>, 74 88 ) -> Result<()> { 75 89 let base: jacquard_common::url::Url = format!("https://{relay_host}") 76 90 .parse() ··· 91 105 return Ok(()); 92 106 } 93 107 94 - let (hosts, next_cursor) = { 95 - let req = ListHosts { 96 - cursor: cursor.as_deref().map(Into::into), 97 - limit: Some(PAGE_LIMIT), 98 - }; 99 - loop { 100 - let resp = match client.xrpc(base.clone()).send(&req).await { 101 - Ok(r) => r, 102 - Err(e) => { 103 - warn!( 104 - error = %e, 105 - relay = %relay_host, 106 - "listHosts send failed; retrying in {RETRY_DELAY_SECS}s" 107 - ); 108 - tokio::select! { 109 - _ = token.cancelled() => { 110 - drain_workers(&mut workers).await; 111 - return Ok(()); 112 - } 113 - _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 114 - } 115 - continue; 116 - } 117 - }; 118 - match resp.parse() { 119 - Ok(out) => { 120 - let next = out.cursor.as_deref().map(str::to_owned); 121 - let hostnames = out 122 - .hosts 123 - .into_iter() 124 - .map(|h| h.hostname.to_string()) 125 - .collect::<Vec<_>>(); 126 - break (hostnames, next); 127 - } 128 - Err(e) => { 129 - warn!( 130 - error = %e, 131 - relay = %relay_host, 132 - "listHosts parse failed; retrying in {RETRY_DELAY_SECS}s" 133 - ); 134 - tokio::select! { 135 - _ = token.cancelled() => { 136 - drain_workers(&mut workers).await; 137 - return Ok(()); 138 - } 139 - _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 140 - } 141 - } 108 + let (hosts, next_cursor) = 109 + match fetch_hosts_page(&relay_host, &base, cursor.as_deref(), &client, &token).await { 110 + Some(page) => page, 111 + None => { 112 + drain_workers(&mut workers).await; 113 + return Ok(()); 142 114 } 115 + }; 116 + 117 + // Spawn a per-PDS backfill worker for each new host on this page. 118 + for hostname in hosts { 119 + if token.is_cancelled() { 120 + drain_workers(&mut workers).await; 121 + return Ok(()); 143 122 } 144 - }; 145 123 146 - // Spawn a per-PDS backfill worker for each new host. 147 - for hostname in hosts { 148 124 let pds_host = match Host::parse(&hostname) { 149 125 Ok(h) => h, 150 126 Err(e) => { ··· 179 155 let client2 = client.clone(); 180 156 let child = token.child_token(); 181 157 let host2 = pds_host.clone(); 158 + let resolver2 = resolver.clone(); 182 159 workers.spawn(async move { 183 - let outcome = backfill::run(host2.clone(), db2, client2, child).await; 160 + let outcome = 161 + backfill::run(host2.clone(), db2, client2, child, Some(resolver2)).await; 184 162 (host2, outcome) 185 163 }); 186 164 metrics::gauge!("lightrail_deep_crawl_workers").set(workers.len() as f64); ··· 211 189 } 212 190 } 213 191 } 192 + 193 + // --------------------------------------------------------------------------- 194 + // Page fetch 195 + // --------------------------------------------------------------------------- 196 + 197 + /// Fetch one `listHosts` page with retry logic. 198 + /// 199 + /// Returns `None` if the host exceeds `MAX_PAGE_FAILURES` transient errors or 200 + /// the token is cancelled during a retry sleep. 201 + async fn fetch_hosts_page( 202 + relay_host: &Host, 203 + base: &jacquard_common::url::Url, 204 + cursor: Option<&str>, 205 + client: &ThrottledClient, 206 + token: &CancellationToken, 207 + ) -> Option<(Vec<String>, Option<String>)> { 208 + let req = ListHosts { 209 + cursor: cursor.map(Into::into), 210 + limit: Some(PAGE_LIMIT), 211 + }; 212 + let mut failures: u32 = 0; 213 + loop { 214 + let result = match client.xrpc(base.clone()).send(&req).await { 215 + Err(e) => { 216 + warn!(error = %e, relay = %relay_host, "listHosts request failed"); 217 + None 218 + } 219 + Ok(resp) => match resp.parse() { 220 + Ok(out) => { 221 + let next = out.cursor.as_deref().map(str::to_owned); 222 + let hostnames = out 223 + .hosts 224 + .into_iter() 225 + .map(|h| h.hostname.to_string()) 226 + .collect::<Vec<_>>(); 227 + Some((hostnames, next)) 228 + } 229 + Err(e) => { 230 + warn!(error = %e, relay = %relay_host, "listHosts response parse failed"); 231 + None 232 + } 233 + }, 234 + }; 235 + 236 + match result { 237 + Some(page) => return Some(page), 238 + None => { 239 + failures += 1; 240 + if failures >= MAX_PAGE_FAILURES { 241 + warn!( 242 + relay = %relay_host, 243 + failures, 244 + "listHosts page failed {MAX_PAGE_FAILURES} times; abandoning pass" 245 + ); 246 + return None; 247 + } 248 + tokio::select! { 249 + _ = token.cancelled() => return None, 250 + _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 251 + } 252 + } 253 + } 254 + } 255 + } 256 + 257 + // --------------------------------------------------------------------------- 258 + // Worker pool helpers 259 + // --------------------------------------------------------------------------- 214 260 215 261 async fn drain_workers(workers: &mut JoinSet<(Host, Result<bool>)>) { 216 262 while let Some(result) = workers.join_next().await {