Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

ufos: storage interface sketch

phil f69f19ab 7b87bd38

+266 -103
+17 -9
Cargo.lock
··· 531 531 532 532 [[package]] 533 533 name = "byteview" 534 - version = "0.5.4" 534 + version = "0.6.1" 535 535 source = "registry+https://github.com/rust-lang/crates.io-index" 536 - checksum = "7a4516a8561bff0598c45512f90ee04ed62cee2cb36839e650a0a0704d5f741f" 536 + checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" 537 537 538 538 [[package]] 539 539 name = "bzip2-sys" ··· 1137 1137 1138 1138 [[package]] 1139 1139 name = "fjall" 1140 - version = "2.7.0" 1141 - source = "git+https://github.com/fjall-rs/fjall.git?branch=fix%2Flockless-ranges#d2102006958b0b30bdde0f7315b9b22539bb5f89" 1140 + version = "2.8.0" 1141 + source = "registry+https://github.com/rust-lang/crates.io-index" 1142 + checksum = "26b2ced3483989a62b3533c9f99054d73b527c6c0045cf22b00fe87956f1a46f" 1142 1143 dependencies = [ 1143 1144 "byteorder", 1144 1145 "byteview", ··· 2061 2062 2062 2063 [[package]] 2063 2064 name = "lsm-tree" 2064 - version = "2.7.0" 2065 - source = "git+https://github.com/fjall-rs/lsm-tree.git?branch=fix%2Flockless-ranges#c1684bdf57488a6195942fde5ea0c756dc0b6035" 2065 + version = "2.8.0" 2066 + source = "registry+https://github.com/rust-lang/crates.io-index" 2067 + checksum = "d0a63a5e98a38b51765274137d8aedfbd848da5f4d016867e186b673fcc06a8c" 2066 2068 dependencies = [ 2067 2069 "byteorder", 2068 2070 "crossbeam-skiplist", ··· 2071 2073 "guardian", 2072 2074 "interval-heap", 2073 2075 "log", 2076 + "lz4_flex", 2074 2077 "path-absolutize", 2075 2078 "quick_cache", 2076 2079 "rustc-hash 2.1.1", ··· 2090 2093 "cc", 2091 2094 "libc", 2092 2095 ] 2096 + 2097 + [[package]] 2098 + name = "lz4_flex" 2099 + version = "0.11.3" 2100 + source = "registry+https://github.com/rust-lang/crates.io-index" 2101 + checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" 2093 2102 2094 2103 [[package]] 2095 2104 name = "mach2" ··· 3863 3872 3864 3873 [[package]] 3865 3874 name = "value-log" 3866 - version = "1.7.2" 3875 + version = "1.8.0" 3867 3876 
source = "registry+https://github.com/rust-lang/crates.io-index" 3868 - checksum = "d65573c63cf768179763226edb8d614d8b314130a3f50422d6d375d3947c529f" 3877 + checksum = "fd29b17c041f94e0885179637289815cd038f0c9fc19c4549d5a97017404fb7d" 3869 3878 dependencies = [ 3870 3879 "byteorder", 3871 3880 "bytes", ··· 3873 3882 "interval-heap", 3874 3883 "log", 3875 3884 "path-absolutize", 3876 - "quick_cache", 3877 3885 "rustc-hash 2.1.1", 3878 3886 "tempfile", 3879 3887 "varint-rs",
-3
Cargo.toml
··· 6 6 "jetstream", 7 7 "ufos", 8 8 ] 9 - 10 - [patch.crates-io] 11 - lsm-tree = { git = "https://github.com/fjall-rs/lsm-tree.git", branch = "fix/lockless-ranges" }
+1 -6
ufos/Cargo.toml
··· 10 10 clap = { version = "4.5.31", features = ["derive"] } 11 11 dropshot = "0.16.0" 12 12 env_logger = "0.11.7" 13 + fjall = { version = "2.8.0", features = ["lz4", "bytes"] } 13 14 jetstream = { path = "../jetstream" } 14 15 log = "0.4.26" 15 16 lsm-tree = "2.6.6" ··· 22 23 23 24 [target.'cfg(not(target_env = "msvc"))'.dependencies] 24 25 tikv-jemallocator = "0.6.0" 25 - 26 - [dependencies.fjall] 27 - git = "https://github.com/fjall-rs/fjall.git" 28 - branch = "fix/lockless-ranges" 29 - features = ["bytes", "single_writer_tx"] 30 - default-features = false
+11
ufos/src/error.rs
··· 1 1 use thiserror::Error; 2 + use crate::db_types::EncodingError; 2 3 3 4 #[derive(Debug, Error)] 4 5 pub enum FirehoseEventError { ··· 9 10 #[error("Commit event missing commit info")] 10 11 CommitEventMissingCommit, 11 12 } 13 + 14 + #[derive(Debug, Error)] 15 + pub enum StorageError { 16 + #[error("Failed to initialize: {0}")] 17 + InitError(String), 18 + #[error("Fjall error")] 19 + FjallError(#[from] fjall::Error), 20 + #[error("Bytes encoding error")] 21 + EncodingError(#[from] EncodingError), 22 + }
+237 -85
ufos/src/storage_fjall.rs
··· 8 8 use crate::{ 9 9 DeleteAccount, Did, EventBatch, Nsid, RecordKey, 10 10 }; 11 + use crate::error::StorageError; 11 12 use fjall::{ 12 13 Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, 13 14 }; ··· 33 34 #[derive(Clone)] 34 35 struct Db { 35 36 keyspace: Keyspace, 36 - partition: PartitionHandle, 37 + global: PartitionHandle, 38 + 37 39 } 38 40 39 41 /** 40 - * data format, roughly: 42 + * new data format, roughly: 43 + * 44 + * Partition: 'global' 45 + * 46 + * - Global sequence counter (is the jetstream cursor -- monotonic with many gaps) 47 + * key: "js_cursor" (literal) 48 + * val: u64 49 + * 50 + * - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss) 51 + * key: "js_endpoint" (literal) 52 + * val: string (URL of the instance) 53 + * 54 + * - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes) 55 + * key: "rollup_cursor" (literal) 56 + * val: u64 (tracks behind js_cursor) 57 + * 58 + * 59 + * Partition: 'feeds' 60 + * 61 + * - Per-collection list of record references ordered by jetstream cursor 62 + * key: nullstr || u64 (collection nsid null-terminated, jetstream cursor) 63 + * val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
64 + * 65 + * 66 + * Partition: 'records' 67 + * 68 + * - Actual records by their atproto location 69 + * key: nullstr || nullstr || nullstr (did, collection, rkey) 70 + * val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record) 71 + * 72 + * 73 + * Partition: 'rollups' 74 + * 75 + * - Live (batched) records per collection 76 + * key: "live_records" || u64 || nullstr (js_cursor, nsid) 77 + * val: u64 78 + * 79 + * - Live (batched) DIDs estimate per collection 80 + * key: "live_dids" || u64 || nullstr 81 + * val: HLL (estimator) 41 82 * 42 - * Global Meta: 43 - * ["js_cursor"] => js_cursor(u64), // used as global sequence 44 - * ["js_endpoint"] => &str, // checked on startup because jetstream instance cursors are not interchangeable 45 - * ["mod_cursor"] => js_cursor(u64); 46 - * ["rollup_cursor"] => [js_cursor|collection]; // how far the rollup helper has progressed 47 - * Mod queue 48 - * ["mod_queue"|js_cursor] => one of { 49 - * DeleteAccount(did) // delete all account content older than cursor 50 - * DeleteRecord(did, collection, rkey) // delete record older than cursor 51 - * UpdateRecord(did, collection, rkey, new_record) // delete + put, but don't delete if cursor is newer 52 - * } 53 - * Collection and rollup meta: 54 - * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64 // batched total, gets cleaned up by rollup 55 - * ["total_by_collection"|collection] => [u64, js_cursor] // rollup; live total requires scanning seen_by_collection after js_cursor 56 - * ["hour_by_collection"|hour(u64)|collection] => u64 // rollup from seen_by_js_cursor_collection 57 - * Samples: 58 - * ["by_collection"|collection|js_cursor] => [did|rkey|record] 59 - * ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes.
83 + * - Hourly total records per collection 84 + * key: "hourly_records" || u64 || nullstr (hour, nsid) 85 + * val: u64 (total count, not jetstream cursor) 60 86 * 61 - * TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy 62 - * fetching + caching on read. 87 + * - Hourly unique DIDs estimate per collection 88 + * key: "hourly_dids" || u64 || nullstr (hour, nsid) 89 + * val: HLL (estimator) 90 + * 91 + * - All-time total records per collection 92 + * key: "ever_records" || u64 || nullstr (total, nsid. yeah, total is in the *key*, and acts as a sorter. every update requires a delete+put) 93 + * val: (empty) 94 + * 95 + * - All-time total DIDs estimate per collection 96 + * key: "ever_dids" || u64 || nullstr (estimated cardinality, nsid. like ever_records) 97 + * val: HLL (estimator) 98 + * 99 + * TODO: moderation actions 100 + * TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read. 
63 101 **/ 102 + pub trait StorageWhatever { // TODO: extract this 103 + fn init( 104 + path: impl AsRef<Path>, 105 + endpoint: &str, 106 + force_endpoint: bool, 107 + ) -> Result<(impl StoreReader, impl StoreWriter, bool), StorageError> where Self: Sized; 108 + } 109 + 110 + pub trait StoreWriter { 111 + fn insert_batch(batch: EventBatch) -> Result<(), StorageError>; 112 + } 113 + 114 + pub trait StoreReader: Clone {} 115 + 116 + pub struct FjallStorage {} 117 + impl StorageWhatever for FjallStorage { 118 + fn init( 119 + path: impl AsRef<Path>, 120 + endpoint: &str, 121 + force_endpoint: bool, 122 + ) -> Result<(impl StoreReader, impl StoreWriter, bool), StorageError> { 123 + let mut fresh = true; 124 + let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?; 125 + 126 + let global = keyspace.open_partition("global", PartitionCreateOptions::default())?; 127 + let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?; 128 + let records = keyspace.open_partition("records", PartitionCreateOptions::default())?; 129 + let rollups = keyspace.open_partition("rollups", PartitionCreateOptions::default())?; 130 + 131 + let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?; 132 + 133 + if js_cursor.is_some() { 134 + fresh = false; 135 + let stored_endpoint = get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?; 136 + 137 + let JetstreamEndpointValue(stored) = stored_endpoint 138 + .ok_or(StorageError::InitError("found cursor but missing js_endpoint, refusing to start.".to_string()))?; 139 + 140 + if stored != endpoint { 141 + if force_endpoint { 142 + log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 143 + insert_static_neu::<JetstreamEndpointKey>( 144 + &global, 145 + JetstreamEndpointValue(endpoint.to_string()), 146 + )?; 147 + } else { 148 + return Err(StorageError::InitError(format!( 149 + "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to 
start."))); 150 + } 151 + } 152 + } else { 153 + insert_static_neu::<JetstreamEndpointKey>( 154 + &global, 155 + JetstreamEndpointValue(endpoint.to_string()), 156 + )?; 157 + } 158 + 159 + let reader = FjallReader { 160 + global: global.clone(), 161 + feeds: feeds.clone(), 162 + records: records.clone(), 163 + rollups: rollups.clone(), 164 + }; 165 + let writer = FjallWriter { keyspace, global, feeds, records, rollups }; 166 + Ok((reader, writer, fresh)) 167 + } 168 + } 169 + 170 + #[derive(Clone)] 171 + pub struct FjallReader { 172 + global: PartitionHandle, 173 + feeds: PartitionHandle, 174 + records: PartitionHandle, 175 + rollups: PartitionHandle, 176 + } 177 + 178 + impl StoreReader for FjallReader {} 179 + 180 + pub struct FjallWriter { 181 + keyspace: Keyspace, 182 + global: PartitionHandle, 183 + feeds: PartitionHandle, 184 + records: PartitionHandle, 185 + rollups: PartitionHandle, 186 + } 187 + 188 + impl StoreWriter for FjallWriter { 189 + fn insert_batch(_batch: EventBatch) -> Result<(), StorageError> { 190 + Ok(()) 191 + } 192 + } 193 + 194 + 64 195 #[derive(Clone)] 65 196 pub struct Storage { 66 197 /// horrible: gate all db access behind this to force global serialization to avoid deadlock ··· 70 201 impl Storage { 71 202 fn init_self(path: impl AsRef<Path>) -> anyhow::Result<Self> { 72 203 let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?; 73 - let partition = keyspace.open_partition( 204 + let global = keyspace.open_partition( 74 205 "default", 75 206 PartitionCreateOptions::default().compression(CompressionType::None), 76 207 )?; 77 208 Ok(Self { 78 209 db: Db { 79 210 keyspace, 80 - partition, 211 + global, 81 212 }, 82 213 }) 83 214 } ··· 131 262 132 263 // let db = &self.db; 133 264 // let keyspace = db.keyspace.clone(); 134 - // let partition = db.partition.clone(); 265 + // let global = db.global.clone(); 135 266 136 267 // let writer_t0 = Instant::now(); 137 268 // log::trace!("spawn_blocking for write batch"); 138 269 // 
tokio::task::spawn_blocking(move || { 139 270 // DBWriter { 140 271 // keyspace, 141 - // partition, 272 + // global, 142 273 // } 143 274 // .write_batch(event_batch, last) 144 275 // }) ··· 175 306 176 307 loop { 177 308 let keyspace = self.db.keyspace.clone(); 178 - let partition = self.db.partition.clone(); 309 + let global = self.db.global.clone(); 179 310 tokio::select! { 180 311 _ = time_to_update_events.tick() => { 181 312 log::debug!("beginning event update task"); 182 - tokio::task::spawn_blocking(move || Self::update_events(keyspace, partition)).await??; 313 + tokio::task::spawn_blocking(move || Self::update_events(keyspace, global)).await??; 183 314 log::debug!("finished event update task"); 184 315 } 185 316 _ = time_to_trim_surplus.tick() => { 186 317 log::debug!("beginning record trim task"); 187 - tokio::task::spawn_blocking(move || Self::trim_old_events(keyspace, partition)).await??; 318 + tokio::task::spawn_blocking(move || Self::trim_old_events(keyspace, global)).await??; 188 319 log::debug!("finished record trim task"); 189 320 } 190 321 _ = time_to_roll_up.tick() => { 191 322 log::debug!("beginning rollup task"); 192 - tokio::task::spawn_blocking(move || Self::roll_up_counts(keyspace, partition)).await??; 323 + tokio::task::spawn_blocking(move || Self::roll_up_counts(keyspace, global)).await??; 193 324 log::debug!("finished rollup task"); 194 325 }, 195 326 } 196 327 } 197 328 } 198 329 199 - fn update_events(keyspace: Keyspace, partition: PartitionHandle) -> anyhow::Result<()> { 330 + fn update_events(keyspace: Keyspace, global: PartitionHandle) -> anyhow::Result<()> { 200 331 // TODO: lock this to prevent concurrent rw 201 332 202 333 log::trace!("rw: getting rw cursor..."); 203 334 let mod_cursor = 204 - get_static::<ModCursorKey, ModCursorValue>(&partition)?.unwrap_or(Cursor::from_start()); 335 + get_static::<ModCursorKey, ModCursorValue>(&global)?.unwrap_or(Cursor::from_start()); 205 336 let range = 
ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?; 206 337 207 338 let mut db_batch = keyspace.batch(); ··· 210 341 211 342 log::trace!("rw: iterating newer rw items..."); 212 343 213 - for (i, pair) in partition.range(range.clone()).enumerate() { 344 + for (i, pair) in global.range(range.clone()).enumerate() { 214 345 log::trace!("rw: iterating {i}"); 215 346 any_tasks_found = true; 216 347 ··· 233 364 log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}"); 234 365 batched_rw_items += DBWriter { 235 366 keyspace: keyspace.clone(), 236 - partition: partition.clone(), 367 + global: global.clone(), 237 368 } 238 369 .write_rw(&mut db_batch, mod_key, mod_value)?; 239 370 log::trace!("rw: iterating {i}: back from batcher."); ··· 257 388 Ok(()) 258 389 } 259 390 260 - fn trim_old_events(_keyspace: Keyspace, _partition: PartitionHandle) -> anyhow::Result<()> { 391 + fn trim_old_events(_keyspace: Keyspace, _global: PartitionHandle) -> anyhow::Result<()> { 261 392 // we *could* keep a collection dirty list in memory to reduce the amount of searching here 262 393 // actually can we use seen_by_js_cursor_collection?? 
263 394 // * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64 ··· 280 411 Ok(()) 281 412 } 282 413 283 - fn roll_up_counts(_keyspace: Keyspace, _partition: PartitionHandle) -> anyhow::Result<()> { 414 + fn roll_up_counts(_keyspace: Keyspace, _global: PartitionHandle) -> anyhow::Result<()> { 284 415 Ok(()) 285 416 } 286 417 ··· 290 421 _limit: usize, 291 422 ) -> anyhow::Result<Vec<()>> { 292 423 todo!(); 293 - // let partition = self.db.partition.clone(); 424 + // let global = self.db.global.clone(); 294 425 // let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?; 295 426 // tokio::task::spawn_blocking(move || { 296 427 // let mut output = Vec::new(); 297 428 298 - // for pair in partition.prefix(&prefix).rev().take(limit) { 429 + // for pair in global.prefix(&prefix).rev().take(limit) { 299 430 // let (k_bytes, v_bytes) = pair?; 300 431 // let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 301 432 // let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); ··· 314 445 pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> { 315 446 let db = &self.db; 316 447 let keyspace = db.keyspace.clone(); 317 - let partition = db.partition.clone(); 448 + let global = db.global.clone(); 318 449 tokio::task::spawn_blocking(move || { 319 450 Ok(StorageInfo { 320 451 keyspace_disk_space: keyspace.disk_space(), 321 452 keyspace_journal_count: keyspace.journal_count(), 322 453 keyspace_sequence: keyspace.instant(), 323 - partition_approximate_len: partition.approximate_len(), 454 + global_approximate_len: global.approximate_len(), 324 455 }) 325 456 }) 326 457 .await? 
327 458 } 328 459 329 460 pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> { 330 - let partition = self.db.partition.clone(); 461 + let global = self.db.global.clone(); 331 462 let collection = collection.clone(); 332 - tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&partition, collection)) 463 + tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&global, collection)) 333 464 .await? 334 465 } 335 466 336 467 pub async fn get_top_collections(&self) -> anyhow::Result<HashMap<String, u64>> { 337 - let partition = self.db.partition.clone(); 338 - tokio::task::spawn_blocking(move || get_unrolled_top_collections(&partition)).await? 468 + let global = self.db.global.clone(); 469 + tokio::task::spawn_blocking(move || get_unrolled_top_collections(&global)).await? 339 470 } 340 471 341 472 pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> { 342 - let partition = self.db.partition.clone(); 473 + let global = self.db.global.clone(); 343 474 tokio::task::spawn_blocking(move || { 344 - get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&partition) 475 + get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&global) 345 476 }) 346 477 .await? 347 478 } 348 479 349 480 async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> { 350 - let partition = self.db.partition.clone(); 481 + let global = self.db.global.clone(); 351 482 let endpoint = endpoint.to_string(); 352 483 tokio::task::spawn_blocking(move || { 353 - insert_static::<JetstreamEndpointKey>(&partition, JetstreamEndpointValue(endpoint)) 484 + insert_static::<JetstreamEndpointKey>(&global, JetstreamEndpointValue(endpoint)) 354 485 }) 355 486 .await? 
356 487 } 357 488 358 489 pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> { 359 - let partition = self.db.partition.clone(); 490 + let global = self.db.global.clone(); 360 491 tokio::task::spawn_blocking(move || { 361 - get_static::<JetstreamCursorKey, JetstreamCursorValue>(&partition) 492 + get_static::<JetstreamCursorKey, JetstreamCursorValue>(&global) 362 493 }) 363 494 .await? 364 495 } 365 496 366 497 pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> { 367 - let partition = self.db.partition.clone(); 368 - tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&partition)) 498 + let global = self.db.global.clone(); 499 + tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&global)) 369 500 .await? 370 501 } 371 502 } 372 503 373 504 /// Get a value from a fixed key 374 - fn get_static<K: StaticStr, V: DbBytes>(partition: &PartitionHandle) -> anyhow::Result<Option<V>> { 505 + fn get_static<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> anyhow::Result<Option<V>> { 506 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 507 + let value = global 508 + .get(&key_bytes)? 509 + .map(|value_bytes| db_complete(&value_bytes)) 510 + .transpose()?; 511 + Ok(value) 512 + } 513 + 514 + /// Get a value from a fixed key 515 + fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> Result<Option<V>, StorageError> { 375 516 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 376 - let value = partition 517 + let value = global 377 518 .get(&key_bytes)? 
378 519 .map(|value_bytes| db_complete(&value_bytes)) 379 520 .transpose()?; ··· 382 523 383 524 /// Set a value to a fixed key 384 525 fn insert_static<K: StaticStr>( 385 - partition: &PartitionHandle, 526 + global: &PartitionHandle, 386 527 value: impl DbBytes, 387 528 ) -> anyhow::Result<()> { 388 529 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 389 530 let value_bytes = value.to_db_bytes()?; 390 - partition.insert(&key_bytes, &value_bytes)?; 531 + global.insert(&key_bytes, &value_bytes)?; 532 + Ok(()) 533 + } 534 + 535 + /// Set a value to a fixed key 536 + fn insert_static_neu<K: StaticStr>( 537 + global: &PartitionHandle, 538 + value: impl DbBytes, 539 + ) -> Result<(), StorageError> { 540 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 541 + let value_bytes = value.to_db_bytes()?; 542 + global.insert(&key_bytes, &value_bytes)?; 391 543 Ok(()) 392 544 } 393 545 394 546 /// Set a value to a fixed key 395 547 fn insert_batch_static<K: StaticStr>( 396 548 batch: &mut FjallBatch, 397 - partition: &PartitionHandle, 549 + global: &PartitionHandle, 398 550 value: impl DbBytes, 399 551 ) -> anyhow::Result<()> { 400 552 let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 401 553 let value_bytes = value.to_db_bytes()?; 402 - batch.insert(partition, &key_bytes, &value_bytes); 554 + batch.insert(global, &key_bytes, &value_bytes); 403 555 Ok(()) 404 556 } 405 557 406 558 /// Remove a key 407 559 fn remove_batch<K: DbBytes>( 408 560 batch: &mut FjallBatch, 409 - partition: &PartitionHandle, 561 + global: &PartitionHandle, 410 562 key: K, 411 563 ) -> Result<(), EncodingError> { 412 564 let key_bytes = key.to_db_bytes()?; 413 - batch.remove(partition, &key_bytes); 565 + batch.remove(global, &key_bytes); 414 566 Ok(()) 415 567 } 416 568 417 569 /// Get stats that haven't been rolled up yet 418 570 fn get_unrolled_collection_seen( 419 - partition: &PartitionHandle, 571 + global: &PartitionHandle, 420 572 collection: Nsid, 421 573 ) -> 
anyhow::Result<u64> { 422 574 let range = 423 - if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? { 575 + if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? { 424 576 eprintln!("found existing cursor"); 425 577 let key: ByCursorSeenKey = cursor_value.into(); 426 578 key.range_from()? ··· 434 586 let mut scanned = 0; 435 587 let mut rolled = 0; 436 588 437 - for pair in partition.range(range) { 589 + for pair in global.range(range) { 438 590 let (key_bytes, value_bytes) = pair?; 439 591 let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 440 592 let val = db_complete::<ByCursorSeenValue>(&value_bytes)?; ··· 453 605 } 454 606 455 607 fn get_unrolled_top_collections( 456 - partition: &PartitionHandle, 608 + global: &PartitionHandle, 457 609 ) -> anyhow::Result<HashMap<String, u64>> { 458 610 let range = 459 - if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? { 611 + if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? { 460 612 eprintln!("found existing cursor"); 461 613 let key: ByCursorSeenKey = cursor_value.into(); 462 614 key.range_from()? 
··· 468 620 let mut res = HashMap::new(); 469 621 let mut scanned = 0; 470 622 471 - for pair in partition.range(range) { 623 + for pair in global.range(range) { 472 624 let (key_bytes, value_bytes) = pair?; 473 625 let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 474 626 let SeenCounter(n) = db_complete(&value_bytes)?; ··· 491 643 // self.add_record_modifies(&mut db_batch, event_batch.record_modifies)?; 492 644 // self.add_account_removes(&mut db_batch, event_batch.account_removes)?; 493 645 // if let Some(cursor) = last { 494 - // insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?; 646 + // insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.global, cursor)?; 495 647 // } 496 648 // log::info!("write: committing write batch..."); 497 649 // let r = db_batch.commit(); ··· 508 660 ) -> anyhow::Result<usize> { 509 661 // update the current rw cursor to this item (atomically with the batch if it succeeds) 510 662 let mod_cursor: Cursor = (&mod_key).into(); 511 - insert_batch_static::<ModCursorKey>(db_batch, &self.partition, mod_cursor.clone())?; 663 + insert_batch_static::<ModCursorKey>(db_batch, &self.global, mod_cursor.clone())?; 512 664 513 665 let items_modified = match mod_value { 514 666 ModQueueItemValue::DeleteAccount(did) => { ··· 517 669 log::trace!("rw: batcher: back from delete account (finished? 
{finished})"); 518 670 if finished { 519 671 // only remove the queued rw task if we have actually completed its account removal work 520 - remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 672 + remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 521 673 items + 1 522 674 } else { 523 675 items ··· 527 679 log::trace!("rw: batcher: delete record..."); 528 680 let items = self.delete_record(db_batch, mod_cursor, did, collection, rkey)?; 529 681 log::trace!("rw: batcher: back from delete record"); 530 - remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 682 + remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 531 683 items + 1 532 684 } 533 685 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 534 686 let items = 535 687 self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?; 536 - remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 688 + remove_batch::<ModQueueItemKey>(db_batch, &self.global, mod_key)?; 537 689 items + 1 538 690 } 539 691 }; ··· 585 737 log::trace!("delete_record: iterate over up to current cursor..."); 586 738 587 739 for (i, pair) in self 588 - .partition 740 + .global 589 741 .range(key_prefix_bytes..key_limit) 590 742 .enumerate() 591 743 { ··· 602 754 } 603 755 604 756 // remove the by_id entry 605 - db_batch.remove(&self.partition, key_bytes); 757 + db_batch.remove(&self.global, key_bytes); 606 758 607 759 // remove its record sample 608 760 let by_collection_key_bytes = 609 761 ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?; 610 - db_batch.remove(&self.partition, by_collection_key_bytes); 762 + db_batch.remove(&self.global, by_collection_key_bytes); 611 763 612 764 items_removed += 1; 613 765 } 614 766 615 767 // if items_removed > 1 { 616 768 // log::trace!("odd, removed {items_removed} records for one record removal:"); 617 - // for (i, pair) in 
self.partition.prefix(&key_prefix_bytes).enumerate() { 769 + // for (i, pair) in self.global.prefix(&key_prefix_bytes).enumerate() { 618 770 // // find all (hopefully 1) 619 771 // let (key_bytes, _) = pair?; 620 772 // let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor(); ··· 639 791 640 792 let mut items_added = 0; 641 793 642 - for pair in self.partition.prefix(&key_prefix_bytes) { 794 + for pair in self.global.prefix(&key_prefix_bytes) { 643 795 let (key_bytes, _) = pair?; 644 796 645 797 let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into(); ··· 651 803 } 652 804 653 805 // remove the by_id entry 654 - db_batch.remove(&self.partition, key_bytes); 806 + db_batch.remove(&self.global, key_bytes); 655 807 656 808 // remove its record sample 657 809 let by_collection_key_bytes = 658 810 ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 659 - db_batch.remove(&self.partition, by_collection_key_bytes); 811 + db_batch.remove(&self.global, by_collection_key_bytes); 660 812 661 813 items_added += 1; 662 814 if items_added >= MAX_BATCHED_RW_ITEMS { ··· 683 835 // { 684 836 // if let Some(last_record) = &samples.back() { 685 837 // db_batch.insert( 686 - // &self.partition, 838 + // &self.global, 687 839 // ByCursorSeenKey::new(last_record.cursor.clone(), collection.clone()) 688 840 // .to_db_bytes()?, 689 841 // ByCursorSeenValue::new(total_seen as u64).to_db_bytes()?, ··· 718 870 ) -> anyhow::Result<()> { 719 871 // ["by_collection"|collection|js_cursor] => [did|rkey|record] 720 872 db_batch.insert( 721 - &self.partition, 873 + &self.global, 722 874 ByCollectionKey::new(collection.clone(), cursor.clone()).to_db_bytes()?, 723 875 ByCollectionValue::new(did.clone(), rkey.clone(), record).to_db_bytes()?, 724 876 ); 725 877 726 878 // ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. 
727 879 db_batch.insert( 728 - &self.partition, 880 + &self.global, 729 881 ByIdKey::new(did, collection.clone(), rkey, cursor).to_db_bytes()?, 730 882 ByIdValue::default().to_db_bytes()?, 731 883 ); ··· 751 903 // ), 752 904 // }; 753 905 // db_batch.insert( 754 - // &self.partition, 906 + // &self.global, 755 907 // ModQueueItemKey::new(cursor).to_db_bytes()?, 756 908 // db_val.to_db_bytes()?, 757 909 // ); ··· 766 918 ) -> anyhow::Result<()> { 767 919 for deletion in account_removes { 768 920 db_batch.insert( 769 - &self.partition, 921 + &self.global, 770 922 ModQueueItemKey::new(deletion.cursor).to_db_bytes()?, 771 923 ModQueueItemValue::DeleteAccount(deletion.did).to_db_bytes()?, 772 924 ); ··· 780 932 pub keyspace_disk_space: u64, 781 933 pub keyspace_journal_count: usize, 782 934 pub keyspace_sequence: u64, 783 - pub partition_approximate_len: usize, 935 + pub global_approximate_len: usize, 784 936 } 785 937 786 938 struct DBWriter { 787 939 keyspace: Keyspace, 788 - partition: PartitionHandle, 940 + global: PartitionHandle, 789 941 } 790 942 791 943 ////////// temp stuff to remove: