Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(repo): use mst diff instead of full tree walk for obsolete blocks

Lewis: May this revision serve well! <lu5a@proton.me>

+580 -83
+23 -22
Cargo.lock
··· 7405 7405 7406 7406 [[package]] 7407 7407 name = "tranquil-api" 7408 - version = "0.5.2" 7408 + version = "0.5.3" 7409 7409 dependencies = [ 7410 7410 "anyhow", 7411 7411 "axum", ··· 7456 7456 7457 7457 [[package]] 7458 7458 name = "tranquil-auth" 7459 - version = "0.5.2" 7459 + version = "0.5.3" 7460 7460 dependencies = [ 7461 7461 "anyhow", 7462 7462 "base32", ··· 7479 7479 7480 7480 [[package]] 7481 7481 name = "tranquil-cache" 7482 - version = "0.5.2" 7482 + version = "0.5.3" 7483 7483 dependencies = [ 7484 7484 "async-trait", 7485 7485 "base64 0.22.1", ··· 7493 7493 7494 7494 [[package]] 7495 7495 name = "tranquil-comms" 7496 - version = "0.5.2" 7496 + version = "0.5.3" 7497 7497 dependencies = [ 7498 7498 "async-trait", 7499 7499 "base64 0.22.1", ··· 7511 7511 7512 7512 [[package]] 7513 7513 name = "tranquil-config" 7514 - version = "0.5.2" 7514 + version = "0.5.3" 7515 7515 dependencies = [ 7516 7516 "confique", 7517 7517 "serde", ··· 7519 7519 7520 7520 [[package]] 7521 7521 name = "tranquil-crypto" 7522 - version = "0.5.2" 7522 + version = "0.5.3" 7523 7523 dependencies = [ 7524 7524 "aes-gcm", 7525 7525 "base64 0.22.1", ··· 7535 7535 7536 7536 [[package]] 7537 7537 name = "tranquil-db" 7538 - version = "0.5.2" 7538 + version = "0.5.3" 7539 7539 dependencies = [ 7540 7540 "async-trait", 7541 7541 "chrono", ··· 7552 7552 7553 7553 [[package]] 7554 7554 name = "tranquil-db-traits" 7555 - version = "0.5.2" 7555 + version = "0.5.3" 7556 7556 dependencies = [ 7557 7557 "async-trait", 7558 7558 "base64 0.22.1", ··· 7568 7568 7569 7569 [[package]] 7570 7570 name = "tranquil-infra" 7571 - version = "0.5.2" 7571 + version = "0.5.3" 7572 7572 dependencies = [ 7573 7573 "async-trait", 7574 7574 "bytes", ··· 7579 7579 7580 7580 [[package]] 7581 7581 name = "tranquil-lexicon" 7582 - version = "0.5.2" 7582 + version = "0.5.3" 7583 7583 dependencies = [ 7584 7584 "chrono", 7585 7585 "hickory-resolver", ··· 7597 7597 7598 7598 [[package]] 7599 7599 name = "tranquil-oauth" 7600 - version = "0.5.2" 7600 + version = "0.5.3" 7601 7601 dependencies = [ 7602 7602 "anyhow", 7603 7603 "axum", ··· 7620 7620 7621 7621 [[package]] 7622 7622 name = "tranquil-oauth-server" 7623 - version = "0.5.2" 7623 + version = "0.5.3" 7624 7624 dependencies = [ 7625 7625 "axum", 7626 7626 "base64 0.22.1", ··· 7653 7653 7654 7654 [[package]] 7655 7655 name = "tranquil-pds" 7656 - version = "0.5.2" 7656 + version = "0.5.3" 7657 7657 dependencies = [ 7658 7658 "aes-gcm", 7659 7659 "anyhow", ··· 7745 7745 7746 7746 [[package]] 7747 7747 name = "tranquil-repo" 7748 - version = "0.5.2" 7748 + version = "0.5.3" 7749 7749 dependencies = [ 7750 7750 "bytes", 7751 7751 "cid", ··· 7757 7757 7758 7758 [[package]] 7759 7759 name = "tranquil-ripple" 7760 - version = "0.5.2" 7760 + version = "0.5.3" 7761 7761 dependencies = [ 7762 7762 "async-trait", 7763 7763 "backon", ··· 7782 7782 7783 7783 [[package]] 7784 7784 name = "tranquil-scopes" 7785 - version = "0.5.2" 7785 + version = "0.5.3" 7786 7786 dependencies = [ 7787 7787 "axum", 7788 7788 "futures", ··· 7798 7798 7799 7799 [[package]] 7800 7800 name = "tranquil-server" 7801 - version = "0.5.2" 7801 + version = "0.5.3" 7802 7802 dependencies = [ 7803 7803 "axum", 7804 7804 "clap", ··· 7819 7819 7820 7820 [[package]] 7821 7821 name = "tranquil-signal" 7822 - version = "0.5.2" 7822 + version = "0.5.3" 7823 7823 dependencies = [ 7824 7824 "async-trait", 7825 7825 "chrono", ··· 7842 7842 7843 7843 [[package]] 7844 7844 name = "tranquil-storage" 7845 - version = "0.5.2" 7845 + version = "0.5.3" 7846 7846 dependencies = [ 7847 7847 "async-trait", 7848 7848 "aws-config", ··· 7859 7859 7860 7860 [[package]] 7861 7861 name = "tranquil-store" 7862 - version = "0.5.2" 7862 + version = "0.5.3" 7863 7863 dependencies = [ 7864 7864 "async-trait", 7865 7865 "bytes", ··· 7872 7872 "jacquard-common", 7873 7873 "jacquard-repo", 7874 7874 "k256", 7875 + "libc", 7875 7876 "lsm-tree", 7876 7877 "memmap2", 7877 7878 "multihash", ··· 7905 7906 7906 7907 [[package]] 7907 7908 name = "tranquil-sync" 7908 - version = "0.5.2" 7909 + version = "0.5.3" 7909 7910 dependencies = [ 7910 7911 "anyhow", 7911 7912 "axum", ··· 7927 7928 7928 7929 [[package]] 7929 7930 name = "tranquil-types" 7930 - version = "0.5.2" 7931 + version = "0.5.3" 7931 7932 dependencies = [ 7932 7933 "chrono", 7933 7934 "cid",
+1 -1
Cargo.toml
··· 26 26 ] 27 27 28 28 [workspace.package] 29 - version = "0.5.2" 29 + version = "0.5.3" 30 30 edition = "2024" 31 31 license = "AGPL-3.0-or-later" 32 32
+10 -55
crates/tranquil-pds/src/repo_ops.rs
··· 13 13 use jacquard_repo::storage::BlockStore; 14 14 use k256::ecdsa::SigningKey; 15 15 use serde_json::{Value, json}; 16 - use std::collections::BTreeSet; 17 16 use std::str::FromStr; 18 17 use std::sync::Arc; 19 18 use tokio::sync::OwnedMutexGuard; ··· 226 225 Ok((ctx, mst)) 227 226 } 228 227 229 - pub async fn compute_obsolete_cids( 230 - original_mst: &Mst<TrackingBlockStore>, 231 - new_mst: &Mst<TrackingBlockStore>, 232 - original_root_cid: CommitCid, 233 - ) -> Result<Vec<Cid>, jacquard_repo::error::RepoError> { 234 - let (old_nodes, new_nodes, old_leaves, new_leaves) = tokio::try_join!( 235 - original_mst.collect_node_cids(), 236 - new_mst.collect_node_cids(), 237 - original_mst.leaves(), 238 - new_mst.leaves(), 239 - )?; 240 - let old_nodes_set: BTreeSet<Cid> = old_nodes.into_iter().collect(); 241 - let new_nodes_set: BTreeSet<Cid> = new_nodes.into_iter().collect(); 242 - let old_leaf_set: BTreeSet<Cid> = old_leaves.iter().map(|(_, cid)| *cid).collect(); 243 - let new_leaf_set: BTreeSet<Cid> = new_leaves.iter().map(|(_, cid)| *cid).collect(); 244 - let removed_nodes = old_nodes_set.difference(&new_nodes_set).copied(); 245 - let removed_leaves = old_leaf_set.difference(&new_leaf_set).copied(); 246 - let obsolete: BTreeSet<Cid> = std::iter::once(original_root_cid.into_cid()) 247 - .chain(removed_nodes) 248 - .chain(removed_leaves) 249 - .collect(); 250 - Ok(obsolete.into_iter().collect()) 251 - } 252 - 253 228 pub async fn finalize_repo_write( 254 229 state: &AppState, 255 230 ctx: RepoWriteContext, ··· 267 242 let original_settled = Mst::load(storage_for_diff.clone(), ctx.prev_data_cid, None); 268 243 let new_settled = Mst::load(storage_for_diff, new_mst_root, None); 269 244 270 - let (obsolete_result, new_tree_result) = tokio::join!( 271 - compute_obsolete_cids( 272 - &original_settled, 273 - &new_settled, 274 - CommitCid::from(ctx.current_root_cid), 275 - ), 276 - async { 277 - let (nodes, leaves) = 278 - tokio::try_join!(new_settled.collect_node_cids(), new_settled.leaves(),)?; 279 - Ok::<Vec<Cid>, jacquard_repo::error::RepoError>( 280 - nodes 281 - .into_iter() 282 - .chain(leaves.iter().map(|(_, cid)| *cid)) 283 - .collect(), 284 - ) 285 - }, 286 - ); 245 + let new_tree_cids: Vec<Cid> = block_bytes.keys().copied().collect(); 287 246 288 - let new_tree_cids = match new_tree_result { 289 - Ok(cids) => cids, 290 - Err(e) => { 291 - error!( 292 - "new tree walk failed: {e}. \ 293 - Falling back to written-block CIDs only; \ 294 - shared subtree ownership already tracked by prior commits." 247 + let obsolete_cids = match original_settled.diff(&new_settled).await { 248 + Ok(diff) => { 249 + let mut obsolete: Vec<Cid> = Vec::with_capacity( 250 + 1 + diff.removed_mst_blocks.len() + diff.removed_cids.len(), 295 251 ); 296 - block_bytes.keys().copied().collect() 252 + obsolete.push(ctx.current_root_cid); 253 + obsolete.extend(diff.removed_mst_blocks); 254 + obsolete.extend(diff.removed_cids); 255 + obsolete 297 256 } 298 - }; 299 - 300 - let obsolete_cids = match obsolete_result { 301 - Ok(cids) => cids, 302 257 Err(e) => { 303 258 error!( 304 259 "MST diff failed during finalize_repo_write: {e}. \ 305 - Proceeding with empty obsolete set; leaked blocks \ 260 + Proceeding with commit CID only; leaked blocks \ 306 261 will be reclaimed by reachability GC." 307 262 ); 308 263 vec![ctx.current_root_cid]
+2 -3
crates/tranquil-pds/tests/account_lifecycle.rs
··· 99 99 after_delete_blocks 100 100 ); 101 101 assert!( 102 - after_delete_blocks >= initial_blocks, 103 - "Block count after delete should be at least initial count (initial {}, now {})", 104 - initial_blocks, 102 + after_delete_blocks >= 2, 103 + "Block count after delete should have at least commit + MST root (got {})", 105 104 after_delete_blocks 106 105 ); 107 106 }
+543
crates/tranquil-pds/tests/mst_diff_equivalence.rs
··· 1 + use std::collections::BTreeSet; 2 + use std::sync::Arc; 3 + 4 + use cid::Cid; 5 + use jacquard_repo::mst::Mst; 6 + use jacquard_repo::storage::MemoryBlockStore; 7 + 8 + fn test_cid(n: u32) -> Cid { 9 + let data = n.to_be_bytes(); 10 + let mut buf = [0u8; 32]; 11 + buf[..4].copy_from_slice(&data); 12 + buf[4] = (n >> 8) as u8 ^ 0xAB; 13 + buf[5] = (n & 0xFF) as u8 ^ 0xCD; 14 + let mh = multihash::Multihash::wrap(0x12, &buf).unwrap(); 15 + Cid::new_v1(0x71, mh) 16 + } 17 + 18 + async fn compute_obsolete_full_walk<S: jacquard_repo::storage::BlockStore + Sync + Send + 'static>( 19 + old: &Mst<S>, 20 + new: &Mst<S>, 21 + ) -> BTreeSet<Cid> { 22 + let old_nodes = old.collect_node_cids().await.unwrap(); 23 + let new_nodes = new.collect_node_cids().await.unwrap(); 24 + let old_leaves = old.leaves().await.unwrap(); 25 + let new_leaves = new.leaves().await.unwrap(); 26 + let old_nodes_set: BTreeSet<Cid> = old_nodes.into_iter().collect(); 27 + let new_nodes_set: BTreeSet<Cid> = new_nodes.into_iter().collect(); 28 + let old_leaf_set: BTreeSet<Cid> = old_leaves.iter().map(|(_, cid)| *cid).collect(); 29 + let new_leaf_set: BTreeSet<Cid> = new_leaves.iter().map(|(_, cid)| *cid).collect(); 30 + old_nodes_set 31 + .difference(&new_nodes_set) 32 + .copied() 33 + .chain(old_leaf_set.difference(&new_leaf_set).copied()) 34 + .collect() 35 + } 36 + 37 + fn compute_obsolete_from_diff( 38 + diff: &jacquard_repo::mst::diff::MstDiff, 39 + ) -> BTreeSet<Cid> { 40 + diff.removed_mst_blocks 41 + .iter() 42 + .copied() 43 + .chain(diff.removed_cids.iter().copied()) 44 + .collect() 45 + } 46 + 47 + async fn assert_equivalence( 48 + old_records: &[(String, u32)], 49 + new_records: &[(String, u32)], 50 + scenario: &str, 51 + ) { 52 + let storage = Arc::new(MemoryBlockStore::new()); 53 + 54 + let mut old_tree = Mst::new(storage.clone()); 55 + for (key, val) in old_records { 56 + old_tree = old_tree.add(key, test_cid(*val)).await.unwrap(); 57 + } 58 + let old_root = old_tree.persist().await.unwrap(); 59 + 60 + let mut new_tree = Mst::new(storage.clone()); 61 + for (key, val) in new_records { 62 + new_tree = new_tree.add(key, test_cid(*val)).await.unwrap(); 63 + } 64 + let new_root = new_tree.persist().await.unwrap(); 65 + 66 + let old_settled = Mst::load(storage.clone(), old_root, None); 67 + let new_settled = Mst::load(storage.clone(), new_root, None); 68 + 69 + let full_walk_obsolete = compute_obsolete_full_walk(&old_settled, &new_settled).await; 70 + 71 + let old_for_diff = Mst::load(storage.clone(), old_root, None); 72 + let new_for_diff = Mst::load(storage, new_root, None); 73 + let diff = old_for_diff.diff(&new_for_diff).await.unwrap(); 74 + let diff_obsolete = compute_obsolete_from_diff(&diff); 75 + 76 + assert_eq!( 77 + full_walk_obsolete, diff_obsolete, 78 + "MISMATCH in scenario: {scenario}\n full_walk count: {}\n diff count: {}\n in full_walk but not diff: {:?}\n in diff but not full_walk: {:?}", 79 + full_walk_obsolete.len(), 80 + diff_obsolete.len(), 81 + full_walk_obsolete.difference(&diff_obsolete).collect::<Vec<_>>(), 82 + diff_obsolete.difference(&full_walk_obsolete).collect::<Vec<_>>(), 83 + ); 84 + } 85 + 86 + fn make_key(collection: &str, i: u32) -> String { 87 + format!("{collection}/{i:06}") 88 + } 89 + 90 + fn generate_records(collection: &str, range: std::ops::Range<u32>) -> Vec<(String, u32)> { 91 + range.map(|i| (make_key(collection, i), i)).collect() 92 + } 93 + 94 + fn generate_multi_collection_records( 95 + collections: &[&str], 96 + per_collection: u32, 97 + ) -> Vec<(String, u32)> { 98 + collections 99 + .iter() 100 + .enumerate() 101 + .flat_map(|(ci, coll)| { 102 + let base = ci as u32 * per_collection; 103 + (0..per_collection).map(move |i| (make_key(coll, i), base + i)) 104 + }) 105 + .collect() 106 + } 107 + 108 + fn apply_scattered_updates( 109 + records: &[(String, u32)], 110 + stride: usize, 111 + cid_offset: u32, 112 + ) -> Vec<(String, u32)> { 113 + records 114 + .iter() 115 + .enumerate() 116 + .map(|(idx, (key, val))| { 117 + if idx % stride == 0 { 118 + (key.clone(), val + cid_offset) 119 + } else { 120 + (key.clone(), *val) 121 + } 122 + }) 123 + .collect() 124 + } 125 + 126 + fn remove_every_nth(records: &[(String, u32)], n: usize) -> Vec<(String, u32)> { 127 + records 128 + .iter() 129 + .enumerate() 130 + .filter(|(idx, _)| idx % n != 0) 131 + .map(|(_, r)| r.clone()) 132 + .collect() 133 + } 134 + 135 + fn remove_range(records: &[(String, u32)], start: usize, count: usize) -> Vec<(String, u32)> { 136 + records 137 + .iter() 138 + .enumerate() 139 + .filter(|(idx, _)| *idx < start || *idx >= start + count) 140 + .map(|(_, r)| r.clone()) 141 + .collect() 142 + } 143 + 144 + fn keep_only_collection(records: &[(String, u32)], collection: &str) -> Vec<(String, u32)> { 145 + records 146 + .iter() 147 + .filter(|(key, _)| key.starts_with(collection)) 148 + .cloned() 149 + .collect() 150 + } 151 + 152 + fn append_records( 153 + base: &[(String, u32)], 154 + collection: &str, 155 + range: std::ops::Range<u32>, 156 + cid_base: u32, 157 + ) -> Vec<(String, u32)> { 158 + let mut result = base.to_vec(); 159 + result.extend(range.map(|i| (make_key(collection, i), cid_base + i))); 160 + result.sort_by(|(a, _), (b, _)| a.cmp(b)); 161 + result 162 + } 163 + 164 + #[tokio::test] 165 + async fn massive_tree_single_create() { 166 + let old = generate_records("app.bsky.feed.post", 0..2000); 167 + let new_rec = append_records(&old, "app.bsky.feed.post", 2000..2001, 2000); 168 + assert_equivalence(&old, &new_rec, "2000 records + 1 create").await; 169 + } 170 + 171 + #[tokio::test] 172 + async fn massive_tree_single_delete() { 173 + let old = generate_records("app.bsky.feed.post", 0..2000); 174 + let new_rec = remove_range(&old, 1000, 1); 175 + assert_equivalence(&old, &new_rec, "2000 records - 1 delete from middle").await; 176 + } 177 + 178 + #[tokio::test] 179 + async fn massive_tree_single_update() { 180 + let old = generate_records("app.bsky.feed.post", 0..2000); 181 + let new_rec: Vec<_> = old 182 + .iter() 183 + .map(|(k, v)| { 184 + if k == "app.bsky.feed.post/001000" { 185 + (k.clone(), v + 50000) 186 + } else { 187 + (k.clone(), *v) 188 + } 189 + }) 190 + .collect(); 191 + assert_equivalence(&old, &new_rec, "2000 records - 1 update in middle").await; 192 + } 193 + 194 + #[tokio::test] 195 + async fn massive_tree_scattered_updates_every_3rd() { 196 + let old = generate_records("app.bsky.feed.post", 0..1500); 197 + let new_rec = apply_scattered_updates(&old, 3, 10000); 198 + assert_equivalence(&old, &new_rec, "1500 records - update every 3rd").await; 199 + } 200 + 201 + #[tokio::test] 202 + async fn massive_tree_scattered_updates_every_7th() { 203 + let old = generate_records("app.bsky.feed.post", 0..2000); 204 + let new_rec = apply_scattered_updates(&old, 7, 20000); 205 + assert_equivalence(&old, &new_rec, "2000 records - update every 7th").await; 206 + } 207 + 208 + #[tokio::test] 209 + async fn massive_tree_delete_every_2nd() { 210 + let old = generate_records("app.bsky.feed.post", 0..1000); 211 + let new_rec = remove_every_nth(&old, 2); 212 + assert_equivalence(&old, &new_rec, "1000 records - delete every 2nd").await; 213 + } 214 + 215 + #[tokio::test] 216 + async fn massive_tree_delete_every_5th() { 217 + let old = generate_records("app.bsky.feed.post", 0..2000); 218 + let new_rec = remove_every_nth(&old, 5); 219 + assert_equivalence(&old, &new_rec, "2000 records - delete every 5th").await; 220 + } 221 + 222 + #[tokio::test] 223 + async fn massive_tree_delete_first_half() { 224 + let old = generate_records("app.bsky.feed.post", 0..1500); 225 + let new_rec = remove_range(&old, 0, 750); 226 + assert_equivalence(&old, &new_rec, "1500 records - delete first 750").await; 227 + } 228 + 229 + #[tokio::test] 230 + async fn massive_tree_delete_last_half() { 231 + let old = generate_records("app.bsky.feed.post", 0..1500); 232 + let new_rec = remove_range(&old, 750, 750); 233 + assert_equivalence(&old, &new_rec, "1500 records - delete last 750").await; 234 + } 235 + 236 + #[tokio::test] 237 + async fn massive_tree_delete_middle_chunk() { 238 + let old = generate_records("app.bsky.feed.post", 0..2000); 239 + let new_rec = remove_range(&old, 800, 400); 240 + assert_equivalence(&old, &new_rec, "2000 records - delete 400 from middle").await; 241 + } 242 + 243 + #[tokio::test] 244 + async fn empty_to_massive() { 245 + let new_rec = generate_records("app.bsky.feed.post", 0..1500); 246 + assert_equivalence(&[], &new_rec, "empty to 1500 records").await; 247 + } 248 + 249 + #[tokio::test] 250 + async fn massive_to_empty() { 251 + let old = generate_records("app.bsky.feed.post", 0..1500); 252 + assert_equivalence(&old, &[], "1500 records to empty").await; 253 + } 254 + 255 + #[tokio::test] 256 + async fn massive_complete_replacement() { 257 + let old = generate_records("app.bsky.feed.post", 0..1000); 258 + let new_rec = generate_records("app.bsky.feed.post", 1000..2000); 259 + assert_equivalence(&old, &new_rec, "1000 records fully replaced with 1000 different").await; 260 + } 261 + 262 + #[tokio::test] 263 + async fn massive_no_change() { 264 + let records = generate_records("app.bsky.feed.post", 0..1500); 265 + assert_equivalence(&records, &records, "1500 records unchanged").await; 266 + } 267 + 268 + #[tokio::test] 269 + async fn multi_collection_5_collections_500_each() { 270 + let collections = [ 271 + "app.bsky.feed.like", 272 + "app.bsky.feed.post", 273 + "app.bsky.feed.repost", 274 + "app.bsky.graph.follow", 275 + "app.bsky.graph.block", 276 + ]; 277 + let old = generate_multi_collection_records(&collections, 500); 278 + let new_rec = apply_scattered_updates(&old, 4, 30000); 279 + assert_equivalence(&old, &new_rec, "5 collections x 500 records - update every 4th").await; 280 + } 281 + 282 + #[tokio::test] 283 + async fn multi_collection_wipe_one_collection() { 284 + let collections = [ 285 + "app.bsky.feed.like", 286 + "app.bsky.feed.post", 287 + "app.bsky.feed.repost", 288 + "app.bsky.graph.follow", 289 + ]; 290 + let old = generate_multi_collection_records(&collections, 400); 291 + 292 + let new_rec: Vec<_> = old 293 + .iter() 294 + .filter(|(key, _)| !key.starts_with("app.bsky.feed.repost")) 295 + .cloned() 296 + .collect(); 297 + assert_equivalence(&old, &new_rec, "4 collections x 400 - wipe repost collection").await; 298 + } 299 + 300 + #[tokio::test] 301 + async fn multi_collection_keep_only_one() { 302 + let collections = [ 303 + "app.bsky.feed.like", 304 + "app.bsky.feed.post", 305 + "app.bsky.feed.repost", 306 + "app.bsky.graph.follow", 307 + "app.bsky.graph.block", 308 + ]; 309 + let old = generate_multi_collection_records(&collections, 300); 310 + let new_rec = keep_only_collection(&old, "app.bsky.feed.post"); 311 + assert_equivalence(&old, &new_rec, "5 collections x 300 - keep only posts").await; 312 + } 313 + 314 + #[tokio::test] 315 + async fn multi_collection_add_new_collection() { 316 + let old_collections = [ 317 + "app.bsky.feed.like", 318 + "app.bsky.feed.post", 319 + ]; 320 + let old = generate_multi_collection_records(&old_collections, 500); 321 + let new_rec = append_records(&old, "app.bsky.graph.follow", 0..500, 40000); 322 + assert_equivalence(&old, &new_rec, "2 collections x 500 + add 500 follows").await; 323 + } 324 + 325 + #[tokio::test] 326 + async fn mixed_ops_massive_tree() { 327 + let collections = [ 328 + "app.bsky.feed.like", 329 + "app.bsky.feed.post", 330 + "app.bsky.feed.repost", 331 + "app.bsky.graph.follow", 332 + ]; 333 + let old = generate_multi_collection_records(&collections, 400); 334 + 335 + let mut new_rec: Vec<_> = old 336 + .iter() 337 + .filter(|(key, _)| !key.starts_with("app.bsky.feed.repost")) 338 + .enumerate() 339 + .map(|(idx, (key, val))| { 340 + if key.starts_with("app.bsky.feed.like") && idx % 3 == 0 { 341 + (key.clone(), val + 50000) 342 + } else { 343 + (key.clone(), *val) 344 + } 345 + }) 346 + .collect(); 347 + 348 + new_rec.extend((0..200u32).map(|i| (make_key("app.bsky.graph.block", i), 60000 + i))); 349 + new_rec.sort_by(|(a, _), (b, _)| a.cmp(b)); 350 + 351 + assert_equivalence( 352 + &old, 353 + &new_rec, 354 + "4 collections x 400: wipe reposts, update every 3rd like, add 200 blocks", 355 + ) 356 + .await; 357 + } 358 + 359 + #[tokio::test] 360 + async fn grow_tree_by_double() { 361 + let old = generate_records("app.bsky.feed.post", 0..1000); 362 + let new_rec = generate_records("app.bsky.feed.post", 0..2000); 363 + assert_equivalence(&old, &new_rec, "grow from 1000 to 2000").await; 364 + } 365 + 366 + #[tokio::test] 367 + async fn shrink_tree_by_half() { 368 + let old = generate_records("app.bsky.feed.post", 0..2000); 369 + let new_rec = generate_records("app.bsky.feed.post", 0..1000); 370 + assert_equivalence(&old, &new_rec, "shrink from 2000 to 1000").await; 371 + } 372 + 373 + #[tokio::test] 374 + async fn interleaved_keys_disjoint_ranges() { 375 + let old: Vec<_> = (0..1000u32) 376 + .map(|i| (make_key("app.bsky.feed.post", i * 2), i)) 377 + .collect(); 378 + let new_rec: Vec<_> = (0..1000u32) 379 + .map(|i| (make_key("app.bsky.feed.post", i * 2 + 1), i + 10000)) 380 + .collect(); 381 + assert_equivalence(&old, &new_rec, "1000 even-keyed records replaced by 1000 odd-keyed").await; 382 + } 383 + 384 + #[tokio::test] 385 + async fn sparse_keys_wide_gaps() { 386 + let old: Vec<_> = (0..500u32) 387 + .map(|i| (make_key("app.bsky.feed.post", i * 100), i)) 388 + .collect(); 389 + let new_rec: Vec<_> = (0..500u32) 390 + .map(|i| { 391 + if i % 10 == 0 { 392 + (make_key("app.bsky.feed.post", i * 100), i + 70000) 393 + } else { 394 + (make_key("app.bsky.feed.post", i * 100), i) 395 + } 396 + }) 397 + .collect(); 398 + assert_equivalence(&old, &new_rec, "500 sparse keys - update every 10th").await; 399 + } 400 + 401 + #[tokio::test] 402 + async fn many_collections_few_records_each() { 403 + let collections: Vec<String> = (0..50u32) 404 + .map(|i| format!("com.example.lexicon{i:02}.record")) 405 + .collect(); 406 + let old: Vec<_> = collections 407 + .iter() 408 + .enumerate() 409 + .flat_map(|(ci, coll)| { 410 + let base = ci as u32 * 20; 411 + (0..20u32).map(move |i| (make_key(coll, i), base + i)) 412 + }) 413 + .collect(); 414 + 415 + let new_rec: Vec<_> = old 416 + .iter() 417 + .enumerate() 418 + .filter_map(|(idx, (key, val))| { 419 + if idx % 15 == 0 { 420 + None 421 + } else if idx % 7 == 0 { 422 + Some((key.clone(), val + 80000)) 423 + } else { 424 + Some((key.clone(), *val)) 425 + } 426 + }) 427 + .collect(); 428 + 429 + assert_equivalence(&old, &new_rec, "50 collections x 20 records - delete every 15th, update every 7th").await; 430 + } 431 + 432 + #[tokio::test] 433 + async fn update_all_records() { 434 + let old = generate_records("app.bsky.feed.post", 0..1000); 435 + let new_rec: Vec<_> = old 436 + .iter() 437 + .map(|(key, val)| (key.clone(), val + 90000)) 438 + .collect(); 439 + assert_equivalence(&old, &new_rec, "1000 records - update every single one").await; 440 + } 441 + 442 + #[tokio::test] 443 + async fn delete_all_but_one() { 444 + let old = generate_records("app.bsky.feed.post", 0..1500); 445 + let new_rec = vec![old[750].clone()]; 446 + assert_equivalence(&old, &new_rec, "1500 records - delete all but middle one").await; 447 + } 448 + 449 + #[tokio::test] 450 + async fn one_to_massive() { 451 + let old = vec![(make_key("app.bsky.feed.post", 500), 500u32)]; 452 + let new_rec = generate_records("app.bsky.feed.post", 0..1500); 453 + assert_equivalence(&old, &new_rec, "1 record to 1500 records").await; 454 + } 455 + 456 + #[tokio::test] 457 + async fn delete_head_and_tail() { 458 + let old = generate_records("app.bsky.feed.post", 0..2000); 459 + let new_rec: Vec<_> = old[200..1800].to_vec(); 460 + assert_equivalence(&old, &new_rec, "2000 records - delete first 200 and last 200").await; 461 + } 462 + 463 + #[tokio::test] 464 + async fn keep_head_and_tail_only() { 465 + let old = generate_records("app.bsky.feed.post", 0..2000); 466 + let mut new_rec: Vec<_> = old[..100].to_vec(); 467 + new_rec.extend_from_slice(&old[1900..]); 468 + assert_equivalence(&old, &new_rec, "2000 records - keep only first 100 and last 100").await; 469 + } 470 + 471 + #[tokio::test] 472 + async fn massive_tree_update_first_and_last() { 473 + let old = generate_records("app.bsky.feed.post", 0..2000); 474 + let mut new_rec = old.clone(); 475 + new_rec[0].1 += 99000; 476 + new_rec[1999].1 += 99000; 477 + assert_equivalence(&old, &new_rec, "2000 records - update only first and last").await; 478 + } 479 + 480 + #[tokio::test] 481 + async fn overlapping_collection_swap() { 482 + let old_collections = [ 483 + "app.bsky.feed.like", 484 + "app.bsky.feed.post", 485 + "app.bsky.feed.repost", 486 + ]; 487 + let old = generate_multi_collection_records(&old_collections, 500); 488 + 489 + let mut new_rec: Vec<_> = old 490 + .iter() 491 + .filter(|(key, _)| key.starts_with("app.bsky.feed.post")) 492 + .cloned() 493 + .collect(); 494 + new_rec.extend((0..500u32).map(|i| (make_key("app.bsky.graph.follow", i), 70000 + i))); 495 + new_rec.extend((0..500u32).map(|i| (make_key("app.bsky.graph.block", i), 71000 + i))); 496 + new_rec.sort_by(|(a, _), (b, _)| a.cmp(b)); 497 + 498 + assert_equivalence( 499 + &old, 500 + &new_rec, 501 + "swap 2 of 3 collections, keep 1 (posts), 500 each", 502 + ) 503 + .await; 504 + } 505 + 506 + #[tokio::test] 507 + async fn swiss_cheese_deletions() { 508 + let old = generate_records("app.bsky.feed.post", 0..1500); 509 + let new_rec: Vec<_> = old 510 + .iter() 511 + .enumerate() 512 + .filter(|(idx, _)| { 513 + let bucket = idx / 50; 514 + bucket % 3 != 0 515 + }) 516 + .map(|(_, r)| r.clone()) 517 + .collect(); 518 + assert_equivalence(&old, &new_rec, "1500 records - delete every 3rd chunk of 50").await; 519 + } 520 + 521 + #[tokio::test] 522 + async fn mixed_ops_with_key_density_change() { 523 + let old: Vec<_> = (0..1000u32) 524 + .map(|i| (make_key("app.bsky.feed.post", i * 3), i)) 525 + .collect(); 526 + 527 + let mut new_rec: Vec<_> = old 528 + .iter() 529 + .filter(|(_, val)| val % 4 != 0) 530 + .cloned() 531 + .collect(); 532 + new_rec.extend((0..500u32).map(|i| { 533 + (make_key("app.bsky.feed.post", i * 3 + 1), i + 100000) 534 + })); 535 + new_rec.sort_by(|(a, _), (b, _)| a.cmp(b)); 536 + 537 + assert_equivalence( 538 + &old, 539 + &new_rec, 540 + "1000 sparse records: delete every 4th, insert 500 in gaps", 541 + ) 542 + .await; 543 + }
-1
justfile
··· 56 56 ./scripts/run-tests.sh --test actor --test commit_signing --test image_processing --test lifecycle_social --test notifications --test server --test signing_key --test verify_live_commit 57 57 58 58 test *args: 59 - @just test-store 60 59 @just test-unit 61 60 ./scripts/run-tests.sh {{args}} 62 61
+1 -1
scripts/run-tests.sh
··· 23 23 24 24 echo "Running tests..." 25 25 echo "" 26 - cargo nextest run "$@" 26 + cargo nextest run -E 'not package(tranquil-store)' "$@" 27 27 28 28 echo "" 29 29 echo "All tests passed."