Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store/gauntlet): concurrent executor, eventlog, fault recovery

Lewis: May this revision serve well! <lu5a@proton.me>

+1213 -96
crates/tranquil-store/src/gauntlet/runner.rs
··· 1 + use std::ops::Range; 2 + use std::path::{Path, PathBuf}; 1 3 use std::sync::Arc; 2 4 use std::sync::atomic::{AtomicUsize, Ordering}; 3 5 use std::time::Duration; ··· 6 8 use jacquard_repo::mst::Mst; 7 9 use jacquard_repo::storage::BlockStore; 8 10 9 - use super::invariants::{InvariantCtx, InvariantSet, InvariantViolation, invariants_for}; 10 - use super::op::{Op, OpStream, Seed, ValueSeed}; 11 - use super::oracle::{CidFormatError, Oracle, hex_short, try_cid_to_fixed}; 11 + use super::invariants::{ 12 + EventLogSnapshot, InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, 13 + }; 14 + use super::op::{DidSeed, EventKind, Op, OpStream, PayloadSeed, RetentionSecs, Seed, ValueSeed}; 15 + use super::oracle::{CidFormatError, EventExpectation, Oracle, hex_short, try_cid_to_fixed}; 12 16 use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel}; 13 17 use crate::blockstore::{ 14 18 BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore, 19 + hash_to_cid_bytes, 15 20 }; 21 + use crate::eventlog::{ 22 + DEFAULT_INDEX_INTERVAL, DidHash, EventLogWriter, EventTypeTag, MAX_EVENT_PAYLOAD, SegmentId, 23 + SegmentManager, SegmentReader, TimestampMicros, ValidEvent, 24 + }; 25 + use crate::io::{RealIO, StorageIO}; 26 + use crate::sim::{FaultConfig, SimulatedIO}; 16 27 17 28 #[derive(Debug, Clone, Copy)] 18 29 pub enum IoBackend { 19 30 Real, 20 - Simulated, 31 + Simulated { fault: FaultConfig }, 21 32 } 22 33 23 34 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] ··· 28 39 Never, 29 40 EveryNOps(OpInterval), 30 41 PoissonByOps(OpInterval), 42 + CrashAtSyscall(OpInterval), 31 43 } 32 44 33 45 #[derive(Debug, Clone, Copy)] ··· 42 54 pub struct MaxFileSize(pub u64); 43 55 44 56 #[derive(Debug, Clone, Copy)] 57 + pub struct MaxSegmentSize(pub u64); 58 + 59 + #[derive(Debug, Clone, Copy)] 45 60 pub struct ShardCount(pub u8); 46 61 47 62 #[derive(Debug, Clone)] ··· 51 66 pub shard_count: ShardCount, 52 
67 } 53 68 69 + #[derive(Debug, Clone, Copy)] 70 + pub struct EventLogConfig { 71 + pub max_segment_size: MaxSegmentSize, 72 + } 73 + 74 + #[derive(Debug, Clone, Copy)] 75 + pub struct WriterConcurrency(pub usize); 76 + 77 + impl Default for WriterConcurrency { 78 + fn default() -> Self { 79 + Self(1) 80 + } 81 + } 82 + 54 83 #[derive(Debug, Clone)] 55 84 pub struct GauntletConfig { 56 85 pub seed: Seed, ··· 61 90 pub limits: RunLimits, 62 91 pub restart_policy: RestartPolicy, 63 92 pub store: StoreConfig, 93 + pub eventlog: Option<EventLogConfig>, 94 + pub writer_concurrency: WriterConcurrency, 64 95 } 65 96 66 97 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 67 98 pub struct OpsExecuted(pub usize); 68 99 69 100 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 101 + pub struct OpErrorCount(pub usize); 102 + 103 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 70 104 pub struct RestartCount(pub usize); 71 105 72 106 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 73 107 pub struct OpIndex(pub usize); 74 108 75 - #[derive(Debug)] 109 + #[derive(Debug, Clone)] 76 110 pub struct GauntletReport { 77 111 pub seed: Seed, 78 112 pub ops_executed: OpsExecuted, 113 + pub op_errors: OpErrorCount, 79 114 pub restarts: RestartCount, 80 115 pub violations: Vec<InvariantViolation>, 116 + pub ops: OpStream, 81 117 } 82 118 83 119 impl GauntletReport { 84 120 pub fn is_clean(&self) -> bool { 85 121 self.violations.is_empty() 86 122 } 123 + 124 + pub fn violation_invariants(&self) -> std::collections::BTreeSet<&'static str> { 125 + self.violations.iter().map(|v| v.invariant).collect() 126 + } 87 127 } 88 128 89 129 #[derive(Debug, thiserror::Error)] ··· 106 146 Join(String), 107 147 #[error("cid format: {0}")] 108 148 CidFormat(#[from] CidFormatError), 149 + #[error("eventlog append: {0}")] 150 + EventLogAppend(String), 151 + #[error("eventlog sync: {0}")] 152 + EventLogSync(String), 153 + #[error("eventlog retention: {0}")] 154 + EventLogRetention(String), 155 + } 
156 + 157 + pub struct EventLogState<S: StorageIO + Send + Sync + 'static> { 158 + pub writer: EventLogWriter<S>, 159 + pub manager: Arc<SegmentManager<S>>, 160 + pub segments_dir: PathBuf, 161 + pub max_segment_size: u64, 162 + } 163 + 164 + pub struct Harness<S: StorageIO + Send + Sync + 'static> { 165 + pub store: Arc<TranquilBlockStore<S>>, 166 + pub eventlog: Option<EventLogState<S>>, 167 + } 168 + 169 + pub struct WriteState<S: StorageIO + Send + Sync + 'static> { 170 + pub root: Option<Cid>, 171 + pub oracle: Oracle, 172 + pub eventlog: Option<EventLogState<S>>, 173 + } 174 + 175 + pub struct SharedState<S: StorageIO + Send + Sync + 'static> { 176 + pub store: Arc<TranquilBlockStore<S>>, 177 + pub write: tokio::sync::Mutex<WriteState<S>>, 109 178 } 110 179 111 180 pub struct Gauntlet { ··· 113 182 } 114 183 115 184 #[derive(Debug, thiserror::Error)] 116 - pub enum GauntletBuildError { 117 - #[error("IoBackend::Simulated not wired yet")] 118 - UnsupportedIoBackend, 119 - } 185 + pub enum GauntletBuildError {} 120 186 121 187 impl Gauntlet { 122 188 pub fn new(config: GauntletConfig) -> Result<Self, GauntletBuildError> { 123 - match config.io { 124 - IoBackend::Real => Ok(Self { config }), 125 - IoBackend::Simulated => Err(GauntletBuildError::UnsupportedIoBackend), 126 - } 189 + Ok(Self { config }) 190 + } 191 + 192 + pub fn generate_ops(&self) -> OpStream { 193 + self.config 194 + .workload 195 + .generate(self.config.seed, self.config.op_count) 127 196 } 128 197 129 198 pub async fn run(self) -> GauntletReport { 199 + let ops = self.generate_ops(); 200 + self.run_with_ops(ops).await 201 + } 202 + 203 + pub async fn run_with_ops(self, ops: OpStream) -> GauntletReport { 130 204 let deadline = self 131 205 .config 132 206 .limits ··· 134 208 .map(|WallMs(ms)| Duration::from_millis(ms)); 135 209 136 210 let seed = self.config.seed; 211 + let ops_for_report = ops.clone(); 137 212 let ops_counter = Arc::new(AtomicUsize::new(0)); 213 + let op_errors_counter = 
Arc::new(AtomicUsize::new(0)); 138 214 let restarts_counter = Arc::new(AtomicUsize::new(0)); 139 - let fut = run_real_inner(self.config, ops_counter.clone(), restarts_counter.clone()); 140 - match deadline { 215 + let fut: std::pin::Pin<Box<dyn std::future::Future<Output = GauntletReport> + Send>> = 216 + match self.config.io { 217 + IoBackend::Real => Box::pin(run_inner_real( 218 + self.config, 219 + ops, 220 + ops_counter.clone(), 221 + op_errors_counter.clone(), 222 + restarts_counter.clone(), 223 + )), 224 + IoBackend::Simulated { fault } => Box::pin(run_inner_simulated( 225 + self.config, 226 + fault, 227 + ops, 228 + ops_counter.clone(), 229 + op_errors_counter.clone(), 230 + restarts_counter.clone(), 231 + )), 232 + }; 233 + let mut report = match deadline { 141 234 Some(d) => match tokio::time::timeout(d, fut).await { 142 235 Ok(r) => r, 143 236 Err(_) => GauntletReport { 144 237 seed, 145 238 ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), 239 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 146 240 restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), 147 241 violations: vec![InvariantViolation { 148 242 invariant: "WallClockBudget", 149 243 detail: format!("exceeded max_wall_ms of {} ms", d.as_millis()), 150 244 }], 245 + ops: OpStream::empty(), 151 246 }, 152 247 }, 153 248 None => fut.await, 249 + }; 250 + report.ops = ops_for_report; 251 + report 252 + } 253 + } 254 + 255 + fn segments_subdir(root: &Path) -> PathBuf { 256 + root.join("segments") 257 + } 258 + 259 + async fn run_inner_real( 260 + config: GauntletConfig, 261 + ops: OpStream, 262 + ops_counter: Arc<AtomicUsize>, 263 + op_errors_counter: Arc<AtomicUsize>, 264 + restarts_counter: Arc<AtomicUsize>, 265 + ) -> GauntletReport { 266 + let dir = tempfile::TempDir::new().expect("tempdir"); 267 + let cfg = blockstore_config(dir.path(), &config.store); 268 + let eventlog_cfg = config.eventlog; 269 + let segments_dir = segments_subdir(dir.path()); 
270 + let open = { 271 + let segments_dir = segments_dir.clone(); 272 + move || -> Result<Harness<RealIO>, String> { 273 + let store = TranquilBlockStore::open(cfg.clone()) 274 + .map(Arc::new) 275 + .map_err(|e| e.to_string())?; 276 + let eventlog = match eventlog_cfg { 277 + None => None, 278 + Some(elc) => Some( 279 + open_eventlog(RealIO::new(), segments_dir.clone(), elc.max_segment_size.0) 280 + .map_err(|e| format!("eventlog: {e}"))?, 281 + ), 282 + }; 283 + Ok(Harness { store, eventlog }) 154 284 } 285 + }; 286 + if config.writer_concurrency.0 > 1 { 287 + run_inner_generic_concurrent::<RealIO, _, _>( 288 + config, 289 + ops, 290 + ops_counter, 291 + op_errors_counter, 292 + restarts_counter, 293 + open, 294 + || {}, 295 + false, 296 + ) 297 + .await 298 + } else { 299 + run_inner_generic::<RealIO, _, _>( 300 + config, 301 + ops, 302 + ops_counter, 303 + op_errors_counter, 304 + restarts_counter, 305 + open, 306 + || {}, 307 + false, 308 + ) 309 + .await 155 310 } 156 311 } 157 312 158 - async fn run_real_inner( 313 + async fn run_inner_simulated( 159 314 config: GauntletConfig, 315 + fault: FaultConfig, 316 + ops: OpStream, 160 317 ops_counter: Arc<AtomicUsize>, 318 + op_errors_counter: Arc<AtomicUsize>, 161 319 restarts_counter: Arc<AtomicUsize>, 162 320 ) -> GauntletReport { 163 321 let dir = tempfile::TempDir::new().expect("tempdir"); 164 - let op_stream: OpStream = config.workload.generate(config.seed, config.op_count); 322 + let cfg = blockstore_config(dir.path(), &config.store); 323 + let tolerate_errors = fault.injects_errors(); 324 + let eventlog_cfg = config.eventlog; 325 + let segments_dir = segments_subdir(dir.path()); 326 + let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::new(config.seed.0, fault)); 327 + let sim_for_open = Arc::clone(&sim); 328 + let open = { 329 + let segments_dir = segments_dir.clone(); 330 + move || -> Result<Harness<Arc<SimulatedIO>>, String> { 331 + let factory_sim = Arc::clone(&sim_for_open); 332 + let make_io = move || 
Arc::clone(&factory_sim); 333 + let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(cfg.clone(), make_io) 334 + .map(Arc::new) 335 + .map_err(|e| e.to_string())?; 336 + let eventlog = match eventlog_cfg { 337 + None => None, 338 + Some(elc) => Some( 339 + open_eventlog( 340 + Arc::clone(&sim_for_open), 341 + segments_dir.clone(), 342 + elc.max_segment_size.0, 343 + ) 344 + .map_err(|e| format!("eventlog: {e}"))?, 345 + ), 346 + }; 347 + Ok(Harness { store, eventlog }) 348 + } 349 + }; 350 + let sim_for_crash = Arc::clone(&sim); 351 + let crash = move || sim_for_crash.crash(); 352 + if config.writer_concurrency.0 > 1 { 353 + run_inner_generic_concurrent::<Arc<SimulatedIO>, _, _>( 354 + config, 355 + ops, 356 + ops_counter, 357 + op_errors_counter, 358 + restarts_counter, 359 + open, 360 + crash, 361 + tolerate_errors, 362 + ) 363 + .await 364 + } else { 365 + run_inner_generic::<Arc<SimulatedIO>, _, _>( 366 + config, 367 + ops, 368 + ops_counter, 369 + op_errors_counter, 370 + restarts_counter, 371 + open, 372 + crash, 373 + tolerate_errors, 374 + ) 375 + .await 376 + } 377 + } 165 378 379 + fn open_eventlog<S: StorageIO + Send + Sync + 'static>( 380 + io: S, 381 + segments_dir: PathBuf, 382 + max_segment_size: u64, 383 + ) -> std::io::Result<EventLogState<S>> { 384 + let manager = Arc::new(SegmentManager::new( 385 + io, 386 + segments_dir.clone(), 387 + max_segment_size, 388 + )?); 389 + let writer = EventLogWriter::open( 390 + Arc::clone(&manager), 391 + DEFAULT_INDEX_INTERVAL, 392 + MAX_EVENT_PAYLOAD, 393 + )?; 394 + Ok(EventLogState { 395 + writer, 396 + manager, 397 + segments_dir, 398 + max_segment_size, 399 + }) 400 + } 401 + 402 + #[allow(clippy::too_many_arguments)] 403 + async fn run_inner_generic<S, Open, Crash>( 404 + config: GauntletConfig, 405 + op_stream: OpStream, 406 + ops_counter: Arc<AtomicUsize>, 407 + op_errors_counter: Arc<AtomicUsize>, 408 + restarts_counter: Arc<AtomicUsize>, 409 + mut open: Open, 410 + mut crash: Crash, 411 + 
tolerate_op_errors: bool, 412 + ) -> GauntletReport 413 + where 414 + S: StorageIO + Send + Sync + 'static, 415 + Open: FnMut() -> Result<Harness<S>, String>, 416 + Crash: FnMut(), 417 + { 166 418 let mut oracle = Oracle::new(); 167 419 let mut violations: Vec<InvariantViolation> = Vec::new(); 168 420 169 - let mut store = Arc::new( 170 - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)).expect("open store"), 171 - ); 421 + let mut harness: Option<Harness<S>> = match open() { 422 + Ok(h) => Some(h), 423 + Err(e) => { 424 + return GauntletReport { 425 + seed: config.seed, 426 + ops_executed: OpsExecuted(0), 427 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 428 + restarts: RestartCount(0), 429 + violations: vec![InvariantViolation { 430 + invariant: "OpenStore", 431 + detail: format!("initial open: {e}"), 432 + }], 433 + ops: OpStream::empty(), 434 + }; 435 + } 436 + }; 172 437 let mut root: Option<Cid> = None; 173 438 let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); 439 + let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); 174 440 let mut halt_ops = false; 175 441 176 - let mid_run_set = config 177 - .invariants 178 - .without(InvariantSet::RESTART_IDEMPOTENT) 179 - .without(InvariantSet::ACKED_WRITE_PERSISTENCE); 180 442 let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); 181 443 182 444 for (idx, op) in op_stream.iter().enumerate() { 183 445 if halt_ops { 184 446 break; 185 447 } 186 - match apply_op(&store, &mut root, &mut oracle, op, &config.workload).await { 448 + let live = harness 449 + .as_mut() 450 + .expect("harness invariant: present when halt_ops is false"); 451 + let root_before = root; 452 + match apply_op(live, &mut root, &mut oracle, op, &config.workload).await { 187 453 Ok(()) => {} 188 454 Err(e) => { 455 + if tolerate_op_errors { 456 + op_errors_counter.fetch_add(1, Ordering::Relaxed); 457 + continue; 458 + } 189 459 
violations.push(InvariantViolation { 190 460 invariant: "OpExecution", 191 461 detail: format!("op {idx}: {e}"), ··· 194 464 continue; 195 465 } 196 466 } 467 + let _ = root_before; 197 468 ops_counter.store(idx + 1, Ordering::Relaxed); 198 469 199 - if should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng) { 200 - drop(store); 201 - store = Arc::new( 202 - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) 203 - .expect("reopen store"), 204 - ); 205 - let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; 206 - 207 - if let Err(e) = refresh_oracle_graph(&store, &mut oracle, root).await { 470 + let action = should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng); 471 + let crashing = matches!(action, RestartAction::Crash); 472 + if matches!(action, RestartAction::None) { 473 + continue; 474 + } 475 + if crashing { 476 + crash(); 477 + oracle.record_crash(); 478 + } 479 + shutdown_harness(&mut harness); 480 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { 481 + Ok(reopened) => { 482 + harness = Some(reopened); 483 + let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; 484 + let live = harness.as_ref().expect("just reopened"); 485 + let before = violations.len(); 486 + violations.extend( 487 + run_quick_check( 488 + &live.store, 489 + &oracle, 490 + root, 491 + &mut sample_rng, 492 + QUICK_SAMPLE_SIZE, 493 + n, 494 + ) 495 + .await, 496 + ); 497 + if violations.len() > before { 498 + halt_ops = true; 499 + } 500 + } 501 + Err(detail) => { 208 502 violations.push(InvariantViolation { 209 - invariant: "OpExecution", 210 - detail: format!("refresh after restart {n}: {e}"), 503 + invariant: "ReopenFailed", 504 + detail: format!("reopen after op {idx}: {detail}"), 211 505 }); 212 506 halt_ops = true; 213 - continue; 507 + break; 214 508 } 215 - let before = violations.len(); 216 - violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); 217 - if violations.len() > 
before { 509 + } 510 + } 511 + 512 + if !halt_ops && tolerate_op_errors && harness.is_some() { 513 + crash(); 514 + oracle.record_crash(); 515 + shutdown_harness(&mut harness); 516 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { 517 + Ok(reopened) => harness = Some(reopened), 518 + Err(detail) => { 519 + violations.push(InvariantViolation { 520 + invariant: "ReopenFailed", 521 + detail: format!("reopen after post-run crash: {detail}"), 522 + }); 218 523 halt_ops = true; 219 524 } 220 525 } 221 526 } 222 527 223 - if !halt_ops { 224 - match refresh_oracle_graph(&store, &mut oracle, root).await { 528 + let end_of_run_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); 529 + if !halt_ops && let Some(live) = harness.as_ref() { 530 + match refresh_oracle_graph(&live.store, &mut oracle, root).await { 225 531 Ok(()) => { 226 532 let before = violations.len(); 227 - violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); 533 + let snapshot = eventlog_snapshot(live.eventlog.as_ref()); 534 + violations.extend( 535 + run_invariants(&live.store, &oracle, root, snapshot, end_of_run_set).await, 536 + ); 228 537 if violations.len() > before { 229 538 halt_ops = true; 230 539 } 231 540 } 232 541 Err(e) => { 233 542 violations.push(InvariantViolation { 234 - invariant: "OpExecution", 235 - detail: format!("refresh at end: {e}"), 543 + invariant: "MstRootDurability", 544 + detail: format!("refresh after final reopen: {e}"), 236 545 }); 237 546 halt_ops = true; 238 547 } 239 548 } 240 549 } 241 550 242 - if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) && !halt_ops { 243 - let pre_snapshot = snapshot_block_index(&store); 244 - drop(store); 245 - let reopened = Arc::new( 246 - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) 247 - .expect("reopen for RestartIdempotent"), 248 - ); 249 - let post_snapshot = snapshot_block_index(&reopened); 250 - if let Some(detail) = 
diff_snapshots(&pre_snapshot, &post_snapshot) { 251 - violations.push(InvariantViolation { 252 - invariant: "RestartIdempotent", 253 - detail, 254 - }); 255 - } else { 256 - violations.extend(run_invariants(&reopened, &oracle, root, post_reopen_set).await); 551 + if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) 552 + && !halt_ops 553 + && let Some(live) = harness.as_ref() 554 + { 555 + let pre_snapshot = snapshot_block_index(&live.store); 556 + shutdown_harness(&mut harness); 557 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { 558 + Ok(reopened) => { 559 + let post_snapshot = snapshot_block_index(&reopened.store); 560 + if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { 561 + violations.push(InvariantViolation { 562 + invariant: "RestartIdempotent", 563 + detail, 564 + }); 565 + } else { 566 + let snapshot = eventlog_snapshot(reopened.eventlog.as_ref()); 567 + violations.extend( 568 + run_invariants(&reopened.store, &oracle, root, snapshot, post_reopen_set) 569 + .await, 570 + ); 571 + } 572 + } 573 + Err(detail) => { 574 + violations.push(InvariantViolation { 575 + invariant: "ReopenFailed", 576 + detail: format!("reopen for idempotency check: {detail}"), 577 + }); 578 + } 257 579 } 258 580 } 259 581 260 582 GauntletReport { 261 583 seed: config.seed, 262 584 ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), 585 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 263 586 restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), 264 587 violations, 588 + ops: OpStream::empty(), 265 589 } 266 590 } 267 591 268 - async fn run_invariants( 269 - store: &Arc<TranquilBlockStore>, 592 + fn eventlog_snapshot<S: StorageIO + Send + Sync + 'static>( 593 + state: Option<&EventLogState<S>>, 594 + ) -> Option<EventLogSnapshot> { 595 + let s = state?; 596 + let segments = s.manager.list_segments().unwrap_or_default(); 597 + let mut events: Vec<SnapshotEvent> = Vec::new(); 598 + let 
mut segment_last_ts: Vec<(SegmentId, u64)> = Vec::new(); 599 + segments.iter().for_each(|&id| { 600 + let per_segment: Vec<ValidEvent> = match s.manager.open_for_read(id) { 601 + Ok(fd) => match SegmentReader::open(s.manager.io(), fd, MAX_EVENT_PAYLOAD) { 602 + Ok(reader) => reader.valid_prefix().unwrap_or_default(), 603 + Err(_) => Vec::new(), 604 + }, 605 + Err(_) => Vec::new(), 606 + }; 607 + if let Some(last) = per_segment.last() { 608 + segment_last_ts.push((id, last.timestamp.raw())); 609 + } 610 + per_segment.into_iter().for_each(|e| { 611 + events.push(SnapshotEvent { 612 + seq: e.seq, 613 + timestamp_us: e.timestamp.raw(), 614 + event_type_raw: e.event_type.raw(), 615 + did_hash: e.did_hash.raw(), 616 + }); 617 + }); 618 + }); 619 + Some(EventLogSnapshot { 620 + segments_dir: s.segments_dir.clone(), 621 + max_segment_size: s.max_segment_size, 622 + synced_seq: s.writer.synced_seq(), 623 + segments, 624 + events, 625 + segment_last_ts, 626 + }) 627 + } 628 + 629 + fn shutdown_harness<S: StorageIO + Send + Sync + 'static>(harness: &mut Option<Harness<S>>) { 630 + if let Some(h) = harness.as_mut() 631 + && let Some(el) = h.eventlog.as_mut() 632 + { 633 + let _ = el.writer.shutdown(); 634 + el.manager.shutdown(); 635 + } 636 + let _ = harness.take(); 637 + } 638 + 639 + const MAX_REOPEN_ATTEMPTS: usize = 5; 640 + 641 + fn reopen_with_recovery<S, Open, Crash>( 642 + open: &mut Open, 643 + crash: &mut Crash, 644 + tolerate: bool, 645 + ) -> Result<Harness<S>, String> 646 + where 647 + S: StorageIO + Send + Sync + 'static, 648 + Open: FnMut() -> Result<Harness<S>, String>, 649 + Crash: FnMut(), 650 + { 651 + let mut errors: Vec<String> = Vec::new(); 652 + (0..MAX_REOPEN_ATTEMPTS) 653 + .find_map(|attempt| match open() { 654 + Ok(h) => Some(Ok(h)), 655 + Err(e) => { 656 + errors.push(format!("attempt {attempt}: {e}")); 657 + if !tolerate { 658 + return Some(Err(errors.join(" | "))); 659 + } 660 + crash(); 661 + None 662 + } 663 + }) 664 + .unwrap_or_else(|| 
Err(errors.join(" | "))) 665 + } 666 + 667 + const QUICK_SAMPLE_SIZE: usize = 32; 668 + 669 + fn sample_distinct(rng: &mut Lcg, n: usize, k: usize) -> Vec<usize> { 670 + assert!(k <= n, "sample_distinct: k {k} > n {n}"); 671 + let mut selected: std::collections::HashSet<usize> = 672 + std::collections::HashSet::with_capacity(k); 673 + ((n - k)..n) 674 + .map(|i| { 675 + let t = (rng.next_u64() as usize) % (i + 1); 676 + let pick = if selected.contains(&t) { i } else { t }; 677 + selected.insert(pick); 678 + pick 679 + }) 680 + .collect() 681 + } 682 + 683 + async fn run_quick_check<S: StorageIO + Send + Sync + 'static>( 684 + store: &Arc<TranquilBlockStore<S>>, 685 + oracle: &Oracle, 686 + root: Option<Cid>, 687 + rng: &mut Lcg, 688 + sample_size: usize, 689 + restart_seq: usize, 690 + ) -> Vec<InvariantViolation> { 691 + let Some(r) = root else { 692 + return if oracle.live_count() == 0 { 693 + Vec::new() 694 + } else { 695 + vec![InvariantViolation { 696 + invariant: "QuickHealth", 697 + detail: format!( 698 + "restart {restart_seq}: oracle has {} live records but reopened store has no root", 699 + oracle.live_count() 700 + ), 701 + }] 702 + }; 703 + }; 704 + 705 + let mst = Mst::load(store.clone(), r, None); 706 + let live: Vec<(super::op::CollectionName, super::op::RecordKey, CidBytes)> = oracle 707 + .live_records() 708 + .map(|(c, k, v)| (c.clone(), k.clone(), *v)) 709 + .collect(); 710 + let total = live.len(); 711 + let picks: Vec<usize> = if total <= sample_size { 712 + (0..total).collect() 713 + } else { 714 + sample_distinct(rng, total, sample_size) 715 + }; 716 + 717 + let mut violations: Vec<String> = Vec::new(); 718 + for idx in picks { 719 + let (coll, rkey, expected) = &live[idx]; 720 + let key = format!("{}/{}", coll.0, rkey.0); 721 + match mst.get(&key).await { 722 + Ok(Some(cid)) => match try_cid_to_fixed(&cid) { 723 + Ok(actual) if actual == *expected => {} 724 + Ok(actual) => violations.push(format!( 725 + "{key}: MST cid {} != oracle cid {}", 
726 + hex_short(&actual), 727 + hex_short(expected) 728 + )), 729 + Err(e) => violations.push(format!("{key}: cid format: {e}")), 730 + }, 731 + Ok(None) => violations.push(format!("{key}: missing after reopen")), 732 + Err(e) => violations.push(format!("{key}: mst.get error: {e}")), 733 + } 734 + } 735 + 736 + if violations.is_empty() { 737 + Vec::new() 738 + } else { 739 + vec![InvariantViolation { 740 + invariant: "QuickHealth", 741 + detail: format!( 742 + "restart {restart_seq}, sampled {}/{}: {}", 743 + violations.len(), 744 + sample_size.min(total), 745 + violations.join("; ") 746 + ), 747 + }] 748 + } 749 + } 750 + 751 + async fn run_invariants<S: StorageIO + Send + Sync + 'static>( 752 + store: &Arc<TranquilBlockStore<S>>, 270 753 oracle: &Oracle, 271 754 root: Option<Cid>, 755 + eventlog: Option<EventLogSnapshot>, 272 756 set: InvariantSet, 273 757 ) -> Vec<InvariantViolation> { 274 - let ctx = InvariantCtx { 758 + let ctx = InvariantCtx::<S> { 275 759 store, 276 760 oracle, 277 761 root, 762 + eventlog: eventlog.as_ref(), 278 763 }; 279 764 let mut out = Vec::new(); 280 - for inv in invariants_for(set) { 765 + for inv in invariants_for::<S>(set) { 281 766 if let Err(v) = inv.check(&ctx).await { 282 767 out.push(v); 283 768 } ··· 285 770 out 286 771 } 287 772 288 - fn snapshot_block_index(store: &TranquilBlockStore) -> Vec<(CidBytes, u32)> { 773 + fn snapshot_block_index<S: StorageIO + Send + Sync + 'static>( 774 + store: &TranquilBlockStore<S>, 775 + ) -> Vec<(CidBytes, u32)> { 289 776 let mut v: Vec<(CidBytes, u32)> = store 290 777 .block_index() 291 778 .live_entries_snapshot() ··· 302 789 if pre == post { 303 790 return None; 304 791 } 305 - let pre_map: std::collections::HashMap<CidBytes, u32> = pre.iter().copied().collect(); 306 - let post_map: std::collections::HashMap<CidBytes, u32> = post.iter().copied().collect(); 792 + let pre_map: std::collections::BTreeMap<CidBytes, u32> = pre.iter().copied().collect(); 793 + let post_map: 
std::collections::BTreeMap<CidBytes, u32> = post.iter().copied().collect(); 307 794 308 795 let only_pre: Vec<String> = pre_map 309 796 .iter() ··· 336 823 items.push(format!("+{} more", total - items.len())); 337 824 } 338 825 Some(format!( 339 - "block index changed across clean reopen: pre={} entries, post={} entries; {}", 826 + "block index changed across clean reopen: {} -> {} entries; {}", 340 827 pre.len(), 341 828 post.len(), 342 829 items.join("; "), 343 830 )) 344 831 } 345 832 346 - async fn refresh_oracle_graph( 347 - store: &Arc<TranquilBlockStore>, 833 + async fn refresh_oracle_graph<S: StorageIO + Send + Sync + 'static>( 834 + store: &Arc<TranquilBlockStore<S>>, 348 835 oracle: &mut Oracle, 349 836 root: Option<Cid>, 350 837 ) -> Result<(), String> { ··· 371 858 } 372 859 } 373 860 374 - fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> bool { 861 + enum RestartAction { 862 + None, 863 + Clean, 864 + Crash, 865 + } 866 + 867 + fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> RestartAction { 375 868 match policy { 376 - RestartPolicy::Never => false, 377 - RestartPolicy::EveryNOps(OpInterval(n)) => n > 0 && (idx.0 + 1).is_multiple_of(n), 869 + RestartPolicy::Never => RestartAction::None, 870 + RestartPolicy::EveryNOps(OpInterval(n)) => { 871 + if n > 0 && (idx.0 + 1).is_multiple_of(n) { 872 + RestartAction::Clean 873 + } else { 874 + RestartAction::None 875 + } 876 + } 378 877 RestartPolicy::PoissonByOps(OpInterval(n)) => { 379 - if n == 0 { 380 - false 878 + if n > 0 && rng.next_u64().is_multiple_of(n as u64) { 879 + RestartAction::Clean 880 + } else { 881 + RestartAction::None 882 + } 883 + } 884 + RestartPolicy::CrashAtSyscall(OpInterval(n)) => { 885 + if n > 0 && rng.next_u64().is_multiple_of(n as u64) { 886 + RestartAction::Crash 381 887 } else { 382 - rng.next_u64().is_multiple_of(n as u64) 888 + RestartAction::None 383 889 } 384 890 } 385 891 } ··· 405 911 let span = 
u64::from(hi.saturating_sub(lo)).max(1); 406 912 (lo as usize) + (u64::from(raw) % span) as usize 407 913 } 914 + SizeDistribution::HeavyTail(range) => { 915 + let ValueBytes(lo) = range.min(); 916 + let ValueBytes(hi) = range.max(); 917 + let lo64 = u64::from(lo); 918 + let hi64 = u64::from(hi); 919 + let span = hi64.saturating_sub(lo64).max(1); 920 + let roll = u64::from(raw) % 1024; 921 + let extra = match roll { 922 + 0..=820 => span / 64, 923 + 821..=1000 => span / 8, 924 + 1001..=1015 => span / 2, 925 + _ => span, 926 + }; 927 + (lo64 + (extra.min(span))) as usize 928 + } 408 929 }; 409 930 let target_len = target_len.max(8); 410 931 let seed_bytes = raw.to_le_bytes(); ··· 413 934 .collect() 414 935 } 415 936 416 - async fn apply_op( 417 - store: &Arc<TranquilBlockStore>, 937 + fn event_payload_bytes(payload_seed: PayloadSeed) -> Vec<u8> { 938 + let raw = payload_seed.0; 939 + let len: usize = 48 + ((raw as usize) % 256); 940 + let seed_bytes = raw.to_le_bytes(); 941 + (0..len) 942 + .map(|i| seed_bytes[i % 4] ^ (i as u8).wrapping_mul(17)) 943 + .collect() 944 + } 945 + 946 + fn event_kind_to_tag(kind: EventKind) -> EventTypeTag { 947 + match kind { 948 + EventKind::Commit => EventTypeTag::COMMIT, 949 + EventKind::Identity => EventTypeTag::IDENTITY, 950 + EventKind::Account => EventTypeTag::ACCOUNT, 951 + EventKind::Sync => EventTypeTag::SYNC, 952 + } 953 + } 954 + 955 + fn did_hash_for_seed(seed: DidSeed) -> DidHash { 956 + DidHash::from_did(&format!("did:plc:gauntlet{:08x}", seed.0)) 957 + } 958 + 959 + async fn apply_op<S: StorageIO + Send + Sync + 'static>( 960 + harness: &mut Harness<S>, 418 961 root: &mut Option<Cid>, 419 962 oracle: &mut Oracle, 420 963 op: &Op, ··· 427 970 value_seed, 428 971 } => { 429 972 let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); 430 - let record_cid = store 973 + let record_cid = harness 974 + .store 431 975 .put(&record_bytes) 432 976 .await 433 977 .map_err(|e| 
OpError::PutRecord(e.to_string()))?; 434 978 let record_cid_bytes = try_cid_to_fixed(&record_cid)?; 435 979 436 - let outcome = 437 - add_record_inner(store, *root, collection, rkey, record_cid, record_cid_bytes) 438 - .await; 980 + let outcome = add_record_inner( 981 + &harness.store, 982 + *root, 983 + collection, 984 + rkey, 985 + record_cid, 986 + record_cid_bytes, 987 + ) 988 + .await; 439 989 match outcome { 440 990 Ok((new_root, applied)) => { 441 991 *root = Some(new_root); ··· 446 996 } 447 997 Err(e) => { 448 998 if let Err(cleanup_err) = 449 - decrement_obsolete(store, vec![record_cid_bytes]).await 999 + decrement_obsolete(&harness.store, vec![record_cid_bytes]).await 450 1000 { 451 1001 tracing::warn!( 452 1002 op_error = %e, 453 1003 cleanup_error = %cleanup_err, 454 - "AddRecord cleanup decrement failed; refcount may leak", 1004 + "AddRecord cleanup decrement failed", 455 1005 ); 456 1006 } 457 1007 Err(e) ··· 460 1010 } 461 1011 Op::DeleteRecord { collection, rkey } => { 462 1012 let Some(old_root) = *root else { return Ok(()) }; 463 - if oracle.delete(collection, rkey).is_none() { 1013 + if !oracle.contains_record(collection, rkey) { 464 1014 return Ok(()); 465 1015 } 466 1016 let key = format!("{}/{}", collection.0, rkey.0); 467 - let loaded = Mst::load(store.clone(), old_root, None); 1017 + let loaded = Mst::load(harness.store.clone(), old_root, None); 468 1018 let updated = loaded 469 1019 .delete(&key) 470 1020 .await ··· 473 1023 .persist() 474 1024 .await 475 1025 .map_err(|e| OpError::MstPersist(e.to_string()))?; 476 - apply_mst_diff(store, old_root, new_root).await?; 1026 + apply_mst_diff(&harness.store, old_root, new_root).await?; 1027 + oracle.delete(collection, rkey); 477 1028 *root = Some(new_root); 478 1029 Ok(()) 479 1030 } 480 1031 Op::Compact => { 481 - let s = store.clone(); 1032 + let s = harness.store.clone(); 482 1033 tokio::task::spawn_blocking(move || compact_by_liveness(&s)) 483 1034 .await 484 1035 .map_err(|e| 
OpError::Join(e.to_string()))? 485 1036 } 486 1037 Op::Checkpoint => { 487 - let s = store.clone(); 1038 + let s = harness.store.clone(); 488 1039 tokio::task::spawn_blocking(move || { 489 1040 s.apply_commit_blocking(vec![], vec![]) 490 1041 .map_err(|e| e.to_string()) ··· 493 1044 .map_err(|e| OpError::Join(e.to_string()))? 494 1045 .map_err(OpError::ApplyCommit) 495 1046 } 1047 + Op::AppendEvent { 1048 + did_seed, 1049 + event_kind, 1050 + payload_seed, 1051 + } => { 1052 + let Some(el) = harness.eventlog.as_mut() else { 1053 + return Ok(()); 1054 + }; 1055 + let did_hash = did_hash_for_seed(*did_seed); 1056 + let tag = event_kind_to_tag(*event_kind); 1057 + let payload = event_payload_bytes(*payload_seed); 1058 + let ts_before = TimestampMicros::now().raw(); 1059 + match el.writer.append(did_hash, tag, payload) { 1060 + Ok(seq) => { 1061 + oracle.record_event_append(EventExpectation { 1062 + seq, 1063 + timestamp_us: ts_before, 1064 + kind: *event_kind, 1065 + did_hash: did_hash.raw(), 1066 + }); 1067 + let _ = el.writer.rotate_if_needed(); 1068 + Ok(()) 1069 + } 1070 + Err(e) => Err(OpError::EventLogAppend(e.to_string())), 1071 + } 1072 + } 1073 + Op::SyncEventLog => { 1074 + let Some(el) = harness.eventlog.as_mut() else { 1075 + return Ok(()); 1076 + }; 1077 + match el.writer.sync() { 1078 + Ok(result) => { 1079 + let _ = el.manager.io().sync_dir(el.segments_dir.as_path()); 1080 + let _ = el.writer.rotate_if_needed(); 1081 + oracle.record_event_sync(result.synced_through); 1082 + Ok(()) 1083 + } 1084 + Err(e) => Err(OpError::EventLogSync(e.to_string())), 1085 + } 1086 + } 1087 + Op::RunRetention { max_age_secs } => { 1088 + let Some(el) = harness.eventlog.as_mut() else { 1089 + return Ok(()); 1090 + }; 1091 + run_retention(el, oracle, *max_age_secs).map_err(OpError::EventLogRetention) 1092 + } 1093 + Op::ReadRecord { collection, rkey } => { 1094 + let Some(r) = *root else { return Ok(()) }; 1095 + let key = format!("{}/{}", collection.0, rkey.0); 1096 + let 
mst = Mst::load(harness.store.clone(), r, None); 1097 + let _ = mst.get(&key).await; 1098 + Ok(()) 1099 + } 1100 + Op::ReadBlock { value_seed } => { 1101 + let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); 1102 + let record_cid = hash_to_cid_bytes(&record_bytes); 1103 + let _ = harness.store.get_block_sync(&record_cid); 1104 + Ok(()) 1105 + } 496 1106 } 497 1107 } 498 1108 499 - async fn add_record_inner( 500 - store: &Arc<TranquilBlockStore>, 1109 + fn run_retention<S: StorageIO + Send + Sync + 'static>( 1110 + el: &mut EventLogState<S>, 1111 + oracle: &mut Oracle, 1112 + max_age: RetentionSecs, 1113 + ) -> Result<(), String> { 1114 + let sync_result = el.writer.sync().map_err(|e| e.to_string())?; 1115 + let _ = el.writer.rotate_if_needed(); 1116 + oracle.record_event_sync(sync_result.synced_through); 1117 + let active_id = sync_result.segment_id; 1118 + let now_us = TimestampMicros::now().raw(); 1119 + let max_age_us = u64::from(max_age.0).saturating_mul(1_000_000); 1120 + let cutoff_us = now_us.saturating_sub(max_age_us); 1121 + let segments = el.manager.list_segments().map_err(|e| e.to_string())?; 1122 + segments 1123 + .iter() 1124 + .take_while(|&&id| id != active_id) 1125 + .try_for_each(|&id| -> Result<(), String> { 1126 + let last_ts = segment_last_timestamp(&el.manager, id).map_err(|e| e.to_string())?; 1127 + match last_ts { 1128 + Some(ts) if ts < cutoff_us => { 1129 + el.manager.delete_segment(id).map_err(|e| e.to_string()) 1130 + } 1131 + _ => Ok(()), 1132 + } 1133 + })?; 1134 + oracle.record_retention(cutoff_us); 1135 + Ok(()) 1136 + } 1137 + 1138 + fn segment_last_timestamp<S: StorageIO + Send + Sync + 'static>( 1139 + manager: &SegmentManager<S>, 1140 + id: SegmentId, 1141 + ) -> std::io::Result<Option<u64>> { 1142 + let fd = manager.open_for_read(id)?; 1143 + let reader = SegmentReader::open(manager.io(), fd, MAX_EVENT_PAYLOAD)?; 1144 + let events = reader.valid_prefix()?; 1145 + Ok(events.last().map(|e: &ValidEvent| 
/// Concurrent counterpart of `apply_op`: executes a single generated op
/// against shared state. All mutating ops serialize on `shared.write`
/// (a tokio mutex, so guards may be held across `.await`); pure reads take
/// the lock only long enough to snapshot what they need.
async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
    shared: &Arc<SharedState<S>>,
    op: &Op,
    workload: &WorkloadModel,
) -> Result<(), OpError> {
    match op {
        Op::AddRecord {
            collection,
            rkey,
            value_seed,
        } => {
            // The block put happens OUTSIDE the write lock: content-addressed
            // puts don't touch the MST root or the oracle.
            let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
            let record_cid = shared
                .store
                .put(&record_bytes)
                .await
                .map_err(|e| OpError::PutRecord(e.to_string()))?;
            let record_cid_bytes = try_cid_to_fixed(&record_cid)?;

            // Root mutation + oracle update must be atomic w.r.t. other writers.
            let mut state = shared.write.lock().await;
            let outcome = add_record_inner(
                &shared.store,
                state.root,
                collection,
                rkey,
                record_cid,
                record_cid_bytes,
            )
            .await;
            match outcome {
                Ok((new_root, applied)) => {
                    state.root = Some(new_root);
                    if applied {
                        state
                            .oracle
                            .add(collection.clone(), rkey.clone(), record_cid_bytes);
                    }
                    Ok(())
                }
                Err(e) => {
                    // Release the lock before the cleanup decrement so other
                    // writers aren't blocked on our failure path.
                    drop(state);
                    if let Err(cleanup) =
                        decrement_obsolete(&shared.store, vec![record_cid_bytes]).await
                    {
                        tracing::warn!(
                            op_error = %e,
                            cleanup_error = %cleanup,
                            "AddRecord concurrent cleanup decrement failed",
                        );
                    }
                    Err(e)
                }
            }
        }
        Op::DeleteRecord { collection, rkey } => {
            // Entire delete (root read, MST edit, oracle update) under the lock.
            let mut state = shared.write.lock().await;
            let Some(old_root) = state.root else {
                return Ok(());
            };
            // Skip keys the oracle doesn't know about; MST delete of a missing
            // key is not exercised here.
            if !state.oracle.contains_record(collection, rkey) {
                return Ok(());
            }
            let key = format!("{}/{}", collection.0, rkey.0);
            let loaded = Mst::load(shared.store.clone(), old_root, None);
            let updated = loaded
                .delete(&key)
                .await
                .map_err(|e| OpError::MstDelete(e.to_string()))?;
            let new_root = updated
                .persist()
                .await
                .map_err(|e| OpError::MstPersist(e.to_string()))?;
            apply_mst_diff(&shared.store, old_root, new_root).await?;
            // Oracle is updated only after the diff is applied, mirroring the
            // sequential path.
            state.oracle.delete(collection, rkey);
            state.root = Some(new_root);
            Ok(())
        }
        Op::Compact => {
            // Hold the write lock for the duration of compaction so no writer
            // mutates the root while files are being rewritten.
            let _guard = shared.write.lock().await;
            let s = shared.store.clone();
            tokio::task::spawn_blocking(move || compact_by_liveness(&s))
                .await
                .map_err(|e| OpError::Join(e.to_string()))?
        }
        Op::Checkpoint => {
            // NOTE(review): unlike Compact, Checkpoint runs WITHOUT the write
            // lock — presumably apply_commit_blocking is internally
            // synchronized; confirm this asymmetry is intentional.
            let s = shared.store.clone();
            tokio::task::spawn_blocking(move || {
                s.apply_commit_blocking(vec![], vec![])
                    .map_err(|e| e.to_string())
            })
            .await
            .map_err(|e| OpError::Join(e.to_string()))?
            .map_err(OpError::ApplyCommit)
        }
        Op::AppendEvent {
            did_seed,
            event_kind,
            payload_seed,
        } => {
            let did_hash = did_hash_for_seed(*did_seed);
            let tag = event_kind_to_tag(*event_kind);
            let payload = event_payload_bytes(*payload_seed);
            // NOTE(review): timestamp is captured BEFORE acquiring the lock,
            // so it is only a lower bound for the appended event's real
            // timestamp — verify the invariant checker treats it as such.
            let ts_before = TimestampMicros::now().raw();
            let mut state = shared.write.lock().await;
            let Some(el) = state.eventlog.as_mut() else {
                return Ok(());
            };
            match el.writer.append(did_hash, tag, payload) {
                Ok(seq) => {
                    // Best-effort rotation; failure here is not an op error.
                    let _ = el.writer.rotate_if_needed();
                    state.oracle.record_event_append(EventExpectation {
                        seq,
                        timestamp_us: ts_before,
                        kind: *event_kind,
                        did_hash: did_hash.raw(),
                    });
                    Ok(())
                }
                Err(e) => Err(OpError::EventLogAppend(e.to_string())),
            }
        }
        Op::SyncEventLog => {
            let mut state = shared.write.lock().await;
            let Some(el) = state.eventlog.as_mut() else {
                return Ok(());
            };
            match el.writer.sync() {
                Ok(result) => {
                    // Directory sync and rotation are best-effort.
                    let _ = el.manager.io().sync_dir(el.segments_dir.as_path());
                    let _ = el.writer.rotate_if_needed();
                    state.oracle.record_event_sync(result.synced_through);
                    Ok(())
                }
                Err(e) => Err(OpError::EventLogSync(e.to_string())),
            }
        }
        Op::RunRetention { max_age_secs } => {
            let mut state_guard = shared.write.lock().await;
            // Destructure to get disjoint &mut borrows of oracle and eventlog
            // out of the same guard.
            let state = &mut *state_guard;
            let WriteState {
                oracle, eventlog, ..
            } = state;
            let Some(el) = eventlog.as_mut() else {
                return Ok(());
            };
            run_retention(el, oracle, *max_age_secs).map_err(OpError::EventLogRetention)
        }
        Op::ReadRecord { collection, rkey } => {
            // Lock only to snapshot the root; the MST read itself runs unlocked.
            let r = { shared.write.lock().await.root };
            let Some(r) = r else {
                return Ok(());
            };
            let key = format!("{}/{}", collection.0, rkey.0);
            let mst = Mst::load(shared.store.clone(), r, None);
            let _ = mst.get(&key).await;
            Ok(())
        }
        Op::ReadBlock { value_seed } => {
            // Recompute the CID a matching AddRecord would have produced and
            // probe the store; a miss is fine (the record may never have been
            // added or may have been deleted).
            let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
            let record_cid = hash_to_cid_bytes(&record_bytes);
            let _ = shared.store.get_block_sync(&record_cid);
            Ok(())
        }
    }
}
1387 + } = state; 1388 + let Some(el) = eventlog.as_mut() else { 1389 + return Ok(()); 1390 + }; 1391 + run_retention(el, oracle, *max_age_secs).map_err(OpError::EventLogRetention) 1392 + } 1393 + Op::ReadRecord { collection, rkey } => { 1394 + let r = { shared.write.lock().await.root }; 1395 + let Some(r) = r else { 1396 + return Ok(()); 1397 + }; 1398 + let key = format!("{}/{}", collection.0, rkey.0); 1399 + let mst = Mst::load(shared.store.clone(), r, None); 1400 + let _ = mst.get(&key).await; 1401 + Ok(()) 1402 + } 1403 + Op::ReadBlock { value_seed } => { 1404 + let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); 1405 + let record_cid = hash_to_cid_bytes(&record_bytes); 1406 + let _ = shared.store.get_block_sync(&record_cid); 1407 + Ok(()) 1408 + } 1409 + } 1410 + } 1411 + 1412 + #[allow(clippy::too_many_arguments)] 1413 + async fn writer_task<S: StorageIO + Send + Sync + 'static>( 1414 + shared: Arc<SharedState<S>>, 1415 + ops: Arc<Vec<Op>>, 1416 + index: Arc<AtomicUsize>, 1417 + end: usize, 1418 + workload: Arc<WorkloadModel>, 1419 + ops_counter: Arc<AtomicUsize>, 1420 + op_errors_counter: Arc<AtomicUsize>, 1421 + tolerate_op_errors: bool, 1422 + ) -> Option<InvariantViolation> { 1423 + loop { 1424 + let idx = index.fetch_add(1, Ordering::Relaxed); 1425 + if idx >= end { 1426 + return None; 1427 + } 1428 + let op = &ops[idx]; 1429 + match apply_op_concurrent(&shared, op, &workload).await { 1430 + Ok(()) => { 1431 + ops_counter.fetch_max(idx + 1, Ordering::Relaxed); 1432 + } 1433 + Err(e) => { 1434 + if tolerate_op_errors { 1435 + op_errors_counter.fetch_add(1, Ordering::Relaxed); 1436 + continue; 1437 + } 1438 + return Some(InvariantViolation { 1439 + invariant: "OpExecution", 1440 + detail: format!("op {idx}: {e}"), 1441 + }); 1442 + } 1443 + } 1444 + } 1445 + } 1446 + 1447 + fn compute_chunks( 1448 + policy: RestartPolicy, 1449 + total_ops: usize, 1450 + restart_rng: &mut Lcg, 1451 + ) -> Vec<(Range<usize>, RestartAction)> { 1452 
+ let points: Vec<(usize, RestartAction)> = (0..total_ops) 1453 + .filter_map(|i| match should_restart(policy, OpIndex(i), restart_rng) { 1454 + RestartAction::None => None, 1455 + a => Some((i + 1, a)), 1456 + }) 1457 + .collect(); 1458 + let mut chunks = Vec::new(); 1459 + let mut start = 0; 1460 + for (end, action) in points { 1461 + chunks.push((start..end, action)); 1462 + start = end; 1463 + } 1464 + if start < total_ops { 1465 + chunks.push((start..total_ops, RestartAction::None)); 1466 + } 1467 + chunks 1468 + } 1469 + 1470 + #[allow(clippy::too_many_arguments)] 1471 + async fn run_inner_generic_concurrent<S, Open, Crash>( 1472 + config: GauntletConfig, 1473 + op_stream: OpStream, 1474 + ops_counter: Arc<AtomicUsize>, 1475 + op_errors_counter: Arc<AtomicUsize>, 1476 + restarts_counter: Arc<AtomicUsize>, 1477 + mut open: Open, 1478 + mut crash: Crash, 1479 + tolerate_op_errors: bool, 1480 + ) -> GauntletReport 1481 + where 1482 + S: StorageIO + Send + Sync + 'static, 1483 + Open: FnMut() -> Result<Harness<S>, String>, 1484 + Crash: FnMut(), 1485 + { 1486 + let ops: Vec<Op> = op_stream.into_vec(); 1487 + let total_ops = ops.len(); 1488 + let ops_arc = Arc::new(ops); 1489 + let workload_arc = Arc::new(config.workload.clone()); 1490 + 1491 + let mut violations: Vec<InvariantViolation> = Vec::new(); 1492 + let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); 1493 + let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); 1494 + let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng); 1495 + 1496 + let mut harness: Option<Harness<S>> = match open() { 1497 + Ok(h) => Some(h), 1498 + Err(e) => { 1499 + return GauntletReport { 1500 + seed: config.seed, 1501 + ops_executed: OpsExecuted(0), 1502 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 1503 + restarts: RestartCount(0), 1504 + violations: vec![InvariantViolation { 1505 + invariant: "OpenStore", 1506 + detail: 
/// Concurrent gauntlet driver: executes the op stream in chunks with
/// `writer_concurrency` cooperating writer tasks, interleaving restarts
/// (clean or crash) between chunks, then runs end-of-run invariant checks
/// and an optional restart-idempotency check.
///
/// `open` (re)constructs the harness; `crash` injects the simulated fault.
/// Returns a report; a non-empty `violations` list marks a failed run.
#[allow(clippy::too_many_arguments)]
async fn run_inner_generic_concurrent<S, Open, Crash>(
    config: GauntletConfig,
    op_stream: OpStream,
    ops_counter: Arc<AtomicUsize>,
    op_errors_counter: Arc<AtomicUsize>,
    restarts_counter: Arc<AtomicUsize>,
    mut open: Open,
    mut crash: Crash,
    tolerate_op_errors: bool,
) -> GauntletReport
where
    S: StorageIO + Send + Sync + 'static,
    Open: FnMut() -> Result<Harness<S>, String>,
    Crash: FnMut(),
{
    let ops: Vec<Op> = op_stream.into_vec();
    let total_ops = ops.len();
    let ops_arc = Arc::new(ops);
    let workload_arc = Arc::new(config.workload.clone());

    let mut violations: Vec<InvariantViolation> = Vec::new();
    // Independent RNG streams (xor-derived from the run seed) so restart
    // placement and sampling don't perturb each other.
    let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5));
    let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A));
    let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng);

    let mut harness: Option<Harness<S>> = match open() {
        Ok(h) => Some(h),
        Err(e) => {
            // Can't even open the store: report immediately with zero progress.
            return GauntletReport {
                seed: config.seed,
                ops_executed: OpsExecuted(0),
                op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)),
                restarts: RestartCount(0),
                violations: vec![InvariantViolation {
                    invariant: "OpenStore",
                    detail: format!("initial open: {e}"),
                }],
                ops: OpStream::empty(),
            };
        }
    };
    let mut root: Option<Cid> = None;
    let mut oracle = Oracle::new();
    // Once true, no further ops/chunks run; only the final report is built.
    let mut halt_ops = false;

    let writer_n = config.writer_concurrency.0.max(1);

    for (chunk_i, (chunk_range, action)) in chunks.iter().enumerate() {
        if halt_ops {
            break;
        }
        // Move the harness's mutable state (root, oracle, eventlog) into a
        // SharedState the writer tasks can own for the duration of the chunk.
        let current = harness.take().expect("harness present before chunk");
        let taken_oracle = std::mem::take(&mut oracle);
        let shared = Arc::new(SharedState {
            store: Arc::clone(&current.store),
            write: tokio::sync::Mutex::new(WriteState {
                root,
                oracle: taken_oracle,
                eventlog: current.eventlog,
            }),
        });

        // Shared work cursor: writers claim op indices via fetch_add.
        let index = Arc::new(AtomicUsize::new(chunk_range.start));
        let end = chunk_range.end;
        let mut handles: Vec<tokio::task::JoinHandle<Option<InvariantViolation>>> = Vec::new();
        for _ in 0..writer_n {
            handles.push(tokio::spawn(writer_task(
                Arc::clone(&shared),
                Arc::clone(&ops_arc),
                Arc::clone(&index),
                end,
                Arc::clone(&workload_arc),
                Arc::clone(&ops_counter),
                Arc::clone(&op_errors_counter),
                tolerate_op_errors,
            )));
        }
        // Join every writer even after a violation, so all tasks have exited
        // before we try to reclaim SharedState below.
        for h in handles.drain(..) {
            match h.await {
                Ok(None) => {}
                Ok(Some(v)) => {
                    violations.push(v);
                    halt_ops = true;
                }
                Err(join) => {
                    violations.push(InvariantViolation {
                        invariant: "TaskJoin",
                        detail: join.to_string(),
                    });
                    halt_ops = true;
                }
            }
        }

        // All writers have joined, so ours should be the last Arc ref; a
        // failure here means a task leaked its handle — itself a violation.
        let shared = match Arc::try_unwrap(shared) {
            Ok(s) => s,
            Err(still_held) => {
                violations.push(InvariantViolation {
                    invariant: "ConcurrencyInvariant",
                    detail: format!(
                        "SharedState still held by {} refs after task join",
                        Arc::strong_count(&still_held)
                    ),
                });
                halt_ops = true;
                break;
            }
        };
        // Move the chunk's final state back into the loop-level variables.
        let store = shared.store;
        let write_state = shared.write.into_inner();
        root = write_state.root;
        oracle = write_state.oracle;
        let eventlog = write_state.eventlog;
        harness = Some(Harness { store, eventlog });

        if halt_ops {
            break;
        }

        match action {
            RestartAction::None => {}
            RestartAction::Clean | RestartAction::Crash => {
                if matches!(action, RestartAction::Crash) {
                    // Inject the fault BEFORE shutdown so unsynced state is lost.
                    crash();
                    oracle.record_crash();
                }
                shutdown_harness(&mut harness);
                match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) {
                    Ok(reopened) => {
                        harness = Some(reopened);
                        let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        let live = harness.as_ref().expect("just reopened");
                        // Sampled post-restart check; any new violation halts.
                        let before = violations.len();
                        violations.extend(
                            run_quick_check(
                                &live.store,
                                &oracle,
                                root,
                                &mut sample_rng,
                                QUICK_SAMPLE_SIZE,
                                n,
                            )
                            .await,
                        );
                        if violations.len() > before {
                            halt_ops = true;
                        }
                    }
                    Err(detail) => {
                        violations.push(InvariantViolation {
                            invariant: "ReopenFailed",
                            detail: format!("reopen after chunk {chunk_i}: {detail}"),
                        });
                        halt_ops = true;
                    }
                }
            }
        }
    }

    // Fault-tolerant runs end with one final crash+recover cycle so the
    // end-of-run invariants always see a recovered store.
    if !halt_ops && tolerate_op_errors && harness.is_some() {
        crash();
        oracle.record_crash();
        shutdown_harness(&mut harness);
        match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) {
            Ok(reopened) => harness = Some(reopened),
            Err(detail) => {
                violations.push(InvariantViolation {
                    invariant: "ReopenFailed",
                    detail: format!("reopen after post-run crash: {detail}"),
                });
                halt_ops = true;
            }
        }
    }

    // End-of-run invariants (restart idempotency is checked separately below).
    let end_of_run_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT);
    if !halt_ops && let Some(live) = harness.as_ref() {
        match refresh_oracle_graph(&live.store, &mut oracle, root).await {
            Ok(()) => {
                let before = violations.len();
                let snapshot = eventlog_snapshot(live.eventlog.as_ref());
                violations.extend(
                    run_invariants(&live.store, &oracle, root, snapshot, end_of_run_set).await,
                );
                if violations.len() > before {
                    halt_ops = true;
                }
            }
            Err(e) => {
                violations.push(InvariantViolation {
                    invariant: "MstRootDurability",
                    detail: format!("refresh after final reopen: {e}"),
                });
                halt_ops = true;
            }
        }
    }

    // Restart-idempotency: snapshot the block index, reopen once more, and
    // require the index to be unchanged; then re-run the remaining invariants
    // against the reopened store.
    let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT);
    if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT)
        && !halt_ops
        && let Some(live) = harness.as_ref()
    {
        let pre_snapshot = snapshot_block_index(&live.store);
        shutdown_harness(&mut harness);
        match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) {
            Ok(reopened) => {
                let post_snapshot = snapshot_block_index(&reopened.store);
                if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) {
                    violations.push(InvariantViolation {
                        invariant: "RestartIdempotent",
                        detail,
                    });
                } else {
                    let snapshot = eventlog_snapshot(reopened.eventlog.as_ref());
                    violations.extend(
                        run_invariants(&reopened.store, &oracle, root, snapshot, post_reopen_set)
                            .await,
                    );
                }
            }
            Err(detail) => {
                violations.push(InvariantViolation {
                    invariant: "ReopenFailed",
                    detail: format!("reopen for idempotency check: {detail}"),
                });
            }
        }
    }

    GauntletReport {
        seed: config.seed,
        ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)),
        op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)),
        restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)),
        violations,
        // The original op stream was consumed; callers minimizing a failure
        // re-derive ops from the seed.
        ops: OpStream::empty(),
    }
}