Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store): gauntlet persistence & restart invariants

Lewis: May this revision serve well! <lu5a@proton.me>

+470 -102
+4
.config/nextest.toml
··· 73 73 slow-timeout = { period = "120s", terminate-after = 4 } 74 74 75 75 [[profile.default.overrides]] 76 + filter = "binary(gauntlet_smoke)" 77 + slow-timeout = { period = "300s", terminate-after = 8 } 78 + 79 + [[profile.default.overrides]] 76 80 filter = "binary(compaction_restart) | binary(mst_refcount_integrity) | binary(gc_compaction_restart)" 77 81 slow-timeout = { period = "120s", terminate-after = 4 } 78 82
+149 -19
crates/tranquil-store/src/gauntlet/invariants.rs
··· 1 1 use std::collections::{HashMap, HashSet}; 2 + use std::sync::Arc; 2 3 3 - use super::oracle::Oracle; 4 + use async_trait::async_trait; 5 + use cid::Cid; 6 + use jacquard_repo::mst::Mst; 7 + 8 + use super::oracle::{Oracle, hex_short, try_cid_to_fixed}; 4 9 use crate::blockstore::{CidBytes, TranquilBlockStore}; 5 10 6 11 #[derive(Debug, Clone, Copy, PartialEq, Eq)] ··· 10 15 pub const EMPTY: Self = Self(0); 11 16 pub const REFCOUNT_CONSERVATION: Self = Self(1 << 0); 12 17 pub const REACHABILITY: Self = Self(1 << 1); 18 + pub const ACKED_WRITE_PERSISTENCE: Self = Self(1 << 2); 19 + pub const READ_AFTER_WRITE: Self = Self(1 << 3); 20 + pub const RESTART_IDEMPOTENT: Self = Self(1 << 4); 13 21 14 - const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 | Self::REACHABILITY.0; 22 + const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 23 + | Self::REACHABILITY.0 24 + | Self::ACKED_WRITE_PERSISTENCE.0 25 + | Self::READ_AFTER_WRITE.0 26 + | Self::RESTART_IDEMPOTENT.0; 15 27 16 28 pub const fn contains(self, other: Self) -> bool { 17 29 (self.0 & other.0) == other.0 ··· 19 31 20 32 pub const fn union(self, other: Self) -> Self { 21 33 Self(self.0 | other.0) 34 + } 35 + 36 + pub const fn without(self, other: Self) -> Self { 37 + Self(self.0 & !other.0) 22 38 } 23 39 24 40 pub const fn unknown_bits(self) -> u32 { ··· 39 55 pub detail: String, 40 56 } 41 57 42 - pub trait Invariant { 58 + pub struct InvariantCtx<'a> { 59 + pub store: &'a Arc<TranquilBlockStore>, 60 + pub oracle: &'a Oracle, 61 + pub root: Option<Cid>, 62 + } 63 + 64 + #[async_trait] 65 + pub trait Invariant: Send + Sync { 43 66 fn name(&self) -> &'static str; 44 - fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation>; 67 + async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation>; 45 68 } 46 69 47 70 pub struct RefcountConservation; 48 71 72 + #[async_trait] 49 73 impl Invariant for RefcountConservation { 50 74 fn name(&self) -> &'static str { 51 75 "RefcountConservation" 52 76 } 53 77 54 - fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> { 55 - let live: Vec<(String, CidBytes)> = oracle.live_cids_labeled(); 78 + async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 79 + let live: Vec<(String, CidBytes)> = ctx.oracle.live_cids_labeled(); 56 80 let live_set: HashSet<CidBytes> = live.iter().map(|(_, c)| *c).collect(); 57 - let index: HashMap<CidBytes, u32> = store 81 + let index: HashMap<CidBytes, u32> = ctx 82 + .store 58 83 .block_index() 59 84 .live_entries_snapshot() 60 85 .into_iter() ··· 90 115 91 116 pub struct Reachability; 92 117 118 + #[async_trait] 93 119 impl Invariant for Reachability { 94 120 fn name(&self) -> &'static str { 95 121 "Reachability" 96 122 } 97 123 98 - fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> { 99 - let violations: Vec<String> = oracle 124 + async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 125 + let violations: Vec<String> = ctx 126 + .oracle 100 127 .live_cids_labeled() 101 128 .into_iter() 102 - .filter_map(|(label, fixed)| match store.get_block_sync(&fixed) { 129 + .filter_map(|(label, fixed)| match ctx.store.get_block_sync(&fixed) { 103 130 Ok(Some(_)) => None, 104 131 Ok(None) => Some(format!("{label}: missing")), 105 132 Err(e) => Some(format!("{label}: read error {e}")), ··· 117 144 } 118 145 } 119 146 120 - fn hex_short(cid: &CidBytes) -> String { 121 - let tail = &cid[cid.len() - 6..]; 122 - tail.iter().map(|b| format!("{b:02x}")).collect() 147 + pub struct AckedWritePersistence; 148 + 149 + #[async_trait] 150 + impl Invariant for AckedWritePersistence { 151 + fn name(&self) -> &'static str { 152 + "AckedWritePersistence" 153 + } 154 + 155 + async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 156 + let Some(root) = ctx.root else { 157 + if ctx.oracle.live_count() == 0 { 158 + return Ok(()); 159 + } 160 + return Err(InvariantViolation { 161 + invariant: "AckedWritePersistence", 162 + detail: format!( 163 + "oracle has {} live records but reopened store has no root", 164 + ctx.oracle.live_count() 165 + ), 166 + }); 167 + }; 168 + let mst = Mst::load(ctx.store.clone(), root, None); 169 + let keys: Vec<String> = ctx 170 + .oracle 171 + .live_records() 172 + .map(|(c, r, _)| format!("{}/{}", c.0, r.0)) 173 + .collect(); 174 + 175 + let mut missing: Vec<String> = Vec::new(); 176 + for key in &keys { 177 + match mst.get(key).await { 178 + Ok(Some(_)) => {} 179 + Ok(None) => missing.push(format!("{key}: missing after reopen")), 180 + Err(e) => missing.push(format!("{key}: mst.get error after reopen: {e}")), 181 + } 182 + } 183 + 184 + if missing.is_empty() { 185 + Ok(()) 186 + } else { 187 + Err(InvariantViolation { 188 + invariant: "AckedWritePersistence", 189 + detail: missing.join("; "), 190 + }) 191 + } 192 + } 193 + } 194 + 195 + pub struct ReadAfterWrite; 196 + 197 + #[async_trait] 198 + impl Invariant for ReadAfterWrite { 199 + fn name(&self) -> &'static str { 200 + "ReadAfterWrite" 201 + } 202 + 203 + async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 204 + let Some(root) = ctx.root else { 205 + return Ok(()); 206 + }; 207 + let mst = Mst::load(ctx.store.clone(), root, None); 208 + 209 + let entries: Vec<(String, CidBytes)> = ctx 210 + .oracle 211 + .live_records() 212 + .map(|(c, r, v)| (format!("{}/{}", c.0, r.0), *v)) 213 + .collect(); 214 + 215 + let mut violations: Vec<String> = Vec::new(); 216 + for (key, expected) in &entries { 217 + match mst.get(key).await { 218 + Ok(Some(cid)) => match try_cid_to_fixed(&cid) { 219 + Ok(actual) if actual == *expected => match ctx.store.get_block_sync(&actual) { 220 + Ok(Some(_)) => {} 221 + Ok(None) => violations.push(format!("{key}: block missing for cid")), 222 + Err(e) => violations.push(format!("{key}: block read error {e}")), 223 + }, 224 + Ok(actual) => violations.push(format!( 225 + "{key}: MST cid {} != oracle cid {}", 226 + hex_short(&actual), 227 + hex_short(expected), 228 + )), 229 + Err(e) => { 230 + violations.push(format!("{key}: unexpected CID format from MST: {e}")) 231 + } 232 + }, 233 + Ok(None) => violations.push(format!("{key}: MST returned None")), 234 + Err(e) => violations.push(format!("{key}: mst.get error {e}")), 235 + } 236 + } 237 + 238 + if violations.is_empty() { 239 + Ok(()) 240 + } else { 241 + Err(InvariantViolation { 242 + invariant: "ReadAfterWrite", 243 + detail: violations.join("; "), 244 + }) 245 + } 246 + } 123 247 } 124 248 125 249 pub fn invariants_for(set: InvariantSet) -> Vec<Box<dyn Invariant>> { ··· 128 252 unknown == 0, 129 253 "invariants_for: unknown InvariantSet bits 0x{unknown:x}; all bits must map to an impl" 130 254 ); 131 - [ 255 + let candidates: Vec<(InvariantSet, Box<dyn Invariant>)> = vec![ 132 256 ( 133 257 InvariantSet::REFCOUNT_CONSERVATION, 134 - Box::new(RefcountConservation) as Box<dyn Invariant>, 258 + Box::new(RefcountConservation), 135 259 ), 136 260 (InvariantSet::REACHABILITY, Box::new(Reachability)), 137 - ] 138 - .into_iter() 139 - .filter_map(|(flag, inv)| set.contains(flag).then_some(inv)) 140 - .collect() 261 + ( 262 + InvariantSet::ACKED_WRITE_PERSISTENCE, 263 + Box::new(AckedWritePersistence), 264 + ), 265 + (InvariantSet::READ_AFTER_WRITE, Box::new(ReadAfterWrite)), 266 + ]; 267 + candidates 268 + .into_iter() 269 + .filter_map(|(flag, inv)| set.contains(flag).then_some(inv)) 270 + .collect() 141 271 }
+3 -3
crates/tranquil-store/src/gauntlet/mod.rs
··· 10 10 pub use op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; 11 11 pub use oracle::Oracle; 12 12 pub use runner::{ 13 - CompactInterval, Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend, 14 - MaxFileSize, OpIndex, OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits, 15 - ShardCount, StoreConfig, WallMs, 13 + Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend, MaxFileSize, OpIndex, 14 + OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits, ShardCount, StoreConfig, 15 + WallMs, 16 16 }; 17 17 pub use scenarios::{Scenario, config_for}; 18 18 pub use workload::{
+22 -10
crates/tranquil-store/src/gauntlet/oracle.rs
··· 5 5 use super::op::{CollectionName, RecordKey}; 6 6 use crate::blockstore::CidBytes; 7 7 8 + #[derive(Debug, thiserror::Error, PartialEq, Eq)] 9 + #[error("unexpected CID encoding: got {actual} bytes, expected 36 for sha256 CIDv1")] 10 + pub struct CidFormatError { 11 + pub actual: usize, 12 + } 13 + 8 14 #[derive(Debug, Default)] 9 15 pub struct Oracle { 10 16 live: HashMap<(CollectionName, RecordKey), CidBytes>, 11 17 current_root: Option<Cid>, 12 - mst_node_cids: Vec<Cid>, 18 + mst_node_cids: Vec<CidBytes>, 13 19 } 14 20 15 21 impl Oracle { ··· 38 44 self.current_root 39 45 } 40 46 41 - pub fn set_node_cids(&mut self, cids: Vec<Cid>) { 47 + pub fn set_mst_node_cids(&mut self, cids: Vec<CidBytes>) { 42 48 self.mst_node_cids = cids; 43 49 } 44 50 45 - pub fn mst_node_cids(&self) -> &[Cid] { 46 - &self.mst_node_cids 51 + pub fn clear_mst_state(&mut self) { 52 + self.current_root = None; 53 + self.mst_node_cids.clear(); 47 54 } 48 55 49 56 pub fn live_records(&self) -> impl Iterator<Item = (&CollectionName, &RecordKey, &CidBytes)> { ··· 58 65 let nodes = self 59 66 .mst_node_cids 60 67 .iter() 61 - .map(|cid| (format!("mst {cid}"), cid_to_fixed(cid))); 68 + .map(|bytes| (format!("mst {}", hex_short(bytes)), *bytes)); 62 69 let records = self 63 70 .live_records() 64 71 .map(|(c, r, v)| (format!("record {}/{}", c.0, r.0), *v)); ··· 66 73 } 67 74 } 68 75 69 - pub(super) fn cid_to_fixed(cid: &Cid) -> CidBytes { 76 + pub(super) fn try_cid_to_fixed(cid: &Cid) -> Result<CidBytes, CidFormatError> { 70 77 let bytes = cid.to_bytes(); 71 - debug_assert_eq!(bytes.len(), 36, "expected 36 byte CIDv1+sha256"); 72 - let mut arr = [0u8; 36]; 73 - arr.copy_from_slice(&bytes[..36]); 74 - arr 78 + let actual = bytes.len(); 79 + bytes.try_into().map_err(|_| CidFormatError { actual }) 80 + } 81 + 82 + pub(super) fn hex_short(cid: &CidBytes) -> String { 83 + cid[cid.len() - 6..] 84 + .iter() 85 + .map(|b| format!("{b:02x}")) 86 + .collect() 75 87 }
+212 -57
crates/tranquil-store/src/gauntlet/runner.rs
··· 6 6 use jacquard_repo::mst::Mst; 7 7 use jacquard_repo::storage::BlockStore; 8 8 9 - use super::invariants::{InvariantSet, InvariantViolation, invariants_for}; 9 + use super::invariants::{InvariantCtx, InvariantSet, InvariantViolation, invariants_for}; 10 10 use super::op::{Op, OpStream, Seed, ValueSeed}; 11 - use super::oracle::{Oracle, cid_to_fixed}; 11 + use super::oracle::{CidFormatError, Oracle, hex_short, try_cid_to_fixed}; 12 12 use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel}; 13 13 use crate::blockstore::{ 14 14 BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore, ··· 44 44 #[derive(Debug, Clone, Copy)] 45 45 pub struct ShardCount(pub u8); 46 46 47 - #[derive(Debug, Clone, Copy)] 48 - pub struct CompactInterval(pub u32); 49 - 50 47 #[derive(Debug, Clone)] 51 48 pub struct StoreConfig { 52 49 pub max_file_size: MaxFileSize, 53 50 pub group_commit: GroupCommitConfig, 54 51 pub shard_count: ShardCount, 55 - pub compact_every: CompactInterval, 56 52 } 57 53 58 54 #[derive(Debug, Clone)] ··· 108 104 CompactFile(String), 109 105 #[error("join: {0}")] 110 106 Join(String), 107 + #[error("cid format: {0}")] 108 + CidFormat(#[from] CidFormatError), 111 109 } 112 110 113 111 pub struct Gauntlet { ··· 148 146 restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), 149 147 violations: vec![InvariantViolation { 150 148 invariant: "WallClockBudget", 151 - detail: format!("exceeded max_wall_ms ({} ms)", d.as_millis()), 149 + detail: format!("exceeded max_wall_ms of {} ms", d.as_millis()), 152 150 }], 153 151 }, 154 152 }, ··· 175 173 let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); 176 174 let mut halt_ops = false; 177 175 176 + let mid_run_set = config 177 + .invariants 178 + .without(InvariantSet::RESTART_IDEMPOTENT) 179 + .without(InvariantSet::ACKED_WRITE_PERSISTENCE); 180 + let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); 181 + 178 182 for (idx, op) in op_stream.iter().enumerate() { 179 183 if halt_ops { 180 184 break; ··· 186 190 invariant: "OpExecution", 187 191 detail: format!("op {idx}: {e}"), 188 192 }); 189 - ops_counter.store(idx + 1, Ordering::Relaxed); 190 193 halt_ops = true; 191 194 continue; 192 195 } ··· 209 212 halt_ops = true; 210 213 continue; 211 214 } 212 - violations.extend(check_all(&store, &oracle, config.invariants)); 213 - if !violations.is_empty() { 215 + let before = violations.len(); 216 + violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); 217 + if violations.len() > before { 214 218 halt_ops = true; 215 219 } 216 220 } 217 221 } 218 222 219 - match refresh_oracle_graph(&store, &mut oracle, root).await { 220 - Ok(()) => violations.extend(check_all(&store, &oracle, config.invariants)), 221 - Err(e) => violations.push(InvariantViolation { 222 - invariant: "OpExecution", 223 - detail: format!("refresh at end: {e}"), 224 - }), 223 + if !halt_ops { 224 + match refresh_oracle_graph(&store, &mut oracle, root).await { 225 + Ok(()) => { 226 + let before = violations.len(); 227 + violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); 228 + if violations.len() > before { 229 + halt_ops = true; 230 + } 231 + } 232 + Err(e) => { 233 + violations.push(InvariantViolation { 234 + invariant: "OpExecution", 235 + detail: format!("refresh at end: {e}"), 236 + }); 237 + halt_ops = true; 238 + } 239 + } 240 + } 241 + 242 + if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) && !halt_ops { 243 + let pre_snapshot = snapshot_block_index(&store); 244 + drop(store); 245 + let reopened = Arc::new( 246 + TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) 247 + .expect("reopen for RestartIdempotent"), 248 + ); 249 + let post_snapshot = snapshot_block_index(&reopened); 250 + if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { 251 + violations.push(InvariantViolation { 252 + invariant: "RestartIdempotent", 253 + detail, 254 + }); 255 + } else { 256 + violations.extend(run_invariants(&reopened, &oracle, root, post_reopen_set).await); 257 + } 225 258 } 226 259 227 260 GauntletReport { ··· 232 265 } 233 266 } 234 267 235 - fn check_all( 236 - store: &TranquilBlockStore, 268 + async fn run_invariants( 269 + store: &Arc<TranquilBlockStore>, 237 270 oracle: &Oracle, 271 + root: Option<Cid>, 238 272 set: InvariantSet, 239 273 ) -> Vec<InvariantViolation> { 240 - invariants_for(set) 274 + let ctx = InvariantCtx { 275 + store, 276 + oracle, 277 + root, 278 + }; 279 + let mut out = Vec::new(); 280 + for inv in invariants_for(set) { 281 + if let Err(v) = inv.check(&ctx).await { 282 + out.push(v); 283 + } 284 + } 285 + out 286 + } 287 + 288 + fn snapshot_block_index(store: &TranquilBlockStore) -> Vec<(CidBytes, u32)> { 289 + let mut v: Vec<(CidBytes, u32)> = store 290 + .block_index() 291 + .live_entries_snapshot() 292 + .into_iter() 293 + .map(|(c, r)| (c, r.raw())) 294 + .collect(); 295 + v.sort_unstable_by(|a, b| a.0.cmp(&b.0)); 296 + v 297 + } 298 + 299 + const SNAPSHOT_DIFF_ITEMS: usize = 16; 300 + 301 + fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option<String> { 302 + if pre == post { 303 + return None; 304 + } 305 + let pre_map: std::collections::HashMap<CidBytes, u32> = pre.iter().copied().collect(); 306 + let post_map: std::collections::HashMap<CidBytes, u32> = post.iter().copied().collect(); 307 + 308 + let only_pre: Vec<String> = pre_map 309 + .iter() 310 + .filter(|(c, _)| !post_map.contains_key(*c)) 311 + .map(|(c, r)| format!("lost {} refcount {}", hex_short(c), r)) 312 + .collect(); 313 + let only_post: Vec<String> = post_map 314 + .iter() 315 + .filter(|(c, _)| !pre_map.contains_key(*c)) 316 + .map(|(c, r)| format!("gained {} refcount {}", hex_short(c), r)) 317 + .collect(); 318 + let changed: Vec<String> = pre_map 319 + .iter() 320 + .filter_map(|(c, pre_r)| match post_map.get(c) { 321 + Some(post_r) if post_r != pre_r => { 322 + Some(format!("{} refcount {} -> {}", hex_short(c), pre_r, post_r)) 323 + } 324 + _ => None, 325 + }) 326 + .collect(); 327 + 328 + let total = only_pre.len() + only_post.len() + changed.len(); 329 + let mut items: Vec<String> = only_pre 241 330 .into_iter() 242 - .filter_map(|inv| inv.check(store, oracle).err()) 243 - .collect() 331 + .chain(only_post) 332 + .chain(changed) 333 + .take(SNAPSHOT_DIFF_ITEMS) 334 + .collect(); 335 + if total > items.len() { 336 + items.push(format!("+{} more", total - items.len())); 337 + } 338 + Some(format!( 339 + "block index changed across clean reopen: pre={} entries, post={} entries; {}", 340 + pre.len(), 341 + post.len(), 342 + items.join("; "), 343 + )) 244 344 } 245 345 246 346 async fn refresh_oracle_graph( ··· 250 350 ) -> Result<(), String> { 251 351 match root { 252 352 None => { 253 - oracle.set_node_cids(Vec::new()); 353 + oracle.clear_mst_state(); 254 354 Ok(()) 255 355 } 256 356 Some(r) => { ··· 259 359 .collect_node_cids() 260 360 .await 261 361 .map_err(|e| format!("collect_node_cids: {e}"))?; 362 + let fixed: Vec<CidBytes> = cids 363 + .iter() 364 + .map(try_cid_to_fixed) 365 + .collect::<Result<_, _>>() 366 + .map_err(|e| format!("mst node cid: {e}"))?; 262 367 oracle.set_root(r); 263 - oracle.set_node_cids(cids); 368 + oracle.set_mst_node_cids(fixed); 264 369 Ok(()) 265 370 } 266 371 } ··· 298 403 let ValueBytes(lo) = range.min(); 299 404 let ValueBytes(hi) = range.max(); 300 405 let span = u64::from(hi.saturating_sub(lo)).max(1); 301 - let rng_state = u64::from(raw); 302 - (lo as usize) + (rng_state % span) as usize 406 + (lo as usize) + (u64::from(raw) % span) as usize 303 407 } 304 408 }; 305 - serde_ipld_dagcbor::to_vec(&serde_json::json!({ 306 - "$type": "app.bsky.feed.post", 307 - "text": format!("record-{raw}"), 308 - "createdAt": "2026-01-01T00:00:00Z", 309 - "pad": "x".repeat(target_len.saturating_sub(64)), 310 - })) 311 - .expect("encode record") 409 + let target_len = target_len.max(8); 410 + let seed_bytes = raw.to_le_bytes(); 411 + (0..target_len) 412 + .map(|i| seed_bytes[i % 4] ^ (i as u8).wrapping_mul(31)) 413 + .collect() 312 414 } 313 415 314 416 async fn apply_op( ··· 329 431 .put(&record_bytes) 330 432 .await 331 433 .map_err(|e| OpError::PutRecord(e.to_string()))?; 332 - let key = format!("{}/{}", collection.0, rkey.0); 333 - let loaded = match *root { 334 - None => Mst::new(store.clone()), 335 - Some(r) => Mst::load(store.clone(), r, None), 336 - }; 337 - let updated = loaded 338 - .add(&key, record_cid) 339 - .await 340 - .map_err(|e| OpError::MstAdd(e.to_string()))?; 341 - let new_root = updated 342 - .persist() 343 - .await 344 - .map_err(|e| OpError::MstPersist(e.to_string()))?; 434 + let record_cid_bytes = try_cid_to_fixed(&record_cid)?; 345 435 346 - if let Some(old_root) = *root { 347 - apply_mst_diff(store, old_root, new_root).await?; 436 + let outcome = 437 + add_record_inner(store, *root, collection, rkey, record_cid, record_cid_bytes) 438 + .await; 439 + match outcome { 440 + Ok((new_root, applied)) => { 441 + *root = Some(new_root); 442 + if applied { 443 + oracle.add(collection.clone(), rkey.clone(), record_cid_bytes); 444 + } 445 + Ok(()) 446 + } 447 + Err(e) => { 448 + if let Err(cleanup_err) = 449 + decrement_obsolete(store, vec![record_cid_bytes]).await 450 + { 451 + tracing::warn!( 452 + op_error = %e, 453 + cleanup_error = %cleanup_err, 454 + "AddRecord cleanup decrement failed; refcount may leak", 455 + ); 456 + } 457 + Err(e) 458 + } 348 459 } 349 - 350 - *root = Some(new_root); 351 - oracle.add(collection.clone(), rkey.clone(), cid_to_fixed(&record_cid)); 352 - Ok(()) 353 460 } 354 461 Op::DeleteRecord { collection, rkey } => { 355 462 let Some(old_root) = *root else { return Ok(()) }; ··· 389 496 } 390 497 } 391 498 499 + async fn add_record_inner( 500 + store: &Arc<TranquilBlockStore>, 501 + root: Option<Cid>, 502 + collection: &super::op::CollectionName, 503 + rkey: &super::op::RecordKey, 504 + record_cid: Cid, 505 + record_cid_bytes: CidBytes, 506 + ) -> Result<(Cid, bool), OpError> { 507 + let key = format!("{}/{}", collection.0, rkey.0); 508 + let loaded = match root { 509 + None => Mst::new(store.clone()), 510 + Some(r) => Mst::load(store.clone(), r, None), 511 + }; 512 + let updated = loaded 513 + .add(&key, record_cid) 514 + .await 515 + .map_err(|e| OpError::MstAdd(e.to_string()))?; 516 + let new_root = updated 517 + .persist() 518 + .await 519 + .map_err(|e| OpError::MstPersist(e.to_string()))?; 520 + 521 + match root { 522 + Some(old_root) if old_root == new_root => { 523 + decrement_obsolete(store, vec![record_cid_bytes]).await?; 524 + Ok((new_root, false)) 525 + } 526 + Some(old_root) => { 527 + apply_mst_diff(store, old_root, new_root).await?; 528 + Ok((new_root, true)) 529 + } 530 + None => Ok((new_root, true)), 531 + } 532 + } 533 + 534 + async fn decrement_obsolete( 535 + store: &Arc<TranquilBlockStore>, 536 + obsolete: Vec<CidBytes>, 537 + ) -> Result<(), OpError> { 538 + let s = store.clone(); 539 + tokio::task::spawn_blocking(move || { 540 + s.apply_commit_blocking(vec![], obsolete) 541 + .map_err(|e| e.to_string()) 542 + }) 543 + .await 544 + .map_err(|e| OpError::Join(e.to_string()))? 545 + .map_err(OpError::ApplyCommit) 546 + } 547 + 392 548 async fn apply_mst_diff( 393 549 store: &Arc<TranquilBlockStore>, 394 550 old_root: Cid, ··· 404 560 .removed_mst_blocks 405 561 .into_iter() 406 562 .chain(diff.removed_cids.into_iter()) 407 - .map(|c| cid_to_fixed(&c)) 408 - .collect(); 563 + .map(|c| try_cid_to_fixed(&c)) 564 + .collect::<Result<_, _>>()?; 409 565 let s = store.clone(); 410 566 tokio::task::spawn_blocking(move || { 411 567 s.apply_commit_blocking(vec![], obsolete) ··· 419 575 const COMPACT_LIVENESS_CEILING: f64 = 0.99; 420 576 421 577 fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> { 422 - let liveness = match store.compaction_liveness(0) { 423 - Ok(l) => l, 424 - Err(_) => return Ok(()), 425 - }; 578 + let liveness = store 579 + .compaction_liveness(0) 580 + .map_err(|e| OpError::CompactFile(format!("compaction_liveness: {e}")))?; 426 581 let targets: Vec<_> = liveness 427 582 .iter() 428 583 .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < COMPACT_LIVENESS_CEILING)
+53 -7
crates/tranquil-store/src/gauntlet/scenarios.rs
··· 1 1 use super::invariants::InvariantSet; 2 2 use super::op::{CollectionName, Seed}; 3 3 use super::runner::{ 4 - CompactInterval, GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, 5 - ShardCount, StoreConfig, WallMs, 4 + GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, ShardCount, 5 + StoreConfig, WallMs, 6 6 }; 7 7 use super::workload::{ 8 8 KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel, ··· 14 14 SmokePR, 15 15 MstChurn, 16 16 MstRestartChurn, 17 + FullStackRestart, 17 18 } 18 19 19 20 pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig { ··· 21 22 Scenario::SmokePR => smoke_pr(seed), 22 23 Scenario::MstChurn => mst_churn(seed), 23 24 Scenario::MstRestartChurn => mst_restart_churn(seed), 25 + Scenario::FullStackRestart => full_stack_restart(seed), 24 26 } 25 27 } 26 28 ··· 33 35 34 36 fn tiny_store() -> StoreConfig { 35 37 StoreConfig { 36 - max_file_size: MaxFileSize(300), 38 + max_file_size: MaxFileSize(4096), 37 39 group_commit: GroupCommitConfig { 38 40 checkpoint_interval_ms: 100, 39 41 checkpoint_write_threshold: 10, 40 42 ..GroupCommitConfig::default() 41 43 }, 42 44 shard_count: ShardCount(1), 43 - compact_every: CompactInterval(5), 44 45 } 45 46 } 46 47 ··· 60 61 key_space: KeySpaceSize(200), 61 62 }, 62 63 op_count: OpCount(10_000), 63 - invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, 64 + invariants: InvariantSet::REFCOUNT_CONSERVATION 65 + | InvariantSet::REACHABILITY 66 + | InvariantSet::ACKED_WRITE_PERSISTENCE 67 + | InvariantSet::READ_AFTER_WRITE 68 + | InvariantSet::RESTART_IDEMPOTENT, 64 69 limits: RunLimits { 65 70 max_wall_ms: Some(WallMs(60_000)), 66 71 }, ··· 85 90 key_space: KeySpaceSize(2_000), 86 91 }, 87 92 op_count: OpCount(100_000), 88 - invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, 93 + invariants: InvariantSet::REFCOUNT_CONSERVATION 94 + | InvariantSet::REACHABILITY 95 + | InvariantSet::ACKED_WRITE_PERSISTENCE 96 + | InvariantSet::READ_AFTER_WRITE 97 + | InvariantSet::RESTART_IDEMPOTENT, 89 98 limits: RunLimits { 90 99 max_wall_ms: Some(WallMs(600_000)), 91 100 }, ··· 110 119 key_space: KeySpaceSize(2_000), 111 120 }, 112 121 op_count: OpCount(100_000), 113 - invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, 122 + invariants: InvariantSet::REFCOUNT_CONSERVATION 123 + | InvariantSet::REACHABILITY 124 + | InvariantSet::ACKED_WRITE_PERSISTENCE 125 + | InvariantSet::READ_AFTER_WRITE 126 + | InvariantSet::RESTART_IDEMPOTENT, 114 127 limits: RunLimits { 115 128 max_wall_ms: Some(WallMs(600_000)), 116 129 }, ··· 118 131 store: tiny_store(), 119 132 } 120 133 } 134 + 135 + fn full_stack_restart(seed: Seed) -> GauntletConfig { 136 + GauntletConfig { 137 + seed, 138 + io: IoBackend::Real, 139 + workload: WorkloadModel { 140 + weights: OpWeights { 141 + add: 80, 142 + delete: 0, 143 + compact: 15, 144 + checkpoint: 5, 145 + }, 146 + size_distribution: SizeDistribution::Fixed(ValueBytes(80)), 147 + collections: default_collections(), 148 + key_space: KeySpaceSize(500), 149 + }, 150 + op_count: OpCount(5_000), 151 + invariants: InvariantSet::REFCOUNT_CONSERVATION 152 + | InvariantSet::REACHABILITY 153 + | InvariantSet::ACKED_WRITE_PERSISTENCE 154 + | InvariantSet::READ_AFTER_WRITE 155 + | InvariantSet::RESTART_IDEMPOTENT, 156 + limits: RunLimits { 157 + max_wall_ms: Some(WallMs(120_000)), 158 + }, 159 + restart_policy: RestartPolicy::EveryNOps(OpInterval(500)), 160 + store: StoreConfig { 161 + max_file_size: MaxFileSize(4096), 162 + group_commit: GroupCommitConfig::default(), 163 + shard_count: ShardCount(1), 164 + }, 165 + } 166 + }
+27 -6
crates/tranquil-store/tests/gauntlet_smoke.rs
··· 1 1 use tranquil_store::blockstore::GroupCommitConfig; 2 2 use tranquil_store::gauntlet::{ 3 - CollectionName, CompactInterval, Gauntlet, GauntletConfig, InvariantSet, IoBackend, 4 - KeySpaceSize, MaxFileSize, OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario, 5 - Seed, ShardCount, SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for, 6 - farm, 3 + CollectionName, Gauntlet, GauntletConfig, InvariantSet, IoBackend, KeySpaceSize, MaxFileSize, 4 + OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario, Seed, ShardCount, 5 + SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for, farm, 7 6 }; 8 7 9 8 #[test] ··· 48 47 key_space: KeySpaceSize(100), 49 48 }, 50 49 op_count: OpCount(200), 51 - invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, 50 + invariants: InvariantSet::REFCOUNT_CONSERVATION 51 + | InvariantSet::REACHABILITY 52 + | InvariantSet::ACKED_WRITE_PERSISTENCE 53 + | InvariantSet::READ_AFTER_WRITE 54 + | InvariantSet::RESTART_IDEMPOTENT, 52 55 limits: RunLimits { 53 56 max_wall_ms: Some(WallMs(30_000)), 54 57 }, ··· 61 64 ..GroupCommitConfig::default() 62 65 }, 63 66 shard_count: ShardCount(1), 64 - compact_every: CompactInterval(5), 65 67 }, 66 68 } 67 69 } ··· 87 89 report.restarts.0 88 90 ); 89 91 assert_eq!(report.ops_executed.0, 200); 92 + } 93 + 94 + #[tokio::test] 95 + async fn full_stack_restart_port() { 96 + let cfg = config_for(Scenario::FullStackRestart, Seed(1)); 97 + let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; 98 + assert!( 99 + report.is_clean(), 100 + "violations: {:?}", 101 + report 102 + .violations 103 + .iter() 104 + .map(|v| format!("{}: {}", v.invariant, v.detail)) 105 + .collect::<Vec<_>>() 106 + ); 107 + assert_eq!( 108 + report.restarts.0, 10, 109 + "FullStackRestart with EveryNOps(500) over 5000 ops must restart exactly 10 times", 110 + ); 90 111 } 91 112 92 113 #[tokio::test]