A public mirror for the whole atmosphere — hubble.microcosm.blue
27
fork

Configure Feed

Select the types of activity you want to include in your feed.

try to go a lil faster

phil 7e9d71e2 dec81837

+37 -12
+3
space-efficiency-check/Cargo.toml
··· 12 12 tokio = { workspace = true, features = ["full"] } 13 13 tracing = { workspace = true } 14 14 tracing-subscriber = { workspace = true } 15 + 16 + [profile.release] 17 + debug = true
+18 -3
space-efficiency-check/src/main.rs
··· 1 1 use clap::Parser; 2 - use rust_rocksdb::{DB, Options}; 2 + use rust_rocksdb::{DB, DBCompressionType, Options}; 3 3 4 4 use std::path::{Path, PathBuf}; 5 5 use std::sync::{Arc, atomic::Ordering}; ··· 32 32 // we don't need durability 33 33 opts.set_manual_wal_flush(true); 34 34 35 - // best compression option 36 - opts.set_compression_type(rust_rocksdb::DBCompressionType::Zstd); 35 + // compress lower levels more 36 + opts.set_compression_per_level(&[ 37 + DBCompressionType::Lz4, // L0 38 + DBCompressionType::Lz4, // L1 39 + DBCompressionType::Zstd, // L2 40 + DBCompressionType::Zstd, // L3 41 + DBCompressionType::Zstd, // L4 42 + DBCompressionType::Zstd, // L5 43 + DBCompressionType::Zstd, // L6 44 + ]); 45 + opts.set_bottommost_compression_type(rust_rocksdb::DBCompressionType::Zstd); 46 + 47 + // write into new buffer while flushing? 48 + opts.set_max_write_buffer_number(4); 49 + 50 + // try to speed up compaction? 51 + opts.set_max_background_jobs(8); 37 52 38 53 // larger write buffer for less frequent flushes 39 54 opts.set_write_buffer_size(64 * 1024 * 1024);
+16 -9
space-efficiency-check/src/work.rs
··· 47 47 mem_limit_mb: usize, 48 48 ) -> Result<Stats, ProcessError> { 49 49 let stats = Arc::new(Stats::default()); 50 - let (tx, rx) = async_channel::bounded(workers * 2); 50 + let (tx, rx) = async_channel::bounded(1024); 51 51 let start = Instant::now(); 52 52 53 53 let mut set = JoinSet::new(); ··· 96 96 stats: Arc<Stats>, 97 97 mem_limit_mb: usize, 98 98 ) { 99 - let driver = DriverBuilder::new().with_mem_limit_mb(mem_limit_mb); 100 - 101 99 while let Ok(path) = rx.recv().await { 102 - if let Err(e) = process_car(&path, &driver, &db, &stats).await { 100 + if let Err(e) = process_car(&path, mem_limit_mb, db.clone(), &stats).await { 103 101 warn!(?path, %e, "failed to process CAR"); 104 102 stats.failed_repos.fetch_add(1, Ordering::Relaxed); 105 103 } ··· 108 106 109 107 async fn process_car( 110 108 path: &Path, 111 - driver: &DriverBuilder, 112 - db: &DB, 109 + mem_limit_mb: usize, 110 + db: Arc<DB>, 113 111 stats: &Stats, 114 112 ) -> Result<(), ProcessError> { 115 113 let file = tokio::fs::File::open(path).await?; 116 114 let reader = BufReader::new(file); 117 115 118 - let mut car = driver 116 + // try first with a small limit (async-safe) 117 + let mut car = DriverBuilder::new() 118 + .with_mem_limit_mb(mem_limit_mb) 119 119 .load_car(reader) 120 120 .await 121 121 .map_err(|e| ProcessError::Load(e.to_string()))?; ··· 123 123 let did = car.commit.did.clone(); 124 124 let mut record_count: u64 = 0; 125 125 126 - const CHUNK_SIZE: usize = 8192; 126 + const CHUNK_SIZE: usize = 32768; 127 127 128 128 while let Some(chunk) = car.next_chunk_strict(CHUNK_SIZE)? 
{ 129 129 record_count += chunk.len() as u64; ··· 133 133 let key = format!("{did}/{}", output.key); 134 134 batch.put(key.as_bytes(), &output.data); 135 135 } 136 - db.write(&batch)?; 136 + let db = db.clone(); 137 + tokio::task::spawn_blocking(move || { 138 + let mut write_opts = rust_rocksdb::WriteOptions::default(); 139 + write_opts.disable_wal(true); 140 + db.write_opt(&batch, &write_opts) 141 + }) 142 + .await 143 + .expect("write batch not to joinerror")?; 137 144 } 138 145 139 146 if record_count == 0 {