very fast at-protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[crawler] allow multiple relays to be crawled

dawn a0d00a78 4e627267

+203 -61
+11 -1
README.md
··· 28 28 | :--- | :--- | :--- | 29 29 | `DATABASE_PATH` | `./hydrant.db` | path to the database folder. | 30 30 | `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). standard [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). | 31 - | `RELAY_HOST` | `wss://relay.fire.hose.cam` | websocket URL of the upstream firehose relay. | 31 + | `RELAY_HOST` | `http://relay.fire.hose.cam` | URL of the relay. | 32 + | `RELAY_HOSTS` | | comma-separated list of relay URLs. if unset, falls back to `RELAY_HOST`. | 32 33 | `PLC_URL` | `https://plc.wtf` | base URL(s) of the PLC directory (comma-separated for multiple). | 33 34 | `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. | 34 35 | `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the filter on startup (e.g. `app.bsky.feed.post,app.bsky.graph.*`). | ··· 51 52 | `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. | 52 53 | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. | 53 54 | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. | 55 + 56 + ### multi-relay crawling 57 + 58 + the crawler supports querying multiple relays simultaneously. when `RELAY_HOSTS` is configured with multiple URLs: 59 + 60 + - one independent crawling loop is spawned per relay 61 + - each relay maintains its own cursor state 62 + - all crawlers share the same pending queue for backfill 63 + - firehose connection uses the first relay in the list 54 64 55 65 ## api 56 66
+7 -4
src/api/debug.rs
··· 361 361 let cids = tokio::task::spawn_blocking(move || { 362 362 let mut unique_cids: std::collections::HashSet<String> = std::collections::HashSet::new(); 363 363 let db = &state_clone.db; 364 - 364 + 365 365 // 1. Scan records 366 366 let records_prefix = crate::db::keys::record_prefix_did(&did); 367 367 for guard in db.records.prefix(&records_prefix) { ··· 371 371 } 372 372 } 373 373 } 374 - 374 + 375 375 // 2. Scan events 376 376 let trimmed_did = crate::db::types::TrimmedDid::from(&did); 377 377 for guard in db.events.iter() { ··· 385 385 } 386 386 } 387 387 } 388 - 388 + 389 389 let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new(); 390 390 for cid_str in unique_cids { 391 391 if let Ok(cid) = cid::Cid::from_str(&cid_str) { 392 392 let cid_bytes = fjall::Slice::from(cid.to_bytes()); 393 - let count = db.block_refcounts.read_sync(cid_bytes.as_ref(), |_, v| *v).unwrap_or(0); 393 + let count = db 394 + .block_refcounts 395 + .read_sync(cid_bytes.as_ref(), |_, v| *v) 396 + .unwrap_or(0); 394 397 counts.insert(cid_str, count); 395 398 } 396 399 }
+20 -4
src/config.rs
··· 37 37 #[derive(Debug, Clone)] 38 38 pub struct Config { 39 39 pub database_path: PathBuf, 40 - pub relay_host: Url, 40 + pub relays: Vec<Url>, 41 41 pub plc_urls: Vec<Url>, 42 42 pub full_network: bool, 43 43 pub ephemeral: bool, ··· 90 90 }; 91 91 } 92 92 93 - let relay_host = cfg!( 93 + let relay_host: Url = cfg!( 94 94 "RELAY_HOST", 95 95 Url::parse("wss://relay.fire.hose.cam").unwrap() 96 96 ); 97 + let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS") 98 + .ok() 99 + .and_then(|hosts| { 100 + hosts 101 + .split(',') 102 + .map(|s| Url::parse(s.trim())) 103 + .collect::<Result<Vec<_>, _>>() 104 + .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}")) 105 + .ok() 106 + }) 107 + .unwrap_or_default(); 108 + let relay_hosts = relay_hosts 109 + .is_empty() 110 + .then(|| vec![relay_host]) 111 + .unwrap_or(relay_hosts); 112 + 97 113 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 98 114 .ok() 99 115 .map(|s| { ··· 180 196 181 197 Ok(Self { 182 198 database_path, 183 - relay_host, 199 + relays: relay_hosts, 184 200 plc_urls, 185 201 ephemeral, 186 202 full_network, ··· 225 241 const LABEL_WIDTH: usize = 27; 226 242 227 243 writeln!(f, "hydrant configuration:")?; 228 - config_line!(f, "relay host", self.relay_host)?; 244 + config_line!(f, "relay hosts", format_args!("{:?}", self.relays))?; 229 245 config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 230 246 config_line!(f, "full network indexing", self.full_network)?; 231 247 config_line!(f, "verify signatures", self.verify_signatures)?;
+138 -42
src/crawler/mod.rs
··· 1 1 use crate::crawler::throttle::ThrottleHandle; 2 + use crate::db::keys::crawler_cursor_key; 2 3 use crate::db::{Db, keys, ser_repo_state}; 3 4 use crate::state::AppState; 4 5 use crate::types::RepoState; ··· 13 14 use rand::RngExt; 14 15 use rand::rngs::SmallRng; 15 16 use reqwest::StatusCode; 17 + use scc::HashSet; 16 18 use serde::{Deserialize, Serialize}; 17 19 use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 18 20 use std::collections::HashMap; ··· 139 141 false 140 142 } 141 143 142 - const CURSOR_KEY: &[u8] = b"crawler_cursor"; 143 - 144 144 #[derive(Debug, Serialize, Deserialize)] 145 145 enum Cursor { 146 146 Done(SmolStr), ··· 149 149 150 150 pub mod throttle; 151 151 use throttle::{OrFailure, Throttler}; 152 + 153 + type InFlight = Arc<HashSet<Did<'static>>>; 154 + 155 + struct InFlightGuard { 156 + set: InFlight, 157 + did: Did<'static>, 158 + } 159 + 160 + impl Drop for InFlightGuard { 161 + fn drop(&mut self) { 162 + self.set.remove_sync(&self.did); 163 + } 164 + } 165 + 166 + #[must_use] 167 + struct InFlightRepos { 168 + repos: Vec<Did<'static>>, 169 + guards: Vec<InFlightGuard>, 170 + } 152 171 153 172 pub struct Crawler { 154 173 state: Arc<AppState>, 155 - relay_host: Url, 174 + relays: Vec<Url>, 156 175 http: reqwest::Client, 157 176 max_pending: usize, 158 177 resume_pending: usize, ··· 160 179 crawled_count: AtomicUsize, 161 180 throttled: AtomicBool, 162 181 pds_throttler: Throttler, 182 + in_flight: InFlight, 163 183 } 164 184 165 185 impl Crawler { 166 186 pub fn new( 167 187 state: Arc<AppState>, 168 - relay_host: Url, 188 + relay_hosts: Vec<Url>, 169 189 max_pending: usize, 170 190 resume_pending: usize, 171 191 ) -> Self { ··· 181 201 182 202 Self { 183 203 state, 184 - relay_host, 204 + relays: relay_hosts, 185 205 http, 186 206 max_pending, 187 207 resume_pending, ··· 189 209 crawled_count: AtomicUsize::new(0), 190 210 throttled: AtomicBool::new(false), 191 211 pds_throttler: Throttler::new(), 212 + in_flight: 
Arc::new(HashSet::new()), 192 213 } 193 214 } 194 215 195 - async fn get_cursor(&self) -> Result<Cursor> { 196 - let cursor_bytes = Db::get(self.state.db.cursors.clone(), CURSOR_KEY).await?; 216 + async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> { 217 + let key = crawler_cursor_key(relay_host); 218 + let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?; 197 219 let cursor: Cursor = cursor_bytes 198 220 .as_deref() 199 221 .map(rmp_serde::from_slice) ··· 232 254 } 233 255 234 256 let elapsed = last_time.elapsed().as_secs_f64(); 235 - let cursor = Self::get_cursor(&crawler).await.map_or_else( 236 - |e| e.to_smolstr(), 237 - |c| match c { 238 - Cursor::Done(c) => format_smolstr!("done({c})"), 239 - Cursor::Next(None) => "none".to_smolstr(), 240 - Cursor::Next(Some(c)) => c.to_smolstr(), 241 - }, 242 - ); 257 + 258 + // fetch all cursors 259 + use futures::future::join_all; 260 + let cursor_futures: Vec<_> = crawler 261 + .relays 262 + .iter() 263 + .map(|relay_host| { 264 + let domain = relay_host.host_str().unwrap_or("unknown"); 265 + let relay_host = relay_host.clone(); 266 + let crawler = crawler.clone(); 267 + async move { 268 + let cursor_str = match crawler.get_cursor(&relay_host).await { 269 + Ok(c) => match c { 270 + Cursor::Done(c) => format_smolstr!("done({c})"), 271 + Cursor::Next(None) => "none".to_smolstr(), 272 + Cursor::Next(Some(c)) => c.to_smolstr(), 273 + }, 274 + Err(e) => e.to_smolstr(), 275 + }; 276 + format_smolstr!("{domain}={cursor_str}") 277 + } 278 + }) 279 + .collect(); 280 + 281 + let cursors: Vec<_> = join_all(cursor_futures).await.into_iter().collect(); 282 + 283 + let cursors_display = if cursors.is_empty() { 284 + "none".to_smolstr() 285 + } else { 286 + cursors.join(", ").into() 287 + }; 288 + 243 289 info!( 244 - %cursor, 290 + cursors = %cursors_display, 245 291 processed = delta_processed, 246 292 crawled = delta_crawled, 247 293 elapsed, ··· 285 331 } 286 332 }); 287 333 288 - loop { 289 - if let 
Err(e) = Self::crawl(crawler.clone()).await { 290 - error!(err = ?e, "fatal error, restarting in 30s"); 291 - tokio::time::sleep(Duration::from_secs(30)).await; 292 - } 334 + info!( 335 + relay_count = crawler.relays.len(), 336 + hosts = ?crawler.relays, 337 + "starting crawler" 338 + ); 339 + 340 + let mut tasks = tokio::task::JoinSet::new(); 341 + for url in crawler.relays.clone() { 342 + let crawler = crawler.clone(); 343 + let span = tracing::info_span!("crawl", %url); 344 + tasks.spawn( 345 + async move { 346 + loop { 347 + if let Err(e) = Self::crawl(crawler.clone(), &url).await { 348 + error!(err = ?e, "fatal error, restarting in 30s"); 349 + tokio::time::sleep(Duration::from_secs(30)).await; 350 + } 351 + } 352 + } 353 + .instrument(span), 354 + ); 293 355 } 356 + let _ = tasks.join_all().await; 357 + 358 + Ok(()) 294 359 } 295 360 296 - async fn crawl(crawler: Arc<Self>) -> Result<()> { 297 - let mut relay_url = crawler.relay_host.clone(); 361 + async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> { 362 + let mut relay_url = relay_host.clone(); 298 363 match relay_url.scheme() { 299 364 "wss" => relay_url 300 365 .set_scheme("https") ··· 308 373 let mut rng: SmallRng = rand::make_rng(); 309 374 let db = &crawler.state.db; 310 375 311 - let mut cursor = crawler.get_cursor().await?; 376 + let mut cursor = crawler.get_cursor(relay_host).await?; 312 377 313 378 match &cursor { 314 379 Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), ··· 506 571 debug!(count, "fetched repos"); 507 572 crawler.crawled_count.fetch_add(count, Ordering::Relaxed); 508 573 509 - let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() { 574 + let in_flight = if filter.check_signals() && !unknown_dids.is_empty() { 510 575 // we dont need to pass any existing since we have none; we are crawling after all 511 576 crawler 512 577 .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new()) 513 578 .await? 
514 579 } else { 515 - unknown_dids 580 + // no signal checking but still need dedup to avoid orphan pending entries 581 + crawler.acquire_in_flight(unknown_dids).await 516 582 }; 517 583 518 - for did in &valid_dids { 584 + for did in &in_flight.repos { 519 585 let did_key = keys::repo_key(did); 520 586 trace!(did = %did, "found new repo"); 521 587 ··· 532 598 } 533 599 batch.insert( 534 600 &db.cursors, 535 - CURSOR_KEY, 601 + crawler_cursor_key(relay_host), 536 602 rmp_serde::to_vec(&cursor) 537 603 .into_diagnostic() 538 604 .wrap_err("cant serialize cursor")?, ··· 556 622 }) 557 623 .ok(); 558 624 559 - crawler.account_new_repos(valid_dids.len()).await; 625 + drop(in_flight.guards); 626 + 627 + crawler.account_new_repos(in_flight.repos.len()).await; 560 628 561 629 if matches!(cursor, Cursor::Done(_)) { 562 630 info!("enumeration complete, sleeping 1h before next pass"); ··· 617 685 618 686 let handle = tokio::runtime::Handle::current(); 619 687 let filter = self.state.filter.load(); 620 - let valid_dids = 688 + let in_flight = 621 689 handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?; 622 690 623 691 let mut rng: SmallRng = rand::make_rng(); 624 - for did in &valid_dids { 692 + for did in &in_flight.repos { 625 693 let did_key = keys::repo_key(did); 626 694 627 695 if db.repos.contains_key(&did_key).into_diagnostic()? 
{ ··· 635 703 636 704 batch.commit().into_diagnostic()?; 637 705 638 - if !valid_dids.is_empty() { 639 - info!(count = valid_dids.len(), "recovered from retry queue"); 640 - handle.block_on(self.account_new_repos(valid_dids.len())); 706 + drop(in_flight.guards); 707 + 708 + if !in_flight.repos.is_empty() { 709 + info!(count = in_flight.repos.len(), "recovered from retry queue"); 710 + handle.block_on(self.account_new_repos(in_flight.repos.len())); 641 711 } 642 712 643 713 // if we hit the batch cap there are more ready entries, loop back immediately ··· 812 882 813 883 async fn check_signals_batch( 814 884 &self, 815 - dids: &[Did<'static>], 885 + repos: &[Did<'static>], 816 886 filter: &Arc<crate::filter::FilterConfig>, 817 887 batch: &mut fjall::OwnedWriteBatch, 818 888 existing: &HashMap<Did<'static>, RetryState>, 819 - ) -> Result<Vec<Did<'static>>> { 889 + ) -> Result<InFlightRepos> { 820 890 let db = &self.state.db; 821 - let mut valid = Vec::new(); 891 + let in_flight = self.acquire_in_flight(repos.to_vec()).await; 892 + let mut valid = Vec::with_capacity(in_flight.repos.len()); 822 893 let mut set = tokio::task::JoinSet::new(); 823 894 824 - for did in dids { 825 - let did = did.clone(); 895 + for did in in_flight.repos { 826 896 let filter = filter.clone(); 827 897 let span = tracing::info_span!("signals", did = %did); 828 - set.spawn(self.check_repo_signals(filter, did).instrument(span)); 898 + set.spawn( 899 + self.check_repo_signals(filter, did.clone()) 900 + .instrument(span), 901 + ); 829 902 } 830 903 831 904 while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) ··· 839 912 let (did, result) = match res { 840 913 Ok(inner) => inner, 841 914 Err(e) => { 842 - error!(err = ?e, "signal check task failed or panicked"); 915 + error!(err = ?e, "signal check panicked"); 843 916 continue; 844 917 } 845 918 }; ··· 874 947 } 875 948 } 876 949 877 - Ok(valid) 950 + Ok(InFlightRepos { 951 + repos: valid, 952 + guards: 
in_flight.guards, 953 + }) 954 + } 955 + 956 + async fn acquire_in_flight(&self, dids: Vec<Did<'static>>) -> InFlightRepos { 957 + let mut filtered = Vec::with_capacity(dids.len()); 958 + let mut guards = Vec::with_capacity(dids.len()); 959 + for did in dids { 960 + if self.in_flight.insert_async(did.clone()).await.is_err() { 961 + trace!(did = %did, "repo in-flight, skipping"); 962 + continue; 963 + } 964 + guards.push(InFlightGuard { 965 + set: self.in_flight.clone(), 966 + did: did.clone(), 967 + }); 968 + filtered.push(did); 969 + } 970 + InFlightRepos { 971 + guards, 972 + repos: filtered, 973 + } 878 974 } 879 975 880 976 async fn account_new_repos(&self, count: usize) {
+8
src/db/keys.rs
··· 1 1 use jacquard_common::types::string::Did; 2 2 use smol_str::SmolStr; 3 + use url::Url; 3 4 4 5 use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 5 6 ··· 161 162 pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> { 162 163 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 163 164 } 165 + 166 + pub fn crawler_cursor_key(relay_host: &Url) -> Vec<u8> { 167 + let mut key = b"crawler_cursor".to_vec(); 168 + key.push(SEP); 169 + key.extend_from_slice(relay_host.as_str().as_bytes()); 170 + key 171 + }
+13 -6
src/main.rs
··· 200 200 }); 201 201 202 202 info!("starting crawler ({:?})", state.filter.load().mode); 203 - let state_clone = state.clone(); 204 - let relay_host_clone = cfg.relay_host.clone(); 203 + let state_for_crawler = state.clone(); 204 + let relay_hosts = cfg.relays.clone(); 205 205 let crawler_max_pending = cfg.crawler_max_pending_repos; 206 206 let crawler_resume_pending = cfg.crawler_resume_pending_repos; 207 207 ··· 212 212 }; 213 213 214 214 if should_run_crawler { 215 + info!( 216 + relay_count = relay_hosts.len(), 217 + hosts = ?relay_hosts, 218 + "spawning crawler" 219 + ); 215 220 tokio::spawn(async move { 216 - // the crawler is responsible for finding new repos 217 221 let crawler = hydrant::crawler::Crawler::new( 218 - state_clone, 219 - relay_host_clone, 222 + state_for_crawler, 223 + relay_hosts, 220 224 crawler_max_pending, 221 225 crawler_resume_pending, 222 226 ); ··· 248 252 let ingestor = FirehoseIngestor::new( 249 253 state.clone(), 250 254 buffer_tx, 251 - cfg.relay_host, 255 + cfg.relays 256 + .first() 257 + .cloned() 258 + .expect("at least one relay host must be configured"), 252 259 state.filter.clone(), 253 260 matches!(cfg.verify_signatures, SignatureVerification::Full), 254 261 );
+6 -4
tests/verify_crawler.nu
··· 89 89 # check cursor persistence 90 90 print "verifying crawler cursor persistence..." 91 91 let cursor_check = try { 92 - let cursor_res = (http get $"($debug_url)/debug/get?partition=cursors&key=crawler_cursor").value 92 + # cursor key format: crawler_cursor|{scheme}://{host}:{port} 93 + let cursor_res = (http get $"($debug_url)/debug/get?partition=cursors&key=crawler_cursor|http://localhost:3008").value 93 94 print $"cursor value from debug: ($cursor_res)" 94 - 95 - if $cursor_res == "50" { 95 + 96 + # cursor should be non-empty if crawler successfully fetched repos 97 + if ($cursor_res | is-not-empty) { 96 98 print "cursor verified." 97 99 true 98 100 } else { 99 - print "cursor mismatch or missing." 101 + print "cursor missing or empty." 100 102 false 101 103 } 102 104 } catch {