very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] throttle hosts on certain errors, improve backoff

dawn 30f53f5d 408a6236

+157 -58
+1
Cargo.lock
··· 1481 1481 "glob", 1482 1482 "hex", 1483 1483 "humantime", 1484 + "hyper", 1484 1485 "iroh-car", 1485 1486 "jacquard-api", 1486 1487 "jacquard-common",
+1
Cargo.toml
··· 59 59 multibase = "0.9.2" 60 60 sha2 = "0.10.9" 61 61 parking_lot = "0.12.5" 62 + hyper = "1.8.1" 62 63 63 64 [dev-dependencies] 64 65 tempfile = "3.26.0"
+3 -1
src/control/mod.rs
··· 726 726 pub async fn stats(&self) -> Result<StatsResponse> { 727 727 let state = self.state.clone(); 728 728 729 + // todo: update stats, only return necessary info on relay vs indexer modes 730 + // (and ephemeral indexer) 729 731 let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all( 730 732 [ 731 733 "repos", 732 734 "pending", 733 - "resync", 734 735 "records", 735 736 "blocks", 737 + "resync", 736 738 "error_ratelimited", 737 739 "error_transport", 738 740 "error_generic",
+16 -27
src/crawler/list_repos.rs
··· 3 3 use crate::state::AppState; 4 4 use crate::util::throttle::{OrFailure, ThrottleHandle, Throttler}; 5 5 use crate::util::{ 6 - ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, parse_retry_after, 6 + ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, is_tls_cert_error, 7 + parse_retry_after, 7 8 }; 8 9 use chrono::{DateTime, TimeDelta, Utc}; 9 10 use fjall::OwnedWriteBatch; ··· 28 29 use super::worker::{CrawlerBatch, CursorUpdate}; 29 30 use super::{CrawlerStats, InFlight, InFlightGuard, base_url}; 30 31 31 - pub(super) const MAX_RETRY_ATTEMPTS: u32 = 5; 32 32 const MAX_RETRY_BATCH: usize = 1000; 33 33 const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 34 + 35 + pub(super) const MAX_RETRY_ATTEMPTS: u32 = 5; 34 36 35 37 #[derive(Debug, Serialize, Deserialize)] 36 38 pub(super) struct RetryState { ··· 74 76 } 75 77 } 76 78 77 - pub(super) enum CrawlCheckResult { 78 - Signal, 79 - NoSignal, 80 - Retry(RetryState), 81 - } 82 - 83 - impl From<RetryState> for CrawlCheckResult { 84 - fn from(value: RetryState) -> Self { 85 - Self::Retry(value) 86 - } 87 - } 88 - 89 79 trait ToRetryState { 90 80 fn to_retry_state(&self) -> RetryState; 91 81 } ··· 102 92 } 103 93 } 104 94 95 + pub(super) enum CrawlCheckResult { 96 + Signal, 97 + NoSignal, 98 + Retry(RetryState), 99 + } 100 + 101 + impl From<RetryState> for CrawlCheckResult { 102 + fn from(value: RetryState) -> Self { 103 + Self::Retry(value) 104 + } 105 + } 106 + 105 107 fn is_throttle_worthy(e: &reqwest::Error) -> bool { 106 108 use std::error::Error; 107 109 ··· 129 131 | crate::util::SITE_FROZEN 130 132 ) 131 133 }) 132 - } 133 - 134 - fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 135 - let Some(inner) = io_err.get_ref() else { 136 - return false; 137 - }; 138 - if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 139 - return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 140 - } 141 - if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 142 - return is_tls_cert_error(nested_io); 143 - } 144 - false 145 134 } 146 135 147 136 /// shared describeRepo signal-checking logic used by both relay and retry producers.
+96 -26
src/ingest/firehose.rs
··· 2 2 use crate::ingest::stream::{FirehoseError, FirehoseStream, SubscribeReposMessage, decode_frame}; 3 3 use crate::ingest::{BufferTx, IngestMessage}; 4 4 use crate::state::AppState; 5 - use crate::util::WatchEnabledExt; 6 5 use crate::util::throttle::ThrottleHandle; 6 + use crate::util::{WatchEnabledExt, is_timeout, is_tls_cert_error}; 7 + use hyper::StatusCode; 7 8 use jacquard_common::IntoStatic; 8 9 use jacquard_common::types::did::Did; 9 10 use miette::{IntoDiagnostic, Result}; 11 + use std::borrow::Cow; 10 12 use std::sync::Arc; 11 13 use std::sync::atomic::Ordering; 12 14 use std::time::Duration; 13 15 use tokio::sync::watch; 16 + use tokio_tungstenite::tungstenite::{Error as WsError, error::TlsError as WsTlsError}; 14 17 use tracing::{Span, debug, error, info, trace, warn}; 15 18 use url::Url; 16 19 20 + fn is_throttle_worthy(e: &WsError) -> bool { 21 + use std::error::Error; 22 + 23 + if is_timeout(e) { 24 + return true; 25 + } 26 + 27 + if let WsError::Tls(e) = e { 28 + match e { 29 + WsTlsError::Rustls(e) => { 30 + return matches!(e.as_ref(), rustls::Error::InvalidCertificate(_)); 31 + } 32 + WsTlsError::InvalidDnsName => {} 33 + _ => {} 34 + } 35 + } else { 36 + let mut src = e.source(); 37 + while let Some(s) = src { 38 + if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 39 + if is_tls_cert_error(io_err) { 40 + return true; 41 + } 42 + } 43 + src = s.source(); 44 + } 45 + } 46 + 47 + if let WsError::Http(resp) = e { 48 + return matches!( 49 + resp.status(), 50 + StatusCode::BAD_GATEWAY 51 + | StatusCode::SERVICE_UNAVAILABLE 52 + | StatusCode::GATEWAY_TIMEOUT 53 + | crate::util::CONNECTION_TIMEOUT 54 + | crate::util::SITE_FROZEN 55 + ); 56 + } 57 + 58 + matches!( 59 + e, 60 + WsError::AttackAttempt | WsError::Io(_) | WsError::Protocol(_) 61 + ) 62 + } 63 + 17 64 pub struct FirehoseIngestor { 18 65 state: Arc<AppState>, 19 66 buffer_tx: BufferTx, ··· 54 101 let host = self.relay_host.host_str().unwrap_or("").to_string(); 55 102 let count_key = crate::db::keys::pds_account_count_key(&host); 56 103 57 - let mut backoff = Duration::from_secs(5); 58 - const MAX_BACKOFF: Duration = Duration::from_secs(60 * 15); // 15 mins 104 + // this is not for connection throttling (thats handled by ThrottleHandle) 105 + // its for stream errors (cbor decode etc) 106 + let mut backoff = Duration::from_secs(0); 107 + const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60); // 1 ohur 59 108 60 109 loop { 61 110 self.enabled.wait_enabled("firehose").await; 111 + 112 + tokio::time::sleep(backoff).await; 62 113 63 114 let start_cursor = self 64 115 .state ··· 79 130 { 80 131 Ok(s) => s, 81 132 Err(e) => { 82 - error!(err = %e, backoff_secs = backoff.as_secs(), "failed to connect to firehose, retrying"); 83 - tokio::time::sleep(backoff).await; 84 - backoff = (backoff * 2).min(MAX_BACKOFF); 133 + // todo: figure out how to pass timeout to tungesteite i guess 134 + // if let FirehoseError::WebSocket(err) = &e 135 + // && is_timeout(&err) 136 + // { 137 + // if !self.throttle.record_timeout() { 138 + // continue; 139 + // } 140 + // } 141 + let timeout = if let FirehoseError::WebSocket(e) = &e 142 + && is_throttle_worthy(e) 143 + { 144 + self.throttle.record_failure(); 145 + let until = self.throttle.throttled_until(); 146 + Duration::from_secs((until - chrono::Utc::now().timestamp()) as u64) 147 + } else { 148 + Duration::from_secs(10) 149 + }; 150 + let fmt = humantime::format_duration(timeout); 151 + error!(err = %e, in = %fmt, "failed to connect to firehose, retrying later"); 152 + tokio::time::sleep(timeout).await; 85 153 continue; 86 154 } 87 155 }; 88 156 157 + self.throttle.record_success(); 89 158 info!("firehose connected"); 90 - backoff = Duration::from_secs(5); 91 159 92 - let disconnected_by_error = loop { 160 + let res = loop { 93 161 tokio::select! { 94 162 msg = stream.next() => { 95 - let Some(bytes_res) = msg else { break true; }; 163 + let Some(bytes_res) = msg else { break Err(FirehoseError::EmptyFrame); }; 96 164 let bytes = match bytes_res { 97 165 Ok(b) => b, 98 - Err(e) => { 99 - error!(err = %e, "firehose stream error"); 100 - break true; 101 - } 166 + Err(e) => break Err(e), 102 167 }; 103 168 match decode_frame(&bytes) { 104 169 Ok(msg) => { ··· 110 175 _ = self.enabled.changed() => { 111 176 if !*self.enabled.borrow() { 112 177 info!("firehose disabled, disconnecting"); 113 - break false; 178 + break Ok(()); 114 179 } 115 180 } 116 181 } ··· 129 194 continue; 130 195 }, 131 196 // everything else is a hard error 132 - FirehoseError::RelayError { error, message } => { 133 - let message = message.unwrap_or_else(|| "<no message>".to_owned()); 134 - error!(err = %error, "relay sent error: {message}"); 135 - }, 136 - e => error!(err = %e, "firehose stream error"), 197 + e => break Err(e), 137 198 } 138 - break true; 139 199 } 140 200 } 201 + backoff = Duration::from_secs(0); 141 202 } 142 203 _ = self.enabled.changed() => { 143 204 if !*self.enabled.borrow() { 144 205 info!("firehose disabled, disconnecting"); 145 - break false; 206 + break Ok(()); 146 207 } 147 208 } 148 209 } 149 210 }; 150 211 151 - if disconnected_by_error { 152 - error!( 153 - backoff_secs = backoff.as_secs(), 154 - "firehose disconnected, reconnecting" 155 - ); 212 + if let Err(e) = res { 213 + if let FirehoseError::RelayError { error, message } = e { 214 + let message = message.map_or(Cow::Borrowed("<no message>"), Cow::Owned); 215 + error!(err = %error, "relay sent error: {message}"); 216 + } else if backoff.as_secs() < 60 { 217 + // stop logging errors after a minute of retries 218 + // as to not spam logs, unlikely for error to change atp 219 + error!(err = %e, "firehose stream error"); 220 + } 221 + if backoff.is_zero() { 222 + backoff = Duration::from_secs(5); 223 + } 224 + let fmt = humantime::format_duration(backoff); 225 + error!(in = %fmt, "firehose disconnected, reconnecting"); 156 226 tokio::time::sleep(backoff).await; 157 227 backoff = (backoff * 2).min(MAX_BACKOFF); 158 228 }
+35
src/util/mod.rs
··· 12 12 13 13 pub mod throttle; 14 14 15 + #[allow(dead_code)] 16 + /// checks if the error contains a hyper / std io timeout error 17 + pub fn is_timeout(err: &dyn std::error::Error) -> bool { 18 + let mut source = err.source(); 19 + 20 + while let Some(err) = source { 21 + if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() { 22 + if hyper_err.is_timeout() { 23 + return true; 24 + } 25 + } 26 + if let Some(io) = err.downcast_ref::<std::io::Error>() { 27 + if io.kind() == std::io::ErrorKind::TimedOut { 28 + return true; 29 + } 30 + } 31 + source = err.source(); 32 + } 33 + 34 + false 35 + } 36 + 37 + pub fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 38 + let Some(inner) = io_err.get_ref() else { 39 + return false; 40 + }; 41 + if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 42 + return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 43 + } 44 + if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 45 + return is_tls_cert_error(nested_io); 46 + } 47 + false 48 + } 49 + 15 50 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 16 51 pub enum RetryOutcome<E> { 17 52 /// ratelimited after exhausting all retries
+5 -4
src/util/throttle.rs
··· 114 114 /// called on hard failures (timeout, TLS error, bad gateway, etc). 115 115 /// returns throttle duration in minutes if this is a *new* throttle, 116 116 /// and notifies all in-flight tasks to cancel immediately. 117 - pub fn record_failure(&self) -> Option<i64> { 117 + pub fn record_failure(&self) -> Option<u64> { 118 118 if self.is_throttled() { 119 119 return None; 120 120 } ··· 126 126 + 1; 127 127 128 128 // 30 min, 60 min, 120 min, ... capped at ~512 hours 129 - let base_minutes = 30i64; 129 + let base_minutes = 30u64; 130 130 let exponent = (failures as u32).saturating_sub(1); 131 - let minutes = base_minutes * 2i64.pow(exponent.min(10)); 132 - let until = chrono::Utc::now().timestamp() + minutes * 60; 131 + let minutes = base_minutes * 2u64.pow(exponent.min(10)); 132 + let until = chrono::Utc::now().timestamp() + (minutes * 60) as i64; 133 133 134 134 self.state.throttled_until.store(until, Ordering::Release); 135 135 self.state.failure_notify.notify_waiters(); ··· 143 143 Duration::from_secs(3 * 2u64.pow(n.min(2) as u32)) 144 144 } 145 145 146 + /// returns whether the timeout attempts are exhausted 146 147 pub fn record_timeout(&self) -> bool { 147 148 let timeouts = self 148 149 .state