···106106 #[arg(long, env = "LIGHTRAIL_FJALL_CACHE_MB", default_value_t = 256)]
107107 fjall_cache_mb: u64,
108108109109+ /// Number of fjall background worker threads (flush + compaction).
110110+ /// Defaults to fjall's own heuristic (min(CPU cores, 4)).
111111+ #[arg(long, env = "LIGHTRAIL_FJALL_WORKER_THREADS")]
112112+ fjall_worker_threads: Option<usize>,
113113+109114 /// Max concurrent per-PDS listRepos workers during deep crawl.
110115 #[arg(
111116 long,
···153158 install_metrics(addr)?;
154159 }
155160156156- let db = storage::open(&args.db_path, args.fjall_cache_mb)?;
161161+ let db = storage::open(
162162+ &args.db_path,
163163+ args.fjall_cache_mb,
164164+ args.fjall_worker_threads,
165165+ )?;
157166 let client = lightrail::http::build_client(args.crawl_qps);
158167159168 let dispatcher_state: resync::dispatcher::DispatcherState = std::sync::Arc::new(
···274283 let mut interval = tokio::time::interval(Duration::from_secs(60));
275284 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
276285 while token.run(interval.tick()).await.is_some() {
286286+ // Flush the journal BufWriter to the OS. We use
287287+ // manual_journal_persist so individual writes skip this;
288288+ // doing it here batches many writes into one flush.
289289+ if let Err(e) = db.persist_journal() {
290290+ warn!(error = %e, "failed to persist journal buffer");
291291+ }
277292 if let Err(e) = storage::meta::save(&db) {
278293 warn!(error = %e, "failed to periodically save meta stats");
279294 }
+38-5
src/storage/mod.rs
···8787}
88888989impl Db {
9090+ /// Flush the journal write buffer to the OS page cache.
9191+ ///
9292+ /// With `manual_journal_persist` enabled, individual writes skip the flush;
9393+ /// call this periodically to batch many writes into a single flush.
9494+ /// Does NOT fsync — crash recovery may lose up to one flush interval of
9595+ /// writes, which is acceptable since all data can be re-fetched.
9696+ pub fn persist_journal(&self) -> StorageResult<()> {
9797+ self.database.persist(fjall::PersistMode::Buffer)?;
9898+ Ok(())
9999+ }
100100+90101 /// Collect a snapshot of fjall storage stats.
91102 pub fn storage_stats(&self) -> StorageStats {
92103 StorageStats {
···105116pub type DbRef = Arc<Db>;
106117107118/// Open (or create) the fjall database at `path` and return a shared handle.
108108-pub fn open(path: &Path, cache_mb: u64) -> StorageResult<DbRef> {
109109- open_inner(path, DbConfig::ForReal { cache_mb })
119119+///
120120+/// `worker_threads`: number of fjall background threads for flush + compaction.
121121+/// `None` uses fjall's own default (`min(cores, 4)`).
122122+pub fn open(path: &Path, cache_mb: u64, worker_threads: Option<usize>) -> StorageResult<DbRef> {
123123+ open_inner(
124124+ path,
125125+ DbConfig::ForReal {
126126+ cache_mb,
127127+ worker_threads,
128128+ },
129129+ )
110130}
111131112132enum DbConfig {
···114134 #[allow(dead_code)]
115135 Testing,
116136 /// bumpable cache for prod
117117- ForReal { cache_mb: u64 },
137137+ ForReal {
138138+ cache_mb: u64,
139139+ worker_threads: Option<usize>,
140140+ },
118141}
119142120143/// Open a temporary database that deletes itself on drop. For tests only.
···128151}
129152130153fn open_inner(path: &Path, config: DbConfig) -> StorageResult<DbRef> {
131131- let builder = fjall::Database::builder(path);
154154+ let builder = fjall::Database::builder(path).manual_journal_persist(true);
132155 let builder = match config {
133156 DbConfig::Testing => builder.temporary(true),
134134- DbConfig::ForReal { cache_mb } => builder.cache_size(cache_mb * 2_u64.pow(20)),
157157+ DbConfig::ForReal {
158158+ cache_mb,
159159+ worker_threads,
160160+ } => {
161161+ let b = builder.cache_size(cache_mb * 2_u64.pow(20));
162162+ if let Some(n) = worker_threads {
163163+ b.worker_threads(n)
164164+ } else {
165165+ b
166166+ }
167167+ }
135168 };
136169 let database = builder.open()?;
137170 let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?;
+5-4
src/storage/resync_buffer.rs
···66//! is newer than the fetched rev, and acks (deletes) it as it goes.
77//!
88//! Keys are written exactly once — firehose sequence numbers are unique — so
99-//! removals could use a weak/single delete to avoid full tombstones. Fjall
1010-//! 3.0.3 does not yet expose that API; switch to it when available.
99+//! removals use fjall's weak tombstone (`remove_weak`) which is annihilated
1010+//! on first compaction contact with the corresponding insert, avoiding the
1111+//! write amplification of full tombstones propagating through all LSM levels.
11121213use std::sync::atomic::Ordering;
1314···9697 let mut count: u64 = 0;
9798 for guard in db.ks.prefix(&prefix) {
9899 let (key_slice, _) = guard.into_inner()?;
9999- db.ks.remove(key_slice.as_ref())?;
100100+ db.ks.remove_weak(key_slice.as_ref())?;
100101 db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed);
101102 metrics::gauge!("lightrail_resync_buffer_depth").decrement(1);
102103 count += 1;
···113114pub fn ack_buffer_entry(db: &DbRef, did: Did<'_>, seq: u64) -> StorageResult<()> {
114115 debug!(did = did.as_str(), seq, "ack buffered event");
115116 let key = key(did, seq);
116116- db.ks.remove(key)?;
117117+ db.ks.remove_weak(key)?;
117118 db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed);
118119 metrics::gauge!("lightrail_resync_buffer_depth").decrement(1);
119120 Ok(())
+2-2
src/storage/resync_queue.rs
···235235 retry = item.retry_count,
236236 "dequeue resync"
237237 );
238238- db.ks.remove(key_bytes)?;
238238+ db.ks.remove_weak(key_bytes)?;
239239 let next_since = key_bytes
240240 .get(prefix.len()..)
241241 .expect("a resync queue key must start with the resync queue prefix");
···304304305305 // Atomically: remove from queue + write state=Resyncing.
306306 let mut batch = db.database.batch();
307307- batch.remove(&db.ks, key_bytes);
307307+ batch.remove_weak(&db.ks, key_bytes);
308308 batch.insert(&db.ks, &repo_key, repo::encode_repo_info(&new_info));
309309 batch.commit()?;
310310