lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

try throttling did resolutions

bumping up deep-crawl workers to get more host diversity on resync walked us right into this

(but it also kinda works -- the host diversity more than the throttle)

phil e53a4977 b98ae9c8

+101 -26
+88 -18
src/identity.rs
··· 16 16 //! configured, any error from the primary resolver retries against it. 17 17 18 18 use jacquard_common::IntoStatic; 19 + use std::num::NonZeroU32; 19 20 use std::sync::Arc; 20 21 use std::time::Duration; 21 22 22 23 use dashmap::DashMap; 24 + use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; 23 25 use jacquard_common::types::crypto::PublicKey; 24 26 use jacquard_common::types::string::Did; 25 27 use jacquard_common::url::Url; ··· 82 84 err.kind(), 83 85 JacquardErrorKind::HttpStatus(status) if status.as_u16() == 429 84 86 ) 87 + } 88 + 89 + // --------------------------------------------------------------------------- 90 + // Resolution throttle 91 + // --------------------------------------------------------------------------- 92 + 93 + /// Global identity-resolution rate limiter. 94 + /// 95 + /// Backfill tasks call [`Resolver::throttle_wait`] to stay within the budget. 96 + /// All callers (firehose, resync, backfill) consume tokens on cache-miss 97 + /// resolutions via an internal notify, so firehose activity naturally reduces 98 + /// the budget available to backfill. 99 + struct IdentityThrottle { 100 + limiter: DefaultDirectRateLimiter, 101 + token_interval: Duration, 102 + } 103 + 104 + impl IdentityThrottle { 105 + fn new(rate: NonZeroU32) -> Self { 106 + Self { 107 + limiter: RateLimiter::direct(Quota::per_second(rate)), 108 + token_interval: Duration::from_secs(1) / rate.get(), 109 + } 110 + } 85 111 } 86 112 87 113 // --------------------------------------------------------------------------- ··· 180 206 cache: IdentityCache, 181 207 /// Cancellation token for cooperative shutdown. 182 208 token: tokio_util::sync::CancellationToken, 209 + /// Optional global resolution rate limit. 210 + throttle: Option<IdentityThrottle>, 183 211 } 184 212 185 213 impl Resolver { 214 + /// Wait for a resolution token (backfill / resync). 215 + /// 216 + /// Blocks until a token is available or the cancellation token fires. 217 + /// No-op when no throttle is configured. 218 + async fn throttle_wait(&self) { 219 + let Some(t) = &self.throttle else { return }; 220 + while t.limiter.check().is_err() { 221 + if !self.token.sleep(t.token_interval).await { 222 + return; // shutting down 223 + } 224 + } 225 + } 226 + 227 + /// Try to consume a resolution token without waiting (firehose). 228 + /// 229 + /// Subtracts from the shared budget so backfill/resync slow down when the 230 + /// firehose is doing lots of resolutions. No-op if the rate is already 231 + /// exceeded or no throttle is configured. 232 + fn throttle_notify(&self) { 233 + if let Some(t) = &self.throttle { 234 + let _ = t.limiter.check(); 235 + } 236 + } 237 + 186 238 /// Resolve the PDS endpoint and ATProto signing key for `did`. 187 239 /// 188 - /// Returns a cached [`CachedIdentity`] on cache hit; otherwise resolves 189 - /// via the primary (and optionally fallback) resolver and caches the result. 240 + /// Returns a cached [`CachedIdentity`] on cache hit; otherwise waits for 241 + /// a throttle token and resolves via the network. Used by backfill and 242 + /// resync paths. 190 243 pub async fn resolve(&self, did: &Did<'_>) -> Result<Arc<CachedIdentity>, IdentityError> { 244 + self.resolve_inner(did, false).await 245 + } 246 + 247 + /// Like [`resolve`] but never waits on the throttle — only notifies it. 248 + /// 249 + /// Used by the firehose path where latency matters. Cache-miss resolutions 250 + /// still subtract from the shared budget so backfill/resync slow down. 251 + pub async fn resolve_firehose( 252 + &self, 253 + did: &Did<'_>, 254 + ) -> Result<Arc<CachedIdentity>, IdentityError> { 255 + self.resolve_inner(did, true).await 256 + } 257 + 258 + async fn resolve_inner( 259 + &self, 260 + did: &Did<'_>, 261 + firehose: bool, 262 + ) -> Result<Arc<CachedIdentity>, IdentityError> { 191 263 if let Some(cached) = self.cache.get(did) { 192 264 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "hit") 193 265 .increment(1); 194 266 return Ok(cached); 195 267 } 196 268 269 + if firehose { 270 + self.throttle_notify(); 271 + } else { 272 + self.throttle_wait().await; 273 + } 274 + 197 275 match self.resolve_uncached(did).await { 198 276 Ok((pds, pubkey)) => { 199 277 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "miss") ··· 206 284 Err(e) 207 285 } 208 286 } 209 - } 210 - 211 - /// Resolve without touching the cache. Used by the backfill validation path 212 - /// to avoid populating the cache with entries that may be rejected. 213 - pub async fn resolve_no_cache( 214 - &self, 215 - did: &Did<'_>, 216 - ) -> Result<(Url, PublicKey<'static>), IdentityError> { 217 - self.resolve_uncached(did).await 218 - } 219 - 220 - /// Insert a pre-resolved identity into the cache. 221 - /// Called after backfill validation accepts a DID, to warm the cache for 222 - /// the resync worker that will pick it up shortly. 223 - pub fn insert_resolved(&self, did: Did<'static>, pds: Url, pubkey: PublicKey<'static>) { 224 - self.cache.insert(did, pds, pubkey); 225 287 } 226 288 227 289 /// Look up `did` in the cache without making network calls. ··· 336 398 plc_url: Option<Url>, 337 399 cache_size: u64, 338 400 token: tokio_util::sync::CancellationToken, 401 + identity_qps: Option<NonZeroU32>, 339 402 ) -> Resolver { 340 403 let cache = IdentityCache::new(cache_size); 404 + let throttle = identity_qps.map(|rate| { 405 + info!(rate = rate.get(), "identity resolution throttle enabled"); 406 + IdentityThrottle::new(rate) 407 + }); 341 408 342 409 match (slingshot_url, plc_url) { 343 410 (None, plc) => { ··· 353 420 fallback: None, 354 421 cache, 355 422 token: token.clone(), 423 + throttle, 356 424 } 357 425 } 358 426 (Some(slingshot), None) => { ··· 368 436 fallback: None, 369 437 cache, 370 438 token: token.clone(), 439 + throttle, 371 440 } 372 441 } 373 442 (Some(slingshot), Some(plc)) => { ··· 383 452 fallback: Some(make_resolver(PlcSource::PlcDirectory { base: plc })), 384 453 cache, 385 454 token, 455 + throttle, 386 456 } 387 457 } 388 458 }
+7
src/main.rs
··· 49 49 #[arg(long, env = "LIGHTRAIL_IDENT_CACHE_SIZE", default_value_t = 2_000_000)] 50 50 ident_cache_size: u64, 51 51 52 + /// Global identity resolution rate limit (requests/sec). 53 + /// Backfill tasks wait for tokens; firehose/resync proceed freely but 54 + /// subtract from the budget so backfill slows when the firehose is busy. 55 + #[arg(long, env = "LIGHTRAIL_IDENTITY_RESOLUTION_QPS")] 56 + identity_resolution_qps: Option<std::num::NonZeroU32>, 57 + 52 58 /// Maximum concurrent firehose commit worker tasks. 53 59 #[arg(long, env = "LIGHTRAIL_MAX_FIREHOSE_WORKERS", default_value_t = 6)] 54 60 max_firehose_workers: usize, ··· 139 145 plc_url, 140 146 ident_cache_size, 141 147 token.clone(), 148 + args.identity_resolution_qps, 142 149 )); 143 150 144 151 if let Some(addr) = args.metrics_listen {
+4 -7
src/sync/backfill.rs
··· 267 267 /// Filter `dids` to those whose resolved PDS endpoint matches `host`. 268 268 /// 269 269 /// Bypasses the identity cache to avoid populating it with entries that may 270 - /// be rejected. Accepted DIDs are inserted into the cache explicitly so the 271 - /// resync worker gets a warm hit when it picks them up shortly after. 272 270 /// Returns early with whatever has been validated so far if the token is cancelled. 273 271 async fn validate_dids( 274 272 dids: Vec<Did<'static>>, ··· 279 277 let host_str = host.to_string(); 280 278 let mut valid = Vec::with_capacity(dids.len()); 281 279 for did in dids { 282 - let Some(r) = token.run(resolver.resolve_no_cache(&did)).await else { 280 + let Some(r) = token.run(resolver.resolve(&did)).await else { 283 281 break; 284 282 }; 285 283 match r { 286 - Ok((pds_url, pubkey)) if pds_url.host_str() == Some(host_str.as_str()) => { 287 - resolver.insert_resolved(did.clone().into_static(), pds_url, pubkey); 284 + Ok(resolved) if resolved.pds.host_str() == Some(host_str.as_str()) => { 288 285 valid.push(did); 289 286 } 290 - Ok((pds_url, _)) => { 287 + Ok(resolved) => { 291 288 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "pds_mismatch") 292 289 .increment(1); 293 - trace!(did = %did, resolved_pds = %pds_url, expected = %host, 290 + trace!(did = %did, resolved_pds = %resolved.pds, expected = %host, 294 291 "DID resolves to different PDS; skipping"); 295 292 } 296 293 Err(e) => {
+1
src/sync/firehose/event_dispatcher.rs
··· 437 437 None, 438 438 1_000, 439 439 tokio_util::sync::CancellationToken::new(), 440 + None, 440 441 )) 441 442 } 442 443
+1 -1
src/sync/firehose/validate.rs
··· 86 86 resolver: &Resolver, 87 87 label: &'static str, 88 88 ) -> Option<Arc<CachedIdentity>> { 89 - match resolver.resolve(did).await { 89 + match resolver.resolve_firehose(did).await { 90 90 Ok(r) => Some(r), 91 91 Err(e) => { 92 92 debug!(did = %did, error = %e, "{label} dropped: DID resolution failed");