very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] jitter connection startup when we are spawning persisted / configged firehoses

dawn 0aeeea43 147baa33

+17 -3
+15 -1
src/control/firehose.rs
··· 1 1 use std::sync::Arc; 2 2 use std::sync::atomic::{AtomicUsize, Ordering}; 3 + use std::time::Duration; 3 4 4 5 use miette::{IntoDiagnostic, Result}; 6 + use rand::RngExt; 5 7 use tokio_util::sync::CancellationToken; 6 8 use tracing::{error, info}; 7 9 use url::Url; ··· 67 69 &self, 68 70 source: &FirehoseSource, 69 71 shared: &FirehoseShared, 72 + delay_startup: bool, 70 73 ) -> Result<()> { 71 74 use std::sync::atomic::AtomicI64; 72 75 let state = &self.state; ··· 100 103 let tasks = self.tasks.clone(); 101 104 let token = cancel.clone(); 102 105 async move { 106 + // jitter connection start so we dont cause thundering herd problems 107 + if delay_startup { 108 + let jitter_ms = rand::rng().random_range(0u64..2000); 109 + tokio::select! { 110 + _ = tokio::time::sleep(Duration::from_millis(jitter_ms)) => {} 111 + _ = token.cancelled() => { 112 + info!(relay = %relay_url, "firehose ingestor cancelled"); 113 + return; 114 + } 115 + } 116 + } 103 117 tokio::select! { 104 118 res = ingestor.run() => { 105 119 // only remove our own entry because an upsert could replace us ··· 185 199 186 200 let _ = self.persisted.insert_async(url.clone()).await; 187 201 188 - self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared) 202 + self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, false) 189 203 .await?; 190 204 191 205 Ok(())
+2 -2
src/control/mod.rs
··· 409 409 ); 410 410 for source in &relay_hosts { 411 411 firehose 412 - .spawn_firehose_ingestor(source, fire_shared) 412 + .spawn_firehose_ingestor(source, fire_shared, true) 413 413 .await?; 414 414 } 415 415 } ··· 427 427 continue; 428 428 } 429 429 firehose 430 - .spawn_firehose_ingestor(source, fire_shared) 430 + .spawn_firehose_ingestor(source, fire_shared, true) 431 431 .await?; 432 432 } 433 433