very fast at-protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

[crawler] rename relay producer to list_repos

dawn 98061737 bf4fc916

+33 -32
+7 -6
README.md
````diff
···
 
 the crawler is configured separately from the firehose via `CRAWLER_URLS`. each
 source is a `[mode::]url` entry where the mode prefix is optional and defaults
-to `by_collection` in filter mode or `relay` in full-network mode.
+to `by_collection` in filter mode or `list_repos` in full-network mode.
 
-- `relay`: enumerates the network via `com.atproto.sync.listRepos`, then checks
-  each repo's collections via `describeRepo`. used for full-network discovery.
+- `list_repos`: enumerates the network via `com.atproto.sync.listRepos`, checks
+  each repo's collections via `describeRepo`.
 - `by_collection`: queries `com.atproto.sync.listReposByCollection` for each
   configured signal. more efficient for filtered indexing since it only surfaces
-  repos that have matching records. cursors are stored per collection.
+  repos that have matching records. cursors are stored per collection. note that
+  it won't crawl anything if no signals are specified.
 
 ```
-CRAWLER_URLS=by_collection::https://lightrail.microcosm.blue,relay::wss://bsky.network
+CRAWLER_URLS=by_collection::https://lightrail.microcosm.blue,list_repos::wss://bsky.network
 ```
 
 each source maintains its own cursor so restarts resume mid-pass.
···
   the source to restart from the beginning when re-added.
   - returns `200 OK` if the source was found and removed, `404 Not Found` otherwise.
 - `DELETE /crawler/cursors`: reset stored cursors for a given crawler URL. body: `{ "key": "..." }`
-  where key is a URL. clears the relay crawler cursor as well as any by-collection
+  where key is a URL. clears the list-repos crawler cursor as well as any by-collection
   cursors associated with that URL. causes the next crawler pass to restart from the beginning.
 
 ### firehose management
````
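the `[mode::]url` convention above is simple to parse by hand. a minimal standalone sketch of the idea, assuming a hypothetical `parse_entry` helper (the crate's real entry point is `CrawlerSource::parse`, which is not shown in this commit):

```rust
use url::Url;

/// hypothetical helper: split an optional `mode::` prefix off a crawler source
/// entry. bare urls like `wss://bsky.network` contain no `::` (only `://`),
/// so they fall through to the default mode.
fn parse_entry(entry: &str, default_mode: &str) -> Option<(String, Url)> {
    let (mode, rest) = match entry.split_once("::") {
        Some((mode, rest)) => (mode.to_string(), rest),
        None => (default_mode.to_string(), entry),
    };
    Url::parse(rest).ok().map(|url| (mode, url))
}

fn main() {
    let sources = "by_collection::https://lightrail.microcosm.blue,list_repos::wss://bsky.network";
    for entry in sources.split(',') {
        if let Some((mode, url)) = parse_entry(entry, "by_collection") {
            println!("{mode} -> {url}");
        }
    }
}
```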
+10 -9
src/config.rs
```diff
···
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum CrawlerMode {
-    /// enumerate via `com.atproto.sync.listRepos`, then check signals with `describeRepo`.
-    Relay,
+    /// enumerate via `com.atproto.sync.listRepos`, check signals with `describeRepo`.
+    ListRepos,
     /// enumerate via `com.atproto.sync.listReposByCollection` for each configured signal.
+    /// note: if no signals are specified, this won't crawl for any repos.
     ByCollection,
 }
 
 impl CrawlerMode {
     fn default_for(full_network: bool) -> Self {
         full_network
             .then_some(Self::ListRepos)
             .unwrap_or(Self::ByCollection)
     }
 }
···
     type Err = miette::Error;
     fn from_str(s: &str) -> Result<Self> {
         match s {
-            "relay" => Ok(Self::Relay),
+            "list_repos" | "list-repos" => Ok(Self::ListRepos),
             "by_collection" | "by-collection" => Ok(Self::ByCollection),
             _ => Err(miette::miette!(
-                "invalid crawler mode: expected 'relay' or 'by_collection'"
+                "invalid crawler mode: expected 'list_repos' or 'by_collection'"
···
 impl fmt::Display for CrawlerMode {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            Self::Relay => write!(f, "relay"),
+            Self::ListRepos => write!(f, "list_repos"),
             Self::ByCollection => write!(f, "by_collection"),
         }
     }
···
         backfill_concurrency_limit: 64,
         crawler_sources: vec![CrawlerSource {
             url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
-            mode: CrawlerMode::Relay,
+            mode: CrawlerMode::ListRepos,
         }],
         db_worker_threads: 8,
         db_max_journaling_size_mb: 1024,
···
             .filter_map(|s| CrawlerSource::parse(s, default_mode))
             .collect(),
         Err(_) => match default_mode {
-            CrawlerMode::Relay => relay_hosts
+            CrawlerMode::ListRepos => relay_hosts
                 .iter()
                 .map(|url| CrawlerSource {
                     url: url.clone(),
-                    mode: CrawlerMode::Relay,
+                    mode: CrawlerMode::ListRepos,
                 })
                 .collect(),
             CrawlerMode::ByCollection => defaults.crawler_sources.clone(),
```

note: the `then_some(Self::ListRepos)` line and the `FromStr` error message are shown here already renamed; the stale error string that still advertised `'relay'` (a value the match no longer accepts) is fixed as part of this edit.
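a test sketch for the new mode spellings, hypothetical and not part of the commit (it assumes it lives in `src/config.rs`, where `CrawlerMode` and the crate's miette-based `Result` are in scope):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn list_repos_parses_both_spellings() {
        assert_eq!(CrawlerMode::from_str("list_repos").unwrap(), CrawlerMode::ListRepos);
        assert_eq!(CrawlerMode::from_str("list-repos").unwrap(), CrawlerMode::ListRepos);
        // the `relay` arm was removed, so the old spelling should now be rejected
        assert!(CrawlerMode::from_str("relay").is_err());
    }

    #[test]
    fn display_and_from_str_round_trip() {
        for mode in [CrawlerMode::ListRepos, CrawlerMode::ByCollection] {
            assert_eq!(CrawlerMode::from_str(&mode.to_string()).unwrap(), mode);
        }
    }
}
```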
+4 -4
src/control/crawler.rs
```diff
···
     enabled: watch::Receiver<bool>,
 ) -> ProducerHandle {
     use crate::config::CrawlerMode;
-    use crate::crawler::{ByCollectionProducer, RelayProducer};
+    use crate::crawler::{ByCollectionProducer, ListReposProducer};
     use std::time::Duration;
     use tracing::Instrument;
 
     let abort = match source.mode {
-        CrawlerMode::Relay => {
+        CrawlerMode::ListRepos => {
             info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
             let span = tracing::info_span!("crawl", url = %source.url);
             tokio::spawn(
-                RelayProducer {
-                    relay_url: source.url.clone(),
+                ListReposProducer {
+                    url: source.url.clone(),
                     checker: checker.clone(),
                     in_flight: in_flight.clone(),
                     tx: tx.clone(),
```
+2 -1
src/crawler/by_collection.rs
```diff
···
 }
 
 impl ByCollectionProducer {
-    /// hourly loop: runs one full pass per configured signal, then sleeps.
     pub(crate) async fn run(mut self) -> Result<()> {
         loop {
             self.enabled.wait_enabled("by-collection crawler").await;
···
                 "starting by-collection discovery pass"
             );
 
+            // todo: make this parallel or make use of lightrail taking multiple collections
+            // https://github.com/bluesky-social/atproto/pull/4733
            for collection in &filter.signals {
                 self.enabled.wait_enabled("by-collection crawler").await;
                 let span = tracing::info_span!("by_collection", %collection);
```
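for a sense of what the new todo could turn into, here is a standalone sketch of a bounded-concurrency pass over the configured signals. `crawl_collection` and the concurrency limit of 4 are hypothetical stand-ins, not the crate's code; it assumes the `futures` and `tokio` crates:

```rust
use futures::stream::{self, StreamExt};

/// hypothetical stand-in for one per-collection discovery pass.
async fn crawl_collection(collection: &str) {
    println!("discovery pass for {collection}");
}

#[tokio::main]
async fn main() {
    let signals = ["app.bsky.feed.post", "app.bsky.graph.follow"];
    // run up to 4 passes at once instead of one collection at a time
    stream::iter(signals)
        .for_each_concurrent(4, |collection| crawl_collection(collection))
        .await;
}
```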
+3 -5
src/crawler/mod.rs
```diff
···
 use url::Url;
 
 mod by_collection;
-mod relay;
+mod list_repos;
 pub mod throttle;
 mod worker;
 
 use throttle::Throttler;
 
 pub(crate) use by_collection::ByCollectionProducer;
-pub(crate) use relay::{RelayProducer, RetryProducer, SignalChecker};
+pub(crate) use list_repos::{ListReposProducer, RetryProducer, SignalChecker};
 pub(crate) use worker::{CrawlerBatch, CrawlerWorker};
-
-// -- InFlight ------------------------------------------------------------
 
 #[derive(Clone)]
 pub(crate) struct InFlight(Arc<HashSet<Did<'static>>>);
···
             let relay_host = relay_host.clone();
             let state = self.0.state.clone();
             async move {
-                let cursor = relay::cursor_display(&state, &relay_host).await;
+                let cursor = list_repos::cursor_display(&state, &relay_host).await;
                 format!("{domain}={cursor}").into()
             }
         })
```
+7 -7
src/crawler/relay.rs → src/crawler/list_repos.rs
```diff
···
     }
 }
 
-pub(crate) struct RelayProducer {
-    pub(crate) relay_url: Url,
+pub(crate) struct ListReposProducer {
+    pub(crate) url: Url,
     pub(crate) checker: SignalChecker,
     pub(crate) in_flight: InFlight,
     pub(crate) tx: mpsc::Sender<CrawlerBatch>,
···
     pub(crate) stats: CrawlerStats,
 }
 
-impl RelayProducer {
+impl ListReposProducer {
     pub(crate) async fn run(mut self) -> Result<()> {
         loop {
             if let Err(e) = self.crawl().await {
-                error!(err = ?e, relay = %self.relay_url, "fatal relay crawl error, restarting in 30s");
+                error!(err = ?e, relay = %self.url, "fatal relay crawl error, restarting in 30s");
                 tokio::time::sleep(Duration::from_secs(30)).await;
             }
         }
     }
 
     async fn get_cursor(&self) -> Result<Option<SmolStr>> {
-        let key = crawler_cursor_key(self.relay_url.as_str());
+        let key = crawler_cursor_key(self.url.as_str());
         let cursor_bytes = Db::get(self.checker.state.db.cursors.clone(), &key).await?;
         Ok(cursor_bytes
             .as_deref()
···
 
     async fn crawl(&mut self) -> Result<()> {
         let db = &self.checker.state.db;
-        let base = base_url(&self.relay_url)?;
+        let base = base_url(&self.url)?;
 
         let mut cursor = self.get_cursor().await?;
         match &cursor {
···
             .as_ref()
             .map(|c| -> Result<CursorUpdate> {
                 Ok(CursorUpdate {
-                    key: crawler_cursor_key(self.relay_url.as_str()),
+                    key: crawler_cursor_key(self.url.as_str()),
                     value: rmp_serde::to_vec(c.as_str())
                         .into_diagnostic()
                         .wrap_err("cant serialize cursor")?,
```
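for reference, the cursor round-trip this file performs can be shown standalone: the `listRepos` cursor string is msgpack-encoded with `rmp_serde` before storage (the `rmp_serde::to_vec(c.as_str())` call above) and decoded again on restart. a minimal sketch, assuming only the `rmp_serde` dependency the diff already uses; the crate decodes into a `SmolStr`, while plain `String` here keeps the example self-contained:

```rust
fn main() {
    let cursor = "4734912"; // hypothetical listRepos cursor value

    // write path, as in the diff: serialize the &str into msgpack bytes
    let bytes = rmp_serde::to_vec(cursor).expect("cant serialize cursor");

    // read path: decode the stored bytes back into an owned string
    let restored: String = rmp_serde::from_slice(&bytes).expect("valid msgpack");
    assert_eq!(restored, cursor);
    println!("cursor survives a restart: {restored}");
}
```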