Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store/gauntlet): new invariants & scenarios

Lewis: May this revision serve well! <lu5a@proton.me>

+1122 -93
+47
crates/tranquil-store/src/blockstore/cid_util.rs
··· 1 + use cid::Cid; 2 + use multihash::Multihash; 3 + use sha2::{Digest, Sha256}; 4 + 5 + use super::data_file::CID_SIZE; 6 + 7 + pub const DAG_CBOR_CODEC: u64 = 0x71; 8 + pub const SHA2_256_CODE: u64 = 0x12; 9 + 10 + pub fn hash_to_cid(data: &[u8]) -> Cid { 11 + let mut hasher = Sha256::new(); 12 + hasher.update(data); 13 + let digest = hasher.finalize(); 14 + let mh = Multihash::wrap(SHA2_256_CODE, &digest) 15 + .expect("SHA-256 digest is 32 bytes, well within multihash capacity"); 16 + Cid::new_v1(DAG_CBOR_CODEC, mh) 17 + } 18 + 19 + pub fn hash_to_cid_bytes(data: &[u8]) -> [u8; CID_SIZE] { 20 + let raw = hash_to_cid(data).to_bytes(); 21 + raw.try_into() 22 + .expect("CIDv1 + DAG-CBOR + SHA-256 always encodes to CID_SIZE bytes") 23 + } 24 + 25 + #[cfg(test)] 26 + mod tests { 27 + use super::*; 28 + 29 + #[test] 30 + fn hash_to_cid_bytes_is_deterministic() { 31 + let a = hash_to_cid_bytes(b"hello"); 32 + let b = hash_to_cid_bytes(b"hello"); 33 + assert_eq!(a, b); 34 + } 35 + 36 + #[test] 37 + fn hash_to_cid_bytes_diverges_on_single_byte_change() { 38 + assert_ne!(hash_to_cid_bytes(b"abc"), hash_to_cid_bytes(b"abd")); 39 + } 40 + 41 + #[test] 42 + fn hash_to_cid_and_bytes_agree() { 43 + let cid = hash_to_cid(b"payload"); 44 + let raw: [u8; CID_SIZE] = cid.to_bytes().try_into().expect("36 bytes"); 45 + assert_eq!(raw, hash_to_cid_bytes(b"payload")); 46 + } 47 + }
+2
crates/tranquil-store/src/blockstore/mod.rs
··· 1 + mod cid_util; 1 2 mod compaction; 2 3 mod data_file; 3 4 mod group_commit; ··· 8 9 mod store; 9 10 mod types; 10 11 12 + pub use cid_util::{DAG_CBOR_CODEC, SHA2_256_CODE, hash_to_cid, hash_to_cid_bytes}; 11 13 pub use compaction::CompactionError; 12 14 pub use data_file::{ 13 15 BLOCK_FORMAT_VERSION, BLOCK_HEADER_SIZE, BLOCK_MAGIC, BLOCK_RECORD_OVERHEAD, CID_SIZE,
+50 -28
crates/tranquil-store/src/blockstore/store.rs
··· 8 8 use jacquard_repo::error::RepoError; 9 9 use jacquard_repo::repo::CommitData; 10 10 use jacquard_repo::storage::BlockStore; 11 - use multihash::Multihash; 12 - use sha2::{Digest, Sha256}; 13 11 14 12 use crate::fsync_order::PostBlockstoreHook; 15 13 use crate::io::{OpenOptions, RealIO, StorageIO}; 16 14 15 + use super::cid_util::hash_to_cid; 17 16 use super::compaction::CompactionError; 18 17 use super::data_file::{BLOCK_RECORD_OVERHEAD, CID_SIZE, ReadBlockRecord}; 19 18 use super::group_commit::{CommitError, CommitRequest, GroupCommitConfig, GroupCommitWriter}; ··· 25 24 EpochCounter, LivenessInfo, WallClockMs, WriteCursor, 26 25 }; 27 26 28 - const DAG_CBOR_CODEC: u64 = 0x71; 29 - const SHA2_256_CODE: u64 = 0x12; 30 - 31 27 fn cid_to_bytes(cid: &Cid) -> Result<[u8; CID_SIZE], RepoError> { 32 28 let raw = cid.to_bytes(); 33 29 let len = raw.len(); ··· 39 35 ), 40 36 )) 41 37 }) 42 - } 43 - 44 - fn hash_and_cid(data: &[u8]) -> Result<Cid, RepoError> { 45 - let mut hasher = Sha256::new(); 46 - hasher.update(data); 47 - let hash = hasher.finalize(); 48 - let multihash = Multihash::wrap(SHA2_256_CODE, &hash).map_err(|e| { 49 - RepoError::storage(io::Error::new(io::ErrorKind::InvalidData, e.to_string())) 50 - })?; 51 - Ok(Cid::new_v1(DAG_CBOR_CODEC, multihash)) 52 38 } 53 39 54 40 fn block_index_err_to_repo(e: super::hash_index::BlockIndexError) -> RepoError { ··· 123 109 } 124 110 } 125 111 126 - #[derive(Clone)] 127 - pub struct TranquilBlockStore { 112 + pub struct TranquilBlockStore<S: StorageIO + Send + Sync + 'static = RealIO> { 128 113 writer: Arc<WriterHandle>, 129 - reader: Arc<BlockStoreReader<RealIO>>, 114 + reader: Arc<BlockStoreReader<S>>, 130 115 index: Arc<BlockIndex>, 131 116 epoch: EpochCounter, 132 117 data_dir: PathBuf, 133 118 } 134 119 120 + impl<S: StorageIO + Send + Sync + 'static> Clone for TranquilBlockStore<S> { 121 + fn clone(&self) -> Self { 122 + Self { 123 + writer: Arc::clone(&self.writer), 124 + reader: Arc::clone(&self.reader), 125 + index: Arc::clone(&self.index), 126 + epoch: self.epoch.clone(), 127 + data_dir: self.data_dir.clone(), 128 + } 129 + } 130 + } 131 + 135 132 struct WriterHandle { 136 133 inner: parking_lot::Mutex<Option<GroupCommitWriter>>, 137 134 } ··· 153 150 } 154 151 } 155 152 156 - impl TranquilBlockStore { 153 + impl TranquilBlockStore<RealIO> { 157 154 pub fn open(config: BlockStoreConfig) -> Result<Self, RepoError> { 158 155 Self::open_with_hook(config, None) 159 156 } ··· 162 159 config: BlockStoreConfig, 163 160 post_sync_hook: Option<Arc<dyn PostBlockstoreHook>>, 164 161 ) -> Result<Self, RepoError> { 162 + Self::open_with_io_hook(config, RealIO::new, post_sync_hook) 163 + } 164 + } 165 + 166 + impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> { 167 + pub fn open_with_io<F>(config: BlockStoreConfig, make_io: F) -> Result<Self, RepoError> 168 + where 169 + F: Fn() -> S + Send + Sync + Clone + 'static, 170 + { 171 + Self::open_with_io_hook(config, make_io, None) 172 + } 173 + 174 + pub fn open_with_io_hook<F>( 175 + config: BlockStoreConfig, 176 + make_io: F, 177 + post_sync_hook: Option<Arc<dyn PostBlockstoreHook>>, 178 + ) -> Result<Self, RepoError> 179 + where 180 + F: Fn() -> S + Send + Sync + Clone + 'static, 181 + { 165 182 if config.data_dir == config.index_dir { 166 183 return Err(RepoError::storage(io::Error::new( 167 184 io::ErrorKind::InvalidInput, ··· 173 190 174 191 let index = BlockIndex::open(&config.index_dir).map_err(RepoError::storage)?; 175 192 176 - let io = RealIO::new(); 193 + let io = make_io(); 177 194 178 195 let (replayed, file_cursors) = super::hint::replay_hints_into_block_index( 179 196 &io, ··· 195 212 let max_file_size = config.max_file_size; 196 213 let shard_count = config.shard_count; 197 214 let data_dir_for_closure = data_dir.clone(); 215 + let make_io_for_manager = make_io.clone(); 198 216 let make_manager = move || { 199 - DataFileManager::new(RealIO::new(), data_dir_for_closure.clone(), max_file_size) 217 + DataFileManager::new( 218 + make_io_for_manager(), 219 + data_dir_for_closure.clone(), 220 + max_file_size, 221 + ) 200 222 }; 201 223 202 224 let checkpoint_epoch = index.loaded_checkpoint_epoch(); ··· 214 236 let epoch = writer.epoch().clone(); 215 237 216 238 let manager_for_reader = Arc::new(DataFileManager::new( 217 - RealIO::new(), 239 + make_io(), 218 240 data_dir.clone(), 219 241 max_file_size, 220 242 )); ··· 234 256 }) 235 257 } 236 258 237 - fn recover_from_file_cursors<S: StorageIO>( 259 + fn recover_from_file_cursors( 238 260 io: &S, 239 261 data_dir: &Path, 240 262 index: &BlockIndex, ··· 256 278 }) 257 279 } 258 280 259 - fn replay_single_file<S: StorageIO>( 281 + fn replay_single_file( 260 282 io: &S, 261 283 data_dir: &Path, 262 284 index: &BlockIndex, ··· 284 306 result 285 307 } 286 308 287 - fn scan_and_index<S: StorageIO>( 309 + fn scan_and_index( 288 310 io: &S, 289 311 index: &BlockIndex, 290 312 fd: crate::io::FileId, ··· 594 616 } 595 617 } 596 618 597 - impl BlockStore for TranquilBlockStore { 619 + impl<S: StorageIO + Send + Sync + 'static> BlockStore for TranquilBlockStore<S> { 598 620 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 599 621 let cid_bytes = cid_to_bytes(cid)?; 600 622 let reader = Arc::clone(&self.reader); ··· 605 627 } 606 628 607 629 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 608 - let cid = hash_and_cid(data)?; 630 + let cid = hash_to_cid(data); 609 631 let cid_bytes = cid_to_bytes(&cid)?; 610 632 self.send_put_blocks(vec![(cid_bytes, data.to_vec())]) 611 633 .await?; ··· 666 688 } 667 689 } 668 690 669 - impl TranquilBlockStore { 691 + impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> { 670 692 pub async fn decrement_refs(&self, cids: &[Cid]) -> Result<(), RepoError> { 671 693 if cids.is_empty() { 672 694 return Ok(());
+516 -17
crates/tranquil-store/src/gauntlet/invariants.rs
··· 1 1 use std::collections::{HashMap, HashSet}; 2 + use std::path::PathBuf; 2 3 use std::sync::Arc; 3 4 4 5 use async_trait::async_trait; ··· 6 7 use jacquard_repo::mst::Mst; 7 8 8 9 use super::oracle::{Oracle, hex_short, try_cid_to_fixed}; 9 - use crate::blockstore::{CidBytes, TranquilBlockStore}; 10 + use crate::blockstore::{CidBytes, CompactionError, TranquilBlockStore, hash_to_cid_bytes}; 11 + use crate::eventlog::{EventSequence, SegmentId}; 12 + use crate::io::{RealIO, StorageIO}; 10 13 11 14 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 12 15 pub struct InvariantSet(u32); ··· 18 21 pub const ACKED_WRITE_PERSISTENCE: Self = Self(1 << 2); 19 22 pub const READ_AFTER_WRITE: Self = Self(1 << 3); 20 23 pub const RESTART_IDEMPOTENT: Self = Self(1 << 4); 24 + pub const COMPACTION_IDEMPOTENT: Self = Self(1 << 5); 25 + pub const NO_ORPHAN_FILES: Self = Self(1 << 6); 26 + pub const BYTE_BUDGET: Self = Self(1 << 7); 27 + pub const MANIFEST_EQUALS_REALITY: Self = Self(1 << 8); 28 + pub const CHECKSUM_COVERAGE: Self = Self(1 << 9); 29 + pub const MONOTONIC_SEQ: Self = Self(1 << 10); 30 + pub const FSYNC_ORDERING: Self = Self(1 << 11); 31 + pub const TOMBSTONE_BOUND: Self = Self(1 << 12); 21 32 22 33 const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 23 34 | Self::REACHABILITY.0 24 35 | Self::ACKED_WRITE_PERSISTENCE.0 25 36 | Self::READ_AFTER_WRITE.0 26 - | Self::RESTART_IDEMPOTENT.0; 37 + | Self::RESTART_IDEMPOTENT.0 38 + | Self::COMPACTION_IDEMPOTENT.0 39 + | Self::NO_ORPHAN_FILES.0 40 + | Self::BYTE_BUDGET.0 41 + | Self::MANIFEST_EQUALS_REALITY.0 42 + | Self::CHECKSUM_COVERAGE.0 43 + | Self::MONOTONIC_SEQ.0 44 + | Self::FSYNC_ORDERING.0 45 + | Self::TOMBSTONE_BOUND.0; 27 46 28 47 pub const fn contains(self, other: Self) -> bool { 29 48 (self.0 & other.0) == other.0 ··· 49 68 } 50 69 } 51 70 52 - #[derive(Debug)] 71 + #[derive(Debug, Clone)] 53 72 pub struct InvariantViolation { 54 73 pub invariant: &'static str, 55 74 pub detail: String, 56 75 } 57 76 58 - pub struct InvariantCtx<'a> { 59 - pub store: &'a Arc<TranquilBlockStore>, 77 + #[derive(Debug, Clone, Copy)] 78 + pub struct SnapshotEvent { 79 + pub seq: EventSequence, 80 + pub timestamp_us: u64, 81 + pub event_type_raw: u8, 82 + pub did_hash: u32, 83 + } 84 + 85 + #[derive(Debug, Clone)] 86 + pub struct EventLogSnapshot { 87 + pub segments_dir: PathBuf, 88 + pub max_segment_size: u64, 89 + pub synced_seq: EventSequence, 90 + pub segments: Vec<SegmentId>, 91 + pub events: Vec<SnapshotEvent>, 92 + pub segment_last_ts: Vec<(SegmentId, u64)>, 93 + } 94 + 95 + pub struct InvariantCtx<'a, S: StorageIO + Send + Sync + 'static = RealIO> { 96 + pub store: &'a Arc<TranquilBlockStore<S>>, 60 97 pub oracle: &'a Oracle, 61 98 pub root: Option<Cid>, 99 + pub eventlog: Option<&'a EventLogSnapshot>, 62 100 } 63 101 64 102 #[async_trait] 65 - pub trait Invariant: Send + Sync { 103 + pub trait Invariant<S: StorageIO + Send + Sync + 'static>: Send + Sync { 66 104 fn name(&self) -> &'static str; 67 - async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation>; 105 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation>; 68 106 } 69 107 70 108 pub struct RefcountConservation; 71 109 72 110 #[async_trait] 73 - impl Invariant for RefcountConservation { 111 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for RefcountConservation { 74 112 fn name(&self) -> &'static str { 75 113 "RefcountConservation" 76 114 } 77 115 78 - async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 116 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 79 117 let live: Vec<(String, CidBytes)> = ctx.oracle.live_cids_labeled(); 80 118 let live_set: HashSet<CidBytes> = live.iter().map(|(_, c)| *c).collect(); 81 119 let index: HashMap<CidBytes, u32> = ctx ··· 116 154 pub struct Reachability; 117 155 118 156 #[async_trait] 119 - impl Invariant for Reachability { 157 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for Reachability { 120 158 fn name(&self) -> &'static str { 121 159 "Reachability" 122 160 } 123 161 124 - async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 162 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 125 163 let violations: Vec<String> = ctx 126 164 .oracle 127 165 .live_cids_labeled() ··· 147 185 pub struct AckedWritePersistence; 148 186 149 187 #[async_trait] 150 - impl Invariant for AckedWritePersistence { 188 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for AckedWritePersistence { 151 189 fn name(&self) -> &'static str { 152 190 "AckedWritePersistence" 153 191 } 154 192 155 - async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 193 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 156 194 let Some(root) = ctx.root else { 157 195 if ctx.oracle.live_count() == 0 { 158 196 return Ok(()); ··· 195 233 pub struct ReadAfterWrite; 196 234 197 235 #[async_trait] 198 - impl Invariant for ReadAfterWrite { 236 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for ReadAfterWrite { 199 237 fn name(&self) -> &'static str { 200 238 "ReadAfterWrite" 201 239 } 202 240 203 - async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> { 241 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 204 242 let Some(root) = ctx.root else { 205 243 return Ok(()); 206 244 }; ··· 246 284 } 247 285 } 248 286 249 - pub fn invariants_for(set: InvariantSet) -> Vec<Box<dyn Invariant>> { 287 + pub struct CompactionIdempotent; 288 + 289 + #[async_trait] 290 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for CompactionIdempotent { 291 + fn name(&self) -> &'static str { 292 + "CompactionIdempotent" 293 + } 294 + 295 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 296 + let store_a = ctx.store.clone(); 297 + let first = tokio::task::spawn_blocking(move || compact_by_liveness(&store_a)) 298 + .await 299 + .map_err(|e| InvariantViolation { 300 + invariant: "CompactionIdempotent", 301 + detail: format!("first compaction join: {e}"), 302 + })?; 303 + if let Err(e) = first { 304 + return Err(InvariantViolation { 305 + invariant: "CompactionIdempotent", 306 + detail: format!("first compaction: {e}"), 307 + }); 308 + } 309 + 310 + let pre = snapshot(ctx.store); 311 + 312 + let store_b = ctx.store.clone(); 313 + let second = tokio::task::spawn_blocking(move || compact_by_liveness(&store_b)) 314 + .await 315 + .map_err(|e| InvariantViolation { 316 + invariant: "CompactionIdempotent", 317 + detail: format!("second compaction join: {e}"), 318 + })?; 319 + if let Err(e) = second { 320 + return Err(InvariantViolation { 321 + invariant: "CompactionIdempotent", 322 + detail: format!("second compaction: {e}"), 323 + }); 324 + } 325 + 326 + let post = snapshot(ctx.store); 327 + 328 + if pre == post { 329 + Ok(()) 330 + } else { 331 + Err(InvariantViolation { 332 + invariant: "CompactionIdempotent", 333 + detail: format!( 334 + "second compaction changed observable state: pre={} entries, post={} entries", 335 + pre.len(), 336 + post.len(), 337 + ), 338 + }) 339 + } 340 + } 341 + } 342 + 343 + fn snapshot<S: StorageIO + Send + Sync + 'static>( 344 + store: &Arc<TranquilBlockStore<S>>, 345 + ) -> Vec<(CidBytes, u32)> { 346 + let mut v: Vec<(CidBytes, u32)> = store 347 + .block_index() 348 + .live_entries_snapshot() 349 + .into_iter() 350 + .map(|(c, r)| (c, r.raw())) 351 + .collect(); 352 + v.sort_unstable_by(|a, b| a.0.cmp(&b.0)); 353 + v 354 + } 355 + 356 + const COMPACT_LIVENESS_CEILING: f64 = 0.99; 357 + 358 + fn compact_by_liveness<S: StorageIO + Send + Sync + 'static>( 359 + store: &TranquilBlockStore<S>, 360 + ) -> Result<(), String> { 361 + let liveness = store 362 + .compaction_liveness(0) 363 + .map_err(|e| format!("compaction_liveness: {e}"))?; 364 + let targets: Vec<_> = liveness 365 + .iter() 366 + .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < COMPACT_LIVENESS_CEILING) 367 + .map(|(&fid, _)| fid) 368 + .collect(); 369 + targets 370 + .into_iter() 371 + .try_for_each(|fid| match store.compact_file(fid, 0) { 372 + Ok(_) => Ok(()), 373 + Err(CompactionError::ActiveFileCannotBeCompacted) => Ok(()), 374 + Err(e) => Err(format!("{fid}: {e}")), 375 + }) 376 + } 377 + 378 + pub struct NoOrphanFiles; 379 + 380 + #[async_trait] 381 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for NoOrphanFiles { 382 + fn name(&self) -> &'static str { 383 + "NoOrphanFiles" 384 + } 385 + 386 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 387 + let store_c = ctx.store.clone(); 388 + let result = tokio::task::spawn_blocking(move || { 389 + let disk = store_c.list_data_files().map_err(|e| e.to_string())?; 390 + let liveness = store_c.compaction_liveness(0).map_err(|e| e.to_string())?; 391 + let orphans: Vec<String> = disk 392 + .iter() 393 + .filter(|fid| !liveness.contains_key(fid)) 394 + .map(|fid| format!("{fid}")) 395 + .collect(); 396 + Ok::<_, String>(orphans) 397 + }) 398 + .await 399 + .map_err(|e| InvariantViolation { 400 + invariant: "NoOrphanFiles", 401 + detail: format!("join: {e}"), 402 + })?; 403 + 404 + let orphans = result.map_err(|e| InvariantViolation { 405 + invariant: "NoOrphanFiles", 406 + detail: e, 407 + })?; 408 + 409 + if orphans.is_empty() { 410 + Ok(()) 411 + } else { 412 + Err(InvariantViolation { 413 + invariant: "NoOrphanFiles", 414 + detail: format!("files on disk missing from index: {}", orphans.join(", ")), 415 + }) 416 + } 417 + } 418 + } 419 + 420 + pub struct ByteBudget { 421 + pub overhead_factor: f64, 422 + pub floor_bytes: u64, 423 + } 424 + 425 + impl Default for ByteBudget { 426 + fn default() -> Self { 427 + Self { 428 + overhead_factor: 8.0, 429 + floor_bytes: 1 << 20, 430 + } 431 + } 432 + } 433 + 434 + #[async_trait] 435 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for ByteBudget { 436 + fn name(&self) -> &'static str { 437 + "ByteBudget" 438 + } 439 + 440 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 441 + let store = ctx.store.clone(); 442 + let factor = self.overhead_factor; 443 + let floor = self.floor_bytes; 444 + tokio::task::spawn_blocking(move || { 445 + let liveness = store.compaction_liveness(0).map_err(|e| e.to_string())?; 446 + let live: u64 = liveness.values().map(|i| i.live_bytes).sum(); 447 + let total: u64 = liveness.values().map(|i| i.total_bytes).sum(); 448 + let budget = (live as f64 * factor) as u64 + floor; 449 + if total <= budget { 450 + Ok(()) 451 + } else { 452 + Err(format!( 453 + "total_bytes {total} exceeds budget {budget}: live_bytes {live}, factor {factor}, floor {floor}" 454 + )) 455 + } 456 + }) 457 + .await 458 + .map_err(|e| InvariantViolation { 459 + invariant: "ByteBudget", 460 + detail: format!("join: {e}"), 461 + })? 462 + .map_err(|e| InvariantViolation { 463 + invariant: "ByteBudget", 464 + detail: e, 465 + }) 466 + } 467 + } 468 + 469 + pub struct ManifestEqualsReality; 470 + 471 + #[async_trait] 472 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for ManifestEqualsReality { 473 + fn name(&self) -> &'static str { 474 + "ManifestEqualsReality" 475 + } 476 + 477 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 478 + let store = ctx.store.clone(); 479 + tokio::task::spawn_blocking(move || { 480 + let listed = store.list_data_files().map_err(|e| e.to_string())?; 481 + let liveness = store.compaction_liveness(0).map_err(|e| e.to_string())?; 482 + 483 + let mut violations: Vec<String> = Vec::new(); 484 + listed.iter().for_each(|fid| { 485 + let path = store.data_file_path(*fid); 486 + match std::fs::metadata(&path) { 487 + Err(e) => violations.push(format!("{fid}: metadata {e}")), 488 + Ok(meta) => { 489 + let on_disk = meta.len(); 490 + match liveness.get(fid) { 491 + None => violations.push(format!( 492 + "{fid}: listed on disk at {on_disk} B but not in index liveness" 493 + )), 494 + Some(info) if on_disk < info.total_bytes => { 495 + violations.push(format!( 496 + "{fid}: on-disk {on_disk} B < index total_bytes {}", 497 + info.total_bytes 498 + )); 499 + } 500 + Some(info) if on_disk > info.total_bytes => { 501 + violations.push(format!( 502 + "{fid}: on-disk {on_disk} B > index total_bytes {}, {} B unaccounted", 503 + info.total_bytes, 504 + on_disk - info.total_bytes 505 + )); 506 + } 507 + Some(_) => {} 508 + } 509 + } 510 + } 511 + }); 512 + 513 + let listed_set: std::collections::HashSet<_> = listed.into_iter().collect(); 514 + liveness.keys().for_each(|fid| { 515 + if !listed_set.contains(fid) { 516 + violations.push(format!("{fid}: in index liveness but missing on disk")); 517 + } 518 + }); 519 + 520 + if violations.is_empty() { 521 + Ok(()) 522 + } else { 523 + Err(violations.join("; ")) 524 + } 525 + }) 526 + .await 527 + .map_err(|e| InvariantViolation { 528 + invariant: "ManifestEqualsReality", 529 + detail: format!("join: {e}"), 530 + })? 531 + .map_err(|e| InvariantViolation { 532 + invariant: "ManifestEqualsReality", 533 + detail: e, 534 + }) 535 + } 536 + } 537 + 538 + pub struct ChecksumCoverage; 539 + 540 + #[async_trait] 541 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for ChecksumCoverage { 542 + fn name(&self) -> &'static str { 543 + "ChecksumCoverage" 544 + } 545 + 546 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 547 + let violations: Vec<String> = ctx 548 + .oracle 549 + .live_cids_labeled() 550 + .into_iter() 551 + .filter_map(|(label, expected)| match ctx.store.get_block_sync(&expected) { 552 + Ok(Some(bytes)) => { 553 + let actual = hash_to_cid_bytes(&bytes); 554 + (actual != expected).then(|| { 555 + format!( 556 + "{label}: silent corruption, bytes hash to {} but store returned them under {}", 557 + hex_short(&actual), 558 + hex_short(&expected), 559 + ) 560 + }) 561 + } 562 + Ok(None) => Some(format!( 563 + "{label}: live CID {} missing from store", 564 + hex_short(&expected) 565 + )), 566 + Err(e) => Some(format!( 567 + "{label}: read error for live CID {}: {e}", 568 + hex_short(&expected) 569 + )), 570 + }) 571 + .collect(); 572 + 573 + if violations.is_empty() { 574 + Ok(()) 575 + } else { 576 + Err(InvariantViolation { 577 + invariant: "ChecksumCoverage", 578 + detail: violations.join("; "), 579 + }) 580 + } 581 + } 582 + } 583 + 584 + pub struct MonotonicSeq; 585 + 586 + #[async_trait] 587 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for MonotonicSeq { 588 + fn name(&self) -> &'static str { 589 + "MonotonicSeq" 590 + } 591 + 592 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 593 + let Some(el) = ctx.eventlog else { 594 + return Ok(()); 595 + }; 596 + let mut violations: Vec<String> = Vec::new(); 597 + el.events 598 + .iter() 599 + .zip(el.events.iter().skip(1)) 600 + .for_each(|(prev, next)| match next.seq.raw() { 601 + n if n == prev.seq.raw() + 1 => {} 602 + n if n == prev.seq.raw() => violations.push(format!("duplicate seq {n}")), 603 + n => violations.push(format!( 604 + "gap: seq {} followed by {n}, expected {}", 605 + prev.seq.raw(), 606 + prev.seq.raw() + 1 607 + )), 608 + }); 609 + if ctx.oracle.last_retention_cutoff_us().is_none() 610 + && let Some(first) = el.events.first() 611 + && first.seq.raw() != 1 612 + { 613 + violations.push(format!( 614 + "first persisted seq is {}, expected 1", 615 + first.seq.raw() 616 + )); 617 + } 618 + let acked_max = ctx 619 + .oracle 620 + .synced_events() 621 + .iter() 622 + .map(|e| e.seq.raw()) 623 + .max() 624 + .unwrap_or(0); 625 + let disk_max = el.events.last().map(|e| e.seq.raw()).unwrap_or(0); 626 + if disk_max < acked_max { 627 + violations.push(format!( 628 + "acked seq {acked_max} missing on disk, disk max {disk_max}" 629 + )); 630 + } 631 + if violations.is_empty() { 632 + Ok(()) 633 + } else { 634 + Err(InvariantViolation { 635 + invariant: "MonotonicSeq", 636 + detail: violations.join("; "), 637 + }) 638 + } 639 + } 640 + } 641 + 642 + pub struct FsyncOrdering; 643 + 644 + #[async_trait] 645 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for FsyncOrdering { 646 + fn name(&self) -> &'static str { 647 + "FsyncOrdering" 648 + } 649 + 650 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 651 + let Some(el) = ctx.eventlog else { 652 + return Ok(()); 653 + }; 654 + let mut violations: Vec<String> = Vec::new(); 655 + 656 + let acked_seqs: HashSet<u64> = ctx 657 + .oracle 658 + .synced_events() 659 + .iter() 660 + .map(|e| e.seq.raw()) 661 + .collect(); 662 + let disk_seqs: HashSet<u64> = el.events.iter().map(|e| e.seq.raw()).collect(); 663 + let missing: Vec<u64> = acked_seqs.difference(&disk_seqs).copied().collect(); 664 + if !missing.is_empty() { 665 + let mut sorted = missing; 666 + sorted.sort_unstable(); 667 + violations.push(format!( 668 + "{} acked events lost on disk, lowest missing seq {}", 669 + sorted.len(), 670 + sorted[0] 671 + )); 672 + } 673 + 674 + if let Some(last_synced) = ctx.oracle.last_synced_seq() 675 + && el.synced_seq.raw() != 0 676 + && el.synced_seq.raw() < last_synced.raw() 677 + { 678 + violations.push(format!( 679 + "writer synced_seq {} below oracle last_synced_seq {}", 680 + el.synced_seq.raw(), 681 + last_synced.raw() 682 + )); 683 + } 684 + 685 + if violations.is_empty() { 686 + Ok(()) 687 + } else { 688 + Err(InvariantViolation { 689 + invariant: "FsyncOrdering", 690 + detail: violations.join("; "), 691 + }) 692 + } 693 + } 694 + } 695 + 696 + pub struct TombstoneBound; 697 + 698 + #[async_trait] 699 + impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for TombstoneBound { 700 + fn name(&self) -> &'static str { 701 + "TombstoneBound" 702 + } 703 + 704 + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { 705 + let Some(el) = ctx.eventlog else { 706 + return Ok(()); 707 + }; 708 + let Some(cutoff_us) = ctx.oracle.last_retention_cutoff_us() else { 709 + return Ok(()); 710 + }; 711 + 712 + let active = el.segments.last().copied(); 713 + 714 + let stale: Vec<String> = el 715 + .segment_last_ts 716 + .iter() 717 + .filter(|(id, last_ts)| Some(*id) != active && *last_ts < cutoff_us) 718 + .map(|(id, last_ts)| format!("segment {id} last_ts {last_ts} < cutoff {cutoff_us}")) 719 + .collect(); 720 + 721 + if stale.is_empty() { 722 + Ok(()) 723 + } else { 724 + Err(InvariantViolation { 725 + invariant: "TombstoneBound", 726 + detail: stale.join("; "), 727 + }) 728 + } 729 + } 730 + } 731 + 732 + pub fn invariants_for<S: StorageIO + Send + Sync + 'static>( 733 + set: InvariantSet, 734 + ) -> Vec<Box<dyn Invariant<S>>> { 250 735 let unknown = set.unknown_bits(); 251 736 assert!( 252 737 unknown == 0, 253 738 "invariants_for: unknown InvariantSet bits 0x{unknown:x}; all bits must map to an impl" 254 739 ); 255 - let candidates: Vec<(InvariantSet, Box<dyn Invariant>)> = vec![ 740 + let candidates: Vec<(InvariantSet, Box<dyn Invariant<S>>)> = vec![ 256 741 ( 257 742 InvariantSet::REFCOUNT_CONSERVATION, 258 743 Box::new(RefcountConservation), ··· 263 748 Box::new(AckedWritePersistence), 264 749 ), 265 750 (InvariantSet::READ_AFTER_WRITE, Box::new(ReadAfterWrite)), 751 + ( 752 + InvariantSet::COMPACTION_IDEMPOTENT, 753 + Box::new(CompactionIdempotent), 754 + ), 755 + (InvariantSet::NO_ORPHAN_FILES, Box::new(NoOrphanFiles)), 756 + (InvariantSet::BYTE_BUDGET, Box::new(ByteBudget::default())), 757 + ( 758 + InvariantSet::MANIFEST_EQUALS_REALITY, 759 + Box::new(ManifestEqualsReality), 760 + ), 761 + (InvariantSet::CHECKSUM_COVERAGE, Box::new(ChecksumCoverage)), 762 + (InvariantSet::MONOTONIC_SEQ, Box::new(MonotonicSeq)), 763 + (InvariantSet::FSYNC_ORDERING, Box::new(FsyncOrdering)), 764 + (InvariantSet::TOMBSTONE_BOUND, Box::new(TombstoneBound)), 266 765 ]; 267 766 candidates 268 767 .into_iter()
+507 -48
crates/tranquil-store/src/gauntlet/scenarios.rs
··· 1 1 use super::invariants::InvariantSet; 2 2 use super::op::{CollectionName, Seed}; 3 3 use super::runner::{ 4 - GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, ShardCount, 5 - StoreConfig, WallMs, 4 + EventLogConfig, GauntletConfig, IoBackend, MaxFileSize, MaxSegmentSize, OpInterval, 5 + RestartPolicy, RunLimits, ShardCount, StoreConfig, WallMs, WriterConcurrency, 6 6 }; 7 7 use super::workload::{ 8 - KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel, 8 + ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, 9 + ValueBytes, WorkloadModel, 9 10 }; 10 11 use crate::blockstore::GroupCommitConfig; 12 + use crate::sim::FaultConfig; 11 13 12 - #[derive(Debug, Clone, Copy)] 14 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 13 15 pub enum Scenario { 14 16 SmokePR, 15 17 MstChurn, 16 18 MstRestartChurn, 17 19 FullStackRestart, 20 + CatastrophicChurn, 21 + HugeValues, 22 + TinyBatches, 23 + GiantBatches, 24 + ManyFiles, 25 + ModerateFaults, 26 + AggressiveFaults, 27 + TornPages, 28 + Fsyncgate, 29 + FirehoseFanout, 30 + ContendedReaders, 31 + ContendedWriters, 32 + } 33 + 34 + impl Scenario { 35 + pub const fn name(self) -> &'static str { 36 + match self { 37 + Self::SmokePR => "SmokePR", 38 + Self::MstChurn => "MstChurn", 39 + Self::MstRestartChurn => "MstRestartChurn", 40 + Self::FullStackRestart => "FullStackRestart", 41 + Self::CatastrophicChurn => "CatastrophicChurn", 42 + Self::HugeValues => "HugeValues", 43 + Self::TinyBatches => "TinyBatches", 44 + Self::GiantBatches => "GiantBatches", 45 + Self::ManyFiles => "ManyFiles", 46 + Self::ModerateFaults => "ModerateFaults", 47 + Self::AggressiveFaults => "AggressiveFaults", 48 + Self::TornPages => "TornPages", 49 + Self::Fsyncgate => "Fsyncgate", 50 + Self::FirehoseFanout => "FirehoseFanout", 51 + Self::ContendedReaders => "ContendedReaders", 52 + Self::ContendedWriters => "ContendedWriters", 53 + } 54 + } 55 + 56 + pub fn from_name(name: &str) -> Option<Self> { 57 + Self::ALL.iter().copied().find(|s| s.name() == name) 58 + } 59 + 60 + pub const ALL: &'static [Scenario] = &[ 61 + Self::SmokePR, 62 + Self::MstChurn, 63 + Self::MstRestartChurn, 64 + Self::FullStackRestart, 65 + Self::CatastrophicChurn, 66 + Self::HugeValues, 67 + Self::TinyBatches, 68 + Self::GiantBatches, 69 + Self::ManyFiles, 70 + Self::ModerateFaults, 71 + Self::AggressiveFaults, 72 + Self::TornPages, 73 + Self::Fsyncgate, 74 + Self::FirehoseFanout, 75 + Self::ContendedReaders, 76 + Self::ContendedWriters, 77 + ]; 78 + } 79 + 80 + impl std::fmt::Display for Scenario { 81 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 82 + f.write_str(self.name()) 83 + } 84 + } 85 + 86 + #[derive(Debug, thiserror::Error)] 87 + #[error("unknown scenario: {0}")] 88 + pub struct UnknownScenario(pub String); 89 + 90 + impl std::str::FromStr for Scenario { 91 + type Err = UnknownScenario; 92 + 93 + fn from_str(s: &str) -> Result<Self, Self::Err> { 94 + Self::from_name(s).ok_or_else(|| UnknownScenario(s.to_string())) 95 + } 18 96 } 19 97 20 98 pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig { ··· 23 101 Scenario::MstChurn => mst_churn(seed), 24 102 Scenario::MstRestartChurn => mst_restart_churn(seed), 25 103 Scenario::FullStackRestart => full_stack_restart(seed), 104 + Scenario::CatastrophicChurn => catastrophic_churn(seed), 105 + Scenario::HugeValues => huge_values(seed), 106 + Scenario::TinyBatches => tiny_batches(seed), 107 + Scenario::GiantBatches => giant_batches(seed), 108 + Scenario::ManyFiles => many_files(seed), 109 + Scenario::ModerateFaults => moderate_faults(seed), 110 + Scenario::AggressiveFaults => aggressive_faults(seed), 111 + Scenario::TornPages => torn_pages(seed), 112 + Scenario::Fsyncgate => fsyncgate(seed), 113 + Scenario::FirehoseFanout => firehose_fanout(seed), 114 + Scenario::ContendedReaders => contended_readers(seed), 115 + Scenario::ContendedWriters => contended_writers(seed), 26 116 } 27 117 } 28 118 ··· 33 123 ] 34 124 } 35 125 126 + fn block_weights(add: u32, delete: u32, compact: u32, checkpoint: u32) -> OpWeights { 127 + OpWeights { 128 + add, 129 + delete, 130 + compact, 131 + checkpoint, 132 + ..OpWeights::default() 133 + } 134 + } 135 + 136 + fn block_workload( 137 + weights: OpWeights, 138 + size_distribution: SizeDistribution, 139 + key_space: KeySpaceSize, 140 + ) -> WorkloadModel { 141 + WorkloadModel { 142 + weights, 143 + size_distribution, 144 + collections: default_collections(), 145 + key_space, 146 + did_space: DidSpaceSize(32), 147 + retention_max_secs: RetentionMaxSecs(3600), 148 + } 149 + } 150 + 36 151 fn tiny_store() -> StoreConfig { 37 152 StoreConfig { 38 153 max_file_size: MaxFileSize(4096), ··· 49 164 GauntletConfig { 50 165 seed, 51 166 io: IoBackend::Real, 52 - workload: WorkloadModel { 53 - weights: OpWeights { 54 - add: 80, 55 - delete: 0, 56 - compact: 10, 57 - checkpoint: 10, 58 - }, 59 - size_distribution: SizeDistribution::Fixed(ValueBytes(64)), 60 - collections: default_collections(), 61 - key_space: KeySpaceSize(200), 62 - }, 167 + workload: block_workload( 168 + block_weights(80, 0, 10, 10), 169 + SizeDistribution::Fixed(ValueBytes(64)), 170 + KeySpaceSize(200), 171 + ), 63 172 op_count: OpCount(10_000), 64 173 invariants: InvariantSet::REFCOUNT_CONSERVATION 65 174 | InvariantSet::REACHABILITY ··· 71 180 }, 72 181 restart_policy: RestartPolicy::EveryNOps(OpInterval(2_000)), 73 182 store: tiny_store(), 183 + eventlog: None, 184 + writer_concurrency: WriterConcurrency(1), 74 185 } 75 186 } 76 187 ··· 78 189 GauntletConfig { 79 190 seed, 80 191 io: IoBackend::Real, 81 - workload: WorkloadModel { 82 - weights: OpWeights { 83 - add: 85, 84 - delete: 0, 85 - compact: 10, 86 - checkpoint: 5, 87 - }, 88 - size_distribution: SizeDistribution::Fixed(ValueBytes(64)), 89 - collections: default_collections(), 90 - key_space: KeySpaceSize(2_000), 91 - }, 192 + workload: block_workload( 193 + block_weights(85, 0, 10, 5), 194 + SizeDistribution::Fixed(ValueBytes(64)), 195 + KeySpaceSize(2_000), 196 + ), 92 197 op_count: OpCount(100_000), 93 198 invariants: InvariantSet::REFCOUNT_CONSERVATION 94 199 | InvariantSet::REACHABILITY ··· 100 205 }, 101 206 restart_policy: RestartPolicy::Never, 102 207 store: tiny_store(), 208 + eventlog: None, 209 + writer_concurrency: WriterConcurrency(1), 103 210 } 104 211 } 105 212 ··· 107 214 GauntletConfig { 108 215 seed, 109 216 io: IoBackend::Real, 110 - workload: WorkloadModel { 111 - weights: OpWeights { 112 - add: 85, 113 - delete: 0, 114 - compact: 10, 115 - checkpoint: 5, 116 - }, 117 - size_distribution: SizeDistribution::Fixed(ValueBytes(64)), 118 - collections: default_collections(), 119 - key_space: KeySpaceSize(2_000), 120 - }, 217 + workload: block_workload( 218 + block_weights(85, 0, 10, 5), 219 + SizeDistribution::Fixed(ValueBytes(64)), 220 + KeySpaceSize(2_000), 221 + ), 121 222 op_count: OpCount(100_000), 122 223 invariants: InvariantSet::REFCOUNT_CONSERVATION 123 224 | InvariantSet::REACHABILITY ··· 129 230 }, 130 231 restart_policy: RestartPolicy::PoissonByOps(OpInterval(5_000)), 131 232 store: tiny_store(), 233 + eventlog: None, 234 + writer_concurrency: WriterConcurrency(1), 132 235 } 133 236 } 134 237 ··· 136 239 GauntletConfig { 137 240 seed, 138 241 io: IoBackend::Real, 139 - workload: WorkloadModel { 140 - weights: OpWeights { 141 - add: 80, 142 - delete: 0, 143 - compact: 15, 144 - checkpoint: 5, 145 - }, 146 - size_distribution: SizeDistribution::Fixed(ValueBytes(80)), 147 - collections: default_collections(), 148 - key_space: KeySpaceSize(500), 149 - }, 242 + workload: block_workload( 243 + block_weights(80, 0, 15, 5), 244 + SizeDistribution::Fixed(ValueBytes(80)), 245 + KeySpaceSize(500), 246 + ), 150 247 op_count: OpCount(5_000), 151 248 invariants: InvariantSet::REFCOUNT_CONSERVATION 152 249 | InvariantSet::REACHABILITY ··· 162 259 group_commit: GroupCommitConfig::default(), 163 260 shard_count: ShardCount(1), 164 261 }, 262 + eventlog: None, 263 + writer_concurrency: WriterConcurrency(1), 264 + } 265 + } 266 + 267 + fn phase2_invariants() -> InvariantSet { 268 + InvariantSet::REFCOUNT_CONSERVATION 269 + | InvariantSet::REACHABILITY 270 + | InvariantSet::ACKED_WRITE_PERSISTENCE 271 + | InvariantSet::READ_AFTER_WRITE 272 + | InvariantSet::RESTART_IDEMPOTENT 273 + | InvariantSet::COMPACTION_IDEMPOTENT 274 + | InvariantSet::BYTE_BUDGET 275 + | InvariantSet::MANIFEST_EQUALS_REALITY 276 + | InvariantSet::CHECKSUM_COVERAGE 277 + } 278 + 279 + fn catastrophic_churn(seed: Seed) -> GauntletConfig { 280 + GauntletConfig { 281 + seed, 282 + io: IoBackend::Real, 283 + workload: block_workload( 284 + block_weights(94, 0, 5, 1), 285 + SizeDistribution::Fixed(ValueBytes(64)), 286 + KeySpaceSize(200), 287 + ), 288 + op_count: OpCount(1_000_000), 289 + invariants: phase2_invariants(), 290 + limits: RunLimits { 291 + max_wall_ms: Some(WallMs(30 * 60_000)), 292 + }, 293 + restart_policy: RestartPolicy::PoissonByOps(OpInterval(50_000)), 294 + store: tiny_store(), 295 + eventlog: None, 296 + writer_concurrency: WriterConcurrency(1), 297 + } 298 + } 299 + 300 + fn huge_values(seed: Seed) -> GauntletConfig { 301 + GauntletConfig { 302 + seed, 303 + io: IoBackend::Real, 304 + workload: block_workload( 305 + block_weights(85, 5, 8, 2), 306 + SizeDistribution::HeavyTail( 307 + ByteRange::new(ValueBytes(256), ValueBytes(16 * 1024 * 1024)) 308 + .expect("huge_values ByteRange"), 309 + ), 310 + KeySpaceSize(64), 311 + ), 312 + op_count: OpCount(2_000), 313 + invariants: InvariantSet::REFCOUNT_CONSERVATION 314 + | InvariantSet::REACHABILITY 315 + | InvariantSet::ACKED_WRITE_PERSISTENCE 316 + | InvariantSet::READ_AFTER_WRITE 317 + | InvariantSet::RESTART_IDEMPOTENT, 318 + limits: RunLimits { 319 + max_wall_ms: Some(WallMs(10 * 60_000)), 320 + }, 321 + restart_policy: RestartPolicy::EveryNOps(OpInterval(500)), 322 + store: StoreConfig { 323 + max_file_size: MaxFileSize(32 * 1024 * 1024), 324 + group_commit: GroupCommitConfig::default(), 325 + shard_count: ShardCount(1), 326 + }, 327 + eventlog: None, 328 + writer_concurrency: WriterConcurrency(1), 329 + } 330 + } 331 + 332 + fn tiny_batches(seed: Seed) -> GauntletConfig { 333 + GauntletConfig { 334 + seed, 335 + io: IoBackend::Real, 336 + workload: block_workload( 337 + block_weights(85, 0, 5, 10), 338 + SizeDistribution::Fixed(ValueBytes(64)), 339 + KeySpaceSize(500), 340 + ), 341 + op_count: OpCount(10_000), 342 + invariants: phase2_invariants(), 343 + limits: RunLimits { 344 + max_wall_ms: Some(WallMs(120_000)), 345 + }, 346 + restart_policy: RestartPolicy::EveryNOps(OpInterval(2_000)), 347 + store: StoreConfig { 348 + max_file_size: MaxFileSize(4096), 349 + group_commit: GroupCommitConfig { 350 + max_batch_size: 1, 351 + checkpoint_interval_ms: 100, 352 + checkpoint_write_threshold: 1, 353 + ..GroupCommitConfig::default() 354 + }, 355 + shard_count: ShardCount(1), 356 + }, 357 + eventlog: None, 358 + writer_concurrency: WriterConcurrency(1), 359 + } 360 + } 361 + 362 + fn giant_batches(seed: Seed) -> GauntletConfig { 363 + GauntletConfig { 364 + seed, 365 + io: IoBackend::Real, 366 + workload: block_workload( 367 + block_weights(95, 0, 3, 2), 368 + SizeDistribution::Fixed(ValueBytes(64)), 369 + KeySpaceSize(5_000), 370 + ), 371 + op_count: OpCount(50_000), 372 + invariants: phase2_invariants(), 373 + limits: RunLimits { 374 + max_wall_ms: Some(WallMs(10 * 60_000)), 375 + }, 376 + restart_policy: RestartPolicy::EveryNOps(OpInterval(10_000)), 377 + store: StoreConfig { 378 + max_file_size: MaxFileSize(16 * 1024 * 1024), 379 + group_commit: GroupCommitConfig { 380 + max_batch_size: 100_000, 381 + checkpoint_interval_ms: 5_000, 382 + checkpoint_write_threshold: 100_000, 383 + ..GroupCommitConfig::default() 384 + }, 385 + shard_count: ShardCount(1), 386 + }, 387 + eventlog: None, 388 + writer_concurrency: WriterConcurrency(1), 389 + } 390 + } 391 + 392 + fn many_files(seed: Seed) -> GauntletConfig { 393 + GauntletConfig { 394 + seed, 395 + io: IoBackend::Real, 396 + workload: block_workload( 397 + block_weights(80, 10, 5, 5), 398 + SizeDistribution::Fixed(ValueBytes(128)), 399 + KeySpaceSize(2_000), 400 + ), 401 + op_count: OpCount(200_000), 402 + invariants: phase2_invariants(), 403 + limits: RunLimits { 404 + max_wall_ms: Some(WallMs(20 * 60_000)), 405 + }, 406 + restart_policy: RestartPolicy::PoissonByOps(OpInterval(5_000)), 407 + store: StoreConfig { 408 + max_file_size: MaxFileSize(256), 409 + group_commit: GroupCommitConfig::default(), 410 + shard_count: ShardCount(1), 411 + }, 412 + eventlog: None, 413 + writer_concurrency: WriterConcurrency(1), 414 + } 415 + } 416 + 417 + fn sim_invariants() -> InvariantSet { 418 + InvariantSet::REFCOUNT_CONSERVATION 419 + | InvariantSet::REACHABILITY 420 + | InvariantSet::ACKED_WRITE_PERSISTENCE 421 + | InvariantSet::READ_AFTER_WRITE 422 + | InvariantSet::RESTART_IDEMPOTENT 423 + | InvariantSet::NO_ORPHAN_FILES 424 + | InvariantSet::BYTE_BUDGET 425 + | InvariantSet::CHECKSUM_COVERAGE 426 + } 427 + 428 + fn sim_microbench_workload() -> WorkloadModel { 429 + block_workload( 430 + block_weights(80, 10, 5, 5), 431 + SizeDistribution::Fixed(ValueBytes(128)), 432 + KeySpaceSize(500), 433 + ) 434 + } 435 + 436 + fn sim_store() -> StoreConfig { 437 + StoreConfig { 438 + max_file_size: MaxFileSize(16 * 1024), 439 + group_commit: GroupCommitConfig::default(), 440 + shard_count: ShardCount(1), 441 + } 442 + } 443 + 444 + fn moderate_faults(seed: Seed) -> GauntletConfig { 445 + GauntletConfig { 446 + seed, 447 + io: IoBackend::Simulated { 448 + fault: FaultConfig::moderate(), 449 + }, 450 + workload: sim_microbench_workload(), 451 + op_count: OpCount(50_000), 452 + invariants: sim_invariants(), 453 + limits: RunLimits { 454 + max_wall_ms: Some(WallMs(10 * 60_000)), 455 + }, 456 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(2_000)), 457 + store: sim_store(), 458 + eventlog: None, 459 + writer_concurrency: WriterConcurrency(1), 460 + } 461 + } 462 + 463 + fn aggressive_faults(seed: Seed) -> GauntletConfig { 464 + GauntletConfig { 465 + seed, 466 + io: IoBackend::Simulated { 467 + fault: FaultConfig::aggressive(), 468 + }, 469 + workload: sim_microbench_workload(), 470 + op_count: OpCount(50_000), 471 + invariants: sim_invariants(), 472 + limits: RunLimits { 473 + max_wall_ms: Some(WallMs(10 * 60_000)), 474 + }, 475 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(2_000)), 476 + store: sim_store(), 477 + eventlog: None, 478 + writer_concurrency: WriterConcurrency(1), 479 + } 480 + } 481 + 482 + fn torn_pages(seed: Seed) -> GauntletConfig { 483 + GauntletConfig { 484 + seed, 485 + io: IoBackend::Simulated { 486 + fault: FaultConfig::torn_pages_only(), 487 + }, 488 + workload: sim_microbench_workload(), 489 + op_count: OpCount(20_000), 490 + invariants: sim_invariants(), 491 + limits: RunLimits { 492 + max_wall_ms: Some(WallMs(5 * 60_000)), 493 + }, 494 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(1_000)), 495 + store: sim_store(), 496 + eventlog: None, 497 + writer_concurrency: WriterConcurrency(1), 498 + } 499 + } 500 + 501 + fn fsyncgate(seed: Seed) -> GauntletConfig { 502 + GauntletConfig { 503 + seed, 504 + io: IoBackend::Simulated { 505 + fault: FaultConfig::fsyncgate_only(), 506 + }, 507 + workload: sim_microbench_workload(), 508 + op_count: OpCount(10_000), 509 + invariants: sim_invariants(), 510 + limits: RunLimits { 511 + max_wall_ms: Some(WallMs(5 * 60_000)), 512 + }, 513 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(500)), 514 + store: sim_store(), 515 + eventlog: None, 516 + writer_concurrency: WriterConcurrency(1), 517 + } 518 + } 519 + 520 + fn firehose_fanout(seed: Seed) -> GauntletConfig { 521 + GauntletConfig { 522 + seed, 523 + io: IoBackend::Simulated { 524 + fault: FaultConfig::moderate(), 525 + }, 526 + workload: WorkloadModel { 527 + weights: OpWeights { 528 + add: 20, 529 + compact: 2, 530 + checkpoint: 3, 531 + append_event: 60, 532 + sync_event_log: 10, 533 + run_retention: 5, 534 + ..OpWeights::default() 535 + }, 536 + size_distribution: SizeDistribution::Fixed(ValueBytes(128)), 537 + collections: default_collections(), 538 + key_space: KeySpaceSize(500), 539 + did_space: DidSpaceSize(64), 540 + retention_max_secs: RetentionMaxSecs(60), 541 + }, 542 + op_count: OpCount(20_000), 543 + invariants: sim_invariants() 544 + | InvariantSet::MONOTONIC_SEQ 545 + | InvariantSet::FSYNC_ORDERING 546 + | InvariantSet::TOMBSTONE_BOUND, 547 + limits: RunLimits { 548 + max_wall_ms: Some(WallMs(10 * 60_000)), 549 + }, 550 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(2_000)), 551 + store: sim_store(), 552 + eventlog: Some(EventLogConfig { 553 + max_segment_size: MaxSegmentSize(64 * 1024), 554 + }), 555 + writer_concurrency: WriterConcurrency(1), 556 + } 557 + } 558 + 559 + fn contended_readers(seed: Seed) -> GauntletConfig { 560 + GauntletConfig { 561 + seed, 562 + io: IoBackend::Simulated { 563 + fault: FaultConfig::moderate(), 564 + }, 565 + workload: WorkloadModel { 566 + weights: OpWeights { 567 + add: 15, 568 + delete: 1, 569 + compact: 2, 570 + checkpoint: 2, 571 + read_record: 60, 572 + read_block: 20, 573 + ..OpWeights::default() 574 + }, 575 + size_distribution: SizeDistribution::Fixed(ValueBytes(128)), 576 + collections: default_collections(), 577 + key_space: KeySpaceSize(400), 578 + did_space: DidSpaceSize(32), 579 + retention_max_secs: RetentionMaxSecs(3600), 580 + }, 581 + op_count: OpCount(20_000), 582 + invariants: sim_invariants(), 583 + limits: RunLimits { 584 + max_wall_ms: Some(WallMs(10 * 60_000)), 585 + }, 586 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(2_000)), 587 + store: sim_store(), 588 + eventlog: None, 589 + writer_concurrency: WriterConcurrency(64), 590 + } 591 + } 592 + 593 + fn contended_writers(seed: Seed) -> GauntletConfig { 594 + GauntletConfig { 595 + seed, 596 + io: IoBackend::Simulated { 597 + fault: FaultConfig::moderate(), 598 + }, 599 + workload: WorkloadModel { 600 + weights: OpWeights { 601 + add: 85, 602 + delete: 5, 603 + compact: 3, 604 + checkpoint: 2, 605 + read_record: 4, 606 + read_block: 1, 607 + ..OpWeights::default() 608 + }, 609 + size_distribution: SizeDistribution::Fixed(ValueBytes(128)), 610 + collections: default_collections(), 611 + key_space: KeySpaceSize(1_000), 612 + did_space: DidSpaceSize(32), 613 + retention_max_secs: RetentionMaxSecs(3600), 614 + }, 615 + op_count: OpCount(20_000), 616 + invariants: sim_invariants(), 617 + limits: RunLimits { 618 + max_wall_ms: Some(WallMs(10 * 60_000)), 619 + }, 620 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(2_000)), 621 + store: sim_store(), 622 + eventlog: None, 623 + writer_concurrency: WriterConcurrency(32), 165 624 } 166 625 }