Our Personal Data Server from scratch!

fix(tranquil-store): no more orphan data files, recover torn-hint tails, header-safe resume

Lewis: May this revision serve well! <lu5a@proton.me>

+188 -91
+2 -1
crates/tranquil-store/Cargo.toml
···
tempfile = { version = "3", optional = true }
clap = { workspace = true, optional = true }
toml = { version = "0.8", optional = true }
+ tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }

[features]
test-harness = ["dep:tempfile"]
- gauntlet-cli = ["test-harness", "dep:clap", "dep:toml"]
+ gauntlet-cli = ["test-harness", "dep:clap", "dep:toml", "dep:tracing-subscriber"]

[[bin]]
name = "tranquil-gauntlet"
+7
crates/tranquil-store/src/bin/tranquil_gauntlet.rs
···
}

fn main() -> ExitCode {
+     let _ = tracing_subscriber::fmt()
+         .with_env_filter(
+             tracing_subscriber::EnvFilter::try_from_default_env()
+                 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("off")),
+         )
+         .with_writer(io::stderr)
+         .try_init();
    let cli = Cli::parse();
    match cli.cmd {
        Cmd::Farm {
+7
crates/tranquil-store/src/blockstore/compaction.rs
···
        .io()
        .delete(&hint_file_path(manager.data_dir(), source_file_id))
        .ok();
+     if live_count == 0 {
+         manager.delete_data_file(new_file_id).ok();
+         manager
+             .io()
+             .delete(&hint_file_path(manager.data_dir(), new_file_id))
+             .ok();
+     }
    manager.io().sync_dir(manager.data_dir())?;

    let reclaimed_bytes = source_size.saturating_sub(new_size);
+54 -33
crates/tranquil-store/src/blockstore/group_commit.rs
···
        )));
    }

+     let header_end = super::data_file::BLOCK_HEADER_SIZE as u64;
+     let position = match file_size < header_end {
+         true => {
+             let writer = DataFileWriter::new(manager.io(), fd, wc.file_id)?;
+             writer.sync()?;
+             writer.position()
+         }
+         false => BlockOffset::new(file_size),
+     };
+
    let hint_path = hint_file_path(data_dir, wc.file_id);
    let hint_fd = manager.io().open(&hint_path, OpenOptions::read_write())?;
    let hint_size = manager.io().file_size(hint_fd)?;
···
    Ok(ActiveState {
        file_id: wc.file_id,
        fd,
-         position: BlockOffset::new(file_size),
+         position,
        hint_fd,
        hint_position: HintOffset::new(hint_size),
    })
···
    state: &ActiveState,
    rotations: &[RotationState],
) {
+     let _ = manager.io().truncate(state.fd, state.position.raw());
+     let _ = manager.io().sync(state.fd);
    let _ = manager
        .io()
        .truncate(state.hint_fd, state.hint_position.raw());
···
                hint_writer.append_hint(cid_bytes, &loc)?;
                loc
            }
-             None => {
-                 if manager.should_rotate(data_writer.position()) {
-                     data_writer.sync()?;
-                     hint_writer.sync()?;
+             None => match index.get(cid_bytes) {
+                 Some(existing) => {
+                     dedup_hits = dedup_hits.saturating_add(1);
+                     let loc = existing.location;
+                     hint_writer.append_hint(cid_bytes, &loc)?;
+                     dedup.insert(*cid_bytes, loc);
+                     loc
+                 }
+                 None => {
+                     if manager.should_rotate(data_writer.position()) {
+                         data_writer.sync()?;
+                         hint_writer.sync()?;

-                     let next_id = ctx.file_ids.allocate();
-                     let next_fd = manager.open_for_append(next_id)?;
+                         let next_id = ctx.file_ids.allocate();
+                         let next_fd = manager.open_for_append(next_id)?;

-                     tracing::info!(
-                         from = %data_writer.file_id(),
-                         to = %next_id,
-                         "data file rotation"
-                     );
+                         tracing::info!(
+                             from = %data_writer.file_id(),
+                             to = %next_id,
+                             "data file rotation"
+                         );

-                     data_writer = DataFileWriter::new(manager.io(), next_fd, next_id)?;
+                         data_writer = DataFileWriter::new(manager.io(), next_fd, next_id)?;

-                     let new_hint_path = hint_file_path(manager.data_dir(), next_id);
-                     let new_hint_fd = manager
-                         .io()
-                         .open(&new_hint_path, OpenOptions::read_write())?;
+                         let new_hint_path = hint_file_path(manager.data_dir(), next_id);
+                         let new_hint_fd = manager
+                             .io()
+                             .open(&new_hint_path, OpenOptions::read_write())?;

-                     manager.io().sync_dir(manager.data_dir())?;
+                         manager.io().sync_dir(manager.data_dir())?;

-                     current_hint_fd = new_hint_fd;
-                     hint_writer = HintFileWriter::new(manager.io(), new_hint_fd);
-                     rotations.push(RotationState {
-                         file_id: next_id,
-                         fd: next_fd,
-                         hint_fd: new_hint_fd,
-                     });
-                 }
+                         current_hint_fd = new_hint_fd;
+                         hint_writer = HintFileWriter::new(manager.io(), new_hint_fd);
+                         rotations.push(RotationState {
+                             file_id: next_id,
+                             fd: next_fd,
+                             hint_fd: new_hint_fd,
+                         });
+                     }

-                 let loc = data_writer.append_block(cid_bytes, data)?;
-                 hint_writer.append_hint(cid_bytes, &loc)?;
+                     let loc = data_writer.append_block(cid_bytes, data)?;
+                     hint_writer.append_hint(cid_bytes, &loc)?;

-                 block_bytes = block_bytes.saturating_add(data.len() as u64);
-                 block_count = block_count.saturating_add(1);
-                 dedup.insert(*cid_bytes, loc);
-                 loc
-             }
+                     block_bytes = block_bytes.saturating_add(data.len() as u64);
+                     block_count = block_count.saturating_add(1);
+                     dedup.insert(*cid_bytes, loc);
+                     loc
+                 }
+             },
        };

        index_entries.push((*cid_bytes, location));
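
The resume path above is the "header-safe resume" from the commit message: if a crash left the active data file shorter than its block header, the writer re-initializes the header instead of appending at a bogus offset. A minimal standalone sketch of that decision, with a hypothetical constant standing in for the real one in data_file.rs:

    // Sketch only: BLOCK_HEADER_SIZE is a stand-in, not the crate's value.
    const BLOCK_HEADER_SIZE: u64 = 64;

    /// Where should appends resume in a data file of `file_size` bytes?
    fn resume_offset(file_size: u64) -> u64 {
        if file_size < BLOCK_HEADER_SIZE {
            // Torn or empty header: rewrite it and start right after it,
            // mirroring the DataFileWriter::new(..) + sync() branch in the diff.
            BLOCK_HEADER_SIZE
        } else {
            // Header intact: append at the current end of file.
            file_size
        }
    }

    fn main() {
        assert_eq!(resume_offset(0), BLOCK_HEADER_SIZE);  // brand-new or torn file
        assert_eq!(resume_offset(10), BLOCK_HEADER_SIZE); // partial header
        assert_eq!(resume_offset(4096), 4096);            // normal resume
    }
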
+94 -47
crates/tranquil-store/src/blockstore/hash_index.rs
···
const CHECKPOINT_MAGIC: [u8; 8] = *b"TQCKPT01";
const CHECKPOINT_VERSION_V1: u32 = 1;
const CHECKPOINT_VERSION_V2: u32 = 2;
+ const CHECKPOINT_VERSION_V3: u32 = 3;
const CHECKPOINT_HEADER_SIZE: usize = 128;
const TRAILER_MAGIC: u64 = 0xDEAD_BEEF_CAFE_F00D;
const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
···
const H_HINT_FILE_ID: usize = 64;
const H_HINT_OFFSET: usize = 72;
const H_HEADER_CHECKSUM: usize = 80;
+ const H_GENERATION: usize = 88;

fn header_checksum(buf: &[u8; CHECKPOINT_HEADER_SIZE]) -> u64 {
    xxhash_rust::xxh3::xxh3_64(&buf[..H_HEADER_CHECKSUM])
···
    cursor_offset: u64,
    checkpoint_epoch: u64,
    shard_count: u16,
+     generation: u64,
) -> [u8; CHECKPOINT_HEADER_SIZE] {
    let mut buf = [0u8; CHECKPOINT_HEADER_SIZE];
    buf[H_MAGIC..H_MAGIC + 8].copy_from_slice(&CHECKPOINT_MAGIC);
-     buf[H_VERSION..H_VERSION + 4].copy_from_slice(&CHECKPOINT_VERSION_V2.to_le_bytes());
+     buf[H_VERSION..H_VERSION + 4].copy_from_slice(&CHECKPOINT_VERSION_V3.to_le_bytes());
    buf[H_SHARD_COUNT..H_SHARD_COUNT + 2].copy_from_slice(&shard_count.to_le_bytes());
    buf[H_SLOT_COUNT..H_SLOT_COUNT + 8].copy_from_slice(&slot_count.to_le_bytes());
    buf[H_ENTRY_COUNT..H_ENTRY_COUNT + 8].copy_from_slice(&entry_count.to_le_bytes());
···
    buf[H_CURSOR_OFFSET..H_CURSOR_OFFSET + 8].copy_from_slice(&cursor_offset.to_le_bytes());
    buf[H_CHECKPOINT_EPOCH..H_CHECKPOINT_EPOCH + 8]
        .copy_from_slice(&checkpoint_epoch.to_le_bytes());
+     buf[H_GENERATION..H_GENERATION + 8].copy_from_slice(&generation.to_le_bytes());
    let checksum = header_checksum(&buf);
    buf[H_HEADER_CHECKSUM..H_HEADER_CHECKSUM + 8].copy_from_slice(&checksum.to_le_bytes());
    buf
···
    table: &HashTable,
    path: &Path,
    epoch: CommitEpoch,
+     generation: u64,
    positions: &CheckpointPositions,
) -> io::Result<()> {
    use std::io::Write;
···
        cursor_offset,
        epoch.raw(),
        shard_count,
+         generation,
    );

    let slot_bytes = slots_as_bytes(&table.slots);
···
    Ok(())
}

- fn parse_checkpoint_header(data: &[u8]) -> io::Result<(usize, usize, u32, u64, u64, u16)> {
+ fn parse_checkpoint_header(data: &[u8]) -> io::Result<(usize, usize, u32, u64, u64, u16, u64)> {
    if data.len() < CHECKPOINT_HEADER_SIZE + 16 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
···
    }

    let version = u32::from_le_bytes(hdr[H_VERSION..H_VERSION + 4].try_into().unwrap());
-     if version != CHECKPOINT_VERSION_V1 && version != CHECKPOINT_VERSION_V2 {
+     if version != CHECKPOINT_VERSION_V1
+         && version != CHECKPOINT_VERSION_V2
+         && version != CHECKPOINT_VERSION_V3
+     {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("checkpoint version {version} unsupported"),
···
    );

    let shard_count = match version {
-         CHECKPOINT_VERSION_V2 => {
+         CHECKPOINT_VERSION_V2 | CHECKPOINT_VERSION_V3 => {
            u16::from_le_bytes(hdr[H_SHARD_COUNT..H_SHARD_COUNT + 2].try_into().unwrap())
        }
        _ => 0,
    };

+     let generation = match version {
+         CHECKPOINT_VERSION_V3 => {
+             u64::from_le_bytes(hdr[H_GENERATION..H_GENERATION + 8].try_into().unwrap())
+         }
+         _ => 0,
+     };
+
    Ok((
        slot_count,
        entry_count,
···
        cursor_offset,
        checkpoint_epoch,
        shard_count,
+         generation,
    ))
}
···
        .collect()
}

- pub fn read_checkpoint(path: &Path) -> io::Result<(HashTable, CommitEpoch, CheckpointPositions)> {
+ pub fn read_checkpoint(
+     path: &Path,
+ ) -> io::Result<(HashTable, CommitEpoch, CheckpointPositions, u64)> {
    let data = std::fs::read(path)?;

-     let (slot_count, entry_count, cursor_file_id, cursor_offset, checkpoint_epoch, shard_count) =
-         parse_checkpoint_header(&data)?;
+     let (
+         slot_count,
+         entry_count,
+         cursor_file_id,
+         cursor_offset,
+         checkpoint_epoch,
+         shard_count,
+         generation,
+     ) = parse_checkpoint_header(&data)?;

    let hdr: &[u8; CHECKPOINT_HEADER_SIZE] = data[..CHECKPOINT_HEADER_SIZE].try_into().unwrap();
    let version = u32::from_le_bytes(hdr[H_VERSION..H_VERSION + 4].try_into().unwrap());
···
    let shard_pos_region = &data[shard_pos_start..shard_pos_start + shard_pos_size];

    let data_checksum = match version {
-         CHECKPOINT_VERSION_V2 => {
+         CHECKPOINT_VERSION_V2 | CHECKPOINT_VERSION_V3 => {
            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(slot_region);
            hasher.update(shard_pos_region);
···
    let epoch = CommitEpoch::new(checkpoint_epoch);

    let positions = match version {
-         CHECKPOINT_VERSION_V2 if shard_count > 0 => CheckpointPositions(
+         CHECKPOINT_VERSION_V2 | CHECKPOINT_VERSION_V3 if shard_count > 0 => CheckpointPositions(
            deserialize_shard_positions(shard_pos_region, shard_count as usize),
        ),
        _ => {
···
        }
    };

-     Ok((table, epoch, positions))
+     Ok((table, epoch, positions, generation))
}

pub fn load_best_checkpoint(
    index_dir: &Path,
- ) -> Option<(HashTable, CommitEpoch, CheckpointPositions)> {
+ ) -> Option<(HashTable, CommitEpoch, CheckpointPositions, u64)> {
    let path_a = index_dir.join("checkpoint_a.tqc");
    let path_b = index_dir.join("checkpoint_b.tqc");
···
    let result_b = read_checkpoint(&path_b).ok();

    match (result_a, result_b) {
-         (Some(a), Some(b)) => match a.1.raw() >= b.1.raw() {
+         (Some(a), Some(b)) => match (a.3, a.1.raw()) >= (b.3, b.1.raw()) {
            true => Some(a),
            false => Some(b),
        },
···
    }
}

- fn read_checkpoint_epoch(path: &Path) -> Option<u64> {
+ fn read_checkpoint_meta(path: &Path) -> Option<(u64, u64)> {
    let mut file = std::fs::File::open(path).ok()?;
    let mut buf = [0u8; CHECKPOINT_HEADER_SIZE];
    std::io::Read::read_exact(&mut file, &mut buf).ok()?;
···
    }

    let version = u32::from_le_bytes(buf[H_VERSION..H_VERSION + 4].try_into().ok()?);
-     if version != CHECKPOINT_VERSION_V1 && version != CHECKPOINT_VERSION_V2 {
+     if version != CHECKPOINT_VERSION_V1
+         && version != CHECKPOINT_VERSION_V2
+         && version != CHECKPOINT_VERSION_V3
+     {
        return None;
    }
···
        return None;
    }

-     Some(u64::from_le_bytes(
+     let epoch = u64::from_le_bytes(
        buf[H_CHECKPOINT_EPOCH..H_CHECKPOINT_EPOCH + 8]
            .try_into()
            .ok()?,
-     ))
+     );
+     let generation = match version {
+         CHECKPOINT_VERSION_V3 => {
+             u64::from_le_bytes(buf[H_GENERATION..H_GENERATION + 8].try_into().ok()?)
+         }
+         _ => 0,
+     };
+     Some((epoch, generation))
}

pub fn write_checkpoint_ab(
    table: &HashTable,
    index_dir: &Path,
    epoch: CommitEpoch,
+     generation: u64,
    positions: &CheckpointPositions,
) -> io::Result<()> {
    let path_a = index_dir.join("checkpoint_a.tqc");
    let path_b = index_dir.join("checkpoint_b.tqc");

-     let epoch_a = read_checkpoint_epoch(&path_a);
-     let epoch_b = read_checkpoint_epoch(&path_b);
+     let meta_a = read_checkpoint_meta(&path_a);
+     let meta_b = read_checkpoint_meta(&path_b);

-     let target_path = match (epoch_a, epoch_b) {
-         (Some(a), Some(b)) if a >= b => path_b,
+     let target_path = match (meta_a, meta_b) {
+         (Some(a), Some(b)) if (a.1, a.0) >= (b.1, b.0) => path_b,
        (Some(_), Some(_)) => path_a,
        (Some(_), None) => path_b,
        (None, _) => path_a,
    };

-     write_checkpoint(table, &target_path, epoch, positions)
+     write_checkpoint(table, &target_path, epoch, generation, positions)
}

#[derive(Debug)]
···
    checkpoint_lock: parking_lot::Mutex<()>,
    loaded_checkpoint_positions: Option<CheckpointPositions>,
    loaded_checkpoint_epoch: Option<CommitEpoch>,
+     next_generation: std::sync::atomic::AtomicU64,
}

impl BlockIndex {
···
            checkpoint_lock: parking_lot::Mutex::new(()),
            loaded_checkpoint_positions: None,
            loaded_checkpoint_epoch: None,
+             next_generation: std::sync::atomic::AtomicU64::new(1),
        }
    }

    pub fn open(index_dir: &Path) -> io::Result<Self> {
        std::fs::create_dir_all(index_dir)?;
-         let (table, checkpoint_positions, checkpoint_epoch) = match load_best_checkpoint(index_dir)
-         {
-             Some((table, epoch, positions)) => {
-                 tracing::info!(
-                     blocks = table.len(),
-                     epoch = epoch.raw(),
-                     shard_positions = positions.0.len(),
-                     "loaded block index from checkpoint"
-                 );
-                 (table, Some(positions), Some(epoch))
-             }
-             None => {
-                 tracing::info!("no valid checkpoint found, starting with empty index");
-                 (HashTable::with_capacity(64), None, None)
-             }
-         };
+         let (table, checkpoint_positions, checkpoint_epoch, loaded_generation) =
+             match load_best_checkpoint(index_dir) {
+                 Some((table, epoch, positions, gen_value)) => {
+                     tracing::info!(
+                         blocks = table.len(),
+                         epoch = epoch.raw(),
+                         shard_positions = positions.0.len(),
+                         generation = gen_value,
+                         "loaded block index from checkpoint"
+                     );
+                     (table, Some(positions), Some(epoch), gen_value)
+                 }
+                 None => {
+                     tracing::info!("no valid checkpoint found, starting with empty index");
+                     (HashTable::with_capacity(64), None, None, 0)
+                 }
+             };
        Ok(Self {
            table: RwLock::new(table),
            index_dir: index_dir.to_path_buf(),
            checkpoint_lock: parking_lot::Mutex::new(()),
            loaded_checkpoint_positions: checkpoint_positions,
            loaded_checkpoint_epoch: checkpoint_epoch,
+             next_generation: std::sync::atomic::AtomicU64::new(loaded_generation + 1),
        })
    }
···
        hint_positions: &ShardHintPositions,
    ) -> io::Result<()> {
        let _guard = self.checkpoint_lock.lock();
+         let generation = self
+             .next_generation
+             .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        let table = self.table.read();
        let positions = hint_positions.snapshot();
-         write_checkpoint_ab(&table, &self.index_dir, epoch, &positions)
+         write_checkpoint_ab(&table, &self.index_dir, epoch, generation, &positions)
    }

    pub fn write_checkpoint_with_positions(
···
        positions: &CheckpointPositions,
    ) -> io::Result<()> {
        let _guard = self.checkpoint_lock.lock();
+         let generation = self
+             .next_generation
+             .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        let table = self.table.read();
-         write_checkpoint_ab(&table, &self.index_dir, epoch, positions)
+         write_checkpoint_ab(&table, &self.index_dir, epoch, generation, positions)
    }

    pub fn index_dir(&self) -> &Path {
···
        let epoch = CommitEpoch::new(42);
        let positions = CheckpointPositions::single(DataFileId::new(5), HintOffset::new(12345));

-         write_checkpoint(&table, &path, epoch, &positions).unwrap();
-         let (restored, restored_epoch, restored_pos) = read_checkpoint(&path).unwrap();
+         write_checkpoint(&table, &path, epoch, 7, &positions).unwrap();
+         let (restored, restored_epoch, restored_pos, _gen) = read_checkpoint(&path).unwrap();

        assert_eq!(restored.len(), 10);
        assert_eq!(restored_epoch.raw(), 42);
···
        table
            .insert_or_increment(&test_cid(1), test_loc(0, 0, 10))
            .unwrap();
-         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(1), &pos).unwrap();
+         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(1), 1, &pos).unwrap();

        table
            .insert_or_increment(&test_cid(2), test_loc(0, 100, 10))
            .unwrap();
-         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(2), &pos).unwrap();
+         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(2), 2, &pos).unwrap();

-         let (best, epoch, _) = load_best_checkpoint(dir.path()).unwrap();
+         let (best, epoch, _, _) = load_best_checkpoint(dir.path()).unwrap();
        assert_eq!(epoch.raw(), 2);
        assert_eq!(best.len(), 2);
    }
···
        table
            .insert_or_increment(&test_cid(1), test_loc(0, 0, 10))
            .unwrap();
-         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(1), &pos).unwrap();
+         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(1), 1, &pos).unwrap();

        table
            .insert_or_increment(&test_cid(2), test_loc(0, 100, 10))
            .unwrap();
-         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(2), &pos).unwrap();
+         write_checkpoint_ab(&table, dir.path(), CommitEpoch::new(2), 2, &pos).unwrap();

        std::fs::write(dir.path().join("checkpoint_b.tqc"), b"corrupt").unwrap();

-         let (best, epoch, _) = load_best_checkpoint(dir.path()).unwrap();
+         let (best, epoch, _, _) = load_best_checkpoint(dir.path()).unwrap();
        assert_eq!(epoch.raw(), 1);
        assert_eq!(best.len(), 1);
    }
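
The new selection rule compares (generation, epoch) lexicographically, so the A/B checkpoint slot written most recently wins even when the commit epoch did not advance between writes. A small sketch of that ordering in isolation, with illustrative field names rather than the crate's types:

    // Illustrative stand-in for the fields read_checkpoint_meta returns.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct CheckpointMeta {
        generation: u64, // bumped on every checkpoint write
        epoch: u64,      // commit epoch the checkpoint covers
    }

    /// Pick the slot to load: newer generation wins, epoch breaks ties.
    fn pick_best(a: CheckpointMeta, b: CheckpointMeta) -> CheckpointMeta {
        if (a.generation, a.epoch) >= (b.generation, b.epoch) { a } else { b }
    }

    fn main() {
        let slot_a = CheckpointMeta { generation: 4, epoch: 42 };
        let slot_b = CheckpointMeta { generation: 5, epoch: 42 };
        // Same epoch: a plain epoch comparison would keep slot A;
        // the generation makes the later write unambiguous.
        assert_eq!(pick_best(slot_a, slot_b), slot_b);
    }

The write side uses the same tuple in reverse: write_checkpoint_ab overwrites whichever slot has the smaller (generation, epoch), so the freshest checkpoint is never clobbered.
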
+3 -2
crates/tranquil-store/src/blockstore/store.rs
···
        io.sync(fd).map_err(RepoError::storage)?;
    }

-     if !hint_exists && !scanned_entries.is_empty() {
+     if !scanned_entries.is_empty() {
        tracing::info!(
            file_id = %file_id,
            scanned = scanned_entries.len(),
-             "rebuilding index from data file (no hint file, treating as restored backup)"
+             hint_exists,
+             "reindexing blocks past hint coverage"
        );
        let cursor = super::types::WriteCursor {
            file_id,
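
This is the "recover torn-hint tails" piece: rather than rebuilding only when the hint file is missing entirely, any blocks scanned past what the hint covers are reindexed, even when the hint exists but was cut short by a crash. A rough illustration of the idea, assuming the hint file covers a prefix of the data file (names and offsets here are hypothetical, not the store's API):

    /// Given block offsets recovered by scanning the data file and the offset
    /// up to which the hint file is intact, return the offsets that still need
    /// to be reindexed. A torn hint tail just shrinks `hint_covered_up_to`.
    fn blocks_to_reindex(scanned_offsets: &[u64], hint_covered_up_to: u64) -> Vec<u64> {
        scanned_offsets
            .iter()
            .copied()
            .filter(|&off| off >= hint_covered_up_to)
            .collect()
    }

    fn main() {
        let scanned = [64, 192, 320, 448];
        // Hint torn after the second entry: only the tail needs reindexing.
        assert_eq!(blocks_to_reindex(&scanned, 320), vec![320, 448]);
        // No hint file at all behaves like coverage 0: everything is rebuilt.
        assert_eq!(blocks_to_reindex(&scanned, 0), vec![64, 192, 320, 448]);
    }
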
+21 -8
crates/tranquil-store/src/gauntlet/invariants.rs
···
use jacquard_repo::mst::Mst;

use super::oracle::{Oracle, hex_short, try_cid_to_fixed};
- use crate::blockstore::{CidBytes, CompactionError, TranquilBlockStore, hash_to_cid_bytes};
+ use crate::blockstore::{
+     BLOCK_HEADER_SIZE, CidBytes, CompactionError, TranquilBlockStore, hash_to_cid_bytes,
+ };
use crate::eventlog::{EventSequence, SegmentId};
use crate::io::{RealIO, StorageIO};
···
    let result = tokio::task::spawn_blocking(move || {
        let disk = store_c.list_data_files().map_err(|e| e.to_string())?;
        let liveness = store_c.compaction_liveness(0).map_err(|e| e.to_string())?;
+         let header = BLOCK_HEADER_SIZE as u64;
        let orphans: Vec<String> = disk
            .iter()
            .filter(|fid| !liveness.contains_key(fid))
-             .map(|fid| format!("{fid}"))
+             .filter_map(|fid| {
+                 let path = store_c.data_file_path(*fid);
+                 let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
+                 match size > header {
+                     true => Some(format!("{fid} ({size} B)")),
+                     false => None,
+                 }
+             })
            .collect();
        Ok::<_, String>(orphans)
    })
···
    tokio::task::spawn_blocking(move || {
        let listed = store.list_data_files().map_err(|e| e.to_string())?;
        let liveness = store.compaction_liveness(0).map_err(|e| e.to_string())?;
+         let header = BLOCK_HEADER_SIZE as u64;

        let mut violations: Vec<String> = Vec::new();
        listed.iter().for_each(|fid| {
···
                Err(e) => violations.push(format!("{fid}: metadata {e}")),
                Ok(meta) => {
                    let on_disk = meta.len();
+                     let content = on_disk.saturating_sub(header);
                    match liveness.get(fid) {
-                         None => violations.push(format!(
+                         None if on_disk > header => violations.push(format!(
                            "{fid}: listed on disk at {on_disk} B but not in index liveness"
                        )),
-                         Some(info) if on_disk < info.total_bytes => {
+                         None => {}
+                         Some(info) if content < info.total_bytes => {
                            violations.push(format!(
-                                 "{fid}: on-disk {on_disk} B < index total_bytes {}",
+                                 "{fid}: on-disk {on_disk} B (content {content}) < index total_bytes {}",
                                info.total_bytes
                            ));
                        }
-                         Some(info) if on_disk > info.total_bytes => {
+                         Some(info) if content > info.total_bytes => {
                            violations.push(format!(
-                                 "{fid}: on-disk {on_disk} B > index total_bytes {}, {} B unaccounted",
+                                 "{fid}: on-disk {on_disk} B (content {content}) > index total_bytes {}, {} B unaccounted",
                                info.total_bytes,
-                                 content - info.total_bytes
+                                 content - info.total_bytes
                            ));
                        }
                        Some(_) => {}
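
The tightened invariant compares content bytes rather than raw file size, because every data file carries a fixed header that the index's total_bytes never counts, and a file holding nothing but that header is no longer reported as an orphan. A worked example of the arithmetic, using a hypothetical header size in place of the real BLOCK_HEADER_SIZE:

    const BLOCK_HEADER_SIZE: u64 = 64; // hypothetical; see data_file.rs for the real value

    fn main() {
        let header = BLOCK_HEADER_SIZE;

        // A file with only its header: zero content, so it is neither an
        // orphan nor an accounting violation.
        let on_disk: u64 = header;
        assert_eq!(on_disk.saturating_sub(header), 0);

        // A file with 1,000 bytes of blocks after the header matches an index
        // entry whose total_bytes is 1,000, even though the raw size is larger.
        let on_disk: u64 = header + 1_000;
        let index_total_bytes: u64 = 1_000;
        assert_eq!(on_disk.saturating_sub(header), index_total_bytes);
    }
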