Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

oops actually trim all dirty collections

phil faf1b8b2 f32b1932

+12 -7
+12 -7
ufos/src/storage_fjall.rs
··· 511 511 timelies: impl Iterator<Item = Result<(fjall::Slice, fjall::Slice), fjall::Error>>, 512 512 cursor_exclusive_limit: Option<Cursor>, 513 513 rollup_limit: usize, 514 - ) -> StorageResult<usize> { 514 + ) -> StorageResult<(usize, HashSet<Nsid>)> { 515 515 // current strategy is to buffer counts in mem before writing the rollups 516 516 // we *could* read+write every single batch to rollup.. but their merge is associative so 517 517 // ...so save the db some work up front? is this worth it? who knows... 518 + 519 + let mut dirty_nsids = HashSet::new(); 518 520 519 521 #[derive(Eq, Hash, PartialEq)] 520 522 enum Rollup { ··· 542 544 { 543 545 break; 544 546 } 547 + 548 + dirty_nsids.insert(key.collection().clone()); 545 549 546 550 batch.remove(&self.rollups, key_bytes); 547 551 let val = db_complete::<CountsValue>(&val_bytes)?; ··· 608 612 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?; 609 613 610 614 batch.commit()?; 611 - Ok(cursors_advanced) 615 + Ok((cursors_advanced, dirty_nsids)) 612 616 } 613 617 } 614 618 ··· 734 738 let cursors_stepped = match (timely_next, next_delete) { 735 739 (Some(timely), Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 736 740 if timely.cursor() < delete_cursor { 737 - let n = self.rollup_live_counts( 741 + let (n, dirty) = self.rollup_live_counts( 738 742 timely_iter, 739 743 Some(delete_cursor), 740 744 MAX_BATCHED_ROLLUP_COUNTS, 741 745 )?; 742 - dirty_nsids.insert(timely.collection().clone()); 746 + dirty_nsids.extend(dirty); 743 747 n 744 748 } else { 745 749 self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 746 750 } 747 751 } 748 - (Some(timely), None) => { 749 - let n = self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?; 750 - dirty_nsids.insert(timely.collection().clone()); 752 + (Some(_), None) => { 753 + let (n, dirty) = 754 + self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)?; 755 + dirty_nsids.extend(dirty); 751 756 n 752 757 } 753 758 (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => {