Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store/gauntlet): simulated io fault modes, shrinker, regression dump

Lewis: May this revision serve well! <lu5a@proton.me>

+950 -46
+290
crates/tranquil-store/src/gauntlet/regression.rs
··· 1 + use std::io; 2 + use std::path::{Path, PathBuf}; 3 + 4 + use serde::{Deserialize, Serialize}; 5 + 6 + use super::invariants::InvariantViolation; 7 + use super::op::{Op, OpStream, Seed}; 8 + use super::overrides::ConfigOverrides; 9 + use super::runner::{GauntletConfig, GauntletReport}; 10 + use super::scenarios::{Scenario, UnknownScenario, config_for}; 11 + 12 + pub const SCHEMA_VERSION: u32 = 1; 13 + pub const MIN_SUPPORTED_SCHEMA_VERSION: u32 = 1; 14 + 15 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 16 + pub struct RegressionViolation { 17 + pub invariant: String, 18 + pub detail: String, 19 + } 20 + 21 + impl From<&InvariantViolation> for RegressionViolation { 22 + fn from(v: &InvariantViolation) -> Self { 23 + Self { 24 + invariant: v.invariant.to_string(), 25 + detail: v.detail.clone(), 26 + } 27 + } 28 + } 29 + 30 + #[derive(Debug, Clone, Serialize, Deserialize)] 31 + pub struct RegressionRecord { 32 + pub schema_version: u32, 33 + pub scenario: String, 34 + pub seed: Seed, 35 + #[serde(default)] 36 + pub overrides: ConfigOverrides, 37 + pub violations: Vec<RegressionViolation>, 38 + pub ops: Vec<Op>, 39 + #[serde(default)] 40 + pub original_ops_len: usize, 41 + } 42 + 43 + #[derive(Debug, thiserror::Error)] 44 + pub enum RegressionLoadError { 45 + #[error("read {path}: {source}")] 46 + Read { path: PathBuf, source: io::Error }, 47 + #[error("parse {path}: {source}")] 48 + Parse { 49 + path: PathBuf, 50 + source: serde_json::Error, 51 + }, 52 + #[error("schema version {found} outside supported range {min}..={max}")] 53 + UnsupportedVersion { found: u32, min: u32, max: u32 }, 54 + #[error(transparent)] 55 + UnknownScenario(#[from] UnknownScenario), 56 + } 57 + 58 + impl RegressionRecord { 59 + pub fn from_report( 60 + scenario: Scenario, 61 + overrides: ConfigOverrides, 62 + report: &GauntletReport, 63 + original_ops_len: usize, 64 + shrunk_ops: OpStream, 65 + ) -> Self { 66 + Self { 67 + schema_version: SCHEMA_VERSION, 68 + scenario: scenario.name().to_string(), 69 + seed: report.seed, 70 + overrides, 71 + violations: report 72 + .violations 73 + .iter() 74 + .map(RegressionViolation::from) 75 + .collect(), 76 + ops: shrunk_ops.into_vec(), 77 + original_ops_len, 78 + } 79 + } 80 + 81 + pub fn file_path(&self, root: &Path) -> PathBuf { 82 + root.join("gauntlet") 83 + .join(sanitize(&self.scenario)) 84 + .join(format!("{:016x}.json", self.seed.0)) 85 + } 86 + 87 + pub fn write_to(&self, root: &Path) -> io::Result<PathBuf> { 88 + let path = self.file_path(root); 89 + if let Some(parent) = path.parent() { 90 + std::fs::create_dir_all(parent)?; 91 + } 92 + let json = serde_json::to_vec_pretty(self).map_err(io::Error::other)?; 93 + let tmp = path.with_extension("json.tmp"); 94 + { 95 + let mut f = std::fs::File::create(&tmp)?; 96 + io::Write::write_all(&mut f, &json)?; 97 + f.sync_all()?; 98 + } 99 + std::fs::rename(&tmp, &path)?; 100 + if let Some(parent) = path.parent() { 101 + if let Ok(dir) = std::fs::File::open(parent) { 102 + let _ = dir.sync_all(); 103 + } 104 + } 105 + Ok(path) 106 + } 107 + 108 + pub fn load(path: &Path) -> Result<Self, RegressionLoadError> { 109 + let raw = std::fs::read(path).map_err(|source| RegressionLoadError::Read { 110 + path: path.to_path_buf(), 111 + source, 112 + })?; 113 + let record: RegressionRecord = 114 + serde_json::from_slice(&raw).map_err(|source| RegressionLoadError::Parse { 115 + path: path.to_path_buf(), 116 + source, 117 + })?; 118 + if record.schema_version < MIN_SUPPORTED_SCHEMA_VERSION 119 + || record.schema_version > SCHEMA_VERSION 120 + { 121 + return Err(RegressionLoadError::UnsupportedVersion { 122 + found: record.schema_version, 123 + min: MIN_SUPPORTED_SCHEMA_VERSION, 124 + max: SCHEMA_VERSION, 125 + }); 126 + } 127 + Ok(record) 128 + } 129 + 130 + pub fn scenario_enum(&self) -> Result<Scenario, UnknownScenario> { 131 + self.scenario.parse::<Scenario>() 132 + } 133 + 134 + pub fn build_config(&self) -> Result<GauntletConfig, UnknownScenario> { 135 + let scenario = self.scenario_enum()?; 136 + let mut cfg = config_for(scenario, self.seed); 137 + self.overrides.apply_to(&mut cfg); 138 + Ok(cfg) 139 + } 140 + 141 + pub fn op_stream(&self) -> OpStream { 142 + OpStream::from_vec(self.ops.clone()) 143 + } 144 + } 145 + 146 + fn sanitize(s: &str) -> String { 147 + s.chars() 148 + .map(|c| match c { 149 + 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => c, 150 + _ => '_', 151 + }) 152 + .collect() 153 + } 154 + 155 + pub fn default_root() -> PathBuf { 156 + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proptest-regressions") 157 + } 158 + 159 + #[cfg(test)] 160 + mod tests { 161 + use super::*; 162 + use crate::gauntlet::op::{CollectionName, RecordKey, ValueSeed}; 163 + use crate::gauntlet::overrides::ConfigOverrides; 164 + 165 + fn sample_record() -> RegressionRecord { 166 + use crate::gauntlet::overrides::StoreOverrides; 167 + 168 + let ops = vec![ 169 + Op::AddRecord { 170 + collection: CollectionName("c".into()), 171 + rkey: RecordKey("r".into()), 172 + value_seed: ValueSeed(1), 173 + }, 174 + Op::Compact, 175 + ]; 176 + let overrides = ConfigOverrides { 177 + op_count: Some(128), 178 + store: StoreOverrides { 179 + max_file_size: Some(4096), 180 + ..StoreOverrides::default() 181 + }, 182 + ..ConfigOverrides::default() 183 + }; 184 + RegressionRecord { 185 + schema_version: SCHEMA_VERSION, 186 + scenario: "HugeValues".to_string(), 187 + seed: Seed(0xdeadbeef), 188 + overrides, 189 + violations: vec![RegressionViolation { 190 + invariant: "ByteBudget".to_string(), 191 + detail: "exceeded".to_string(), 192 + }], 193 + ops, 194 + original_ops_len: 500, 195 + } 196 + } 197 + 198 + #[test] 199 + fn round_trip_preserves_all_fields() { 200 + let dir = tempfile::TempDir::new().unwrap(); 201 + let original = sample_record(); 202 + let path = original.write_to(dir.path()).unwrap(); 203 + assert!(path.exists()); 204 + let loaded = RegressionRecord::load(&path).unwrap(); 205 + assert_eq!(loaded.schema_version, original.schema_version); 206 + assert_eq!(loaded.scenario, original.scenario); 207 + assert_eq!(loaded.seed.0, original.seed.0); 208 + assert_eq!(loaded.overrides, original.overrides); 209 + assert_eq!(loaded.violations, original.violations); 210 + assert_eq!(loaded.ops.len(), original.ops.len()); 211 + assert_eq!(loaded.original_ops_len, original.original_ops_len); 212 + } 213 + 214 + #[test] 215 + fn build_config_applies_overrides() { 216 + let record = sample_record(); 217 + let cfg = record.build_config().unwrap(); 218 + assert_eq!(cfg.op_count.0, 128); 219 + assert_eq!(cfg.store.max_file_size.0, 4096); 220 + } 221 + 222 + #[test] 223 + fn rejects_future_schema_version() { 224 + let dir = tempfile::TempDir::new().unwrap(); 225 + let mut r = sample_record(); 226 + r.schema_version = SCHEMA_VERSION + 1; 227 + let path = r.write_to(dir.path()).unwrap(); 228 + match RegressionRecord::load(&path) { 229 + Err(RegressionLoadError::UnsupportedVersion { found, min, max }) => { 230 + assert_eq!(found, SCHEMA_VERSION + 1); 231 + assert_eq!(min, MIN_SUPPORTED_SCHEMA_VERSION); 232 + assert_eq!(max, SCHEMA_VERSION); 233 + } 234 + other => panic!("expected UnsupportedVersion, got {other:?}"), 235 + } 236 + } 237 + 238 + #[test] 239 + fn rejects_past_schema_version_below_min() { 240 + let dir = tempfile::TempDir::new().unwrap(); 241 + let mut r = sample_record(); 242 + r.schema_version = MIN_SUPPORTED_SCHEMA_VERSION.saturating_sub(1); 243 + let path = r.write_to(dir.path()).unwrap(); 244 + match RegressionRecord::load(&path) { 245 + Err(RegressionLoadError::UnsupportedVersion { found, min, max }) => { 246 + assert_eq!(found, MIN_SUPPORTED_SCHEMA_VERSION.saturating_sub(1)); 247 + assert_eq!(min, MIN_SUPPORTED_SCHEMA_VERSION); 248 + assert_eq!(max, SCHEMA_VERSION); 249 + } 250 + other => panic!("expected UnsupportedVersion, got {other:?}"), 251 + } 252 + } 253 + 254 + #[test] 255 + fn atomic_write_leaves_no_tmp_file() { 256 + let dir = tempfile::TempDir::new().unwrap(); 257 + let r = sample_record(); 258 + let path = r.write_to(dir.path()).unwrap(); 259 + assert!(path.exists()); 260 + let tmp = path.with_extension("json.tmp"); 261 + assert!( 262 + !tmp.exists(), 263 + "tmp sibling {tmp:?} should have been renamed" 264 + ); 265 + } 266 + 267 + #[test] 268 + fn rejects_malformed_json() { 269 + let dir = tempfile::TempDir::new().unwrap(); 270 + let path = dir.path().join("bad.json"); 271 + std::fs::write(&path, b"{not json").unwrap(); 272 + assert!(matches!( 273 + RegressionRecord::load(&path), 274 + Err(RegressionLoadError::Parse { .. }) 275 + )); 276 + } 277 + 278 + #[test] 279 + fn sanitize_strips_slashes_and_traversal() { 280 + assert_eq!(sanitize("foo/bar baz"), "foo_bar_baz"); 281 + assert_eq!(sanitize("../etc"), "___etc"); 282 + } 283 + 284 + #[test] 285 + fn unknown_scenario_name_errors() { 286 + let mut r = sample_record(); 287 + r.scenario = "BogusScenario".to_string(); 288 + assert!(r.build_config().is_err()); 289 + } 290 + }
+191
crates/tranquil-store/src/gauntlet/shrink.rs
··· 1 + use std::collections::BTreeSet; 2 + 3 + use super::op::OpStream; 4 + use super::runner::{Gauntlet, GauntletConfig, GauntletReport}; 5 + 6 + pub const DEFAULT_MAX_SHRINK_ITERATIONS: usize = 256; 7 + 8 + #[derive(Debug)] 9 + pub struct ShrinkOutcome { 10 + pub ops: OpStream, 11 + pub report: GauntletReport, 12 + pub iterations: usize, 13 + } 14 + 15 + pub async fn shrink_failure( 16 + config: GauntletConfig, 17 + initial_ops: OpStream, 18 + initial_report: GauntletReport, 19 + max_iterations: usize, 20 + ) -> ShrinkOutcome { 21 + let target: BTreeSet<&'static str> = initial_report.violation_invariants(); 22 + if target.is_empty() { 23 + return ShrinkOutcome { 24 + ops: initial_ops, 25 + report: initial_report, 26 + iterations: 0, 27 + }; 28 + } 29 + 30 + let mut current_ops = initial_ops; 31 + let mut current_report = initial_report; 32 + let mut iterations = 0usize; 33 + 34 + while iterations < max_iterations { 35 + match try_one_shrink_round(&config, &current_ops, &target, max_iterations - iterations) 36 + .await 37 + { 38 + ShrinkRound::Progress { 39 + ops, 40 + report, 41 + runs_used, 42 + } => { 43 + current_ops = ops; 44 + current_report = report; 45 + iterations += runs_used; 46 + } 47 + ShrinkRound::Exhausted { runs_used } => { 48 + iterations += runs_used; 49 + break; 50 + } 51 + } 52 + } 53 + 54 + ShrinkOutcome { 55 + ops: current_ops, 56 + report: current_report, 57 + iterations, 58 + } 59 + } 60 + 61 + enum ShrinkRound { 62 + Progress { 63 + ops: OpStream, 64 + report: GauntletReport, 65 + runs_used: usize, 66 + }, 67 + Exhausted { 68 + runs_used: usize, 69 + }, 70 + } 71 + 72 + async fn try_one_shrink_round( 73 + config: &GauntletConfig, 74 + current_ops: &OpStream, 75 + target: &BTreeSet<&'static str>, 76 + budget: usize, 77 + ) -> ShrinkRound { 78 + let mut runs_used = 0usize; 79 + for candidate in current_ops.shrink_candidates() { 80 + if candidate.is_empty() || candidate.len() >= current_ops.len() { 81 + continue; 82 + } 83 + if runs_used >= budget { 84 + return ShrinkRound::Exhausted { runs_used }; 85 + } 86 + runs_used += 1; 87 + let gauntlet = match Gauntlet::new(config.clone()) { 88 + Ok(g) => g, 89 + Err(_) => continue, 90 + }; 91 + let report = gauntlet.run_with_ops(candidate.clone()).await; 92 + let got: BTreeSet<&'static str> = report.violation_invariants(); 93 + if !got.is_disjoint(target) { 94 + return ShrinkRound::Progress { 95 + ops: candidate, 96 + report, 97 + runs_used, 98 + }; 99 + } 100 + } 101 + ShrinkRound::Exhausted { runs_used } 102 + } 103 + 104 + #[cfg(test)] 105 + mod tests { 106 + use super::*; 107 + use crate::blockstore::GroupCommitConfig; 108 + use crate::gauntlet::invariants::{InvariantSet, InvariantViolation}; 109 + use crate::gauntlet::op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; 110 + use crate::gauntlet::runner::{ 111 + GauntletConfig, IoBackend, MaxFileSize, OpErrorCount, OpsExecuted, RestartCount, 112 + RestartPolicy, RunLimits, ShardCount, StoreConfig, WriterConcurrency, 113 + }; 114 + use crate::gauntlet::workload::{ 115 + DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, 116 + ValueBytes, WorkloadModel, 117 + }; 118 + use crate::sim::FaultConfig; 119 + 120 + fn dummy_config() -> GauntletConfig { 121 + GauntletConfig { 122 + seed: Seed(1), 123 + io: IoBackend::Simulated { 124 + fault: FaultConfig::none(), 125 + }, 126 + workload: WorkloadModel { 127 + weights: OpWeights::default(), 128 + size_distribution: SizeDistribution::Fixed(ValueBytes(16)), 129 + collections: vec![CollectionName("c".into())], 130 + key_space: KeySpaceSize(4), 131 + did_space: DidSpaceSize(1), 132 + retention_max_secs: RetentionMaxSecs(60), 133 + }, 134 + op_count: OpCount(4), 135 + invariants: InvariantSet::EMPTY, 136 + limits: RunLimits { max_wall_ms: None }, 137 + restart_policy: RestartPolicy::Never, 138 + store: StoreConfig { 139 + max_file_size: MaxFileSize(4096), 140 + group_commit: GroupCommitConfig::default(), 141 + shard_count: ShardCount(1), 142 + }, 143 + eventlog: None, 144 + writer_concurrency: WriterConcurrency(1), 145 + } 146 + } 147 + 148 + fn fake_report(seed: u64, names: &[&'static str]) -> GauntletReport { 149 + GauntletReport { 150 + seed: Seed(seed), 151 + ops_executed: OpsExecuted(0), 152 + op_errors: OpErrorCount(0), 153 + restarts: RestartCount(0), 154 + violations: names 155 + .iter() 156 + .copied() 157 + .map(|n| InvariantViolation { 158 + invariant: n, 159 + detail: "x".to_string(), 160 + }) 161 + .collect(), 162 + ops: OpStream::empty(), 163 + } 164 + } 165 + 166 + fn sample_stream() -> OpStream { 167 + OpStream::from_vec(vec![ 168 + Op::AddRecord { 169 + collection: CollectionName("c".into()), 170 + rkey: RecordKey("a".into()), 171 + value_seed: ValueSeed(1), 172 + }, 173 + Op::Compact, 174 + ]) 175 + } 176 + 177 + #[test] 178 + fn clean_report_returns_input_unchanged() { 179 + let cfg = dummy_config(); 180 + let ops = sample_stream(); 181 + let clean = fake_report(1, &[]); 182 + let rt = tokio::runtime::Builder::new_current_thread() 183 + .enable_all() 184 + .build() 185 + .unwrap(); 186 + let before_len = ops.len(); 187 + let out = rt.block_on(shrink_failure(cfg, ops, clean, 8)); 188 + assert_eq!(out.iterations, 0); 189 + assert_eq!(out.ops.len(), before_len); 190 + } 191 + }
+469 -46
crates/tranquil-store/src/sim.rs
··· 1 - use std::collections::{HashMap, HashSet}; 1 + use std::collections::{HashMap, HashSet, VecDeque}; 2 2 use std::io; 3 3 use std::path::{Path, PathBuf}; 4 4 use std::sync::Mutex; 5 + use std::sync::atomic::{AtomicU64, Ordering}; 6 + use std::time::Duration; 5 7 6 8 use crate::io::{FileId, OpenOptions, StorageIO}; 7 9 10 + pub const TORN_PAGE_BYTES: usize = 4096; 11 + pub const SECTOR_BYTES: usize = 512; 12 + 13 + #[derive(Debug, Clone, Copy, PartialEq)] 14 + pub struct Probability(f64); 15 + 16 + impl Probability { 17 + pub const ZERO: Self = Self(0.0); 18 + 19 + pub fn new(p: f64) -> Self { 20 + assert!( 21 + p.is_finite() && (0.0..=1.0).contains(&p), 22 + "probability out of range: {p}" 23 + ); 24 + Self(p) 25 + } 26 + 27 + pub fn raw(self) -> f64 { 28 + self.0 29 + } 30 + 31 + pub fn is_nonzero(self) -> bool { 32 + self.0 > 0.0 33 + } 34 + } 35 + 36 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 37 + pub struct SyncReorderWindow(pub u32); 38 + 39 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 40 + pub struct LatencyNs(pub u64); 41 + 8 42 #[derive(Debug, Clone, Copy)] 9 43 pub struct FaultConfig { 10 - pub partial_write_probability: f64, 11 - pub bit_flip_on_read_probability: f64, 12 - pub sync_failure_probability: f64, 13 - pub dir_sync_failure_probability: f64, 14 - pub misdirected_write_probability: f64, 15 - pub io_error_probability: f64, 44 + pub partial_write_probability: Probability, 45 + pub bit_flip_on_read_probability: Probability, 46 + pub sync_failure_probability: Probability, 47 + pub dir_sync_failure_probability: Probability, 48 + pub misdirected_write_probability: Probability, 49 + pub io_error_probability: Probability, 50 + pub torn_page_probability: Probability, 51 + pub misdirected_read_probability: Probability, 52 + pub delayed_io_error_probability: Probability, 53 + pub sync_reorder_window: SyncReorderWindow, 54 + pub latency_distribution_ns: LatencyNs, 16 55 } 17 56 18 57 impl FaultConfig { 19 58 pub fn none() -> Self { 20 59 Self { 21 - partial_write_probability: 0.0, 22 - bit_flip_on_read_probability: 0.0, 23 - sync_failure_probability: 0.0, 24 - dir_sync_failure_probability: 0.0, 25 - misdirected_write_probability: 0.0, 26 - io_error_probability: 0.0, 60 + partial_write_probability: Probability::ZERO, 61 + bit_flip_on_read_probability: Probability::ZERO, 62 + sync_failure_probability: Probability::ZERO, 63 + dir_sync_failure_probability: Probability::ZERO, 64 + misdirected_write_probability: Probability::ZERO, 65 + io_error_probability: Probability::ZERO, 66 + torn_page_probability: Probability::ZERO, 67 + misdirected_read_probability: Probability::ZERO, 68 + delayed_io_error_probability: Probability::ZERO, 69 + sync_reorder_window: SyncReorderWindow(0), 70 + latency_distribution_ns: LatencyNs(0), 27 71 } 28 72 } 29 73 30 74 pub fn moderate() -> Self { 31 75 Self { 32 - partial_write_probability: 0.05, 33 - bit_flip_on_read_probability: 0.01, 34 - sync_failure_probability: 0.03, 35 - dir_sync_failure_probability: 0.02, 36 - misdirected_write_probability: 0.01, 37 - io_error_probability: 0.02, 76 + partial_write_probability: Probability::new(0.05), 77 + bit_flip_on_read_probability: Probability::new(0.01), 78 + sync_failure_probability: Probability::new(0.03), 79 + dir_sync_failure_probability: Probability::new(0.02), 80 + misdirected_write_probability: Probability::new(0.01), 81 + io_error_probability: Probability::new(0.02), 82 + torn_page_probability: Probability::new(0.01), 83 + misdirected_read_probability: Probability::new(0.005), 84 + delayed_io_error_probability: Probability::new(0.01), 85 + sync_reorder_window: SyncReorderWindow(4), 86 + latency_distribution_ns: LatencyNs(50_000), 38 87 } 39 88 } 40 89 41 90 pub fn aggressive() -> Self { 42 91 Self { 43 - partial_write_probability: 0.15, 44 - bit_flip_on_read_probability: 0.05, 45 - sync_failure_probability: 0.10, 46 - dir_sync_failure_probability: 0.05, 47 - misdirected_write_probability: 0.05, 48 - io_error_probability: 0.08, 92 + partial_write_probability: Probability::new(0.15), 93 + bit_flip_on_read_probability: Probability::new(0.05), 94 + sync_failure_probability: Probability::new(0.10), 95 + dir_sync_failure_probability: Probability::new(0.05), 96 + misdirected_write_probability: Probability::new(0.05), 97 + io_error_probability: Probability::new(0.08), 98 + torn_page_probability: Probability::new(0.05), 99 + misdirected_read_probability: Probability::new(0.02), 100 + delayed_io_error_probability: Probability::new(0.05), 101 + sync_reorder_window: SyncReorderWindow(8), 102 + latency_distribution_ns: LatencyNs(250_000), 49 103 } 50 104 } 105 + 106 + pub fn torn_pages_only() -> Self { 107 + Self { 108 + torn_page_probability: Probability::new(0.25), 109 + ..Self::none() 110 + } 111 + } 112 + 113 + pub fn fsyncgate_only() -> Self { 114 + Self { 115 + delayed_io_error_probability: Probability::new(0.05), 116 + ..Self::none() 117 + } 118 + } 119 + 120 + pub fn injects_errors(&self) -> bool { 121 + self.partial_write_probability.is_nonzero() 122 + || self.bit_flip_on_read_probability.is_nonzero() 123 + || self.sync_failure_probability.is_nonzero() 124 + || self.dir_sync_failure_probability.is_nonzero() 125 + || self.misdirected_write_probability.is_nonzero() 126 + || self.io_error_probability.is_nonzero() 127 + || self.torn_page_probability.is_nonzero() 128 + || self.misdirected_read_probability.is_nonzero() 129 + || self.delayed_io_error_probability.is_nonzero() 130 + || self.sync_reorder_window.0 > 0 131 + } 51 132 } 52 133 53 134 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] ··· 57 138 buffered: Vec<u8>, 58 139 durable: Vec<u8>, 59 140 dir_entry_durable: bool, 141 + io_poisoned: bool, 60 142 } 61 143 62 144 struct SimFd { ··· 108 190 }, 109 191 } 110 192 193 + struct PendingSync { 194 + storage_id: StorageId, 195 + snapshot: Vec<u8>, 196 + } 197 + 198 + struct PendingDelete { 199 + path: PathBuf, 200 + storage_id: StorageId, 201 + was_dir_durable: bool, 202 + } 203 + 111 204 struct SimState { 112 205 storage: HashMap<StorageId, SimStorage>, 113 206 paths: HashMap<PathBuf, StorageId>, ··· 117 210 rng_counter: u64, 118 211 next_fd_id: u64, 119 212 next_storage_id: u64, 213 + pending_syncs: VecDeque<PendingSync>, 214 + pending_deletes: Vec<PendingDelete>, 120 215 } 121 216 122 217 impl SimState { ··· 137 232 (mixed as usize) % max 138 233 } 139 234 140 - fn should_fault(&mut self, seed: u64, probability: f64) -> bool { 141 - probability > 0.0 && self.next_random(seed) < probability 235 + fn should_fault(&mut self, seed: u64, probability: Probability) -> bool { 236 + probability.is_nonzero() && self.next_random(seed) < probability.raw() 142 237 } 143 238 144 239 fn alloc_fd_id(&mut self) -> FileId { ··· 194 289 state: Mutex<SimState>, 195 290 fault_config: FaultConfig, 196 291 rng_seed: u64, 292 + latency_counter: AtomicU64, 197 293 } 198 294 199 295 impl SimulatedIO { ··· 208 304 rng_counter: 0, 209 305 next_fd_id: 1, 210 306 next_storage_id: 1, 307 + pending_syncs: VecDeque::new(), 308 + pending_deletes: Vec::new(), 211 309 }), 212 310 fault_config, 213 311 rng_seed: seed, 312 + latency_counter: AtomicU64::new(0), 214 313 } 215 314 } 216 315 316 + fn jitter(&self) { 317 + let max_ns = self.fault_config.latency_distribution_ns.0; 318 + if max_ns == 0 { 319 + return; 320 + } 321 + let c = self.latency_counter.fetch_add(1, Ordering::Relaxed); 322 + let r = splitmix64(self.rng_seed.wrapping_add(c)); 323 + let ns = r % max_ns; 324 + std::thread::sleep(Duration::from_nanos(ns)); 325 + } 326 + 217 327 pub fn pristine(seed: u64) -> Self { 218 328 Self::new(seed, FaultConfig::none()) 219 329 } ··· 222 332 let mut state = self.state.lock().unwrap(); 223 333 224 334 state.fds.clear(); 335 + state.pending_syncs.clear(); 336 + 337 + let pending = std::mem::take(&mut state.pending_deletes); 338 + pending.into_iter().for_each(|pd| { 339 + if pd.was_dir_durable && state.storage.contains_key(&pd.storage_id) { 340 + state.paths.insert(pd.path, pd.storage_id); 341 + } 342 + }); 225 343 226 344 let orphaned: Vec<StorageId> = state 227 345 .storage ··· 237 355 let live_sids: HashSet<StorageId> = state.storage.keys().copied().collect(); 238 356 state.paths.retain(|_, sid| live_sids.contains(sid)); 239 357 240 - state 241 - .storage 242 - .values_mut() 243 - .for_each(|s| s.buffered = s.durable.clone()); 358 + state.storage.values_mut().for_each(|s| { 359 + s.buffered = s.durable.clone(); 360 + s.io_poisoned = false; 361 + }); 244 362 } 245 363 246 364 pub fn op_log(&self) -> Vec<OpRecord> { ··· 314 432 buffered: Vec::new(), 315 433 durable: Vec::new(), 316 434 dir_entry_durable: false, 435 + io_poisoned: false, 317 436 }, 318 437 ); 319 438 state.paths.insert(path_buf.clone(), sid); ··· 345 464 let sid = fd_info.storage_id; 346 465 let unlinked = !state.paths.values().any(|s| *s == sid); 347 466 let no_remaining_fds = !state.fds.values().any(|f| f.storage_id == sid); 467 + let pending_deleted = state.pending_deletes.iter().any(|pd| pd.storage_id == sid); 348 468 349 - if unlinked && no_remaining_fds { 469 + if unlinked && no_remaining_fds && !pending_deleted { 350 470 state.storage.remove(&sid); 351 471 } 352 472 ··· 355 475 } 356 476 357 477 fn read_at(&self, id: FileId, offset: u64, buf: &mut [u8]) -> io::Result<usize> { 478 + self.jitter(); 358 479 let mut state = self.state.lock().unwrap(); 359 480 let sid = state.require_readable(id)?; 360 481 let seed = self.rng_seed; 361 482 483 + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { 484 + return Err(io::Error::other("simulated EIO after delayed sync fault")); 485 + } 486 + 362 487 if state.should_fault(seed, self.fault_config.io_error_probability) { 363 488 return Err(io::Error::other("simulated EIO on read")); 364 489 } 365 490 491 + let read_offset = 492 + if state.should_fault(seed, self.fault_config.misdirected_read_probability) { 493 + let drift_sectors = state.next_random_usize(seed, 8) + 1; 494 + let drift = (drift_sectors * SECTOR_BYTES) as u64; 495 + if state.next_random(seed) < 0.5 { 496 + offset.saturating_sub(drift) 497 + } else { 498 + offset.saturating_add(drift) 499 + } 500 + } else { 501 + offset 502 + }; 503 + 366 504 let storage = state.storage.get(&sid).unwrap(); 367 505 368 - let off = usize::try_from(offset) 506 + let off = usize::try_from(read_offset) 369 507 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "offset exceeds usize"))?; 370 508 if off >= storage.buffered.len() { 371 509 state.op_log.push(OpRecord::ReadAt { ··· 395 533 } 396 534 397 535 fn write_at(&self, id: FileId, offset: u64, buf: &[u8]) -> io::Result<usize> { 536 + self.jitter(); 398 537 let mut state = self.state.lock().unwrap(); 399 538 let sid = state.require_writable(id)?; 400 539 let seed = self.rng_seed; 401 540 541 + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { 542 + return Err(io::Error::other("simulated EIO after delayed sync fault")); 543 + } 544 + 402 545 if state.should_fault(seed, self.fault_config.io_error_probability) { 403 546 return Err(io::Error::other("simulated EIO on write")); 404 547 } 405 548 406 - let actual_len = if buf.len() > 1 407 - && state.should_fault(seed, self.fault_config.partial_write_probability) 408 - { 409 - let partial = state.next_random_usize(seed, buf.len()); 410 - partial.max(1) 411 - } else { 412 - buf.len() 549 + let torn_len = 550 + if buf.len() > 1 && state.should_fault(seed, self.fault_config.torn_page_probability) { 551 + let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); 552 + let page_end = page_base + TORN_PAGE_BYTES; 553 + let cap = page_end.saturating_sub(offset as usize).min(buf.len()); 554 + let max_sectors = cap / SECTOR_BYTES; 555 + (max_sectors >= 2).then(|| { 556 + let n = state.next_random_usize(seed, max_sectors - 1) + 1; 557 + n * SECTOR_BYTES 558 + }) 559 + } else { 560 + None 561 + }; 562 + 563 + let actual_len = match torn_len { 564 + Some(n) => n, 565 + None if buf.len() > 1 566 + && state.should_fault(seed, self.fault_config.partial_write_probability) => 567 + { 568 + let partial = state.next_random_usize(seed, buf.len()); 569 + partial.max(1) 570 + } 571 + None => buf.len(), 413 572 }; 414 573 415 574 let misdirected = state.should_fault(seed, self.fault_config.misdirected_write_probability); 416 575 let write_offset = if misdirected { 417 - let drift = state.next_random_usize(seed, 64) as u64; 576 + let drift_sectors = state.next_random_usize(seed, 8) + 1; 577 + let drift = (drift_sectors * SECTOR_BYTES) as u64; 418 578 if state.next_random(seed) < 0.5 { 419 579 offset.saturating_sub(drift) 420 580 } else { ··· 444 604 } 445 605 446 606 fn sync(&self, id: FileId) -> io::Result<()> { 607 + self.jitter(); 447 608 let mut state = self.state.lock().unwrap(); 448 609 let sid = state.require_open(id)?; 449 610 let seed = self.rng_seed; 450 611 612 + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { 613 + return Err(io::Error::other("simulated EIO after delayed sync fault")); 614 + } 615 + 451 616 if state.should_fault(seed, self.fault_config.io_error_probability) { 452 617 return Err(io::Error::other("simulated EIO on sync")); 453 618 } 454 619 455 620 let sync_succeeded = !state.should_fault(seed, self.fault_config.sync_failure_probability); 621 + let poison_after = sync_succeeded 622 + && state.should_fault(seed, self.fault_config.delayed_io_error_probability); 623 + let reorder_window = self.fault_config.sync_reorder_window.0 as usize; 624 + 625 + let evicted = if sync_succeeded && reorder_window > 0 { 626 + let snapshot = state.storage.get(&sid).unwrap().buffered.clone(); 627 + state.pending_syncs.push_back(PendingSync { 628 + storage_id: sid, 629 + snapshot, 630 + }); 631 + if state.pending_syncs.len() > reorder_window { 632 + state.pending_syncs.pop_front() 633 + } else { 634 + None 635 + } 636 + } else { 637 + None 638 + }; 639 + 640 + if let Some(PendingSync { 641 + storage_id: old_sid, 642 + snapshot, 643 + }) = evicted 644 + && let Some(old) = state.storage.get_mut(&old_sid) 645 + { 646 + old.durable = snapshot; 647 + } 456 648 457 649 let storage = state.storage.get_mut(&sid).unwrap(); 458 650 459 - if sync_succeeded { 651 + if sync_succeeded && reorder_window == 0 { 460 652 storage.durable = storage.buffered.clone(); 653 + } 654 + if poison_after { 655 + storage.io_poisoned = true; 461 656 } 462 657 463 658 state.op_log.push(OpRecord::Sync { ··· 517 712 .remove(&path_buf) 518 713 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "file not found"))?; 519 714 520 - let has_open_fds = state.fds.values().any(|fd_info| fd_info.storage_id == sid); 715 + let was_dir_durable = state 716 + .storage 717 + .get(&sid) 718 + .map(|s| s.dir_entry_durable) 719 + .unwrap_or(false); 521 720 522 - if !has_open_fds { 523 - state.storage.remove(&sid); 524 - } 721 + state.pending_deletes.push(PendingDelete { 722 + path: path_buf.clone(), 723 + storage_id: sid, 724 + was_dir_durable, 725 + }); 525 726 526 727 state.op_log.push(OpRecord::Delete { path: path_buf }); 527 728 Ok(()) ··· 560 761 sids_in_dir.iter().for_each(|sid| { 561 762 if let Some(storage) = state.storage.get_mut(sid) { 562 763 storage.dir_entry_durable = true; 764 + } 765 + }); 766 + 767 + let drained = std::mem::take(&mut state.pending_deletes); 768 + let (committed, remaining): (Vec<_>, Vec<_>) = drained 769 + .into_iter() 770 + .partition(|pd| pd.path.parent() == Some(path)); 771 + state.pending_deletes = remaining; 772 + committed.into_iter().for_each(|pd| { 773 + let has_fds = state.fds.values().any(|f| f.storage_id == pd.storage_id); 774 + if !has_fds { 775 + state.storage.remove(&pd.storage_id); 563 776 } 564 777 }); 565 778 } ··· 685 898 } 686 899 687 900 #[test] 901 + fn delete_without_dir_sync_reverts_on_crash() { 902 + let sim = SimulatedIO::pristine(42); 903 + let dir = Path::new("/test"); 904 + sim.mkdir(dir).unwrap(); 905 + sim.sync_dir(dir).unwrap(); 906 + 907 + let path = Path::new("/test/file.dat"); 908 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 909 + sim.write_at(fd, 0, b"durable data").unwrap(); 910 + sim.sync(fd).unwrap(); 911 + sim.sync_dir(dir).unwrap(); 912 + sim.close(fd).unwrap(); 913 + 914 + sim.delete(path).unwrap(); 915 + sim.crash(); 916 + 917 + let fd = sim.open(path, OpenOptions::read()).unwrap(); 918 + let mut buf = vec![0u8; 12]; 919 + sim.read_at(fd, 0, &mut buf).unwrap(); 920 + assert_eq!(&buf, b"durable data"); 921 + } 922 + 923 + #[test] 924 + fn delete_commits_after_dir_sync() { 925 + let sim = SimulatedIO::pristine(42); 926 + let dir = Path::new("/test"); 927 + sim.mkdir(dir).unwrap(); 928 + sim.sync_dir(dir).unwrap(); 929 + 930 + let path = Path::new("/test/file.dat"); 931 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 932 + sim.write_at(fd, 0, b"data").unwrap(); 933 + sim.sync(fd).unwrap(); 934 + sim.sync_dir(dir).unwrap(); 935 + sim.close(fd).unwrap(); 936 + 937 + sim.delete(path).unwrap(); 938 + sim.sync_dir(dir).unwrap(); 939 + sim.crash(); 940 + 941 + let result = sim.open(path, OpenOptions::read()); 942 + assert!(result.is_err()); 943 + } 944 + 945 + #[test] 946 + fn delete_of_never_durable_file_stays_gone_on_crash() { 947 + let sim = SimulatedIO::pristine(42); 948 + let path = Path::new("/test/file.dat"); 949 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 950 + sim.write_at(fd, 0, b"volatile").unwrap(); 951 + sim.sync(fd).unwrap(); 952 + sim.close(fd).unwrap(); 953 + 954 + sim.delete(path).unwrap(); 955 + sim.crash(); 956 + 957 + let result = sim.open(path, OpenOptions::read()); 958 + assert!(result.is_err()); 959 + } 960 + 961 + #[test] 688 962 fn dir_sync_makes_file_durable() { 689 963 let sim = SimulatedIO::pristine(42); 690 964 let dir = Path::new("/test"); ··· 848 1122 849 1123 let result = sim.write_at(fd_ro, 0, b"nope"); 850 1124 assert_eq!(result.unwrap_err().kind(), io::ErrorKind::PermissionDenied); 1125 + } 1126 + 1127 + #[test] 1128 + fn torn_page_truncates_within_page() { 1129 + let fc = FaultConfig { 1130 + torn_page_probability: Probability::new(1.0), 1131 + ..FaultConfig::none() 1132 + }; 1133 + let sim = SimulatedIO::new(123, fc); 1134 + let path = Path::new("/test/file.dat"); 1135 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 1136 + let data = vec![0xAAu8; TORN_PAGE_BYTES + 1024]; 1137 + let written = sim.write_at(fd, 0, &data).unwrap(); 1138 + assert!(written >= 1); 1139 + assert!(written <= TORN_PAGE_BYTES); 1140 + } 1141 + 1142 + #[test] 1143 + fn delayed_io_error_poisons_storage_after_sync() { 1144 + let fc = FaultConfig { 1145 + delayed_io_error_probability: Probability::new(1.0), 1146 + ..FaultConfig::none() 1147 + }; 1148 + let sim = SimulatedIO::new(7, fc); 1149 + let path = Path::new("/test/file.dat"); 1150 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 1151 + sim.write_at(fd, 0, b"hello").unwrap(); 1152 + sim.sync(fd).unwrap(); 1153 + 1154 + let err = sim.write_at(fd, 5, b"world").unwrap_err(); 1155 + assert_eq!(err.kind(), io::ErrorKind::Other); 1156 + let err2 = sim.sync(fd).unwrap_err(); 1157 + assert_eq!(err2.kind(), io::ErrorKind::Other); 1158 + let mut buf = [0u8; 5]; 1159 + let err3 = sim.read_at(fd, 0, &mut buf).unwrap_err(); 1160 + assert_eq!(err3.kind(), io::ErrorKind::Other); 1161 + } 1162 + 1163 + #[test] 1164 + fn misdirected_read_reads_wrong_offset() { 1165 + let fc = FaultConfig { 1166 + misdirected_read_probability: Probability::new(1.0), 1167 + ..FaultConfig::none() 1168 + }; 1169 + let sim = SimulatedIO::new(1, fc); 1170 + let path = Path::new("/test/file.dat"); 1171 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 1172 + let data: Vec<u8> = (0..2048u32).flat_map(|n| n.to_le_bytes()).collect(); 1173 + sim.write_at(fd, 0, &data).unwrap(); 1174 + sim.sync(fd).unwrap(); 1175 + 1176 + let mut drifted_hit = false; 1177 + for _ in 0..32 { 1178 + let mut buf = [0u8; 16]; 1179 + let target_off = 4096u64; 1180 + let expected = &data[target_off as usize..target_off as usize + 16]; 1181 + if sim.read_at(fd, target_off, &mut buf).unwrap() == 16 && buf != expected { 1182 + drifted_hit = true; 1183 + break; 1184 + } 1185 + } 1186 + assert!( 1187 + drifted_hit, 1188 + "misdirected read never drifted away from target" 1189 + ); 1190 + } 1191 + 1192 + #[test] 1193 + fn sync_reorder_window_defers_durability() { 1194 + let fc = FaultConfig { 1195 + sync_reorder_window: SyncReorderWindow(2), 1196 + ..FaultConfig::none() 1197 + }; 1198 + let sim = SimulatedIO::new(42, fc); 1199 + let dir = Path::new("/test"); 1200 + sim.mkdir(dir).unwrap(); 1201 + sim.sync_dir(dir).unwrap(); 1202 + 1203 + let a = Path::new("/test/a.dat"); 1204 + let fd_a = sim.open(a, OpenOptions::read_write()).unwrap(); 1205 + sim.write_at(fd_a, 0, b"A").unwrap(); 1206 + sim.sync(fd_a).unwrap(); 1207 + assert!(sim.durable_contents(fd_a).unwrap().is_empty()); 1208 + 1209 + let b = Path::new("/test/b.dat"); 1210 + let fd_b = sim.open(b, OpenOptions::read_write()).unwrap(); 1211 + sim.write_at(fd_b, 0, b"B").unwrap(); 1212 + sim.sync(fd_b).unwrap(); 1213 + assert!(sim.durable_contents(fd_a).unwrap().is_empty()); 1214 + assert!(sim.durable_contents(fd_b).unwrap().is_empty()); 1215 + 1216 + let c = Path::new("/test/c.dat"); 1217 + let fd_c = sim.open(c, OpenOptions::read_write()).unwrap(); 1218 + sim.write_at(fd_c, 0, b"C").unwrap(); 1219 + sim.sync(fd_c).unwrap(); 1220 + assert_eq!(sim.durable_contents(fd_a).unwrap(), b"A"); 1221 + assert!(sim.durable_contents(fd_b).unwrap().is_empty()); 1222 + assert!(sim.durable_contents(fd_c).unwrap().is_empty()); 1223 + } 1224 + 1225 + #[test] 1226 + fn sync_reorder_commits_at_sync_time_snapshot_not_current_buffer() { 1227 + let fc = FaultConfig { 1228 + sync_reorder_window: SyncReorderWindow(1), 1229 + ..FaultConfig::none() 1230 + }; 1231 + let sim = SimulatedIO::new(42, fc); 1232 + let dir = Path::new("/test"); 1233 + sim.mkdir(dir).unwrap(); 1234 + sim.sync_dir(dir).unwrap(); 1235 + 1236 + let a = Path::new("/test/a.dat"); 1237 + let fd_a = sim.open(a, OpenOptions::read_write()).unwrap(); 1238 + sim.write_at(fd_a, 0, b"first").unwrap(); 1239 + sim.sync(fd_a).unwrap(); 1240 + sim.write_at(fd_a, 0, b"second").unwrap(); 1241 + 1242 + let b = Path::new("/test/b.dat"); 1243 + let fd_b = sim.open(b, OpenOptions::read_write()).unwrap(); 1244 + sim.write_at(fd_b, 0, b"b").unwrap(); 1245 + sim.sync(fd_b).unwrap(); 1246 + 1247 + assert_eq!( 1248 + sim.durable_contents(fd_a).unwrap(), 1249 + b"first", 1250 + "reordered sync must commit buffered-at-sync-call, not current buffered" 1251 + ); 1252 + } 1253 + 1254 + #[test] 1255 + fn crash_drops_pending_reordered_syncs() { 1256 + let fc = FaultConfig { 1257 + sync_reorder_window: SyncReorderWindow(4), 1258 + ..FaultConfig::none() 1259 + }; 1260 + let sim = SimulatedIO::new(42, fc); 1261 + let dir = Path::new("/test"); 1262 + sim.mkdir(dir).unwrap(); 1263 + sim.sync_dir(dir).unwrap(); 1264 + 1265 + let path = Path::new("/test/file.dat"); 1266 + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); 1267 + sim.sync_dir(dir).unwrap(); 1268 + sim.write_at(fd, 0, b"pending").unwrap(); 1269 + sim.sync(fd).unwrap(); 1270 + sim.crash(); 1271 + 1272 + let fd2 = sim.open(path, OpenOptions::read()).unwrap(); 1273 + assert_eq!(sim.file_size(fd2).unwrap(), 0); 851 1274 } 852 1275 853 1276 #[test]