Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): bound writer fd usage across rotations

Lewis: May this revision serve well! <lu5a@proton.me>

+142 -12
+10 -5
crates/tranquil-store/src/blockstore/compaction.rs
··· 153 153 let mut live_count: u64 = 0; 154 154 let mut dead_count: u64 = 0; 155 155 156 - reader.try_for_each(|r| { 156 + let scan_result = reader.try_for_each(|r| { 157 157 let record = r?; 158 158 match record { 159 159 ReadBlockRecord::Valid { ··· 192 192 ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. } => {} 193 193 } 194 194 Ok::<_, CompactionError>(()) 195 - })?; 195 + }); 196 + 197 + let finalize_result = scan_result 198 + .and_then(|()| writer.sync().map_err(CompactionError::from)) 199 + .and_then(|()| hint_writer.sync().map_err(CompactionError::from)) 200 + .and_then(|()| manager.io().sync_dir(manager.data_dir()).map_err(CompactionError::from)); 201 + 202 + let _ = manager.io().close(hint_fd); 196 203 197 - writer.sync()?; 198 - hint_writer.sync()?; 199 - manager.io().sync_dir(manager.data_dir())?; 204 + finalize_result?; 200 205 201 206 let new_size = writer.position().raw(); 202 207
+25 -7
crates/tranquil-store/src/blockstore/group_commit.rs
··· 1054 1054 struct RotationState { 1055 1055 file_id: DataFileId, 1056 1056 fd: FileId, 1057 + hint_fd: FileId, 1057 1058 } 1058 1059 1059 1060 fn process_batch<S: StorageIO>( ··· 1071 1072 let mut all_decrements: Vec<[u8; CID_SIZE]> = Vec::new(); 1072 1073 1073 1074 let mut current_hint_fd = state.hint_fd; 1074 - let mut rotation: Option<RotationState> = None; 1075 + let mut rotations: Vec<RotationState> = Vec::new(); 1075 1076 1076 1077 let mut data_writer = 1077 1078 DataFileWriter::resume(manager.io(), state.fd, state.file_id, state.position); ··· 1124 1125 1125 1126 current_hint_fd = new_hint_fd; 1126 1127 hint_writer = HintFileWriter::new(manager.io(), new_hint_fd); 1127 - rotation = Some(RotationState { 1128 + rotations.push(RotationState { 1128 1129 file_id: next_id, 1129 1130 fd: next_fd, 1131 + hint_fd: new_hint_fd, 1130 1132 }); 1131 1133 } 1132 1134 ··· 1152 1154 }); 1153 1155 1154 1156 if let Err(e) = write_result { 1155 - if let Some(rot) = rotation { 1157 + rotations.into_iter().for_each(|rot| { 1156 1158 manager.rollback_rotation(rot.file_id, rot.fd); 1157 - } 1159 + let _ = manager.io().close(rot.hint_fd); 1160 + let _ = manager 1161 + .io() 1162 + .delete(&hint_file_path(manager.data_dir(), rot.file_id)); 1163 + }); 1158 1164 return Err(e); 1159 1165 } 1160 1166 ··· 1172 1178 hint_writer.sync()?; 1173 1179 let sync_nanos = t.elapsed().as_nanos() as u64; 1174 1180 1175 - if let Some(ref rot) = rotation { 1176 - manager.commit_rotation(rot.file_id, rot.fd); 1177 - ctx.active_files.register(ctx.shard_id, rot.file_id); 1181 + if !rotations.is_empty() { 1182 + let old_file_id = state.file_id; 1183 + let old_hint_fd = state.hint_fd; 1184 + let last_idx = rotations.len() - 1; 1185 + rotations.iter().enumerate().for_each(|(i, rot)| { 1186 + if i == last_idx { 1187 + manager.commit_rotation(rot.file_id, rot.fd); 1188 + ctx.active_files.register(ctx.shard_id, rot.file_id); 1189 + } else { 1190 + let _ = manager.io().close(rot.hint_fd); 1191 + manager.evict_handle(rot.file_id); 1192 + } 1193 + }); 1194 + manager.evict_handle(old_file_id); 1195 + let _ = manager.io().close(old_hint_fd); 1178 1196 } 1179 1197 1180 1198 state.file_id = data_writer.file_id();
+107
crates/tranquil-store/tests/fd_lifecycle.rs
··· 1 + use std::io::BufRead; 2 + use std::path::Path; 3 + 4 + use jacquard_repo::storage::BlockStore; 5 + use tranquil_store::blockstore::{BlockStoreConfig, GroupCommitConfig, TranquilBlockStore}; 6 + 7 + const POST_DROP_FD_TOLERANCE: i64 = 2; 8 + 9 + fn fd_count() -> usize { 10 + std::fs::read_dir("/proc/self/fd") 11 + .map(|it| it.count()) 12 + .unwrap_or(0) 13 + } 14 + 15 + fn log_rlimit(label: &str) { 16 + if let Ok(f) = std::fs::File::open("/proc/self/limits") { 17 + let reader = std::io::BufReader::new(f); 18 + reader 19 + .lines() 20 + .map_while(Result::ok) 21 + .find(|l| l.contains("open files")) 22 + .into_iter() 23 + .for_each(|l| eprintln!("[{label}] {l}")); 24 + } 25 + } 26 + 27 + fn config_for(dir: &Path, max_file_size: u64) -> BlockStoreConfig { 28 + BlockStoreConfig { 29 + data_dir: dir.join("data"), 30 + index_dir: dir.join("index"), 31 + max_file_size, 32 + group_commit: GroupCommitConfig { 33 + checkpoint_interval_ms: 100, 34 + checkpoint_write_threshold: 10, 35 + ..GroupCommitConfig::default() 36 + }, 37 + shard_count: 1, 38 + } 39 + } 40 + 41 + fn tiny_block(seed: u64) -> Vec<u8> { 42 + let bytes = seed.to_le_bytes(); 43 + (0..64).map(|i| bytes[i % 8] ^ (i as u8).wrapping_mul(31)).collect() 44 + } 45 + 46 + #[tokio::test] 47 + async fn fds_stable_within_store_lifetime() { 48 + log_rlimit("start"); 49 + let dir = tempfile::TempDir::new().unwrap(); 50 + let cfg = config_for(dir.path(), 4096); 51 + let base = fd_count() as i64; 52 + eprintln!("baseline fds: {base}"); 53 + 54 + let store = TranquilBlockStore::open(cfg.clone()).expect("open"); 55 + let after_open = fd_count() as i64; 56 + eprintln!("after open: {after_open} fds, delta {}", after_open - base); 57 + 58 + for i in 0..20_000u64 { 59 + let data = tiny_block(i); 60 + store.put(&data).await.expect("put"); 61 + if i.is_multiple_of(2_000) { 62 + let fds = fd_count() as i64; 63 + eprintln!("after {i} puts: {fds} fds, delta {}", fds - base); 64 + } 65 + } 66 + 67 + let final_fds = fd_count() as i64; 68 + eprintln!("final: {final_fds} fds, delta {}", final_fds - base); 69 + 70 + drop(store); 71 + let after_drop = fd_count() as i64; 72 + let delta = after_drop - base; 73 + eprintln!("after drop: {after_drop} fds, delta {delta}"); 74 + 75 + assert!( 76 + delta <= POST_DROP_FD_TOLERANCE, 77 + "fd leak after store drop: baseline {base}, after_drop {after_drop}, delta {delta}" 78 + ); 79 + } 80 + 81 + #[tokio::test] 82 + async fn fds_stable_across_reopens() { 83 + log_rlimit("start"); 84 + let dir = tempfile::TempDir::new().unwrap(); 85 + let cfg = config_for(dir.path(), 4096); 86 + let base = fd_count() as i64; 87 + eprintln!("baseline: {base}"); 88 + 89 + for cycle in 0..20usize { 90 + let store = TranquilBlockStore::open(cfg.clone()).expect("open"); 91 + for i in 0..2_000u64 { 92 + let data = tiny_block((cycle as u64) * 10_000 + i); 93 + store.put(&data).await.expect("put"); 94 + } 95 + let before_drop = fd_count() as i64; 96 + drop(store); 97 + let after_drop = fd_count() as i64; 98 + let delta = after_drop - base; 99 + eprintln!( 100 + "cycle {cycle}: before_drop {before_drop}, after_drop {after_drop}, delta {delta}" 101 + ); 102 + assert!( 103 + delta <= POST_DROP_FD_TOLERANCE, 104 + "fd leak across reopens at cycle {cycle}: baseline {base}, after_drop {after_drop}, delta {delta}" 105 + ); 106 + } 107 + }