very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1use crate::db::keys;
2use fjall::compaction::filter::Context;
3use lsm_tree::compaction::{CompactionFilter, Factory};
4use lsm_tree::compaction::{ItemAccessor, Verdict};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8pub struct CountsGcFilterFactory {
9 pub drop_collection_counts: bool,
10 pub delta_gc_watermark: Arc<AtomicU64>,
11}
12
13struct CountsGcFilter {
14 drop_collection_counts: bool,
15 delta_gc_watermark: Arc<AtomicU64>,
16}
17
18impl CompactionFilter for CountsGcFilter {
19 fn filter_item(&mut self, item: ItemAccessor<'_>, _: &Context) -> lsm_tree::Result<Verdict> {
20 let key = item.key();
21
22 if self.drop_collection_counts && key.starts_with(keys::COUNT_COLLECTION_PREFIX) {
23 return Ok(Verdict::Remove);
24 }
25
26 if key.starts_with(keys::COUNT_DELTA_PREFIX)
27 && let Ok((id, _)) = keys::parse_count_delta_key(key)
28 && id <= self.delta_gc_watermark.load(Ordering::Relaxed)
29 {
30 return Ok(Verdict::Destroy);
31 }
32
33 Ok(Verdict::Keep)
34 }
35}
36
37impl Factory for CountsGcFilterFactory {
38 fn name(&self) -> &str {
39 "counts_gc"
40 }
41
42 fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> {
43 Box::new(CountsGcFilter {
44 drop_collection_counts: self.drop_collection_counts,
45 delta_gc_watermark: self.delta_gc_watermark.clone(),
46 })
47 }
48}