Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

reduce fjall mem

authored by

phil and committed by tangled.org b9b4da67 c29117a6

+9 -14
+3 -1
examples/disk-read-file/main.rs
··· 5 5 extern crate repo_stream; 6 6 7 7 use mimalloc::MiMalloc; 8 - 9 8 #[global_allocator] 10 9 static GLOBAL: MiMalloc = MiMalloc; 11 10 ··· 58 57 // at this point you might want to fetch the account's signing key 59 58 // via the DID from the commit, and then verify the signature. 60 59 log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 60 + 61 + // log::info!("now is good time to check mem usage..."); 62 + // tokio::time::sleep(std::time::Duration::from_secs(15)).await; 61 63 62 64 // pop the driver back out to get some code indentation relief 63 65 driver
+5 -12
src/disk.rs
··· 19 19 20 20 use crate::Bytes; 21 21 use crate::drive::DriveError; 22 - use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 23 - use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 24 23 use std::path::PathBuf; 25 24 26 25 #[derive(Debug, thiserror::Error)] ··· 111 110 let max_stored = max_stored_mb * 2_usize.pow(20); 112 111 let (db, partition) = tokio::task::spawn_blocking(move || { 113 112 let db = Database::builder(path) 114 - // .manual_journal_persist(true) 115 - // .flush_workers(1) 116 - // .compaction_workers(1) 117 - .journal_compression(CompressionType::None) 118 - .cache_size(cache_mb as u64 * 2_u64.pow(20)) 113 + .manual_journal_persist(true) 114 + .worker_threads(1) 115 + .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2) 119 116 .temporary(true) 120 117 .open()?; 121 118 let opts = KeyspaceCreateOptions::default() 122 - .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 123 - .filter_block_pinning_policy(PinningPolicy::disabled()) 124 119 .expect_point_read_hits(true) 125 - .data_block_compression_policy(CompressionPolicy::disabled()) 126 - .manual_journal_persist(true) 127 - .max_memtable_size(32 * 2_u64.pow(20)); 120 + .max_memtable_size(16 * 2_u64.pow(20)); 128 121 let partition = db.keyspace("z", || opts)?; 129 122 130 123 Ok::<_, DiskError>((db, partition))
+1 -1
src/drive.rs
··· 376 376 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 377 377 mem_size += maybe_processed.len(); 378 378 chunk.push((cid, maybe_processed)); 379 - if mem_size >= self.max_size { 379 + if mem_size >= (self.max_size / 2) { 380 380 // soooooo if we're setting the db cache to max_size and then letting 381 381 // multiple chunks in the queue that are >= max_size, then at any time 382 382 // we might be using some multiple of max_size?