lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

dead firehose timeout

phil 30ba6189 a8c24569

+19 -2
+2 -2
hacking.md
··· 95 95 - [x] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. 96 96 - [?] boooo we might need more handling for pre-sync1.1 repos if they don't include adjacent keys 97 97 - [x] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace 98 - - [ ] firehose websocket 98 + - [x] firehose websocket 99 99 - [-] ~~ping/pong (unless jacquard is already doing it):~~ seems like no but we can skip it 100 - - [ ] no-events-received timeout reconnect 100 + - [x] no-events-received timeout reconnect 101 101 - [ ] resync short-circuit: tiny repos may actually return their entire CAR for getRecord 102 102 - [ ] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys 103 103 - [ ] account status convergeance: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale?
+17
src/sync/firehose/mod.rs
··· 45 45 46 46 /// Maximum reconnect delay. 47 47 const MAX_BACKOFF_SECS: u64 = 64; 48 + /// Reconnect if no message is received for this long. 49 + const IDLE_TIMEOUT: Duration = Duration::from_secs(180); 48 50 49 51 /// Manages a single logical connection to a relay firehose, with reconnection. 50 52 pub struct Subscriber { ··· 138 140 139 141 // `_sink` kept alive so the write half of the WebSocket isn't dropped. 140 142 let (_sink, mut messages) = stream.into_stream(); 143 + let mut last_message_at = Instant::now(); 141 144 142 145 loop { 143 146 // Unified select! driving both the WebSocket stream and worker ··· 159 162 >, 160 163 >, 161 164 ), 165 + Timeout, 162 166 } 163 167 164 168 let event = tokio::select! { ··· 166 170 r = messages.next() => Event::Stream(r), 167 171 r = dispatcher.workers_mut().join_next_with_id(), 168 172 if dispatcher.has_workers() => Event::Worker(r), 173 + _ = tokio::time::sleep( 174 + IDLE_TIMEOUT.saturating_sub(last_message_at.elapsed()) 175 + ) => Event::Timeout, 169 176 }; 170 177 171 178 match event { ··· 174 181 continue 'reconnect; 175 182 } 176 183 Event::Stream(Some(Err(e))) => { 184 + last_message_at = Instant::now(); 177 185 match e.kind() { 178 186 // Application-level decode failure on a healthy 179 187 // WebSocket connection: jacquard's filter_map stream ··· 213 221 } 214 222 } 215 223 Event::Stream(Some(Ok(msg))) => { 224 + last_message_at = Instant::now(); 216 225 backoff_secs = 1; 217 226 if let Some(seq) = self.dispatch(msg, &mut dispatcher) { 218 227 last_seq = seq; ··· 225 234 dispatcher.handle_completion(join_err.id(), Err(join_err)); 226 235 } 227 236 Event::Worker(None) => {} // JoinSet drained (shouldn't happen with guard) 237 + Event::Timeout => { 238 + warn!( 239 + host = %self.host, 240 + timeout_secs = IDLE_TIMEOUT.as_secs(), 241 + "firehose idle timeout; reconnecting", 242 + ); 243 + continue 'reconnect; 244 + } 228 245 } 229 246 230 247 if cursor_tick.elapsed() >= self.cursor_save_interval {