Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-store): commit-marker batch replay, batch-boundary rotation

Lewis: May this revision serve well! <lu5a@proton.me>

+350 -142
+15
crates/tranquil-store/src/blockstore/compaction.rs
··· 201 201 Ok::<_, CompactionError>(()) 202 202 }); 203 203 204 + let record_count = u32::try_from( 205 + (live_count as u128).saturating_add(dead_count as u128), 206 + ) 207 + .unwrap_or(u32::MAX); 208 + let writer_position = writer.position(); 204 209 let finalize_result = scan_result 205 210 .and_then(|()| writer.sync().map_err(CompactionError::from)) 211 + .and_then(|()| { 212 + hint_writer 213 + .append_commit_marker( 214 + current_epoch.raw(), 215 + record_count, 216 + new_file_id, 217 + writer_position, 218 + ) 219 + .map_err(CompactionError::from) 220 + }) 206 221 .and_then(|()| hint_writer.sync().map_err(CompactionError::from)) 207 222 .and_then(|()| manager.io().sync_dir(manager.data_dir()).map_err(CompactionError::from)); 208 223
+45 -31
crates/tranquil-store/src/blockstore/group_commit.rs
··· 1217 1217 let mut hint_writer = 1218 1218 HintFileWriter::resume(manager.io(), current_hint_fd, state.hint_position); 1219 1219 1220 + if manager.should_rotate(data_writer.position()) { 1221 + data_writer.sync().map_err(CommitError::from)?; 1222 + hint_writer.sync().map_err(CommitError::from)?; 1223 + 1224 + let next_id = ctx.file_ids.allocate(); 1225 + let next_fd = manager.open_for_append(next_id)?; 1226 + 1227 + tracing::info!( 1228 + from = %data_writer.file_id(), 1229 + to = %next_id, 1230 + "data file rotation (batch boundary)" 1231 + ); 1232 + 1233 + data_writer = DataFileWriter::new(manager.io(), next_fd, next_id)?; 1234 + 1235 + let new_hint_path = hint_file_path(manager.data_dir(), next_id); 1236 + let new_hint_fd = manager 1237 + .io() 1238 + .open(&new_hint_path, OpenOptions::read_write())?; 1239 + 1240 + manager.io().sync_dir(manager.data_dir())?; 1241 + 1242 + current_hint_fd = new_hint_fd; 1243 + hint_writer = HintFileWriter::new(manager.io(), new_hint_fd); 1244 + rotations.push(RotationState { 1245 + file_id: next_id, 1246 + fd: next_fd, 1247 + hint_fd: new_hint_fd, 1248 + }); 1249 + } 1250 + 1220 1251 let mut block_bytes: u64 = 0; 1221 1252 let mut block_count: u64 = 0; 1222 1253 let mut dedup_hits: u64 = 0; ··· 1247 1278 loc 1248 1279 } 1249 1280 None => { 1250 - if manager.should_rotate(data_writer.position()) { 1251 - data_writer.sync()?; 1252 - hint_writer.sync()?; 1253 - 1254 - let next_id = ctx.file_ids.allocate(); 1255 - let next_fd = manager.open_for_append(next_id)?; 1256 - 1257 - tracing::info!( 1258 - from = %data_writer.file_id(), 1259 - to = %next_id, 1260 - "data file rotation" 1261 - ); 1262 - 1263 - data_writer = DataFileWriter::new(manager.io(), next_fd, next_id)?; 1264 - 1265 - let new_hint_path = hint_file_path(manager.data_dir(), next_id); 1266 - let new_hint_fd = manager 1267 - .io() 1268 - .open(&new_hint_path, OpenOptions::read_write())?; 1269 - 1270 - manager.io().sync_dir(manager.data_dir())?; 1271 - 1272 - current_hint_fd = new_hint_fd; 1273 - hint_writer = HintFileWriter::new(manager.io(), new_hint_fd); 1274 - rotations.push(RotationState { 1275 - file_id: next_id, 1276 - fd: next_fd, 1277 - hint_fd: new_hint_fd, 1278 - }); 1279 - } 1280 - 1281 1281 let loc = data_writer.append_block(cid_bytes, data)?; 1282 1282 hint_writer.append_hint(cid_bytes, &loc)?; 1283 1283 ··· 1325 1325 if ctx.verify_persisted_blocks { 1326 1326 verify_persisted_blocks(manager, &index_entries).map_err(rollback_on_err)?; 1327 1327 } 1328 + let batch_record_count = u32::try_from( 1329 + block_count 1330 + .saturating_add(dedup_hits) 1331 + .saturating_add(all_decrements.len() as u64), 1332 + ) 1333 + .unwrap_or(u32::MAX); 1334 + hint_writer 1335 + .append_commit_marker( 1336 + current_epoch.raw(), 1337 + batch_record_count, 1338 + data_writer.file_id(), 1339 + data_writer.position(), 1340 + ) 1341 + .map_err(|e| rollback_on_err(CommitError::from(e)))?; 1328 1342 hint_writer.sync().map_err(|e| rollback_on_err(e.into()))?; 1329 1343 let sync_nanos = t.elapsed().as_nanos() as u64; 1330 1344
+2 -1
crates/tranquil-store/src/blockstore/hash_index.rs
··· 1637 1637 ReadHintRecord::Remove { cid_bytes } => { 1638 1638 let _ = table.remove(&cid_bytes); 1639 1639 } 1640 - ReadHintRecord::UnknownVersion { .. } 1640 + ReadHintRecord::CommitMarker { .. } 1641 + | ReadHintRecord::UnknownVersion { .. } 1641 1642 | ReadHintRecord::UnknownType { .. } 1642 1643 | ReadHintRecord::Corrupted 1643 1644 | ReadHintRecord::Truncated => {}
+275 -104
crates/tranquil-store/src/blockstore/hint.rs
··· 22 22 const RECORD_TYPE_DECREMENT: u8 = 0x02; 23 23 const RECORD_TYPE_RELOCATE: u8 = 0x03; 24 24 const RECORD_TYPE_REMOVE: u8 = 0x04; 25 + const RECORD_TYPE_COMMIT_MARKER: u8 = 0x05; 25 26 26 27 const HINT_FORMAT_VERSION: u8 = 1; 27 28 ··· 142 143 write_hint_record(io, fd, write_offset, &record) 143 144 } 144 145 146 + const MARKER_DATA_OFFSET_POS: usize = CID_OFFSET; 147 + const MARKER_DATA_FILE_ID_POS: usize = CID_OFFSET + 8; 148 + const MARKER_RECORD_COUNT_POS: usize = FIELD_B_OFFSET; 149 + 150 + pub(crate) fn encode_commit_marker_record<S: StorageIO>( 151 + io: &S, 152 + fd: FileId, 153 + write_offset: HintOffset, 154 + batch_seq: u64, 155 + record_count: u32, 156 + data_file_id: DataFileId, 157 + data_offset: BlockOffset, 158 + ) -> io::Result<()> { 159 + let mut record = [0u8; HINT_RECORD_SIZE]; 160 + record[TYPE_OFFSET] = RECORD_TYPE_COMMIT_MARKER; 161 + record[VERSION_OFFSET] = HINT_FORMAT_VERSION; 162 + record[MARKER_DATA_OFFSET_POS..MARKER_DATA_OFFSET_POS + 8] 163 + .copy_from_slice(&data_offset.raw().to_le_bytes()); 164 + record[MARKER_DATA_FILE_ID_POS..MARKER_DATA_FILE_ID_POS + 4] 165 + .copy_from_slice(&data_file_id.raw().to_le_bytes()); 166 + record[FIELD_A_OFFSET..FIELD_A_OFFSET + 8].copy_from_slice(&batch_seq.to_le_bytes()); 167 + record[MARKER_RECORD_COUNT_POS..MARKER_RECORD_COUNT_POS + 4] 168 + .copy_from_slice(&record_count.to_le_bytes()); 169 + 170 + let checksum = hint_checksum(&record[..HINT_PAYLOAD_SIZE]); 171 + record[CHECKSUM_OFFSET..].copy_from_slice(&checksum.to_le_bytes()); 172 + 173 + write_hint_record(io, fd, write_offset, &record) 174 + } 175 + 145 176 #[must_use] 146 177 #[derive(Debug)] 147 178 pub enum ReadHintRecord { ··· 165 196 }, 166 197 Remove { 167 198 cid_bytes: [u8; CID_SIZE], 199 + }, 200 + CommitMarker { 201 + batch_seq: u64, 202 + record_count: u32, 203 + data_file_id: DataFileId, 204 + data_offset: BlockOffset, 168 205 }, 169 206 UnknownVersion { 170 207 version: u8, ··· 295 332 })) 296 333 } 297 334 RECORD_TYPE_REMOVE => Ok(Some(ReadHintRecord::Remove { cid_bytes })), 335 + RECORD_TYPE_COMMIT_MARKER => { 336 + let data_offset = BlockOffset::new(u64::from_le_bytes( 337 + record[MARKER_DATA_OFFSET_POS..MARKER_DATA_OFFSET_POS + 8] 338 + .try_into() 339 + .unwrap(), 340 + )); 341 + let data_file_id = DataFileId::new(u32::from_le_bytes( 342 + record[MARKER_DATA_FILE_ID_POS..MARKER_DATA_FILE_ID_POS + 4] 343 + .try_into() 344 + .unwrap(), 345 + )); 346 + let batch_seq = u64::from_le_bytes( 347 + record[FIELD_A_OFFSET..FIELD_A_OFFSET + 8] 348 + .try_into() 349 + .unwrap(), 350 + ); 351 + let record_count = u32::from_le_bytes( 352 + record[MARKER_RECORD_COUNT_POS..MARKER_RECORD_COUNT_POS + 4] 353 + .try_into() 354 + .unwrap(), 355 + ); 356 + Ok(Some(ReadHintRecord::CommitMarker { 357 + batch_seq, 358 + record_count, 359 + data_file_id, 360 + data_offset, 361 + })) 362 + } 298 363 other => Ok(Some(ReadHintRecord::UnknownType { record_type: other })), 299 364 } 300 365 } ··· 356 421 Ok(()) 357 422 } 358 423 424 + pub fn append_commit_marker( 425 + &mut self, 426 + batch_seq: u64, 427 + record_count: u32, 428 + data_file_id: DataFileId, 429 + data_offset: BlockOffset, 430 + ) -> io::Result<()> { 431 + encode_commit_marker_record( 432 + self.io, 433 + self.fd, 434 + self.position, 435 + batch_seq, 436 + record_count, 437 + data_file_id, 438 + data_offset, 439 + )?; 440 + self.position = self.position.advance(HINT_RECORD_SIZE as u64); 441 + Ok(()) 442 + } 443 + 359 444 pub fn sync(&self) -> io::Result<()> { 360 445 self.io.sync(self.fd) 361 446 } ··· 416 501 | ReadHintRecord::Decrement { .. } 417 502 | ReadHintRecord::Relocate { .. } 418 503 | ReadHintRecord::Remove { .. } 419 - | ReadHintRecord::UnknownType { .. } => { 504 + | ReadHintRecord::CommitMarker { .. } 505 + | ReadHintRecord::UnknownType { .. } 506 + | ReadHintRecord::UnknownVersion { .. } 507 + | ReadHintRecord::Corrupted => { 420 508 self.position = self.position.advance(HINT_RECORD_SIZE as u64); 421 509 } 422 - ReadHintRecord::UnknownVersion { .. } 423 - | ReadHintRecord::Corrupted 424 - | ReadHintRecord::Truncated => { 510 + ReadHintRecord::Truncated => { 425 511 self.position = HintOffset::new(self.file_size); 426 512 } 427 513 } ··· 529 615 ReadHintRecord::Decrement { .. } 530 616 | ReadHintRecord::Relocate { .. } 531 617 | ReadHintRecord::Remove { .. } 618 + | ReadHintRecord::CommitMarker { .. } 532 619 | ReadHintRecord::UnknownVersion { .. } 533 620 | ReadHintRecord::UnknownType { .. } 534 621 | ReadHintRecord::Corrupted ··· 542 629 entries 543 630 } 544 631 545 - const REPLAY_BATCH_SIZE: usize = 10_000; 632 + #[derive(Default)] 633 + struct PendingBatch { 634 + puts: Vec<([u8; CID_SIZE], BlockLocation)>, 635 + relocates: Vec<([u8; CID_SIZE], BlockLocation, u32)>, 636 + removes: Vec<[u8; CID_SIZE]>, 637 + decrements: Vec<([u8; CID_SIZE], CommitEpoch, WallClockMs)>, 638 + file_cursors: HashMap<DataFileId, BlockOffset>, 639 + max_cursor: Option<WriteCursor>, 640 + record_count: u32, 641 + boundary_lost: bool, 642 + } 643 + 644 + impl PendingBatch { 645 + fn reset(&mut self) { 646 + self.puts.clear(); 647 + self.relocates.clear(); 648 + self.removes.clear(); 649 + self.decrements.clear(); 650 + self.file_cursors.clear(); 651 + self.max_cursor = None; 652 + self.record_count = 0; 653 + self.boundary_lost = false; 654 + } 655 + 656 + fn note_record(&mut self) { 657 + self.record_count = self.record_count.saturating_add(1); 658 + } 659 + 660 + fn track_cursor(&mut self, file_id: DataFileId, end: BlockOffset) { 661 + let candidate = WriteCursor { 662 + file_id, 663 + offset: end, 664 + }; 665 + self.max_cursor = Some(match self.max_cursor { 666 + Some(c) => std::cmp::max_by_key(c, candidate, |w| (w.file_id, w.offset)), 667 + None => candidate, 668 + }); 669 + self.file_cursors 670 + .entry(file_id) 671 + .and_modify(|existing| { 672 + if end > *existing { 673 + *existing = end; 674 + } 675 + }) 676 + .or_insert(end); 677 + } 678 + } 679 + 680 + fn commit_pending_batch( 681 + pending: &mut PendingBatch, 682 + index: &super::hash_index::BlockIndex, 683 + file_cursors: &mut HashMap<DataFileId, BlockOffset>, 684 + max_cursor: &mut Option<WriteCursor>, 685 + replayed: &mut u64, 686 + ) -> Result<(), RebuildError> { 687 + if !pending.puts.is_empty() { 688 + index.batch_insert_buffered(&pending.puts)?; 689 + } 690 + if !pending.relocates.is_empty() { 691 + index.batch_relocate(&pending.relocates)?; 692 + } 693 + if !pending.removes.is_empty() { 694 + index.batch_remove(&pending.removes); 695 + } 696 + pending 697 + .decrements 698 + .iter() 699 + .try_for_each(|(cid, epoch, ts)| index.batch_decrement(&[*cid], *epoch, *ts))?; 700 + 701 + pending.file_cursors.iter().for_each(|(fid, end)| { 702 + file_cursors 703 + .entry(*fid) 704 + .and_modify(|existing| { 705 + if *end > *existing { 706 + *existing = *end; 707 + } 708 + }) 709 + .or_insert(*end); 710 + }); 711 + if let Some(c) = pending.max_cursor { 712 + *max_cursor = Some(match *max_cursor { 713 + Some(m) => std::cmp::max_by_key(m, c, |w| (w.file_id, w.offset)), 714 + None => c, 715 + }); 716 + } 717 + *replayed = replayed.saturating_add(u64::from(pending.record_count)); 718 + pending.reset(); 719 + Ok(()) 720 + } 546 721 547 722 pub fn replay_hints_into_block_index<S: StorageIO>( 548 723 io: &S, ··· 568 743 let mut max_cursor: Option<WriteCursor> = None; 569 744 let mut file_cursors: HashMap<DataFileId, BlockOffset> = HashMap::new(); 570 745 let mut replayed: u64 = 0; 571 - let mut put_buffer: Vec<([u8; CID_SIZE], BlockLocation)> = 572 - Vec::with_capacity(REPLAY_BATCH_SIZE); 573 - let mut relocate_buffer: Vec<([u8; CID_SIZE], BlockLocation, u32)> = 574 - Vec::with_capacity(REPLAY_BATCH_SIZE); 575 - let mut remove_buffer: Vec<[u8; CID_SIZE]> = Vec::with_capacity(REPLAY_BATCH_SIZE); 746 + let mut pending = PendingBatch::default(); 576 747 577 748 hint_files 578 749 .iter() ··· 604 775 offset, 605 776 length, 606 777 }; 607 - put_buffer.push((cid_bytes, loc)); 608 - 609 778 let record_end = 610 779 offset.advance(BLOCK_RECORD_OVERHEAD as u64 + length.as_u64()); 611 - let candidate = WriteCursor { 612 - file_id, 613 - offset: record_end, 614 - }; 615 - max_cursor = Some(match max_cursor { 616 - Some(c) => { 617 - std::cmp::max_by_key(c, candidate, |w| (w.file_id, w.offset)) 618 - } 619 - None => candidate, 620 - }); 621 - file_cursors 622 - .entry(file_id) 623 - .and_modify(|existing| { 624 - if record_end > *existing { 625 - *existing = record_end; 626 - } 627 - }) 628 - .or_insert(record_end); 629 - 630 - replayed = replayed.saturating_add(1); 631 - if put_buffer.len() >= REPLAY_BATCH_SIZE { 632 - index.batch_insert_buffered(&put_buffer)?; 633 - put_buffer.clear(); 634 - } 780 + pending.puts.push((cid_bytes, loc)); 781 + pending.track_cursor(file_id, record_end); 782 + pending.note_record(); 635 783 } 636 784 ReadHintRecord::Decrement { 637 785 cid_bytes, 638 786 epoch, 639 787 timestamp, 640 788 } => { 641 - if !put_buffer.is_empty() { 642 - index.batch_insert_buffered(&put_buffer)?; 643 - put_buffer.clear(); 644 - } 645 - if !relocate_buffer.is_empty() { 646 - index.batch_relocate(&relocate_buffer)?; 647 - relocate_buffer.clear(); 648 - } 649 - if !remove_buffer.is_empty() { 650 - index.batch_remove(&remove_buffer); 651 - remove_buffer.clear(); 652 - } 653 - index.batch_decrement(&[cid_bytes], epoch, timestamp)?; 654 - replayed = replayed.saturating_add(1); 789 + pending.decrements.push((cid_bytes, epoch, timestamp)); 790 + pending.note_record(); 655 791 } 656 792 ReadHintRecord::Relocate { 657 793 cid_bytes, ··· 665 801 offset, 666 802 length, 667 803 }; 668 - relocate_buffer.push((cid_bytes, loc, refcount)); 669 - 670 804 let record_end = 671 805 offset.advance(BLOCK_RECORD_OVERHEAD as u64 + length.as_u64()); 672 - file_cursors 673 - .entry(file_id) 674 - .and_modify(|existing| { 675 - if record_end > *existing { 676 - *existing = record_end; 677 - } 678 - }) 679 - .or_insert(record_end); 680 - 681 - replayed = replayed.saturating_add(1); 682 - if relocate_buffer.len() >= REPLAY_BATCH_SIZE { 683 - if !put_buffer.is_empty() { 684 - index.batch_insert_buffered(&put_buffer)?; 685 - put_buffer.clear(); 686 - } 687 - index.batch_relocate(&relocate_buffer)?; 688 - relocate_buffer.clear(); 689 - } 806 + pending.relocates.push((cid_bytes, loc, refcount)); 807 + pending.track_cursor(file_id, record_end); 808 + pending.note_record(); 690 809 } 691 810 ReadHintRecord::Remove { cid_bytes } => { 692 - remove_buffer.push(cid_bytes); 693 - replayed = replayed.saturating_add(1); 694 - if remove_buffer.len() >= REPLAY_BATCH_SIZE { 695 - if !put_buffer.is_empty() { 696 - index.batch_insert_buffered(&put_buffer)?; 697 - put_buffer.clear(); 811 + pending.removes.push(cid_bytes); 812 + pending.note_record(); 813 + } 814 + ReadHintRecord::CommitMarker { 815 + batch_seq, 816 + record_count, 817 + data_file_id, 818 + data_offset, 819 + } => { 820 + let accepts = 821 + !pending.boundary_lost && pending.record_count == record_count; 822 + match accepts { 823 + true => { 824 + pending.track_cursor(data_file_id, data_offset); 825 + commit_pending_batch( 826 + &mut pending, 827 + index, 828 + &mut file_cursors, 829 + &mut max_cursor, 830 + &mut replayed, 831 + )?; 698 832 } 699 - if !relocate_buffer.is_empty() { 700 - index.batch_relocate(&relocate_buffer)?; 701 - relocate_buffer.clear(); 833 + false => { 834 + tracing::warn!( 835 + file_id = %fid, 836 + batch_seq, 837 + expected_count = record_count, 838 + observed_count = pending.record_count, 839 + boundary_lost = pending.boundary_lost, 840 + "rolling back torn hint batch" 841 + ); 842 + pending.reset(); 702 843 } 703 - index.batch_remove(&remove_buffer); 704 - remove_buffer.clear(); 705 844 } 706 845 } 707 - ReadHintRecord::Corrupted => { 708 - tracing::warn!( 709 - file_id = %fid, 710 - "corrupted hint record during replay, skipping" 711 - ); 846 + ReadHintRecord::Corrupted 847 + | ReadHintRecord::UnknownVersion { .. } 848 + | ReadHintRecord::UnknownType { .. } => { 849 + pending.boundary_lost = true; 712 850 } 713 - ReadHintRecord::UnknownVersion { .. } 714 - | ReadHintRecord::UnknownType { .. } 715 - | ReadHintRecord::Truncated => {} 851 + ReadHintRecord::Truncated => {} 716 852 } 717 853 Ok::<_, RebuildError>(()) 718 854 })?; 719 855 720 - if !put_buffer.is_empty() { 721 - index.batch_insert_buffered(&put_buffer)?; 722 - put_buffer.clear(); 723 - } 724 - if !relocate_buffer.is_empty() { 725 - index.batch_relocate(&relocate_buffer)?; 726 - relocate_buffer.clear(); 727 - } 728 - if !remove_buffer.is_empty() { 729 - index.batch_remove(&remove_buffer); 730 - remove_buffer.clear(); 731 - } 732 - 733 856 let _ = io.close(fd); 734 857 Ok(()) 735 858 })?; 859 + 860 + if pending.record_count > 0 || pending.boundary_lost { 861 + tracing::warn!( 862 + record_count = pending.record_count, 863 + boundary_lost = pending.boundary_lost, 864 + "discarding unterminated hint batch at replay end" 865 + ); 866 + pending.reset(); 867 + } 736 868 737 869 if let Some(cursor) = max_cursor { 738 870 index.set_write_cursor(cursor)?; ··· 1097 1229 } 1098 1230 1099 1231 #[test] 1100 - fn hint_reader_stops_on_corrupted() { 1232 + fn hint_reader_reports_corrupted_and_continues() { 1101 1233 let (sim, fd) = setup(); 1102 1234 let mut writer = HintFileWriter::new(&sim, fd); 1103 1235 ··· 1115 1247 1116 1248 let reader = HintFileReader::open(&sim, fd).unwrap(); 1117 1249 let records: Vec<_> = reader.map(|r| r.unwrap()).collect(); 1118 - assert_eq!(records.len(), 2); 1250 + assert_eq!(records.len(), 3); 1119 1251 assert!(matches!(records[0], ReadHintRecord::Put { .. })); 1120 1252 assert!(matches!(records[1], ReadHintRecord::Corrupted)); 1253 + assert!(matches!(records[2], ReadHintRecord::Put { .. })); 1254 + } 1255 + 1256 + #[test] 1257 + fn commit_marker_round_trip() { 1258 + let (sim, fd) = setup(); 1259 + let data_file_id = DataFileId::new(7); 1260 + let data_offset = BlockOffset::new(9_876); 1261 + 1262 + encode_commit_marker_record( 1263 + &sim, 1264 + fd, 1265 + HintOffset::new(0), 1266 + 42, 1267 + 128, 1268 + data_file_id, 1269 + data_offset, 1270 + ) 1271 + .unwrap(); 1272 + 1273 + let file_size = sim.file_size(fd).unwrap(); 1274 + let record = decode_hint_record(&sim, fd, HintOffset::new(0), file_size) 1275 + .unwrap() 1276 + .unwrap(); 1277 + 1278 + match record { 1279 + ReadHintRecord::CommitMarker { 1280 + batch_seq, 1281 + record_count, 1282 + data_file_id: fid, 1283 + data_offset: off, 1284 + } => { 1285 + assert_eq!(batch_seq, 42); 1286 + assert_eq!(record_count, 128); 1287 + assert_eq!(fid, data_file_id); 1288 + assert_eq!(off, data_offset); 1289 + } 1290 + other => panic!("expected CommitMarker, got {other:?}"), 1291 + } 1121 1292 } 1122 1293 1123 1294 #[test]
+3 -1
crates/tranquil-store/tests/backup.rs
··· 484 484 fn incremental_is_smaller_than_full() { 485 485 with_runtime(|| { 486 486 let store = open_test_store_with_max_file_size(2048); 487 - seed_blocks(&store, 0..100); 487 + (0u16..100).step_by(10).for_each(|start| { 488 + seed_blocks(&store, start..start + 10); 489 + }); 488 490 seed_events(&store, 50); 489 491 store.metastore.persist().unwrap(); 490 492
+4
crates/tranquil-store/tests/gc.rs
··· 227 227 .collect(); 228 228 store.put_blocks_blocking(padding).unwrap(); 229 229 230 + store 231 + .put_blocks_blocking(vec![(test_cid(220), vec![220u8; 64])]) 232 + .unwrap(); 233 + 230 234 let files = store.list_data_files().unwrap(); 231 235 let first_file = files[0]; 232 236
+6 -5
crates/tranquil-store/tests/sim_blockstore.rs
··· 654 654 }; 655 655 656 656 let block_count = ((seed % 25) + 10) as u32; 657 - let blocks: Vec<(CidBytes, Vec<u8>)> = (0..block_count) 658 - .map(|i| (test_cid(i), block_data(i))) 659 - .collect(); 660 - let all_cids: Vec<CidBytes> = blocks.iter().map(|(c, _)| *c).collect(); 657 + let all_cids: Vec<CidBytes> = 658 + (0..block_count).map(test_cid).collect(); 661 659 662 660 { 663 661 let store = TranquilBlockStore::open(config.clone()).unwrap(); 664 - store.put_blocks_blocking(blocks).unwrap(); 662 + (0..block_count).try_for_each(|i| { 663 + store.put_blocks_blocking(vec![(test_cid(i), block_data(i))]) 664 + }) 665 + .unwrap(); 665 666 666 667 let files = store.list_data_files().unwrap(); 667 668 assert!(