very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] periodically retry offline hosts (every 30 minutes)

dawn d6d96734 44c6d801

+84 -6
+27
src/config.rs
··· 380 380 /// set via `HYDRANT_NEW_HOST_LIMIT`. 381 381 pub new_host_limit: Option<u64>, 382 382 383 + /// how often offline firehose sources are automatically retried. 384 + /// set via `HYDRANT_OFFLINE_HOST_RETRY_INTERVAL` (humantime duration, e.g. `30min`). 385 + /// set to `none` to disable automatic retries. 386 + pub offline_host_retry_interval: Option<Duration>, 387 + 383 388 /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup. 384 389 /// 385 390 /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes ··· 501 506 enable_backlinks: false, 502 507 only_index_links: false, 503 508 new_host_limit: Some(50), 509 + offline_host_retry_interval: Some(Duration::from_secs(30 * 60)), 504 510 tier_rules: vec![], 505 511 tier_policy: { 506 512 let mut tiers = HashMap::new(); ··· 681 687 .ok() 682 688 .and_then(|s| s.parse().ok()); 683 689 690 + let offline_retry_interval: Option<Duration> = 691 + match std::env::var("HYDRANT_OFFLINE_HOST_RETRY_INTERVAL") 692 + .ok() 693 + .as_deref() 694 + { 695 + None => defaults.offline_host_retry_interval, 696 + Some("none") => None, 697 + Some(s) => humantime::parse_duration(s) 698 + .ok() 699 + .or(defaults.offline_host_retry_interval), 700 + }; 701 + 684 702 // start with built-in tier definitions, then layer in any env-defined overrides. 685 703 // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,... 
686 704 let mut tiers = defaults.tier_policy.tiers.clone(); ··· 795 813 enable_backlinks, 796 814 only_index_links, 797 815 new_host_limit: max_pds_added_per_day, 816 + offline_host_retry_interval: offline_retry_interval, 798 817 tier_policy, 799 818 tier_rules, 800 819 cache_size, ··· 927 946 } 928 947 if let Some(limit) = self.new_host_limit { 929 948 config_line!(f, "max pds/day", limit)?; 949 + } 950 + match self.offline_host_retry_interval { 951 + Some(d) => config_line!( 952 + f, 953 + "offline retry interval", 954 + format_args!("{}sec", d.as_secs()) 955 + )?, 956 + None => config_line!(f, "offline retry interval", "disabled")?, 930 957 } 931 958 Ok(()) 932 959 }
+19 -4
src/control/firehose.rs
··· 46 46 pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>, 47 47 /// per-relay running tasks, keyed by url. 48 48 pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>, 49 - /// set of known source urls, includes API-added (db-persisted) and static config sources. 50 - pub(super) known_sources: Arc<scc::HashSet<Url>>, 49 + /// known source urls → is_pds flag; includes API-added (db-persisted) and static config sources. 50 + pub(super) known_sources: Arc<scc::HashMap<Url, bool>>, 51 51 /// ids assigned to spawned tasks 52 52 next_task_id: Arc<AtomicUsize>, 53 53 } ··· 58 58 state, 59 59 shared: Arc::new(std::sync::OnceLock::new()), 60 60 tasks: Arc::new(scc::HashMap::new()), 61 - known_sources: Arc::new(scc::HashSet::new()), 61 + known_sources: Arc::new(scc::HashMap::new()), 62 62 next_task_id: Arc::new(AtomicUsize::new(0)), 63 63 } 64 64 } ··· 205 205 .await 206 206 .into_diagnostic()??; 207 207 208 - let _ = self.known_sources.insert_async(url.clone()).await; 208 + let _ = self.known_sources.insert_async(url.clone(), is_pds).await; 209 209 210 210 // reset failure state so the fresh task gets a clean slate. 211 211 // if the previous task exited after max failures, the failure counter ··· 244 244 } 245 245 246 246 Ok(self.tasks.remove_async(url).await.is_some()) 247 + } 248 + 249 + /// restart an offline firehose source without touching the database or daily limits. 250 + pub(super) async fn restart_source(&self, url: Url, is_pds: bool) -> Result<()> { 251 + let shared = self 252 + .shared 253 + .get() 254 + .ok_or_else(|| miette::miette!("firehose worker not started"))?; 255 + 256 + // clear the failure counter so the new task isn't immediately terminated 257 + let throttle = self.state.throttler.get_handle(&url).await; 258 + throttle.record_success(); 259 + 260 + self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, true) 261 + .await 247 262 } 248 263 249 264 /// reset the stored firehose cursor for a given URL.
+38 -2
src/control/mod.rs
··· 55 55 use stream::event_stream_thread; 56 56 #[cfg(feature = "relay")] 57 57 use stream::relay_stream_thread; 58 + use url::Url; 58 59 59 60 #[derive(Debug, Clone)] 60 61 /// infromation about a host hydrant is consuming from. ··· 416 417 for source in &relay_hosts { 417 418 let _ = firehose 418 419 .known_sources 419 - .insert_async(source.url.clone()) 420 + .insert_async(source.url.clone(), source.is_pds) 420 421 .await; 421 422 firehose 422 423 .spawn_firehose_ingestor(source, fire_shared, true) ··· 434 435 for source in &persisted_sources { 435 436 let _ = firehose 436 437 .known_sources 437 - .insert_async(source.url.clone()) 438 + .insert_async(source.url.clone(), source.is_pds) 438 439 .await; 439 440 if firehose.tasks.contains_async(&source.url).await { 440 441 continue; ··· 451 452 let state = state.clone(); 452 453 tokio::spawn(async move { 453 454 seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await; 455 + }); 456 + } 457 + 458 + // 10d. periodic retry of offline firehose sources 459 + if let Some(retry_interval) = config.offline_host_retry_interval { 460 + tokio::spawn({ 461 + let firehose = firehose.clone(); 462 + async move { 463 + loop { 464 + tokio::time::sleep(retry_interval).await; 465 + 466 + let mut to_restart: Vec<(Url, bool)> = Vec::new(); 467 + { 468 + let meta = firehose.state.pds_meta.load(); 469 + firehose 470 + .known_sources 471 + .iter_async(|url, &is_pds| { 472 + if firehose.tasks.contains_sync(url) { 473 + return true; 474 + } 475 + let host = url.host_str().unwrap_or(url.as_str()); 476 + if meta.is_banned(host) { 477 + return true; 478 + } 479 + to_restart.push((url.clone(), is_pds)); 480 + true 481 + }) 482 + .await; 483 + } 484 + 485 + for (url, is_pds) in to_restart { 486 + let _ = firehose.restart_source(url, is_pds).await; 487 + } 488 + } 489 + } 454 490 }); 455 491 } 456 492