Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): checkpoint-hint race & missing dedup hints

Lewis: May this revision serve well! <lu5a@proton.me>

+292 -13
+1
crates/tranquil-store/Cargo.toml
··· 52 52 rand = { workspace = true } 53 53 tikv-jemallocator = "0.6" 54 54 tracing-subscriber = { workspace = true, features = ["env-filter"] } 55 + libc = "0.2" 55 56 56 57 [[bench]] 57 58 name = "blockstore"
+1 -2
crates/tranquil-store/src/blockstore/compaction.rs
··· 93 93 Err(e) 94 94 } 95 95 Ok((new_size, live_count, dead_count)) => { 96 - let positions = hint_positions.snapshot(); 97 - if let Err(e) = index.write_checkpoint(epoch.current(), &positions) { 96 + if let Err(e) = index.write_checkpoint(epoch.current(), hint_positions) { 98 97 tracing::warn!(error = %e, "pre-delete checkpoint failed during compaction"); 99 98 } 100 99
+16 -9
crates/tranquil-store/src/blockstore/group_commit.rs
··· 813 813 if !elapsed && !threshold { 814 814 return; 815 815 } 816 - let positions = hint_positions.snapshot(); 817 - match index.write_checkpoint(epoch.current(), &positions) { 816 + match index.write_checkpoint(epoch.current(), hint_positions) { 818 817 Ok(()) => { 819 818 *last_checkpoint = std::time::Instant::now(); 820 819 *writes_since_checkpoint = 0; ··· 831 830 epoch: &EpochCounter, 832 831 hint_positions: &ShardHintPositions, 833 832 ) { 834 - let positions = hint_positions.snapshot(); 835 - match index.write_checkpoint(epoch.current(), &positions) { 833 + match index.write_checkpoint(epoch.current(), hint_positions) { 836 834 Ok(()) => tracing::debug!("shutdown checkpoint written"), 837 835 Err(e) => tracing::warn!(error = %e, "shutdown checkpoint failed"), 838 836 } ··· 924 922 if let Ok((ref dedup, _)) = result { 925 923 writes_since_checkpoint = 926 924 writes_since_checkpoint.saturating_add(dedup.len() as u64); 927 - ctx.hint_positions 928 - .update(ctx.shard_id, state.file_id, state.hint_position); 929 925 } 930 926 931 927 dispatch_responses(drain.entries, result.map(|(dedup, _proof)| dedup)); ··· 1024 1020 1025 1021 if let Ok((ref _dedup, ref proof)) = result { 1026 1022 run_post_sync_hook(post_sync_hook, proof); 1027 - ctx.hint_positions 1028 - .update(ctx.shard_id, state.file_id, state.hint_position); 1029 1023 } 1030 1024 1031 1025 dispatch_responses(entries, result.map(|(dedup, _proof)| dedup)); ··· 1102 1096 let location = match dedup.get(cid_bytes) { 1103 1097 Some(&loc) => { 1104 1098 dedup_hits = dedup_hits.saturating_add(1); 1099 + hint_writer.append_hint(cid_bytes, &loc)?; 1105 1100 loc 1106 1101 } 1107 1102 None => { ··· 1194 1189 }; 1195 1190 let t = std::time::Instant::now(); 1196 1191 index 1197 - .batch_put(&index_entries, &all_decrements, cursor, current_epoch, now) 1192 + .batch_put_and_advance_position( 1193 + &index_entries, 1194 + &all_decrements, 1195 + cursor, 1196 + current_epoch, 1197 + now, 1198 + super::hash_index::PositionUpdate { 1199 + hint_positions: &ctx.hint_positions, 1200 + shard_id: ctx.shard_id, 1201 + file_id: state.file_id, 1202 + offset: state.hint_position, 1203 + }, 1204 + ) 1198 1205 .map_err(CommitError::from)?; 1199 1206 let index_nanos = t.elapsed().as_nanos() as u64; 1200 1207
+50 -1
crates/tranquil-store/src/blockstore/hash_index.rs
··· 5 5 use parking_lot::RwLock; 6 6 7 7 use super::data_file::CID_SIZE; 8 + use super::group_commit::ShardHintPositions; 8 9 use super::types::{ 9 10 BlockLength, BlockLocation, BlockOffset, CidBytes, CollectionResult, CommitEpoch, DataFileId, 10 - HintOffset, IndexEntry, LivenessInfo, RefCount, WallClockMs, WriteCursor, 11 + HintOffset, IndexEntry, LivenessInfo, RefCount, ShardId, WallClockMs, WriteCursor, 11 12 }; 13 + 14 + pub struct PositionUpdate<'a> { 15 + pub hint_positions: &'a ShardHintPositions, 16 + pub shard_id: ShardId, 17 + pub file_id: DataFileId, 18 + pub offset: HintOffset, 19 + } 12 20 13 21 const EMPTY_CID: [u8; CID_SIZE] = [0u8; CID_SIZE]; 14 22 ··· 1198 1206 epoch: CommitEpoch, 1199 1207 now: WallClockMs, 1200 1208 ) -> Result<(), BlockIndexError> { 1209 + self.batch_put_inner(entries, decrements, cursor, epoch, now, None) 1210 + } 1211 + 1212 + pub fn batch_put_and_advance_position( 1213 + &self, 1214 + entries: &[([u8; CID_SIZE], BlockLocation)], 1215 + decrements: &[[u8; CID_SIZE]], 1216 + cursor: WriteCursor, 1217 + epoch: CommitEpoch, 1218 + now: WallClockMs, 1219 + position_update: PositionUpdate<'_>, 1220 + ) -> Result<(), BlockIndexError> { 1221 + self.batch_put_inner(entries, decrements, cursor, epoch, now, Some(position_update)) 1222 + } 1223 + 1224 + fn batch_put_inner( 1225 + &self, 1226 + entries: &[([u8; CID_SIZE], BlockLocation)], 1227 + decrements: &[[u8; CID_SIZE]], 1228 + cursor: WriteCursor, 1229 + epoch: CommitEpoch, 1230 + now: WallClockMs, 1231 + position_update: Option<PositionUpdate<'_>>, 1232 + ) -> Result<(), BlockIndexError> { 1201 1233 let mut table = self.table.write(); 1202 1234 1203 1235 entries.iter().try_for_each(|(cid, location)| { ··· 1217 1249 }); 1218 1250 1219 1251 table.set_write_cursor(cursor); 1252 + 1253 + if let Some(pos) = position_update { 1254 + pos.hint_positions 1255 + .update(pos.shard_id, pos.file_id, pos.offset); 1256 + } 1257 + 1220 1258 Ok(()) 1221 1259 } 1222 1260 ··· 1418 1456 } 1419 1457 1420 1458 pub fn write_checkpoint( 1459 + &self, 1460 + epoch: CommitEpoch, 1461 + hint_positions: &ShardHintPositions, 1462 + ) -> io::Result<()> { 1463 + let _guard = self.checkpoint_lock.lock(); 1464 + let table = self.table.read(); 1465 + let positions = hint_positions.snapshot(); 1466 + write_checkpoint_ab(&table, &self.index_dir, epoch, &positions) 1467 + } 1468 + 1469 + pub fn write_checkpoint_with_positions( 1421 1470 &self, 1422 1471 epoch: CommitEpoch, 1423 1472 positions: &CheckpointPositions,
+223
crates/tranquil-store/tests/checkpoint_race.rs
··· 1 + mod common; 2 + 3 + use std::io; 4 + use std::sync::Arc; 5 + use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 6 + 7 + use tranquil_store::blockstore::{ 8 + BlockStoreConfig, BlocksSynced, CidBytes, GroupCommitConfig, TranquilBlockStore, 9 + }; 10 + use tranquil_store::PostBlockstoreHook; 11 + 12 + struct SlowHook; 13 + 14 + impl PostBlockstoreHook for SlowHook { 15 + fn on_blocks_synced(&self, _proof: &BlocksSynced) -> io::Result<()> { 16 + std::thread::sleep(std::time::Duration::from_millis(1)); 17 + Ok(()) 18 + } 19 + } 20 + 21 + fn refcount(store: &TranquilBlockStore, cid: &CidBytes) -> Option<u32> { 22 + store.block_index().get(cid).map(|e| e.refcount.raw()) 23 + } 24 + 25 + fn race_config(dir: &std::path::Path) -> BlockStoreConfig { 26 + BlockStoreConfig { 27 + data_dir: dir.join("data"), 28 + index_dir: dir.join("index"), 29 + max_file_size: 256 * 1024, 30 + group_commit: GroupCommitConfig { 31 + checkpoint_interval_ms: 10, 32 + checkpoint_write_threshold: 20, 33 + ..GroupCommitConfig::default() 34 + }, 35 + shard_count: 4, 36 + } 37 + } 38 + 39 + fn cid_for(shard: u8, seq: u32) -> CidBytes { 40 + let mut cid = [0u8; 36]; 41 + cid[0] = 0x01; 42 + cid[1] = 0x71; 43 + cid[2] = 0x12; 44 + cid[3] = 0x20; 45 + cid[4] = shard; 46 + cid[8..12].copy_from_slice(&seq.to_le_bytes()); 47 + (12..36).for_each(|i| cid[i] = (seq as u8).wrapping_add(i as u8)); 48 + cid 49 + } 50 + 51 + fn write_phase(base: &std::path::Path, use_hook: bool) -> Vec<CidBytes> { 52 + let config = race_config(base); 53 + let hook: Option<Arc<dyn PostBlockstoreHook>> = use_hook.then(|| Arc::new(SlowHook) as _); 54 + let store = Arc::new(TranquilBlockStore::open_with_hook(config, hook).unwrap()); 55 + 56 + let running = Arc::new(AtomicBool::new(true)); 57 + let total_cycles = Arc::new(AtomicU64::new(0)); 58 + 59 + let writers: Vec<_> = (0..4u8) 60 + .map(|shard| { 61 + let store = Arc::clone(&store); 62 + let running = Arc::clone(&running); 63 + let total_cycles = Arc::clone(&total_cycles); 64 + std::thread::spawn(move || { 65 + let mut targets = Vec::new(); 66 + let mut seq = 0u32; 67 + while running.load(Ordering::Relaxed) { 68 + let cid = cid_for(shard, seq); 69 + store 70 + .put_blocks_blocking(vec![(cid, vec![shard; 60])]) 71 + .unwrap(); 72 + store 73 + .put_blocks_blocking(vec![(cid, vec![shard; 60])]) 74 + .unwrap(); 75 + store 76 + .apply_commit_blocking(vec![], vec![cid]) 77 + .unwrap(); 78 + targets.push(cid); 79 + seq += 1; 80 + total_cycles.fetch_add(1, Ordering::Relaxed); 81 + } 82 + targets 83 + }) 84 + }) 85 + .collect(); 86 + 87 + while total_cycles.load(Ordering::Relaxed) < 500 { 88 + std::thread::yield_now(); 89 + } 90 + 91 + running.store(false, Ordering::Relaxed); 92 + 93 + let all_targets: Vec<CidBytes> = writers 94 + .into_iter() 95 + .flat_map(|w| w.join().unwrap()) 96 + .collect(); 97 + 98 + all_targets.iter().for_each(|cid| { 99 + assert_eq!(refcount(&store, cid), Some(1), "pre-crash sanity"); 100 + }); 101 + 102 + let store = Arc::try_unwrap(store).ok().unwrap(); 103 + std::mem::forget(store); 104 + 105 + all_targets 106 + } 107 + 108 + fn verify_phase(base: &std::path::Path, targets: &[CidBytes]) -> usize { 109 + let config = race_config(base); 110 + let store = TranquilBlockStore::open(config).unwrap(); 111 + let bad = targets 112 + .iter() 113 + .filter(|cid| refcount(&store, cid) != Some(1)) 114 + .count(); 115 + drop(store); 116 + bad 117 + } 118 + 119 + #[test] 120 + fn crash_recovery_preserves_refcounts() { 121 + common::with_runtime(|| { 122 + let mut corrupted = 0u32; 123 + let total = 20u32; 124 + 125 + (0..total).for_each(|_| { 126 + let dir = tempfile::TempDir::new().unwrap(); 127 + let exe = std::env::current_exe().unwrap(); 128 + let dir_str = dir.path().to_str().unwrap(); 129 + 130 + let output = std::process::Command::new(&exe) 131 + .arg("--exact") 132 + .arg("__crash_write_phase") 133 + .env("CRASH_TEST_DIR", dir_str) 134 + .env("CRASH_TEST_HOOK", "0") 135 + .output() 136 + .unwrap(); 137 + 138 + assert!(output.status.success() || output.status.code() == Some(0)); 139 + 140 + let target_bytes = std::fs::read(dir.path().join("targets.bin")).unwrap(); 141 + let targets: Vec<CidBytes> = target_bytes 142 + .chunks_exact(36) 143 + .map(|chunk| { 144 + let mut cid = [0u8; 36]; 145 + cid.copy_from_slice(chunk); 146 + cid 147 + }) 148 + .collect(); 149 + 150 + if verify_phase(dir.path(), &targets) > 0 { 151 + corrupted += 1; 152 + } 153 + }); 154 + 155 + assert_eq!( 156 + corrupted, 0, 157 + "{corrupted}/{total} iterations had refcount corruption after crash recovery" 158 + ); 159 + }); 160 + } 161 + 162 + #[test] 163 + fn crash_with_slow_hook_preserves_refcounts() { 164 + common::with_runtime(|| { 165 + let mut corrupted = 0u32; 166 + let total = 20u32; 167 + 168 + (0..total).for_each(|_| { 169 + let dir = tempfile::TempDir::new().unwrap(); 170 + let exe = std::env::current_exe().unwrap(); 171 + let dir_str = dir.path().to_str().unwrap(); 172 + 173 + let output = std::process::Command::new(&exe) 174 + .arg("--exact") 175 + .arg("__crash_write_phase") 176 + .env("CRASH_TEST_DIR", dir_str) 177 + .env("CRASH_TEST_HOOK", "1") 178 + .output() 179 + .unwrap(); 180 + 181 + assert!(output.status.success() || output.status.code() == Some(0)); 182 + 183 + let target_bytes = std::fs::read(dir.path().join("targets.bin")).unwrap(); 184 + let targets: Vec<CidBytes> = target_bytes 185 + .chunks_exact(36) 186 + .map(|chunk| { 187 + let mut cid = [0u8; 36]; 188 + cid.copy_from_slice(chunk); 189 + cid 190 + }) 191 + .collect(); 192 + 193 + if verify_phase(dir.path(), &targets) > 0 { 194 + corrupted += 1; 195 + } 196 + }); 197 + 198 + assert_eq!( 199 + corrupted, 0, 200 + "{corrupted}/{total} iterations had refcount corruption after crash with slow hook" 201 + ); 202 + }); 203 + } 204 + 205 + #[test] 206 + fn __crash_write_phase() { 207 + let dir = match std::env::var("CRASH_TEST_DIR") { 208 + Ok(d) => d, 209 + Err(_) => return, 210 + }; 211 + let use_hook = std::env::var("CRASH_TEST_HOOK").map(|v| v == "1").unwrap_or(false); 212 + let base = std::path::Path::new(&dir); 213 + 214 + let rt = tokio::runtime::Runtime::new().unwrap(); 215 + let _guard = rt.enter(); 216 + 217 + let targets = write_phase(base, use_hook); 218 + 219 + let target_bytes: Vec<u8> = targets.iter().flat_map(|cid| cid.iter().copied()).collect(); 220 + std::fs::write(base.join("targets.bin"), &target_bytes).unwrap(); 221 + 222 + unsafe { libc::_exit(0) } 223 + }
+1 -1
crates/tranquil-store/tests/sim_blockstore.rs
··· 112 112 HintOffset::new(entries.len() as u64 * HINT_RECORD_SIZE as u64), 113 113 ); 114 114 index 115 - .write_checkpoint(CommitEpoch::zero(), &positions) 115 + .write_checkpoint_with_positions(CommitEpoch::zero(), &positions) 116 116 .unwrap(); 117 117 } 118 118