Tracks lexicons and how many times they appeared on the Jetstream.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Revert "feat(server): zstd compression"

This reverts commit 4174678bd999d13cda01cc541c10ef0a62f49bea.

dusk 9b006d33 852d8aea

+51 -178
-1
server/.gitignore
··· 1 1 target 2 2 .fjall_data* 3 - zstd_dict
+1 -5
server/Cargo.toml
··· 3 3 version = "0.1.0" 4 4 edition = "2024" 5 5 6 - [features] 7 - default = ["compress"] 8 - compress = ["dep:zstd"] 9 - 10 6 [dependencies] 11 7 anyhow = "1.0" 12 8 async-trait = "0.1" ··· 34 30 rayon = "1.10.0" 35 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 36 32 rclite = "0.2.7" 37 - zstd = { version = "0.13.3", optional = true, features = ["experimental"] } 33 + zstd = "0.13.3" 38 34 39 35 [target.'cfg(target_env = "msvc")'.dependencies] 40 36 snmalloc-rs = "0.3.8"
+16 -113
server/src/db/handle.rs
··· 14 14 use rclite::Arc; 15 15 use smol_str::SmolStr; 16 16 17 - #[cfg(feature = "compress")] 18 - use zstd::bulk::{Compressor as ZstdCompressor, Decompressor as ZstdDecompressor}; 19 - 20 17 use crate::{ 21 18 db::{EventRecord, NsidHit, block}, 22 19 error::AppResult, 23 20 utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 24 21 }; 25 22 26 - #[cfg(feature = "compress")] 27 - thread_local! { 28 - static COMPRESSOR: std::cell::RefCell<Option<ZstdCompressor<'static>>> = std::cell::RefCell::new(None); 29 - static DECOMPRESSOR: std::cell::RefCell<Option<ZstdDecompressor<'static>>> = std::cell::RefCell::new(None); 30 - } 31 - 32 - type ItemDecoder = block::ItemDecoder<Cursor<Vec<u8>>, NsidHit>; 33 - type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 23 + pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 24 + pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 34 25 pub type Item = block::Item<NsidHit>; 35 26 36 - #[derive(Clone)] 37 - pub enum Compression { 38 - None, 39 - #[cfg(feature = "compress")] 40 - Zstd(ByteView), 41 - } 42 - 43 - impl Compression { 44 - #[cfg(feature = "compress")] 45 - fn get_dict(&self) -> Option<&ByteView> { 46 - match self { 47 - Compression::None => None, 48 - Compression::Zstd(dict) => Some(dict), 49 - } 50 - } 51 - } 52 - 53 27 pub struct Block { 54 28 pub written: usize, 55 29 pub key: ByteView, ··· 62 36 buf: Arc<Mutex<Vec<EventRecord>>>, 63 37 last_insert: AtomicU64, // relaxed 64 38 eps: DefaultRateTracker, 65 - compress: Compression, 66 39 } 67 40 68 41 impl Debug for LexiconHandle { ··· 82 55 } 83 56 84 57 impl LexiconHandle { 85 - pub fn new(keyspace: &Keyspace, nsid: &str, compress: Compression) -> Self { 58 + pub fn new(keyspace: &Keyspace, nsid: &str) -> Self { 86 59 let opts = PartitionCreateOptions::default() 87 60 .block_size(1024 * 128) 88 - .compression(fjall::CompressionType::Lz4); 61 + .compression(fjall::CompressionType::Miniz(9)); 89 62 Self { 
90 63 tree: keyspace.open_partition(nsid, opts).unwrap(), 91 64 nsid: nsid.into(), 92 65 buf: Default::default(), 93 66 last_insert: AtomicU64::new(0), 94 67 eps: RateTracker::new(Duration::from_secs(10)), 95 - compress, 96 68 } 97 69 } 98 70 99 - #[cfg(feature = "compress")] 100 - fn with_compressor<T>(&self, mut f: impl FnMut(&mut ZstdCompressor<'static>) -> T) -> T { 101 - COMPRESSOR.with_borrow_mut(|compressor| { 102 - if compressor.is_none() { 103 - *compressor = Some({ 104 - let mut c = ZstdCompressor::new(9).expect("cant construct zstd compressor"); 105 - c.include_checksum(false).unwrap(); 106 - if let Some(dict) = self.compress.get_dict() { 107 - c.set_dictionary(9, dict).expect("cant set dict"); 108 - } 109 - c 110 - }); 111 - } 112 - // SAFETY: this is safe because we just initialized the compressor 113 - f(unsafe { compressor.as_mut().unwrap_unchecked() }) 114 - }) 115 - } 116 - 117 - #[cfg(feature = "compress")] 118 - pub fn compress(&self, data: impl AsRef<[u8]>) -> std::io::Result<Vec<u8>> { 119 - self.with_compressor(|compressor| compressor.compress(data.as_ref())) 120 - } 121 - 122 - #[cfg(feature = "compress")] 123 - fn with_decompressor<T>(&self, mut f: impl FnMut(&mut ZstdDecompressor<'static>) -> T) -> T { 124 - DECOMPRESSOR.with_borrow_mut(|decompressor| { 125 - if decompressor.is_none() { 126 - *decompressor = Some({ 127 - let mut d = ZstdDecompressor::new().expect("cant construct zstd decompressor"); 128 - if let Some(dict) = self.compress.get_dict() { 129 - d.set_dictionary(dict).expect("cant set dict"); 130 - } 131 - d 132 - }); 133 - } 134 - // SAFETY: this is safe because we just initialized the decompressor 135 - f(unsafe { decompressor.as_mut().unwrap_unchecked() }) 136 - }) 137 - } 138 - 139 - #[cfg(feature = "compress")] 140 - pub fn decompress(&self, data: impl AsRef<[u8]>) -> std::io::Result<Vec<u8>> { 141 - self.with_decompressor(|decompressor| { 142 - decompressor.decompress(data.as_ref(), 1024 * 1024 * 20) 143 - }) 144 - } 145 - 
146 71 pub fn nsid(&self) -> &SmolStr { 147 72 &self.nsid 148 73 } ··· 198 123 } 199 124 200 125 let start_blocks_size = blocks_to_compact.len(); 201 - let keys_to_delete = blocks_to_compact 202 - .iter() 203 - .map(|(key, _)| key) 204 - .cloned() 205 - .collect_vec(); 126 + let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key); 206 127 let mut all_items = 207 128 blocks_to_compact 208 - .into_iter() 129 + .iter() 209 130 .try_fold(Vec::new(), |mut acc, (key, value)| { 210 - let decoder = self.get_decoder_for(key, value)?; 131 + let mut timestamps = Cursor::new(key); 132 + let start_timestamp = timestamps.read_varint()?; 133 + let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?; 211 134 let mut items = decoder.collect::<Result<Vec<_>, _>>()?; 212 135 acc.append(&mut items); 213 136 AppResult::Ok(acc) ··· 226 149 .into_par_iter() 227 150 .map(|chunk| { 228 151 let count = chunk.len(); 229 - self.encode_block_from_items(chunk, count) 152 + Self::encode_block_from_items(chunk, count) 230 153 }) 231 154 .collect::<Result<Vec<_>, _>>()?; 232 155 let end_blocks_size = new_blocks.len(); ··· 250 173 } 251 174 252 175 pub fn encode_block_from_items( 253 - &self, 254 176 items: impl IntoIterator<Item = Item>, 255 177 count: usize, 256 178 ) -> AppResult<Block> { ··· 282 204 .into()); 283 205 } 284 206 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 285 - let data = self.put_raw_block(writer.finish()?)?; 207 + let value = writer.finish()?; 286 208 let key = varints_unsigned_encoded([start_timestamp, end_timestamp]); 287 - return Ok(Block { written, key, data }); 209 + return Ok(Block { 210 + written, 211 + key, 212 + data: value, 213 + }); 288 214 } 289 215 Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into()) 290 216 } ··· 302 228 ) 303 229 }) 304 230 .collect() 305 - } 306 - 307 - pub fn get_raw_block(&self, value: Slice) -> std::io::Result<Vec<u8>> { 308 - match 
&self.compress { 309 - Compression::None => Ok(value.as_ref().into()), 310 - #[cfg(feature = "compress")] 311 - Compression::Zstd(_) => self.decompress(value), 312 - } 313 - } 314 - 315 - pub fn put_raw_block(&self, value: Vec<u8>) -> std::io::Result<Vec<u8>> { 316 - match &self.compress { 317 - Compression::None => Ok(value), 318 - #[cfg(feature = "compress")] 319 - Compression::Zstd(_) => self.compress(value), 320 - } 321 - } 322 - 323 - pub fn get_decoder_for(&self, key: Slice, value: Slice) -> AppResult<ItemDecoder> { 324 - let mut timestamps = Cursor::new(key); 325 - let start_timestamp = timestamps.read_varint()?; 326 - let decoder = ItemDecoder::new(Cursor::new(self.get_raw_block(value)?), start_timestamp)?; 327 - Ok(decoder) 328 231 } 329 232 }
+23 -48
server/src/db/mod.rs
··· 3 3 fmt::Debug, 4 4 io::Cursor, 5 5 ops::{Bound, Deref, RangeBounds}, 6 - path::Path, 6 + path::{Path, PathBuf}, 7 7 time::Duration, 8 8 }; 9 9 10 10 use byteview::StrView; 11 - use fjall::{Keyspace, Partition, PartitionCreateOptions}; 11 + use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 12 12 use itertools::{Either, Itertools}; 13 - use rayon::iter::{IntoParallelIterator, ParallelIterator}; 13 + use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; 14 14 use rclite::Arc; 15 15 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 16 16 use smol_str::{SmolStr, ToSmolStr}; ··· 18 18 use tokio_util::sync::CancellationToken; 19 19 20 20 use crate::{ 21 - db::handle::{Compression, LexiconHandle}, 21 + db::handle::{ItemDecoder, LexiconHandle}, 22 22 error::{AppError, AppResult}, 23 23 jetstream::JetstreamEvent, 24 24 utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded}, ··· 77 77 78 78 pub struct DbConfig { 79 79 pub ks_config: fjall::Config, 80 - #[cfg(feature = "compress")] 81 - pub dict_path: std::path::PathBuf, 82 80 pub min_block_size: usize, 83 81 pub max_block_size: usize, 84 82 pub max_last_activity: u64, ··· 100 98 fn default() -> Self { 101 99 Self { 102 100 ks_config: fjall::Config::default(), 103 - #[cfg(feature = "compress")] 104 - dict_path: "zstd_dict".parse().unwrap(), 105 101 min_block_size: 512, 106 102 max_block_size: 500_000, 107 103 max_last_activity: Duration::from_secs(10).as_nanos() as u64, ··· 120 116 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 121 117 eps: RateTracker<100>, 122 118 cancel_token: CancellationToken, 123 - compression: Compression, 124 119 } 125 120 126 121 impl Db { 127 122 pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> { 128 123 tracing::info!("opening db..."); 129 124 let ks = cfg.ks_config.clone().open()?; 130 - let _compression = Compression::None; 131 - #[cfg(feature = "compress")] 132 - let dict = 
std::fs::File::open(&cfg.dict_path).ok().and_then(|mut f| { 133 - let meta = f.metadata().ok()?; 134 - byteview::ByteView::from_reader(&mut f, meta.len() as usize).ok() 135 - }); 136 - #[cfg(feature = "compress")] 137 - let _compression = match dict { 138 - Some(dict) => { 139 - tracing::info!( 140 - "using zstd compression with dict from {}", 141 - cfg.dict_path.to_string_lossy() 142 - ); 143 - Compression::Zstd(dict) 144 - } 145 - None => Compression::None, 146 - }; 147 125 Ok(Self { 148 126 cfg, 149 127 hits: Default::default(), ··· 158 136 event_broadcaster: broadcast::channel(1000).0, 159 137 eps: RateTracker::new(Duration::from_secs(1)), 160 138 cancel_token, 161 - compression: _compression, 162 139 }) 163 140 } 164 141 ··· 236 213 .into_par_iter() 237 214 .map(|(i, items, handle)| { 238 215 let count = items.len(); 239 - let block = handle.encode_block_from_items(items, count)?; 216 + let block = LexiconHandle::encode_block_from_items(items, count)?; 240 217 tracing::info!( 241 218 "{}: encoded block with {} items", 242 219 handle.nsid(), ··· 305 282 Some(handle) => handle.clone(), 306 283 None => { 307 284 if self.ks.partition_exists(nsid.as_ref()) { 308 - let handle = Arc::new(LexiconHandle::new( 309 - &self.ks, 310 - nsid.as_ref(), 311 - self.compression.clone(), 312 - )); 285 + let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref())); 313 286 let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 314 287 handle 315 288 } else { ··· 322 295 323 296 #[inline(always)] 324 297 fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> { 325 - self.hits.entry(nsid.clone()).or_insert_with(|| { 326 - Arc::new(LexiconHandle::new( 327 - &self.ks, 328 - &nsid, 329 - self.compression.clone(), 330 - )) 331 - }) 298 + self.hits 299 + .entry(nsid.clone()) 300 + .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid))) 332 301 } 333 302 334 303 pub fn ingest_events(&self, events: impl Iterator<Item = 
EventRecord>) -> AppResult<()> { ··· 397 366 }; 398 367 let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| { 399 368 let (key, value) = item?; 400 - let decoder = handle.get_decoder_for(key, value)?; 369 + let mut timestamps = Cursor::new(key); 370 + let start_timestamp = timestamps.read_varint()?; 371 + let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 401 372 acc.push(decoder.item_count()); 402 373 AppResult::Ok(acc) 403 374 })?; ··· 409 380 }) 410 381 } 411 382 412 - // train zstd dict with 1000 blocks from every lexicon 413 - #[cfg(feature = "compress")] 383 + // train zstd dict with 100 blocks from every lexicon 414 384 pub fn train_zstd_dict(&self) -> AppResult<Vec<u8>> { 415 385 let samples = self 416 386 .get_nsids() ··· 418 388 .map(|handle| { 419 389 handle 420 390 .iter() 421 - .map(move |res| { 391 + .rev() 392 + .map(|res| { 422 393 res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 423 - .and_then(|(_, value)| Ok(Cursor::new(handle.get_raw_block(value)?))) 394 + .map(|(_, value)| Cursor::new(value)) 424 395 }) 425 396 .take(1000) 426 397 }) ··· 449 420 return Either::Right(std::iter::empty()); 450 421 }; 451 422 452 - let map_block = |(key, val)| { 453 - let decoder = handle.get_decoder_for(key, val)?; 454 - let items = decoder 423 + let map_block = move |(key, val)| { 424 + let mut key_reader = Cursor::new(key); 425 + let start_timestamp = key_reader.read_varint::<u64>()?; 426 + if start_timestamp < start_limit { 427 + return Ok(None); 428 + } 429 + let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)? 455 430 .take_while(move |item| { 456 431 item.as_ref().map_or(true, |item| { 457 432 item.timestamp <= end_limit && item.timestamp >= start_limit
+11 -11
server/src/main.rs
··· 53 53 debug(); 54 54 return; 55 55 } 56 + Some("traindict") => { 57 + train_zstd_dict(); 58 + return; 59 + } 56 60 Some(x) => { 57 61 tracing::error!("unknown command: {}", x); 58 62 return; ··· 207 211 db.sync(true).expect("cant sync db"); 208 212 } 209 213 214 + fn train_zstd_dict() { 215 + let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 216 + let dict_data = db.train_zstd_dict().expect("cant train zstd dict"); 217 + std::fs::write("zstd_dict", dict_data).expect("cant save zstd dict") 218 + } 219 + 210 220 fn debug() { 211 221 let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 212 222 let info = db.info().expect("cant get db info"); ··· 236 246 DbConfig::default().ks(|c| { 237 247 c.max_journaling_size(u64::MAX) 238 248 .max_write_buffer_size(u64::MAX) 239 - .compaction_workers(rayon::current_num_threads() * 4) 240 - .flush_workers(rayon::current_num_threads() * 4) 241 249 }), 242 250 CancellationToken::new(), 243 251 ) ··· 261 269 262 270 fn migrate() { 263 271 let cancel_token = CancellationToken::new(); 264 - 265 272 let from = Arc::new( 266 273 Db::new( 267 274 DbConfig::default().path(".fjall_data_from"), ··· 269 276 ) 270 277 .expect("couldnt create db"), 271 278 ); 272 - #[cfg(feature = "compress")] 273 - std::fs::write( 274 - "zstd_dict", 275 - from.train_zstd_dict().expect("cant get zstd dict"), 276 - ) 277 - .expect("cant write zstd dict"); 278 - 279 279 let to = Arc::new( 280 280 Db::new( 281 281 DbConfig::default().path(".fjall_data_to").ks(|c| { ··· 290 290 ); 291 291 292 292 let nsids = from.get_nsids().collect::<Vec<_>>(); 293 - let _eps_thread = std::thread::spawn({ 293 + let eps_thread = std::thread::spawn({ 294 294 let to = to.clone(); 295 295 move || { 296 296 loop {