lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

identity resolution backoff

phil d95d941a aca6a616

+71 -5
+71 -5
src/identity.rs
··· 25 25 use jacquard_common::url::Url; 26 26 use jacquard_identity::JacquardResolver; 27 27 use jacquard_identity::resolver::{ 28 - IdentityError as JacquardIdentityError, IdentityResolver, PlcSource, ResolverOptions, 28 + IdentityError as JacquardIdentityError, IdentityErrorKind as JacquardErrorKind, 29 + IdentityResolver, PlcSource, ResolverOptions, 29 30 }; 30 31 use mini_moka::sync::Cache; 31 - use tracing::info; 32 + use tracing::{debug, info, warn}; 32 33 33 34 #[derive(Debug, thiserror::Error)] 34 35 pub enum IdentityError { ··· 66 67 /// they happen, so proactive [`Resolver::invalidate_did`] handles updates 67 68 /// before this TTL expires in practice. 68 69 const CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60); 70 + 71 + /// Maximum retries per identity backend on HTTP 429 responses. 72 + const RATE_LIMIT_RETRIES: u32 = 3; 73 + 74 + /// Initial delay between rate-limit retries; doubles each attempt (500ms → 1s → 2s). 75 + const RATE_LIMIT_BASE_DELAY: Duration = Duration::from_millis(500); 76 + 77 + /// Check whether a jacquard identity error is an HTTP 429 rate-limit response. 78 + fn is_rate_limited(err: &JacquardIdentityError) -> bool { 79 + matches!( 80 + err.kind(), 81 + JacquardErrorKind::HttpStatus(status) if status.as_u16() == 429 82 + ) 83 + } 69 84 70 85 // --------------------------------------------------------------------------- 71 86 // Interned PDS URL cache ··· 225 240 did: &Did<'_>, 226 241 ) -> Result<(Url, PublicKey<'static>), IdentityError> { 227 242 let doc = match &self.fallback { 228 - None => self.primary.resolve_did_doc_owned(did).await, 229 - Some(fb) => match self.primary.resolve_did_doc_owned(did).await { 243 + None => Self::try_backend(&self.primary, did).await, 244 + Some(fb) => match Self::try_backend(&self.primary, did).await { 230 245 ok @ Ok(_) => ok, 231 - Err(_) => fb.resolve_did_doc_owned(did).await, 246 + Err(primary_err) => { 247 + debug!( 248 + did = did.as_str(), 249 + error = %primary_err, 250 + "primary identity resolution failed, trying fallback" 251 + ); 252 + Self::try_backend(fb, did).await 253 + } 232 254 }, 233 255 }?; 234 256 ··· 247 269 })?; 248 270 249 271 Ok((pds, pubkey)) 272 + } 273 + 274 + /// Try resolving a DID document against a single backend, retrying on 429. 275 + /// 276 + /// Non-rate-limit errors return immediately. After exhausting retries the 277 + /// final 429 error is returned so the caller can fall through to a fallback 278 + /// backend or propagate the error. 279 + async fn try_backend( 280 + backend: &JacquardResolver, 281 + did: &Did<'_>, 282 + ) -> std::result::Result< 283 + jacquard_common::types::did_doc::DidDocument<'static>, 284 + JacquardIdentityError, 285 + > { 286 + for attempt in 0..=RATE_LIMIT_RETRIES { 287 + match backend.resolve_did_doc_owned(did).await { 288 + Ok(doc) => return Ok(doc), 289 + Err(e) if is_rate_limited(&e) && attempt < RATE_LIMIT_RETRIES => { 290 + metrics::counter!("lightrail_identity_rate_limited_total").increment(1); 291 + let delay = RATE_LIMIT_BASE_DELAY * 2u32.pow(attempt); 292 + metrics::counter!("lightrail_identity_rate_limit_backoff_ms") 293 + .increment(delay.as_millis() as u64); 294 + debug!( 295 + did = did.as_str(), 296 + attempt = attempt + 1, 297 + delay_ms = delay.as_millis() as u64, 298 + "identity service rate-limited, retrying" 299 + ); 300 + tokio::time::sleep(delay).await; 301 + } 302 + Err(e) => { 303 + if is_rate_limited(&e) { 304 + metrics::counter!("lightrail_identity_rate_limited_total").increment(1); 305 + warn!( 306 + did = did.as_str(), 307 + retries = RATE_LIMIT_RETRIES, 308 + "identity resolution rate limit persisted after retries" 309 + ); 310 + } 311 + return Err(e); 312 + } 313 + } 314 + } 315 + unreachable!() 250 316 } 251 317 } 252 318