lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

split into two keyspaces

scan-heavy one with big blocks and compression etc

phil 362c4b79 fad4608b

+33 -12
+14 -10
src/storage/collection_index.rs
··· 113 113 let prefix_len = prefix.len(); 114 114 115 115 let lower_did = cursor.map(rbc_suffix).unwrap_or(vec![]); 116 - let mut ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 116 + let mut ranger = db 117 + .index_ks 118 + .range(prefixed_range(prefix.clone(), lower_did..)); 117 119 118 120 let mut dids = Vec::with_capacity(limit); 119 121 for guard in ranger.by_ref() { ··· 158 160 }; 159 161 160 162 let mut cols = Vec::with_capacity(limit); 161 - for guard in db.ks.range(start_key..) { 163 + for guard in db.index_ks.range(start_key..) { 162 164 let (k, _v) = guard.into_inner()?; 163 165 if !k.starts_with(&prefix) { 164 166 break; ··· 190 192 did: &Did<'_>, 191 193 collection: Nsid<'_>, 192 194 ) { 193 - batch.insert(&db.ks, rbc(collection.clone(), did), b""); 194 - batch.insert(&db.ks, cbr(did, collection), b""); 195 + batch.insert(&db.index_ks, rbc(collection.clone(), did), b""); 196 + batch.insert(&db.index_ks, cbr(did, collection), b""); 195 197 } 196 198 197 199 /// Remove a `(collection, did)` pair from both indexes within an existing batch. ··· 204 206 did: &Did<'_>, 205 207 collection: Nsid<'_>, 206 208 ) { 207 - batch.remove(&db.ks, rbc(collection.clone(), did)); 208 - batch.remove(&db.ks, cbr(did, collection)); 209 + batch.remove(&db.index_ks, rbc(collection.clone(), did)); 210 + batch.remove(&db.index_ks, cbr(did, collection)); 209 211 } 210 212 211 213 /// Return `true` if `did` has `collection` in the cbr index. 212 214 pub fn has_collection(db: &DbRef, did: &Did<'_>, collection: Nsid<'_>) -> StorageResult<bool> { 213 - Ok(db.ks.get(cbr(did, collection))?.is_some()) 215 + Ok(db.index_ks.get(cbr(did, collection))?.is_some()) 214 216 } 215 217 216 218 /// Like [`scan_rbc`] but merges results from multiple collections, ··· 238 240 let prefix = rbc_prefix(col.clone()); 239 241 let prefix_len = prefix.len(); 240 242 let lower_did = cursor.map(rbc_suffix).unwrap_or_default(); 241 - let ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 243 + let ranger = db 244 + .index_ks 245 + .range(prefixed_range(prefix.clone(), lower_did..)); 242 246 let mut it: Box<dyn Iterator<Item = StorageResult<Did<'static>>>> = 243 247 Box::new(ranger.map(move |guard| -> StorageResult<Did<'static>> { 244 248 let key = guard.key()?; ··· 333 337 let prefix_len = prefix.len(); 334 338 for cbr_key in &collections { 335 339 if let Some(col) = cbr_parse_collection(cbr_key, prefix_len) { 336 - batch.remove(&db.ks, rbc(col, did)); 340 + batch.remove(&db.index_ks, rbc(col, did)); 337 341 } 338 - batch.remove(&db.ks, cbr_key.as_slice()); 342 + batch.remove(&db.index_ks, cbr_key.as_slice()); 339 343 } 340 344 Ok(collections.len()) 341 345 }
+19 -2
src/storage/mod.rs
··· 51 51 /// `Keyspace` is an individual column-family (the old `PartitionHandle`). 52 52 pub struct Db { 53 53 pub(crate) database: fjall::Database, 54 - // for now, put all keys in one keyspace 54 + /// General-purpose keyspace: repo state, queues, cursors, etc. 55 55 pub(crate) ks: fjall::Keyspace, 56 + /// Collection index keyspace: rbc + cbr ranges. 57 + /// 58 + /// Tuned for scan-heavy access: 64 KiB blocks (amortises per-block overhead 59 + /// across sequential reads) and Lz4 compression at all levels (higher 60 + /// on-disk density means more data fits in the block cache). 61 + pub(crate) index_ks: fjall::Keyspace, 56 62 } 57 63 58 64 /// Cheaply-cloneable reference to the shared database. ··· 86 92 87 93 let database = builder.open()?; 88 94 let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?; 89 - Ok(Arc::new(Db { database, ks })) 95 + let index_ks = database.keyspace("index", || { 96 + fjall::KeyspaceCreateOptions::default() 97 + .data_block_size_policy(fjall::config::BlockSizePolicy::all(64 * 1_024)) 98 + .data_block_compression_policy(fjall::config::CompressionPolicy::all( 99 + fjall::CompressionType::Lz4, 100 + )) 101 + })?; 102 + Ok(Arc::new(Db { 103 + database, 104 + ks, 105 + index_ks, 106 + })) 90 107 }