very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[db] compact manually if we pruned enough events

dawn 89ecab45 06755fd1

+41 -2
+4
Cargo.toml
··· 74 74 [profile.bench] 75 75 opt-level = 3 76 76 lto = "thin" 77 + 78 + [profile.release] 79 + lto = "thin" 80 + codegen-units = 1
+37 -2
src/db/ephemeral.rs
··· 4 4 fjall::Keyspace, 5 5 miette::{IntoDiagnostic, WrapErr}, 6 6 std::sync::Arc, 7 - std::sync::atomic::Ordering, 7 + std::sync::atomic::{AtomicU64, Ordering}, 8 8 std::time::Duration, 9 - tracing::{debug, error, info}, 9 + tracing::{debug, error, info, warn}, 10 10 }; 11 11 12 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 13 + const AUTO_COMPACT_PRUNED_SEQ_INTERVAL: u64 = 250_000; 14 + #[cfg(feature = "indexer_stream")] 15 + static LAST_EVENTS_COMPACTED_SEQ: AtomicU64 = AtomicU64::new(0); 16 + #[cfg(feature = "relay")] 17 + static LAST_RELAY_EVENTS_COMPACTED_SEQ: AtomicU64 = AtomicU64::new(0); 18 + 12 19 #[cfg(feature = "indexer_stream")] 13 20 pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) { 14 21 info!("ephemeral TTL worker started"); ··· 41 48 keys::event_watermark_key, 42 49 &db.events, 43 50 current_seq, 51 + &LAST_EVENTS_COMPACTED_SEQ, 44 52 ) 45 53 } 46 54 ··· 54 62 keys::relay_event_watermark_key, 55 63 &db.relay_events, 56 64 current_seq, 65 + &LAST_RELAY_EVENTS_COMPACTED_SEQ, 57 66 ) 58 67 } 59 68 ··· 65 74 watermark_key: fn(u64) -> Vec<u8>, 66 75 events_ks: &Keyspace, 67 76 current_seq: u64, 77 + last_compacted_seq: &AtomicU64, 68 78 ) -> miette::Result<()> { 69 79 let now = chrono::Utc::now().timestamp() as u64; 70 80 let cutoff_ts = now.saturating_sub(ttl.as_secs()); ··· 147 157 info!(pruned, "pruned old events"); 148 158 } else { 149 159 debug!("no events were pruned"); 160 + } 161 + 162 + let pruned_since_compaction = 163 + cutoff_seq.saturating_sub(last_compacted_seq.load(Ordering::Relaxed)); 164 + if pruned_since_compaction >= AUTO_COMPACT_PRUNED_SEQ_INTERVAL { 165 + let compact_res = events_ks 166 + .rotate_memtable_and_wait() 167 + .into_diagnostic() 168 + .wrap_err("failed to rotate memtable before TTL compaction") 169 + .and_then(|_| { 170 + events_ks 171 + .compact(Arc::new(fjall::compaction::Leveled::default())) 172 + .into_diagnostic() 173 + .wrap_err("failed TTL-triggered keyspace compaction") 174 + }); 175 + 176 + if let Err(err) = compact_res { 177 + warn!(err = %err, "TTL-triggered compaction failed"); 178 + } else { 179 + last_compacted_seq.fetch_max(cutoff_seq, Ordering::Relaxed); 180 + info!( 181 + pruned_since_compaction, 182 + "completed TTL-triggered keyspace compaction" 183 + ); 184 + } 150 185 } 151 186 152 187 Ok(())