Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): arc-counted cache handles, reader-eviction race

Lewis: May this revision serve well! <lu5a@proton.me>

+619 -300
+5 -5
crates/tranquil-store/src/blockstore/compaction.rs
··· 68 68 return Err(CompactionError::ActiveFileCannotBeCompacted); 69 69 } 70 70 71 - let source_fd = manager.open_for_read(source_file_id)?; 72 - let source_size = manager.io().file_size(source_fd)?; 71 + let source_handle = manager.open_for_read(source_file_id)?; 72 + let source_size = manager.io().file_size(source_handle.fd())?; 73 73 74 74 let new_file_id = file_ids.allocate(); 75 75 ··· 77 77 manager, 78 78 index, 79 79 source_file_id, 80 - source_fd, 80 + source_handle.fd(), 81 81 new_file_id, 82 82 current_epoch, 83 83 grace_period_ms, ··· 148 148 let mut reader = DataFileReader::open(manager.io(), source_fd)?; 149 149 let now = crate::wall_clock_ms(); 150 150 151 - let new_fd = manager.open_for_append(new_file_id)?; 152 - let mut writer = DataFileWriter::new(manager.io(), new_fd, new_file_id)?; 151 + let new_handle = manager.open_for_append(new_file_id)?; 152 + let mut writer = DataFileWriter::new(manager.io(), new_handle.fd(), new_file_id)?; 153 153 154 154 let hint_path = hint_file_path(manager.data_dir(), new_file_id); 155 155 let hint_fd = manager.io().open(&hint_path, OpenOptions::read_write())?;
+15 -11
crates/tranquil-store/src/blockstore/group_commit.rs
··· 553 553 554 554 match cursor { 555 555 Some(wc) => { 556 - let fd = manager.open_for_append(wc.file_id)?; 556 + let handle = manager.open_for_append(wc.file_id)?; 557 + let fd = handle.fd(); 557 558 let file_size = manager.io().file_size(fd)?; 558 559 559 560 if file_size < wc.offset.raw() { ··· 588 589 None => { 589 590 let file_id = file_ids.allocate(); 590 591 591 - let fd = manager.open_for_append(file_id)?; 592 + let handle = manager.open_for_append(file_id)?; 593 + let fd = handle.fd(); 592 594 let writer = DataFileWriter::new(manager.io(), fd, file_id)?; 593 595 writer.sync()?; 594 596 let position = writer.position(); ··· 1074 1076 shutdown_checkpoint(index, epoch, &ctx.hint_positions); 1075 1077 } 1076 1078 1077 - struct RotationState { 1079 + struct RotationState<S: StorageIO> { 1078 1080 file_id: DataFileId, 1079 - fd: FileId, 1081 + handle: Arc<super::manager::CachedHandle<S>>, 1080 1082 hint_fd: FileId, 1081 1083 } 1082 1084 ··· 1178 1180 fn rollback_batch<S: StorageIO>( 1179 1181 manager: &DataFileManager<S>, 1180 1182 state: &ActiveState, 1181 - rotations: &[RotationState], 1183 + rotations: &[RotationState<S>], 1182 1184 ) { 1183 1185 let _ = manager.io().truncate(state.fd, state.position.raw()); 1184 1186 let _ = manager.io().sync(state.fd); ··· 1187 1189 .truncate(state.hint_fd, state.hint_position.raw()); 1188 1190 let _ = manager.io().sync(state.hint_fd); 1189 1191 rotations.iter().for_each(|rot| { 1190 - manager.rollback_rotation(rot.file_id, rot.fd); 1192 + manager.rollback_rotation(rot.file_id); 1191 1193 let _ = manager.io().close(rot.hint_fd); 1192 1194 let _ = manager 1193 1195 .io() ··· 1210 1212 let mut all_decrements: Vec<[u8; CID_SIZE]> = Vec::new(); 1211 1213 1212 1214 let mut current_hint_fd = state.hint_fd; 1213 - let mut rotations: Vec<RotationState> = Vec::new(); 1215 + let mut rotations: Vec<RotationState<S>> = Vec::new(); 1214 1216 1215 1217 let mut data_writer = 1216 1218 DataFileWriter::resume(manager.io(), state.fd, state.file_id, state.position); ··· 1222 1224 hint_writer.sync().map_err(CommitError::from)?; 1223 1225 1224 1226 let next_id = ctx.file_ids.allocate(); 1225 - let next_fd = manager.open_for_append(next_id)?; 1227 + let next_handle = manager.open_for_append(next_id)?; 1228 + let next_fd = next_handle.fd(); 1226 1229 1227 1230 tracing::info!( 1228 1231 from = %data_writer.file_id(), 1229 1232 to = %next_id, 1230 - "data file rotation (batch boundary)" 1233 + trigger = "batch_boundary", 1234 + "data file rotation" 1231 1235 ); 1232 1236 1233 1237 data_writer = DataFileWriter::new(manager.io(), next_fd, next_id)?; ··· 1243 1247 hint_writer = HintFileWriter::new(manager.io(), new_hint_fd); 1244 1248 rotations.push(RotationState { 1245 1249 file_id: next_id, 1246 - fd: next_fd, 1250 + handle: next_handle, 1247 1251 hint_fd: new_hint_fd, 1248 1252 }); 1249 1253 } ··· 1348 1352 let last_idx = rotations.len() - 1; 1349 1353 rotations.iter().enumerate().for_each(|(i, rot)| { 1350 1354 if i == last_idx { 1351 - manager.commit_rotation(rot.file_id, rot.fd); 1355 + manager.commit_rotation(rot.file_id, &rot.handle); 1352 1356 ctx.active_files.register(ctx.shard_id, rot.file_id); 1353 1357 } else { 1354 1358 let _ = manager.io().close(rot.hint_fd);
+109 -92
crates/tranquil-store/src/blockstore/manager.rs
··· 1 1 use std::collections::HashMap; 2 2 use std::io; 3 3 use std::path::{Path, PathBuf}; 4 + use std::sync::Arc; 4 5 5 6 use parking_lot::RwLock; 6 7 ··· 13 14 14 15 pub(crate) const DATA_FILE_EXTENSION: &str = "tqb"; 15 16 16 - struct CachedHandle { 17 + pub struct CachedHandle<S: StorageIO> { 17 18 fd: FileId, 19 + io: Arc<S>, 18 20 writable: bool, 19 21 } 20 22 23 + impl<S: StorageIO> CachedHandle<S> { 24 + pub fn fd(&self) -> FileId { 25 + self.fd 26 + } 27 + 28 + pub fn is_writable(&self) -> bool { 29 + self.writable 30 + } 31 + } 32 + 33 + impl<S: StorageIO> Drop for CachedHandle<S> { 34 + fn drop(&mut self) { 35 + let _ = self.io.close(self.fd); 36 + } 37 + } 38 + 21 39 pub struct DataFileManager<S: StorageIO> { 22 - io: S, 40 + io: Arc<S>, 23 41 data_dir: PathBuf, 24 42 max_file_size: u64, 25 - handles: RwLock<HashMap<DataFileId, CachedHandle>>, 43 + handles: RwLock<HashMap<DataFileId, Arc<CachedHandle<S>>>>, 26 44 } 27 45 28 46 impl<S: StorageIO> DataFileManager<S> { 29 47 pub fn new(io: S, data_dir: PathBuf, max_file_size: u64) -> Self { 30 48 Self { 31 - io, 49 + io: Arc::new(io), 32 50 data_dir, 33 51 max_file_size, 34 52 handles: RwLock::new(HashMap::new()), ··· 40 58 } 41 59 42 60 pub fn io(&self) -> &S { 43 - &self.io 61 + self.io.as_ref() 44 62 } 45 63 46 64 pub fn data_dir(&self) -> &Path { ··· 56 74 .join(format!("{file_id}.{DATA_FILE_EXTENSION}")) 57 75 } 58 76 59 - pub fn open_for_append(&self, file_id: DataFileId) -> io::Result<FileId> { 77 + pub fn open_for_append(&self, file_id: DataFileId) -> io::Result<Arc<CachedHandle<S>>> { 60 78 { 61 79 let cache = self.handles.read(); 62 80 if let Some(entry) = cache.get(&file_id) 63 81 && entry.writable 64 82 { 65 - return Ok(entry.fd); 83 + return Ok(Arc::clone(entry)); 66 84 } 67 85 } 68 86 let path = self.data_file_path(file_id); 69 87 let fd = self.io.open(&path, OpenOptions::read_write())?; 70 88 let mut cache = self.handles.write(); 71 - match cache.get(&file_id) { 89 + match cache.get(&file_id).cloned() { 72 90 Some(entry) if entry.writable => { 73 91 let _ = self.io.close(fd); 74 - Ok(entry.fd) 92 + Ok(entry) 75 93 } 76 - Some(entry) => { 77 - let old_fd = entry.fd; 78 - cache.insert(file_id, CachedHandle { fd, writable: true }); 79 - let _ = self.io.close(old_fd); 80 - Ok(fd) 81 - } 82 - None => { 83 - cache.insert(file_id, CachedHandle { fd, writable: true }); 84 - Ok(fd) 94 + _ => { 95 + let handle = Arc::new(CachedHandle { 96 + fd, 97 + io: Arc::clone(&self.io), 98 + writable: true, 99 + }); 100 + cache.insert(file_id, Arc::clone(&handle)); 101 + Ok(handle) 85 102 } 86 103 } 87 104 } 88 105 89 - pub fn open_for_read(&self, file_id: DataFileId) -> io::Result<FileId> { 106 + pub fn open_for_read(&self, file_id: DataFileId) -> io::Result<Arc<CachedHandle<S>>> { 90 107 if let Some(entry) = self.handles.read().get(&file_id) { 91 - return Ok(entry.fd); 108 + return Ok(Arc::clone(entry)); 92 109 } 93 110 let path = self.data_file_path(file_id); 94 111 let fd = self.io.open(&path, OpenOptions::read_only_existing())?; 95 112 let mut cache = self.handles.write(); 96 - match cache.get(&file_id) { 113 + match cache.get(&file_id).cloned() { 97 114 Some(entry) => { 98 115 let _ = self.io.close(fd); 99 - Ok(entry.fd) 116 + Ok(entry) 100 117 } 101 118 None => { 102 - cache.insert( 103 - file_id, 104 - CachedHandle { 105 - fd, 106 - writable: false, 107 - }, 108 - ); 109 - Ok(fd) 119 + let handle = Arc::new(CachedHandle { 120 + fd, 121 + io: Arc::clone(&self.io), 122 + writable: false, 123 + }); 124 + cache.insert(file_id, Arc::clone(&handle)); 125 + Ok(handle) 110 126 } 111 127 } 112 128 } 113 129 114 - pub fn prepare_rotation(&self, current: DataFileId) -> io::Result<(DataFileId, FileId)> { 130 + pub fn prepare_rotation( 131 + &self, 132 + current: DataFileId, 133 + ) -> io::Result<(DataFileId, Arc<CachedHandle<S>>)> { 115 134 let next = current.next(); 116 135 let path = self.data_file_path(next); 117 136 let fd = self.io.open(&path, OpenOptions::read_write())?; 118 - Ok((next, fd)) 137 + let handle = Arc::new(CachedHandle { 138 + fd, 139 + io: Arc::clone(&self.io), 140 + writable: true, 141 + }); 142 + Ok((next, handle)) 119 143 } 120 144 121 - pub fn commit_rotation(&self, file_id: DataFileId, fd: FileId) { 122 - self.handles 123 - .write() 124 - .insert(file_id, CachedHandle { fd, writable: true }); 145 + pub fn commit_rotation(&self, file_id: DataFileId, handle: &Arc<CachedHandle<S>>) { 146 + self.handles.write().insert(file_id, Arc::clone(handle)); 125 147 } 126 148 127 - pub fn rollback_rotation(&self, file_id: DataFileId, fd: FileId) { 128 - let _ = self.io.close(fd); 149 + pub fn rollback_rotation(&self, file_id: DataFileId) { 129 150 self.handles.write().remove(&file_id); 130 151 let _ = self.io.delete(&self.data_file_path(file_id)); 131 152 } ··· 135 156 } 136 157 137 158 pub fn list_files(&self) -> io::Result<Vec<DataFileId>> { 138 - list_files_by_extension(&self.io, &self.data_dir, DATA_FILE_EXTENSION) 159 + list_files_by_extension(&*self.io, &self.data_dir, DATA_FILE_EXTENSION) 139 160 } 140 161 141 162 pub fn evict_handle(&self, file_id: DataFileId) { 142 - let removed = self.handles.write().remove(&file_id); 143 - if let Some(entry) = removed { 144 - let _ = self.io.close(entry.fd); 145 - } 163 + self.handles.write().remove(&file_id); 146 164 } 147 165 148 166 pub fn delete_data_file(&self, file_id: DataFileId) -> io::Result<()> { 149 167 self.evict_handle(file_id); 150 168 let path = self.data_file_path(file_id); 151 169 self.io.delete(&path) 152 - } 153 - } 154 - 155 - impl<S: StorageIO> Drop for DataFileManager<S> { 156 - fn drop(&mut self) { 157 - self.handles.write().drain().for_each(|(_, entry)| { 158 - let _ = self.io.close(entry.fd); 159 - }); 160 170 } 161 171 } 162 172 ··· 178 188 #[test] 179 189 fn open_for_append_creates_file() { 180 190 let mgr = setup_manager(1024); 181 - let fd = mgr.open_for_append(DataFileId::new(0)).unwrap(); 182 - assert_eq!(mgr.io().file_size(fd).unwrap(), 0); 191 + let handle = mgr.open_for_append(DataFileId::new(0)).unwrap(); 192 + assert_eq!(mgr.io().file_size(handle.fd()).unwrap(), 0); 183 193 } 184 194 185 195 #[test] ··· 191 201 #[test] 192 202 fn handle_cache_returns_same_fd() { 193 203 let mgr = setup_manager(1024); 194 - let fd1 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 195 - let fd2 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 196 - assert_eq!(fd1, fd2); 204 + let h1 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 205 + let h2 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 206 + assert_eq!(h1.fd(), h2.fd()); 197 207 } 198 208 199 209 #[test] 200 210 fn open_for_read_uses_cache_from_append() { 201 211 let mgr = setup_manager(1024); 202 - let fd_write = mgr.open_for_append(DataFileId::new(0)).unwrap(); 203 - let fd_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 204 - assert_eq!(fd_write, fd_read); 212 + let h_write = mgr.open_for_append(DataFileId::new(0)).unwrap(); 213 + let h_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 214 + assert_eq!(h_write.fd(), h_read.fd()); 205 215 } 206 216 207 217 #[test] 208 218 fn rotation_lifecycle_prepare_commit() { 209 219 let mgr = setup_manager(1024); 210 - let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 211 - let (next_id, next_fd) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 220 + let _h0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 221 + let (next_id, next_handle) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 212 222 assert_eq!(next_id, DataFileId::new(1)); 213 - assert_eq!(mgr.io().file_size(next_fd).unwrap(), 0); 223 + assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0); 214 224 mgr.io().sync_dir(mgr.data_dir()).unwrap(); 215 - mgr.commit_rotation(next_id, next_fd); 216 - assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd); 225 + mgr.commit_rotation(next_id, &next_handle); 226 + assert_eq!( 227 + mgr.open_for_read(next_id).unwrap().fd(), 228 + next_handle.fd() 229 + ); 217 230 } 218 231 219 232 #[test] 220 233 fn rotation_rollback_cleans_handle_and_deletes_file() { 221 234 let mgr = setup_manager(1024); 222 - let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 223 - let (next_id, next_fd) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 224 - mgr.commit_rotation(next_id, next_fd); 235 + let _h0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 236 + let (next_id, next_handle) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 237 + mgr.commit_rotation(next_id, &next_handle); 225 238 226 - assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd); 227 - mgr.rollback_rotation(next_id, next_fd); 239 + assert_eq!( 240 + mgr.open_for_read(next_id).unwrap().fd(), 241 + next_handle.fd() 242 + ); 243 + drop(next_handle); 244 + mgr.rollback_rotation(next_id); 228 245 229 246 let reopen = mgr.open_for_read(next_id); 230 247 assert!( 231 248 reopen.is_err_and(|e| e.kind() == io::ErrorKind::NotFound), 232 - "rollback must delete the data file so recovery cannot resurrect uncommitted bytes" 249 + "rollback_rotation must delete the data file so recovery cannot resurrect uncommitted bytes" 233 250 ); 234 251 } 235 252 ··· 245 262 #[test] 246 263 fn list_files_finds_data_files() { 247 264 let mgr = setup_manager(1024); 248 - let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 249 - let _fd3 = mgr.open_for_append(DataFileId::new(3)).unwrap(); 265 + let _h0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 266 + let _h3 = mgr.open_for_append(DataFileId::new(3)).unwrap(); 250 267 251 268 let files = mgr.list_files().unwrap(); 252 269 assert_eq!(files, vec![DataFileId::new(0), DataFileId::new(3)]); ··· 255 272 #[test] 256 273 fn list_files_ignores_non_data_files() { 257 274 let mgr = setup_manager(1024); 258 - let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 275 + let _h0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 259 276 mgr.io() 260 277 .open(Path::new("/data/notes.txt"), OpenOptions::read_write()) 261 278 .unwrap(); ··· 280 297 #[test] 281 298 fn rotate_and_write_across_files() { 282 299 let mgr = setup_manager(1024); 283 - let fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 284 - let mut writer0 = DataFileWriter::new(mgr.io(), fd0, DataFileId::new(0)).unwrap(); 300 + let h0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); 301 + let mut writer0 = DataFileWriter::new(mgr.io(), h0.fd(), DataFileId::new(0)).unwrap(); 285 302 let _ = writer0 286 303 .append_block(&test_cid(1), b"first file data") 287 304 .unwrap(); 288 305 writer0.sync().unwrap(); 289 306 290 - let (id1, fd1) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 307 + let (id1, h1) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); 291 308 mgr.io().sync_dir(mgr.data_dir()).unwrap(); 292 - mgr.commit_rotation(id1, fd1); 293 - let mut writer1 = DataFileWriter::new(mgr.io(), fd1, id1).unwrap(); 309 + mgr.commit_rotation(id1, &h1); 310 + let mut writer1 = DataFileWriter::new(mgr.io(), h1.fd(), id1).unwrap(); 294 311 let _ = writer1 295 312 .append_block(&test_cid(2), b"second file data") 296 313 .unwrap(); 297 314 writer1.sync().unwrap(); 298 315 299 - let fd0_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 300 - let blocks0 = DataFileReader::open(mgr.io(), fd0_read) 316 + let h0_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 317 + let blocks0 = DataFileReader::open(mgr.io(), h0_read.fd()) 301 318 .unwrap() 302 319 .valid_blocks() 303 320 .unwrap(); 304 321 assert_eq!(blocks0.len(), 1); 305 322 assert_eq!(blocks0[0].2, b"first file data"); 306 323 307 - let fd1_read = mgr.open_for_read(id1).unwrap(); 308 - let blocks1 = DataFileReader::open(mgr.io(), fd1_read) 324 + let h1_read = mgr.open_for_read(id1).unwrap(); 325 + let blocks1 = DataFileReader::open(mgr.io(), h1_read.fd()) 309 326 .unwrap() 310 327 .valid_blocks() 311 328 .unwrap(); ··· 316 333 #[test] 317 334 fn read_cache_hit_from_writable_entry() { 318 335 let mgr = setup_manager(1024); 319 - let fd_write = mgr.open_for_append(DataFileId::new(0)).unwrap(); 320 - DataFileWriter::new(mgr.io(), fd_write, DataFileId::new(0)).unwrap(); 336 + let h_write = mgr.open_for_append(DataFileId::new(0)).unwrap(); 337 + DataFileWriter::new(mgr.io(), h_write.fd(), DataFileId::new(0)).unwrap(); 321 338 322 - let fd_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 323 - assert_eq!(fd_write, fd_read); 339 + let h_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 340 + assert_eq!(h_write.fd(), h_read.fd()); 324 341 } 325 342 326 343 #[test] ··· 339 356 mgr.io().sync_dir(mgr.data_dir()).unwrap(); 340 357 mgr.io().close(raw_fd).unwrap(); 341 358 342 - let fd_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 343 - let _reader = DataFileReader::open(mgr.io(), fd_read).unwrap(); 359 + let h_read = mgr.open_for_read(DataFileId::new(0)).unwrap(); 360 + let _reader = DataFileReader::open(mgr.io(), h_read.fd()).unwrap(); 344 361 345 - let fd_append = mgr.open_for_append(DataFileId::new(0)).unwrap(); 346 - assert_ne!(fd_read, fd_append); 362 + let h_append = mgr.open_for_append(DataFileId::new(0)).unwrap(); 363 + assert_ne!(h_read.fd(), h_append.fd()); 347 364 348 365 let mut writer = DataFileWriter::resume( 349 366 mgr.io(), 350 - fd_append, 367 + h_append.fd(), 351 368 DataFileId::new(0), 352 369 BlockOffset::new(BLOCK_HEADER_SIZE as u64), 353 370 ); ··· 356 373 .unwrap(); 357 374 writer.sync().unwrap(); 358 375 359 - let blocks = DataFileReader::open(mgr.io(), fd_append) 376 + let blocks = DataFileReader::open(mgr.io(), h_append.fd()) 360 377 .unwrap() 361 378 .valid_blocks() 362 379 .unwrap();
+1 -1
crates/tranquil-store/src/blockstore/mod.rs
··· 24 24 HINT_FILE_EXTENSION, HINT_RECORD_SIZE, HintFileReader, HintFileWriter, HintIndex, 25 25 ReadHintRecord, RebuildError, decode_hint_record, hint_file_path, scan_hints_to_memory, 26 26 }; 27 - pub use manager::{DEFAULT_MAX_FILE_SIZE, DataFileManager}; 27 + pub use manager::{CachedHandle, DEFAULT_MAX_FILE_SIZE, DataFileManager}; 28 28 pub use reader::{BlockStoreReader, ReadError}; 29 29 pub use store::QuiesceGuard; 30 30 pub use store::{BlockStoreConfig, DEFAULT_SHARD_COUNT, TranquilBlockStore};
+6 -6
crates/tranquil-store/src/blockstore/reader.rs
··· 104 104 }); 105 105 106 106 by_file.into_iter().try_for_each(|(file_id, mut entries)| { 107 - let fd = self.manager.open_for_read(file_id)?; 108 - let file_size = self.manager.io().file_size(fd)?; 107 + let handle = self.manager.open_for_read(file_id)?; 108 + let file_size = self.manager.io().file_size(handle.fd())?; 109 109 entries.sort_by_key(|(_, loc)| loc.offset); 110 110 entries.into_iter().try_for_each(|(orig_idx, loc)| { 111 - let data = self.decode_and_validate(fd, file_size, loc)?; 111 + let data = self.decode_and_validate(handle.fd(), file_size, loc)?; 112 112 results[orig_idx] = Some(data); 113 113 Ok::<_, ReadError>(()) 114 114 }) ··· 116 116 } 117 117 118 118 fn read_block_at(&self, location: BlockLocation) -> Result<Bytes, ReadError> { 119 - let fd = self.manager.open_for_read(location.file_id)?; 120 - let file_size = self.manager.io().file_size(fd)?; 121 - self.decode_and_validate(fd, file_size, location) 119 + let handle = self.manager.open_for_read(location.file_id)?; 120 + let file_size = self.manager.io().file_size(handle.fd())?; 121 + self.decode_and_validate(handle.fd(), file_size, location) 122 122 } 123 123 124 124 fn decode_and_validate(
+133 -130
crates/tranquil-store/src/eventlog/manager.rs
··· 1 1 use std::collections::HashMap; 2 2 use std::io; 3 3 use std::path::{Path, PathBuf}; 4 - use std::sync::atomic::{AtomicU64, Ordering}; 4 + use std::sync::Arc; 5 + use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 5 6 6 7 use parking_lot::RwLock; 7 8 ··· 25 26 (ext == SEGMENT_FILE_EXTENSION).then(|| stem.parse::<u32>().ok().map(SegmentId::new))? 26 27 } 27 28 28 - struct CachedSegmentHandle { 29 + pub struct CachedSegmentHandle<S: StorageIO> { 29 30 fd: FileId, 30 - sealed: bool, 31 + io: Arc<S>, 32 + sealed: AtomicBool, 31 33 writable: bool, 32 34 } 33 35 36 + impl<S: StorageIO> CachedSegmentHandle<S> { 37 + pub fn fd(&self) -> FileId { 38 + self.fd 39 + } 40 + 41 + pub fn is_sealed(&self) -> bool { 42 + self.sealed.load(Ordering::Acquire) 43 + } 44 + 45 + pub fn is_writable(&self) -> bool { 46 + self.writable 47 + } 48 + } 49 + 50 + impl<S: StorageIO> Drop for CachedSegmentHandle<S> { 51 + fn drop(&mut self) { 52 + let _ = self.io.close(self.fd); 53 + } 54 + } 55 + 34 56 pub struct SegmentManager<S: StorageIO> { 35 - io: S, 57 + io: Arc<S>, 36 58 segments_dir: PathBuf, 37 59 max_segment_size: u64, 38 - handles: RwLock<HashMap<SegmentId, CachedSegmentHandle>>, 60 + handles: RwLock<HashMap<SegmentId, Arc<CachedSegmentHandle<S>>>>, 39 61 retention_epoch: AtomicU64, 40 62 } 41 63 ··· 51 73 ); 52 74 io.mkdir(&segments_dir)?; 53 75 Ok(Self { 54 - io, 76 + io: Arc::new(io), 55 77 segments_dir, 56 78 max_segment_size, 57 79 handles: RwLock::new(HashMap::new()), ··· 60 82 } 61 83 62 84 pub fn io(&self) -> &S { 63 - &self.io 85 + self.io.as_ref() 64 86 } 65 87 66 88 pub fn segments_dir(&self) -> &Path { ··· 92 114 Ok(ids) 93 115 } 94 116 95 - pub fn open_for_read(&self, id: SegmentId) -> io::Result<FileId> { 117 + pub fn open_for_read(&self, id: SegmentId) -> io::Result<Arc<CachedSegmentHandle<S>>> { 96 118 if let Some(entry) = self.handles.read().get(&id) { 97 - return Ok(entry.fd); 119 + return Ok(Arc::clone(entry)); 98 120 } 99 121 let path = self.segment_path(id); 100 122 let fd = self.io.open(&path, OpenOptions::read_only_existing())?; 101 123 let mut cache = self.handles.write(); 102 - match cache.get(&id) { 124 + match cache.get(&id).cloned() { 103 125 Some(entry) => { 104 126 let _ = self.io.close(fd); 105 - Ok(entry.fd) 127 + Ok(entry) 106 128 } 107 129 None => { 108 - cache.insert( 109 - id, 110 - CachedSegmentHandle { 111 - fd, 112 - sealed: false, 113 - writable: false, 114 - }, 115 - ); 116 - Ok(fd) 130 + let handle = Arc::new(CachedSegmentHandle { 131 + fd, 132 + io: Arc::clone(&self.io), 133 + sealed: AtomicBool::new(false), 134 + writable: false, 135 + }); 136 + cache.insert(id, Arc::clone(&handle)); 137 + Ok(handle) 117 138 } 118 139 } 119 140 } 120 141 121 - pub fn open_for_append(&self, id: SegmentId) -> io::Result<FileId> { 142 + pub fn open_for_append(&self, id: SegmentId) -> io::Result<Arc<CachedSegmentHandle<S>>> { 122 143 { 123 144 let cache = self.handles.read(); 124 145 if let Some(entry) = cache.get(&id) { 125 - if entry.sealed { 146 + if entry.is_sealed() { 126 147 return Err(io::Error::new( 127 148 io::ErrorKind::InvalidInput, 128 149 format!("cannot append to sealed segment {id}"), 129 150 )); 130 151 } 131 152 if entry.writable { 132 - return Ok(entry.fd); 153 + return Ok(Arc::clone(entry)); 133 154 } 134 155 } 135 156 } 136 157 let path = self.segment_path(id); 137 158 let fd = self.io.open(&path, OpenOptions::read_write())?; 138 159 let mut cache = self.handles.write(); 139 - match cache.get(&id) { 140 - Some(entry) if entry.sealed => { 160 + match cache.get(&id).cloned() { 161 + Some(entry) if entry.is_sealed() => { 141 162 let _ = self.io.close(fd); 142 163 Err(io::Error::new( 143 164 io::ErrorKind::InvalidInput, ··· 146 167 } 147 168 Some(entry) if entry.writable => { 148 169 let _ = self.io.close(fd); 149 - Ok(entry.fd) 170 + Ok(entry) 150 171 } 151 - Some(entry) => { 152 - let old_fd = entry.fd; 153 - cache.insert( 154 - id, 155 - CachedSegmentHandle { 156 - fd, 157 - sealed: false, 158 - writable: true, 159 - }, 160 - ); 161 - let _ = self.io.close(old_fd); 162 - Ok(fd) 163 - } 164 - None => { 165 - cache.insert( 166 - id, 167 - CachedSegmentHandle { 168 - fd, 169 - sealed: false, 170 - writable: true, 171 - }, 172 - ); 173 - Ok(fd) 172 + _ => { 173 + let handle = Arc::new(CachedSegmentHandle { 174 + fd, 175 + io: Arc::clone(&self.io), 176 + sealed: AtomicBool::new(false), 177 + writable: true, 178 + }); 179 + cache.insert(id, Arc::clone(&handle)); 180 + Ok(handle) 174 181 } 175 182 } 176 183 } ··· 179 186 position.raw() >= self.max_segment_size 180 187 } 181 188 182 - pub fn prepare_rotation(&self, current_id: SegmentId) -> io::Result<(SegmentId, FileId)> { 189 + pub fn prepare_rotation( 190 + &self, 191 + current_id: SegmentId, 192 + ) -> io::Result<(SegmentId, Arc<CachedSegmentHandle<S>>)> { 183 193 let next = current_id.next(); 184 194 let path = self.segment_path(next); 185 195 let fd = self.io.open(&path, OpenOptions::read_write())?; 186 196 self.io.truncate(fd, 0)?; 187 197 self.io.sync_dir(&self.segments_dir)?; 188 - Ok((next, fd)) 198 + let handle = Arc::new(CachedSegmentHandle { 199 + fd, 200 + io: Arc::clone(&self.io), 201 + sealed: AtomicBool::new(false), 202 + writable: true, 203 + }); 204 + Ok((next, handle)) 189 205 } 190 206 191 - pub fn commit_rotation(&self, new_id: SegmentId, fd: FileId) { 192 - self.handles.write().insert( 193 - new_id, 194 - CachedSegmentHandle { 195 - fd, 196 - sealed: false, 197 - writable: true, 198 - }, 199 - ); 207 + pub fn commit_rotation(&self, new_id: SegmentId, handle: &Arc<CachedSegmentHandle<S>>) { 208 + self.handles.write().insert(new_id, Arc::clone(handle)); 200 209 } 201 210 202 211 pub fn seal_segment(&self, id: SegmentId, index: &SegmentIndex) -> io::Result<()> { 203 212 let path = self.index_path(id); 204 - index.save(&self.io, &path)?; 205 - let mut cache = self.handles.write(); 206 - let entry = cache.get_mut(&id).ok_or_else(|| { 213 + index.save(self.io.as_ref(), &path)?; 214 + let cache = self.handles.read(); 215 + let entry = cache.get(&id).ok_or_else(|| { 207 216 io::Error::new( 208 217 io::ErrorKind::InvalidInput, 209 218 format!("seal_segment: segment {id} not in handle cache"), 210 219 ) 211 220 })?; 212 - entry.sealed = true; 221 + entry.sealed.store(true, Ordering::Release); 213 222 Ok(()) 214 223 } 215 224 ··· 217 226 self.handles 218 227 .read() 219 228 .get(&id) 220 - .is_some_and(|entry| entry.sealed) 229 + .is_some_and(|entry| entry.is_sealed()) 221 230 } 222 231 223 - pub fn rollback_rotation(&self, new_id: SegmentId, fd: FileId) { 224 - let _ = self.io.close(fd); 232 + pub fn rollback_rotation(&self, new_id: SegmentId) { 225 233 self.handles.write().remove(&new_id); 226 234 let _ = self.io.delete(&self.segment_path(new_id)); 227 235 } 228 236 229 237 pub fn delete_segment(&self, id: SegmentId) -> io::Result<()> { 230 - { 231 - let mut cache = self.handles.write(); 232 - if let Some(entry) = cache.remove(&id) { 233 - let _ = self.io.close(entry.fd); 234 - } 235 - } 238 + self.handles.write().remove(&id); 236 239 [self.index_path(id), self.sidecar_path(id)] 237 240 .iter() 238 241 .try_for_each(|path| match self.io.delete(path) { ··· 255 258 } 256 259 257 260 pub fn shutdown(&self) { 258 - self.handles.write().drain().for_each(|(_, handle)| { 259 - let _ = self.io.close(handle.fd); 260 - }); 261 - } 262 - } 263 - 264 - impl<S: StorageIO> Drop for SegmentManager<S> { 265 - fn drop(&mut self) { 266 - self.shutdown(); 261 + self.handles.write().clear(); 267 262 } 268 263 } 269 264 ··· 329 324 #[test] 330 325 fn open_for_append_creates_file() { 331 326 let mgr = setup_manager(1024); 332 - let fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 327 + let fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 333 328 assert_eq!(mgr.io().file_size(fd).unwrap(), 0); 334 329 } 335 330 ··· 342 337 #[test] 343 338 fn handle_cache_returns_same_fd() { 344 339 let mgr = setup_manager(1024); 345 - let fd1 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 346 - let fd2 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 340 + let fd1 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 341 + let fd2 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 347 342 assert_eq!(fd1, fd2); 348 343 } 349 344 350 345 #[test] 351 346 fn open_for_read_uses_cache_from_append() { 352 347 let mgr = setup_manager(1024); 353 - let fd_write = mgr.open_for_append(SegmentId::new(1)).unwrap(); 354 - let fd_read = mgr.open_for_read(SegmentId::new(1)).unwrap(); 348 + let fd_write = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 349 + let fd_read = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 355 350 assert_eq!(fd_write, fd_read); 356 351 } 357 352 358 353 #[test] 359 354 fn list_segments_finds_segment_files() { 360 355 let mgr = setup_manager(1024); 361 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 362 - mgr.open_for_append(SegmentId::new(3)).unwrap(); 356 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 357 + mgr.open_for_append(SegmentId::new(3)).unwrap().fd(); 363 358 364 359 let segments = mgr.list_segments().unwrap(); 365 360 assert_eq!(segments, vec![SegmentId::new(1), SegmentId::new(3)]); ··· 368 363 #[test] 369 364 fn list_segments_ignores_non_segment_files() { 370 365 let mgr = setup_manager(1024); 371 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 366 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 372 367 mgr.io() 373 368 .open(Path::new("/segments/notes.txt"), OpenOptions::read_write()) 374 369 .unwrap(); ··· 380 375 #[test] 381 376 fn list_segments_ignores_index_files() { 382 377 let mgr = setup_manager(1024); 383 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 378 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 384 379 mgr.io() 385 380 .open( 386 381 Path::new("/segments/00000001.tqi"), ··· 395 390 #[test] 396 391 fn list_segments_sorted_ascending() { 397 392 let mgr = setup_manager(1024); 398 - mgr.open_for_append(SegmentId::new(5)).unwrap(); 399 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 400 - mgr.open_for_append(SegmentId::new(3)).unwrap(); 393 + mgr.open_for_append(SegmentId::new(5)).unwrap().fd(); 394 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 395 + mgr.open_for_append(SegmentId::new(3)).unwrap().fd(); 401 396 402 397 let segments = mgr.list_segments().unwrap(); 403 398 assert_eq!( ··· 418 413 #[test] 419 414 fn rotation_lifecycle_prepare_commit() { 420 415 let mgr = setup_manager(1024); 421 - let _fd0 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 422 - let (next_id, next_fd) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 416 + let _h0 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 417 + let (next_id, next_handle) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 423 418 assert_eq!(next_id, SegmentId::new(2)); 424 - assert_eq!(mgr.io().file_size(next_fd).unwrap(), 0); 425 - mgr.commit_rotation(next_id, next_fd); 426 - assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd); 419 + assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0); 420 + mgr.commit_rotation(next_id, &next_handle); 421 + assert_eq!( 422 + mgr.open_for_read(next_id).unwrap().fd(), 423 + next_handle.fd() 424 + ); 427 425 } 428 426 429 427 #[test] 430 428 fn rotation_rollback_cleans_up() { 431 429 let mgr = setup_manager(1024); 432 - let _fd0 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 433 - let (next_id, next_fd) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 434 - mgr.commit_rotation(next_id, next_fd); 430 + let _h0 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 431 + let (next_id, next_handle) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 432 + mgr.commit_rotation(next_id, &next_handle); 435 433 436 - assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd); 437 - mgr.rollback_rotation(next_id, next_fd); 434 + assert_eq!( 435 + mgr.open_for_read(next_id).unwrap().fd(), 436 + next_handle.fd() 437 + ); 438 + drop(next_handle); 439 + mgr.rollback_rotation(next_id); 438 440 439 441 let segments = mgr.list_segments().unwrap(); 440 442 assert_eq!(segments, vec![SegmentId::new(1)]); ··· 443 445 #[test] 444 446 fn seal_segment_persists_index_and_marks_sealed() { 445 447 let mgr = setup_manager(64 * 1024); 446 - let fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 448 + let fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 447 449 let mut writer = SegmentWriter::new( 448 450 mgr.io(), 449 451 fd, ··· 476 478 #[test] 477 479 fn delete_segment_removes_files_and_handle() { 478 480 let mgr = setup_manager(64 * 1024); 479 - let fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 481 + let fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 480 482 let mut writer = SegmentWriter::new( 481 483 mgr.io(), 482 484 fd, ··· 507 509 let mgr = setup_manager(1024); 508 510 assert_eq!(mgr.oldest_segment().unwrap(), None); 509 511 510 - mgr.open_for_append(SegmentId::new(3)).unwrap(); 511 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 512 - mgr.open_for_append(SegmentId::new(5)).unwrap(); 512 + mgr.open_for_append(SegmentId::new(3)).unwrap().fd(); 513 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 514 + mgr.open_for_append(SegmentId::new(5)).unwrap().fd(); 513 515 514 516 assert_eq!(mgr.oldest_segment().unwrap(), Some(SegmentId::new(1))); 515 517 } ··· 524 526 fn rotate_and_write_across_segments() { 525 527 let mgr = setup_manager(1024); 526 528 527 - let fd1 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 529 + let fd1 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 528 530 let mut writer1 = SegmentWriter::new( 529 531 mgr.io(), 530 532 fd1, ··· 538 540 .unwrap(); 539 541 writer1.sync(mgr.io()).unwrap(); 540 542 541 - let (id2, fd2) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 542 - mgr.commit_rotation(id2, fd2); 543 + let (id2, handle2) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 544 + let fd2 = handle2.fd(); 545 + mgr.commit_rotation(id2, &handle2); 543 546 544 547 let mut writer2 = 545 548 SegmentWriter::new(mgr.io(), fd2, id2, EventSequence::new(2), MAX_EVENT_PAYLOAD) ··· 549 552 .unwrap(); 550 553 writer2.sync(mgr.io()).unwrap(); 551 554 552 - let fd1_read = mgr.open_for_read(SegmentId::new(1)).unwrap(); 555 + let fd1_read = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 553 556 let events1 = crate::eventlog::SegmentReader::open(mgr.io(), fd1_read, MAX_EVENT_PAYLOAD) 554 557 .unwrap() 555 558 .valid_prefix() ··· 557 560 assert_eq!(events1.len(), 1); 558 561 assert_eq!(events1[0].payload, b"first segment"); 559 562 560 - let fd2_read = mgr.open_for_read(id2).unwrap(); 563 + let fd2_read = mgr.open_for_read(id2).unwrap().fd(); 561 564 let events2 = crate::eventlog::SegmentReader::open(mgr.io(), fd2_read, MAX_EVENT_PAYLOAD) 562 565 .unwrap() 563 566 .valid_prefix() ··· 569 572 #[test] 570 573 fn seal_then_append_errors() { 571 574 let mgr = setup_manager(64 * 1024); 572 - let fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 575 + let fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 573 576 SegmentWriter::new( 574 577 mgr.io(), 575 578 fd, ··· 596 599 #[test] 597 600 fn multiple_deletions_increment_epoch() { 598 601 let mgr = setup_manager(1024); 599 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 600 - mgr.open_for_append(SegmentId::new(2)).unwrap(); 601 - mgr.open_for_append(SegmentId::new(3)).unwrap(); 602 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 603 + mgr.open_for_append(SegmentId::new(2)).unwrap().fd(); 604 + mgr.open_for_append(SegmentId::new(3)).unwrap().fd(); 602 605 603 606 assert_eq!(mgr.retention_epoch(), 0); 604 607 mgr.delete_segment(SegmentId::new(1)).unwrap(); ··· 610 613 #[test] 611 614 fn open_for_read_does_not_infer_sealed_from_index_file() { 612 615 let mgr = setup_manager(64 * 1024); 613 - let fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 616 + let fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 614 617 let mut writer = SegmentWriter::new( 615 618 mgr.io(), 616 619 fd, ··· 630 633 631 634 mgr.handles.write().remove(&SegmentId::new(1)); 632 635 633 - let _read_fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 636 + let _read_fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 634 637 assert!(!mgr.is_sealed(SegmentId::new(1))); 635 638 } 636 639 637 640 #[test] 638 641 fn open_for_read_unsealed_allows_append() { 639 642 let mgr = setup_manager(1024); 640 - let _fd = mgr.open_for_append(SegmentId::new(1)).unwrap(); 643 + let _fd = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 641 644 642 645 mgr.handles.write().remove(&SegmentId::new(1)); 643 646 644 - let _read_fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 647 + let _read_fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 645 648 assert!(!mgr.is_sealed(SegmentId::new(1))); 646 649 } 647 650 648 651 #[test] 649 652 fn shutdown_clears_handles() { 650 653 let mgr = setup_manager(1024); 651 - mgr.open_for_append(SegmentId::new(1)).unwrap(); 652 - mgr.open_for_append(SegmentId::new(2)).unwrap(); 654 + mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 655 + mgr.open_for_append(SegmentId::new(2)).unwrap().fd(); 653 656 654 657 mgr.shutdown(); 655 658 assert!(mgr.handles.read().is_empty()); ··· 665 668 #[test] 666 669 fn prepare_rotation_truncates_stale_file() { 667 670 let mgr = setup_manager(1024); 668 - let _fd0 = mgr.open_for_append(SegmentId::new(1)).unwrap(); 671 + let _fd0 = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 669 672 670 673 let stale_path = mgr.segment_path(SegmentId::new(2)); 671 674 let stale_fd = mgr ··· 677 680 assert_eq!(mgr.io().file_size(stale_fd).unwrap(), 4096); 678 681 mgr.io().close(stale_fd).unwrap(); 679 682 680 - let (next_id, next_fd) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 683 + let (next_id, next_handle) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); 681 684 assert_eq!(next_id, SegmentId::new(2)); 682 - assert_eq!(mgr.io().file_size(next_fd).unwrap(), 0); 685 + assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0); 683 686 } 684 687 685 688 #[test] 686 689 fn open_for_append_upgrades_read_only_handle() { 687 690 let mgr = setup_manager(1024); 688 - let fd_append = mgr.open_for_append(SegmentId::new(1)).unwrap(); 691 + let fd_append = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 689 692 690 693 mgr.handles.write().remove(&SegmentId::new(1)); 691 694 692 - let fd_read = mgr.open_for_read(SegmentId::new(1)).unwrap(); 695 + let fd_read = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 693 696 assert_ne!(fd_read, fd_append); 694 697 assert!(!mgr.handles.read().get(&SegmentId::new(1)).unwrap().writable); 695 698 696 - let fd_upgraded = mgr.open_for_append(SegmentId::new(1)).unwrap(); 699 + let fd_upgraded = mgr.open_for_append(SegmentId::new(1)).unwrap().fd(); 697 700 assert_ne!(fd_upgraded, fd_read); 698 701 assert!(mgr.handles.read().get(&SegmentId::new(1)).unwrap().writable); 699 702 }
+2 -2
crates/tranquil-store/src/eventlog/mod.rs
··· 411 411 pub fn disk_usage(&self) -> io::Result<u64> { 412 412 let segments = self.manager.list_segments()?; 413 413 segments.iter().try_fold(0u64, |acc, &id| { 414 - let fd = self.manager.open_for_read(id)?; 415 - let size = self.manager.io().file_size(fd)?; 414 + let handle = self.manager.open_for_read(id)?; 415 + let size = self.manager.io().file_size(handle.fd())?; 416 416 Ok(acc.saturating_add(size)) 417 417 }) 418 418 }
+9 -9
crates/tranquil-store/src/eventlog/reader.rs
··· 145 145 } 146 146 147 147 fn rebuild_index(&self, segment_id: SegmentId) -> io::Result<SegmentIndex> { 148 - let fd = self.manager.open_for_read(segment_id)?; 148 + let handle = self.manager.open_for_read(segment_id)?; 149 149 let (idx, _) = rebuild_from_segment( 150 150 self.manager.io(), 151 - fd, 151 + handle.fd(), 152 152 DEFAULT_INDEX_INTERVAL, 153 153 self.max_payload, 154 154 )?; ··· 243 243 return Ok(Arc::clone(m)); 244 244 } 245 245 246 - let fd = self.manager.open_for_read(segment_id)?; 247 - let mapped = self.manager.io().mmap_file(fd)?; 246 + let handle = self.manager.open_for_read(segment_id)?; 247 + let mapped = self.manager.io().mmap_file(handle.fd())?; 248 248 let arc = Arc::new(mapped); 249 249 self.mmaps.write().insert(segment_id, Arc::clone(&arc)); 250 250 Ok(arc) ··· 269 269 predicate, 270 270 ) 271 271 } else { 272 - let fd = self.manager.open_for_read(segment_id)?; 273 - let file_size = self.manager.io().file_size(fd)?; 272 + let handle = self.manager.open_for_read(segment_id)?; 273 + let file_size = self.manager.io().file_size(handle.fd())?; 274 274 self.scan_direct( 275 - fd, 275 + handle.fd(), 276 276 file_size, 277 277 start_offset, 278 278 start_seq, ··· 513 513 } 514 514 515 515 fn rebuild_sidecar(&self, segment_id: SegmentId) -> io::Result<SidecarIndex> { 516 - let fd = self.manager.open_for_read(segment_id)?; 517 - let sidecar = build_sidecar_from_segment(self.manager.io(), fd, self.max_payload)?; 516 + let handle = self.manager.open_for_read(segment_id)?; 517 + let sidecar = build_sidecar_from_segment(self.manager.io(), handle.fd(), self.max_payload)?; 518 518 let _ = sidecar.save(self.manager.io(), &self.manager.sidecar_path(segment_id)); 519 519 Ok(sidecar) 520 520 }
+17 -15
crates/tranquil-store/src/eventlog/writer.rs
··· 66 66 index_interval: usize, 67 67 max_payload: u32, 68 68 ) -> io::Result<Self> { 69 - let fd = manager.open_for_append(segment_id)?; 70 - manager.io().truncate(fd, 0)?; 71 - let writer = SegmentWriter::new(manager.io(), fd, segment_id, next_seq, max_payload)?; 69 + let handle = manager.open_for_append(segment_id)?; 70 + manager.io().truncate(handle.fd(), 0)?; 71 + let writer = SegmentWriter::new(manager.io(), handle.fd(), segment_id, next_seq, max_payload)?; 72 72 writer.sync(manager.io())?; 73 73 manager.io().sync_dir(manager.segments_dir())?; 74 74 ··· 93 93 index_interval: usize, 94 94 max_payload: u32, 95 95 ) -> io::Result<Self> { 96 - let fd = manager.open_for_append(active_id)?; 96 + let handle = manager.open_for_append(active_id)?; 97 + let fd = handle.fd(); 97 98 98 99 let (index, last_seq_in_active) = match rebuild_from_segment( 99 100 manager.io(), ··· 308 309 Err(e) => warn!(segment = %old_id, error = %e, "non-fatal sidecar build failure"), 309 310 } 310 311 311 - let (new_id, new_fd) = self.manager.prepare_rotation(old_id)?; 312 + let (new_id, new_handle) = self.manager.prepare_rotation(old_id)?; 312 313 313 314 match SegmentWriter::new::<S>( 314 315 self.manager.io(), 315 - new_fd, 316 + new_handle.fd(), 316 317 new_id, 317 318 self.next_seq, 318 319 self.max_payload, ··· 322 323 self.active_index = SegmentIndex::new(); 323 324 self.event_count_in_segment = 0; 324 325 self.last_event_offset = None; 325 - self.manager.commit_rotation(new_id, new_fd); 326 + self.manager.commit_rotation(new_id, &new_handle); 326 327 Ok(Some(old_id)) 327 328 } 328 329 Err(e) => { 329 - self.manager.rollback_rotation(new_id, new_fd); 330 + drop(new_handle); 331 + self.manager.rollback_rotation(new_id); 330 332 Err(e) 331 333 } 332 334 } ··· 361 363 } 362 364 363 365 fn build_sidecar_for_segment(&self, segment_id: SegmentId) -> io::Result<()> { 364 - let fd = self.manager.open_for_read(segment_id)?; 365 - let sidecar = build_sidecar_from_segment(self.manager.io(), fd, self.max_payload)?; 366 + let handle = self.manager.open_for_read(segment_id)?; 367 + let sidecar = build_sidecar_from_segment(self.manager.io(), handle.fd(), self.max_payload)?; 366 368 let path = self.manager.sidecar_path(segment_id); 367 369 sidecar.save(self.manager.io(), &path) 368 370 } ··· 397 399 Ok(Some(idx)) => Ok(idx.last_seq()), 398 400 Err(e) if e.kind() != io::ErrorKind::InvalidData => Err(e), 399 401 _ => { 400 - let fd = manager.open_for_read(seg_id)?; 402 + let handle = manager.open_for_read(seg_id)?; 401 403 let (_, last_seq) = 402 - rebuild_from_segment(manager.io(), fd, DEFAULT_INDEX_INTERVAL, max_payload)?; 404 + rebuild_from_segment(manager.io(), handle.fd(), DEFAULT_INDEX_INTERVAL, max_payload)?; 403 405 Ok(last_seq) 404 406 } 405 407 } ··· 551 553 assert_eq!(writer.synced_seq(), EventSequence::new(5)); 552 554 assert_eq!(writer.active_segment_id(), SegmentId::new(1)); 553 555 554 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 556 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 555 557 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 556 558 .unwrap() 557 559 .valid_prefix() ··· 796 798 .unwrap(); 797 799 assert_eq!(writer.next_seq, EventSequence::new(3)); 798 800 799 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 801 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 800 802 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 801 803 .unwrap() 802 804 .valid_prefix() ··· 1003 1005 assert_eq!(seq, EventSequence::new(4)); 1004 1006 writer.sync().unwrap(); 1005 1007 1006 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 1008 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 1007 1009 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 1008 1010 .unwrap() 1009 1011 .valid_prefix()
+3 -3
crates/tranquil-store/src/gauntlet/runner.rs
··· 595 595 let mut segment_last_ts: Vec<(SegmentId, u64)> = Vec::new(); 596 596 segments.iter().for_each(|&id| { 597 597 let per_segment: Vec<ValidEvent> = match s.manager.open_for_read(id) { 598 - Ok(fd) => match SegmentReader::open(s.manager.io(), fd, MAX_EVENT_PAYLOAD) { 598 + Ok(handle) => match SegmentReader::open(s.manager.io(), handle.fd(), MAX_EVENT_PAYLOAD) { 599 599 Ok(reader) => reader.valid_prefix().unwrap_or_default(), 600 600 Err(_) => Vec::new(), 601 601 }, ··· 1108 1108 manager: &SegmentManager<S>, 1109 1109 id: SegmentId, 1110 1110 ) -> std::io::Result<Option<u64>> { 1111 - let fd = manager.open_for_read(id)?; 1112 - let reader = SegmentReader::open(manager.io(), fd, MAX_EVENT_PAYLOAD)?; 1111 + let handle = manager.open_for_read(id)?; 1112 + let reader = SegmentReader::open(manager.io(), handle.fd(), MAX_EVENT_PAYLOAD)?; 1113 1113 let events = reader.valid_prefix()?; 1114 1114 Ok(events.last().map(|e: &ValidEvent| e.timestamp.raw())) 1115 1115 }
+8 -8
crates/tranquil-store/tests/eventlog_crash.rs
··· 52 52 "seed {seed}: expected all synced events to survive" 53 53 ); 54 54 55 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 55 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 56 56 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 57 57 .unwrap() 58 58 .valid_prefix() ··· 181 181 mgr.io().sync_dir(Path::new("/segments")).unwrap(); 182 182 } 183 183 184 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 184 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 185 185 let file_size = mgr.io().file_size(fd).unwrap(); 186 186 let partial_bytes = ((seed % 20) + 1) as usize; 187 187 let junk: Vec<u8> = (0..partial_bytes) ··· 252 252 ); 253 253 254 254 sealed_segments[..sealed_count].iter().for_each(|&seg_id| { 255 - let fd = mgr.open_for_read(seg_id).unwrap(); 255 + let fd = mgr.open_for_read(seg_id).unwrap().fd(); 256 256 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 257 257 .unwrap() 258 258 .valid_prefix() ··· 342 342 let index_path = mgr.index_path(SegmentId::new(1)); 343 343 let _ = mgr.io().delete(&index_path); 344 344 345 - let fd = mgr.open_for_read(SegmentId::new(1)).unwrap(); 345 + let fd = mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 346 346 347 347 let start = std::time::Instant::now(); 348 348 let (index, last_seq) = rebuild_from_segment(mgr.io(), fd, 256, MAX_EVENT_PAYLOAD).unwrap(); ··· 417 417 } 418 418 pristine_mgr.shutdown(); 419 419 420 - let pristine_fd = pristine_mgr.open_for_read(SegmentId::new(1)).unwrap(); 420 + let pristine_fd = pristine_mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 421 421 let pristine_events = 422 422 SegmentReader::open(pristine_mgr.io(), pristine_fd, MAX_EVENT_PAYLOAD) 423 423 .unwrap() ··· 465 465 return Ok(None); 466 466 } 467 467 468 - let fd = faulty_clone.open_for_read(SegmentId::new(1))?; 468 + let fd = faulty_clone.open_for_read(SegmentId::new(1))?.fd(); 469 469 let events = SegmentReader::open(faulty_clone.io(), fd, MAX_EVENT_PAYLOAD)? 470 470 .valid_prefix()?; 471 471 Ok(Some(events)) ··· 592 592 } 593 593 pristine_mgr.shutdown(); 594 594 595 - let pristine_fd = pristine_mgr.open_for_read(SegmentId::new(1)).unwrap(); 595 + let pristine_fd = pristine_mgr.open_for_read(SegmentId::new(1)).unwrap().fd(); 596 596 let pristine_events = SegmentReader::open(pristine_mgr.io(), pristine_fd, MAX_EVENT_PAYLOAD) 597 597 .unwrap() 598 598 .valid_prefix() ··· 637 637 return Ok(None); 638 638 } 639 639 640 - let fd = faulty_clone.open_for_read(SegmentId::new(1))?; 640 + let fd = faulty_clone.open_for_read(SegmentId::new(1))?.fd(); 641 641 let events = SegmentReader::open(faulty_clone.io(), fd, MAX_EVENT_PAYLOAD)? 642 642 .valid_prefix()?; 643 643 Ok(Some(events))
+53
crates/tranquil-store/tests/eventlog_manager_race.rs
··· 1 + use std::path::PathBuf; 2 + use std::sync::Arc; 3 + 4 + use tranquil_store::SimulatedIO; 5 + use tranquil_store::StorageIO; 6 + use tranquil_store::eventlog::{SegmentId, SegmentManager}; 7 + 8 + #[test] 9 + fn concurrent_reader_survives_evict_on_segment_delete() { 10 + let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::pristine(0x1eed7a11)); 11 + let segments_dir = PathBuf::from("/segments"); 12 + 13 + let manager = Arc::new( 14 + SegmentManager::new(Arc::clone(&sim), segments_dir.clone(), 1 << 20).unwrap(), 15 + ); 16 + 17 + let seg_id = SegmentId::new(1); 18 + 19 + let write_handle = manager.open_for_append(seg_id).unwrap(); 20 + sim.write_at(write_handle.fd(), 0, b"arbitrary seed bytes for the segment") 21 + .unwrap(); 22 + sim.sync(write_handle.fd()).unwrap(); 23 + sim.sync_dir(&segments_dir).unwrap(); 24 + drop(write_handle); 25 + 26 + let ready_to_evict = Arc::new(std::sync::Barrier::new(2)); 27 + let evict_done = Arc::new(std::sync::Barrier::new(2)); 28 + 29 + let reader_manager = Arc::clone(&manager); 30 + let reader_io = Arc::clone(&sim); 31 + let reader_ready = Arc::clone(&ready_to_evict); 32 + let reader_done = Arc::clone(&evict_done); 33 + 34 + let reader = std::thread::spawn(move || { 35 + let read_handle = reader_manager.open_for_read(seg_id).unwrap(); 36 + reader_ready.wait(); 37 + reader_done.wait(); 38 + reader_io.file_size(read_handle.fd()) 39 + }); 40 + 41 + ready_to_evict.wait(); 42 + manager.delete_segment(seg_id).unwrap(); 43 + evict_done.wait(); 44 + 45 + let read_result = reader.join().unwrap(); 46 + assert!( 47 + read_result.is_ok(), 48 + "read against a FileId obtained before delete_segment must still succeed; \ 49 + SegmentManager's delete_segment / rollback_rotation close the fd while a reader holds it. \ 50 + error: {:?}", 51 + read_result.err() 52 + ); 53 + }
+2 -2
crates/tranquil-store/tests/eventlog_properties.rs
··· 611 611 SegmentManager::new(Arc::clone(&sim), PathBuf::from("/segments"), 64 * 1024).unwrap(), 612 612 ); 613 613 614 - let block_fd = block_mgr.open_for_append(DataFileId::new(0)).unwrap(); 614 + let block_handle = block_mgr.open_for_append(DataFileId::new(0)).unwrap(); 615 615 let mut block_writer = 616 - DataFileWriter::new(block_mgr.io(), block_fd, DataFileId::new(0)).unwrap(); 616 + DataFileWriter::new(block_mgr.io(), block_handle.fd(), DataFileId::new(0)).unwrap(); 617 617 let cid = test_cid(1); 618 618 let _ = block_writer.append_block(&cid, &[0xAA; 128]).unwrap(); 619 619 block_writer.sync().unwrap();
+234
crates/tranquil-store/tests/rotation_robustness.rs
··· 1 + mod common; 2 + 3 + use std::collections::HashMap; 4 + use std::io; 5 + use std::path::{Path, PathBuf}; 6 + use std::sync::{Arc, Mutex}; 7 + use std::sync::atomic::{AtomicBool, Ordering}; 8 + 9 + use tranquil_store::blockstore::{ 10 + BlockStoreConfig, DataFileId, DataFileManager, DataFileWriter, GroupCommitConfig, 11 + TranquilBlockStore, 12 + }; 13 + use tranquil_store::{FileId, MappedFile, OpenOptions, RealIO, SimulatedIO, StorageIO}; 14 + 15 + use common::{test_cid, with_runtime}; 16 + 17 + struct FailSpec { 18 + target_path: Mutex<Option<PathBuf>>, 19 + armed: AtomicBool, 20 + tripped: AtomicBool, 21 + } 22 + 23 + impl FailSpec { 24 + fn new() -> Self { 25 + Self { 26 + target_path: Mutex::new(None), 27 + armed: AtomicBool::new(false), 28 + tripped: AtomicBool::new(false), 29 + } 30 + } 31 + 32 + fn arm_fail_first_sync_on(&self, path: &Path) { 33 + *self.target_path.lock().unwrap() = Some(path.to_path_buf()); 34 + self.tripped.store(false, Ordering::SeqCst); 35 + self.armed.store(true, Ordering::SeqCst); 36 + } 37 + 38 + fn fired(&self) -> bool { 39 + self.tripped.load(Ordering::SeqCst) 40 + } 41 + } 42 + 43 + struct FailingIO { 44 + inner: RealIO, 45 + spec: Arc<FailSpec>, 46 + fd_to_path: Mutex<HashMap<FileId, PathBuf>>, 47 + } 48 + 49 + impl FailingIO { 50 + fn new(spec: Arc<FailSpec>) -> Self { 51 + Self { 52 + inner: RealIO::new(), 53 + spec, 54 + fd_to_path: Mutex::new(HashMap::new()), 55 + } 56 + } 57 + } 58 + 59 + impl StorageIO for FailingIO { 60 + fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> { 61 + let fd = self.inner.open(path, opts)?; 62 + self.fd_to_path.lock().unwrap().insert(fd, path.to_path_buf()); 63 + Ok(fd) 64 + } 65 + 66 + fn close(&self, fd: FileId) -> io::Result<()> { 67 + self.fd_to_path.lock().unwrap().remove(&fd); 68 + self.inner.close(fd) 69 + } 70 + 71 + fn read_at(&self, fd: FileId, offset: u64, buf: &mut [u8]) -> io::Result<usize> { 72 + self.inner.read_at(fd, offset, buf) 73 + } 74 + 75 + fn write_at(&self, fd: FileId, offset: u64, buf: &[u8]) -> io::Result<usize> { 76 + self.inner.write_at(fd, offset, buf) 77 + } 78 + 79 + fn sync(&self, fd: FileId) -> io::Result<()> { 80 + let should_fail = self.spec.armed.load(Ordering::SeqCst) 81 + && !self.spec.tripped.load(Ordering::SeqCst) 82 + && match ( 83 + self.fd_to_path.lock().unwrap().get(&fd).cloned(), 84 + self.spec.target_path.lock().unwrap().clone(), 85 + ) { 86 + (Some(fd_path), Some(target)) => fd_path == target, 87 + _ => false, 88 + }; 89 + match should_fail { 90 + true => { 91 + self.spec.tripped.store(true, Ordering::SeqCst); 92 + Err(io::Error::other("injected sync failure on target path")) 93 + } 94 + false => self.inner.sync(fd), 95 + } 96 + } 97 + 98 + fn file_size(&self, fd: FileId) -> io::Result<u64> { 99 + self.inner.file_size(fd) 100 + } 101 + 102 + fn truncate(&self, fd: FileId, size: u64) -> io::Result<()> { 103 + self.inner.truncate(fd, size) 104 + } 105 + 106 + fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { 107 + self.inner.rename(from, to) 108 + } 109 + 110 + fn delete(&self, path: &Path) -> io::Result<()> { 111 + self.inner.delete(path) 112 + } 113 + 114 + fn mkdir(&self, path: &Path) -> io::Result<()> { 115 + self.inner.mkdir(path) 116 + } 117 + 118 + fn sync_dir(&self, path: &Path) -> io::Result<()> { 119 + self.inner.sync_dir(path) 120 + } 121 + 122 + fn list_dir(&self, path: &Path) -> io::Result<Vec<PathBuf>> { 123 + self.inner.list_dir(path) 124 + } 125 + 126 + fn mmap_file(&self, fd: FileId) -> io::Result<MappedFile> { 127 + self.inner.mmap_file(fd) 128 + } 129 + } 130 + 131 + #[test] 132 + fn post_rotation_sync_failure_deletes_new_rotation_files() { 133 + with_runtime(|| { 134 + let dir = tempfile::TempDir::new().unwrap(); 135 + let data_dir = dir.path().join("data"); 136 + let index_dir = dir.path().join("index"); 137 + 138 + let spec = Arc::new(FailSpec::new()); 139 + let spec_for_factory = Arc::clone(&spec); 140 + 141 + let config = BlockStoreConfig { 142 + data_dir: data_dir.clone(), 143 + index_dir, 144 + max_file_size: 256, 145 + group_commit: GroupCommitConfig::default(), 146 + shard_count: 1, 147 + }; 148 + 149 + let store = TranquilBlockStore::<FailingIO>::open_with_io(config, move || { 150 + FailingIO::new(Arc::clone(&spec_for_factory)) 151 + }) 152 + .unwrap(); 153 + 154 + store 155 + .put_blocks_blocking(vec![(test_cid(1), vec![0xAA; 300])]) 156 + .expect("priming put succeeds"); 157 + 158 + let rotated_data_path = data_dir.join("000002.tqb"); 159 + let rotated_hint_path = data_dir.join("000002.tqh"); 160 + 161 + spec.arm_fail_first_sync_on(&rotated_data_path); 162 + 163 + let result = store.put_blocks_blocking(vec![(test_cid(2), vec![0xBB; 300])]); 164 + assert!( 165 + result.is_err(), 166 + "put_blocks_blocking should surface the injected post-rotation sync failure" 167 + ); 168 + assert!( 169 + spec.fired(), 170 + "injector never observed a sync on the rotated data file; timing changed" 171 + ); 172 + 173 + assert!( 174 + !rotated_data_path.exists(), 175 + "rotation rollback must delete the new data file after a post-write sync failure; \ 176 + leaked file at {rotated_data_path:?}" 177 + ); 178 + assert!( 179 + !rotated_hint_path.exists(), 180 + "rotation rollback must delete the new hint file after a post-write sync failure; \ 181 + leaked file at {rotated_hint_path:?}" 182 + ); 183 + }); 184 + } 185 + 186 + #[test] 187 + fn concurrent_reader_survives_evict_handle() { 188 + let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::pristine(0x13579bdf)); 189 + let data_dir = Path::new("/data"); 190 + sim.mkdir(data_dir).unwrap(); 191 + sim.sync_dir(data_dir).unwrap(); 192 + 193 + let manager = Arc::new(DataFileManager::new( 194 + Arc::clone(&sim), 195 + data_dir.to_path_buf(), 196 + 1 << 20, 197 + )); 198 + 199 + let file_id = DataFileId::new(0); 200 + let write_handle = manager.open_for_append(file_id).unwrap(); 201 + { 202 + let mut writer = DataFileWriter::new(&*sim, write_handle.fd(), file_id).unwrap(); 203 + let _ = writer.append_block(&test_cid(1), &vec![0x11; 128]).unwrap(); 204 + writer.sync().unwrap(); 205 + } 206 + drop(write_handle); 207 + 208 + let ready_to_evict = Arc::new(std::sync::Barrier::new(2)); 209 + let evict_done = Arc::new(std::sync::Barrier::new(2)); 210 + 211 + let reader_manager = Arc::clone(&manager); 212 + let reader_io = Arc::clone(&sim); 213 + let reader_ready = Arc::clone(&ready_to_evict); 214 + let reader_done = Arc::clone(&evict_done); 215 + 216 + let reader = std::thread::spawn(move || { 217 + let read_handle = reader_manager.open_for_read(file_id).unwrap(); 218 + reader_ready.wait(); 219 + reader_done.wait(); 220 + reader_io.file_size(read_handle.fd()) 221 + }); 222 + 223 + ready_to_evict.wait(); 224 + manager.evict_handle(file_id); 225 + evict_done.wait(); 226 + 227 + let read_result = reader.join().unwrap(); 228 + assert!( 229 + read_result.is_ok(), 230 + "read against a FileId obtained before evict_handle must still succeed; \ 231 + evict_handle closed the underlying fd while the reader held it. error: {:?}", 232 + read_result.err() 233 + ); 234 + }
+6 -3
crates/tranquil-store/tests/sim_blockstore.rs
··· 35 35 Arc::clone(&self.sim), 36 36 self.data_dir.to_path_buf(), 37 37 ); 38 - let fd = manager.open_for_append(file_id).unwrap(); 38 + let handle = manager.open_for_append(file_id).unwrap(); 39 + let fd = handle.fd(); 39 40 let file_size = self.sim.file_size(fd).unwrap(); 40 41 match file_size { 41 42 0 => { ··· 504 505 let manager = 505 506 DataFileManager::with_default_max_size(Arc::clone(&sim), data_dir.to_path_buf()); 506 507 507 - let Ok(fd) = manager.open_for_append(file_id) else { 508 + let Ok(handle) = manager.open_for_append(file_id) else { 508 509 return; 509 510 }; 511 + let fd = handle.fd(); 510 512 511 513 let writer_result = DataFileWriter::new(&*sim, fd, file_id); 512 514 let Ok(writer) = writer_result else { return }; ··· 515 517 return; 516 518 }; 517 519 let start_pos = writer.position(); 518 - let _ = sim.close(fd); 520 + drop(writer); 521 + drop(handle); 519 522 520 523 let mut rng = Rng::new(seed); 521 524 let block_count = (rng.range_u32(15) + 5) as u16;
+10 -9
crates/tranquil-store/tests/sim_eventlog.rs
··· 39 39 .flat_map(|&seg_id| { 40 40 let fd = mgr 41 41 .open_for_read(seg_id) 42 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read({seg_id}) failed: {e}")); 42 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read({seg_id}) failed: {e}")).fd(); 43 43 SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 44 44 .unwrap_or_else(|e| { 45 45 panic!( ··· 256 256 257 257 let seg2_fd = mgr 258 258 .open_for_read(SegmentId::new(2)) 259 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(2) failed: {e}")); 259 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(2) failed: {e}")).fd(); 260 260 let seg2_events = SegmentReader::open(mgr.io(), seg2_fd, MAX_EVENT_PAYLOAD) 261 261 .unwrap_or_else(|e| { 262 262 panic!("seed {seed}: SegmentReader::open(2, MAX_EVENT_PAYLOAD) failed: {e}") ··· 271 271 272 272 let seg3_fd = mgr 273 273 .open_for_read(SegmentId::new(3)) 274 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(3) failed: {e}")); 274 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(3) failed: {e}")).fd(); 275 275 let seg3_events = SegmentReader::open(mgr.io(), seg3_fd, MAX_EVENT_PAYLOAD) 276 276 .unwrap_or_else(|e| { 277 277 panic!("seed {seed}: SegmentReader::open(3, MAX_EVENT_PAYLOAD) failed: {e}") ··· 388 388 389 389 let fd = mgr 390 390 .open_for_read(SegmentId::new(1)) 391 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")); 391 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd(); 392 392 let recovered = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 393 393 .unwrap_or_else(|e| { 394 394 panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}") ··· 458 458 459 459 let fd = mgr 460 460 .open_for_read(SegmentId::new(1)) 461 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")); 461 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd(); 462 462 let recovered = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 463 463 .unwrap_or_else(|e| { 464 464 panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}") ··· 579 579 580 580 let fd = mgr 581 581 .open_for_read(SegmentId::new(1)) 582 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")); 582 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd(); 583 583 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 584 584 .unwrap_or_else(|e| { 585 585 panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}") ··· 652 652 653 653 let fd = mgr 654 654 .open_for_read(SegmentId::new(1)) 655 - .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")); 655 + .unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd(); 656 656 let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) 657 657 .unwrap_or_else(|e| { 658 658 panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}") ··· 938 938 939 939 let pristine_fd = pristine_mgr 940 940 .open_for_read(SegmentId::new(1)) 941 - .unwrap_or_else(|e| panic!("seed {seed}: pristine open_for_read(1) failed: {e}")); 941 + .unwrap_or_else(|e| panic!("seed {seed}: pristine open_for_read(1) failed: {e}")).fd(); 942 942 let pristine_events = SegmentReader::open( 943 943 pristine_mgr.io(), 944 944 pristine_fd, ··· 985 985 return; 986 986 } 987 987 988 - let Ok(fd) = mgr.open_for_read(SegmentId::new(1)) else { 988 + let Ok(handle) = mgr.open_for_read(SegmentId::new(1)) else { 989 989 return; 990 990 }; 991 + let fd = handle.fd(); 991 992 let Ok(reader) = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD) else { 992 993 return; 993 994 };
+6 -4
crates/tranquil-store/tests/verify_rollback_orphan.rs
··· 55 55 { 56 56 let io: Arc<RealIO> = Arc::new(RealIO::new()); 57 57 let manager = DataFileManager::new(Arc::clone(&io), data_dir.clone(), 4096); 58 - let (next_id, next_fd) = manager.prepare_rotation(DataFileId::new(0)).unwrap(); 59 - manager.commit_rotation(next_id, next_fd); 58 + let (next_id, next_handle) = manager.prepare_rotation(DataFileId::new(0)).unwrap(); 59 + manager.commit_rotation(next_id, &next_handle); 60 60 61 - let mut writer = DataFileWriter::new(&*io, next_fd, next_id).unwrap(); 61 + let mut writer = DataFileWriter::new(&*io, next_handle.fd(), next_id).unwrap(); 62 62 let _ = writer.append_block(&orphan_cid, &vec![0xAB; 256]).unwrap(); 63 63 writer.sync().unwrap(); 64 64 io.sync_dir(&data_dir).unwrap(); 65 65 66 66 let _ = io.delete(&hint_file_path(&data_dir, next_id)); 67 - manager.rollback_rotation(next_id, next_fd); 67 + drop(writer); 68 + drop(next_handle); 69 + manager.rollback_rotation(next_id); 68 70 } 69 71 70 72 let store = TranquilBlockStore::open(config).unwrap();