very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
use crate::db::types::{DbAction, DbRkey, TrimmedDid};
use crate::db::{self, CountDeltas, Db, keys, ser_repo_state};
use crate::filter::FilterMode;
use crate::ops;
use crate::resolver::ResolverError;
use crate::state::AppState;
use crate::types::{Commit, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState};

use fjall::Slice;
use jacquard_api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
use jacquard_common::IntoStatic;
use jacquard_common::error::{ClientError, ClientErrorKind};
use jacquard_common::types::cid::Cid;
use jacquard_common::types::did::Did;
use jacquard_common::xrpc::{XrpcError, XrpcExt};
use jacquard_repo::mst::Mst;
use jacquard_repo::{BlockStore, MemoryBlockStore};
use miette::{Diagnostic, IntoDiagnostic, Result};
use reqwest::StatusCode;
use smol_str::{SmolStr, ToSmolStr};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{Instrument, debug, error, info, trace, warn};
#[cfg(feature = "indexer_stream")]
use {
    crate::types::{AccountEvt, BroadcastEvent, StoredData, StoredEvent},
    jacquard_common::CowStr,
    std::sync::atomic::Ordering,
};

pub mod manager;

use crate::ingest::indexer::{IndexerMessage, IndexerTx};
use crate::util::{WatchEnabledExt, url_to_fluent_uri};

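/// Drains the `pending` partition and backfills each repo in its own task,
/// bounded by `semaphore`. DIDs currently being processed are tracked in
/// `in_flight` so the same repo is never backfilled twice concurrently.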
pub struct BackfillWorker {
    state: Arc<AppState>,
    buffer_tx: IndexerTx,
    http: reqwest::Client,
    semaphore: Arc<Semaphore>,
    verify_signatures: bool,
    in_flight: Arc<scc::HashSet<Did<'static>>>,
    enabled: tokio::sync::watch::Receiver<bool>,
}

impl BackfillWorker {
    pub fn new(
        state: Arc<AppState>,
        buffer_tx: IndexerTx,
        timeout: Duration,
        concurrency_limit: usize,
        verify_signatures: bool,
        enabled: tokio::sync::watch::Receiver<bool>,
    ) -> Self {
        Self {
            state,
            buffer_tx,
            http: reqwest::Client::builder()
                .timeout(timeout)
                .zstd(true)
                .brotli(true)
                .gzip(true)
                .build()
                .expect("failed to build http client"),
            semaphore: Arc::new(Semaphore::new(concurrency_limit)),
            verify_signatures,
            in_flight: Arc::new(scc::HashSet::new()),
            enabled,
        }
    }
}

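/// RAII guard that removes a DID from the worker's in-flight set when its
/// backfill task finishes (or panics), allowing the DID to be scheduled again.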
struct InFlightGuard {
    did: Did<'static>,
    set: Arc<scc::HashSet<Did<'static>>>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let _ = self.set.remove_sync(&self.did);
    }
}

impl BackfillWorker {
    pub async fn run(mut self) {
        info!("backfill worker started");

        loop {
            self.enabled.wait_enabled("backfill").await;
            let mut spawned = 0;

            for guard in self.state.db.pending.iter() {
                let (key, value) = match guard.into_inner() {
                    Ok(kv) => kv,
                    Err(e) => {
                        error!(err = %e, "failed to read pending entry");
                        db::check_poisoned(&e);
                        continue;
                    }
                };

                let did = match TrimmedDid::try_from(value.as_ref()) {
                    Ok(d) => d.to_did(),
                    Err(e) => {
                        error!(err = %e, "invalid did in pending value");
                        continue;
                    }
                };

                // check before trying to acquire a permit so we don't grab one
                // for no reason; the read is cheap anyway
                if self.in_flight.contains_sync(&did) {
                    continue;
                }

                let permit = match self.semaphore.clone().try_acquire_owned() {
                    Ok(p) => p,
                    Err(_) => break,
                };

                // only mark as in flight if we can acquire a permit
                if self
                    .in_flight
                    .insert_sync(did.clone().into_static())
                    .is_err()
                {
                    // a task for this did is already running,
                    // so we don't need this one anymore
                    break;
                }

                let guard = InFlightGuard {
                    did: did.clone().into_static(),
                    set: self.in_flight.clone(),
                };

                let state = self.state.clone();
                let http = self.http.clone();
                let did = did.clone();
                let buffer_tx = self.buffer_tx.clone();
                let verify = self.verify_signatures;

                let span = tracing::info_span!("backfill", did = %did);
                tokio::spawn(
                    async move {
                        let _guard = guard;
                        let res =
                            did_task(&state, http, buffer_tx, &did, key, permit, verify).await;

                        if let Err(e) = res {
                            error!(err = %e, "process failed");
                            if let BackfillError::Generic(report) = &e {
                                db::check_poisoned_report(report);
                            }
                        }

                        // wake worker to pick up more (in case we were sleeping at limit)
                        state.backfill_notify.notify_one();
                    }
                    .instrument(span),
                );

                spawned += 1;
            }

            if spawned == 0 {
                // wait for new tasks
                self.state.backfill_notify.notified().await;
            }
            // loop immediately since we might have more tasks
        }
    }
}

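/// Runs a full backfill for one pending DID. On success the pending/resync
/// entries are removed and the indexer is notified via `BackfillFinished`;
/// on failure the error is recorded in the resync partition with an
/// incremented retry count and a backoff deadline.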
async fn did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: IndexerTx,
    did: &Did<'static>,
    pending_key: Slice,
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures).await {
        Ok(Some(_repo_state)) => {
            let did_key = keys::repo_key(&did);

            // determine old gauge state
            // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
            let mut batch = db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            // unconditionally remove from pending
            batch.remove(&db.pending, pending_key);
            // remove from resync, just in case
            batch.remove(&db.resync, &did_key);
            count_deltas.add_gauge_diff(&GaugeState::Pending, &GaugeState::Synced);
            let reservation = db.stage_count_deltas(&mut batch, &count_deltas);

            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;
            db.apply_count_deltas(&count_deltas);
            drop(reservation);

            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            if let Err(e) = buffer_tx
                .send(IndexerMessage::BackfillFinished(did.clone()))
                .await
            {
                error!(err = %e, "failed to send BackfillFinished");
            }
            Ok(())
        }
        Ok(None) => Ok(()),
        Err(BackfillError::Deleted) => {
            warn!("orphaned pending entry, cleaning up");
            let mut batch = db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            batch.remove(&db.pending, pending_key);
            count_deltas.add("pending", -1);
            let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;
            db.apply_count_deltas(&count_deltas);
            drop(reservation);
            Ok(())
        }
        Err(e) => {
            match &e {
                BackfillError::Ratelimited => {
                    debug!("too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!(%reason, "transport error");
                }
                BackfillError::Generic(e) => {
                    error!(err = %e, "failed");
                }
                BackfillError::Deleted => unreachable!("already handled"),
            }

            let error_kind = match &e {
                BackfillError::Ratelimited => ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => ResyncErrorKind::Transport,
                BackfillError::Generic(_) => ResyncErrorKind::Generic,
                BackfillError::Deleted => unreachable!("already handled"),
            };

            let did_key = keys::repo_key(&did);

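            // record the failure in resync state so this did can be retried later with backoff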
            // 1. get current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // 2. calculate the new retry count and backoff
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };
            let old_gauge = prev_kind
                .map(|k| GaugeState::Resync(Some(k)))
                .unwrap_or(GaugeState::Pending);
            let new_gauge = GaugeState::Resync(Some(error_kind.clone()));
            let mut count_deltas = CountDeltas::default();
            count_deltas.add_gauge_diff(&old_gauge, &new_gauge);

            let error_string = e.to_string();

            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                let count_deltas = count_deltas.clone();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.active = true;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
                    batch.commit().into_diagnostic().inspect(|_| {
                        state.db.apply_count_deltas(&count_deltas);
                        drop(reservation);
                    })
                }
            })
            .await
            .into_diagnostic()??;

            Err(e)
        }
    }
}

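/// Failure modes of a single backfill attempt. `Ratelimited`, `Transport` and
/// `Generic` are recorded for retry with backoff; `Deleted` means the repo
/// vanished underneath us and the pending entry is simply cleaned up.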
#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    #[error("{0}")]
    Generic(miette::Report),
    #[error("too many requests")]
    Ratelimited,
    #[error("transport error: {0}")]
    Transport(SmolStr),
    #[error("repo was concurrently deleted")]
    Deleted,
}

impl From<ClientError> for BackfillError {
    fn from(e: ClientError) -> Self {
        match e.kind() {
            ClientErrorKind::Http {
                status: StatusCode::TOO_MANY_REQUESTS,
            } => Self::Ratelimited,
            ClientErrorKind::Transport => Self::Transport(
                e.source_err()
                    .expect("transport error without source")
                    .to_smolstr(),
            ),
            _ => Self::Generic(e.into()),
        }
    }
}

impl From<miette::Report> for BackfillError {
    fn from(e: miette::Report) -> Self {
        Self::Generic(e)
    }
}

impl From<ResolverError> for BackfillError {
    fn from(e: ResolverError) -> Self {
        match e {
            ResolverError::Ratelimited => Self::Ratelimited,
            ResolverError::Transport(s) => Self::Transport(s),
            ResolverError::Generic(e) => Self::Generic(e),
        }
    }
}

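/// Fetches the repo CAR from the DID's PDS, optionally verifies the root commit
/// signature, walks the MST, and diffs the leaves against existing records to
/// produce creates/updates/deletes in one batch. Returns `Ok(None)` when the
/// repo was dropped instead (not found upstream, or no signal collection matched).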
async fn process_did<'i>(
    app_state: &Arc<AppState>,
    http: &reqwest::Client,
    did: &Did<'static>,
    verify_signatures: bool,
) -> Result<Option<RepoState<'static>>, BackfillError> {
    debug!("starting...");

    // always invalidate doc before backfilling
    app_state.resolver.invalidate(did).await;

    let db = &app_state.db;
    let did_key = keys::repo_key(did);
    let Some(state_bytes) = Db::get(db.repos.clone(), did_key).await? else {
        return Err(BackfillError::Deleted);
    };
    let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
        .into_diagnostic()?
        .into_static();
    let previous_state = state.clone();

    // 1. resolve pds
    let start = Instant::now();
    let doc = app_state.resolver.resolve_doc(did).await?;
    let pds = doc.pds.clone();
    trace!(
        pds = %doc.pds,
        handle = %doc.handle.as_deref().unwrap_or("handle.invalid"),
        elapsed = %start.elapsed().as_secs_f32(),
        "resolved to pds"
    );
    state.update_from_doc(doc);

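    // helper to broadcast an account status change on the event stream
    // (only compiled with the indexer_stream feature)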
    #[cfg(feature = "indexer_stream")]
    let emit_identity = |status: &RepoStatus, active: bool| {
        let status = match status {
            RepoStatus::Deactivated => "deactivated",
            RepoStatus::Takendown => "takendown",
            RepoStatus::Suspended => "suspended",
            RepoStatus::Deleted => "deleted",
            RepoStatus::Desynchronized => "desynchronized",
            RepoStatus::Throttled => "throttled",
            _ => "",
        };
        let evt = AccountEvt {
            did: did.clone(),
            active,
            status: status
                .is_empty()
                .then_some(None)
                .unwrap_or_else(|| Some(status.into())),
        };
        let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
    };

    // 2. fetch repo (car)
    let start = Instant::now();
    let req = GetRepo::new().did(did.clone()).build();
    let resp = http.xrpc(url_to_fluent_uri(&pds)).send(&req).await?;

    let car_bytes = match resp.into_output() {
        Ok(o) => o,
        Err(XrpcError::Xrpc(e)) => {
            if matches!(e, GetRepoError::RepoNotFound(_)) {
                warn!("repo not found, deleting");
                let mut batch = db.inner.batch();
                if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
                    error!(err = %e, "failed to wipe repo during backfill");
                }
                batch.commit().into_diagnostic()?;
                // return None so did_task handles the repos/pending count decrements
                // and skips sending BackfillFinished (nothing to drain for a deleted repo)
                return Ok(None);
            }

            let inactive_status = match e {
                GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
                GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
                GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
                _ => None,
            };

            if let Some(status) = inactive_status {
                warn!(?status, "repo is inactive, stopping backfill");

                #[cfg(feature = "indexer_stream")]
                emit_identity(&status, false);

                let resync_state = ResyncState::Gone {
                    status: status.clone(),
                };
                let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                let app_state_clone = app_state.clone();
                app_state
                    .db
                    .update_repo_state_async(did, move |state, (key, batch)| {
                        state.active = false;
                        state.status = status;
                        batch.insert(&app_state_clone.db.resync, key, resync_bytes);
                        Ok((true, ()))
                    })
                    .await?;

                // return success so wrapper stops retrying
                return Ok(Some(previous_state));
            }

            Err(e).into_diagnostic()?
        }
        Err(e) => Err(e).into_diagnostic()?,
    };

    // emit identity event so any consumers know, but only if something changed
    #[cfg(feature = "indexer_stream")]
    if state.active != previous_state.active
        || state.status != previous_state.status
        || previous_state.pds.is_none()
    {
        emit_identity(&state.status, state.active);
    }

    trace!(
        bytes = car_bytes.body.len(),
        elapsed = ?start.elapsed(),
        "fetched car bytes"
    );

    // 3. import repo
    let start = Instant::now();
    let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
        .await
        .into_diagnostic()?;
    trace!(elapsed = %start.elapsed().as_secs_f32(), "parsed car");

    let start = Instant::now();
    let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    trace!(
        blocks = store.len(),
        elapsed = ?start.elapsed(),
        "stored blocks in memory"
    );

    // 4. parse root commit to get mst root
    let root_bytes = store
        .get(&parsed.root)
        .await
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
    debug!(
        rev = %root_commit.rev,
        cid = %root_commit.data,
        "repo at revision"
    );

    // 4.5. verify commit signature
    if verify_signatures {
        let pubkey = app_state.resolver.resolve_signing_key(did).await?;
        root_commit
            .verify(&pubkey)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified");
    }

    let root_commit = Commit::from(root_commit);

    // 5. walk mst
    let start = Instant::now();
    let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
    let leaves = mst.leaves().await.into_diagnostic()?;
    trace!(elapsed = %start.elapsed().as_secs_f32(), "walked mst");

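    // the whole diff + batch write runs on a blocking thread since it does
    // synchronous fjall I/O over potentially many records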
    // 6. insert records into db
    let start = Instant::now();
    let result = {
        let app_state = app_state.clone();
        let did = did.clone();
        #[cfg(feature = "indexer_stream")]
        let rev = root_commit.rev;

        tokio::task::spawn_blocking(move || {
            let filter = app_state.filter.load();
            let ephemeral = app_state.ephemeral;
            let only_index_links = app_state.only_index_links;
            let mut count = 0;
            let mut delta = 0;
            let mut added_blocks = 0;
            let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
            let mut batch = app_state.db.inner.batch();
            let store = mst.storage();

            let prefix = keys::record_prefix_did(&did);
            let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();

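            // snapshot the CIDs already stored for this did so the MST leaves can be
            // diffed into creates/updates, and whatever is left over deleted below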
            if !ephemeral {
                for guard in app_state.db.records.prefix(&prefix) {
                    let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
                    // key is did|collection|rkey
                    // skip did|
                    let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
                    let collection_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
                    let rkey_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;

                    let collection = std::str::from_utf8(collection_raw)
                        .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;

                    let rkey = keys::parse_rkey(rkey_raw)
                        .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;

                    let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                        .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
                        .to_smolstr();

                    existing_cids.insert((collection.into(), rkey), cid);
                }
            }

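            // in signal mode the repo is only kept if at least one record matches a
            // signal collection; a full-mode filter (or no signals) keeps everything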
            let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();

            for (key, cid) in leaves {
                let val_bytes = tokio::runtime::Handle::current()
                    .block_on(store.get(&cid))
                    .into_diagnostic()?;

                if let Some(val) = val_bytes {
                    let (collection, rkey) = ops::parse_path(&key)?;

                    if !filter.matches_collection(collection) {
                        continue;
                    }

                    if !signal_seen && filter.matches_signal(collection) {
                        debug!(collection = %collection, "signal matched");
                        signal_seen = true;
                    }

                    let rkey = DbRkey::new(rkey);
                    let path = (collection.to_smolstr(), rkey.clone());
                    let cid_obj = Cid::ipld(cid);

                    *collection_counts.entry(path.0.clone()).or_default() += 1;

                    // check if this record already exists with same CID
                    let existing_cid = existing_cids.remove(&path);
                    let action = if let Some(existing_cid) = &existing_cid {
                        if existing_cid == cid_obj.as_str() {
                            trace!(collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record");
                            continue; // skip unchanged record
                        }
                        DbAction::Update
                    } else {
                        DbAction::Create
                    };
                    trace!(collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record");

                    // key is did|collection|rkey
                    let db_key = keys::record_key(&did, collection, &rkey);

                    let cid_raw = cid.to_bytes();
                    let block_key = Slice::from(keys::block_key(collection, &cid_raw));
                    if !ephemeral {
                        if !only_index_links {
                            batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref());
                        }
                        batch.insert(&app_state.db.records, db_key, cid_raw);
                        #[cfg(feature = "backlinks")]
                        if let Ok(value) =
                            serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(val.as_ref())
                        {
                            crate::backlinks::store::index_record(
                                &mut batch,
                                &app_state.db.backlinks,
                                did.as_str(),
                                collection,
                                &rkey.to_smolstr(),
                                &value,
                            )?;
                        }
                    }

                    added_blocks += 1;
                    if action == DbAction::Create {
                        delta += 1;
                    }

                    #[cfg(feature = "indexer_stream")]
                    {
                        let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                        let evt = StoredEvent {
                            live: false,
                            did: TrimmedDid::from(&did),
                            rev,
                            collection: CowStr::Borrowed(collection),
                            rkey,
                            action,
                            data: if ephemeral {
                                StoredData::Block(val)
                            } else if only_index_links {
                                StoredData::Nothing
                            } else {
                                StoredData::Ptr(cid_obj.to_ipld().expect("valid cid"))
                            },
                        };
                        let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                        batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
                    }

                    count += 1;
                }
            }

            // remove any remaining existing records (they weren't in the new MST)
            for ((collection, rkey), cid) in existing_cids {
                trace!(collection = %collection, rkey = %rkey, cid = %cid, "remove existing record");

                // no need to guard this with `if ephemeral` since
                // existing_cids will be empty in that case anyway
                batch.remove(
                    &app_state.db.records,
                    keys::record_key(&did, &collection, &rkey),
                );
                #[cfg(feature = "backlinks")]
                crate::backlinks::store::delete_record(
                    &mut batch,
                    &app_state.db.backlinks,
                    did.as_str(),
                    &collection,
                    &rkey.to_smolstr(),
                )?;

                #[cfg(feature = "indexer_stream")]
                {
                    let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                    let evt = StoredEvent {
                        live: false,
                        did: TrimmedDid::from(&did),
                        rev,
                        collection: CowStr::Borrowed(&collection),
                        rkey,
                        action: DbAction::Delete,
                        data: StoredData::Nothing,
                    };
                    let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                    batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
                }

                delta -= 1;
                count += 1;
            }

            if !signal_seen {
                trace!(signals = ?filter.signals, "no signal-matching records found, discarding repo");
                return Ok::<_, miette::Report>(None);
            }

            // 7. update repo data; status is updated in the worker shard
            state.root = Some(root_commit);
            state.touch();

            batch.insert(
                &app_state.db.repos,
                keys::repo_key(&did),
                ser_repo_state(&state)?,
            );

            let metadata_key = keys::repo_metadata_key(&did);
            let metadata_bytes = app_state
                .db
                .repo_metadata
                .get(&metadata_key)
                .into_diagnostic()?
                .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
            let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
            metadata.tracked = true;
            batch.insert(
                &app_state.db.repo_metadata,
                &metadata_key,
                crate::db::ser_repo_meta(&metadata)?,
            );

            // add the counts
            if !ephemeral {
                db::replace_record_counts(
                    &mut batch,
                    &app_state.db,
                    &did,
                    collection_counts
                        .iter()
                        .map(|(col, cnt)| (col.as_str(), *cnt)),
                )?;
            }

            let mut count_deltas = CountDeltas::default();
            if delta != 0 {
                count_deltas.add("records", delta);
            }
            if added_blocks > 0 {
                count_deltas.add("blocks", added_blocks);
            }
            let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
            batch.commit().into_diagnostic()?;
            app_state.db.apply_count_deltas(&count_deltas);
            drop(reservation);

            Ok::<_, miette::Report>(Some(count))
        })
        .await
        .into_diagnostic()??
    };

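    // load repo metadata again; its index_id gives the pending key used for the
    // rollback below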
    let metadata_key = keys::repo_metadata_key(did);
    let metadata_bytes = db
        .repo_metadata
        .get(&metadata_key)
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
    let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;

    let Some(count) = result else {
        // signal mode: no signal-matching records found, clean up the optimistically-added repo
        let did_key = keys::repo_key(did);
        let backfill_pending_key = keys::pending_key(metadata.index_id);
        let app_state = app_state.clone();
        tokio::task::spawn_blocking(move || {
            let mut batch = app_state.db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            batch.remove(&app_state.db.repos, &did_key);
            batch.remove(&app_state.db.repo_metadata, &metadata_key);
            batch.remove(&app_state.db.pending, backfill_pending_key);
            count_deltas.add("repos", -1);
            count_deltas.add("pending", -1);
            let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
            batch.commit().into_diagnostic().inspect(|_| {
                app_state.db.apply_count_deltas(&count_deltas);
                drop(reservation);
            })
        })
        .await
        .into_diagnostic()??;
        return Ok(None);
    };

    trace!(ops = count, elapsed = %start.elapsed().as_secs_f32(), "did ops");
    trace!(
        elapsed = %start.elapsed().as_secs_f32(),
        "committed backfill batch"
    );

    #[cfg(feature = "indexer_stream")]
    let _ = db.event_tx.send(BroadcastEvent::Persisted(
        db.next_event_id.load(Ordering::SeqCst) - 1,
    ));

    trace!("complete");
    Ok(Some(previous_state))
}