Tracks lexicons and how many times each one has appeared on the Jetstream.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): add migration to miniz

dusk ce794112 57a327ab

+74 -11
+7
server/Cargo.lock
··· 1353 1353 "guardian", 1354 1354 "interval-heap", 1355 1355 "log", 1356 + "lz4_flex", 1356 1357 "miniz_oxide", 1357 1358 "path-absolutize", 1358 1359 "quick_cache", ··· 1363 1364 "varint-rs", 1364 1365 "xxhash-rust", 1365 1366 ] 1367 + 1368 + [[package]] 1369 + name = "lz4_flex" 1370 + version = "0.11.3" 1371 + source = "registry+https://github.com/rust-lang/crates.io-index" 1372 + checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" 1366 1373 1367 1374 [[package]] 1368 1375 name = "matchers"
+1 -1
server/Cargo.toml
··· 12 12 axum = { version = "0.8", features = ["json", "ws"] } 13 13 tower-http = {version = "0.6", features = ["request-id"]} 14 14 atproto-jetstream = "0.9" 15 - fjall = { version = "2", default-features = false, features = ["miniz"] } 15 + fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] } 16 16 rkyv = {version = "0.8", features = ["unaligned"]} 17 17 smol_str = { version = "0.3", features = ["serde"] } 18 18 papaya = "0.2"
+33 -8
server/src/db.rs
··· 1 + use std::{ops::Deref, path::Path}; 2 + 1 3 use atproto_jetstream::JetstreamEvent; 2 4 use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 3 5 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; ··· 21 23 } 22 24 23 25 pub struct EventRecord { 24 - nsid: SmolStr, 25 - timestamp: u64, 26 - deleted: bool, 26 + pub nsid: SmolStr, 27 + pub timestamp: u64, 28 + pub deleted: bool, 27 29 } 28 30 29 31 impl EventRecord { ··· 58 60 } 59 61 60 62 impl Db { 61 - pub fn new() -> AppResult<Self> { 63 + pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 62 64 tracing::info!("opening db..."); 63 - let ks = Config::default() 65 + let ks = Config::new(path) 64 66 .cache_size(8 * 1024 * 1024) // from talna 65 67 .open()?; 66 68 Ok(Self { ··· 79 81 } 80 82 81 83 #[inline(always)] 82 - fn run_in_nsid_tree( 84 + fn run_in_nsid_tree<T>( 83 85 &self, 84 86 nsid: &str, 85 - f: impl FnOnce(&Partition) -> AppResult<()>, 86 - ) -> AppResult<()> { 87 + f: impl FnOnce(&Partition) -> AppResult<T>, 88 + ) -> AppResult<T> { 87 89 f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || { 88 90 let opts = PartitionCreateOptions::default() 89 91 .compression(fjall::CompressionType::Miniz(9)) ··· 155 157 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 156 158 ) 157 159 }) 160 + }) 161 + } 162 + 163 + pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> { 164 + self.inner 165 + .list_partitions() 166 + .into_iter() 167 + .filter(|k| k.deref() != "_counts") 168 + } 169 + 170 + pub fn get_hits( 171 + &self, 172 + nsid: &str, 173 + ) -> AppResult<impl Iterator<Item = AppResult<(u64, NsidHit)>>> { 174 + self.run_in_nsid_tree(nsid, |tree| { 175 + Ok(tree.iter().map(|res| { 176 + res.map_err(AppError::from).map(|(key, val)| { 177 + ( 178 + u64::from_be_bytes(key.as_ref().try_into().unwrap()), 179 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 180 + ) 181 + }) 182 + })) 158 183 }) 159 184 } 
160 185 }
+33 -2
server/src/main.rs
··· 1 - use std::sync::Arc; 1 + use std::{ops::Deref, sync::Arc}; 2 2 3 3 use atproto_jetstream::{CancellationToken, Consumer, EventHandler, JetstreamEvent}; 4 + use smol_str::ToSmolStr; 4 5 #[cfg(not(target_env = "msvc"))] 5 6 use tikv_jemallocator::Jemalloc; 6 7 use tokio::sync::mpsc::{Receiver, Sender}; ··· 49 50 async fn main() { 50 51 tracing_subscriber::fmt::fmt().compact().init(); 51 52 52 - let db = Arc::new(Db::new().expect("couldnt create db")); 53 + if std::env::args() 54 + .nth(1) 55 + .map_or(false, |arg| arg == "migrate") 56 + { 57 + migrate_to_miniz(); 58 + return; 59 + } 60 + 61 + let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db")); 53 62 54 63 tokio::fs::write("./bsky_zstd_dictionary", BSKY_ZSTD_DICT) 55 64 .await ··· 95 104 96 105 serve(db).await; 97 106 } 107 + 108 + fn migrate_to_miniz() { 109 + let from = Db::new(".fjall_data").expect("couldnt create db"); 110 + let to = Db::new(".fjall_data_miniz").expect("couldnt create db"); 111 + 112 + let mut total_count = 0_u64; 113 + for nsid in from.get_nsids() { 114 + tracing::info!("migrating {} ...", nsid.deref()); 115 + for hit in from.get_hits(&nsid).expect("cant read hits") { 116 + let (timestamp, data) = hit.expect("cant read event"); 117 + to.record_event(EventRecord { 118 + nsid: nsid.to_smolstr(), 119 + timestamp, 120 + deleted: data.deleted, 121 + }) 122 + .expect("cant record event"); 123 + total_count += 1; 124 + } 125 + } 126 + 127 + tracing::info!("migrated {total_count} events!"); 128 + }