lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

lenient relay-mode for backfill retries

Keep retrying transient page failures for longer when backfilling from the relay.

phil 779491aa e5a5460b

+101 -25
+1 -1
src/main.rs
··· 209 209 client, 210 210 token.clone(), 211 211 resolver, 212 - false, 212 + backfill::BackfillMode::Relay, 213 213 discovery_queue, 214 214 ) 215 215 .await
+90 -22
src/sync/backfill.rs
··· 39 39 use std::time::SystemTime; 40 40 41 41 const PAGE_LIMIT: i64 = 500; 42 - /// Delay between retry attempts after a transient page failure. 43 - const RETRY_DELAY_SECS: u64 = 10; 44 - /// Maximum consecutive transient failures before giving up on this host. 45 - const MAX_PAGE_FAILURES: u32 = 3; 42 + 43 + // --------------------------------------------------------------------------- 44 + // Mode + retry policy 45 + // --------------------------------------------------------------------------- 46 + 47 + /// Which host role this backfill is running against. 48 + /// 49 + /// Bundles two decisions that always move together: 50 + /// - whether to validate DIDs against the host (trust model), and 51 + /// - how patiently to retry transient page failures (criticality). 52 + #[derive(Clone, Copy, Debug, PartialEq, Eq)] 53 + pub enum BackfillMode { 54 + /// Primary relay / upstream host. Trusted for DID listings; retried 55 + /// generously because a transient outage here will take the whole service 56 + /// down when the task exits. 57 + Relay, 58 + /// Untrusted PDS discovered via deep crawl. DIDs are validated against the 59 + /// host, and a few transient failures are enough to move on. 60 + DeepCrawl, 61 + } 62 + 63 + impl BackfillMode { 64 + /// True when listed DIDs must be independently verified to actually live 65 + /// on `host` before we trust their account status. 66 + fn validates_dids(self) -> bool { 67 + matches!(self, Self::DeepCrawl) 68 + } 69 + 70 + fn retry_policy(self) -> RetryPolicy { 71 + match self { 72 + Self::Relay => RetryPolicy::RELAY, 73 + Self::DeepCrawl => RetryPolicy::DEEP_CRAWL, 74 + } 75 + } 76 + } 77 + 78 + /// How many transient page failures to tolerate and how long to wait between. 79 + struct RetryPolicy { 80 + max_page_failures: u32, 81 + retry_delay: Duration, 82 + } 83 + 84 + impl RetryPolicy { 85 + /// Relay budget: 60 attempts × 30s = 30 minutes of retries before giving 86 + /// up and triggering service shutdown. 
Enough to absorb typical relay 87 + /// hiccups without flapping. 88 + const RELAY: Self = Self { 89 + max_page_failures: 60, 90 + retry_delay: Duration::from_secs(30), 91 + }; 92 + /// Deep-crawl budget: 3 × 10s. Fast give-up so dead PDSes don't monopolise 93 + /// crawl workers. 94 + const DEEP_CRAWL: Self = Self { 95 + max_page_failures: 3, 96 + retry_delay: Duration::from_secs(10), 97 + }; 98 + } 46 99 47 100 // --------------------------------------------------------------------------- 48 101 // Listed account state ··· 76 129 77 130 /// Walk `listRepos` on `host` and enqueue new repos for resync. 78 131 /// 79 - /// When `validate` is true (deep crawl / untrusted PDS), DIDs are verified to 80 - /// actually live on `host` and non-matching ones are rejected. Validated DIDs 81 - /// are considered authoritative for account status — if the PDS says an account 82 - /// is deactivated, we trust it even if our local state is Active. Without 83 - /// validation (relay backfill), we only write non-active status for accounts 84 - /// we haven't seen before, to avoid overwriting Active state with stale relay 85 - /// data (e.g. accounts that migrated off and appear deactivated on the old PDS). 132 + /// `mode` selects both the trust model (whether DIDs are validated against 133 + /// `host`) and the retry policy (how long we retry transient page failures 134 + /// before giving up). See [`BackfillMode`]. 135 + /// 136 + /// In `DeepCrawl` mode, DIDs are verified to actually live on `host` and 137 + /// non-matching ones are rejected. Validated DIDs are considered authoritative 138 + /// for account status — if the PDS says an account is deactivated, we trust 139 + /// it even if our local state is Active. In `Relay` mode we only write 140 + /// non-active status for accounts we haven't seen before, to avoid 141 + /// overwriting Active state with stale relay data (e.g. accounts that 142 + /// migrated off and appear deactivated on the old PDS). 
86 143 pub async fn run( 87 144 host: Host, 88 145 db: DbRef, 89 146 client: ThrottledClient, 90 147 token: CancellationToken, 91 148 resolver: Arc<Resolver>, 92 - validate: bool, 149 + mode: BackfillMode, 93 150 discovery_queue: Arc<DiscoveryQueue>, 94 151 ) -> Result<bool> { 152 + let validate = mode.validates_dids(); 153 + let retry_policy = mode.retry_policy(); 95 154 let base: jacquard_common::url::Url = format!("https://{host}") 96 155 .parse() 97 156 .map_err(|e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()))?; ··· 117 176 return Ok(false); 118 177 } 119 178 120 - let (dids, next_cursor) = 121 - match fetch_page(&host, &base, cursor.as_deref(), &client, &token).await { 122 - Some(page) => page, 123 - None => return Ok(false), 124 - }; 179 + let (dids, next_cursor) = match fetch_page( 180 + &host, 181 + &base, 182 + cursor.as_deref(), 183 + &client, 184 + &token, 185 + &retry_policy, 186 + ) 187 + .await 188 + { 189 + Some(page) => page, 190 + None => return Ok(false), 191 + }; 125 192 126 193 let page_len = dids.len(); 127 194 ··· 228 295 229 296 /// Fetch one `listRepos` page with retry logic. 230 297 /// 231 - /// Returns `None` if the host gives a 4xx, exceeds `MAX_PAGE_FAILURES` 298 + /// Returns `None` if the host gives a 4xx, exceeds `policy.max_page_failures` 232 299 /// transient errors, or the token is cancelled (including mid-request). 
233 300 async fn fetch_page( 234 301 host: &Host, ··· 236 303 cursor: Option<&str>, 237 304 client: &ThrottledClient, 238 305 token: &CancellationToken, 306 + policy: &RetryPolicy, 239 307 ) -> Option<(Vec<(Did<'static>, RepoState)>, Option<String>)> { 240 308 let req = ListRepos { 241 309 cursor: cursor.map(Into::into), ··· 281 349 Some(page) => return Some(page), 282 350 None => { 283 351 failures += 1; 284 - if failures >= MAX_PAGE_FAILURES { 285 - warn!(host = %host, failures, 286 - "listRepos page failed {MAX_PAGE_FAILURES} times; giving up on this host"); 352 + if failures >= policy.max_page_failures { 353 + warn!(host = %host, failures, max = policy.max_page_failures, 354 + "listRepos page failed too many times; giving up on this host"); 287 355 return None; 288 356 } 289 - if !token.sleep(Duration::from_secs(RETRY_DELAY_SECS)).await { 357 + if !token.sleep(policy.retry_delay).await { 290 358 return None; 291 359 } 292 360 }
+10 -2
src/sync/deep_crawl.rs
··· 167 167 let resolver2 = resolver.clone(); 168 168 let dq2 = discovery_queue.clone(); 169 169 workers.spawn(async move { 170 - let outcome = 171 - backfill::run(host2.clone(), db2, client2, child, resolver2, true, dq2).await; 170 + let outcome = backfill::run( 171 + host2.clone(), 172 + db2, 173 + client2, 174 + child, 175 + resolver2, 176 + backfill::BackfillMode::DeepCrawl, 177 + dq2, 178 + ) 179 + .await; 172 180 (host2, outcome) 173 181 }); 174 182 metrics::gauge!("lightrail_deep_crawl_workers").set(workers.len() as f64);