lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

metrics for collection births/deaths

phil cfdabe16 d0e5492a

+35 -11
+1
hacking.md
··· 55 55 - [-] copy applicable ones from collectiondir 56 56 - [~] prefix-merge walker (limit by total collections to be merged?) 57 57 - [x] add an all-collections index 58 + - [ ] back out the collections-list index (not doing prefix matching after all) 58 59 - [ ] swap in repo-stream for backfill 59 60 - [ ] with memory limit 60 61 - [ ] with a global concurrency limit for big repos
+12 -5
src/storage/collection_index.rs
··· 280 280 /// Reads the current cbr entries synchronously, then queues the corresponding 281 281 /// rbc and cbr removes into `batch`. Use this when the index cleanup must be 282 282 /// atomic with other writes (e.g. a repo state update on tombstone). 283 + /// Returns the number of collections removed. 283 284 pub fn remove_all_into( 284 285 batch: &mut fjall::OwnedWriteBatch, 285 286 db: &DbRef, 286 287 did: Did<'_>, 287 - ) -> StorageResult<()> { 288 + ) -> StorageResult<usize> { 288 289 let prefix = cbr_prefix(did.clone()); 289 290 let collections: Vec<Vec<u8>> = db 290 291 .ks ··· 298 299 } 299 300 batch.remove(&db.ks, cbr_key.as_slice()); 300 301 } 301 - Ok(()) 302 + Ok(collections.len()) 302 303 } 303 304 304 305 pub fn remove_all(db: &DbRef, did: Did<'_>) -> StorageResult<()> { 305 306 let mut batch = db.database.batch(); 306 - remove_all_into(&mut batch, db, did)?; 307 + remove_all_into(&mut batch, db, did.clone())?; 307 308 batch.commit()?; 308 309 Ok(()) 309 310 } ··· 315 316 /// the index but absent from the snapshot are deleted; collections in the 316 317 /// snapshot but absent from the index are inserted. The whole diff is applied 317 318 /// in a single batch. 319 + /// 320 + /// Returns `(n_inserted, n_removed)`. 318 321 /// 319 322 /// Typically called as `sync_collections(db, did, &snapshot.collections)`. 320 323 pub fn sync_collections( 321 324 db: &DbRef, 322 325 did: Did<'_>, 323 326 collections: &[Nsid<'static>], 324 - ) -> StorageResult<()> { 327 + ) -> StorageResult<(usize, usize)> { 325 328 // Read the current set from the cbr index. Fjall iterates in key order, and 326 329 // the cbr prefix puts the collection NSID last, so the suffix scan is already 327 330 // sorted lexicographically. ··· 341 344 .collect(); 342 345 343 346 let mut batch = db.database.batch(); 347 + let mut n_inserted: usize = 0; 348 + let mut n_removed: usize = 0; 344 349 345 350 // Merge-join two sorted sequences. 346 351 // We separate the peek (to determine ordering) from the advance (next()) ··· 359 364 match ord { 360 365 Ordering::Less => { 361 366 remove_into(&mut batch, db, did.clone(), ei.next().unwrap()); 367 + n_removed += 1; 362 368 } 363 369 Ordering::Greater => { 364 370 insert_into(&mut batch, db, did.clone(), si.next().unwrap()); 371 + n_inserted += 1; 365 372 } 366 373 Ordering::Equal => { 367 374 ei.next(); ··· 371 378 } 372 379 373 380 batch.commit()?; 374 - Ok(()) 381 + Ok((n_inserted, n_removed)) 375 382 } 376 383 377 384 // ---------------------------------------------------------------------------
+9 -3
src/sync/firehose/account_event.rs
··· 56 56 error: None, 57 57 }, 58 58 ); 59 - if tombstone { 60 - storage::collection_index::remove_all_into(&mut batch, &db, account.did.clone())?; 61 - } 59 + let n_removed = if tombstone { 60 + storage::collection_index::remove_all_into(&mut batch, &db, account.did.clone())? 61 + } else { 62 + 0 63 + }; 62 64 batch 63 65 .commit() 64 66 .map_err(Into::<crate::storage::StorageError>::into)?; 67 + if n_removed > 0 { 68 + metrics::counter!("lightrail_collection_deaths_total", "source" => "tombstone") 69 + .increment(n_removed as u64); 70 + } 65 71 Ok(()) 66 72 }
+4 -2
src/sync/firehose/commit_event.rs
··· 520 520 .map_err(Into::<crate::storage::StorageError>::into)?; 521 521 522 522 if n_born > 0 { 523 - metrics::counter!("lightrail_collection_births_total").increment(n_born); 523 + metrics::counter!("lightrail_collection_births_total", "source" => "firehose") 524 + .increment(n_born); 524 525 } 525 526 if n_died > 0 { 526 - metrics::counter!("lightrail_collection_deaths_total").increment(n_died); 527 + metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose") 528 + .increment(n_died); 527 529 } 528 530 metrics::counter!("lightrail_commits_indexed_total").increment(1); 529 531
+9 -1
src/sync/resync/mod.rs
··· 147 147 148 148 let db = db.clone(); 149 149 let collections = snapshot.collections; 150 - tokio::task::spawn_blocking(move || { 150 + let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || { 151 151 crate::storage::collection_index::sync_collections(&db, did, &collections) 152 152 }) 153 153 .await 154 154 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 155 + if n_inserted > 0 { 156 + metrics::counter!("lightrail_collection_births_total", "source" => "resync") 157 + .increment(n_inserted as u64); 158 + } 159 + if n_removed > 0 { 160 + metrics::counter!("lightrail_collection_deaths_total", "source" => "resync") 161 + .increment(n_removed as u64); 162 + } 155 163 Ok(()) 156 164 } 157 165