Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
219
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store): tranquil-gauntlet CLI, config overrides, profiles

Lewis: May this revision serve well! <lu5a@proton.me>

+996 -5
+21
.config/nextest.toml
··· 25 25 test-threads = "num-cpus" 26 26 slow-timeout = { period = "300s", terminate-after = 2 } 27 27 28 + [profile.gauntlet-pr] 29 + retries = 0 30 + fail-fast = true 31 + test-threads = "num-cpus" 32 + slow-timeout = { period = "60s", terminate-after = 5 } 33 + 34 + [[profile.gauntlet-pr.overrides]] 35 + filter = "binary(gauntlet_smoke)" 36 + slow-timeout = { period = "300s", terminate-after = 2 } 37 + 38 + [profile.gauntlet-nightly] 39 + retries = 0 40 + fail-fast = false 41 + test-threads = "num-cpus" 42 + slow-timeout = { period = "600s", terminate-after = 1 } 43 + 44 + [profile.gauntlet-soak] 45 + retries = 0 46 + fail-fast = false 47 + test-threads = 1 48 + 28 49 [test-groups] 29 50 serial-env-tests = { max-threads = 1 } 30 51 heavy-load-tests = { max-threads = 4 }
+8
crates/tranquil-store/Cargo.toml
··· 35 35 smallvec = "1" 36 36 uuid = { workspace = true } 37 37 tempfile = { version = "3", optional = true } 38 + clap = { workspace = true, optional = true } 39 + toml = { version = "0.8", optional = true } 38 40 39 41 [features] 40 42 test-harness = ["dep:tempfile"] 43 + gauntlet-cli = ["test-harness", "dep:clap", "dep:toml"] 44 + 45 + [[bin]] 46 + name = "tranquil-gauntlet" 47 + path = "src/bin/tranquil_gauntlet.rs" 48 + required-features = ["gauntlet-cli"] 41 49 42 50 [dev-dependencies] 43 51 tranquil-store = { path = ".", features = ["test-harness"] }
+698
crates/tranquil-store/src/bin/tranquil_gauntlet.rs
··· 1 + use std::io::{self, Write}; 2 + use std::path::{Path, PathBuf}; 3 + use std::process::ExitCode; 4 + use std::sync::Arc; 5 + use std::sync::atomic::{AtomicBool, Ordering}; 6 + use std::time::{Duration, Instant}; 7 + 8 + use clap::{Parser, Subcommand}; 9 + use serde::{Deserialize, Serialize}; 10 + use tokio::runtime::Runtime; 11 + use tranquil_store::gauntlet::{ 12 + ConfigOverrides, Gauntlet, GauntletReport, InvariantViolation, OpStream, RegressionRecord, 13 + Scenario, Seed, config_for, farm, 14 + shrink::{DEFAULT_MAX_SHRINK_ITERATIONS, shrink_failure}, 15 + }; 16 + 17 + const MAX_HOURS: f64 = 1.0e6; 18 + 19 + /// Deterministic storage-engine gauntlet: scenario fuzzing, shrinking, regression replay. 20 + /// 21 + /// Writes one NDjson record per seed to stdout; `farm` adds a final summary record. 22 + /// Progress, batch stats, interrupt notices, and errors go to stderr. 23 + /// Exits 0 on success, 1 on invariant violation, 2 on argument or runtime error. 24 + /// First SIGINT stops after the current batch; a second press aborts. 25 + /// 26 + /// Hopefully we'll catch super complicated tranquil-store bugs with this!! 27 + #[derive(Debug, Parser)] 28 + #[command(name = "tranquil-gauntlet", version)] 29 + struct Cli { 30 + #[command(subcommand)] 31 + cmd: Cmd, 32 + } 33 + 34 + #[derive(Debug, Subcommand)] 35 + enum Cmd { 36 + /// Run a scenario across many seeds in parallel. 37 + /// 38 + /// With --hours, the command loops batches of --seeds until the deadline passes. 39 + /// Without --hours, a single batch runs and the command exits. 40 + /// The last stdout line is always a `"type":"summary"` record. 41 + Farm { 42 + /// Scenario to run. 43 + #[arg(long, value_enum, required_unless_present = "config")] 44 + scenario: Option<Scenario>, 45 + 46 + /// First seed in the batch range. Default 0. 47 + #[arg(long)] 48 + seed_start: Option<u64>, 49 + 50 + /// Number of seeds per batch. Default 256. Must be > 0. 51 + #[arg(long)] 52 + seeds: Option<u64>, 53 + 54 + /// Wall-clock budget in hours; batches repeat until the deadline elapses. 55 + #[arg(long)] 56 + hours: Option<f64>, 57 + 58 + /// Directory to dump regression Json on failure. 59 + #[arg(long)] 60 + dump_regressions: Option<PathBuf>, 61 + 62 + /// Toml config with any of the above fields plus an `[overrides]` table. 63 + #[arg(long)] 64 + config: Option<PathBuf>, 65 + 66 + /// Skip shrinking when dumping regressions. 67 + #[arg(long)] 68 + no_shrink: bool, 69 + 70 + /// Max shrink attempts per failing seed. 71 + #[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")] 72 + shrink_budget: usize, 73 + }, 74 + /// Replay a single seed or a saved regression file. 75 + /// 76 + /// With --from, replays a regression Json produced by `farm --dump-regressions`. 77 + /// Otherwise supply --scenario and --seed, or a --config that sets them. 78 + /// Writes one NDjson record to stdout. 79 + Repro { 80 + /// Scenario to replay. Ignored when --from is set. 81 + #[arg(long, value_enum, conflicts_with = "from", required_unless_present_any = ["config", "from"])] 82 + scenario: Option<Scenario>, 83 + 84 + /// Seed to replay. Ignored when --from is set. 85 + #[arg(long, conflicts_with = "from", required_unless_present_any = ["config", "from"])] 86 + seed: Option<u64>, 87 + 88 + /// Toml config with optional scenario, seed, and overrides. 89 + #[arg(long, conflicts_with = "from")] 90 + config: Option<PathBuf>, 91 + 92 + /// Replay a saved regression Json from `farm --dump-regressions`. 93 + #[arg(long)] 94 + from: Option<PathBuf>, 95 + 96 + /// Directory to dump regression Json if replay fails. 97 + #[arg(long)] 98 + dump_regressions: Option<PathBuf>, 99 + 100 + /// Skip shrinking when dumping regressions. 101 + #[arg(long)] 102 + no_shrink: bool, 103 + 104 + /// Max shrink attempts when dumping regressions. 105 + #[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")] 106 + shrink_budget: usize, 107 + }, 108 + } 109 + 110 + #[derive(Debug, Deserialize)] 111 + #[serde(deny_unknown_fields)] 112 + struct ConfigFile { 113 + #[serde(default)] 114 + scenario: Option<Scenario>, 115 + #[serde(default)] 116 + seed: Option<u64>, 117 + #[serde(default)] 118 + seed_start: Option<u64>, 119 + #[serde(default)] 120 + seeds: Option<u64>, 121 + #[serde(default)] 122 + hours: Option<f64>, 123 + #[serde(default)] 124 + dump_regressions: Option<PathBuf>, 125 + #[serde(default)] 126 + overrides: ConfigOverrides, 127 + } 128 + 129 + fn load_config_file(path: &Path) -> Result<ConfigFile, String> { 130 + let raw = std::fs::read_to_string(path).map_err(|e| format!("read {}: {e}", path.display()))?; 131 + toml::from_str(&raw).map_err(|e| format!("parse {}: {e}", path.display())) 132 + } 133 + 134 + #[derive(Debug, Serialize)] 135 + struct NdjsonResult { 136 + scenario: &'static str, 137 + seed: u64, 138 + ops_executed: usize, 139 + op_errors: usize, 140 + restarts: usize, 141 + clean: bool, 142 + violations: Vec<NdjsonViolation>, 143 + wall_ms: u64, 144 + ops_in_stream: usize, 145 + } 146 + 147 + #[derive(Debug, Serialize)] 148 + struct NdjsonViolation { 149 + invariant: &'static str, 150 + detail: String, 151 + } 152 + 153 + #[derive(Debug, Serialize)] 154 + struct NdjsonSummary { 155 + #[serde(rename = "type")] 156 + kind: &'static str, 157 + scenario: &'static str, 158 + seeds_run: u64, 159 + clean: u64, 160 + failed: u64, 161 + total_ops: u64, 162 + wall_ms: u64, 163 + interrupted: bool, 164 + } 165 + 166 + fn emit_summary(summary: &NdjsonSummary) { 167 + let line = match serde_json::to_string(summary) { 168 + Ok(s) => s, 169 + Err(e) => { 170 + eprintln!("summary serialize failed: {e}"); 171 + return; 172 + } 173 + }; 174 + let stdout = io::stdout(); 175 + let mut w = stdout.lock(); 176 + if let Err(e) = writeln!(w, "{line}").and_then(|()| w.flush()) 177 + && e.kind() != io::ErrorKind::BrokenPipe 178 + { 179 + eprintln!("summary emit failed: {e}"); 180 + } 181 + } 182 + 183 + fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::Result<()> { 184 + let result = NdjsonResult { 185 + scenario: scenario.cli_name(), 186 + seed: report.seed.0, 187 + ops_executed: report.ops_executed.0, 188 + op_errors: report.op_errors.0, 189 + restarts: report.restarts.0, 190 + clean: report.is_clean(), 191 + violations: report 192 + .violations 193 + .iter() 194 + .map(|v: &InvariantViolation| NdjsonViolation { 195 + invariant: v.invariant, 196 + detail: v.detail.clone(), 197 + }) 198 + .collect(), 199 + wall_ms: u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX), 200 + ops_in_stream: report.ops.len(), 201 + }; 202 + let line = serde_json::to_string(&result).map_err(io::Error::other)?; 203 + let stdout = io::stdout(); 204 + let mut w = stdout.lock(); 205 + writeln!(w, "{line}")?; 206 + w.flush() 207 + } 208 + 209 + fn emit_or_log(scenario: Scenario, report: &GauntletReport, elapsed: Duration) { 210 + if let Err(e) = emit(scenario, report, elapsed) 211 + && e.kind() != io::ErrorKind::BrokenPipe 212 + { 213 + eprintln!("ndjson emit failed: {e}"); 214 + } 215 + } 216 + 217 + struct FarmPlan { 218 + scenario: Scenario, 219 + seed_start: u64, 220 + seeds: u64, 221 + hours: Option<f64>, 222 + dump_regressions: Option<PathBuf>, 223 + overrides: ConfigOverrides, 224 + shrink: bool, 225 + shrink_budget: usize, 226 + } 227 + 228 + #[allow(clippy::too_many_arguments)] 229 + fn resolve_farm( 230 + scenario: Option<Scenario>, 231 + seed_start: Option<u64>, 232 + seeds: Option<u64>, 233 + hours: Option<f64>, 234 + dump_regressions: Option<PathBuf>, 235 + config: Option<PathBuf>, 236 + shrink: bool, 237 + shrink_budget: usize, 238 + ) -> Result<FarmPlan, String> { 239 + let file: Option<ConfigFile> = config.as_ref().map(|p| load_config_file(p)).transpose()?; 240 + let scenario = scenario 241 + .or_else(|| file.as_ref().and_then(|f| f.scenario)) 242 + .ok_or("must pass --scenario or set `scenario` in --config")?; 243 + let seed_start = seed_start 244 + .or_else(|| file.as_ref().and_then(|f| f.seed_start)) 245 + .unwrap_or(0); 246 + let seeds = seeds 247 + .or_else(|| file.as_ref().and_then(|f| f.seeds)) 248 + .unwrap_or(256); 249 + if seeds == 0 { 250 + return Err("--seeds must be greater than zero".to_string()); 251 + } 252 + let hours = hours.or_else(|| file.as_ref().and_then(|f| f.hours)); 253 + if let Some(h) = hours { 254 + validate_hours(h)?; 255 + } 256 + if shrink && shrink_budget == 0 { 257 + return Err("--shrink-budget must be greater than zero".to_string()); 258 + } 259 + let dump_regressions = 260 + dump_regressions.or_else(|| file.as_ref().and_then(|f| f.dump_regressions.clone())); 261 + let overrides = file.map(|f| f.overrides).unwrap_or_default(); 262 + Ok(FarmPlan { 263 + scenario, 264 + seed_start, 265 + seeds, 266 + hours, 267 + dump_regressions, 268 + overrides, 269 + shrink, 270 + shrink_budget, 271 + }) 272 + } 273 + 274 + fn validate_hours(h: f64) -> Result<(), String> { 275 + if !h.is_finite() || h <= 0.0 { 276 + return Err(format!("invalid --hours={h}: must be positive and finite")); 277 + } 278 + if h > MAX_HOURS { 279 + return Err(format!("invalid --hours={h}: must not exceed {MAX_HOURS}")); 280 + } 281 + Ok(()) 282 + } 283 + 284 + enum ReproPlan { 285 + FromFile { 286 + record: RegressionRecord, 287 + dump_regressions: Option<PathBuf>, 288 + shrink: bool, 289 + shrink_budget: usize, 290 + }, 291 + FromSeed { 292 + scenario: Scenario, 293 + seed: Seed, 294 + overrides: ConfigOverrides, 295 + dump_regressions: Option<PathBuf>, 296 + shrink: bool, 297 + shrink_budget: usize, 298 + }, 299 + } 300 + 301 + #[allow(clippy::too_many_arguments)] 302 + fn resolve_repro( 303 + scenario: Option<Scenario>, 304 + seed: Option<u64>, 305 + config: Option<PathBuf>, 306 + from: Option<PathBuf>, 307 + dump_regressions: Option<PathBuf>, 308 + shrink: bool, 309 + shrink_budget: usize, 310 + ) -> Result<ReproPlan, String> { 311 + if shrink && shrink_budget == 0 { 312 + return Err("--shrink-budget must be greater than zero".to_string()); 313 + } 314 + if let Some(path) = from { 315 + let record = RegressionRecord::load(&path).map_err(|e| e.to_string())?; 316 + return Ok(ReproPlan::FromFile { 317 + record, 318 + dump_regressions, 319 + shrink, 320 + shrink_budget, 321 + }); 322 + } 323 + let file: Option<ConfigFile> = config.as_ref().map(|p| load_config_file(p)).transpose()?; 324 + let scenario = scenario 325 + .or_else(|| file.as_ref().and_then(|f| f.scenario)) 326 + .ok_or("must pass --scenario, set `scenario` in --config, or use --from")?; 327 + let seed = seed 328 + .or_else(|| file.as_ref().and_then(|f| f.seed)) 329 + .ok_or("must pass --seed, set `seed` in --config, or use --from")?; 330 + let overrides = file.map(|f| f.overrides).unwrap_or_default(); 331 + Ok(ReproPlan::FromSeed { 332 + scenario, 333 + seed: Seed(seed), 334 + overrides, 335 + dump_regressions, 336 + shrink, 337 + shrink_budget, 338 + }) 339 + } 340 + 341 + fn build_runtime() -> Result<Runtime, ExitCode> { 342 + Runtime::new().map_err(|e| { 343 + eprintln!("failed to build tokio runtime: {e}"); 344 + ExitCode::from(2) 345 + }) 346 + } 347 + 348 + fn install_interrupt(rt: &Runtime) -> Arc<AtomicBool> { 349 + let flag = Arc::new(AtomicBool::new(false)); 350 + let f = flag.clone(); 351 + rt.spawn(async move { 352 + if tokio::signal::ctrl_c().await.is_err() { 353 + return; 354 + } 355 + f.store(true, Ordering::Relaxed); 356 + eprintln!( 357 + "interrupt received, stopping after current batch; press Ctrl-C again to abort" 358 + ); 359 + if tokio::signal::ctrl_c().await.is_ok() { 360 + eprintln!("second interrupt, aborting"); 361 + std::process::exit(130); 362 + } 363 + }); 364 + flag 365 + } 366 + 367 + fn main() -> ExitCode { 368 + let cli = Cli::parse(); 369 + match cli.cmd { 370 + Cmd::Farm { 371 + scenario, 372 + seed_start, 373 + seeds, 374 + hours, 375 + dump_regressions, 376 + config, 377 + no_shrink, 378 + shrink_budget, 379 + } => { 380 + let plan = match resolve_farm( 381 + scenario, 382 + seed_start, 383 + seeds, 384 + hours, 385 + dump_regressions, 386 + config, 387 + !no_shrink, 388 + shrink_budget, 389 + ) { 390 + Ok(p) => p, 391 + Err(e) => { 392 + eprintln!("{e}"); 393 + return ExitCode::from(2); 394 + } 395 + }; 396 + let rt = match build_runtime() { 397 + Ok(rt) => rt, 398 + Err(code) => return code, 399 + }; 400 + let interrupt = install_interrupt(&rt); 401 + run_farm(plan, &rt, interrupt) 402 + } 403 + Cmd::Repro { 404 + scenario, 405 + seed, 406 + config, 407 + from, 408 + dump_regressions, 409 + no_shrink, 410 + shrink_budget, 411 + } => { 412 + let plan = match resolve_repro( 413 + scenario, 414 + seed, 415 + config, 416 + from, 417 + dump_regressions, 418 + !no_shrink, 419 + shrink_budget, 420 + ) { 421 + Ok(p) => p, 422 + Err(e) => { 423 + eprintln!("{e}"); 424 + return ExitCode::from(2); 425 + } 426 + }; 427 + let rt = match build_runtime() { 428 + Ok(rt) => rt, 429 + Err(code) => return code, 430 + }; 431 + run_repro(plan, &rt) 432 + } 433 + } 434 + } 435 + 436 + fn run_farm(plan: FarmPlan, rt: &Runtime, interrupt: Arc<AtomicBool>) -> ExitCode { 437 + let FarmPlan { 438 + scenario, 439 + seed_start, 440 + seeds, 441 + hours, 442 + dump_regressions, 443 + overrides, 444 + shrink, 445 + shrink_budget, 446 + } = plan; 447 + let deadline = hours.map(|h| Instant::now() + Duration::from_secs_f64(h * 3600.0)); 448 + let run_start = Instant::now(); 449 + let mut any_failed = false; 450 + let mut next_seed = seed_start; 451 + let mut total_seeds: u64 = 0; 452 + let mut total_clean: u64 = 0; 453 + let mut total_failed: u64 = 0; 454 + let mut total_ops: u64 = 0; 455 + 456 + loop { 457 + if interrupt.load(Ordering::Relaxed) { 458 + break; 459 + } 460 + if let Some(d) = deadline 461 + && Instant::now() >= d 462 + { 463 + break; 464 + } 465 + let end = match next_seed.checked_add(seeds) { 466 + Some(e) => e, 467 + None => { 468 + eprintln!("seed range overflowed u64: seed_start={next_seed} seeds={seeds}"); 469 + break; 470 + } 471 + }; 472 + let overrides_ref = &overrides; 473 + let batch_start = Instant::now(); 474 + let reports = farm::run_many_timed( 475 + |s| { 476 + let mut cfg = config_for(scenario, s); 477 + overrides_ref.apply_to(&mut cfg); 478 + cfg 479 + }, 480 + (next_seed..end).map(Seed), 481 + ); 482 + let batch_wall = batch_start.elapsed(); 483 + let batch_failed = reports.iter().filter(|(r, _)| !r.is_clean()).count(); 484 + let batch_clean = reports.len().saturating_sub(batch_failed); 485 + let batch_ops: u64 = reports 486 + .iter() 487 + .map(|(r, _)| r.ops_executed.0 as u64) 488 + .sum(); 489 + reports.iter().for_each(|(r, elapsed)| { 490 + if !r.is_clean() { 491 + any_failed = true; 492 + if let Some(root) = &dump_regressions { 493 + dump_regression(scenario, r, root, &overrides, shrink, shrink_budget, rt); 494 + } 495 + } 496 + emit_or_log(scenario, r, *elapsed); 497 + }); 498 + total_seeds += reports.len() as u64; 499 + total_clean += batch_clean as u64; 500 + total_failed += batch_failed as u64; 501 + total_ops += batch_ops; 502 + let wall_secs = batch_wall.as_secs_f64(); 503 + let ops_per_sec_display: String = if wall_secs > 0.0 { 504 + format!("{:.0} ops/s", batch_ops as f64 / wall_secs) 505 + } else { 506 + "n/a ops/s".to_string() 507 + }; 508 + eprintln!( 509 + "batch {next_seed}..{end}: {batch_clean} clean, {batch_failed} failed, {wall_secs:.1}s, {ops_per_sec_display}", 510 + ); 511 + if deadline.is_none() { 512 + break; 513 + } 514 + next_seed = end; 515 + } 516 + 517 + let wall_ms = u64::try_from(run_start.elapsed().as_millis()).unwrap_or(u64::MAX); 518 + emit_summary(&NdjsonSummary { 519 + kind: "summary", 520 + scenario: scenario.cli_name(), 521 + seeds_run: total_seeds, 522 + clean: total_clean, 523 + failed: total_failed, 524 + total_ops, 525 + wall_ms, 526 + interrupted: interrupt.load(Ordering::Relaxed), 527 + }); 528 + 529 + if any_failed { 530 + ExitCode::from(1) 531 + } else { 532 + ExitCode::SUCCESS 533 + } 534 + } 535 + 536 + fn dump_regression( 537 + scenario: Scenario, 538 + report: &GauntletReport, 539 + root: &Path, 540 + overrides: &ConfigOverrides, 541 + shrink: bool, 542 + shrink_budget: usize, 543 + rt: &Runtime, 544 + ) { 545 + let original_len = report.ops.len(); 546 + let (final_ops, final_report) = if shrink && original_len > 0 { 547 + let mut cfg = config_for(scenario, report.seed); 548 + overrides.apply_to(&mut cfg); 549 + let outcome = rt.block_on(shrink_failure( 550 + cfg, 551 + report.ops.clone(), 552 + report.clone(), 553 + shrink_budget, 554 + )); 555 + eprintln!( 556 + "shrank {} -> {} ops for seed {:016x} in {} runs", 557 + original_len, 558 + outcome.ops.len(), 559 + report.seed.0, 560 + outcome.iterations, 561 + ); 562 + (outcome.ops, outcome.report) 563 + } else { 564 + (report.ops.clone(), report.clone()) 565 + }; 566 + let record = RegressionRecord::from_report( 567 + scenario, 568 + overrides.clone(), 569 + &final_report, 570 + original_len, 571 + final_ops, 572 + ); 573 + match record.write_to(root) { 574 + Ok(path) => eprintln!("wrote regression to {}", path.display()), 575 + Err(e) => eprintln!("regression dump failed: {e}"), 576 + } 577 + } 578 + 579 + fn run_repro(plan: ReproPlan, rt: &Runtime) -> ExitCode { 580 + match plan { 581 + ReproPlan::FromFile { 582 + record, 583 + dump_regressions, 584 + shrink, 585 + shrink_budget, 586 + } => run_repro_from_record(record, dump_regressions, shrink, shrink_budget, rt), 587 + ReproPlan::FromSeed { 588 + scenario, 589 + seed, 590 + overrides, 591 + dump_regressions, 592 + shrink, 593 + shrink_budget, 594 + } => { 595 + let mut cfg = config_for(scenario, seed); 596 + overrides.apply_to(&mut cfg); 597 + let start = Instant::now(); 598 + let gauntlet = match Gauntlet::new(cfg) { 599 + Ok(g) => g, 600 + Err(e) => { 601 + eprintln!("gauntlet init failed: {e}"); 602 + return ExitCode::from(2); 603 + } 604 + }; 605 + let report = rt.block_on(gauntlet.run()); 606 + let elapsed = start.elapsed(); 607 + if !report.is_clean() 608 + && let Some(root) = &dump_regressions 609 + { 610 + dump_regression( 611 + scenario, 612 + &report, 613 + root, 614 + &overrides, 615 + shrink, 616 + shrink_budget, 617 + rt, 618 + ); 619 + } 620 + emit_or_log(scenario, &report, elapsed); 621 + if report.is_clean() { 622 + ExitCode::SUCCESS 623 + } else { 624 + ExitCode::from(1) 625 + } 626 + } 627 + } 628 + } 629 + 630 + fn run_repro_from_record( 631 + record: RegressionRecord, 632 + dump_regressions: Option<PathBuf>, 633 + shrink: bool, 634 + shrink_budget: usize, 635 + rt: &Runtime, 636 + ) -> ExitCode { 637 + let scenario = match record.scenario_enum() { 638 + Ok(s) => s, 639 + Err(e) => { 640 + eprintln!("{e}"); 641 + return ExitCode::from(2); 642 + } 643 + }; 644 + let cfg = match record.build_config() { 645 + Ok(c) => c, 646 + Err(e) => { 647 + eprintln!("{e}"); 648 + return ExitCode::from(2); 649 + } 650 + }; 651 + let shrunk_from = if record.original_ops_len > record.ops.len() { 652 + format!(", shrunk from {}", record.original_ops_len) 653 + } else { 654 + String::new() 655 + }; 656 + eprintln!( 657 + "replay {} seed {:016x}: {} ops{}, {} recorded violations", 658 + scenario.cli_name(), 659 + record.seed.0, 660 + record.ops.len(), 661 + shrunk_from, 662 + record.violations.len(), 663 + ); 664 + record.violations.iter().for_each(|v| { 665 + eprintln!("violation {}: {}", v.invariant, v.detail); 666 + }); 667 + let overrides = record.overrides.clone(); 668 + let ops: OpStream = record.op_stream(); 669 + let start = Instant::now(); 670 + let gauntlet = match Gauntlet::new(cfg) { 671 + Ok(g) => g, 672 + Err(e) => { 673 + eprintln!("build gauntlet: {e}"); 674 + return ExitCode::from(2); 675 + } 676 + }; 677 + let report = rt.block_on(gauntlet.run_with_ops(ops)); 678 + let elapsed = start.elapsed(); 679 + if !report.is_clean() 680 + && let Some(root) = &dump_regressions 681 + { 682 + dump_regression( 683 + scenario, 684 + &report, 685 + root, 686 + &overrides, 687 + shrink, 688 + shrink_budget, 689 + rt, 690 + ); 691 + } 692 + emit_or_log(scenario, &report, elapsed); 693 + if report.is_clean() { 694 + ExitCode::SUCCESS 695 + } else { 696 + ExitCode::from(1) 697 + } 698 + }
+50 -5
crates/tranquil-store/src/gauntlet/farm.rs
··· 1 1 use std::cell::RefCell; 2 + use std::panic::{AssertUnwindSafe, catch_unwind}; 3 + use std::time::{Duration, Instant}; 2 4 3 5 use rayon::prelude::*; 4 6 use tokio::runtime::Runtime; 5 7 6 - use super::op::Seed; 7 - use super::runner::{Gauntlet, GauntletConfig, GauntletReport}; 8 + use super::invariants::InvariantViolation; 9 + use super::op::{OpStream, Seed}; 10 + use super::runner::{ 11 + Gauntlet, GauntletConfig, GauntletReport, OpErrorCount, OpsExecuted, RestartCount, 12 + }; 8 13 9 14 thread_local! { 10 15 static RUNTIME: RefCell<Option<Runtime>> = const { RefCell::new(None) }; ··· 29 34 where 30 35 F: Fn(Seed) -> GauntletConfig + Sync + Send, 31 36 { 37 + run_many_timed(make_config, seeds) 38 + .into_iter() 39 + .map(|(r, _)| r) 40 + .collect() 41 + } 42 + 43 + pub fn run_many_timed<F>( 44 + make_config: F, 45 + seeds: impl IntoIterator<Item = Seed>, 46 + ) -> Vec<(GauntletReport, Duration)> 47 + where 48 + F: Fn(Seed) -> GauntletConfig + Sync + Send, 49 + { 32 50 let seeds: Vec<Seed> = seeds.into_iter().collect(); 33 51 seeds 34 52 .into_par_iter() 35 53 .map(|s| { 36 - let cfg = make_config(s); 37 - let gauntlet = Gauntlet::new(cfg).expect("build gauntlet"); 38 - with_runtime(|rt| rt.block_on(gauntlet.run())) 54 + let start = Instant::now(); 55 + let outcome = catch_unwind(AssertUnwindSafe(|| { 56 + let cfg = make_config(s); 57 + let gauntlet = Gauntlet::new(cfg).expect("build gauntlet"); 58 + with_runtime(|rt| rt.block_on(gauntlet.run())) 59 + })); 60 + let report = outcome.unwrap_or_else(|payload| { 61 + RUNTIME.with(|cell| cell.borrow_mut().take()); 62 + panic_report(s, payload) 63 + }); 64 + (report, start.elapsed()) 39 65 }) 40 66 .collect() 41 67 } 68 + 69 + fn panic_report(seed: Seed, payload: Box<dyn std::any::Any + Send>) -> GauntletReport { 70 + let msg = payload 71 + .downcast_ref::<&'static str>() 72 + .map(|s| (*s).to_string()) 73 + .or_else(|| payload.downcast_ref::<String>().cloned()) 74 + .unwrap_or_else(|| "non-string panic payload".to_string()); 75 + GauntletReport { 76 + seed, 77 + ops_executed: OpsExecuted(0), 78 + op_errors: OpErrorCount(0), 79 + restarts: RestartCount(0), 80 + violations: vec![InvariantViolation { 81 + invariant: "FarmPanic", 82 + detail: msg, 83 + }], 84 + ops: OpStream::empty(), 85 + } 86 + }
+117
crates/tranquil-store/src/gauntlet/overrides.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + 3 + use super::runner::{GauntletConfig, MaxFileSize, RunLimits, ShardCount, WallMs}; 4 + use super::workload::OpCount; 5 + 6 + #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] 7 + #[serde(deny_unknown_fields)] 8 + pub struct ConfigOverrides { 9 + #[serde(default, skip_serializing_if = "Option::is_none")] 10 + pub op_count: Option<usize>, 11 + #[serde(default, skip_serializing_if = "Option::is_none")] 12 + pub max_wall_ms: Option<u64>, 13 + #[serde(default, skip_serializing_if = "StoreOverrides::is_empty")] 14 + pub store: StoreOverrides, 15 + } 16 + 17 + #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] 18 + #[serde(deny_unknown_fields)] 19 + pub struct StoreOverrides { 20 + #[serde(default, skip_serializing_if = "Option::is_none")] 21 + pub max_file_size: Option<u64>, 22 + #[serde(default, skip_serializing_if = "Option::is_none")] 23 + pub shard_count: Option<u8>, 24 + #[serde(default, skip_serializing_if = "GroupCommitOverrides::is_empty")] 25 + pub group_commit: GroupCommitOverrides, 26 + } 27 + 28 + impl StoreOverrides { 29 + pub fn is_empty(&self) -> bool { 30 + self.max_file_size.is_none() && self.shard_count.is_none() && self.group_commit.is_empty() 31 + } 32 + } 33 + 34 + #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] 35 + #[serde(deny_unknown_fields)] 36 + pub struct GroupCommitOverrides { 37 + #[serde(default, skip_serializing_if = "Option::is_none")] 38 + pub max_batch_size: Option<usize>, 39 + #[serde(default, skip_serializing_if = "Option::is_none")] 40 + pub channel_capacity: Option<usize>, 41 + #[serde(default, skip_serializing_if = "Option::is_none")] 42 + pub checkpoint_interval_ms: Option<u64>, 43 + #[serde(default, skip_serializing_if = "Option::is_none")] 44 + pub checkpoint_write_threshold: Option<u64>, 45 + } 46 + 47 + impl GroupCommitOverrides { 48 + pub fn is_empty(&self) -> bool { 49 + self.max_batch_size.is_none() 50 + && self.channel_capacity.is_none() 51 + && self.checkpoint_interval_ms.is_none() 52 + && self.checkpoint_write_threshold.is_none() 53 + } 54 + } 55 + 56 + impl ConfigOverrides { 57 + pub fn apply_to(&self, cfg: &mut GauntletConfig) { 58 + if let Some(n) = self.op_count { 59 + cfg.op_count = OpCount(n); 60 + } 61 + if let Some(ms) = self.max_wall_ms { 62 + cfg.limits = RunLimits { 63 + max_wall_ms: Some(WallMs(ms)), 64 + }; 65 + } 66 + if let Some(n) = self.store.max_file_size { 67 + cfg.store.max_file_size = MaxFileSize(n); 68 + } 69 + if let Some(n) = self.store.shard_count { 70 + cfg.store.shard_count = ShardCount(n); 71 + } 72 + let gc = &self.store.group_commit; 73 + if let Some(n) = gc.max_batch_size { 74 + cfg.store.group_commit.max_batch_size = n; 75 + } 76 + if let Some(n) = gc.channel_capacity { 77 + cfg.store.group_commit.channel_capacity = n; 78 + } 79 + if let Some(n) = gc.checkpoint_interval_ms { 80 + cfg.store.group_commit.checkpoint_interval_ms = n; 81 + } 82 + if let Some(n) = gc.checkpoint_write_threshold { 83 + cfg.store.group_commit.checkpoint_write_threshold = n; 84 + } 85 + } 86 + } 87 + 88 + #[cfg(test)] 89 + mod tests { 90 + use super::*; 91 + 92 + #[test] 93 + fn default_overrides_serialize_empty() { 94 + let o = ConfigOverrides::default(); 95 + let json = serde_json::to_string(&o).unwrap(); 96 + assert_eq!(json, "{}"); 97 + } 98 + 99 + #[test] 100 + fn round_trip_preserves_set_fields() { 101 + let o = ConfigOverrides { 102 + op_count: Some(42), 103 + store: StoreOverrides { 104 + max_file_size: Some(4096), 105 + group_commit: GroupCommitOverrides { 106 + max_batch_size: Some(16), 107 + ..GroupCommitOverrides::default() 108 + }, 109 + ..StoreOverrides::default() 110 + }, 111 + ..ConfigOverrides::default() 112 + }; 113 + let json = serde_json::to_string(&o).unwrap(); 114 + let back: ConfigOverrides = serde_json::from_str(&json).unwrap(); 115 + assert_eq!(o, back); 116 + } 117 + }
+87
crates/tranquil-store/src/gauntlet/scenarios.rs
··· 53 53 } 54 54 } 55 55 56 + pub const fn cli_name(self) -> &'static str { 57 + match self { 58 + Self::SmokePR => "smoke-pr", 59 + Self::MstChurn => "mst-churn", 60 + Self::MstRestartChurn => "mst-restart-churn", 61 + Self::FullStackRestart => "full-stack-restart", 62 + Self::CatastrophicChurn => "catastrophic-churn", 63 + Self::HugeValues => "huge-values", 64 + Self::TinyBatches => "tiny-batches", 65 + Self::GiantBatches => "giant-batches", 66 + Self::ManyFiles => "many-files", 67 + Self::ModerateFaults => "moderate-faults", 68 + Self::AggressiveFaults => "aggressive-faults", 69 + Self::TornPages => "torn-pages", 70 + Self::Fsyncgate => "fsyncgate", 71 + Self::FirehoseFanout => "firehose-fanout", 72 + Self::ContendedReaders => "contended-readers", 73 + Self::ContendedWriters => "contended-writers", 74 + } 75 + } 76 + 77 + pub const fn description(self) -> &'static str { 78 + match self { 79 + Self::SmokePR => "60s canary, 10k ops, core invariants. Default PR gate.", 80 + Self::MstChurn => "100k churn, no restart. Refcount + reachability focus.", 81 + Self::MstRestartChurn => "100k churn with Poisson restart bursts every ~5k ops.", 82 + Self::FullStackRestart => "5k ops, deterministic restart every 500 ops.", 83 + Self::CatastrophicChurn => { 84 + "1M ops, phase-2 invariants, Poisson restart. 30 min budget." 85 + } 86 + Self::HugeValues => "Heavy-tail values up to 16 MiB. 32 MiB file cap.", 87 + Self::TinyBatches => "Group-commit batch size 1, tight checkpoints, 4 KiB files.", 88 + Self::GiantBatches => "Group-commit batch size 100k, 16 MiB files.", 89 + Self::ManyFiles => "256-byte file cap, many segments, delete-heavy.", 90 + Self::ModerateFaults => { 91 + "Simulated IO with moderate fault config. CrashAtSyscall restarts." 92 + } 93 + Self::AggressiveFaults => { 94 + "Simulated IO with aggressive fault config. CrashAtSyscall restarts." 95 + } 96 + Self::TornPages => "Torn-page faults only, 20k ops.", 97 + Self::Fsyncgate => "Fsync-drop faults only, 10k ops.", 98 + Self::FirehoseFanout => { 99 + "Eventlog-heavy workload with FSYNC_ORDERING / MONOTONIC_SEQ / TOMBSTONE_BOUND invariants." 100 + } 101 + Self::ContendedReaders => "60% reads, 64 writer tasks, simulated moderate faults.", 102 + Self::ContendedWriters => "Add/delete heavy, 32 writer tasks, simulated moderate faults.", 103 + } 104 + } 105 + 56 106 pub fn from_name(name: &str) -> Option<Self> { 57 107 Self::ALL.iter().copied().find(|s| s.name() == name) 108 + } 109 + 110 + pub fn from_cli_name(name: &str) -> Option<Self> { 111 + Self::ALL.iter().copied().find(|s| s.cli_name() == name) 58 112 } 59 113 60 114 pub const ALL: &'static [Scenario] = &[ ··· 75 129 Self::ContendedReaders, 76 130 Self::ContendedWriters, 77 131 ]; 132 + } 133 + 134 + impl serde::Serialize for Scenario { 135 + fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> { 136 + serializer.serialize_str(self.cli_name()) 137 + } 138 + } 139 + 140 + impl<'de> serde::Deserialize<'de> for Scenario { 141 + fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> { 142 + let s = <std::borrow::Cow<'de, str>>::deserialize(deserializer)?; 143 + Self::from_cli_name(&s).ok_or_else(|| { 144 + serde::de::Error::custom(format!( 145 + "unknown scenario {s:?}; expected one of {}", 146 + Self::ALL 147 + .iter() 148 + .map(|s| s.cli_name()) 149 + .collect::<Vec<_>>() 150 + .join(", ") 151 + )) 152 + }) 153 + } 154 + } 155 + 156 + #[cfg(feature = "gauntlet-cli")] 157 + impl clap::ValueEnum for Scenario { 158 + fn value_variants<'a>() -> &'a [Self] { 159 + Self::ALL 160 + } 161 + 162 + fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> { 163 + Some(clap::builder::PossibleValue::new(self.cli_name()).help(self.description())) 164 + } 78 165 } 79 166 80 167 impl std::fmt::Display for Scenario {
+15
justfile
··· 25 25 test-store-sim-nightly: 26 26 SQLX_OFFLINE=true TRANQUIL_SIM_SEEDS=10000 cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile sim-nightly 27 27 28 + gauntlet-pr: 29 + SQLX_OFFLINE=true cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile gauntlet-pr --test gauntlet_smoke 30 + 31 + gauntlet-nightly HOURS="6": 32 + SQLX_OFFLINE=true GAUNTLET_DURATION_HOURS={{HOURS}} cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile gauntlet-nightly --test gauntlet_smoke --run-ignored all 33 + 34 + gauntlet-farm SCENARIO HOURS="6" DUMP="proptest-regressions": 35 + SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- farm --scenario {{SCENARIO}} --hours {{HOURS}} --dump-regressions {{DUMP}} 36 + 37 + gauntlet-repro SEED SCENARIO="smoke-pr": 38 + SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --scenario {{SCENARIO}} --seed {{SEED}} 39 + 40 + gauntlet-repro-from FILE: 41 + SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --from {{FILE}} 42 + 28 43 test-unit: 29 44 SQLX_OFFLINE=true cargo test --test dpop_unit --test validation_edge_cases --test scope_edge_cases 30 45