Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
use crate::db::types::{DbAction, DbRkey, TrimmedDid};
use crate::db::{self, CountDeltas, Db, keys, ser_repo_state};
use crate::filter::FilterMode;
use crate::ops;
use crate::resolver::ResolverError;
use crate::state::AppState;
use crate::types::{Commit, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState};

use fjall::Slice;
use jacquard_api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
use jacquard_common::IntoStatic;
use jacquard_common::error::{ClientError, ClientErrorKind};
use jacquard_common::types::cid::Cid;
use jacquard_common::types::did::Did;
use jacquard_common::xrpc::{XrpcError, XrpcExt};
use jacquard_repo::mst::Mst;
use jacquard_repo::{BlockStore, MemoryBlockStore};
use miette::{Diagnostic, IntoDiagnostic, Result};
use reqwest::StatusCode;
use smol_str::{SmolStr, ToSmolStr};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{Instrument, debug, error, info, trace, warn};
#[cfg(feature = "indexer_stream")]
use {
    crate::types::{AccountEvt, BroadcastEvent, StoredData, StoredEvent},
    jacquard_common::CowStr,
    std::sync::atomic::Ordering,
};

pub mod manager;

use crate::ingest::indexer::{IndexerMessage, IndexerTx};
use crate::util::{WatchEnabledExt, url_to_fluent_uri};

pub struct BackfillWorker {
    state: Arc<AppState>,
    buffer_tx: IndexerTx,
    http: reqwest::Client,
    semaphore: Arc<Semaphore>,
    verify_signatures: bool,
    in_flight: Arc<scc::HashSet<Did<'static>>>,
    enabled: tokio::sync::watch::Receiver<bool>,
}

impl BackfillWorker {
    pub fn new(
        state: Arc<AppState>,
        buffer_tx: IndexerTx,
        timeout: Duration,
        concurrency_limit: usize,
        verify_signatures: bool,
        enabled: tokio::sync::watch::Receiver<bool>,
    ) -> Self {
        Self {
            state,
            buffer_tx,
            http: reqwest::Client::builder()
                .timeout(timeout)
                .zstd(true)
                .brotli(true)
                .gzip(true)
                .build()
                .expect("failed to build http client"),
            semaphore: Arc::new(Semaphore::new(concurrency_limit)),
            verify_signatures,
            in_flight: Arc::new(scc::HashSet::new()),
            enabled,
        }
    }
}

struct InFlightGuard {
    did: Did<'static>,
    set: Arc<scc::HashSet<Did<'static>>>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let _ = self.set.remove_sync(&self.did);
    }
}
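
// scheduling notes for the run loop below: each pass scans the `pending`
// partition; dids already being worked on are skipped via `in_flight`, and a
// task is only spawned once it holds a semaphore permit, capping concurrent
// backfills at `concurrency_limit`. when a pass spawns nothing, the worker
// parks on `backfill_notify` until a task finishes or new work is enqueued.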
impl BackfillWorker {
    pub async fn run(mut self) {
        info!("backfill worker started");

        loop {
            self.enabled.wait_enabled("backfill").await;
            let mut spawned = 0;

            for guard in self.state.db.pending.iter() {
                let (key, value) = match guard.into_inner() {
                    Ok(kv) => kv,
                    Err(e) => {
                        error!(err = %e, "failed to read pending entry");
                        db::check_poisoned(&e);
                        continue;
                    }
                };

                let did = match TrimmedDid::try_from(value.as_ref()) {
                    Ok(d) => d.to_did(),
                    Err(e) => {
                        error!(err = %e, "invalid did in pending value");
                        continue;
                    }
                };

                // check before trying to acquire a permit so we don't acquire
                // one for no reason; the read is cheap anyhow
                if self.in_flight.contains_sync(&did) {
                    continue;
                }

                let permit = match self.semaphore.clone().try_acquire_owned() {
                    Ok(p) => p,
                    Err(_) => break,
                };

                // only mark as in flight if we can acquire a permit
                if self
                    .in_flight
                    .insert_sync(did.clone().into_static())
                    .is_err()
                {
                    // a task is already running, so we don't need this one anymore
                    break;
                }

                let guard = InFlightGuard {
                    did: did.clone().into_static(),
                    set: self.in_flight.clone(),
                };

                let state = self.state.clone();
                let http = self.http.clone();
                let did = did.clone();
                let buffer_tx = self.buffer_tx.clone();
                let verify = self.verify_signatures;

                let span = tracing::info_span!("backfill", did = %did);
                tokio::spawn(
                    async move {
                        let _guard = guard;
                        let res =
                            did_task(&state, http, buffer_tx, &did, key, permit, verify).await;

                        if let Err(e) = res {
                            error!(err = %e, "process failed");
                            if let BackfillError::Generic(report) = &e {
                                db::check_poisoned_report(report);
                            }
                        }

                        // wake worker to pick up more (in case we were sleeping at limit)
                        state.backfill_notify.notify_one();
                    }
                    .instrument(span),
                );

                spawned += 1;
            }

            if spawned == 0 {
                // wait for new tasks
                self.state.backfill_notify.notified().await;
            }
            // loop immediately since we might have more tasks
        }
    }
}

async fn did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: IndexerTx,
    did: &Did<'static>,
    pending_key: Slice,
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures).await {
        Ok(Some(_repo_state)) => {
            let did_key = keys::repo_key(&did);

            // determine the old gauge state
            // if it was error/suspended etc., we need to know which error kind it was to decrement correctly.
            let mut batch = db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            // unconditionally remove from pending
            batch.remove(&db.pending, pending_key);
            // remove from resync, just in case
            batch.remove(&db.resync, &did_key);
            count_deltas.add_gauge_diff(&GaugeState::Pending, &GaugeState::Synced);
            let reservation = db.stage_count_deltas(&mut batch, &count_deltas);

            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;
            db.apply_count_deltas(&count_deltas);
            drop(reservation);

            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            if let Err(e) = buffer_tx
                .send(IndexerMessage::BackfillFinished(did.clone()))
                .await
            {
                error!(err = %e, "failed to send BackfillFinished");
            }
            Ok(())
        }
        Ok(None) => Ok(()),
        Err(BackfillError::Deleted) => {
            warn!("orphaned pending entry, cleaning up");
            let mut batch = db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            batch.remove(&db.pending, pending_key);
            count_deltas.add("pending", -1);
            let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;
            db.apply_count_deltas(&count_deltas);
            drop(reservation);
            Ok(())
        }
        Err(e) => {
            match &e {
                BackfillError::Ratelimited => {
                    debug!("too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!(%reason, "transport error");
                }
                BackfillError::Generic(e) => {
                    error!(err = %e, "failed");
                }
                BackfillError::Deleted => unreachable!("already handled"),
            }
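
            // everything below records the failure in the `resync` partition
            // with a bumped retry count and a backoff deadline so a later pass
            // can retry, and moves the gauge from pending (or the previous
            // error kind) to the new error kind in the same batch.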
            let error_kind = match &e {
                BackfillError::Ratelimited => ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => ResyncErrorKind::Transport,
                BackfillError::Generic(_) => ResyncErrorKind::Generic,
                BackfillError::Deleted => unreachable!("already handled"),
            };

            let did_key = keys::repo_key(&did);

            // 1. get the current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // 2. calculate the new retry state
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };
            let old_gauge = prev_kind
                .map(|k| GaugeState::Resync(Some(k)))
                .unwrap_or(GaugeState::Pending);
            let new_gauge = GaugeState::Resync(Some(error_kind.clone()));
            let mut count_deltas = CountDeltas::default();
            count_deltas.add_gauge_diff(&old_gauge, &new_gauge);

            let error_string = e.to_string();
            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                let count_deltas = count_deltas.clone();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.active = true;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
                    batch.commit().into_diagnostic().inspect(|_| {
                        state.db.apply_count_deltas(&count_deltas);
                        drop(reservation);
                    })
                }
            })
            .await
            .into_diagnostic()??;

            Err(e)
        }
    }
}

#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    #[error("{0}")]
    Generic(miette::Report),
    #[error("too many requests")]
    Ratelimited,
    #[error("transport error: {0}")]
    Transport(SmolStr),
    #[error("repo was concurrently deleted")]
    Deleted,
}

impl From<ClientError> for BackfillError {
    fn from(e: ClientError) -> Self {
        match e.kind() {
            ClientErrorKind::Http {
                status: StatusCode::TOO_MANY_REQUESTS,
            } => Self::Ratelimited,
            ClientErrorKind::Transport => Self::Transport(
                e.source_err()
                    .expect("transport error without source")
                    .to_smolstr(),
            ),
            _ => Self::Generic(e.into()),
        }
    }
}

impl From<miette::Report> for BackfillError {
    fn from(e: miette::Report) -> Self {
        Self::Generic(e)
    }
}

impl From<ResolverError> for BackfillError {
    fn from(e: ResolverError) -> Self {
        match e {
            ResolverError::Ratelimited => Self::Ratelimited,
            ResolverError::Transport(s) => Self::Transport(s),
            ResolverError::Generic(e) => Self::Generic(e),
        }
    }
}

async fn process_did<'i>(
    app_state: &Arc<AppState>,
    http: &reqwest::Client,
    did: &Did<'static>,
    verify_signatures: bool,
) -> Result<Option<RepoState<'static>>, BackfillError> {
    debug!("starting...");

    // always invalidate doc before backfilling
    app_state.resolver.invalidate(did).await;

    let db = &app_state.db;
    let did_key = keys::repo_key(did);
    let Some(state_bytes) = Db::get(db.repos.clone(), did_key).await? else {
        return Err(BackfillError::Deleted);
    };
    let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
        .into_diagnostic()?
        .into_static();
    let previous_state = state.clone();

    // 1. resolve pds
    let start = Instant::now();
    let doc = app_state.resolver.resolve_doc(did).await?;
    let pds = doc.pds.clone();
    trace!(
        pds = %doc.pds,
        handle = %doc.handle.as_deref().unwrap_or("handle.invalid"),
        elapsed = %start.elapsed().as_secs_f32(),
        "resolved to pds"
    );
    state.update_from_doc(doc);

    #[cfg(feature = "indexer_stream")]
    let emit_identity = |status: &RepoStatus, active: bool| {
        let status = match status {
            RepoStatus::Deactivated => "deactivated",
            RepoStatus::Takendown => "takendown",
            RepoStatus::Suspended => "suspended",
            RepoStatus::Deleted => "deleted",
            RepoStatus::Desynchronized => "desynchronized",
            RepoStatus::Throttled => "throttled",
            _ => "",
        };
        let evt = AccountEvt {
            did: did.clone(),
            active,
            status: status
                .is_empty()
                .then_some(None)
                .unwrap_or_else(|| Some(status.into())),
        };
        let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
    };

    // 2. fetch repo (car)
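    // com.atproto.sync.getRepo returns the full repo as a single CAR snapshot;
    // the rest of this function works off that one response body.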
    let start = Instant::now();
    let req = GetRepo::new().did(did.clone()).build();
    let resp = http.xrpc(url_to_fluent_uri(&pds)).send(&req).await?;

    let car_bytes = match resp.into_output() {
        Ok(o) => o,
        Err(XrpcError::Xrpc(e)) => {
            if matches!(e, GetRepoError::RepoNotFound(_)) {
                warn!("repo not found, deleting");
                let mut batch = db.inner.batch();
                if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
                    error!(err = %e, "failed to wipe repo during backfill");
                }
                batch.commit().into_diagnostic()?;
                // return None so did_task handles the repos/pending count decrements
                // and skips sending BackfillFinished (nothing to drain for a deleted repo)
                return Ok(None);
            }

            let inactive_status = match e {
                GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
                GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
                GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
                _ => None,
            };

            if let Some(status) = inactive_status {
                warn!(?status, "repo is inactive, stopping backfill");

                #[cfg(feature = "indexer_stream")]
                emit_identity(&status, false);

                let resync_state = ResyncState::Gone {
                    status: status.clone(),
                };
                let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                let app_state_clone = app_state.clone();
                app_state
                    .db
                    .update_repo_state_async(did, move |state, (key, batch)| {
                        state.active = false;
                        state.status = status;
                        batch.insert(&app_state_clone.db.resync, key, resync_bytes);
                        Ok((true, ()))
                    })
                    .await?;

                // return success so the wrapper stops retrying
                return Ok(Some(previous_state));
            }

            Err(e).into_diagnostic()?
        }
        Err(e) => Err(e).into_diagnostic()?,
    };

    // emit identity event so any consumers know, but only if something changed
    #[cfg(feature = "indexer_stream")]
    if state.active != previous_state.active
        || state.status != previous_state.status
        || previous_state.pds.is_none()
    {
        emit_identity(&state.status, state.active);
    }

    trace!(
        bytes = car_bytes.body.len(),
        elapsed = ?start.elapsed(),
        "fetched car bytes"
    );

    // 3. import repo
    let start = Instant::now();
    let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
        .await
        .into_diagnostic()?;
    trace!(elapsed = %start.elapsed().as_secs_f32(), "parsed car");

    let start = Instant::now();
    let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    trace!(
        blocks = store.len(),
        elapsed = ?start.elapsed(),
        "stored blocks in memory"
    );

    // 4. parse root commit to get mst root
    let root_bytes = store
        .get(&parsed.root)
        .await
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
    debug!(
        rev = %root_commit.rev,
        cid = %root_commit.data,
        "repo at revision"
    );

    // 4.5. verify commit signature
    if verify_signatures {
        let pubkey = app_state.resolver.resolve_signing_key(did).await?;
        root_commit
            .verify(&pubkey)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified");
    }

    let root_commit = Commit::from(root_commit);

    // 5. walk mst
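    // walking the MST yields every (path, cid) leaf, where the path is
    // "collection/rkey"; step 6 diffs these leaves against the records
    // already indexed for this did.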
    let start = Instant::now();
    let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
    let leaves = mst.leaves().await.into_diagnostic()?;
    trace!(elapsed = %start.elapsed().as_secs_f32(), "walked mst");

    // 6. insert records into db
    let start = Instant::now();
    let result = {
        let app_state = app_state.clone();
        let did = did.clone();
        #[cfg(feature = "indexer_stream")]
        let rev = root_commit.rev;

        tokio::task::spawn_blocking(move || {
            let filter = app_state.filter.load();
            let ephemeral = app_state.ephemeral;
            let only_index_links = app_state.only_index_links;
            let mut count = 0;
            let mut delta = 0;
            let mut added_blocks = 0;
            let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
            let mut batch = app_state.db.inner.batch();
            let store = mst.storage();

            let prefix = keys::record_prefix_did(&did);
            let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();

            if !ephemeral {
                for guard in app_state.db.records.prefix(&prefix) {
                    let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
                    // key is did|collection|rkey
                    // skip did|
                    let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
                    let collection_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
                    let rkey_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;

                    let collection = std::str::from_utf8(collection_raw)
                        .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;

                    let rkey = keys::parse_rkey(rkey_raw)
                        .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;

                    let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                        .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
                        .to_smolstr();

                    existing_cids.insert((collection.into(), rkey), cid);
                }
            }
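
            // existing_cids now holds everything indexed for this did: leaves
            // still present in the MST are created/updated in the loop below,
            // and whatever remains afterwards was deleted upstream and is
            // removed after the loop.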
            let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();

            for (key, cid) in leaves {
                let val_bytes = tokio::runtime::Handle::current()
                    .block_on(store.get(&cid))
                    .into_diagnostic()?;

                if let Some(val) = val_bytes {
                    let (collection, rkey) = ops::parse_path(&key)?;

                    if !filter.matches_collection(collection) {
                        continue;
                    }

                    if !signal_seen && filter.matches_signal(collection) {
                        debug!(collection = %collection, "signal matched");
                        signal_seen = true;
                    }

                    let rkey = DbRkey::new(rkey);
                    let path = (collection.to_smolstr(), rkey.clone());
                    let cid_obj = Cid::ipld(cid);

                    *collection_counts.entry(path.0.clone()).or_default() += 1;

                    // check if this record already exists with the same CID
                    let existing_cid = existing_cids.remove(&path);
                    let action = if let Some(existing_cid) = &existing_cid {
                        if existing_cid == cid_obj.as_str() {
                            trace!(collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record");
                            continue; // skip unchanged record
                        }
                        DbAction::Update
                    } else {
                        DbAction::Create
                    };
                    trace!(collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record");

                    // key is did|collection|rkey
                    let db_key = keys::record_key(&did, collection, &rkey);

                    let cid_raw = cid.to_bytes();
                    let block_key = Slice::from(keys::block_key(collection, &cid_raw));
                    if !ephemeral {
                        if !only_index_links {
                            batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref());
                        }
                        batch.insert(&app_state.db.records, db_key, cid_raw);
                        #[cfg(feature = "backlinks")]
                        if let Ok(value) =
                            serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(val.as_ref())
                        {
                            crate::backlinks::store::index_record(
                                &mut batch,
                                &app_state.db.backlinks,
                                did.as_str(),
                                collection,
                                &rkey.to_smolstr(),
                                &value,
                            )?;
                        }
                    }

                    added_blocks += 1;
                    if action == DbAction::Create {
                        delta += 1;
                    }

                    #[cfg(feature = "indexer_stream")]
                    {
                        let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                        let evt = StoredEvent {
                            live: false,
                            did: TrimmedDid::from(&did),
                            rev,
                            collection: CowStr::Borrowed(collection),
                            rkey,
                            action,
                            data: if ephemeral {
                                StoredData::Block(val)
                            } else if only_index_links {
                                StoredData::Nothing
                            } else {
                                StoredData::Ptr(cid_obj.to_ipld().expect("valid cid"))
                            },
                        };
                        let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                        batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
                    }

                    count += 1;
                }
            }

            // remove any remaining existing records (they weren't in the new MST)
            for ((collection, rkey), cid) in existing_cids {
                trace!(collection = %collection, rkey = %rkey, cid = %cid, "remove existing record");

                // we don't need an ephemeral check around this since
                // existing_cids will be empty anyway
                batch.remove(
                    &app_state.db.records,
                    keys::record_key(&did, &collection, &rkey),
                );
                #[cfg(feature = "backlinks")]
                crate::backlinks::store::delete_record(
                    &mut batch,
                    &app_state.db.backlinks,
                    did.as_str(),
                    &collection,
                    &rkey.to_smolstr(),
                )?;

                #[cfg(feature = "indexer_stream")]
                {
                    let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                    let evt = StoredEvent {
                        live: false,
                        did: TrimmedDid::from(&did),
                        rev,
                        collection: CowStr::Borrowed(&collection),
                        rkey,
                        action: DbAction::Delete,
                        data: StoredData::Nothing,
                    };
                    let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                    batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
                }

                delta -= 1;
                count += 1;
            }

            if !signal_seen {
                trace!(signals = ?filter.signals, "no signal-matching records found, discarding repo");
                return Ok::<_, miette::Report>(None);
            }

            // 7. update repo state; status is updated in the worker shard
            state.root = Some(root_commit);
            state.touch();

            batch.insert(
                &app_state.db.repos,
                keys::repo_key(&did),
                ser_repo_state(&state)?,
            );

            let metadata_key = keys::repo_metadata_key(&did);
            let metadata_bytes = app_state
                .db
                .repo_metadata
                .get(&metadata_key)
                .into_diagnostic()?
                .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
            let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
            metadata.tracked = true;
            batch.insert(
                &app_state.db.repo_metadata,
                &metadata_key,
                crate::db::ser_repo_meta(&metadata)?,
            );

            // add the counts
            if !ephemeral {
                db::replace_record_counts(
                    &mut batch,
                    &app_state.db,
                    &did,
                    collection_counts
                        .iter()
                        .map(|(col, cnt)| (col.as_str(), *cnt)),
                )?;
            }

            let mut count_deltas = CountDeltas::default();
            if delta != 0 {
                count_deltas.add("records", delta);
            }
            if added_blocks > 0 {
                count_deltas.add("blocks", added_blocks);
            }
            let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
            batch.commit().into_diagnostic()?;
            app_state.db.apply_count_deltas(&count_deltas);
            drop(reservation);

            Ok::<_, miette::Report>(Some(count))
        })
        .await
        .into_diagnostic()??
    };

    let metadata_key = keys::repo_metadata_key(did);
    let metadata_bytes = db
        .repo_metadata
        .get(&metadata_key)
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
    let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;

    let Some(count) = result else {
        // signal mode: no signal-matching records found, so clean up the
        // optimistically-added repo
        let did_key = keys::repo_key(did);
        let backfill_pending_key = keys::pending_key(metadata.index_id);
        let app_state = app_state.clone();
        tokio::task::spawn_blocking(move || {
            let mut batch = app_state.db.inner.batch();
            let mut count_deltas = CountDeltas::default();
            batch.remove(&app_state.db.repos, &did_key);
            batch.remove(&app_state.db.repo_metadata, &metadata_key);
            batch.remove(&app_state.db.pending, backfill_pending_key);
            count_deltas.add("repos", -1);
            count_deltas.add("pending", -1);
            let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
            batch.commit().into_diagnostic().inspect(|_| {
                app_state.db.apply_count_deltas(&count_deltas);
                drop(reservation);
            })
        })
        .await
        .into_diagnostic()??;
        return Ok(None);
    };

    trace!(ops = count, elapsed = %start.elapsed().as_secs_f32(), "did ops");
    trace!(
        elapsed = %start.elapsed().as_secs_f32(),
        "committed backfill batch"
    );

    #[cfg(feature = "indexer_stream")]
    let _ = db.event_tx.send(BroadcastEvent::Persisted(
        db.next_event_id.load(Ordering::SeqCst) - 1,
    ));

    trace!("complete");
    Ok(Some(previous_state))
}
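
The scheduling pattern in run (cheap in-flight check, permit-gated spawn, RAII guard that frees the slot even on panic) stands on its own. A minimal self-contained sketch, assuming the same scc `_sync` API and tokio owned-permit semantics used above; the did strings and the task body are placeholders, not part of this repo:

use std::sync::Arc;
use tokio::sync::Semaphore;

// RAII marker: removing the key on Drop frees the slot even if the
// task panics or is cancelled.
struct InFlightGuard {
    key: String,
    set: Arc<scc::HashSet<String>>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let _ = self.set.remove_sync(&self.key);
    }
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(4));
    let in_flight: Arc<scc::HashSet<String>> = Arc::new(scc::HashSet::new());
    let mut handles = Vec::new();

    for key in ["did:plc:aaa", "did:plc:bbb", "did:plc:aaa"] {
        let key = key.to_string();

        // cheap membership check before touching the semaphore
        if in_flight.contains_sync(&key) {
            continue;
        }

        // stop scanning once we hit the concurrency limit
        let Ok(permit) = semaphore.clone().try_acquire_owned() else {
            break;
        };

        // only mark in flight while holding a permit; Err means another
        // task won the race for this key
        if in_flight.insert_sync(key.clone()).is_err() {
            continue;
        }

        let guard = InFlightGuard {
            key: key.clone(),
            set: in_flight.clone(),
        };

        handles.push(tokio::spawn(async move {
            let _guard = guard;
            let _permit = permit;
            // placeholder for the real work (did_task above)
            println!("processing {key}");
        }));
    }

    for handle in handles {
        let _ = handle.await;
    }
}

The `wait_enabled` call at the top of the loop comes from the crate-internal `WatchEnabledExt` trait, which is not shown in this file. One plausible shape for it, sketched against tokio's `watch::Receiver::wait_for` (an assumption about its behavior, not the crate's actual implementation):

use tokio::sync::watch;

// block until the watch channel reads `true`; if the sender is dropped,
// wait_for errors out and we simply proceed.
async fn wait_enabled(rx: &mut watch::Receiver<bool>, what: &str) {
    if !*rx.borrow() {
        println!("{what} paused, waiting to be re-enabled");
        let _ = rx.wait_for(|enabled| *enabled).await;
    }
}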