···150150 // the poll -> stream task: poll until we're caught up, then switch to stream.
151151 // on stream disconnect, fall back to polling to resync.
152152 let send_page_bg = send_page.clone();
153153+ let db_for_poll = db.clone();
153154 tasks.spawn(async move {
154155 let mut current_seq = latest_seq;
155156 loop {
···213214 Ok(Ok(())) => tracing::info!("stream closed cleanly, resyncing via poll"),
214215 Ok(Err(e)) => tracing::warn!("stream error: {e}, resyncing via poll"),
215216 Err(e) => tracing::warn!("stream task join error: {e}"),
217217+ }
218218+219219+ // rest current_seq to what's actually in the DB. current_seq tracks
220220+ // pages forwarded to seq_pages_to_fjall, which may be ahead of what
221221+ // was actually stored (ops can be dropped by VERIFY=true). polling
222222+ // from the in-memory current_seq would permanently skip those ops.
223223+ let db = db_for_poll.clone();
224224+ match tokio::task::spawn_blocking(move || db.get_latest()).await {
225225+ Ok(Ok(Some((seq, _)))) => {
226226+ if seq < current_seq {
227227+ tracing::info!(
228228+ "resetting poll cursor from {current_seq} to db latest {seq} to avoid skipping dropped ops"
229229+ );
230230+ current_seq = seq;
231231+ }
232232+ }
233233+ Ok(Ok(None)) => {}
234234+ Ok(Err(e)) => tracing::warn!("failed to read db latest for poll reset: {e}"),
235235+ Err(e) => tracing::warn!("spawn_blocking failed for poll reset: {e}"),
216236 }
217237 }
218238 });