Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
221
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): durable-tail recovery + sync semantics

Lewis: May this revision serve well! <lu5a@proton.me>

+841 -71
+10 -7
crates/tranquil-pds/src/state.rs
··· 523 523 let metastore = 524 524 Metastore::open(&metastore_dir, metastore_config).expect("failed to open metastore"); 525 525 526 - let blockstore = TranquilBlockStore::open(BlockStoreConfig { 527 - data_dir: blockstore_data_dir, 528 - index_dir: blockstore_index_dir, 529 - max_file_size: store_cfg.max_blockstore_file_size, 530 - group_commit: Default::default(), 531 - shard_count: tranquil_store::blockstore::DEFAULT_SHARD_COUNT, 532 - }) 526 + let blockstore = TranquilBlockStore::open_with_retry( 527 + BlockStoreConfig { 528 + data_dir: blockstore_data_dir, 529 + index_dir: blockstore_index_dir, 530 + max_file_size: store_cfg.max_blockstore_file_size, 531 + group_commit: Default::default(), 532 + shard_count: tranquil_store::blockstore::DEFAULT_SHARD_COUNT, 533 + }, 534 + tranquil_store::blockstore::OpenRetryPolicy::default(), 535 + ) 533 536 .expect("failed to open blockstore"); 534 537 535 538 let event_log = EventLog::open(
+81 -2
crates/tranquil-store/src/bin/tranquil_gauntlet.rs
··· 64 64 #[arg(long)] 65 65 config: Option<PathBuf>, 66 66 67 + /// Tempdir parent for `IoBackend::Real` seeds only - ignored for 68 + /// flaky-mount and simulated backends. Repeatable; each rayon worker 69 + /// thread is pinned to one root so concurrent seeds on different 70 + /// threads land on different mounts. Default `/tmp`. Also reads 71 + /// colon-separated paths from `GAUNTLET_SCRATCH_ROOTS`. Set 72 + /// `RAYON_NUM_THREADS=N` to cap workers; for full distribution pass 73 + /// one root per worker. 74 + #[arg(long)] 75 + scratch_root: Vec<PathBuf>, 76 + 67 77 /// Skip shrinking when dumping regressions. 68 78 #[arg(long)] 69 79 no_shrink: bool, ··· 95 105 #[arg(long)] 96 106 dump_regressions: Option<PathBuf>, 97 107 108 + /// Same as `farm --scratch-root`: tempdir parent for 109 + /// `IoBackend::Real` seeds only, pinned per worker thread. Ignored 110 + /// for flaky-mount and simulated backends. Repeatable; reads 111 + /// colon-separated paths from `GAUNTLET_SCRATCH_ROOTS`. 112 + #[arg(long)] 113 + scratch_root: Vec<PathBuf>, 114 + 98 115 /// Skip shrinking when dumping regressions. 99 116 #[arg(long)] 100 117 no_shrink: bool, ··· 159 176 #[serde(default)] 160 177 dump_regressions: Option<PathBuf>, 161 178 #[serde(default)] 179 + scratch_roots: Vec<PathBuf>, 180 + #[serde(default)] 162 181 overrides: ConfigOverrides, 163 182 } 164 183 ··· 177 196 seeds: Option<u64>, 178 197 #[serde(default)] 179 198 dump_regressions: Option<PathBuf>, 199 + #[serde(default)] 200 + scratch_roots: Vec<PathBuf>, 180 201 #[serde(default)] 181 202 base_overrides: ConfigOverrides, 182 203 #[serde(default)] ··· 405 426 seeds: u64, 406 427 hours: Option<f64>, 407 428 dump_regressions: Option<PathBuf>, 429 + scratch_roots: Vec<PathBuf>, 408 430 overrides: ConfigOverrides, 409 431 shrink: bool, 410 432 shrink_budget: usize, ··· 418 440 hours: Option<f64>, 419 441 dump_regressions: Option<PathBuf>, 420 442 config: Option<PathBuf>, 443 + scratch_root: Vec<PathBuf>, 421 444 shrink: bool, 422 445 shrink_budget: usize, 423 446 ) -> Result<FarmPlan, String> { ··· 443 466 } 444 467 let dump_regressions = 445 468 dump_regressions.or_else(|| file.as_ref().and_then(|f| f.dump_regressions.clone())); 469 + let file_scratch_roots = file 470 + .as_ref() 471 + .map(|f| f.scratch_roots.clone()) 472 + .unwrap_or_default(); 473 + let scratch_roots = resolve_scratch_roots(scratch_root, file_scratch_roots)?; 446 474 let overrides = file.map(|f| f.overrides).unwrap_or_default(); 447 475 Ok(FarmPlan { 448 476 scenario, ··· 450 478 seeds, 451 479 hours, 452 480 dump_regressions, 481 + scratch_roots, 453 482 overrides, 454 483 shrink, 455 484 shrink_budget, 456 485 }) 457 486 } 458 487 488 + const SCRATCH_ROOTS_ENV: &str = "GAUNTLET_SCRATCH_ROOTS"; 489 + 490 + fn resolve_scratch_roots( 491 + cli: Vec<PathBuf>, 492 + config_file: Vec<PathBuf>, 493 + ) -> Result<Vec<PathBuf>, String> { 494 + let env_roots: Vec<PathBuf> = std::env::var(SCRATCH_ROOTS_ENV) 495 + .ok() 496 + .filter(|s| !s.is_empty()) 497 + .map(|s| s.split(':').map(PathBuf::from).collect()) 498 + .unwrap_or_default(); 499 + let candidate: Vec<PathBuf> = if !cli.is_empty() { 500 + cli 501 + } else if !config_file.is_empty() { 502 + config_file 503 + } else { 504 + env_roots 505 + }; 506 + candidate 507 + .into_iter() 508 + .map(|p| validate_scratch_root(&p).map(|_| p)) 509 + .collect() 510 + } 511 + 512 + fn validate_scratch_root(path: &Path) -> Result<(), String> { 513 + match path.metadata() { 514 + Ok(m) if m.is_dir() => {} 515 + Ok(_) => return Err(format!("scratch root not a directory: {}", path.display())), 516 + Err(e) => return Err(format!("scratch root {}: {e}", path.display())), 517 + } 518 + tempfile::Builder::new() 519 + .prefix(".tranquil-gauntlet-probe-") 520 + .tempfile_in(path) 521 + .map(|_| ()) 522 + .map_err(|e| format!("scratch root {} not writable: {e}", path.display())) 523 + } 524 + 459 525 fn validate_hours(h: f64) -> Result<(), String> { 460 526 if !h.is_finite() || h <= 0.0 { 461 527 return Err(format!("invalid --hours={h}: must be positive and finite")); ··· 564 630 hours, 565 631 dump_regressions, 566 632 config, 633 + scratch_root, 567 634 no_shrink, 568 635 shrink_budget, 569 636 } => { ··· 574 641 hours, 575 642 dump_regressions, 576 643 config, 644 + scratch_root, 577 645 !no_shrink, 578 646 shrink_budget, 579 647 ) { ··· 625 693 seed_start, 626 694 seeds, 627 695 dump_regressions, 696 + scratch_root, 628 697 no_shrink, 629 698 shrink_budget, 630 699 max_runs, ··· 634 703 seed_start, 635 704 seeds, 636 705 dump_regressions, 706 + scratch_root, 637 707 !no_shrink, 638 708 shrink_budget, 639 709 max_runs, ··· 659 729 seed_start: u64, 660 730 seeds: u64, 661 731 dump_regressions: Option<PathBuf>, 732 + scratch_roots: Vec<PathBuf>, 662 733 shrink: bool, 663 734 shrink_budget: usize, 664 735 base_overrides: ConfigOverrides, 665 736 axes: Vec<SweepAxisValues>, 666 737 } 667 738 739 + #[allow(clippy::too_many_arguments)] 668 740 fn resolve_sweep( 669 741 config: PathBuf, 670 742 seed_start: Option<u64>, 671 743 seeds: Option<u64>, 672 744 dump_regressions: Option<PathBuf>, 745 + scratch_root: Vec<PathBuf>, 673 746 shrink: bool, 674 747 shrink_budget: usize, 675 748 max_runs: u64, ··· 687 760 return Err("--shrink-budget must be greater than zero".to_string()); 688 761 } 689 762 let dump_regressions = dump_regressions.or(file.dump_regressions.clone()); 763 + let scratch_roots = resolve_scratch_roots(scratch_root, file.scratch_roots.clone())?; 690 764 let axes = file.axes.axis_values(); 691 765 if axes.is_empty() { 692 766 return Err("sweep produced no combinations".to_string()); ··· 705 779 seed_start, 706 780 seeds, 707 781 dump_regressions, 782 + scratch_roots, 708 783 shrink, 709 784 shrink_budget, 710 785 base_overrides: file.base_overrides, ··· 749 824 seed_start, 750 825 seeds, 751 826 dump_regressions, 827 + scratch_roots, 752 828 shrink, 753 829 shrink_budget, 754 830 base_overrides, ··· 782 858 axis_values.apply_to(&mut overrides); 783 859 let combo_start = Instant::now(); 784 860 let overrides_for_farm = overrides.clone(); 785 - let reports = farm::run_many_timed( 861 + let reports = farm::run_many_timed_with_scratch_roots( 786 862 move |s| { 787 863 let mut cfg = config_for(scenario, s); 788 864 overrides_for_farm.apply_to(&mut cfg); 789 865 cfg 790 866 }, 867 + &scratch_roots, 791 868 (seed_start..end).map(Seed), 792 869 ); 793 870 let combo_wall = combo_start.elapsed(); ··· 840 917 seeds, 841 918 hours, 842 919 dump_regressions, 920 + scratch_roots, 843 921 overrides, 844 922 shrink, 845 923 shrink_budget, ··· 871 949 }; 872 950 let overrides_ref = &overrides; 873 951 let batch_start = Instant::now(); 874 - let reports = farm::run_many_timed( 952 + let reports = farm::run_many_timed_with_scratch_roots( 875 953 |s| { 876 954 let mut cfg = config_for(scenario, s); 877 955 overrides_ref.apply_to(&mut cfg); 878 956 cfg 879 957 }, 958 + &scratch_roots, 880 959 (next_seed..end).map(Seed), 881 960 ); 882 961 let batch_wall = batch_start.elapsed();
+1 -1
crates/tranquil-store/src/blockstore/mod.rs
··· 27 27 pub use manager::{CachedHandle, DEFAULT_MAX_FILE_SIZE, DataFileManager}; 28 28 pub use reader::{BlockStoreReader, ReadError}; 29 29 pub use store::QuiesceGuard; 30 - pub use store::{BlockStoreConfig, DEFAULT_SHARD_COUNT, TranquilBlockStore}; 30 + pub use store::{BlockStoreConfig, DEFAULT_SHARD_COUNT, OpenRetryPolicy, TranquilBlockStore}; 31 31 pub use types::{ 32 32 BlockLength, BlockLocation, BlockOffset, BlockstoreSnapshot, CidBytes, CollectionResult, 33 33 CommitEpoch, CompactionResult, DataFileId, EpochCounter, HintOffset, IndexEntry, LivenessInfo,
+291 -15
crates/tranquil-store/src/blockstore/store.rs
··· 1 1 use std::collections::HashMap; 2 2 use std::io; 3 + use std::num::NonZeroU8; 3 4 use std::path::{Path, PathBuf}; 4 5 use std::sync::Arc; 6 + use std::time::Duration; 5 7 6 8 use bytes::Bytes; 7 9 use cid::Cid; ··· 150 152 } 151 153 } 152 154 155 + #[derive(Clone, Copy, Debug)] 156 + pub struct OpenRetryPolicy { 157 + pub max_attempts: NonZeroU8, 158 + pub initial_backoff: Duration, 159 + pub max_backoff: Duration, 160 + } 161 + 162 + impl Default for OpenRetryPolicy { 163 + fn default() -> Self { 164 + const DEFAULT_MAX_ATTEMPTS: NonZeroU8 = NonZeroU8::new(5).unwrap(); 165 + Self { 166 + max_attempts: DEFAULT_MAX_ATTEMPTS, 167 + initial_backoff: Duration::from_millis(100), 168 + max_backoff: Duration::from_secs(2), 169 + } 170 + } 171 + } 172 + 153 173 impl TranquilBlockStore<RealIO> { 154 174 pub fn open(config: BlockStoreConfig) -> Result<Self, RepoError> { 155 175 Self::open_with_hook(config, None) ··· 161 181 ) -> Result<Self, RepoError> { 162 182 Self::open_with_io_hook(config, RealIO::new, post_sync_hook) 163 183 } 184 + 185 + pub fn open_with_retry( 186 + config: BlockStoreConfig, 187 + policy: OpenRetryPolicy, 188 + ) -> Result<Self, RepoError> { 189 + retry_with_backoff(policy, &mut |_| Self::open(config.clone())) 190 + } 191 + } 192 + 193 + fn retry_with_backoff<T, F>(policy: OpenRetryPolicy, op: &mut F) -> Result<T, RepoError> 194 + where 195 + F: FnMut(u8) -> Result<T, RepoError>, 196 + { 197 + retry_attempt(policy, op, 0, policy.initial_backoff) 198 + } 199 + 200 + fn retry_attempt<T, F>( 201 + policy: OpenRetryPolicy, 202 + op: &mut F, 203 + attempt: u8, 204 + backoff: Duration, 205 + ) -> Result<T, RepoError> 206 + where 207 + F: FnMut(u8) -> Result<T, RepoError>, 208 + { 209 + match op(attempt) { 210 + Ok(t) => Ok(t), 211 + Err(e) if attempt + 1 >= policy.max_attempts.get() => Err(e), 212 + Err(e) => { 213 + tracing::warn!( 214 + attempt, 215 + error = %e, 216 + backoff_ms = u64::try_from(backoff.as_millis()).unwrap_or(u64::MAX), 217 + "blockstore open failed, retrying" 218 + ); 219 + std::thread::sleep(backoff); 220 + retry_attempt( 221 + policy, 222 + op, 223 + attempt + 1, 224 + (backoff * 2).min(policy.max_backoff), 225 + ) 226 + } 227 + } 164 228 } 165 229 166 230 impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> { ··· 331 395 let scan_pos = &mut { start_offset }; 332 396 let (scanned_entries, last_valid_end) = std::iter::from_fn(|| { 333 397 match super::data_file::decode_block_record(io, fd, *scan_pos, file_size) { 334 - Err(e) => { 335 - tracing::warn!( 336 - file_id = %file_id, 337 - offset = scan_pos.raw(), 338 - error = %e, 339 - "IO error during recovery scan, stopping" 340 - ); 341 - None 342 - } 398 + Err(e) => Some(Err(e)), 343 399 Ok(None) => None, 344 400 Ok(Some(ReadBlockRecord::Valid { 345 401 offset, ··· 354 410 let record_size = BLOCK_RECORD_OVERHEAD as u64 + u64::from(raw_len); 355 411 let new_end = offset.advance(record_size); 356 412 *scan_pos = new_end; 357 - Some(( 413 + Some(Ok(( 358 414 cid_bytes, 359 415 BlockLocation { 360 416 file_id, ··· 362 418 length, 363 419 }, 364 420 new_end, 365 - )) 421 + ))) 366 422 } 367 423 Ok(Some(ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. })) => { 368 424 None 369 425 } 370 426 } 371 427 }) 372 - .fold( 428 + .try_fold( 373 429 (Vec::new(), start_offset), 374 - |(mut entries, _), (cid, loc, new_end)| { 430 + |(mut entries, _), item: io::Result<_>| { 431 + let (cid, loc, new_end) = item?; 375 432 entries.push((cid, loc)); 376 - (entries, new_end) 433 + Ok::<_, io::Error>((entries, new_end)) 377 434 }, 378 - ); 435 + ) 436 + .map_err(|e| { 437 + tracing::warn!( 438 + file_id = %file_id, 439 + offset = scan_pos.raw(), 440 + error = %e, 441 + "IO error during recovery scan, aborting to preserve durable tail" 442 + ); 443 + RepoError::storage(e) 444 + })?; 379 445 380 446 if file_size > last_valid_end.raw() { 381 447 tracing::info!( ··· 713 779 Ok(self.index.get(&cid_bytes).map(|entry| entry.refcount.raw())) 714 780 } 715 781 } 782 + 783 + #[cfg(test)] 784 + mod tests { 785 + use super::*; 786 + 787 + use std::collections::HashMap; 788 + use std::sync::Mutex; 789 + use std::sync::atomic::{AtomicBool, Ordering}; 790 + 791 + use crate::blockstore::data_file::{ 792 + BLOCK_FORMAT_VERSION, BLOCK_HEADER_SIZE, BLOCK_MAGIC, encode_block_record, 793 + }; 794 + use crate::blockstore::manager::DATA_FILE_EXTENSION; 795 + use crate::io::FileId; 796 + 797 + struct EioOnReadAtRange { 798 + inner: RealIO, 799 + target_path: PathBuf, 800 + target_min: u64, 801 + target_max: u64, 802 + fired: AtomicBool, 803 + fd_paths: Mutex<HashMap<FileId, PathBuf>>, 804 + } 805 + 806 + impl StorageIO for EioOnReadAtRange { 807 + fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> { 808 + let fd = self.inner.open(path, opts)?; 809 + self.fd_paths 810 + .lock() 811 + .unwrap() 812 + .insert(fd, path.to_path_buf()); 813 + Ok(fd) 814 + } 815 + 816 + fn close(&self, fd: FileId) -> io::Result<()> { 817 + self.fd_paths.lock().unwrap().remove(&fd); 818 + self.inner.close(fd) 819 + } 820 + 821 + fn read_at(&self, fd: FileId, offset: u64, buf: &mut [u8]) -> io::Result<usize> { 822 + let path_match = self.fd_paths.lock().unwrap().get(&fd).cloned(); 823 + let in_target_range = path_match.as_ref() == Some(&self.target_path) 824 + && offset >= self.target_min 825 + && offset <= self.target_max; 826 + if in_target_range && !self.fired.swap(true, Ordering::SeqCst) { 827 + return Err(io::Error::other("simulated EIO on read")); 828 + } 829 + self.inner.read_at(fd, offset, buf) 830 + } 831 + 832 + fn write_at(&self, fd: FileId, offset: u64, buf: &[u8]) -> io::Result<usize> { 833 + self.inner.write_at(fd, offset, buf) 834 + } 835 + 836 + fn sync(&self, fd: FileId) -> io::Result<()> { 837 + self.inner.sync(fd) 838 + } 839 + 840 + fn file_size(&self, fd: FileId) -> io::Result<u64> { 841 + self.inner.file_size(fd) 842 + } 843 + 844 + fn truncate(&self, fd: FileId, size: u64) -> io::Result<()> { 845 + self.inner.truncate(fd, size) 846 + } 847 + 848 + fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { 849 + self.inner.rename(from, to) 850 + } 851 + 852 + fn delete(&self, path: &Path) -> io::Result<()> { 853 + self.inner.delete(path) 854 + } 855 + 856 + fn mkdir(&self, path: &Path) -> io::Result<()> { 857 + self.inner.mkdir(path) 858 + } 859 + 860 + fn sync_dir(&self, path: &Path) -> io::Result<()> { 861 + self.inner.sync_dir(path) 862 + } 863 + 864 + fn list_dir(&self, path: &Path) -> io::Result<Vec<PathBuf>> { 865 + self.inner.list_dir(path) 866 + } 867 + } 868 + 869 + #[test] 870 + fn scan_and_index_does_not_truncate_acked_block_on_transient_eio() { 871 + let tmp = tempfile::TempDir::new().unwrap(); 872 + let data_dir = tmp.path().join("data"); 873 + let index_dir = tmp.path().join("index"); 874 + std::fs::create_dir_all(&data_dir).unwrap(); 875 + std::fs::create_dir_all(&index_dir).unwrap(); 876 + 877 + let file_id = DataFileId::new(0); 878 + let file_path = data_dir.join(format!("{file_id}.{DATA_FILE_EXTENSION}")); 879 + 880 + let setup = RealIO::new(); 881 + let fd = setup.open(&file_path, OpenOptions::read_write()).unwrap(); 882 + let mut header = [0u8; BLOCK_HEADER_SIZE]; 883 + header[..4].copy_from_slice(&BLOCK_MAGIC); 884 + header[4] = BLOCK_FORMAT_VERSION; 885 + setup.write_all_at(fd, 0, &header).unwrap(); 886 + 887 + let cid_a = [0xAAu8; CID_SIZE]; 888 + let data_a = vec![1u8; 64]; 889 + let block_a_offset = BlockOffset::new(BLOCK_HEADER_SIZE as u64); 890 + let len_a = 891 + encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap(); 892 + 893 + let block_b_offset_raw = BLOCK_HEADER_SIZE as u64 + len_a; 894 + let block_b_offset = BlockOffset::new(block_b_offset_raw); 895 + let cid_b = [0xBBu8; CID_SIZE]; 896 + let data_b = vec![2u8; 64]; 897 + let len_b = encode_block_record(&setup, fd, block_b_offset, &cid_b, &data_b).unwrap(); 898 + 899 + setup.sync(fd).unwrap(); 900 + setup.close(fd).unwrap(); 901 + drop(setup); 902 + 903 + let total_size = block_b_offset_raw + len_b; 904 + assert_eq!(std::fs::metadata(&file_path).unwrap().len(), total_size); 905 + 906 + let wrapper = EioOnReadAtRange { 907 + inner: RealIO::new(), 908 + target_path: file_path.clone(), 909 + target_min: block_b_offset_raw, 910 + target_max: block_b_offset_raw + (BLOCK_RECORD_OVERHEAD as u64) - 1, 911 + fired: AtomicBool::new(false), 912 + fd_paths: Mutex::new(HashMap::new()), 913 + }; 914 + 915 + let index = BlockIndex::open(&index_dir).unwrap(); 916 + 917 + let result = TranquilBlockStore::<EioOnReadAtRange>::replay_single_file( 918 + &wrapper, 919 + &data_dir, 920 + &index, 921 + file_id, 922 + BlockOffset::new(BLOCK_HEADER_SIZE as u64), 923 + ); 924 + 925 + assert!( 926 + result.is_err(), 927 + "replay must surface transient EIO instead of silently truncating" 928 + ); 929 + 930 + let post_size = std::fs::metadata(&file_path).unwrap().len(); 931 + assert_eq!( 932 + post_size, total_size, 933 + "scan truncated durable acked block past EIO point: expected {total_size} bytes, got {post_size}" 934 + ); 935 + } 936 + 937 + fn instant_policy(max_attempts: u8) -> OpenRetryPolicy { 938 + OpenRetryPolicy { 939 + max_attempts: NonZeroU8::new(max_attempts).expect("max_attempts must be nonzero"), 940 + initial_backoff: Duration::ZERO, 941 + max_backoff: Duration::ZERO, 942 + } 943 + } 944 + 945 + #[test] 946 + fn retry_with_backoff_succeeds_on_first_attempt() { 947 + let calls = std::sync::atomic::AtomicUsize::new(0); 948 + let result = retry_with_backoff(instant_policy(5), &mut |_| { 949 + calls.fetch_add(1, Ordering::Relaxed); 950 + Ok::<u8, RepoError>(42) 951 + }); 952 + assert_eq!(result.expect("ok"), 42); 953 + assert_eq!(calls.load(Ordering::Relaxed), 1); 954 + } 955 + 956 + #[test] 957 + fn retry_with_backoff_recovers_after_transient_failures() { 958 + let calls = std::sync::atomic::AtomicUsize::new(0); 959 + let result = retry_with_backoff(instant_policy(5), &mut |_| { 960 + let n = calls.fetch_add(1, Ordering::Relaxed); 961 + if n >= 2 { 962 + Ok::<u8, RepoError>(7) 963 + } else { 964 + Err(RepoError::storage(io::Error::other("transient EIO"))) 965 + } 966 + }); 967 + assert_eq!(result.expect("ok"), 7); 968 + assert_eq!(calls.load(Ordering::Relaxed), 3); 969 + } 970 + 971 + #[test] 972 + fn retry_with_backoff_gives_up_after_max_attempts() { 973 + let calls = std::sync::atomic::AtomicUsize::new(0); 974 + let result: Result<u8, RepoError> = retry_with_backoff(instant_policy(3), &mut |_| { 975 + calls.fetch_add(1, Ordering::Relaxed); 976 + Err(RepoError::storage(io::Error::other("permanent EIO"))) 977 + }); 978 + assert!(result.is_err(), "expected exhaustion error"); 979 + assert_eq!(calls.load(Ordering::Relaxed), 3); 980 + } 981 + 982 + #[test] 983 + fn retry_with_backoff_passes_attempt_index_to_op() { 984 + let observed = std::sync::Mutex::new(Vec::<u8>::new()); 985 + let _result: Result<(), RepoError> = retry_with_backoff(instant_policy(4), &mut |attempt| { 986 + observed.lock().unwrap().push(attempt); 987 + Err(RepoError::storage(io::Error::other("EIO"))) 988 + }); 989 + assert_eq!(*observed.lock().unwrap(), vec![0, 1, 2, 3]); 990 + } 991 + }
+58
crates/tranquil-store/src/eventlog/writer.rs
··· 1196 1196 1197 1197 assert!(writer.rotate_if_needed().unwrap().is_none()); 1198 1198 } 1199 + 1200 + #[test] 1201 + fn sync_must_not_certify_durability_when_io_sync_silently_drops() { 1202 + use crate::sim::{FaultConfig, Probability}; 1203 + 1204 + let sim = Arc::new(SimulatedIO::new( 1205 + 0, 1206 + FaultConfig { 1207 + sync_failure_probability: Probability::new(1.0), 1208 + ..FaultConfig::none() 1209 + }, 1210 + )); 1211 + sim.set_pristine_mode(true); 1212 + 1213 + let mgr = Arc::new( 1214 + SegmentManager::new(Arc::clone(&sim), PathBuf::from("/segments"), 64 * 1024).unwrap(), 1215 + ); 1216 + 1217 + let mut writer = 1218 + EventLogWriter::open(Arc::clone(&mgr), DEFAULT_INDEX_INTERVAL, MAX_EVENT_PAYLOAD) 1219 + .unwrap(); 1220 + 1221 + sim.set_pristine_mode(false); 1222 + 1223 + writer 1224 + .append( 1225 + DidHash::from_did("did:plc:bug2"), 1226 + EventTypeTag::COMMIT, 1227 + b"bug2-payload".to_vec(), 1228 + ) 1229 + .unwrap(); 1230 + 1231 + assert!( 1232 + writer.sync().is_err(), 1233 + "sync must surface dropped fsync as an error" 1234 + ); 1235 + let claimed_synced = writer.synced_seq(); 1236 + assert_eq!( 1237 + claimed_synced.raw(), 1238 + 0, 1239 + "synced_seq must not advance past a failed sync" 1240 + ); 1241 + drop(writer); 1242 + 1243 + mgr.shutdown(); 1244 + sim.crash(); 1245 + sim.set_pristine_mode(true); 1246 + 1247 + let reopened = 1248 + EventLogWriter::open(Arc::clone(&mgr), DEFAULT_INDEX_INTERVAL, MAX_EVENT_PAYLOAD) 1249 + .unwrap(); 1250 + let actually_durable = reopened.current_seq(); 1251 + 1252 + assert!( 1253 + actually_durable >= claimed_synced, 1254 + "writer claimed sync through {claimed_synced} but post-crash recovery only reaches {actually_durable}" 1255 + ); 1256 + } 1199 1257 }
+84 -1
crates/tranquil-store/src/gauntlet/farm.rs
··· 1 1 use std::cell::RefCell; 2 2 use std::panic::{AssertUnwindSafe, catch_unwind}; 3 + use std::path::PathBuf; 3 4 use std::time::{Duration, Instant}; 4 5 5 6 use rayon::prelude::*; ··· 47 48 where 48 49 F: Fn(Seed) -> GauntletConfig + Sync + Send, 49 50 { 51 + run_many_timed_with_scratch_roots(make_config, &[], seeds) 52 + } 53 + 54 + pub fn run_many_timed_with_scratch_roots<F>( 55 + make_config: F, 56 + scratch_roots: &[PathBuf], 57 + seeds: impl IntoIterator<Item = Seed>, 58 + ) -> Vec<(GauntletReport, Duration)> 59 + where 60 + F: Fn(Seed) -> GauntletConfig + Sync + Send, 61 + { 50 62 let seeds: Vec<Seed> = seeds.into_iter().collect(); 51 63 seeds 52 64 .into_par_iter() 53 65 .map(|s| { 66 + let scratch = scratch_for_thread(scratch_roots, rayon::current_thread_index()); 54 67 let start = Instant::now(); 55 68 let outcome = catch_unwind(AssertUnwindSafe(|| { 56 69 let cfg = make_config(s); 57 - let gauntlet = Gauntlet::new(cfg).expect("build gauntlet"); 70 + let mut gauntlet = Gauntlet::new(cfg).expect("build gauntlet"); 71 + if let Some(root) = scratch { 72 + gauntlet = gauntlet.with_scratch_root(root); 73 + } 58 74 with_runtime(|rt| rt.block_on(gauntlet.run())) 59 75 })); 60 76 let report = outcome.unwrap_or_else(|payload| { ··· 66 82 .collect() 67 83 } 68 84 85 + fn scratch_for_thread(roots: &[PathBuf], thread_idx: Option<usize>) -> Option<PathBuf> { 86 + if roots.is_empty() { 87 + None 88 + } else { 89 + Some(roots[thread_idx.unwrap_or(0) % roots.len()].clone()) 90 + } 91 + } 92 + 69 93 fn panic_report(seed: Seed, payload: Box<dyn std::any::Any + Send>) -> GauntletReport { 70 94 let msg = payload 71 95 .downcast_ref::<&'static str>() ··· 84 108 ops: OpStream::empty(), 85 109 } 86 110 } 111 + 112 + #[cfg(test)] 113 + mod tests { 114 + use super::*; 115 + 116 + #[test] 117 + fn scratch_for_thread_returns_none_when_roots_empty() { 118 + assert!(scratch_for_thread(&[], Some(0)).is_none()); 119 + assert!(scratch_for_thread(&[], Some(7)).is_none()); 120 + assert!(scratch_for_thread(&[], None).is_none()); 121 + } 122 + 123 + #[test] 124 + fn scratch_for_thread_round_robins_across_roots() { 125 + let roots = vec![ 126 + PathBuf::from("/scratch/a"), 127 + PathBuf::from("/scratch/b"), 128 + PathBuf::from("/scratch/c"), 129 + ]; 130 + let assigned: Vec<PathBuf> = (0..7) 131 + .map(|i| scratch_for_thread(&roots, Some(i)).expect("scratch path")) 132 + .collect(); 133 + assert_eq!( 134 + assigned, 135 + vec![ 136 + PathBuf::from("/scratch/a"), 137 + PathBuf::from("/scratch/b"), 138 + PathBuf::from("/scratch/c"), 139 + PathBuf::from("/scratch/a"), 140 + PathBuf::from("/scratch/b"), 141 + PathBuf::from("/scratch/c"), 142 + PathBuf::from("/scratch/a"), 143 + ] 144 + ); 145 + } 146 + 147 + #[test] 148 + fn scratch_for_thread_with_single_root_returns_same_path() { 149 + let roots = vec![PathBuf::from("/scratch/only")]; 150 + (0..5).for_each(|i| { 151 + assert_eq!( 152 + scratch_for_thread(&roots, Some(i)), 153 + Some(PathBuf::from("/scratch/only")) 154 + ); 155 + }); 156 + } 157 + 158 + #[test] 159 + fn scratch_for_thread_falls_back_to_root_zero_outside_pool() { 160 + let roots = vec![ 161 + PathBuf::from("/scratch/a"), 162 + PathBuf::from("/scratch/b"), 163 + ]; 164 + assert_eq!( 165 + scratch_for_thread(&roots, None), 166 + Some(PathBuf::from("/scratch/a")) 167 + ); 168 + } 169 + }
+175 -34
crates/tranquil-store/src/gauntlet/runner.rs
··· 178 178 179 179 pub struct Gauntlet { 180 180 config: GauntletConfig, 181 + scratch_root: Option<PathBuf>, 181 182 } 182 183 183 184 #[derive(Debug, thiserror::Error)] ··· 185 186 186 187 impl Gauntlet { 187 188 pub fn new(config: GauntletConfig) -> Result<Self, GauntletBuildError> { 188 - Ok(Self { config }) 189 + Ok(Self { 190 + config, 191 + scratch_root: None, 192 + }) 193 + } 194 + 195 + pub fn with_scratch_root(mut self, root: PathBuf) -> Self { 196 + self.scratch_root = Some(root); 197 + self 189 198 } 190 199 191 200 pub fn generate_ops(&self) -> OpStream { ··· 211 220 let ops_counter = Arc::new(AtomicUsize::new(0)); 212 221 let op_errors_counter = Arc::new(AtomicUsize::new(0)); 213 222 let restarts_counter = Arc::new(AtomicUsize::new(0)); 223 + let scratch_root = self.scratch_root; 214 224 let fut: std::pin::Pin<Box<dyn std::future::Future<Output = GauntletReport> + Send>> = 215 225 match self.config.io { 216 226 IoBackend::Real => Box::pin(run_inner_real( ··· 219 229 ops_counter.clone(), 220 230 op_errors_counter.clone(), 221 231 restarts_counter.clone(), 232 + scratch_root, 222 233 )), 223 234 IoBackend::RealWithFlaky { flaky } => Box::pin(run_inner_real_with_flaky( 224 235 self.config, ··· 269 280 ops_counter: Arc<AtomicUsize>, 270 281 op_errors_counter: Arc<AtomicUsize>, 271 282 restarts_counter: Arc<AtomicUsize>, 283 + scratch_root: Option<PathBuf>, 272 284 ) -> GauntletReport { 273 - let dir = tempfile::TempDir::new().expect("tempdir"); 285 + let dir = match scratch_root.as_deref() { 286 + Some(parent) => tempfile::TempDir::new_in(parent).expect("tempdir in scratch root"), 287 + None => tempfile::TempDir::new().expect("tempdir"), 288 + }; 274 289 let root = dir.path().to_path_buf(); 275 290 let report = run_inner_real_on_root( 276 291 config, ··· 519 534 let mut oracle = Oracle::new(); 520 535 let mut violations: Vec<InvariantViolation> = Vec::new(); 521 536 522 - let mut harness: Option<Harness<S>> = match open(0) { 523 - Ok(h) => Some(h), 524 - Err(e) => { 525 - return GauntletReport { 526 - seed: config.seed, 527 - ops_executed: OpsExecuted(0), 528 - op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 529 - restarts: RestartCount(0), 530 - violations: vec![InvariantViolation { 531 - invariant: "OpenStore", 532 - detail: format!("initial open: {e}"), 533 - }], 534 - ops: OpStream::empty(), 535 - }; 536 - } 537 - }; 537 + let mut harness: Option<Harness<S>> = 538 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 539 + { 540 + Ok(h) => Some(h), 541 + Err(e) => { 542 + return GauntletReport { 543 + seed: config.seed, 544 + ops_executed: OpsExecuted(0), 545 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 546 + restarts: RestartCount(0), 547 + violations: vec![InvariantViolation { 548 + invariant: "OpenStore", 549 + detail: format!("initial open: {e}"), 550 + }], 551 + ops: OpStream::empty(), 552 + }; 553 + } 554 + }; 538 555 let mut root: Option<Cid> = None; 539 556 let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); 540 557 let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); ··· 1582 1599 let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); 1583 1600 let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng); 1584 1601 1585 - let mut harness: Option<Harness<S>> = match open(0) { 1586 - Ok(h) => Some(h), 1587 - Err(e) => { 1588 - return GauntletReport { 1589 - seed: config.seed, 1590 - ops_executed: OpsExecuted(0), 1591 - op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 1592 - restarts: RestartCount(0), 1593 - violations: vec![InvariantViolation { 1594 - invariant: "OpenStore", 1595 - detail: format!("initial open: {e}"), 1596 - }], 1597 - ops: OpStream::empty(), 1598 - }; 1599 - } 1600 - }; 1602 + let mut harness: Option<Harness<S>> = 1603 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 1604 + { 1605 + Ok(h) => Some(h), 1606 + Err(e) => { 1607 + return GauntletReport { 1608 + seed: config.seed, 1609 + ops_executed: OpsExecuted(0), 1610 + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), 1611 + restarts: RestartCount(0), 1612 + violations: vec![InvariantViolation { 1613 + invariant: "OpenStore", 1614 + detail: format!("initial open: {e}"), 1615 + }], 1616 + ops: OpStream::empty(), 1617 + }; 1618 + } 1619 + }; 1601 1620 let mut root: Option<Cid> = None; 1602 1621 let mut oracle = Oracle::new(); 1603 1622 let mut halt_ops = false; ··· 1806 1825 ops: OpStream::empty(), 1807 1826 } 1808 1827 } 1828 + 1829 + #[cfg(test)] 1830 + mod tests { 1831 + use super::*; 1832 + 1833 + fn minimal_config() -> GauntletConfig { 1834 + GauntletConfig { 1835 + seed: Seed(0), 1836 + io: IoBackend::Real, 1837 + workload: WorkloadModel::default(), 1838 + op_count: OpCount(0), 1839 + invariants: InvariantSet::EMPTY, 1840 + limits: RunLimits { 1841 + max_wall_ms: Some(WallMs(30_000)), 1842 + }, 1843 + restart_policy: RestartPolicy::Never, 1844 + store: StoreConfig { 1845 + max_file_size: MaxFileSize(8 * 1024), 1846 + group_commit: GroupCommitConfig::default(), 1847 + shard_count: ShardCount(1), 1848 + }, 1849 + eventlog: None, 1850 + writer_concurrency: WriterConcurrency(1), 1851 + } 1852 + } 1853 + 1854 + fn flaky_open( 1855 + attempts: Arc<AtomicUsize>, 1856 + sim: Arc<SimulatedIO>, 1857 + store_cfg: BlockStoreConfig, 1858 + ) -> impl FnMut(usize) -> Result<Harness<Arc<SimulatedIO>>, String> + Send + 'static { 1859 + move |_attempt: usize| -> Result<Harness<Arc<SimulatedIO>>, String> { 1860 + let n = attempts.fetch_add(1, Ordering::Relaxed); 1861 + if n == 0 { 1862 + return Err("simulated EIO on initial open".to_string()); 1863 + } 1864 + let factory_sim = Arc::clone(&sim); 1865 + let make_io = move || Arc::clone(&factory_sim); 1866 + TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(store_cfg.clone(), make_io) 1867 + .map(|s| Harness { 1868 + store: Arc::new(s), 1869 + eventlog: None, 1870 + }) 1871 + .map_err(|e| e.to_string()) 1872 + } 1873 + } 1874 + 1875 + #[tokio::test] 1876 + async fn run_inner_generic_retries_initial_open_on_transient_io_error() { 1877 + let dir = tempfile::TempDir::new().expect("tempdir"); 1878 + let cfg = minimal_config(); 1879 + let store_cfg = blockstore_config(dir.path(), &cfg.store); 1880 + let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::pristine(0)); 1881 + let attempts = Arc::new(AtomicUsize::new(0)); 1882 + 1883 + let report = run_inner_generic::<Arc<SimulatedIO>, _, _>( 1884 + cfg, 1885 + OpStream::empty(), 1886 + Arc::new(AtomicUsize::new(0)), 1887 + Arc::new(AtomicUsize::new(0)), 1888 + Arc::new(AtomicUsize::new(0)), 1889 + flaky_open(Arc::clone(&attempts), Arc::clone(&sim), store_cfg), 1890 + || {}, 1891 + true, 1892 + Duration::ZERO, 1893 + ) 1894 + .await; 1895 + 1896 + let opens: Vec<&InvariantViolation> = report 1897 + .violations 1898 + .iter() 1899 + .filter(|v| v.invariant == "OpenStore") 1900 + .collect(); 1901 + assert!( 1902 + opens.is_empty(), 1903 + "expected initial open to retry, got OpenStore violations: {opens:?}" 1904 + ); 1905 + let total = attempts.load(Ordering::Relaxed); 1906 + assert!( 1907 + total >= 2, 1908 + "expected at least one retry after first failure, attempts={total}" 1909 + ); 1910 + } 1911 + 1912 + #[tokio::test] 1913 + async fn run_inner_generic_concurrent_retries_initial_open_on_transient_io_error() { 1914 + let dir = tempfile::TempDir::new().expect("tempdir"); 1915 + let mut cfg = minimal_config(); 1916 + cfg.writer_concurrency = WriterConcurrency(2); 1917 + let store_cfg = blockstore_config(dir.path(), &cfg.store); 1918 + let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::pristine(0)); 1919 + let attempts = Arc::new(AtomicUsize::new(0)); 1920 + 1921 + let report = run_inner_generic_concurrent::<Arc<SimulatedIO>, _, _>( 1922 + cfg, 1923 + OpStream::empty(), 1924 + Arc::new(AtomicUsize::new(0)), 1925 + Arc::new(AtomicUsize::new(0)), 1926 + Arc::new(AtomicUsize::new(0)), 1927 + flaky_open(Arc::clone(&attempts), Arc::clone(&sim), store_cfg), 1928 + || {}, 1929 + true, 1930 + Duration::ZERO, 1931 + ) 1932 + .await; 1933 + 1934 + let opens: Vec<&InvariantViolation> = report 1935 + .violations 1936 + .iter() 1937 + .filter(|v| v.invariant == "OpenStore") 1938 + .collect(); 1939 + assert!( 1940 + opens.is_empty(), 1941 + "expected initial open to retry, got OpenStore violations: {opens:?}" 1942 + ); 1943 + let total = attempts.load(Ordering::Relaxed); 1944 + assert!( 1945 + total >= 2, 1946 + "expected at least one retry after first failure, attempts={total}" 1947 + ); 1948 + } 1949 + }
+12 -6
crates/tranquil-store/src/sim.rs
··· 692 692 return Err(io::Error::other("simulated EIO on sync")); 693 693 } 694 694 695 - let sync_succeeded = !state.should_fault(seed, fault.sync_failure_probability); 696 - let poison_after = sync_succeeded 697 - && state.should_fault(seed, fault.delayed_io_error_probability); 695 + if state.should_fault(seed, fault.sync_failure_probability) { 696 + state.op_log.push(OpRecord::Sync { 697 + fd: id, 698 + succeeded: false, 699 + }); 700 + return Err(io::Error::other("simulated dropped fsync")); 701 + } 702 + 703 + let poison_after = state.should_fault(seed, fault.delayed_io_error_probability); 698 704 let reorder_window = fault.sync_reorder_window.0 as usize; 699 705 700 - let evicted = if sync_succeeded && reorder_window > 0 { 706 + let evicted = if reorder_window > 0 { 701 707 let snapshot = state.storage.get(&sid).unwrap().buffered.clone(); 702 708 state.pending_syncs.push_back(PendingSync { 703 709 storage_id: sid, ··· 723 729 724 730 let storage = state.storage.get_mut(&sid).unwrap(); 725 731 726 - if sync_succeeded && reorder_window == 0 { 732 + if reorder_window == 0 { 727 733 storage.durable = storage.buffered.clone(); 728 734 } 729 735 if poison_after { ··· 732 738 733 739 state.op_log.push(OpRecord::Sync { 734 740 fd: id, 735 - succeeded: sync_succeeded, 741 + succeeded: true, 736 742 }); 737 743 Ok(()) 738 744 }
+112
crates/tranquil-store/tests/gauntlet_smoke.rs
··· 460 460 assert_clean(&report); 461 461 assert!(report.restarts.0 >= 1); 462 462 } 463 + 464 + #[tokio::test] 465 + async fn torn_pages_only_completes_within_budget() { 466 + let cfg = GauntletConfig { 467 + seed: Seed(0), 468 + io: IoBackend::Simulated { 469 + fault: FaultConfig::torn_pages_only(), 470 + }, 471 + workload: WorkloadModel { 472 + weights: OpWeights { 473 + add: 80, 474 + delete: 10, 475 + compact: 5, 476 + checkpoint: 5, 477 + ..OpWeights::default() 478 + }, 479 + size_distribution: SizeDistribution::Fixed(ValueBytes(128)), 480 + collections: vec![ 481 + CollectionName("app.bsky.feed.post".to_string()), 482 + CollectionName("app.bsky.feed.like".to_string()), 483 + ], 484 + key_space: KeySpaceSize(500), 485 + did_space: DidSpaceSize(32), 486 + retention_max_secs: RetentionMaxSecs(3600), 487 + }, 488 + op_count: OpCount(2_000), 489 + invariants: InvariantSet::REFCOUNT_CONSERVATION 490 + | InvariantSet::REACHABILITY 491 + | InvariantSet::ACKED_WRITE_PERSISTENCE 492 + | InvariantSet::READ_AFTER_WRITE 493 + | InvariantSet::RESTART_IDEMPOTENT, 494 + limits: RunLimits { 495 + max_wall_ms: Some(WallMs(60_000)), 496 + }, 497 + restart_policy: RestartPolicy::CrashAtSyscall(OpInterval(500)), 498 + store: StoreConfig { 499 + max_file_size: MaxFileSize(16 * 1024), 500 + group_commit: GroupCommitConfig { 501 + verify_persisted_blocks: true, 502 + ..GroupCommitConfig::default() 503 + }, 504 + shard_count: ShardCount(1), 505 + }, 506 + eventlog: None, 507 + writer_concurrency: WriterConcurrency(1), 508 + }; 509 + let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; 510 + let budget_violations: Vec<&str> = report 511 + .violations 512 + .iter() 513 + .filter(|v| v.invariant == "WallClockBudget") 514 + .map(|v| v.detail.as_str()) 515 + .collect(); 516 + assert!( 517 + budget_violations.is_empty(), 518 + "torn-pages exceeded budget: {budget_violations:?}; ops_executed={}", 519 + report.ops_executed.0 520 + ); 521 + assert_eq!( 522 + report.ops_executed.0, 2_000, 523 + "expected all ops to execute under torn-pages-only faults" 524 + ); 525 + } 526 + 527 + #[tokio::test] 528 + async fn real_io_gauntlet_uses_scratch_root_for_tempdir() { 529 + let scratch = tempfile::TempDir::new().expect("scratch dir"); 530 + let scratch_path = scratch.path().to_path_buf(); 531 + let cfg = fast_sanity_config(Seed(11)); 532 + let report = Gauntlet::new(cfg) 533 + .expect("build gauntlet") 534 + .with_scratch_root(scratch_path.clone()) 535 + .run() 536 + .await; 537 + assert_clean(&report); 538 + let entries: Vec<std::path::PathBuf> = std::fs::read_dir(&scratch_path) 539 + .expect("read scratch") 540 + .filter_map(|e| e.ok().map(|e| e.path())) 541 + .collect(); 542 + assert!( 543 + entries.is_empty(), 544 + "scratch root must be empty after gauntlet drop, found: {entries:?}" 545 + ); 546 + } 547 + 548 + #[test] 549 + fn farm_run_many_timed_with_scratch_roots_honors_assignment() { 550 + let scratch = tempfile::TempDir::new().expect("scratch dir"); 551 + let root_a = scratch.path().join("a"); 552 + let root_b = scratch.path().join("b"); 553 + std::fs::create_dir_all(&root_a).expect("mkdir a"); 554 + std::fs::create_dir_all(&root_b).expect("mkdir b"); 555 + let roots = vec![root_a.clone(), root_b.clone()]; 556 + let reports = farm::run_many_timed_with_scratch_roots( 557 + |seed| fast_sanity_config(seed), 558 + &roots, 559 + (0..2).map(Seed), 560 + ); 561 + assert_eq!(reports.len(), 2); 562 + reports.iter().for_each(|(r, _)| assert_clean(r)); 563 + [&root_a, &root_b].iter().for_each(|root| { 564 + let leftover: Vec<std::path::PathBuf> = std::fs::read_dir(root) 565 + .expect("read scratch root") 566 + .filter_map(|e| e.ok().map(|e| e.path())) 567 + .collect(); 568 + assert!( 569 + leftover.is_empty(), 570 + "scratch root {} must be empty after farm completes, found: {leftover:?}", 571 + root.display() 572 + ); 573 + }); 574 + }
+17 -5
crates/tranquil-store/tests/sim_eventlog.rs
··· 1025 1025 1026 1026 #[test] 1027 1027 fn sync_synced_seq_must_match_durable_valid_prefix() { 1028 - sim_seed_range().into_par_iter().for_each(|seed| { 1028 + let asserted = std::sync::atomic::AtomicU64::new(0); 1029 + let range = sim_seed_range(); 1030 + let total = range.end - range.start; 1031 + range.into_par_iter().for_each(|seed| { 1029 1032 let fault_config = FaultConfig { 1030 1033 partial_write_probability: Probability::new(0.05), 1031 1034 torn_page_probability: Probability::new(0.01), ··· 1037 1040 let sim = SimulatedIO::new(seed, fault_config); 1038 1041 let mgr = setup_manager(sim, 64 * 1024); 1039 1042 1040 - let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD) 1041 - .unwrap_or_else(|e| panic!("seed {seed}: open writer failed: {e}")); 1043 + let Ok(mut writer) = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD) else { 1044 + return; 1045 + }; 1042 1046 1043 1047 let event_count = 10u64; 1044 1048 (1..=event_count).for_each(|i| { ··· 1067 1071 1068 1072 let durable_max = valid.last().map(|e| e.seq.raw()).unwrap_or(0); 1069 1073 1074 + asserted.fetch_add(1, std::sync::atomic::Ordering::Relaxed); 1070 1075 assert!( 1071 1076 synced_through <= durable_max, 1072 - "seed {seed}: sync acked seq {synced_through} but durable valid prefix only reaches {durable_max} \ 1073 - (events written: {event_count}, valid_prefix.len()={})", 1077 + "seed {seed}: sync acked seq {synced_through} but durable valid prefix only reaches {durable_max}, events written: {event_count}, valid_prefix.len()={}", 1074 1078 valid.len() 1075 1079 ); 1076 1080 }); 1081 + 1082 + let asserted = asserted.load(std::sync::atomic::Ordering::Relaxed); 1083 + if total >= 50 { 1084 + assert!( 1085 + asserted * 2 >= total, 1086 + "fewer than half of {total} seeds reached the durability assertion: {asserted}" 1087 + ); 1088 + } 1077 1089 } 1078 1090 1079 1091 #[test]