Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): preserve refcount in hint relocate records

Lewis: May this revision serve well! <lu5a@proton.me>

+304 -15
+8 -1
crates/tranquil-store/src/blockstore/compaction.rs
··· 162 162 } => match index.get(&cid_bytes) { 163 163 Some(e) if e.location.file_id == source_file_id && !e.refcount.is_zero() => { 164 164 let loc = writer.append_block(&cid_bytes, &data)?; 165 - hint_writer.append_relocate(&cid_bytes, loc.file_id, loc.offset, loc.length)?; 165 + hint_writer.append_relocate( 166 + &cid_bytes, 167 + loc.file_id, 168 + loc.offset, 169 + loc.length, 170 + e.refcount.raw(), 171 + )?; 166 172 relocations.push((cid_bytes, loc)); 167 173 live_count = live_count.saturating_add(1); 168 174 } ··· 187 193 loc.file_id, 188 194 loc.offset, 189 195 loc.length, 196 + e.refcount.raw(), 190 197 )?; 191 198 relocations.push((cid_bytes, loc)); 192 199 live_count = live_count.saturating_add(1);
+21 -12
crates/tranquil-store/src/blockstore/hash_index.rs
··· 378 378 &mut self, 379 379 cid: &[u8; CID_SIZE], 380 380 new_location: BlockLocation, 381 + refcount: RefCount, 381 382 ) -> Result<bool, CapacityExhausted> { 382 383 if is_empty(cid) { 383 384 return Ok(false); ··· 394 395 let slot_cid = self.slots[idx].cid; 395 396 396 397 if is_empty(&slot_cid) { 397 - self.slots[idx] = Slot::from_location(*cid, new_location); 398 + let mut slot = Slot::from_location(*cid, new_location); 399 + slot.refcount = refcount; 400 + self.slots[idx] = slot; 398 401 self.count += 1; 399 402 return Ok(false); 400 403 } ··· 411 414 let slot_dist = self.probe_distance(idx, slot_home); 412 415 if slot_dist < dist { 413 416 let mut displaced = Slot::from_location(*cid, new_location); 417 + displaced.refcount = refcount; 414 418 std::mem::swap(&mut self.slots[idx], &mut displaced); 415 419 self.count += 1; 416 420 self.relocate_displaced(displaced, idx, slot_dist); ··· 571 575 removals: &[CidBytes], 572 576 ) { 573 577 relocations.iter().for_each(|(cid, new_loc)| { 574 - if let Err(e) = self.relocate(cid, *new_loc) { 578 + if let Err(e) = self.relocate(cid, *new_loc, RefCount::one()) { 575 579 tracing::error!(?e, "capacity exhausted during compaction relocation"); 576 580 } 577 581 }); ··· 1267 1271 1268 1272 pub fn batch_relocate( 1269 1273 &self, 1270 - relocations: &[(CidBytes, BlockLocation)], 1274 + relocations: &[(CidBytes, BlockLocation, u32)], 1271 1275 ) -> Result<(), BlockIndexError> { 1272 1276 if relocations.is_empty() { 1273 1277 return Ok(()); 1274 1278 } 1275 1279 let mut table = self.table.write(); 1276 - relocations.iter().try_for_each(|(cid, location)| { 1277 - table 1278 - .relocate(cid, *location) 1279 - .map(|_| ()) 1280 - .map_err(|_| BlockIndexError::CapacityExhausted) 1281 - }) 1280 + relocations 1281 + .iter() 1282 + .try_for_each(|(cid, location, refcount)| { 1283 + table 1284 + .relocate(cid, *location, RefCount::new(*refcount)) 1285 + .map(|_| ()) 1286 + .map_err(|_| BlockIndexError::CapacityExhausted) 1287 + }) 1282 1288 } 1283 1289 1284 1290 pub fn batch_remove(&self, cids: &[CidBytes]) { ··· 1499 1505 file_id, 1500 1506 offset, 1501 1507 length, 1508 + refcount, 1502 1509 } => { 1503 1510 let loc = BlockLocation { 1504 1511 file_id, 1505 1512 offset, 1506 1513 length, 1507 1514 }; 1508 - table.relocate(&cid_bytes, loc).map_err(|_| { 1509 - io::Error::other("hash table capacity exhausted during rebuild") 1510 - })?; 1515 + table 1516 + .relocate(&cid_bytes, loc, RefCount::new(refcount)) 1517 + .map_err(|_| { 1518 + io::Error::other("hash table capacity exhausted during rebuild") 1519 + })?; 1511 1520 } 1512 1521 ReadHintRecord::Remove { cid_bytes } => { 1513 1522 let _ = table.remove(&cid_bytes);
+21 -2
crates/tranquil-store/src/blockstore/hint.rs
··· 78 78 write_hint_record(io, fd, write_offset, &record) 79 79 } 80 80 81 + const REFCOUNT_OFFSET: usize = 2; 82 + 81 83 pub(crate) fn encode_relocate_record<S: StorageIO>( 82 84 io: &S, 83 85 fd: FileId, ··· 86 88 file_id: DataFileId, 87 89 block_offset: BlockOffset, 88 90 length: BlockLength, 91 + refcount: u32, 89 92 ) -> io::Result<()> { 90 93 let mut record = [0u8; HINT_RECORD_SIZE]; 91 94 record[TYPE_OFFSET] = RECORD_TYPE_RELOCATE; 92 95 record[VERSION_OFFSET] = HINT_FORMAT_VERSION; 96 + let rc16 = u16::try_from(refcount).unwrap_or(u16::MAX); 97 + record[REFCOUNT_OFFSET..REFCOUNT_OFFSET + 2].copy_from_slice(&rc16.to_le_bytes()); 93 98 record[CID_OFFSET..CID_OFFSET + CID_SIZE].copy_from_slice(cid_bytes); 94 99 record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&file_id.raw().to_le_bytes()); 95 100 record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8].copy_from_slice(&length.raw().to_le_bytes()); ··· 158 163 file_id: DataFileId, 159 164 offset: BlockOffset, 160 165 length: BlockLength, 166 + refcount: u32, 161 167 }, 162 168 Remove { 163 169 cid_bytes: [u8; CID_SIZE], ··· 255 261 })) 256 262 } 257 263 RECORD_TYPE_RELOCATE => { 264 + let rc16 = u16::from_le_bytes( 265 + record[REFCOUNT_OFFSET..REFCOUNT_OFFSET + 2] 266 + .try_into() 267 + .unwrap(), 268 + ); 269 + let refcount = match rc16 { 270 + 0 => 1, 271 + n => u32::from(n), 272 + }; 258 273 let file_id = DataFileId::new(u32::from_le_bytes( 259 274 record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4] 260 275 .try_into() ··· 278 293 file_id, 279 294 offset: block_offset, 280 295 length: BlockLength::new(raw_length), 296 + refcount, 281 297 })) 282 298 } 283 299 RECORD_TYPE_REMOVE => Ok(Some(ReadHintRecord::Remove { cid_bytes })), ··· 341 357 file_id: DataFileId, 342 358 offset: BlockOffset, 343 359 length: BlockLength, 360 + refcount: u32, 344 361 ) -> io::Result<()> { 345 362 encode_relocate_record( 346 363 self.io, ··· 350 367 file_id, 351 368 offset, 352 369 length, 370 + refcount, 353 371 )?; 354 372 self.position = self.position.advance(HINT_RECORD_SIZE as u64); 355 373 Ok(()) ··· 575 593 let mut replayed: u64 = 0; 576 594 let mut put_buffer: Vec<([u8; CID_SIZE], BlockLocation)> = 577 595 Vec::with_capacity(REPLAY_BATCH_SIZE); 578 - let mut relocate_buffer: Vec<([u8; CID_SIZE], BlockLocation)> = 596 + let mut relocate_buffer: Vec<([u8; CID_SIZE], BlockLocation, u32)> = 579 597 Vec::with_capacity(REPLAY_BATCH_SIZE); 580 598 let mut remove_buffer: Vec<[u8; CID_SIZE]> = Vec::with_capacity(REPLAY_BATCH_SIZE); 581 599 ··· 663 681 file_id, 664 682 offset, 665 683 length, 684 + refcount, 666 685 } => { 667 686 let loc = BlockLocation { 668 687 file_id, 669 688 offset, 670 689 length, 671 690 }; 672 - relocate_buffer.push((cid_bytes, loc)); 691 + relocate_buffer.push((cid_bytes, loc, refcount)); 673 692 674 693 let record_end = 675 694 offset.advance(BLOCK_RECORD_OVERHEAD as u64 + length.as_u64());
+254
crates/tranquil-store/tests/compaction_liveness.rs
··· 1 + mod common; 2 + 3 + use std::collections::HashSet; 4 + 5 + use tranquil_store::blockstore::{ 6 + BlockStoreConfig, CidBytes, GroupCommitConfig, TranquilBlockStore, 7 + }; 8 + 9 + fn tiny_store_config(dir: &std::path::Path) -> BlockStoreConfig { 10 + BlockStoreConfig { 11 + data_dir: dir.join("data"), 12 + index_dir: dir.join("index"), 13 + max_file_size: 4096, 14 + group_commit: GroupCommitConfig { 15 + checkpoint_interval_ms: 600_000, 16 + checkpoint_write_threshold: 1_000_000, 17 + ..GroupCommitConfig::default() 18 + }, 19 + shard_count: 1, 20 + } 21 + } 22 + 23 + fn make_block(seed: u32, size: usize) -> (CidBytes, Vec<u8>) { 24 + ( 25 + common::test_cid(seed), 26 + common::block_data(seed) 27 + .into_iter() 28 + .cycle() 29 + .take(size) 30 + .collect(), 31 + ) 32 + } 33 + 34 + fn verify_live_blocks(store: &TranquilBlockStore, live: &HashSet<u32>, context: &str) { 35 + let missing: Vec<u32> = live 36 + .iter() 37 + .copied() 38 + .filter(|&seed| { 39 + store 40 + .get_block_sync(&common::test_cid(seed)) 41 + .unwrap() 42 + .is_none() 43 + }) 44 + .collect(); 45 + 46 + assert!( 47 + missing.is_empty(), 48 + "{context}: {count} live blocks missing from store: {missing:?}", 49 + count = missing.len(), 50 + ); 51 + } 52 + 53 + fn compact_sealed(store: &TranquilBlockStore) { 54 + let files = store.list_data_files().unwrap(); 55 + files 56 + .iter() 57 + .copied() 58 + .take(files.len().saturating_sub(1)) 59 + .for_each(|fid| { 60 + let _ = store.compact_file(fid, 0); 61 + }); 62 + } 63 + 64 + fn delete_checkpoints(index_dir: &std::path::Path) { 65 + let _ = std::fs::remove_file(index_dir.join("checkpoint_a.tqc")); 66 + let _ = std::fs::remove_file(index_dir.join("checkpoint_b.tqc")); 67 + } 68 + 69 + #[test] 70 + fn relocate_loses_refcount_on_hint_rebuild() { 71 + common::with_runtime(|| { 72 + let dir = tempfile::TempDir::new().unwrap(); 73 + 74 + let target = common::test_cid(1); 75 + let target_data = vec![0xABu8; 200]; 76 + 77 + { 78 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 79 + 80 + store 81 + .put_blocks_blocking(vec![(target, target_data.clone())]) 82 + .unwrap(); 83 + store 84 + .put_blocks_blocking(vec![(target, target_data.clone())]) 85 + .unwrap(); 86 + 87 + let padding: Vec<_> = (100..130u32).map(|s| make_block(s, 300)).collect(); 88 + store.put_blocks_blocking(padding).unwrap(); 89 + 90 + std::thread::sleep(std::time::Duration::from_millis(10)); 91 + compact_sealed(&store); 92 + 93 + store.apply_commit_blocking(vec![], vec![target]).unwrap(); 94 + 95 + let data = store.get_block_sync(&target).unwrap(); 96 + assert!(data.is_some(), "target should be live, refcount 2 - 1 = 1"); 97 + } 98 + 99 + delete_checkpoints(&dir.path().join("index")); 100 + 101 + { 102 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 103 + 104 + let data = store.get_block_sync(&target).unwrap(); 105 + assert!( 106 + data.is_some(), 107 + "BUG: target missing after hint-only rebuild. \ 108 + RELOCATE created entry with refcount 1 instead of 2, \ 109 + then DEC brought it to 0 instead of 1." 110 + ); 111 + 112 + std::thread::sleep(std::time::Duration::from_millis(10)); 113 + compact_sealed(&store); 114 + 115 + let data = store.get_block_sync(&target).unwrap(); 116 + assert!( 117 + data.is_some(), 118 + "BUG: target removed by compaction after hint rebuild \ 119 + incorrectly set refcount to 0" 120 + ); 121 + } 122 + }); 123 + } 124 + 125 + #[test] 126 + fn multi_restart_with_compaction_between_put_and_dec() { 127 + common::with_runtime(|| { 128 + let dir = tempfile::TempDir::new().unwrap(); 129 + 130 + let shared = common::test_cid(42); 131 + let shared_data = vec![0xCDu8; 200]; 132 + 133 + { 134 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 135 + 136 + store 137 + .put_blocks_blocking(vec![(shared, shared_data.clone())]) 138 + .unwrap(); 139 + store 140 + .put_blocks_blocking(vec![(shared, shared_data.clone())]) 141 + .unwrap(); 142 + store 143 + .put_blocks_blocking(vec![(shared, shared_data.clone())]) 144 + .unwrap(); 145 + 146 + let filler: Vec<_> = (200..230u32).map(|s| make_block(s, 300)).collect(); 147 + store.put_blocks_blocking(filler).unwrap(); 148 + } 149 + 150 + delete_checkpoints(&dir.path().join("index")); 151 + 152 + { 153 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 154 + 155 + let data = store.get_block_sync(&shared).unwrap(); 156 + assert!( 157 + data.is_some(), 158 + "round 1: shared block present after rebuild" 159 + ); 160 + 161 + std::thread::sleep(std::time::Duration::from_millis(10)); 162 + compact_sealed(&store); 163 + 164 + store.apply_commit_blocking(vec![], vec![shared]).unwrap(); 165 + 166 + let data = store.get_block_sync(&shared).unwrap(); 167 + assert!( 168 + data.is_some(), 169 + "round 1: shared block should survive, refcount 3 - 1 = 2" 170 + ); 171 + } 172 + 173 + delete_checkpoints(&dir.path().join("index")); 174 + 175 + { 176 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 177 + 178 + let data = store.get_block_sync(&shared).unwrap(); 179 + assert!( 180 + data.is_some(), 181 + "round 2: shared block should survive hint rebuild, refcount should be 2" 182 + ); 183 + 184 + store.apply_commit_blocking(vec![], vec![shared]).unwrap(); 185 + 186 + let data = store.get_block_sync(&shared).unwrap(); 187 + assert!( 188 + data.is_some(), 189 + "round 2: shared block should survive DEC, refcount 2 - 1 = 1" 190 + ); 191 + 192 + std::thread::sleep(std::time::Duration::from_millis(10)); 193 + compact_sealed(&store); 194 + 195 + let data = store.get_block_sync(&shared).unwrap(); 196 + assert!( 197 + data.is_some(), 198 + "BUG: shared block removed by compaction. \ 199 + Multiple restarts with RELOCATE collapsed refcount \ 200 + from 3 down to 1, two DECs made it 0." 201 + ); 202 + } 203 + }); 204 + } 205 + 206 + #[test] 207 + fn stress_create_delete_restart_cycle_matches_bug_report() { 208 + common::with_runtime(|| { 209 + let dir = tempfile::TempDir::new().unwrap(); 210 + let mut live: HashSet<u32> = HashSet::new(); 211 + let mut rng = common::Rng::new(12345); 212 + let mut next_seed: u32 = 0; 213 + 214 + (0..4).for_each(|cycle| { 215 + { 216 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 217 + 218 + (0..20).for_each(|_| { 219 + let seed_a = next_seed; 220 + let seed_b = next_seed + 1; 221 + next_seed += 2; 222 + 223 + store 224 + .put_blocks_blocking(vec![make_block(seed_a, 150), make_block(seed_b, 150)]) 225 + .unwrap(); 226 + live.insert(seed_a); 227 + live.insert(seed_b); 228 + 229 + if rng.next_u32() % 2 == 0 { 230 + let victim: Option<u32> = live.iter().copied().next(); 231 + if let Some(v) = victim { 232 + store 233 + .apply_commit_blocking(vec![], vec![common::test_cid(v)]) 234 + .unwrap(); 235 + live.remove(&v); 236 + } 237 + } 238 + }); 239 + 240 + std::thread::sleep(std::time::Duration::from_millis(10)); 241 + compact_sealed(&store); 242 + 243 + verify_live_blocks(&store, &live, &format!("cycle {cycle} before kill")); 244 + } 245 + 246 + delete_checkpoints(&dir.path().join("index")); 247 + 248 + { 249 + let store = TranquilBlockStore::open(tiny_store_config(dir.path())).unwrap(); 250 + verify_live_blocks(&store, &live, &format!("cycle {cycle} after hint rebuild")); 251 + } 252 + }); 253 + }); 254 + }