Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

keep a trim cursor to avoid iterating prev trims

takes it from ~600ms per trim to ~3ms on my laptop

phil 974a82d7 59524c35

+48 -16
+32 -16
ufos/src/storage_fjall.rs
··· 7 7 JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 8 8 NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 9 9 RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue, 10 - WeekTruncatedCursor, WeeklyRollupKey, 10 + TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyRollupKey, 11 11 }; 12 12 use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord}; 13 13 use async_trait::async_trait; ··· 46 46 /// - key: "rollup_cursor" (literal) 47 47 /// - val: u64 (tracks behind js_cursor) 48 48 /// 49 + /// - Feed trim cursor (bg work: delete oldest excess records) 50 + /// - key: "trim_cursor" || nullstr (nsid) 51 + /// - val: u64 (earliest previously-removed feed entry jetstream cursor) 49 52 /// 50 53 /// Partition: 'feed' 51 54 /// ··· 756 759 Ok((cursors_stepped, dirty_nsids)) 757 760 } 758 761 759 - fn trim_collection( 760 - &mut self, 761 - collection: &Nsid, 762 - limit: usize, 763 - // TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end) 764 - ) -> StorageResult<()> { 762 + fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()> { 765 763 let mut dangling_feed_keys_cleaned = 0; 766 764 let mut records_deleted = 0; 767 765 766 + let feed_trim_cursor_key = 767 + TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?; 768 + let trim_cursor = self 769 + .global 770 + .get(&feed_trim_cursor_key)? 771 + .map(|value_bytes| db_complete(&value_bytes)) 772 + .transpose()? 773 + .unwrap_or(Cursor::from_start()); 774 + 775 + let live_range = 776 + NsidRecordFeedKey::from_pair(collection.clone(), trim_cursor).range_to_prefix_end()?; 777 + 778 + let mut live_records_found = 0; 779 + let mut latest_expired_feed_cursor = None; 768 780 let mut batch = self.keyspace.batch(); 769 - 770 - let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 771 - let mut found = 0; 772 - for kv in self.feeds.prefix(prefix).rev() { 781 + for kv in self.feeds.range(live_range).rev() { 773 782 let (key_bytes, val_bytes) = kv?; 774 783 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 775 784 let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; ··· 805 814 batch = self.keyspace.batch(); 806 815 } 807 816 808 - found += 1; 809 - if found <= limit { 817 + live_records_found += 1; 818 + if live_records_found <= limit { 810 819 continue; 820 + } else if latest_expired_feed_cursor.is_none() { 821 + latest_expired_feed_cursor = Some(feed_key.cursor()); 822 + batch.insert( 823 + &self.global, 824 + &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?, 825 + &feed_key.cursor().to_db_bytes()?, 826 + ); 811 827 } 812 828 813 829 batch.remove(&self.feeds, key_bytes); ··· 846 862 async fn run(mut self) -> StorageResult<()> { 847 863 let mut dirty_nsids = HashSet::new(); 848 864 849 - let mut rollup = tokio::time::interval(Duration::from_millis(42)); 865 + let mut rollup = tokio::time::interval(Duration::from_millis(240)); 850 866 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 851 867 852 868 let mut trim = tokio::time::interval(Duration::from_millis(3_000)); 853 - trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 869 + trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 854 870 855 871 loop { 856 872 tokio::select! { 857 873 _ = rollup.tick() => { 858 874 let (n, dirty) = self.0.step_rollup().inspect_err(|e| log::error!("rollup error: {e:?}"))?; 859 875 dirty_nsids.extend(dirty); 860 - log::info!("rolled up {n} items {dirty_nsids:?} ({} collections now dirty)", dirty_nsids.len()); 876 + log::info!("rolled up {n} items ({} collections now dirty)", dirty_nsids.len()); 861 877 }, 862 878 _ = trim.tick() => { 863 879 log::info!("trimming {} nsids: {dirty_nsids:?}", dirty_nsids.len());
+16
ufos/src/store_types.rs
··· 28 28 /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 29 29 pub type NewRollupCursorValue = Cursor; 30 30 31 + #[derive(Debug, PartialEq)] 32 + pub struct _TrimCollectionStaticStr {} 33 + impl StaticStr for _TrimCollectionStaticStr { 34 + fn static_str() -> &'static str { 35 + "trim_cursor" 36 + } 37 + } 38 + type TrimCollectionCursorPrefix = DbStaticStr<_TrimCollectionStaticStr>; 39 + pub type TrimCollectionCursorKey = DbConcat<TrimCollectionCursorPrefix, Nsid>; 40 + impl TrimCollectionCursorKey { 41 + pub fn new(collection: Nsid) -> Self { 42 + Self::from_pair(Default::default(), collection) 43 + } 44 + } 45 + pub type TrimCollectionCursorValue = Cursor; 46 + 31 47 /// key format: ["js_endpoint"] 32 48 #[derive(Debug, PartialEq)] 33 49 pub struct TakeoffKey {}