lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

firehose: don't disconnect on app-level decode err

phil f98d4a24 04b78493

+42 -39
+42 -39
src/sync/firehose/mod.rs
···
16 16 //!
17 17 //! Reconnection is handled here with exponential backoff (1 s … 64 s). A
18 18 //! graceful server close (`StreamErrorKind::Closed`) skips the backoff and
19 - //! reconnects immediately. The backoff counter resets to 1 s after any
20 - //! successfully processed event.
19 + //! reconnects immediately. Application-level decode errors (`Decode`,
20 + //! `WrongMessageFormat`) leave the WebSocket connection alive; those are
21 + //! logged and counted but do not trigger a reconnect. The backoff counter
22 + //! resets to 1 s after any successfully processed event.
21 23
22 24 mod account_event;
23 25 pub(crate) mod commit_event;
···
85 87 // and the watermark doesn't regress.
86 88 let mut dispatcher =
87 89 CommitDispatcher::new(self.resolver.clone(), self.db.clone(), self.max_workers);
88 - // When Some, use this cursor on the next reconnect instead of loading
89 - // from storage — used to step past an un-decodable frame one position
90 - // at a time. Consumed by `take()` each iteration; set again only on a
91 - // fresh Decode error.
92 - let mut skip_cursor: Option<u64> = None;
93 90 let mut last_seq: i64 = 0;
94 91 let mut cursor_tick = Instant::now();
95 92
···
98 95 return Ok(());
99 96 }
100 97
101 - // On a decode error we skip forward from wherever we connected
102 - // last; otherwise reload from storage so we pick up the latest
103 - // persisted watermark.
104 - let connect_cursor: Option<u64> = if let Some(skip) = skip_cursor.take() {
105 - Some(skip)
106 - } else {
98 + let connect_cursor: Option<u64> = {
107 99 let db = self.db.clone();
108 100 let host = self.host.clone();
109 101 tokio::task::spawn_blocking(move || storage::firehose_cursor::get(&db, &host))
···
182 174 continue 'reconnect;
183 175 }
184 176 Event::Stream(Some(Err(e))) => {
185 - let delay = match e.kind() {
186 - StreamErrorKind::Closed => 0,
187 - StreamErrorKind::Decode => {
188 - // Step past the un-decodable frame. We don't
189 - // know its seq; use whichever high-water mark
190 - // is furthest ahead — last_seq if we processed
191 - // events in this session, connect_cursor if the
192 - // error hit before any good events arrived.
193 - let hwm = connect_cursor.unwrap_or(0).max(last_seq as u64);
194 - skip_cursor = Some(hwm + 1);
195 - 0
177 + match e.kind() {
178 + // Application-level decode failure on a healthy
179 + // WebSocket connection: jacquard's filter_map stream
180 + // continues yielding after this error, so just log,
181 + // count, and continue reading.
182 + StreamErrorKind::Decode | StreamErrorKind::WrongMessageFormat => {
183 + metrics::counter!("lightrail_firehose_decode_errors_total")
184 + .increment(1);
185 + warn!(
186 + error = %e,
187 + host = %self.host,
188 + "firehose message decode error; skipping frame",
189 + );
190 + // No reconnect — the WebSocket connection is
191 + // still alive.
192 + }
193 + // Graceful server close: reconnect immediately.
194 + StreamErrorKind::Closed => {
195 + info!(host = %self.host, "firehose stream closed; reconnecting");
196 + continue 'reconnect;
196 197 }
197 - _ => backoff_secs,
198 - };
199 - warn!(
200 - error = %e,
201 - host = %self.host,
202 - delay_secs = delay,
203 - skip_cursor,
204 - "firehose stream error; will reconnect",
205 - );
206 - if delay > 0 {
207 - tokio::select! {
208 - _ = token.cancelled() => return Ok(()),
209 - _ = tokio::time::sleep(Duration::from_secs(delay)) => {}
198 + // Transport / protocol error: the connection is
199 + // broken; reconnect with backoff.
200 + _ => {
201 + warn!(
202 + error = %e,
203 + host = %self.host,
204 + backoff_secs,
205 + "firehose stream error; will reconnect",
206 + );
207 + tokio::select! {
208 + _ = token.cancelled() => return Ok(()),
209 + _ = tokio::time::sleep(
210 + Duration::from_secs(backoff_secs)
211 + ) => {}
212 + }
213 + backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
214 + continue 'reconnect;
210 215 }
211 - backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
212 216 }
213 - continue 'reconnect;
214 217 }
215 218 Event::Stream(Some(Ok(msg))) => {
216 219 backoff_secs = 1;