A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 48 lines 1.4 kB view raw
1use crate::db::keys; 2use fjall::compaction::filter::Context; 3use lsm_tree::compaction::{CompactionFilter, Factory}; 4use lsm_tree::compaction::{ItemAccessor, Verdict}; 5use std::sync::Arc; 6use std::sync::atomic::{AtomicU64, Ordering}; 7 8pub struct CountsGcFilterFactory { 9 pub drop_collection_counts: bool, 10 pub delta_gc_watermark: Arc<AtomicU64>, 11} 12 13struct CountsGcFilter { 14 drop_collection_counts: bool, 15 delta_gc_watermark: Arc<AtomicU64>, 16} 17 18impl CompactionFilter for CountsGcFilter { 19 fn filter_item(&mut self, item: ItemAccessor<'_>, _: &Context) -> lsm_tree::Result<Verdict> { 20 let key = item.key(); 21 22 if self.drop_collection_counts && key.starts_with(keys::COUNT_COLLECTION_PREFIX) { 23 return Ok(Verdict::Remove); 24 } 25 26 if key.starts_with(keys::COUNT_DELTA_PREFIX) 27 && let Ok((id, _)) = keys::parse_count_delta_key(key) 28 && id <= self.delta_gc_watermark.load(Ordering::Relaxed) 29 { 30 return Ok(Verdict::Destroy); 31 } 32 33 Ok(Verdict::Keep) 34 } 35} 36 37impl Factory for CountsGcFilterFactory { 38 fn name(&self) -> &str { 39 "counts_gc" 40 } 41 42 fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> { 43 Box::new(CountsGcFilter { 44 drop_collection_counts: self.drop_collection_counts, 45 delta_gc_watermark: self.delta_gc_watermark.clone(), 46 }) 47 } 48}