very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[crawler] sleep only a second to recheck signals in by collection, .load() is fast

dawn ea534517 b3d49c28

+7 -13
+5 -7
src/crawler/by_collection.rs
··· 35 35 36 36 let filter = self.state.filter.load(); 37 37 if filter.signals.is_empty() { 38 - debug!("no signals configured, by-collection crawler sleeping 5m"); 39 - tokio::time::sleep(Duration::from_secs(300)).await; 38 + trace!("no signals configured, by-collection crawler sleeping 1s"); 39 + tokio::time::sleep(Duration::from_secs(1)).await; 40 40 continue; 41 41 } 42 - 43 - let signals: Vec<String> = filter.signals.iter().map(|s| s.to_string()).collect(); 44 - drop(filter); 42 + let filter = arc_swap::Guard::into_inner(filter); 45 43 46 44 info!( 47 45 host = self.index_url.host_str(), 48 - signal_count = signals.len(), 46 + signal_count = filter.signals.len(), 49 47 "starting by-collection discovery pass" 50 48 ); 51 49 52 - for collection in &signals { 50 + for collection in &filter.signals { 53 51 self.enabled.wait_enabled("by-collection crawler").await; 54 52 let span = tracing::info_span!("by_collection", %collection); 55 53 use tracing::Instrument as _;
+2 -6
src/crawler/relay.rs
··· 144 144 false 145 145 } 146 146 147 - // -- SignalChecker -------------------------------------------------------- 148 - 149 147 /// shared describeRepo signal-checking logic used by both relay and retry producers. 150 148 #[derive(Clone)] 151 149 pub(crate) struct SignalChecker { ··· 315 313 /// into `batch`, and returns a vec containing only the guards for confirmed DIDs. 316 314 /// 317 315 /// guards for non-confirmed DIDs are dropped here, releasing their in-flight slots. 318 - /// confirmed DIDs' retry entries are NOT removed here — the worker removes them 316 + /// confirmed DIDs' retry entries are NOT removed here. the worker removes them 319 317 /// atomically with the pending insert. 320 318 pub(super) async fn check_signals_batch( 321 319 &self, ··· 387 385 .collect()) 388 386 } 389 387 } 390 - 391 - // -- RelayProducer -------------------------------------------------------- 392 388 393 389 pub(crate) struct RelayProducer { 394 390 pub(crate) relay_url: Url, ··· 465 461 } 466 462 }; 467 463 468 - let filter = self.checker.state.filter.load(); 464 + let filter = self.checker.state.filter.load_full(); 469 465 470 466 struct ParseResult { 471 467 unknown_dids: Vec<Did<'static>>,