lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

cancel in more places for quick shutdown

phil b98ae9c8 d95d941a

+30 -8
+17 -5
src/identity.rs
··· 31 31 use mini_moka::sync::Cache; 32 32 use tracing::{debug, info, warn}; 33 33 34 + use crate::util::TokenExt; 35 + 34 36 #[derive(Debug, thiserror::Error)] 35 37 pub enum IdentityError { 36 38 #[error("jacquard: {0}")] ··· 176 178 primary: JacquardResolver, 177 179 fallback: Option<JacquardResolver>, 178 180 cache: IdentityCache, 181 + /// Cancellation token for cooperative shutdown. 182 + token: tokio_util::sync::CancellationToken, 179 183 } 180 184 181 185 impl Resolver { ··· 240 244 did: &Did<'_>, 241 245 ) -> Result<(Url, PublicKey<'static>), IdentityError> { 242 246 let doc = match &self.fallback { 243 - None => Self::try_backend(&self.primary, did).await, 244 - Some(fb) => match Self::try_backend(&self.primary, did).await { 247 + None => self.try_backend(&self.primary, did).await, 248 + Some(fb) => match self.try_backend(&self.primary, did).await { 245 249 ok @ Ok(_) => ok, 246 250 Err(primary_err) => { 247 251 debug!( ··· 249 253 error = %primary_err, 250 254 "primary identity resolution failed, trying fallback" 251 255 ); 252 - Self::try_backend(fb, did).await 256 + self.try_backend(fb, did).await 253 257 } 254 258 }, 255 259 }?; ··· 275 279 /// 276 280 /// Non-rate-limit errors return immediately. After exhausting retries the 277 281 /// final 429 error is returned so the caller can fall through to a fallback 278 - /// backend or propagate the error. 282 + /// backend or propagate the error. Sleeps are cancellation-aware when a 283 + /// token has been provided via [`Resolver::set_cancellation_token`]. 279 284 async fn try_backend( 285 + &self, 280 286 backend: &JacquardResolver, 281 287 did: &Did<'_>, 282 288 ) -> std::result::Result< ··· 297 303 delay_ms = delay.as_millis() as u64, 298 304 "identity service rate-limited, retrying" 299 305 ); 300 - tokio::time::sleep(delay).await; 306 + if !self.token.sleep(delay).await { 307 + return Err(e); // shutting down 308 + } 301 309 } 302 310 Err(e) => { 303 311 if is_rate_limited(&e) { ··· 327 335 slingshot_url: Option<Url>, 328 336 plc_url: Option<Url>, 329 337 cache_size: u64, 338 + token: tokio_util::sync::CancellationToken, 330 339 ) -> Resolver { 331 340 let cache = IdentityCache::new(cache_size); 332 341 ··· 343 352 primary: make_resolver(PlcSource::PlcDirectory { base }), 344 353 fallback: None, 345 354 cache, 355 + token: token.clone(), 346 356 } 347 357 } 348 358 (Some(slingshot), None) => { ··· 357 367 primary: make_resolver(PlcSource::Slingshot { base: slingshot }), 358 368 fallback: None, 359 369 cache, 370 + token: token.clone(), 360 371 } 361 372 } 362 373 (Some(slingshot), Some(plc)) => { ··· 371 382 primary: make_resolver(PlcSource::Slingshot { base: slingshot }), 372 383 fallback: Some(make_resolver(PlcSource::PlcDirectory { base: plc })), 373 384 cache, 385 + token, 374 386 } 375 387 } 376 388 }
+3 -1
src/main.rs
··· 129 129 "could not get host from --upstream".to_string(), 130 130 ))?; 131 131 132 + let token = CancellationToken::new(); 133 + 132 134 let slingshot_url = args.slingshot_url; 133 135 let plc_url = args.plc_url; 134 136 let ident_cache_size = args.ident_cache_size; ··· 136 138 slingshot_url, 137 139 plc_url, 138 140 ident_cache_size, 141 + token.clone(), 139 142 )); 140 143 141 144 if let Some(addr) = args.metrics_listen { ··· 144 147 145 148 let db = storage::open(&args.db_path, args.fjall_cache_mb)?; 146 149 let client = lightrail::http::build_client(args.crawl_qps); 147 - let token = CancellationToken::new(); 148 150 149 151 let dispatcher_state: resync::dispatcher::DispatcherState = std::sync::Arc::new( 150 152 std::sync::Mutex::new(resync::dispatcher::DispatcherSnapshot::default()),
+4 -1
src/sync/backfill.rs
··· 114 114 let dids_with_hosts: Vec<(Did<'static>, Arc<Url>)> = { 115 115 let mut out = Vec::with_capacity(dids.len()); 116 116 for did in dids { 117 - let pds_host = match resolver.resolve(&did).await { 117 + let Some(res) = token.run(resolver.resolve(&did)).await else { 118 + return Ok(false); // cancelled 119 + }; 120 + let pds_host = match res { 118 121 Ok(resolved) => resolved.pds.clone(), 119 122 Err(e) => { 120 123 error!(did = %did, error = %e, "failed to resolve host for validated did; not enqueuing resync");
+6 -1
src/sync/firehose/event_dispatcher.rs
··· 432 432 } 433 433 434 434 fn make_resolver() -> Arc<crate::identity::Resolver> { 435 - Arc::new(crate::identity::build_resolver(None, None, 1_000)) 435 + Arc::new(crate::identity::build_resolver( 436 + None, 437 + None, 438 + 1_000, 439 + tokio_util::sync::CancellationToken::new(), 440 + )) 436 441 } 437 442 438 443 /// Drive the dispatcher loop until no workers remain or a cancellation