very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] make throttling less strict on firehose errors to better match ref behaviour

dawn 3f22754e fd64a728

+76 -109
+76 -109
src/ingest/firehose.rs
··· 3 3 use crate::ingest::{BufferTx, IngestMessage}; 4 4 use crate::pds_meta::HostStatus; 5 5 use crate::state::AppState; 6 + use crate::util::WatchEnabledExt; 6 7 use crate::util::throttle::ThrottleHandle; 7 - use crate::util::{ 8 - WatchEnabledExt, is_io_error_their_fault, is_timeout, is_tls_cert_error, 9 - is_tls_error_their_fault, 10 - }; 11 8 use jacquard_common::IntoStatic; 12 9 use jacquard_common::types::did::Did; 13 10 use miette::{IntoDiagnostic, Result}; ··· 18 15 use std::sync::atomic::Ordering; 19 16 use std::time::Duration; 20 17 use tokio::sync::watch; 21 - use tokio_websockets::Error as WsError; 22 18 use tracing::{Span, debug, error, info, trace, warn}; 23 19 use url::Url; 24 20 25 - fn is_throttle_worthy(e: &WsError) -> bool { 26 - use std::error::Error; 27 - 28 - if is_timeout(e) { 29 - return true; 30 - } 31 - 32 - match e { 33 - WsError::Rustls(e) if is_tls_error_their_fault(e) => return true, 34 - WsError::Io(e) if is_io_error_their_fault(e) || is_tls_cert_error(e) => return true, 35 - WsError::CannotResolveHost => return true, 36 - // we treat every upgrade error as error because uh too bad so sad, im not doing anything wrong 37 - WsError::Protocol(_) | WsError::PayloadTooLong { .. } | WsError::Upgrade(_) => return true, 38 - _ => {} 39 - } 40 - 41 - let mut src = e.source(); 42 - while let Some(s) = src { 43 - if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 44 - if is_tls_cert_error(io_err) { 45 - return true; 46 - } 47 - } 48 - src = s.source(); 49 - } 50 - 51 - false 52 - } 21 + // these match ref relay 22 + const MAX_FAILURES: usize = 15; 23 + const MAX_BACKOFF: Duration = Duration::from_secs(60); 53 24 54 25 trait AddJitter: rand::Rng { 55 26 fn add_jitter(&mut self, timeout: Duration) -> Duration { ··· 94 65 } 95 66 } 96 67 97 - #[tracing::instrument(skip(self), fields(relay = %self.relay_host))] 68 + #[tracing::instrument(skip(self), fields(host = %self.relay_host))] 98 69 pub async fn run(mut self) -> Result<()> { 99 70 let host = self.relay_host.host_str().unwrap_or(""); 100 71 let count_key = crate::db::keys::pds_account_count_key(&host); 101 - 102 - // this is not for connection throttling (thats handled by ThrottleHandle) 103 - // its for stream errors (cbor decode etc) 104 - let mut backoff = Duration::from_secs(0); 105 - const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60); // 1 hour 106 72 107 73 let mut rng: SmallRng = rand::make_rng(); 108 74 ··· 131 97 { 132 98 Ok(s) => s, 133 99 Err(e) => { 134 - // todo: figure out how to pass timeout to tungesteite i guess 135 - // if let FirehoseError::WebSocket(err) = &e 136 - // && is_timeout(&err) 137 - // { 138 - // if !self.throttle.record_timeout() { 139 - // continue; 140 - // } 141 - // } 142 - let do_throttle = matches!(&e, FirehoseError::WebSocket(e) if is_throttle_worthy(e)) 143 - || matches!(&e, FirehoseError::EmptyFrame); 144 - let timeout = if do_throttle { 145 - self.throttle.record_failure(); 146 - if self.is_pds && self.throttle.consecutive_failures() >= 4 { 147 - if let Err(e) = self.set_host_status(HostStatus::Offline) { 148 - error!(err = %e, "failed to update host status to offline"); 149 - } 150 - } 151 - let until = self.throttle.throttled_until(); 152 - Duration::from_secs( 153 - 0.max((until - chrono::Utc::now().timestamp()) as i64) as u64 154 - ) 155 - } else { 156 - Duration::from_secs(10) 100 + let Some(secs) = self.on_failure().await else { 101 + break Ok(()); 157 102 }; 158 - let timeout = rng.add_jitter(timeout); 103 + let timeout = rng.add_jitter(Duration::from_secs(secs).min(MAX_BACKOFF)); 159 104 let fmt = humantime::format_duration(timeout); 160 105 error!(err = %e, in = %fmt, "failed to connect to firehose, retrying later"); 161 106 tokio::time::sleep(timeout).await; ··· 163 108 } 164 109 }; 165 110 166 - self.throttle.record_success(); 167 111 info!("firehose connected"); 168 112 let mut marked_active = false; 169 113 let active_sleep_secs = if cfg!(debug_assertions) { 1 } else { 60 }; ··· 202 146 self.handle_message(msg).await; 203 147 }, 204 148 Err(e) => match e { 205 - // dont disconnect on unknown op or type 149 + // don't disconnect on unknown op or type 206 150 FirehoseError::UnknownOp(op) => { 207 151 warn!(op = %op, "unknown frame op"); 208 152 continue; ··· 218 162 } 219 163 _ = &mut active_sleep, if !marked_active => { 220 164 marked_active = true; 221 - backoff = Duration::from_secs(0); 165 + // only reset failure state once the stream has been healthy for 166 + // a full window — prevents hosts that connect but immediately 167 + // send garbage from resetting their backoff on every attempt 168 + self.throttle.record_success(); 222 169 if self.is_pds { 223 170 let (current_status, tier) = { 224 171 let meta = self.state.pds_meta.load(); 225 172 (meta.status(host), meta.tier_for(host, &self.state.rate_tiers)) 226 173 }; 227 - if current_status != HostStatus::Banned { 228 - let count = self.state.db.get_count_sync(&count_key); 229 - let new_status = tier.account_limit.is_some_and(|l| count >= l) 230 - .then_some(HostStatus::Throttled).unwrap_or(HostStatus::Active); 174 + if current_status == HostStatus::Banned { 175 + break Ok(()); 176 + } 177 + let count = self.state.db.get_count_sync(&count_key); 178 + let new_status = tier.account_limit.is_some_and(|l| count >= l) 179 + .then_some(HostStatus::Throttled).unwrap_or(HostStatus::Active); 231 180 232 - if current_status != new_status { 233 - if let Err(e) = self.set_host_status(new_status) { 234 - error!(err = %e, "failed to update host status"); 235 - } 181 + if current_status != new_status { 182 + if let Err(e) = self.set_host_status(new_status) { 183 + error!(err = %e, "failed to update host status"); 236 184 } 237 185 } 238 186 } ··· 246 194 } 247 195 }; 248 196 249 - if let Err(e) = res { 250 - match &e { 251 - FirehoseError::StreamClosed { code: 1001, reason } => { 252 - debug!(reason = %reason, "host gone away"); 253 - tokio::time::sleep(Duration::from_secs(1)).await; 254 - continue; 255 - } 256 - FirehoseError::FutureCursor => { 257 - if self.is_pds 258 - && let Err(e) = self.set_host_status(HostStatus::Idle) 259 - { 260 - error!(err = %e, "failed to update host status to idle"); 261 - } 262 - debug!("outdated cursor, backing off"); 263 - tokio::time::sleep(Duration::from_secs(60)).await; 264 - continue; 265 - } 266 - FirehoseError::RelayError { error, message } => { 267 - let message = message 268 - .as_deref() 269 - .map_or(Cow::Borrowed("<no message>"), Cow::Borrowed); 270 - error!(err = %error, "relay sent error: {message}"); 197 + match res { 198 + Ok(()) => {} 199 + Err(FirehoseError::StreamClosed { code: 1001, reason }) => { 200 + debug!(reason = %reason, "host gone away"); 201 + tokio::time::sleep(Duration::from_secs(1)).await; 202 + } 203 + Err(FirehoseError::FutureCursor) => { 204 + if self.is_pds 205 + && let Err(e) = self.set_host_status(HostStatus::Idle) 206 + { 207 + error!(err = %e, "failed to update host status to idle"); 271 208 } 272 - _ if backoff.as_secs() < 60 => { 273 - // stop logging errors after a minute of retries 274 - // as to not spam logs, unlikely for error to change atp 275 - error!(err = %e, "firehose stream error"); 276 - } 277 - _ => {} 209 + debug!("outdated cursor, backing off"); 210 + tokio::time::sleep(Duration::from_secs(60)).await; 278 211 } 279 - if backoff.is_zero() { 280 - backoff = Duration::from_secs(5); 212 + Err(FirehoseError::RelayError { error, message }) => { 213 + let message = message 214 + .as_deref() 215 + .map_or(Cow::Borrowed("<no message>"), Cow::Borrowed); 216 + error!(err = %error, "relay sent error: {message}"); 217 + let Some(secs) = self.on_failure().await else { 218 + break Ok(()); 219 + }; 220 + let timeout = rng.add_jitter(Duration::from_secs(secs).min(MAX_BACKOFF)); 221 + let fmt = humantime::format_duration(timeout); 222 + error!(in = %fmt, "firehose disconnected, reconnecting later"); 223 + tokio::time::sleep(timeout).await; 281 224 } 282 - let timeout = rng.add_jitter(backoff); 283 - let fmt = humantime::format_duration(timeout); 284 - error!(in = %fmt, "firehose disconnected, reconnecting later"); 285 - tokio::time::sleep(timeout).await; 286 - backoff = (backoff * 2).min(MAX_BACKOFF); 225 + Err(e) => { 226 + let Some(secs) = self.on_failure().await else { 227 + break Ok(()); 228 + }; 229 + let timeout = rng.add_jitter(Duration::from_secs(secs).min(MAX_BACKOFF)); 230 + let fmt = humantime::format_duration(timeout); 231 + error!(err = %e, in = %fmt, "firehose stream error, reconnecting later"); 232 + tokio::time::sleep(timeout).await; 233 + } 287 234 } 288 235 } 236 + } 237 + 238 + /// record a failure and return the backoff duration in seconds, 239 + /// or `None` if the failure threshold was reached and the subscriber should stop 240 + async fn on_failure(&self) -> Option<u64> { 241 + let secs = self.throttle.record_failure().unwrap_or_else(|| { 242 + let until = self.throttle.throttled_until(); 243 + 0.max(until - chrono::Utc::now().timestamp()) as u64 244 + }); 245 + let failures = self.throttle.consecutive_failures(); 246 + if failures >= MAX_FAILURES { 247 + warn!(failures, "too many consecutive failures, giving up on host"); 248 + if self.is_pds { 249 + if let Err(e) = self.set_host_status(HostStatus::Offline) { 250 + error!(err = %e, "failed to update host status to offline"); 251 + } 252 + } 253 + return None; 254 + } 255 + Some(secs) 289 256 } 290 257 291 258 fn set_host_status(&self, status: HostStatus) -> Result<()> {