very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[backfill,ingest] decrease repo counter, and get rid of repos in in-flight properly if we cant acquire a permit

dawn 9d643386 384086be

+36 -5
+9 -2
src/backfill/mod.rs
··· 118 118 119 119 let permit = match self.semaphore.clone().try_acquire_owned() { 120 120 Ok(p) => p, 121 - Err(_) => break, 121 + Err(_) => { 122 + // remove before breaking so the DID isn't permanently stuck in 123 + // in_flight with no task to clean it up 124 + self.in_flight.remove_sync(&did); 125 + break; 126 + } 122 127 }; 123 128 124 129 let guard = InFlightGuard { ··· 445 450 error!(err = %e, "failed to wipe repo during backfill"); 446 451 } 447 452 batch.commit().into_diagnostic()?; 448 - return Ok(Some(previous_state)); // stop backfill 453 + // return None so did_task handles the repos/pending count decrements 454 + // and skips sending BackfillFinished (nothing to drain for a deleted repo) 455 + return Ok(None); 449 456 } 450 457 451 458 let inactive_status = match e {
+24 -2
src/db/mod.rs
··· 593 593 if delta >= 0 { 594 594 *entry = entry.saturating_add(delta as u64); 595 595 } else { 596 - *entry = entry.saturating_sub(delta.unsigned_abs()); 596 + let decrement = delta.unsigned_abs(); 597 + if *entry < decrement { 598 + error!( 599 + key, 600 + current = *entry, 601 + decrement, 602 + "count underflow !!! this is a bug" 603 + ); 604 + *entry = 0; 605 + } else { 606 + *entry -= decrement; 607 + } 597 608 } 598 609 } 599 610 ··· 606 617 if delta >= 0 { 607 618 *entry = entry.saturating_add(delta as u64); 608 619 } else { 609 - *entry = entry.saturating_sub(delta.unsigned_abs()); 620 + let decrement = delta.unsigned_abs(); 621 + if *entry < decrement { 622 + error!( 623 + key, 624 + current = *entry, 625 + decrement, 626 + "count underflow !!! this is a bug" 627 + ); 628 + *entry = 0; 629 + } else { 630 + *entry -= decrement; 631 + } 610 632 } 611 633 } 612 634
+3 -1
src/ingest/worker.rs
··· 321 321 match Self::process_message(&mut ctx, &msg, did, repo_state, pre_status) 322 322 { 323 323 Ok(RepoProcessResult::Ok(_)) => {} 324 - Ok(RepoProcessResult::Deleted) => {} 324 + Ok(RepoProcessResult::Deleted) => { 325 + state.db.update_count("repos", -1); 326 + } 325 327 Ok(RepoProcessResult::NeedsBackfill(Some(commit))) => { 326 328 if let Err(e) = 327 329 ops::persist_to_resync_buffer(&state.db, did, commit)