lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

wire up the database: collections from firehose

phil f6913a96 b485b6f6

+39 -22
+10
src/storage/repo.rs
··· 293 293 .transpose() 294 294 } 295 295 296 + /// Add a [`RepoPrev`] write to an existing batch. 297 + pub fn put_prev_into( 298 + batch: &mut fjall::OwnedWriteBatch, 299 + db: &DbRef, 300 + did: Did<'_>, 301 + prev: &RepoPrev, 302 + ) { 303 + batch.insert(&db.ks, prev_key(did), encode_repo_prev(prev)); 304 + } 305 + 296 306 /// Write the transient [`RepoPrev`] for `did`. 297 307 pub fn put_prev(db: &DbRef, did: Did<'_>, prev: &RepoPrev) -> StorageResult<()> { 298 308 let key = prev_key(did);
+29 -22
src/sync/firehose/commit_event.rs
··· 476 476 } 477 477 } 478 478 479 - // All checks passed — update the chain tip for next-commit validation. 480 - storage::repo::put_prev( 481 - db, 482 - did.clone(), 483 - &RepoPrev { 484 - rev, 485 - prev_data: new_mst_root_bytes, 486 - }, 487 - )?; 488 - 489 479 if !born.is_empty() { 490 - let born = born 491 - .into_iter() 492 - .map(|b| b.to_string()) 480 + let names = born 481 + .iter() 482 + .map(|n| n.as_str()) 493 483 .collect::<Vec<_>>() 494 484 .join(", "); 495 - info!(did = %did, born, "first records for") 485 + info!(did = %did, collections = names, "collection birth"); 496 486 } 497 - 498 487 if !died.is_empty() { 499 - let died = died 500 - .into_iter() 501 - .map(|b| b.to_string()) 488 + let names = died 489 + .iter() 490 + .map(|n| n.as_str()) 502 491 .collect::<Vec<_>>() 503 492 .join(", "); 504 - info!(did = %did, died, "final records for") 493 + info!(did = %did, collections = names, "collection death"); 505 494 } 506 495 507 - // TODO: in a batched write, add any new collections to the index for the 508 - // did (born), and remove any fully-deleted collections (died). 509 - // let _ = (born, died); 496 + // All checks passed — atomically update the chain tip and the collection 497 + // index (born → insert, died → remove). 498 + let mut batch = db.database.batch(); 499 + storage::repo::put_prev_into( 500 + &mut batch, 501 + db, 502 + did.clone(), 503 + &RepoPrev { 504 + rev, 505 + prev_data: new_mst_root_bytes, 506 + }, 507 + ); 508 + for coll in born { 509 + storage::collection_index::insert_into(&mut batch, db, did.clone(), coll); 510 + } 511 + for coll in died { 512 + storage::collection_index::remove_into(&mut batch, db, did.clone(), coll); 513 + } 514 + batch 515 + .commit() 516 + .map_err(Into::<crate::storage::StorageError>::into)?; 510 517 511 518 Ok(()) 512 519 }