very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] only reset backoff to 0 if the stream has been healthy for over a minute

dawn e1ab350c f1265e9f

+22 -21
+22 -21
src/ingest/firehose.rs
··· 157 157 158 158 self.throttle.record_success(); 159 159 info!("firehose connected"); 160 + let connected_at = tokio::time::Instant::now(); 160 161 161 162 let res = loop { 162 163 tokio::select! { ··· 189 190 } 190 191 self.handle_message(msg).await 191 192 }, 192 - Err(e) => { 193 - match e { 194 - // dont disconnect on unknown op or type 195 - FirehoseError::UnknownOp(op) => { 196 - warn!(op = %op, "unknown frame op"); 197 - continue; 198 - }, 199 - FirehoseError::UnknownType(t) => { 200 - warn!(ty = %t, "unknown frame type"); 201 - continue; 202 - }, 203 - // everything else is a hard error 204 - e => break Err(e), 205 - } 206 - } 193 + Err(e) => match e { 194 + // dont disconnect on unknown op or type 195 + FirehoseError::UnknownOp(op) => { 196 + warn!(op = %op, "unknown frame op"); 197 + continue; 198 + }, 199 + FirehoseError::UnknownType(t) => { 200 + warn!(ty = %t, "unknown frame type"); 201 + continue; 202 + }, 203 + // everything else is a hard error 204 + e => break Err(e), 205 + }, 206 + } 207 + if connected_at.elapsed() > Duration::from_secs(60) { 208 + backoff = Duration::from_secs(0); 207 209 } 208 - backoff = Duration::from_secs(0); 209 210 } 210 211 _ = self.enabled.changed() => { 211 212 if !*self.enabled.borrow() { ··· 217 218 }; 218 219 219 220 if let Err(e) = res { 220 - // todo: investigate why this happens on test server further 221 - // also idk if this is even a good idea to do but whatever 222 - if let FirehoseError::TcpDropped = e { 223 - debug!(err = %e, "tcp connection dropped!!!"); 224 - tokio::time::sleep(rng.add_jitter(Duration::from_secs(10))).await; 221 + if let FirehoseError::StreamClosed { code, reason } = &e 222 + && *code == 1001 223 + { 224 + debug!(reason = %reason, "host gone away"); 225 + tokio::time::sleep(Duration::from_secs(1)).await; 225 226 continue; 226 227 } 227 228 if let FirehoseError::RelayError { error, message } = e {