very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] add jitter on throttles / backoffs

dawn b1bb208e 05d3493c

+28 -12
+25 -6
src/ingest/firehose.rs
··· 10 10 use jacquard_common::IntoStatic; 11 11 use jacquard_common::types::did::Did; 12 12 use miette::{IntoDiagnostic, Result}; 13 + use rand::RngExt; 14 + use rand::rngs::SmallRng; 13 15 use std::borrow::Cow; 14 16 use std::sync::Arc; 15 17 use std::sync::atomic::Ordering; ··· 48 50 false 49 51 } 50 52 53 + trait AddJitter: rand::Rng { 54 + fn add_jitter(&mut self, timeout: Duration) -> Duration { 55 + let timeout_secs = timeout.as_secs_f32(); 56 + let amt = timeout_secs * self.random_range(-0.12..0.12); 57 + Duration::from_secs_f32(timeout_secs + amt) 58 + } 59 + } 60 + impl<R: rand::Rng> AddJitter for R {} 61 + 51 62 pub struct FirehoseIngestor { 52 63 state: Arc<AppState>, 53 64 buffer_tx: BufferTx, ··· 92 103 let mut backoff = Duration::from_secs(0); 93 104 const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60); // 1 hour 94 105 106 + let mut rng: SmallRng = rand::make_rng(); 107 + 95 108 loop { 96 109 if self.state.pds_meta.load().is_banned(host) { 97 110 break Ok(()); 98 111 } 99 112 self.enabled.wait_enabled("firehose").await; 100 - 101 - // sleep stream backoff out if we have any 102 - tokio::time::sleep(backoff).await; 103 113 104 114 // get cursor 105 115 let start_cursor = self ··· 137 147 } else { 138 148 Duration::from_secs(10) 139 149 }; 150 + let timeout = rng.add_jitter(timeout); 140 151 let fmt = humantime::format_duration(timeout); 141 152 error!(err = %e, in = %fmt, "failed to connect to firehose, retrying later"); 142 153 tokio::time::sleep(timeout).await; ··· 206 217 }; 207 218 208 219 if let Err(e) = res { 220 + // todo: investigate why this happens on test server further 221 + // also idk if this is even a good idea to do but whatever 222 + if let FirehoseError::TcpDropped = e { 223 + debug!(err = %e, "tcp connection dropped!!!"); 224 + tokio::time::sleep(rng.add_jitter(Duration::from_secs(10))).await; 225 + continue; 226 + } 209 227 if let FirehoseError::RelayError { error, message } = e { 210 228 let message = message.map_or(Cow::Borrowed("<no message>"), Cow::Owned); 211 229 error!(err = %error, "relay sent error: {message}"); ··· 217 235 if backoff.is_zero() { 218 236 backoff = Duration::from_secs(5); 219 237 } 220 - let fmt = humantime::format_duration(backoff); 221 - error!(in = %fmt, "firehose disconnected, reconnecting"); 222 - tokio::time::sleep(backoff).await; 238 + let timeout = rng.add_jitter(backoff); 239 + let fmt = humantime::format_duration(timeout); 240 + error!(in = %fmt, "firehose disconnected, reconnecting later"); 241 + tokio::time::sleep(timeout).await; 223 242 backoff = (backoff * 2).min(MAX_BACKOFF); 224 243 } 225 244 }
+3 -6
src/ingest/stream.rs
··· 44 44 Cbor(String), 45 45 #[error("stream closed: {code}: {reason}")] 46 46 StreamClosed { code: u16, reason: String }, 47 + #[error("tcp layer dropped")] 48 + TcpDropped, 47 49 } 48 50 49 51 impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError { ··· 86 88 .next() 87 89 .await 88 90 .map(|m| m.map_err(Into::into)) 89 - .unwrap_or_else(|| { 90 - Err(FirehoseError::StreamClosed { 91 - code: 0, 92 - reason: "closed".to_owned(), 93 - }) 94 - })?; 91 + .unwrap_or(Err(FirehoseError::TcpDropped))?; 95 92 match res { 96 93 msg if msg.is_binary() => { 97 94 let bytes: Bytes = msg.into_payload().into();