Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

plc_rocksdb: try the native rocksdb compression subsystem

geesawra 4b06157b 4e33100a

+63 -23
+63 -23
src/plc_rocksdb.rs
··· 1 - use async_compression::tokio::write::{GzipDecoder, GzipEncoder}; 2 - use futures::{TryFutureExt, stream}; 3 - use rocksdb::Options; 1 + use bytevec::BVSize; 2 + use futures::stream; 3 + use rocksdb::{BlockBasedOptions, DBCompressionType, Options, SliceTransform}; 4 4 use std::{path::PathBuf, sync::Arc, time::Instant}; 5 - use tokio::{ 6 - io::AsyncWriteExt, 7 - sync::{Mutex, mpsc, oneshot}, 8 - }; 5 + use tokio::sync::{Mutex, mpsc, oneshot}; 9 6 use tokio_stream::StreamExt; 10 7 11 8 use crate::{ ··· 25 22 opts.set_max_open_files(1000); 26 23 opts.create_missing_column_families(true); 27 24 25 + opts.create_if_missing(true); 26 + 27 + // === WRITE OPTIMIZATION === 28 + 29 + // Larger write buffers = fewer flushes = better write throughput 30 + opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB (up from 64MB) 31 + opts.set_max_write_buffer_number(6); // 6 buffers before write stall 32 + opts.set_min_write_buffer_number_to_merge(2); // Merge 2 before flush 33 + 34 + // More background jobs for compaction/flush 35 + opts.set_max_background_jobs(8); // 2 flush + 6 compaction threads 36 + 37 + // Increase L0 file limit before slowing writes 38 + opts.set_level_zero_file_num_compaction_trigger(4); // Default is 4 39 + opts.set_level_zero_slowdown_writes_trigger(20); // Default is 20 40 + opts.set_level_zero_stop_writes_trigger(36); // Default is 36 41 + 42 + // === COMPRESSION STRATEGY === 43 + 44 + // Fast compression on hot levels (L0, L1 get most writes) 45 + opts.set_compression_per_level(&[ 46 + DBCompressionType::Lz4, // L0 - prioritize speed 47 + DBCompressionType::Lz4, // L1 - prioritize speed 48 + DBCompressionType::Zstd, // L2+ - balance 49 + DBCompressionType::Zstd, 50 + DBCompressionType::Zstd, 51 + DBCompressionType::Zstd, 52 + DBCompressionType::Zstd, 53 + ]); 54 + 55 + // Moderate Zstd level (3-6 range is good for write-heavy) 56 + opts.set_compression_options(-1, 4, 0, 4 * 1024 * 1024); 57 + opts.set_zstd_max_train_bytes(500 * 1024 * 1024); 58 + 59 + // === KEY/READ OPTIMIZATION === 60 + 61 + // Your 8-byte uint64 prefix 62 + opts.set_prefix_extractor(SliceTransform::create_fixed_prefix( 63 + u64::MAX.to_string().len() + 1, 64 + )); 65 + let mut block_opts = BlockBasedOptions::default(); 66 + 67 + block_opts.set_block_size(32 * 1024); // 32KB for JSON 68 + 69 + // Cache for your 100 reads/sec (not too large since reads are rare) 70 + let cache = rocksdb::Cache::new_lru_cache(512 * 1024 * 1024); // 512MB 71 + block_opts.set_block_cache(&cache); 72 + 73 + opts.set_block_based_table_factory(&block_opts); 74 + 28 75 let rdb = rocksdb::DB::open_cf(&opts, path, vec!["dids", "ops"])?; 29 76 30 77 Ok(RocksDatastore { ··· 120 167 pub cid: String, 121 168 pub created_at: i64, 122 169 pub nullified: bool, 123 - pub operation: Vec<u8>, 170 + pub operation: String, 124 171 } 125 172 126 173 impl RocksOp { 127 174 async fn from_op(op: &Op) -> Self { 128 - let op_str = op.operation.to_string().into_bytes(); 129 - let mut gz = GzipEncoder::new(Vec::new()); 130 - gz.write_all(&op_str).await.unwrap(); 131 - gz.flush().await.unwrap(); 132 - let op_gz = gz.into_inner(); 133 - 134 175 Self { 135 - operation: op_gz, 176 + operation: op.operation.to_string(), 136 177 did: op.did.clone(), 137 178 cid: op.cid.clone(), 138 179 created_at: op.created_at.timestamp_millis(), ··· 141 182 } 142 183 143 184 async fn to_op(&self) -> Op { 144 - let mut dec_op = Vec::new(); 145 - let mut dec = GzipDecoder::new(&mut dec_op); 146 - dec.write_all(&self.operation).await.unwrap(); 147 - dec.flush().await.unwrap(); 148 - let dec_op = String::from_utf8(dec_op).unwrap(); 149 - 150 185 Op { 151 186 did: self.did.clone(), 152 187 cid: self.cid.clone(), 153 188 created_at: Dt::from_timestamp_millis(self.created_at).unwrap(), 154 189 nullified: self.nullified, 155 - operation: serde_json::value::RawValue::from_string(dec_op).unwrap(), 190 + operation: serde_json::value::RawValue::from_string(self.operation.clone()).unwrap(), 156 191 } 157 192 } 158 193 } 159 194 160 195 fn op_key(op: &Op, id: u64) -> String { 161 - format!("{}_{}", id, op.cid) 196 + format!( 197 + "{:0width$}_{}", 198 + id, 199 + op.cid, 200 + width = u64::MAX.to_string().len(), 201 + ) 162 202 } 163 203 164 204 const LATEST_TIMESTAMP_KEY: &str = "latest_timestamp";