lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: missing resync buffer drains

phil 99ef9b2e 4f9f062e

+52 -4
+19
src/storage/resync_buffer.rs
··· 59 59 let key = key(did, seq); 60 60 db.ks.insert(key, cbor)?; 61 61 db.stats.resync_buffer_count.fetch_add(1, Ordering::Relaxed); 62 + metrics::gauge!("lightrail_resync_buffer_depth").increment(1); 62 63 Ok(()) 63 64 } 64 65 ··· 86 87 Ok(events) 87 88 } 88 89 90 + /// Delete all buffered events for `did`, returning the number removed. 91 + /// 92 + /// Called when a DID is permanently abandoned (e.g. max not-found retries 93 + /// exhausted) to prevent orphaned buffer entries from inflating the count. 94 + pub fn drain_buffer(db: &DbRef, did: Did<'_>) -> StorageResult<u64> { 95 + let prefix = key_prefix(did); 96 + let mut count: u64 = 0; 97 + for guard in db.ks.prefix(&prefix) { 98 + let (key_slice, _) = guard.into_inner()?; 99 + db.ks.remove(key_slice.as_ref())?; 100 + db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed); 101 + metrics::gauge!("lightrail_resync_buffer_depth").decrement(1); 102 + count += 1; 103 + } 104 + Ok(count) 105 + } 106 + 89 107 /// Delete a single buffered event after it has been successfully applied. 90 108 /// 91 109 /// Call this after confirming the event was processed, not before — if the ··· 97 115 let key = key(did, seq); 98 116 db.ks.remove(key)?; 99 117 db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed); 118 + metrics::gauge!("lightrail_resync_buffer_depth").decrement(1); 100 119 Ok(()) 101 120 } 102 121
+1
src/sync/firehose/account_event.rs
··· 58 58 }, 59 59 ); 60 60 let n_removed = if tombstone { 61 + // TODO: we might actually want to queue tombstones and work in bounded-sized delete batches 61 62 storage::collection_index::remove_all_into(&mut batch, &db, &account.did)? 62 63 } else { 63 64 0
+32 -4
src/sync/resync/dispatcher.rs
··· 244 244 metrics::counter!("lightrail_resync_completed_total", 245 245 "outcome" => "rate_limited") 246 246 .increment(1); 247 + drain_stale_buffer(&did, &db).await; 247 248 cooling_hosts.insert(pds.clone(), Instant::now() + RATE_LIMIT_COOLDOWN); 248 249 warn!(did = %did, pds = %pds, cooldown_secs = RATE_LIMIT_COOLDOWN.as_secs(), 249 250 "PDS rate-limited; cooling down"); ··· 443 444 WorkerOutcome::Retry { error, retry_count } => { 444 445 metrics::counter!("lightrail_resync_completed_total", "outcome" => "retry") 445 446 .increment(1); 447 + drain_stale_buffer(&did, &db).await; 446 448 transition_state( 447 449 db.clone(), 448 450 did.clone(), ··· 467 469 WorkerOutcome::NotFound { retry_count } => { 468 470 metrics::counter!("lightrail_resync_completed_total", "outcome" => "not_found") 469 471 .increment(1); 472 + drain_stale_buffer(&did, &db).await; 470 473 let new_retry = retry_count.saturating_add(1); 471 474 if let Some(delay) = not_found_backoff(new_retry) { 472 475 let item = ResyncItem { ··· 540 543 // Ack to avoid the entry accumulating across future resyncs. 541 544 let did_ack = did.clone(); 542 545 let db_ack = db.clone(); 543 - if let Err(e) = tokio::task::spawn_blocking(move || { 546 + match tokio::task::spawn_blocking(move || { 544 547 crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 545 548 }) 546 549 .await 547 550 { 548 - warn!(did = %did, seq, error = %e, "failed to ack malformed buffered commit"); 551 + Ok(Ok(())) => {} 552 + Ok(Err(e)) => { 553 + warn!(did = %did, seq, error = %e, "failed to ack malformed buffered commit") 554 + } 555 + Err(e) => { 556 + warn!(did = %did, seq, error = %e, "ack task panicked for malformed buffered commit") 557 + } 549 558 } 550 559 continue; 551 560 } ··· 561 570 562 571 let did_ack = did.clone(); 563 572 let db_ack = db.clone(); 564 - if let Err(e) = tokio::task::spawn_blocking(move || { 573 + match tokio::task::spawn_blocking(move || { 565 574 crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 566 575 }) 567 576 .await 568 577 { 569 - warn!(did = %did, seq, error = %e, "failed to ack buffered commit"); 578 + Ok(Ok(())) => {} 579 + Ok(Err(e)) => warn!(did = %did, seq, error = %e, "failed to ack buffered commit"), 580 + Err(e) => warn!(did = %did, seq, error = %e, "ack task panicked for buffered commit"), 570 581 } 571 582 } 572 583 ··· 627 638 }) 628 639 .await??; 629 640 Ok(()) 641 + } 642 + 643 + /// Drain all buffered firehose events for a DID after a failed resync. 644 + /// 645 + /// The next resync attempt will establish fresh ground truth, so events 646 + /// buffered during the failed attempt are stale and should be discarded. 647 + async fn drain_stale_buffer(did: &Did<'static>, db: &DbRef) { 648 + let did_owned = did.clone(); 649 + let db = db.clone(); 650 + match tokio::task::spawn_blocking(move || storage::resync_buffer::drain_buffer(&db, did_owned)) 651 + .await 652 + { 653 + Ok(Ok(0)) => {} 654 + Ok(Ok(n)) => debug!(did = %did, drained = n, "drained stale buffer entries"), 655 + Ok(Err(e)) => warn!(did = %did, error = %e, "failed to drain stale buffer"), 656 + Err(e) => warn!(did = %did, error = %e, "drain task panicked"), 657 + } 630 658 } 631 659 632 660 fn backoff(retry_count: u16) -> Duration {