lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

oops better cursor fix

phil b485b6f6 9df10eb0

+27 -17
+27 -17
src/sync/firehose/mod.rs
··· 92 92 MAX_COMMIT_WORKERS, 93 93 self.validate_births, 94 94 ); 95 - let mut bump_seq = false; 95 + // When Some, use this cursor on the next reconnect instead of loading 96 + // from storage — used to step past an un-decodable frame one position 97 + // at a time. Consumed by `take()` each iteration; set again only on a 98 + // fresh Decode error. 99 + let mut skip_cursor: Option<u64> = None; 96 100 let mut last_seq: i64 = 0; 97 101 let mut cursor_tick = Instant::now(); 98 102 ··· 101 105 return Ok(()); 102 106 } 103 107 104 - // Reload cursor on every reconnect so we pick up whatever was last 105 - // persisted, even if the previous connection died mid-event. 106 - let mut cursor = { 108 + // On a decode error we skip forward from wherever we connected 109 + // last; otherwise reload from storage so we pick up the latest 110 + // persisted watermark. 111 + let connect_cursor: Option<u64> = if let Some(skip) = skip_cursor.take() { 112 + Some(skip) 113 + } else { 107 114 let db = self.db.clone(); 108 115 let host = self.host.clone(); 109 116 tokio::task::spawn_blocking(move || storage::firehose_cursor::get(&db, &host)) 110 117 .await?? 111 118 }; 112 - if bump_seq { 113 - cursor = cursor.map(|c| c + 1); 114 - } 115 119 let params = SubscribeRepos { 116 - cursor: cursor.map(|c| c as i64), 120 + cursor: connect_cursor.map(|c| c as i64), 117 121 }; 118 122 119 123 info!( 120 124 host = %self.host, 121 - cursor = ?cursor, 125 + cursor = ?connect_cursor, 122 126 "connecting to firehose", 123 127 ); 124 128 ··· 141 145 _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {} 142 146 } 143 147 backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); 144 - bump_seq = false; 145 148 continue 'reconnect; 146 149 } 147 150 }; ··· 183 186 match event { 184 187 Event::Stream(None) => { 185 188 info!(host = %self.host, "firehose stream ended; reconnecting"); 186 - bump_seq = false; 187 189 continue 'reconnect; 188 190 } 189 191 Event::Stream(Some(Err(e))) => { 190 - let (delay, bump) = match e.kind() { 191 - StreamErrorKind::Closed => (0, false), 192 - StreamErrorKind::Decode => (0, true), 193 - _ => (backoff_secs, false), 192 + let delay = match e.kind() { 193 + StreamErrorKind::Closed => 0, 194 + StreamErrorKind::Decode => { 195 + // Step past the un-decodable frame. We don't 196 + // know its seq; use whichever high-water mark 197 + // is furthest ahead — last_seq if we processed 198 + // events in this session, connect_cursor if the 199 + // error hit before any good events arrived. 200 + let hwm = connect_cursor.unwrap_or(0).max(last_seq as u64); 201 + skip_cursor = Some(hwm + 1); 202 + 0 203 + } 204 + _ => backoff_secs, 194 205 }; 195 206 warn!( 196 207 error = %e, 197 208 host = %self.host, 198 209 delay_secs = delay, 199 - bump, 210 + skip_cursor, 200 211 "firehose stream error; will reconnect", 201 212 ); 202 213 if delay > 0 { ··· 206 217 } 207 218 backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); 208 219 } 209 - bump_seq = bump; 210 220 continue 'reconnect; 211 221 } 212 222 Event::Stream(Some(Ok(msg))) => {