Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(postgres): semaphore on car endpoint & more efficient query

Lewis: May this revision serve well! <lu5a@proton.me>

+312 -185
+22 -22
Cargo.lock
··· 7405 7405 7406 7406 [[package]] 7407 7407 name = "tranquil-api" 7408 - version = "0.5.1" 7408 + version = "0.5.2" 7409 7409 dependencies = [ 7410 7410 "anyhow", 7411 7411 "axum", ··· 7456 7456 7457 7457 [[package]] 7458 7458 name = "tranquil-auth" 7459 - version = "0.5.1" 7459 + version = "0.5.2" 7460 7460 dependencies = [ 7461 7461 "anyhow", 7462 7462 "base32", ··· 7479 7479 7480 7480 [[package]] 7481 7481 name = "tranquil-cache" 7482 - version = "0.5.1" 7482 + version = "0.5.2" 7483 7483 dependencies = [ 7484 7484 "async-trait", 7485 7485 "base64 0.22.1", ··· 7493 7493 7494 7494 [[package]] 7495 7495 name = "tranquil-comms" 7496 - version = "0.5.1" 7496 + version = "0.5.2" 7497 7497 dependencies = [ 7498 7498 "async-trait", 7499 7499 "base64 0.22.1", ··· 7511 7511 7512 7512 [[package]] 7513 7513 name = "tranquil-config" 7514 - version = "0.5.1" 7514 + version = "0.5.2" 7515 7515 dependencies = [ 7516 7516 "confique", 7517 7517 "serde", ··· 7519 7519 7520 7520 [[package]] 7521 7521 name = "tranquil-crypto" 7522 - version = "0.5.1" 7522 + version = "0.5.2" 7523 7523 dependencies = [ 7524 7524 "aes-gcm", 7525 7525 "base64 0.22.1", ··· 7535 7535 7536 7536 [[package]] 7537 7537 name = "tranquil-db" 7538 - version = "0.5.1" 7538 + version = "0.5.2" 7539 7539 dependencies = [ 7540 7540 "async-trait", 7541 7541 "chrono", ··· 7552 7552 7553 7553 [[package]] 7554 7554 name = "tranquil-db-traits" 7555 - version = "0.5.1" 7555 + version = "0.5.2" 7556 7556 dependencies = [ 7557 7557 "async-trait", 7558 7558 "base64 0.22.1", ··· 7568 7568 7569 7569 [[package]] 7570 7570 name = "tranquil-infra" 7571 - version = "0.5.1" 7571 + version = "0.5.2" 7572 7572 dependencies = [ 7573 7573 "async-trait", 7574 7574 "bytes", ··· 7579 7579 7580 7580 [[package]] 7581 7581 name = "tranquil-lexicon" 7582 - version = "0.5.1" 7582 + version = "0.5.2" 7583 7583 dependencies = [ 7584 7584 "chrono", 7585 7585 "hickory-resolver", ··· 7597 7597 7598 7598 [[package]] 7599 7599 name = "tranquil-oauth" 7600 - version = "0.5.1" 7600 + version = "0.5.2" 7601 7601 dependencies = [ 7602 7602 "anyhow", 7603 7603 "axum", ··· 7620 7620 7621 7621 [[package]] 7622 7622 name = "tranquil-oauth-server" 7623 - version = "0.5.1" 7623 + version = "0.5.2" 7624 7624 dependencies = [ 7625 7625 "axum", 7626 7626 "base64 0.22.1", ··· 7653 7653 7654 7654 [[package]] 7655 7655 name = "tranquil-pds" 7656 - version = "0.5.1" 7656 + version = "0.5.2" 7657 7657 dependencies = [ 7658 7658 "aes-gcm", 7659 7659 "anyhow", ··· 7745 7745 7746 7746 [[package]] 7747 7747 name = "tranquil-repo" 7748 - version = "0.5.1" 7748 + version = "0.5.2" 7749 7749 dependencies = [ 7750 7750 "bytes", 7751 7751 "cid", ··· 7757 7757 7758 7758 [[package]] 7759 7759 name = "tranquil-ripple" 7760 - version = "0.5.1" 7760 + version = "0.5.2" 7761 7761 dependencies = [ 7762 7762 "async-trait", 7763 7763 "backon", ··· 7782 7782 7783 7783 [[package]] 7784 7784 name = "tranquil-scopes" 7785 - version = "0.5.1" 7785 + version = "0.5.2" 7786 7786 dependencies = [ 7787 7787 "axum", 7788 7788 "futures", ··· 7798 7798 7799 7799 [[package]] 7800 7800 name = "tranquil-server" 7801 - version = "0.5.1" 7801 + version = "0.5.2" 7802 7802 dependencies = [ 7803 7803 "axum", 7804 7804 "clap", ··· 7819 7819 7820 7820 [[package]] 7821 7821 name = "tranquil-signal" 7822 - version = "0.5.1" 7822 + version = "0.5.2" 7823 7823 dependencies = [ 7824 7824 "async-trait", 7825 7825 "chrono", ··· 7842 7842 7843 7843 [[package]] 7844 7844 name = "tranquil-storage" 7845 - version = "0.5.1" 7845 + version = "0.5.2" 7846 7846 dependencies = [ 7847 7847 "async-trait", 7848 7848 "aws-config", ··· 7859 7859 7860 7860 [[package]] 7861 7861 name = "tranquil-store" 7862 - version = "0.5.1" 7862 + version = "0.5.2" 7863 7863 dependencies = [ 7864 7864 "async-trait", 7865 7865 "bytes", ··· 7905 7905 7906 7906 [[package]] 7907 7907 name = "tranquil-sync" 7908 - version = "0.5.1" 7908 + version = "0.5.2" 7909 7909 dependencies = [ 7910 7910 "anyhow", 7911 7911 "axum", ··· 7927 7927 7928 7928 [[package]] 7929 7929 name = "tranquil-types" 7930 - version = "0.5.1" 7930 + version = "0.5.2" 7931 7931 dependencies = [ 7932 7932 "chrono", 7933 7933 "cid",
+1 -1
Cargo.toml
··· 26 26 ] 27 27 28 28 [workspace.package] 29 - version = "0.5.1" 29 + version = "0.5.2" 30 30 edition = "2024" 31 31 license = "AGPL-3.0-or-later" 32 32
+4
crates/tranquil-config/src/lib.rs
··· 720 720 #[config(env = "FIREHOSE_MAX_LAG", default = 5000)] 721 721 pub max_lag: u64, 722 722 723 + /// Maximum concurrent full-repo exports, eg. getRepo without `since`. 724 + #[config(env = "MAX_CONCURRENT_REPO_EXPORTS", default = 4)] 725 + pub max_concurrent_repo_exports: usize, 726 + 723 727 /// List of relay / crawler notification URLs. 724 728 #[config(env = "CRAWLERS", parse_env = split_comma_list)] 725 729 pub crawlers: Option<Vec<String>>,
+20 -9
crates/tranquil-pds/src/comms/service.rs
··· 3 3 use std::time::Duration; 4 4 5 5 use chrono::Utc; 6 - use tokio::time::interval; 6 + 7 7 use tokio_util::sync::CancellationToken; 8 8 use tracing::{debug, error, info, warn}; 9 9 use tranquil_comms::{ ··· 75 75 ); 76 76 } 77 77 info!( 78 - poll_interval_secs = self.poll_interval.as_secs(), 78 + poll_interval_ms = self.poll_interval.as_millis() as u64, 79 79 batch_size = self.batch_size, 80 80 channels = ?self.senders.keys().collect::<Vec<_>>(), 81 81 "Starting comms service" 82 82 ); 83 - let mut ticker = interval(self.poll_interval); 83 + let base = self.poll_interval; 84 + let max_backoff = Duration::from_secs(30); 85 + let mut current_delay = base; 84 86 loop { 85 87 tokio::select! { 86 - _ = ticker.tick() => { 87 - if let Err(e) = self.process_batch().await { 88 - error!(error = %e, "Failed to process comms batch"); 88 + _ = tokio::time::sleep(current_delay) => { 89 + match self.process_batch().await { 90 + Ok(had_work) => { 91 + current_delay = match had_work { 92 + true => base, 93 + false => max_backoff.min(current_delay.saturating_mul(2)), 94 + }; 95 + } 96 + Err(e) => { 97 + error!(error = %e, "Failed to process comms batch"); 98 + current_delay = max_backoff.min(current_delay.saturating_mul(2)); 99 + } 89 100 } 90 101 } 91 102 _ = shutdown.cancelled() => { ··· 96 107 } 97 108 } 98 109 99 - async fn process_batch(&self) -> Result<(), tranquil_db_traits::DbError> { 110 + async fn process_batch(&self) -> Result<bool, tranquil_db_traits::DbError> { 100 111 let items = self.fetch_pending().await?; 101 112 if items.is_empty() { 102 - return Ok(()); 113 + return Ok(false); 103 114 } 104 115 debug!(count = items.len(), "Processing comms batch"); 105 116 futures::future::join_all(items.into_iter().map(|item| self.process_item(item))).await; 106 - Ok(()) 117 + Ok(true) 107 118 } 108 119 109 120 async fn fetch_pending(&self) -> Result<Vec<QueuedComms>, tranquil_db_traits::DbError> {
+14 -13
crates/tranquil-pds/src/scheduled.rs
··· 667 667 Ok(()) 668 668 } 669 669 670 + const CAR_BLOCK_BATCH_SIZE: usize = 500; 671 + 670 672 pub async fn generate_repo_car( 671 673 block_store: &AnyBlockStore, 672 674 head_cid: &Cid, ··· 683 685 }) 684 686 .collect(); 685 687 686 - let car_bytes = encode_car_header(head_cid).context("Failed to encode CAR header")?; 688 + let mut car_bytes = encode_car_header(head_cid).context("Failed to encode CAR header")?; 687 689 688 - let blocks = block_store 689 - .get_many(&block_cids) 690 - .await 691 - .context("Failed to fetch blocks")?; 690 + for chunk in block_cids.chunks(CAR_BLOCK_BATCH_SIZE) { 691 + let blocks = block_store 692 + .get_many(chunk) 693 + .await 694 + .context("Failed to fetch blocks")?; 692 695 693 - let car_bytes = block_cids 694 - .iter() 695 - .zip(blocks.iter()) 696 - .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block))) 697 - .fold(car_bytes, |mut acc, (cid, block)| { 698 - acc.extend(encode_car_block(cid, block)); 699 - acc 700 - }); 696 + chunk 697 + .iter() 698 + .zip(blocks.iter()) 699 + .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block))) 700 + .for_each(|(cid, block)| car_bytes.extend(encode_car_block(cid, block))); 701 + } 701 702 702 703 Ok(car_bytes) 703 704 }
+4
crates/tranquil-pds/src/state.rs
··· 50 50 pub signal_sender: Option<Arc<tranquil_signal::SignalSlot>>, 51 51 pub signal_store_provider: Option<Arc<dyn tranquil_signal::SignalStoreProvider>>, 52 52 pub eventlog_segments_dir: Option<PathBuf>, 53 + pub repo_export_semaphore: Arc<tokio::sync::Semaphore>, 53 54 } 54 55 55 56 #[derive(Debug, Clone, Copy)] ··· 394 395 signal_sender: None, 395 396 signal_store_provider, 396 397 eventlog_segments_dir, 398 + repo_export_semaphore: Arc::new(tokio::sync::Semaphore::new( 399 + cfg.firehose.max_concurrent_repo_exports, 400 + )), 397 401 } 398 402 } 399 403
+147
crates/tranquil-pds/tests/sync_resource_limits.rs
··· 1 + mod common; 2 + mod helpers; 3 + use common::*; 4 + use helpers::*; 5 + use reqwest::StatusCode; 6 + use std::sync::Once; 7 + 8 + static SET_SEMAPHORE: Once = Once::new(); 9 + 10 + fn ensure_low_semaphore() { 11 + SET_SEMAPHORE.call_once(|| unsafe { 12 + std::env::set_var("MAX_CONCURRENT_REPO_EXPORTS", "1"); 13 + }); 14 + } 15 + 16 + #[tokio::test] 17 + async fn test_get_repo_succeeds_with_many_records() { 18 + ensure_low_semaphore(); 19 + let client = client(); 20 + let (did, jwt) = setup_new_user("sync-batched-car").await; 21 + 22 + let create_futures = (0..20).map(|i| { 23 + let client = &client; 24 + let did = &did; 25 + let jwt = &jwt; 26 + async move { 27 + create_post(client, did, jwt, &format!("Batch test post {}", i)).await; 28 + } 29 + }); 30 + futures::future::join_all(create_futures).await; 31 + 32 + let res = client 33 + .get(format!( 34 + "{}/xrpc/com.atproto.sync.getRepo", 35 + base_url().await 36 + )) 37 + .query(&[("did", did.as_str())]) 38 + .send() 39 + .await 40 + .expect("Failed to send getRepo request"); 41 + 42 + assert_eq!(res.status(), StatusCode::OK); 43 + assert_eq!( 44 + res.headers() 45 + .get("content-type") 46 + .and_then(|h| h.to_str().ok()), 47 + Some("application/vnd.ipld.car") 48 + ); 49 + let car_bytes = res.bytes().await.expect("Failed to read response body"); 50 + assert!( 51 + car_bytes.len() > 200, 52 + "CAR with 20 records should have substantial data, got {} bytes", 53 + car_bytes.len() 54 + ); 55 + } 56 + 57 + #[tokio::test] 58 + async fn test_get_repo_semaphore_rejects_excess_concurrency() { 59 + ensure_low_semaphore(); 60 + let client = client(); 61 + let (did, jwt) = setup_new_user("sync-semaphore").await; 62 + 63 + for i in 0..50 { 64 + create_post(&client, &did, &jwt, &format!("Padding post {}", i)).await; 65 + } 66 + 67 + let base = base_url().await; 68 + let concurrent_requests = 10; 69 + 70 + let request_futures = (0..concurrent_requests).map(|_| { 71 + let client = client.clone(); 72 + let did = did.clone(); 73 + async move { 74 + client 75 + .get(format!("{}/xrpc/com.atproto.sync.getRepo", base)) 76 + .query(&[("did", did.as_str())]) 77 + .send() 78 + .await 79 + .expect("Failed to send request") 80 + .status() 81 + } 82 + }); 83 + 84 + let statuses: Vec<StatusCode> = futures::future::join_all(request_futures).await; 85 + let ok_count = statuses.iter().filter(|s| **s == StatusCode::OK).count(); 86 + let rejected_count = statuses 87 + .iter() 88 + .filter(|s| **s == StatusCode::SERVICE_UNAVAILABLE) 89 + .count(); 90 + 91 + assert!(ok_count >= 1, "at least one request should succeed"); 92 + assert!( 93 + rejected_count > 0, 94 + "semaphore=1 with {} concurrent requests, expected some 503 rejections", 95 + concurrent_requests 96 + ); 97 + assert!( 98 + ok_count + rejected_count == statuses.len(), 99 + "expected only 200 or 503 responses: {:?}", 100 + statuses 101 + ); 102 + } 103 + 104 + #[tokio::test] 105 + async fn test_get_repo_since_not_affected_by_semaphore() { 106 + ensure_low_semaphore(); 107 + let client = client(); 108 + let (did, jwt) = setup_new_user("sync-since-no-sem").await; 109 + create_post(&client, &did, &jwt, "First post").await; 110 + 111 + let latest_res = client 112 + .get(format!( 113 + "{}/xrpc/com.atproto.sync.getLatestCommit", 114 + base_url().await 115 + )) 116 + .query(&[("did", did.as_str())]) 117 + .send() 118 + .await 119 + .expect("Failed to get latest commit"); 120 + let body: serde_json::Value = latest_res.json().await.unwrap(); 121 + let rev = body["rev"].as_str().unwrap(); 122 + 123 + create_post(&client, &did, &jwt, "Second post").await; 124 + 125 + let base = base_url().await; 126 + let request_futures = (0..10).map(|_| { 127 + let client = client.clone(); 128 + let did = did.clone(); 129 + let rev = rev.to_string(); 130 + async move { 131 + client 132 + .get(format!("{}/xrpc/com.atproto.sync.getRepo", base)) 133 + .query(&[("did", did.as_str()), ("since", rev.as_str())]) 134 + .send() 135 + .await 136 + .expect("Failed to send request") 137 + .status() 138 + } 139 + }); 140 + 141 + let statuses: Vec<StatusCode> = futures::future::join_all(request_futures).await; 142 + assert!( 143 + statuses.iter().all(|s| *s == StatusCode::OK), 144 + "getRepo with since should bypass semaphore, got: {:?}", 145 + statuses 146 + ); 147 + }
+1 -3
crates/tranquil-store/benches/recovery.rs
··· 100 100 } 101 101 102 102 let loc = self.data_writer.append_block(cid, data).unwrap(); 103 - self.hint_writer 104 - .append_hint(cid, loc.file_id, loc.offset, loc.length) 105 - .unwrap(); 103 + self.hint_writer.append_hint(cid, &loc).unwrap(); 106 104 self.blocks_in_file += 1; 107 105 108 106 if self.blocks_in_file.is_multiple_of(10_000) {
+2 -14
crates/tranquil-store/src/blockstore/compaction.rs
··· 162 162 } => match index.get(&cid_bytes) { 163 163 Some(e) if e.location.file_id == source_file_id && !e.refcount.is_zero() => { 164 164 let loc = writer.append_block(&cid_bytes, &data)?; 165 - hint_writer.append_relocate( 166 - &cid_bytes, 167 - loc.file_id, 168 - loc.offset, 169 - loc.length, 170 - e.refcount.raw(), 171 - )?; 165 + hint_writer.append_relocate(&cid_bytes, &loc, e.refcount.raw())?; 172 166 relocations.push((cid_bytes, loc)); 173 167 live_count = live_count.saturating_add(1); 174 168 } ··· 188 182 } 189 183 false => { 190 184 let loc = writer.append_block(&cid_bytes, &data)?; 191 - hint_writer.append_relocate( 192 - &cid_bytes, 193 - loc.file_id, 194 - loc.offset, 195 - loc.length, 196 - e.refcount.raw(), 197 - )?; 185 + hint_writer.append_relocate(&cid_bytes, &loc, e.refcount.raw())?; 198 186 relocations.push((cid_bytes, loc)); 199 187 live_count = live_count.saturating_add(1); 200 188 }
+1 -1
crates/tranquil-store/src/blockstore/group_commit.rs
··· 1136 1136 } 1137 1137 1138 1138 let loc = data_writer.append_block(cid_bytes, data)?; 1139 - hint_writer.append_hint(cid_bytes, loc.file_id, loc.offset, loc.length)?; 1139 + hint_writer.append_hint(cid_bytes, &loc)?; 1140 1140 1141 1141 block_bytes = block_bytes.saturating_add(data.len() as u64); 1142 1142 block_count = block_count.saturating_add(1);
+66 -108
crates/tranquil-store/src/blockstore/hint.rs
··· 55 55 io.write_all_at(fd, write_offset.raw(), record) 56 56 } 57 57 58 + fn encode_location_fields(record: &mut [u8; HINT_RECORD_SIZE], loc: &BlockLocation) { 59 + record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&loc.file_id.raw().to_le_bytes()); 60 + record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8] 61 + .copy_from_slice(&loc.length.raw().to_le_bytes()); 62 + record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8] 63 + .copy_from_slice(&loc.offset.raw().to_le_bytes()); 64 + } 65 + 58 66 pub(crate) fn encode_hint_record<S: StorageIO>( 59 67 io: &S, 60 68 fd: FileId, 61 69 write_offset: HintOffset, 62 70 cid_bytes: &[u8; CID_SIZE], 63 - file_id: DataFileId, 64 - block_offset: BlockOffset, 65 - length: BlockLength, 71 + loc: &BlockLocation, 66 72 ) -> io::Result<()> { 67 73 let mut record = [0u8; HINT_RECORD_SIZE]; 68 74 record[TYPE_OFFSET] = RECORD_TYPE_PUT; 69 75 record[VERSION_OFFSET] = HINT_FORMAT_VERSION; 70 76 record[CID_OFFSET..CID_OFFSET + CID_SIZE].copy_from_slice(cid_bytes); 71 - record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&file_id.raw().to_le_bytes()); 72 - record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8].copy_from_slice(&length.raw().to_le_bytes()); 73 - record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8].copy_from_slice(&block_offset.raw().to_le_bytes()); 77 + encode_location_fields(&mut record, loc); 74 78 75 79 let checksum = hint_checksum(&record[..HINT_PAYLOAD_SIZE]); 76 80 record[CHECKSUM_OFFSET..].copy_from_slice(&checksum.to_le_bytes()); ··· 85 89 fd: FileId, 86 90 write_offset: HintOffset, 87 91 cid_bytes: &[u8; CID_SIZE], 88 - file_id: DataFileId, 89 - block_offset: BlockOffset, 90 - length: BlockLength, 92 + loc: &BlockLocation, 91 93 refcount: u32, 92 94 ) -> io::Result<()> { 93 95 let mut record = [0u8; HINT_RECORD_SIZE]; ··· 96 98 let rc16 = u16::try_from(refcount).unwrap_or(u16::MAX); 97 99 record[REFCOUNT_OFFSET..REFCOUNT_OFFSET + 2].copy_from_slice(&rc16.to_le_bytes()); 98 100 record[CID_OFFSET..CID_OFFSET + CID_SIZE].copy_from_slice(cid_bytes); 99 - record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&file_id.raw().to_le_bytes()); 100 - record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8].copy_from_slice(&length.raw().to_le_bytes()); 101 - record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8].copy_from_slice(&block_offset.raw().to_le_bytes()); 101 + encode_location_fields(&mut record, loc); 102 102 103 103 let checksum = hint_checksum(&record[..HINT_PAYLOAD_SIZE]); 104 104 record[CHECKSUM_OFFSET..].copy_from_slice(&checksum.to_le_bytes()); ··· 323 323 pub fn append_hint( 324 324 &mut self, 325 325 cid_bytes: &[u8; CID_SIZE], 326 - file_id: DataFileId, 327 - offset: BlockOffset, 328 - length: BlockLength, 326 + loc: &BlockLocation, 329 327 ) -> io::Result<()> { 330 - encode_hint_record( 331 - self.io, 332 - self.fd, 333 - self.position, 334 - cid_bytes, 335 - file_id, 336 - offset, 337 - length, 338 - )?; 328 + encode_hint_record(self.io, self.fd, self.position, cid_bytes, loc)?; 339 329 self.position = self.position.advance(HINT_RECORD_SIZE as u64); 340 330 Ok(()) 341 331 } ··· 354 344 pub fn append_relocate( 355 345 &mut self, 356 346 cid_bytes: &[u8; CID_SIZE], 357 - file_id: DataFileId, 358 - offset: BlockOffset, 359 - length: BlockLength, 347 + loc: &BlockLocation, 360 348 refcount: u32, 361 349 ) -> io::Result<()> { 362 - encode_relocate_record( 363 - self.io, 364 - self.fd, 365 - self.position, 366 - cid_bytes, 367 - file_id, 368 - offset, 369 - length, 370 - refcount, 371 - )?; 350 + encode_relocate_record(self.io, self.fd, self.position, cid_bytes, loc, refcount)?; 372 351 self.position = self.position.advance(HINT_RECORD_SIZE as u64); 373 352 Ok(()) 374 353 } ··· 862 841 let offset = BlockOffset::new(1024); 863 842 let length = BlockLength::new(256); 864 843 865 - encode_hint_record(&sim, fd, HintOffset::new(0), &cid, file_id, offset, length).unwrap(); 844 + let loc = BlockLocation { file_id, offset, length }; 845 + encode_hint_record(&sim, fd, HintOffset::new(0), &cid, &loc).unwrap(); 866 846 867 847 let file_size = sim.file_size(fd).unwrap(); 868 848 let record = decode_hint_record(&sim, fd, HintOffset::new(0), file_size) ··· 920 900 (0u8..5).for_each(|i| { 921 901 let cid = test_cid(i); 922 902 let write_offset = HintOffset::new(i as u64 * HINT_RECORD_SIZE as u64); 923 - encode_hint_record( 924 - &sim, 925 - fd, 926 - write_offset, 927 - &cid, 928 - DataFileId::new(i as u32), 929 - BlockOffset::new(i as u64 * 100), 930 - BlockLength::new(50 + i as u32), 931 - ) 932 - .unwrap(); 903 + let loc = BlockLocation { 904 + file_id: DataFileId::new(i as u32), 905 + offset: BlockOffset::new(i as u64 * 100), 906 + length: BlockLength::new(50 + i as u32), 907 + }; 908 + encode_hint_record(&sim, fd, write_offset, &cid, &loc).unwrap(); 933 909 }); 934 910 935 911 let file_size = sim.file_size(fd).unwrap(); ··· 971 947 fn detects_corrupted_hint() { 972 948 let (sim, fd) = setup(); 973 949 let cid = test_cid(1); 974 - encode_hint_record( 975 - &sim, 976 - fd, 977 - HintOffset::new(0), 978 - &cid, 979 - DataFileId::new(0), 980 - BlockOffset::new(0), 981 - BlockLength::new(100), 982 - ) 983 - .unwrap(); 950 + let loc = BlockLocation { 951 + file_id: DataFileId::new(0), 952 + offset: BlockOffset::new(0), 953 + length: BlockLength::new(100), 954 + }; 955 + encode_hint_record(&sim, fd, HintOffset::new(0), &cid, &loc).unwrap(); 984 956 985 957 sim.write_all_at(fd, 10, &[0xFF]).unwrap(); 986 958 ··· 1006 978 fn oversized_length_treated_as_corrupted() { 1007 979 let (sim, fd) = setup(); 1008 980 let cid = test_cid(1); 1009 - encode_hint_record( 1010 - &sim, 1011 - fd, 1012 - HintOffset::new(0), 1013 - &cid, 1014 - DataFileId::new(0), 1015 - BlockOffset::new(0), 1016 - BlockLength::new(100), 1017 - ) 1018 - .unwrap(); 981 + let loc = BlockLocation { 982 + file_id: DataFileId::new(0), 983 + offset: BlockOffset::new(0), 984 + length: BlockLength::new(100), 985 + }; 986 + encode_hint_record(&sim, fd, HintOffset::new(0), &cid, &loc).unwrap(); 1019 987 1020 988 let length_offset = FIELD_A_OFFSET as u64 + 4; 1021 989 let oversized = (MAX_BLOCK_SIZE + 1).to_le_bytes(); ··· 1040 1008 let mut writer = HintFileWriter::new(&sim, fd); 1041 1009 1042 1010 (0u8..5).for_each(|i| { 1043 - writer 1044 - .append_hint( 1045 - &test_cid(i), 1046 - DataFileId::new(0), 1047 - BlockOffset::new(i as u64 * 100), 1048 - BlockLength::new(50 + i as u32), 1049 - ) 1050 - .unwrap(); 1011 + let loc = BlockLocation { 1012 + file_id: DataFileId::new(0), 1013 + offset: BlockOffset::new(i as u64 * 100), 1014 + length: BlockLength::new(50 + i as u32), 1015 + }; 1016 + writer.append_hint(&test_cid(i), &loc).unwrap(); 1051 1017 }); 1052 1018 1053 1019 assert_eq!( ··· 1074 1040 fn hint_writer_resume_continues_at_position() { 1075 1041 let (sim, fd) = setup(); 1076 1042 let mut writer = HintFileWriter::new(&sim, fd); 1077 - writer 1078 - .append_hint( 1079 - &test_cid(0), 1080 - DataFileId::new(0), 1081 - BlockOffset::new(0), 1082 - BlockLength::new(100), 1083 - ) 1084 - .unwrap(); 1043 + let loc0 = BlockLocation { 1044 + file_id: DataFileId::new(0), 1045 + offset: BlockOffset::new(0), 1046 + length: BlockLength::new(100), 1047 + }; 1048 + writer.append_hint(&test_cid(0), &loc0).unwrap(); 1085 1049 1086 1050 let pos = writer.position(); 1087 1051 let mut writer2 = HintFileWriter::resume(&sim, fd, pos); 1088 - writer2 1089 - .append_hint( 1090 - &test_cid(1), 1091 - DataFileId::new(0), 1092 - BlockOffset::new(100), 1093 - BlockLength::new(200), 1094 - ) 1095 - .unwrap(); 1052 + let loc1 = BlockLocation { 1053 + file_id: DataFileId::new(0), 1054 + offset: BlockOffset::new(100), 1055 + length: BlockLength::new(200), 1056 + }; 1057 + writer2.append_hint(&test_cid(1), &loc1).unwrap(); 1096 1058 1097 1059 let reader = HintFileReader::open(&sim, fd).unwrap(); 1098 1060 let valid_count = reader ··· 1115 1077 fn hint_reader_stops_on_truncated() { 1116 1078 let (sim, fd) = setup(); 1117 1079 let mut writer = HintFileWriter::new(&sim, fd); 1118 - writer 1119 - .append_hint( 1120 - &test_cid(0), 1121 - DataFileId::new(0), 1122 - BlockOffset::new(0), 1123 - BlockLength::new(100), 1124 - ) 1125 - .unwrap(); 1080 + let loc = BlockLocation { 1081 + file_id: DataFileId::new(0), 1082 + offset: BlockOffset::new(0), 1083 + length: BlockLength::new(100), 1084 + }; 1085 + writer.append_hint(&test_cid(0), &loc).unwrap(); 1126 1086 1127 1087 sim.write_all_at(fd, writer.position().raw(), &[0u8; HINT_RECORD_SIZE - 1]) 1128 1088 .unwrap(); ··· 1140 1100 let mut writer = HintFileWriter::new(&sim, fd); 1141 1101 1142 1102 (0u8..3).for_each(|i| { 1143 - writer 1144 - .append_hint( 1145 - &test_cid(i), 1146 - DataFileId::new(0), 1147 - BlockOffset::new(i as u64 * 100), 1148 - BlockLength::new(50), 1149 - ) 1150 - .unwrap(); 1103 + let loc = BlockLocation { 1104 + file_id: DataFileId::new(0), 1105 + offset: BlockOffset::new(i as u64 * 100), 1106 + length: BlockLength::new(50), 1107 + }; 1108 + writer.append_hint(&test_cid(i), &loc).unwrap(); 1151 1109 }); 1152 1110 1153 1111 sim.write_all_at(fd, HINT_RECORD_SIZE as u64 + 5, &[0xFF])
+2 -2
crates/tranquil-store/tests/sim_blockstore.rs
··· 75 75 let data = vec![seed as u8; data_size]; 76 76 let loc = writer.append_block(&cid, &data).unwrap(); 77 77 hint_writer 78 - .append_hint(&cid, loc.file_id, loc.offset, loc.length) 78 + .append_hint(&cid, &loc) 79 79 .unwrap(); 80 80 (cid, loc) 81 81 }) ··· 539 539 let data = vec![i as u8; 64]; 540 540 let loc = writer.append_block(&cid, &data).ok()?; 541 541 hint_writer 542 - .append_hint(&cid, loc.file_id, loc.offset, loc.length) 542 + .append_hint(&cid, &loc) 543 543 .ok()?; 544 544 Some(()) 545 545 })?;
+28 -12
crates/tranquil-sync/src/repo.rs
··· 142 142 return get_repo_since(&state, &did, &head_cid, since).await; 143 143 } 144 144 145 + let _permit = match state.repo_export_semaphore.try_acquire() { 146 + Ok(permit) => permit, 147 + Err(_) => { 148 + return ( 149 + StatusCode::SERVICE_UNAVAILABLE, 150 + "Too many concurrent repo exports", 151 + ) 152 + .into_response(); 153 + } 154 + }; 155 + 145 156 let car_bytes = match generate_repo_car_from_user_blocks( 146 157 state.repos.repo.as_ref(), 147 158 &state.block_store, ··· 213 224 .into_response(); 214 225 } 215 226 216 - let blocks = match state.block_store.get_many(&block_cids).await { 217 - Ok(b) => b, 218 - Err(e) => { 219 - error!("Block store error in get_repo_since: {:?}", e); 220 - return ApiError::InternalError(Some("Failed to get blocks".into())).into_response(); 221 - } 222 - }; 227 + for chunk_start in (0..block_cids.len()).step_by(500) { 228 + let chunk_end = (chunk_start + 500).min(block_cids.len()); 229 + let chunk = &block_cids[chunk_start..chunk_end]; 230 + let blocks = match state.block_store.get_many(chunk).await { 231 + Ok(b) => b, 232 + Err(e) => { 233 + error!("Block store error in get_repo_since: {:?}", e); 234 + return ApiError::InternalError(Some("Failed to get blocks".into())) 235 + .into_response(); 236 + } 237 + }; 223 238 224 - blocks 225 - .into_iter() 226 - .enumerate() 227 - .filter_map(|(i, block_opt)| block_opt.map(|block| (block_cids[i], block))) 228 - .for_each(|(cid, block)| car_bytes.extend_from_slice(&encode_car_block(&cid, &block))); 239 + chunk 240 + .iter() 241 + .zip(blocks.into_iter()) 242 + .filter_map(|(cid, block_opt)| block_opt.map(|block| (*cid, block))) 243 + .for_each(|(cid, block)| car_bytes.extend_from_slice(&encode_car_block(&cid, &block))); 244 + } 229 245 230 246 ( 231 247 StatusCode::OK,