wip: you can use your pds as a yrs-relay if you want to
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

finished refactor

notplants fb96dd4a 2a42bcf1

+426 -578
+7 -7
src/export.rs
··· 7 7 use std::path::Path; 8 8 9 9 use crate::pds_client::PdsClient; 10 - use crate::types::{FileKind, YrsRepo, COLLECTION}; 10 + use crate::types::{FileKind, YrsBranch, BRANCH_COLLECTION}; 11 11 use crate::yrs_pds; 12 12 13 13 /// Export a repo from PDS to plain text files. ··· 21 21 output_dir: &Path, 22 22 verbose: bool, 23 23 ) -> Result<usize, String> { 24 - // Fetch repo record 24 + // Fetch branch record 25 25 let record = client 26 - .get_record(did, COLLECTION, rkey) 26 + .get_record(did, BRANCH_COLLECTION, rkey) 27 27 .await? 28 - .ok_or_else(|| format!("repo record not found: {}", rkey))?; 28 + .ok_or_else(|| format!("branch record not found: {}", rkey))?; 29 29 30 - let repo: YrsRepo = 31 - serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 30 + let branch: YrsBranch = 31 + serde_json::from_value(record.value).map_err(|e| format!("parse YrsBranch: {}", e))?; 32 32 33 33 // Load file index from blob 34 - let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 34 + let file_index = yrs_pds::load_file_index(&branch, client, did).await?; 35 35 36 36 let mut files_exported = 0; 37 37
+2 -2
src/lib.rs
··· 21 21 pub use pack::{ 22 22 chunk_data, compress, create_compressed_pack, create_pack, decompress, extract_item, is_gzip, 23 23 is_precompressed_extension, parse_pack, parse_pack_auto, reassemble_chunks, Pack, 24 - PackDataType, PackItem, CHUNK_SIZE, 24 + PackItemDataType, PackItem, CHUNK_SIZE, 25 25 }; 26 26 pub use pds_client::PdsClient; 27 27 pub use save::{save, save_filtered}; 28 28 pub use sync::{sync_loop, SyncConfig, SyncCycleResult}; 29 - pub use types::{collect_blob_refs, Collaborator, FileIndex, FileKind, COLLECTION, MANIFEST_KEY}; 29 + pub use types::{collect_blob_refs, BranchRef, FileIndex, FileKind, YrsBranch, YrsRepo, BRANCH_COLLECTION, MANIFEST_KEY, REPO_COLLECTION};
+33 -57
src/load.rs
··· 4 4 use std::path::Path; 5 5 6 6 use crate::pds_client::PdsClient; 7 - use crate::types::{FileKind, LoadResult, YrsRepo, COLLECTION, MANIFEST_KEY}; 7 + use crate::types::{FileKind, LoadResult, YrsBranch, BRANCH_COLLECTION, MANIFEST_KEY}; 8 8 use crate::yrs_pds::{self}; 9 9 10 10 /// Load a repo from PDS into a directory. ··· 19 19 output_dir: &Path, 20 20 verbose: bool, 21 21 ) -> Result<LoadResult, String> { 22 - // Fetch repo record 22 + // Fetch branch record 23 23 let record = client 24 - .get_record(did, COLLECTION, rkey) 24 + .get_record(did, BRANCH_COLLECTION, rkey) 25 25 .await? 26 - .ok_or_else(|| format!("repo record not found: {}", rkey))?; 26 + .ok_or_else(|| format!("branch record not found: {}", rkey))?; 27 27 28 - let repo: YrsRepo = 29 - serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 28 + let branch: YrsBranch = 29 + serde_json::from_value(record.value).map_err(|e| format!("parse YrsBranch: {}", e))?; 30 30 31 31 // Load file index from blob 32 - let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 32 + let file_index = yrs_pds::load_file_index(&branch, client, did).await?; 33 33 34 34 // Determine which files to load 35 35 let file_list = if let Some(manifest_entry) = file_index.get(MANIFEST_KEY) { ··· 46 46 let mut files_loaded = 0; 47 47 let mut blobs_downloaded = 0; 48 48 49 - // Cache for pack blobs (keyed by CID) to avoid re-downloading 49 + // Cache for downloaded pack data (keyed by first blob CID) to avoid re-downloading 50 50 let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); 51 51 52 52 for (rel_path, entry) in &file_index { ··· 138 138 pack_cache: &mut HashMap<String, Vec<u8>>, 139 139 blobs_downloaded: &mut usize, 140 140 ) -> Result<Vec<u8>, String> { 141 - let cid = item_ref.blob.cid().to_string(); 141 + let cache_key = item_ref.blobs[0].cid().to_string(); 142 142 143 - if !pack_cache.contains_key(&cid) { 144 - let data = if let Some(ref chunks) = item_ref.chunks { 145 - let mut chunk_data = Vec::new(); 146 - for chunk_ref in chunks { 147 - let chunk = client.get_blob(did, chunk_ref.cid()).await?; 148 - *blobs_downloaded += 1; 149 - chunk_data.push(chunk); 150 - } 151 - crate::pack::reassemble_chunks(&chunk_data) 152 - } else { 153 - let d = client.get_blob(did, &cid).await?; 154 - *blobs_downloaded += 1; 155 - d 156 - }; 157 - pack_cache.insert(cid.clone(), data); 143 + if !pack_cache.contains_key(&cache_key) { 144 + let data = download_pack_blobs(&item_ref.blobs, client, did, blobs_downloaded).await?; 145 + pack_cache.insert(cache_key.clone(), data); 158 146 } 159 147 160 - let raw_pack = pack_cache.get(&cid).unwrap(); 148 + let raw_pack = pack_cache.get(&cache_key).unwrap(); 161 149 let (_, pack_data) = crate::pack::parse_pack_auto(raw_pack)?; 162 150 163 151 let start = item_ref.offset as usize; ··· 181 169 ) -> Result<Vec<u8>, String> { 182 170 let item_ref = entry.base.as_ref() 183 171 .ok_or("missing base on FileEntry")?; 184 - let cid = item_ref.blob.cid().to_string(); 172 + fetch_pack_item_cached(item_ref, client, did, pack_cache, blobs_downloaded).await 173 + } 185 174 186 - // Fetch pack blob (or use cache), handling chunked packs 187 - if !pack_cache.contains_key(&cid) { 188 - let data = if let Some(ref chunks) = item_ref.chunks { 189 - // Reassemble chunked pack 190 - let mut chunk_data = Vec::new(); 191 - for chunk_ref in chunks { 192 - let chunk = client.get_blob(did, chunk_ref.cid()).await?; 193 - *blobs_downloaded += 1; 194 - chunk_data.push(chunk); 195 - } 196 - crate::pack::reassemble_chunks(&chunk_data) 197 - } else { 198 - let d = client.get_blob(did, &cid).await?; 175 + /// Download and reassemble pack blobs, tracking download count. 176 + async fn download_pack_blobs( 177 + blobs: &[crate::types::BlobRef], 178 + client: &PdsClient, 179 + did: &str, 180 + blobs_downloaded: &mut usize, 181 + ) -> Result<Vec<u8>, String> { 182 + if blobs.len() == 1 { 183 + let d = client.get_blob(did, blobs[0].cid()).await?; 184 + *blobs_downloaded += 1; 185 + Ok(d) 186 + } else { 187 + let mut chunks = Vec::new(); 188 + for blob in blobs { 189 + let chunk = client.get_blob(did, blob.cid()).await?; 199 190 *blobs_downloaded += 1; 200 - d 201 - }; 202 - pack_cache.insert(cid.clone(), data); 203 - } 204 - 205 - let raw_pack = pack_cache.get(&cid).unwrap(); 206 - let (_, pack_data) = crate::pack::parse_pack_auto(raw_pack)?; 207 - 208 - let start = item_ref.offset as usize; 209 - let end = start + item_ref.length as usize; 210 - if end > pack_data.len() { 211 - return Err(format!( 212 - "pack item ref out of bounds: {}..{} in {} bytes", 213 - start, 214 - end, 215 - pack_data.len() 216 - )); 191 + chunks.push(chunk); 192 + } 193 + Ok(crate::pack::reassemble_chunks(&chunks)) 217 194 } 218 - Ok(pack_data[start..end].to_vec()) 219 195 }
+5 -5
src/local_state.rs
··· 25 25 pub handle: String, 26 26 pub did: String, 27 27 pub project: String, 28 - pub repo_rkey: String, 28 + pub branch_rkey: String, 29 29 /// Cached access token (avoids re-login on every invocation). 30 30 #[serde(default, skip_serializing_if = "Option::is_none")] 31 31 pub access_token: Option<String>, ··· 172 172 ) -> Result<(String, bool), String> { 173 173 if let Some(config) = self.load_config()? { 174 174 if config.project == project { 175 - return Ok((config.repo_rkey, false)); 175 + return Ok((config.branch_rkey, false)); 176 176 } 177 177 } 178 178 let suffix = generate_random_suffix(); ··· 182 182 handle: handle.to_string(), 183 183 did: did.to_string(), 184 184 project: project.to_string(), 185 - repo_rkey: rkey.clone(), 185 + branch_rkey: rkey.clone(), 186 186 access_token: None, 187 187 refresh_token: None, 188 188 }; ··· 328 328 handle: "user.example.com".to_string(), 329 329 did: "did:plc:abc123".to_string(), 330 330 project: "my-site".to_string(), 331 - repo_rkey: "my-site-a1b2c3d4".to_string(), 331 + branch_rkey: "my-site-a1b2c3d4".to_string(), 332 332 access_token: None, 333 333 refresh_token: None, 334 334 }; ··· 337 337 let loaded = state.load_config().unwrap().unwrap(); 338 338 assert_eq!(loaded.pds_url, "https://pds.example.com"); 339 339 assert_eq!(loaded.project, "my-site"); 340 - assert_eq!(loaded.repo_rkey, "my-site-a1b2c3d4"); 340 + assert_eq!(loaded.branch_rkey, "my-site-a1b2c3d4"); 341 341 } 342 342 343 343 #[test]
+83 -100
src/main.rs
··· 61 61 #[arg(long)] 62 62 verbose: bool, 63 63 }, 64 - /// Merge repos from all devices for a project 64 + /// Merge all branches for a project 65 65 Merge { 66 - /// Project name (uses collaborators from own record, or falls back to listRecords) 66 + /// Project name 67 67 #[arg(long)] 68 68 project: String, 69 69 /// ATProto handle ··· 78 78 /// PDS URL 79 79 #[arg(long)] 80 80 pds: Option<String>, 81 - /// Working directory with .yrs/ config (to resolve own rkey for collaborator lookup) 82 - #[arg(long)] 83 - dir: Option<String>, 84 81 /// Show progress 85 82 #[arg(long)] 86 83 verbose: bool, ··· 194 191 output, 195 192 password, 196 193 pds, 197 - dir, 198 194 verbose, 199 - } => run_merge(&project, &handle, &output, &password, pds.as_deref(), dir.as_deref(), verbose).await, 195 + } => run_merge(&project, &handle, &output, &password, pds.as_deref(), verbose).await, 200 196 Command::Sync { 201 197 dir, 202 198 handle, ··· 326 322 ); 327 323 } 328 324 329 - // On first initialization, discover peers and register as collaborator 330 - let initial_collaborators = if is_new { 331 - discover_and_register(&client, &did, project, &rkey, verbose).await? 332 - } else { 333 - None 334 - }; 335 - 336 325 let inc = if include.is_empty() { 337 326 None 338 327 } else { ··· 348 337 &client, 349 338 &did, 350 339 &rkey, 351 - project, 352 - initial_collaborators.as_deref(), 353 340 inc, 354 341 exc, 355 342 verbose, 356 343 ) 357 344 .await?; 345 + 346 + // Ensure project's YrsRepo record lists our branch 347 + ensure_repo_record(&client, &did, project, &rkey, verbose).await?; 348 + 358 349 eprintln!( 359 350 "pds-yrs: saved {} file(s), skipped {} unchanged, {} bytes total", 360 351 result.files_uploaded, result.files_skipped, result.total_bytes ··· 362 353 Ok(()) 363 354 } 364 355 365 - /// On first initialization, discover existing repos for the same project, 366 - /// add our rkey as a collaborator to each, and return them as our initial collaborators. 367 - async fn discover_and_register( 356 + /// Ensure the project's YrsRepo record lists our branch rkey. 357 + /// 358 + /// 1. If the YrsRepo doesn't exist → create it with our branch 359 + /// 2. If it exists but doesn't list us → add our branch, put back with swapRecord 360 + /// 3. If it exists and already lists us → no-op 361 + async fn ensure_repo_record( 368 362 client: &pds_yrs::PdsClient, 369 363 did: &str, 370 364 project: &str, 371 - our_rkey: &str, 365 + our_branch_rkey: &str, 372 366 verbose: bool, 373 - ) -> Result<Option<Vec<pds_yrs::types::Collaborator>>, String> { 374 - let records = client 375 - .list_all_records(did, pds_yrs::COLLECTION) 367 + ) -> Result<(), String> { 368 + let existing = client 369 + .get_record(did, pds_yrs::REPO_COLLECTION, project) 376 370 .await?; 377 371 378 - let mut peer_collaborators: Vec<pds_yrs::types::Collaborator> = Vec::new(); 379 - 380 - for entry in &records { 381 - let peer_rkey = match entry.uri.rsplit('/').next() { 382 - Some(r) => r.to_string(), 383 - None => continue, 384 - }; 385 - if peer_rkey == our_rkey { 386 - continue; 387 - } 388 - let repo: pds_yrs::types::YrsRepo = match serde_json::from_value(entry.value.clone()) { 389 - Ok(r) => r, 390 - Err(_) => continue, 391 - }; 392 - if repo.name != project { 393 - continue; 372 + match existing { 373 + None => { 374 + // Create new YrsRepo 375 + let repo = pds_yrs::YrsRepo { 376 + name: project.to_string(), 377 + branches: vec![pds_yrs::BranchRef { 378 + rkey: our_branch_rkey.to_string(), 379 + label: None, 380 + }], 381 + updated_at: chrono::Utc::now().to_rfc3339(), 382 + }; 383 + let json = serde_json::to_value(&repo) 384 + .map_err(|e| format!("serialize YrsRepo: {}", e))?; 385 + client 386 + .put_record(did, pds_yrs::REPO_COLLECTION, project, json, None) 387 + .await?; 388 + if verbose { 389 + eprintln!("pds-yrs: created repo record for project '{}'", project); 390 + } 394 391 } 392 + Some(record) => { 393 + let mut repo: pds_yrs::YrsRepo = serde_json::from_value(record.value) 394 + .map_err(|e| format!("parse YrsRepo: {}", e))?; 395 395 396 - // Add our rkey as collaborator to this peer's record 397 - if !repo.collaborators.iter().any(|c| c.rkey == our_rkey) { 398 - let mut updated = repo.clone(); 399 - updated.collaborators.push(pds_yrs::types::Collaborator { 400 - rkey: our_rkey.to_string(), 401 - pds: None, 396 + if repo.branches.iter().any(|b| b.rkey == our_branch_rkey) { 397 + // Already listed — no-op 398 + return Ok(()); 399 + } 400 + 401 + // Add our branch 402 + repo.branches.push(pds_yrs::BranchRef { 403 + rkey: our_branch_rkey.to_string(), 404 + label: None, 402 405 }); 403 - let updated_json = serde_json::to_value(&updated) 404 - .map_err(|e| format!("serialize updated peer: {}", e))?; 406 + repo.updated_at = chrono::Utc::now().to_rfc3339(); 407 + 408 + let json = serde_json::to_value(&repo) 409 + .map_err(|e| format!("serialize YrsRepo: {}", e))?; 405 410 client 406 - .put_record(did, pds_yrs::COLLECTION, &peer_rkey, updated_json, entry.cid.clone()) 411 + .put_record(did, pds_yrs::REPO_COLLECTION, project, json, record.cid) 407 412 .await?; 408 413 if verbose { 409 414 eprintln!( 410 - "pds-yrs: registered as collaborator on peer '{}'", 411 - peer_rkey 415 + "pds-yrs: added branch '{}' to repo record for '{}'", 416 + our_branch_rkey, project 412 417 ); 413 418 } 414 419 } 415 - 416 - // Track this peer as our collaborator 417 - peer_collaborators.push(pds_yrs::types::Collaborator { 418 - rkey: peer_rkey, 419 - pds: None, 420 - }); 421 420 } 422 421 423 - if peer_collaborators.is_empty() { 424 - Ok(None) 425 - } else { 426 - if verbose { 427 - let names: Vec<&str> = peer_collaborators.iter().map(|c| c.rkey.as_str()).collect(); 428 - eprintln!("pds-yrs: discovered {} peer(s): {}", names.len(), names.join(", ")); 429 - } 430 - Ok(Some(peer_collaborators)) 431 - } 422 + Ok(()) 432 423 } 433 424 434 425 async fn run_load( ··· 443 434 let output_path = std::path::Path::new(output); 444 435 let local_state = pds_yrs::LocalState::open(output_path)?; 445 436 let rkey = match local_state.load_config()? { 446 - Some(config) if config.project == project => config.repo_rkey, 437 + Some(config) if config.project == project => config.branch_rkey, 447 438 _ => return Err(format!( 448 439 "no device rkey found for project '{}' — run 'save' first to initialize", 449 440 project ··· 466 457 output: &str, 467 458 password: &str, 468 459 pds_url: Option<&str>, 469 - dir: Option<&str>, 470 460 verbose: bool, 471 461 ) -> Result<(), String> { 472 462 let (client, did) = login(handle, password, pds_url).await?; 473 - // Try to resolve own rkey from local config for collaborator-based merge 474 - let own_rkey = if let Some(d) = dir { 475 - let local_state = pds_yrs::LocalState::open(std::path::Path::new(d))?; 476 - local_state 477 - .load_config()? 478 - .filter(|c| c.project == project) 479 - .map(|c| c.repo_rkey) 480 - } else { 481 - None 482 - }; 483 463 pds_yrs::merge_project( 484 464 &client, 485 465 &did, 486 466 project, 487 - own_rkey.as_deref(), 488 467 std::path::Path::new(output), 489 468 verbose, 490 469 ) ··· 508 487 let dir_path = std::path::Path::new(dir); 509 488 let (client, did) = login_cached(dir_path, handle, password, Some(url)).await?; 510 489 let local_state = pds_yrs::LocalState::open(dir_path)?; 511 - let (rkey, is_new) = local_state.ensure_device_rkey(project, url, handle, &did)?; 512 - if is_new { 513 - discover_and_register(&client, &did, project, &rkey, verbose).await?; 514 - } 490 + let (rkey, _is_new) = local_state.ensure_device_rkey(project, url, handle, &did)?; 491 + // Ensure project's YrsRepo record lists our branch 492 + ensure_repo_record(&client, &did, project, &rkey, verbose).await?; 515 493 let config = pds_yrs::SyncConfig { 516 494 dir: dir.to_string(), 517 495 interval: std::time::Duration::from_secs(interval), ··· 525 503 "pds-yrs: starting sync project='{}', rkey='{}' (interval={}s, Ctrl+C to stop)", 526 504 project, rkey, interval 527 505 ); 528 - pds_yrs::sync_loop(&client, &did, &rkey, project, &config).await?; 506 + pds_yrs::sync_loop(&client, &did, &rkey, &config).await?; 529 507 Ok(()) 530 508 } 531 509 ··· 541 519 let output_path = std::path::Path::new(output); 542 520 let local_state = pds_yrs::LocalState::open(output_path)?; 543 521 let rkey = match local_state.load_config()? { 544 - Some(config) if config.project == project => config.repo_rkey, 522 + Some(config) if config.project == project => config.branch_rkey, 545 523 _ => return Err(format!( 546 524 "no device rkey found for project '{}' — run 'save' first to initialize", 547 525 project ··· 562 540 pds_url: Option<&str>, 563 541 ) -> Result<(), String> { 564 542 let (client, did) = login(handle, password, pds_url).await?; 543 + 544 + // List YrsRepo records (project-level) 565 545 let records = client 566 - .list_all_records(&did, pds_yrs::COLLECTION) 546 + .list_all_records(&did, pds_yrs::REPO_COLLECTION) 567 547 .await?; 568 548 569 - let mut entries: Vec<(String, String, String, usize, usize)> = Vec::new(); 549 + let mut repos: Vec<pds_yrs::YrsRepo> = Vec::new(); 570 550 for entry in &records { 571 - if let Ok(repo) = serde_json::from_value::<pds_yrs::types::YrsRepo>(entry.value.clone()) { 551 + if let Ok(repo) = serde_json::from_value::<pds_yrs::YrsRepo>(entry.value.clone()) { 572 552 if let Some(filter) = project { 573 553 if repo.name != filter { 574 554 continue; 575 555 } 576 556 } 577 - let rkey = entry 578 - .uri 579 - .rsplit('/') 580 - .next() 581 - .unwrap_or("?") 582 - .to_string(); 583 - let blob_count = repo.blobs.len(); 584 - let collab_count = repo.collaborators.len(); 585 - entries.push((repo.name, rkey, repo.updated_at, blob_count, collab_count)); 557 + repos.push(repo); 586 558 } 587 559 } 588 560 589 - if entries.is_empty() { 561 + if repos.is_empty() { 590 562 eprintln!("pds-yrs: no repos found"); 591 563 return Ok(()); 592 564 } 593 565 594 - println!("{:<20} {:<30} {:<25} {:<6} {}", "PROJECT", "RKEY", "UPDATED", "BLOBS", "COLLABS"); 595 - for (name, rkey, updated, files, collabs) in &entries { 596 - println!("{:<20} {:<30} {:<25} {:<6} {}", name, rkey, updated, files, collabs); 566 + for repo in &repos { 567 + println!("PROJECT: {} (updated: {})", repo.name, repo.updated_at); 568 + if repo.branches.is_empty() { 569 + println!(" (no branches)"); 570 + } else { 571 + for branch in &repo.branches { 572 + let label = branch.label.as_deref().unwrap_or(""); 573 + if label.is_empty() { 574 + println!(" branch: {}", branch.rkey); 575 + } else { 576 + println!(" branch: {} ({})", branch.rkey, label); 577 + } 578 + } 579 + } 597 580 } 598 581 599 582 Ok(())
+65 -79
src/merge.rs
··· 1 - //! Merge multiple collaborators' repos via CRDT. 1 + //! Merge multiple branches via CRDT. 2 2 3 3 use std::collections::HashMap; 4 4 use std::path::Path; ··· 8 8 9 9 use crate::pack; 10 10 use crate::pds_client::PdsClient; 11 - use crate::types::{FileEntry, FileIndex, FileKind, PackItemRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 11 + use crate::types::{FileEntry, FileIndex, FileKind, PackItemRef, YrsBranch, YrsRepo, BRANCH_COLLECTION, MANIFEST_KEY, REPO_COLLECTION}; 12 12 use crate::yrs_pds; 13 13 14 - /// Merge all repos for a project. 14 + /// Merge all branches for a project. 15 15 /// 16 - /// If `own_rkey` is provided, uses the collaborators field from that record 17 - /// to discover peer rkeys (no `listRecords` needed). Otherwise falls back 18 - /// to `listRecords` to discover all repos for the project. 16 + /// Fetches the `YrsRepo` record at `yrsrepo/<project>` to discover all 17 + /// branch rkeys, then fetches each `YrsBranch` and CRDT-merges them. 19 18 pub async fn merge_project( 20 19 client: &PdsClient, 21 20 did: &str, 22 21 project_name: &str, 23 - own_rkey: Option<&str>, 24 22 output_dir: &Path, 25 23 verbose: bool, 26 24 ) -> Result<(), String> { 27 - let mut rkeys: Vec<String> = Vec::new(); 25 + // Fetch YrsRepo to get all branch rkeys 26 + let repo_record = client 27 + .get_record(did, REPO_COLLECTION, project_name) 28 + .await? 29 + .ok_or_else(|| format!("no repo record found for project: {}", project_name))?; 28 30 29 - // Try collaborators from own record first 30 - if let Some(rkey) = own_rkey { 31 - if let Some(record) = client.get_record(did, COLLECTION, rkey).await? { 32 - if let Ok(repo) = serde_json::from_value::<YrsRepo>(record.value) { 33 - if repo.name == project_name { 34 - rkeys.push(rkey.to_string()); 35 - for collab in &repo.collaborators { 36 - if !rkeys.iter().any(|r| r == &collab.rkey) { 37 - rkeys.push(collab.rkey.clone()); 38 - } 39 - } 40 - } 41 - } 42 - } 43 - } 31 + let repo: YrsRepo = serde_json::from_value(repo_record.value) 32 + .map_err(|e| format!("parse YrsRepo for {}: {}", project_name, e))?; 44 33 45 - // Fall back to listRecords if no collaborators found 46 - if rkeys.is_empty() { 47 - if verbose { 48 - eprintln!("pds-yrs: no collaborators found, discovering via listRecords"); 49 - } 50 - let records = client.list_all_records(did, COLLECTION).await?; 51 - for entry in &records { 52 - if let Ok(repo) = serde_json::from_value::<YrsRepo>(entry.value.clone()) { 53 - if repo.name == project_name { 54 - if let Some(rkey) = entry.uri.rsplit('/').next() { 55 - rkeys.push(rkey.to_string()); 56 - } 57 - } 58 - } 59 - } 34 + if repo.branches.is_empty() { 35 + return Err(format!("no branches found for project: {}", project_name)); 60 36 } 61 37 62 - if rkeys.is_empty() { 63 - return Err(format!("no repos found for project: {}", project_name)); 64 - } 38 + let rkeys: Vec<String> = repo.branches.iter().map(|b| b.rkey.clone()).collect(); 65 39 if verbose { 66 40 eprintln!( 67 - "pds-yrs: merging {} repo(s) for project '{}': {}", 41 + "pds-yrs: merging {} branch(es) for project '{}': {}", 68 42 rkeys.len(), 69 43 project_name, 70 44 rkeys.join(", ") ··· 74 48 merge_repos(client, did, &rkey_refs, output_dir, verbose).await 75 49 } 76 50 77 - /// Merge repos from multiple rkeys into an output directory. 51 + /// Merge branches from multiple rkeys into an output directory. 78 52 /// 79 53 /// For text files: CRDT merge all Yrs Docs (conflict-free). 80 54 /// For binary files: detect conflicts via CID comparison, create ··· 87 61 output_dir: &Path, 88 62 verbose: bool, 89 63 ) -> Result<(), String> { 90 - // Fetch all repo records and load their file indices 91 - let mut repos: Vec<(String, YrsRepo, FileIndex)> = Vec::new(); 64 + // Fetch all branch records and load their file indices 65 + let mut repos: Vec<(String, YrsBranch, FileIndex)> = Vec::new(); 92 66 for rkey in rkeys { 93 67 let record = client 94 - .get_record(did, COLLECTION, rkey) 68 + .get_record(did, BRANCH_COLLECTION, rkey) 95 69 .await? 96 - .ok_or_else(|| format!("repo record not found: {}", rkey))?; 97 - let repo: YrsRepo = serde_json::from_value(record.value) 98 - .map_err(|e| format!("parse YrsRepo for {}: {}", rkey, e))?; 99 - let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 100 - repos.push((rkey.to_string(), repo, file_index)); 70 + .ok_or_else(|| format!("branch record not found: {}", rkey))?; 71 + let branch: YrsBranch = serde_json::from_value(record.value) 72 + .map_err(|e| format!("parse YrsBranch for {}: {}", rkey, e))?; 73 + let file_index = yrs_pds::load_file_index(&branch, client, did).await?; 74 + repos.push((rkey.to_string(), branch, file_index)); 101 75 } 102 76 103 - // Pack cache: keyed by CID, avoids redundant blob downloads. 77 + // Pack cache: keyed by first blob CID, avoids redundant blob downloads. 104 78 // All files in a repo typically share 1-2 packs, so this reduces 105 79 // O(N × sites) blob downloads to O(packs × sites). 106 80 let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); ··· 165 139 client, 166 140 did, 167 141 output_dir, 142 + &mut pack_cache, 168 143 ) 169 144 .await?; 170 145 } ··· 180 155 181 156 /// CRDT-merge manifest Maps from all repos. 182 157 async fn merge_manifests( 183 - repos: &[(String, YrsRepo, FileIndex)], 158 + repos: &[(String, YrsBranch, FileIndex)], 184 159 client: &PdsClient, 185 160 did: &str, 186 161 pack_cache: &mut HashMap<String, Vec<u8>>, ··· 223 198 async fn merge_text_file( 224 199 rel_path: &str, 225 200 repo_indices: &[usize], 226 - repos: &[(String, YrsRepo, FileIndex)], 201 + repos: &[(String, YrsBranch, FileIndex)], 227 202 client: &PdsClient, 228 203 did: &str, 229 204 pack_cache: &mut HashMap<String, Vec<u8>>, ··· 254 229 Ok(yrs_pds::materialize(merged_doc)) 255 230 } 256 231 257 - /// Handle binary file merge — detect conflicts via CID comparison. 232 + /// Handle binary file merge — detect conflicts via content hash comparison. 258 233 async fn merge_binary_file( 259 234 rel_path: &str, 260 235 repo_indices: &[usize], 261 - repos: &[(String, YrsRepo, FileIndex)], 236 + repos: &[(String, YrsBranch, FileIndex)], 262 237 rkeys: &[&str], 263 238 client: &PdsClient, 264 239 did: &str, 265 240 output_dir: &Path, 241 + pack_cache: &mut HashMap<String, Vec<u8>>, 266 242 ) -> Result<(), String> { 267 243 if repo_indices.len() == 1 { 268 244 // Only one repo has this binary file — just download it 269 245 let entry = &repos[repo_indices[0]].2[rel_path]; 270 - let data = client.get_blob(did, entry.base_blob.cid()).await?; 246 + let data = fetch_file_data_cached(entry, client, did, pack_cache).await?; 271 247 let output_path = output_dir.join(rel_path); 272 248 std::fs::write(&output_path, &data) 273 249 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 274 250 return Ok(()); 275 251 } 276 252 277 - // Collect CIDs from all repos 278 - let mut cid_repo: HashMap<String, Vec<usize>> = HashMap::new(); 253 + // Group repos by content hash to detect conflicts 254 + let mut hash_repo: HashMap<String, Vec<usize>> = HashMap::new(); 279 255 for &idx in repo_indices { 280 256 let entry = &repos[idx].2[rel_path]; 281 - cid_repo 282 - .entry(entry.base_blob.cid().to_string()) 257 + hash_repo 258 + .entry(entry.content_hash.clone()) 283 259 .or_default() 284 260 .push(idx); 285 261 } 286 262 287 - if cid_repo.len() == 1 { 288 - // All repos have the same CID — no conflict 263 + if hash_repo.len() == 1 { 264 + // All repos have the same content — no conflict 289 265 let entry = &repos[repo_indices[0]].2[rel_path]; 290 - let data = client.get_blob(did, entry.base_blob.cid()).await?; 266 + let data = fetch_file_data_cached(entry, client, did, pack_cache).await?; 291 267 let output_path = output_dir.join(rel_path); 292 268 std::fs::write(&output_path, &data) 293 269 .map_err(|e| format!("write {:?}: {}", output_path, e))?; ··· 298 274 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or(""); 299 275 let parent = path.parent().unwrap_or(std::path::Path::new("")); 300 276 301 - for (cid, indices) in &cid_repo { 277 + for (_hash, indices) in &hash_repo { 302 278 let repo_name = &rkeys[indices[0]]; 303 279 let conflict_name = if ext.is_empty() { 304 280 format!("{}.{}", stem, repo_name) ··· 309 285 if let Some(p) = conflict_path.parent() { 310 286 std::fs::create_dir_all(p).map_err(|e| format!("create dir {:?}: {}", p, e))?; 311 287 } 312 - let data = client.get_blob(did, cid).await?; 288 + let entry = &repos[indices[0]].2[rel_path]; 289 + let data = fetch_file_data_cached(entry, client, did, pack_cache).await?; 313 290 std::fs::write(&conflict_path, &data) 314 291 .map_err(|e| format!("write {:?}: {}", conflict_path, e))?; 315 292 } ··· 356 333 did: &str, 357 334 pack_cache: &mut HashMap<String, Vec<u8>>, 358 335 ) -> Result<Vec<u8>, String> { 359 - let cid = item_ref.blob.cid().to_string(); 336 + let cache_key = item_ref.blobs[0].cid().to_string(); 360 337 361 - if !pack_cache.contains_key(&cid) { 362 - let data = if let Some(ref chunks) = item_ref.chunks { 363 - let mut chunk_data = Vec::new(); 364 - for chunk_ref in chunks { 365 - chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 366 - } 367 - pack::reassemble_chunks(&chunk_data) 368 - } else { 369 - client.get_blob(did, &cid).await? 370 - }; 371 - pack_cache.insert(cid.clone(), data); 338 + if !pack_cache.contains_key(&cache_key) { 339 + let data = download_pack_blobs(&item_ref.blobs, client, did).await?; 340 + pack_cache.insert(cache_key.clone(), data); 372 341 } 373 342 374 - let raw_pack = pack_cache.get(&cid).unwrap(); 343 + let raw_pack = pack_cache.get(&cache_key).unwrap(); 375 344 let (_, pack_data) = pack::parse_pack_auto(raw_pack)?; 376 345 377 346 let start = item_ref.offset as usize; ··· 383 352 )); 384 353 } 385 354 Ok(pack_data[start..end].to_vec()) 355 + } 356 + 357 + /// Download and reassemble pack blobs into a single byte vector. 358 + async fn download_pack_blobs( 359 + blobs: &[crate::types::BlobRef], 360 + client: &PdsClient, 361 + did: &str, 362 + ) -> Result<Vec<u8>, String> { 363 + if blobs.len() == 1 { 364 + client.get_blob(did, blobs[0].cid()).await 365 + } else { 366 + let mut chunks = Vec::new(); 367 + for blob in blobs { 368 + chunks.push(client.get_blob(did, blob.cid()).await?); 369 + } 370 + Ok(pack::reassemble_chunks(&chunks)) 371 + } 386 372 } 387 373 388 374 /// Generate a conflict filename: stem.repo_name.ext
+28 -28
src/pack.rs
··· 19 19 /// Length of data. 20 20 pub length: u64, 21 21 /// What this data represents. 22 - pub data_type: PackDataType, 22 + pub data_type: PackItemDataType, 23 23 } 24 24 25 25 /// Type of data in a pack item. 26 26 #[derive(Debug, Clone, Serialize, Deserialize)] 27 - pub enum PackDataType { 27 + pub enum PackItemDataType { 28 28 /// BaseYrsUpdate — full Yrs state encoded against empty StateVector. 29 29 BaseYrsUpdate, 30 30 /// YrsUpdate — incremental diff encoded against a non-empty StateVector. ··· 42 42 } 43 43 44 44 /// Build a pack from a set of named data items. 45 - pub fn create_pack(items: &[(&str, &[u8], PackDataType)]) -> Pack { 45 + pub fn create_pack(items: &[(&str, &[u8], PackItemDataType)]) -> Pack { 46 46 // Build index items 47 47 let mut pack_items = Vec::new(); 48 48 let mut offset: u64 = 0; ··· 194 194 } 195 195 196 196 /// Create a compressed pack. Compresses the entire pack with gzip. 197 - pub fn create_compressed_pack(items: &[(&str, &[u8], PackDataType)]) -> Pack { 197 + pub fn create_compressed_pack(items: &[(&str, &[u8], PackItemDataType)]) -> Pack { 198 198 let pack = create_pack(items); 199 199 let compressed = compress(&pack.data); 200 200 ··· 229 229 230 230 #[test] 231 231 fn pack_round_trip() { 232 - let items: Vec<(&str, &[u8], PackDataType)> = vec![ 232 + let items: Vec<(&str, &[u8], PackItemDataType)> = vec![ 233 233 ( 234 234 "docs/index.md", 235 235 b"base update data for index", 236 - PackDataType::BaseYrsUpdate, 236 + PackItemDataType::BaseYrsUpdate, 237 237 ), 238 238 ( 239 239 "docs/about.md", 240 240 b"base update data for about", 241 - PackDataType::BaseYrsUpdate, 241 + PackItemDataType::BaseYrsUpdate, 242 242 ), 243 - ("images/logo.png", b"raw png bytes", PackDataType::Binary), 243 + ("images/logo.png", b"raw png bytes", PackItemDataType::Binary), 244 244 ]; 245 245 246 246 let pack = create_pack(&items); ··· 264 264 265 265 #[test] 266 266 fn pack_empty() { 267 - let items: Vec<(&str, &[u8], PackDataType)> = vec![]; 267 + let items: Vec<(&str, &[u8], PackItemDataType)> = vec![]; 268 268 let pack = create_pack(&items); 269 269 let (parsed_items, _) = parse_pack(&pack.data).unwrap(); 270 270 assert_eq!(parsed_items.len(), 0); ··· 273 273 #[test] 274 274 fn pack_single_large_item() { 275 275 let big_data = vec![42u8; 100_000]; 276 - let items: Vec<(&str, &[u8], PackDataType)> = 277 - vec![("big.bin", &big_data, PackDataType::Binary)]; 276 + let items: Vec<(&str, &[u8], PackItemDataType)> = 277 + vec![("big.bin", &big_data, PackItemDataType::Binary)]; 278 278 let pack = create_pack(&items); 279 279 let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 280 280 let extracted = extract_item(&parsed_items[0], pack_data).unwrap(); ··· 300 300 fn compressed_pack_round_trip() { 301 301 // Use repetitive data so compression actually helps 302 302 let text = "Hello world! ".repeat(100); 303 - let items: Vec<(&str, &[u8], PackDataType)> = 304 - vec![("file.md", text.as_bytes(), PackDataType::BaseYrsUpdate)]; 303 + let items: Vec<(&str, &[u8], PackItemDataType)> = 304 + vec![("file.md", text.as_bytes(), PackItemDataType::BaseYrsUpdate)]; 305 305 let pack = create_compressed_pack(&items); 306 306 // Compressed should be smaller for repetitive data 307 307 let uncompressed = create_pack(&items); ··· 316 316 317 317 #[test] 318 318 fn uncompressed_pack_auto_parse() { 319 - let items: Vec<(&str, &[u8], PackDataType)> = 320 - vec![("file.md", b"short", PackDataType::BaseYrsUpdate)]; 319 + let items: Vec<(&str, &[u8], PackItemDataType)> = 320 + vec![("file.md", b"short", PackItemDataType::BaseYrsUpdate)]; 321 321 let pack = create_pack(&items); 322 322 // Should work fine with parse_pack_auto even without compression 323 323 let (parsed_items, pack_data) = parse_pack_auto(&pack.data).unwrap(); ··· 356 356 #[test] 357 357 fn pack_mixed_data_types() { 358 358 // pack containing base update, incremental update, and binary items 359 - let items: Vec<(&str, &[u8], PackDataType)> = vec![ 360 - ("index.md", b"yrs base update bytes", PackDataType::BaseYrsUpdate), 361 - ("index.md.update", b"yrs incremental update", PackDataType::YrsUpdate), 362 - ("logo.png", b"\x89PNG raw data", PackDataType::Binary), 359 + let items: Vec<(&str, &[u8], PackItemDataType)> = vec![ 360 + ("index.md", b"yrs base update bytes", PackItemDataType::BaseYrsUpdate), 361 + ("index.md.update", b"yrs incremental update", PackItemDataType::YrsUpdate), 362 + ("logo.png", b"\x89PNG raw data", PackItemDataType::Binary), 363 363 ]; 364 364 let pack = create_pack(&items); 365 365 let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 366 366 367 367 assert_eq!(parsed_items.len(), 3); 368 - assert!(matches!(parsed_items[0].data_type, PackDataType::BaseYrsUpdate)); 369 - assert!(matches!(parsed_items[1].data_type, PackDataType::YrsUpdate)); 370 - assert!(matches!(parsed_items[2].data_type, PackDataType::Binary)); 368 + assert!(matches!(parsed_items[0].data_type, PackItemDataType::BaseYrsUpdate)); 369 + assert!(matches!(parsed_items[1].data_type, PackItemDataType::YrsUpdate)); 370 + assert!(matches!(parsed_items[2].data_type, PackItemDataType::Binary)); 371 371 372 372 assert_eq!( 373 373 extract_item(&parsed_items[0], pack_data).unwrap(), ··· 397 397 #[test] 398 398 fn compressed_pack_skips_when_no_savings() { 399 399 // Short random-ish data where compression doesn't help 400 - let items: Vec<(&str, &[u8], PackDataType)> = vec![("a.bin", b"x", PackDataType::Binary)]; 400 + let items: Vec<(&str, &[u8], PackItemDataType)> = vec![("a.bin", b"x", PackItemDataType::Binary)]; 401 401 let compressed = create_compressed_pack(&items); 402 402 let uncompressed = create_pack(&items); 403 403 // For tiny data, create_compressed_pack should fall back to uncompressed ··· 443 443 #[test] 444 444 fn pack_item_extraction_exact_offsets() { 445 445 // verify offset/length slicing produces correct data 446 - let items: Vec<(&str, &[u8], PackDataType)> = vec![ 447 - ("a.txt", b"AAAA", PackDataType::BaseYrsUpdate), 448 - ("b.txt", b"BBBBBB", PackDataType::BaseYrsUpdate), 449 - ("c.txt", b"CC", PackDataType::BaseYrsUpdate), 446 + let items: Vec<(&str, &[u8], PackItemDataType)> = vec![ 447 + ("a.txt", b"AAAA", PackItemDataType::BaseYrsUpdate), 448 + ("b.txt", b"BBBBBB", PackItemDataType::BaseYrsUpdate), 449 + ("c.txt", b"CC", PackItemDataType::BaseYrsUpdate), 450 450 ]; 451 451 let pack = create_pack(&items); 452 452 let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); ··· 471 471 path: "bad.bin".to_string(), 472 472 offset: 100, 473 473 length: 50, 474 - data_type: PackDataType::Binary, 474 + data_type: PackItemDataType::Binary, 475 475 }; 476 476 let pack_data = b"short"; 477 477 assert!(extract_item(&item, pack_data).is_err());
+90 -150
src/save.rs
··· 3 3 use std::collections::HashMap; 4 4 use std::path::Path; 5 5 6 - use crate::pack::{self, PackDataType}; 6 + use crate::pack::{self, PackItemDataType}; 7 7 use crate::pds_client::PdsClient; 8 8 use crate::types::{ 9 - collect_blob_refs, BlobRef, Collaborator, FileEntry, FileIndex, FileKind, PackItemRef, SaveResult, 10 - YrsRepo, COLLECTION, MANIFEST_KEY, 9 + collect_blob_refs, FileEntry, FileIndex, FileKind, PackItemRef, SaveResult, 10 + YrsBranch, BRANCH_COLLECTION, MANIFEST_KEY, 11 11 }; 12 12 use crate::yrs_pds; 13 13 ··· 19 19 struct PendingItem { 20 20 path: String, 21 21 data: Vec<u8>, 22 - data_type: PackDataType, 22 + data_type: PackItemDataType, 23 23 } 24 24 25 25 /// Whether a pending item is a base update (replaces base) ··· 42 42 client: &PdsClient, 43 43 did: &str, 44 44 rkey: &str, 45 - project_name: &str, 46 - new_collaborators: Option<&[Collaborator]>, 47 45 verbose: bool, 48 46 ) -> Result<SaveResult, String> { 49 47 save_filtered( ··· 51 49 client, 52 50 did, 53 51 rkey, 54 - project_name, 55 - new_collaborators, 56 52 None, 57 53 None, 58 54 verbose, ··· 61 57 } 62 58 63 59 /// Save a directory to PDS with optional include/exclude glob filters. 64 - /// 65 - /// `new_collaborators` is used on first initialization to set initial collaborators. 66 - /// On subsequent saves, collaborators are preserved from the existing record. 67 60 pub async fn save_filtered( 68 61 dir: &Path, 69 62 client: &PdsClient, 70 63 did: &str, 71 64 rkey: &str, 72 - project_name: &str, 73 - new_collaborators: Option<&[Collaborator]>, 74 65 include: Option<&[String]>, 75 66 exclude: Option<&[String]>, 76 67 verbose: bool, ··· 82 73 } 83 74 84 75 // Fetch existing record if present 85 - let existing = client.get_record(did, COLLECTION, rkey).await?; 86 - let existing_repo: Option<YrsRepo> = existing 76 + let existing = client.get_record(did, BRANCH_COLLECTION, rkey).await?; 77 + let existing_repo: Option<YrsBranch> = existing 87 78 .as_ref() 88 79 .and_then(|r| serde_json::from_value(r.value.clone()).ok()); 89 80 let swap_cid = existing.as_ref().and_then(|r| r.cid.clone()); ··· 137 128 pending_items.push(PendingItem { 138 129 path: rel_path.clone(), 139 130 data: file_data.clone(), 140 - data_type: PackDataType::Binary, 131 + data_type: PackItemDataType::Binary, 141 132 }); 142 133 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 143 134 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); ··· 175 166 pending_items.push(PendingItem { 176 167 path: rel_path.clone(), 177 168 data: diff, 178 - data_type: PackDataType::YrsUpdate, 169 + data_type: PackItemDataType::YrsUpdate, 179 170 }); 180 171 pending_kinds.insert(rel_path.clone(), PendingKind::Incremental); 181 - // Keep existing base_blob/base, will append update 172 + // Keep existing base, will append update 182 173 file_entries.insert( 183 174 rel_path.clone(), 184 175 FileEntry { 185 176 content_hash: hex_hash(materialized.as_bytes()), 186 - base_blob: existing_entry.base_blob.clone(), 187 177 state_vector: yrs_pds::base64_encode(&sv), 188 178 updates: existing_entry.updates.clone(), 189 179 updates_count: existing_entry.updates_count + 1, 190 180 base_at: existing_entry.base_at.clone(), 191 181 kind: FileKind::Text, 192 182 base: existing_entry.base.clone(), 193 - conflict_source: None, 194 183 }, 195 184 ); 196 185 files_uploaded += 1; ··· 223 212 pending_items.push(PendingItem { 224 213 path: rel_path.clone(), 225 214 data: base_update, 226 - data_type: PackDataType::BaseYrsUpdate, 215 + data_type: PackItemDataType::BaseYrsUpdate, 227 216 }); 228 217 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 229 218 file_entries.insert( 230 219 rel_path.clone(), 231 220 FileEntry { 232 221 content_hash: hex_hash(content.as_bytes()), 233 - base_blob: placeholder_blob_ref(), 234 222 state_vector: yrs_pds::base64_encode(&sv), 235 223 updates: vec![], 236 224 updates_count: 0, 237 225 base_at: chrono::Utc::now().to_rfc3339(), 238 226 kind: FileKind::Text, 239 227 base: None, 240 - conflict_source: None, 241 228 }, 242 229 ); 243 230 files_uploaded += 1; ··· 271 258 pending_items.push(PendingItem { 272 259 path: rel_path.clone(), 273 260 data: file_data.clone(), 274 - data_type: PackDataType::Binary, 261 + data_type: PackItemDataType::Binary, 275 262 }); 276 263 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 277 264 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); ··· 304 291 pending_items.push(PendingItem { 305 292 path: rel_path.clone(), 306 293 data: file_data.clone(), 307 - data_type: PackDataType::Binary, 294 + data_type: PackItemDataType::Binary, 308 295 }); 309 296 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 310 297 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); ··· 332 319 pending_items.push(PendingItem { 333 320 path: rel_path.clone(), 334 321 data: base_update, 335 - data_type: PackDataType::BaseYrsUpdate, 322 + data_type: PackItemDataType::BaseYrsUpdate, 336 323 }); 337 324 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 338 325 file_entries.insert( 339 326 rel_path.clone(), 340 327 FileEntry { 341 328 content_hash: hex_hash(materialized.as_bytes()), 342 - base_blob: placeholder_blob_ref(), 343 329 state_vector: yrs_pds::base64_encode(&sv), 344 330 updates: vec![], 345 331 updates_count: 0, 346 332 base_at: chrono::Utc::now().to_rfc3339(), 347 333 kind: FileKind::Text, 348 334 base: None, 349 - conflict_source: None, 350 - }, 335 + }, 351 336 ); 352 337 files_uploaded += 1; 353 338 } ··· 356 341 pending_items.push(PendingItem { 357 342 path: rel_path.clone(), 358 343 data: file_data.clone(), 359 - data_type: PackDataType::Binary, 344 + data_type: PackItemDataType::Binary, 360 345 }); 361 346 pending_kinds.insert(rel_path.clone(), PendingKind::Base); 362 347 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); ··· 383 368 pending_items.push(PendingItem { 384 369 path: MANIFEST_KEY.to_string(), 385 370 data: manifest_base, 386 - data_type: PackDataType::BaseYrsUpdate, 371 + data_type: PackItemDataType::BaseYrsUpdate, 387 372 }); 388 373 pending_kinds.insert(MANIFEST_KEY.to_string(), PendingKind::Base); 389 374 390 - // Upload all items as a single pack (becomes a PDS blob) 391 - let total_bytes; 392 - if pending_items.is_empty() { 393 - total_bytes = 0; 394 - let manifest_entry = yrs_pds::doc_to_file_entry(&manifest_doc, client, did).await?; 395 - file_entries.insert(MANIFEST_KEY.to_string(), manifest_entry); 396 - } else { 397 - // Build pack 398 - let items: Vec<(&str, &[u8], PackDataType)> = pending_items 399 - .iter() 400 - .map(|pi| (pi.path.as_str(), pi.data.as_slice(), pi.data_type.clone())) 401 - .collect(); 402 - let pack = pack::create_compressed_pack(&items); 403 - let is_compressed = pack::is_gzip(&pack.data); 404 - total_bytes = pack.data.len() as u64; 375 + // Upload all items as a single pack (becomes one or more PDS blobs) 376 + let items: Vec<(&str, &[u8], PackItemDataType)> = pending_items 377 + .iter() 378 + .map(|pi| (pi.path.as_str(), pi.data.as_slice(), pi.data_type.clone())) 379 + .collect(); 380 + let pack = pack::create_compressed_pack(&items); 381 + let is_compressed = pack::is_gzip(&pack.data); 382 + let total_bytes = pack.data.len() as u64; 405 383 406 - // Upload pack as PDS blob — chunk if larger than ATProto limit 407 - let (blob_ref, chunk_refs) = if pack.data.len() > pack::CHUNK_SIZE { 408 - let chunks = pack::chunk_data(&pack.data); 409 - let mut refs = Vec::new(); 410 - for (i, chunk) in chunks.iter().enumerate() { 411 - let r = client.upload_blob(chunk.clone()).await?; 412 - if verbose { 413 - eprintln!( 414 - "pds-yrs: uploaded chunk {}/{} ({} bytes)", 415 - i + 1, 416 - chunks.len(), 417 - chunk.len() 418 - ); 419 - } 420 - refs.push(r); 384 + // Upload pack as PDS blob(s) — split if larger than ATProto limit 385 + let blob_refs = if pack.data.len() > pack::CHUNK_SIZE { 386 + let chunks = pack::chunk_data(&pack.data); 387 + let mut refs = Vec::new(); 388 + for (i, chunk) in chunks.iter().enumerate() { 389 + let r = client.upload_blob(chunk.clone()).await?; 390 + if verbose { 391 + eprintln!( 392 + "pds-yrs: uploaded blob {}/{} ({} bytes)", 393 + i + 1, 394 + chunks.len(), 395 + chunk.len() 396 + ); 421 397 } 422 - let primary = refs[0].clone(); 423 - (primary, Some(refs)) 424 - } else { 425 - let r = client.upload_blob(pack.data).await?; 426 - (r, None) 427 - }; 428 - 429 - if verbose { 430 - eprintln!( 431 - "pds-yrs: uploaded pack ({} bytes, {} items{})", 432 - total_bytes, 433 - pack.items.len(), 434 - if chunk_refs.is_some() { 435 - ", chunked" 436 - } else { 437 - "" 438 - } 439 - ); 398 + refs.push(r); 440 399 } 400 + refs 401 + } else { 402 + vec![client.upload_blob(pack.data).await?] 403 + }; 441 404 442 - // Update file entries with pack item refs 443 - for item in &pack.items { 444 - let item_ref = PackItemRef { 445 - blob: blob_ref.clone(), 446 - offset: item.offset, 447 - length: item.length, 448 - compressed: is_compressed, 449 - chunks: chunk_refs.clone(), 450 - }; 405 + if verbose { 406 + eprintln!( 407 + "pds-yrs: uploaded pack ({} bytes, {} items, {} blob(s))", 408 + total_bytes, 409 + pack.items.len(), 410 + blob_refs.len(), 411 + ); 412 + } 451 413 452 - if item.path == MANIFEST_KEY { 453 - file_entries.insert( 454 - MANIFEST_KEY.to_string(), 455 - FileEntry { 456 - content_hash: String::new(), 457 - base_blob: blob_ref.clone(), 458 - state_vector: yrs_pds::base64_encode(&manifest_sv), 459 - updates: vec![], 460 - updates_count: 0, 461 - base_at: chrono::Utc::now().to_rfc3339(), 462 - kind: FileKind::Text, 463 - base: Some(item_ref), 464 - conflict_source: None, 465 - }, 466 - ); 467 - } else if let Some(fe) = file_entries.get_mut(&item.path) { 468 - let kind = pending_kinds.get(&item.path).cloned().unwrap_or(PendingKind::Base); 469 - match kind { 470 - PendingKind::Base => { 471 - // BaseYrsUpdate — replace base, clear updates 472 - fe.base_blob = blob_ref.clone(); 473 - fe.base = Some(item_ref); 474 - fe.updates.clear(); 475 - } 476 - PendingKind::Incremental => { 477 - // Incremental update — append to updates list 478 - fe.updates.push(item_ref); 479 - } 414 + // Update file entries with pack item refs 415 + for item in &pack.items { 416 + let item_ref = PackItemRef { 417 + blobs: blob_refs.clone(), 418 + offset: item.offset, 419 + length: item.length, 420 + compressed: is_compressed, 421 + }; 422 + 423 + if item.path == MANIFEST_KEY { 424 + file_entries.insert( 425 + MANIFEST_KEY.to_string(), 426 + FileEntry { 427 + content_hash: String::new(), 428 + state_vector: yrs_pds::base64_encode(&manifest_sv), 429 + updates: vec![], 430 + updates_count: 0, 431 + base_at: chrono::Utc::now().to_rfc3339(), 432 + kind: FileKind::Text, 433 + base: Some(item_ref), 434 + }, 435 + ); 436 + } else if let Some(fe) = file_entries.get_mut(&item.path) { 437 + let kind = pending_kinds.get(&item.path).cloned().unwrap_or(PendingKind::Base); 438 + match kind { 439 + PendingKind::Base => { 440 + // BaseYrsUpdate — replace base, clear updates 441 + fe.base = Some(item_ref); 442 + fe.updates.clear(); 443 + } 444 + PendingKind::Incremental => { 445 + // Incremental update — append to updates list 446 + fe.updates.push(item_ref); 480 447 } 481 448 } 482 449 } ··· 485 452 // Serialize file index as a blob entry in a separate index pack 486 453 let index_json = serde_json::to_vec(&file_entries) 487 454 .map_err(|e| format!("serialize FileIndex: {}", e))?; 488 - let index_items: Vec<(&str, &[u8], PackDataType)> = 489 - vec![("__index__", &index_json, PackDataType::BaseYrsUpdate)]; 455 + let index_items: Vec<(&str, &[u8], PackItemDataType)> = 456 + vec![("__index__", &index_json, PackItemDataType::BaseYrsUpdate)]; 490 457 let index_pack = pack::create_compressed_pack(&index_items); 491 458 let index_is_compressed = pack::is_gzip(&index_pack.data); 492 459 let index_blob_ref = client.upload_blob(index_pack.data).await?; 493 460 let index_item = &index_pack.items[0]; 494 461 let index_ref = PackItemRef { 495 - blob: index_blob_ref.clone(), 462 + blobs: vec![index_blob_ref.clone()], 496 463 offset: index_item.offset, 497 464 length: index_item.length, 498 465 compressed: index_is_compressed, 499 - chunks: None, 500 466 }; 501 467 502 468 // Collect all blob refs for GC prevention ··· 506 472 all_blobs.push(index_blob_ref); 507 473 } 508 474 509 - // Build YrsRepo — preserve existing collaborators, merge with any new ones 510 - let mut collaborators = existing_repo 511 - .as_ref() 512 - .map(|r| r.collaborators.clone()) 513 - .unwrap_or_default(); 514 - if let Some(new_collabs) = new_collaborators { 515 - for c in new_collabs { 516 - if !collaborators.iter().any(|existing| existing.rkey == c.rkey) { 517 - collaborators.push(c.clone()); 518 - } 519 - } 520 - } 521 475 let now = chrono::Utc::now().to_rfc3339(); 522 - let record = YrsRepo { 523 - name: project_name.to_string(), 476 + let record = YrsBranch { 524 477 index: index_ref, 525 478 blobs: all_blobs, 526 479 updated_at: now, 527 - collaborators, 528 480 }; 529 481 530 482 let record_json = 531 - serde_json::to_value(&record).map_err(|e| format!("serialize YrsRepo: {}", e))?; 483 + serde_json::to_value(&record).map_err(|e| format!("serialize YrsBranch: {}", e))?; 532 484 533 485 client 534 - .put_record(did, COLLECTION, rkey, record_json, swap_cid) 486 + .put_record(did, BRANCH_COLLECTION, rkey, record_json, swap_cid) 535 487 .await?; 536 488 537 489 Ok(SaveResult { ··· 541 493 }) 542 494 } 543 495 544 - /// Placeholder BlobRef — will be replaced with pack item ref after upload. 545 - fn placeholder_blob_ref() -> BlobRef { 546 - BlobRef::new( 547 - "pending".to_string(), 548 - "application/octet-stream".to_string(), 549 - 0, 550 - ) 551 - } 552 - 553 - /// Placeholder binary FileEntry. 496 + /// Placeholder binary FileEntry — base will be filled in after pack upload. 554 497 fn placeholder_binary_entry(hash: &str) -> FileEntry { 555 498 FileEntry { 556 499 content_hash: hash.to_string(), 557 - base_blob: placeholder_blob_ref(), 558 500 state_vector: String::new(), 559 501 updates: vec![], 560 502 updates_count: 0, 561 503 base_at: chrono::Utc::now().to_rfc3339(), 562 504 kind: FileKind::Binary, 563 505 base: None, 564 - conflict_source: None, 565 506 } 566 507 } 567 508 ··· 896 837 assert_eq!(entry.kind, FileKind::Binary); 897 838 assert_eq!(entry.content_hash, "deadbeef"); 898 839 assert!(entry.base.is_none()); 899 - assert!(entry.conflict_source.is_none()); 900 840 assert_eq!(entry.updates_count, 0); 901 841 } 902 842
+2 -4
src/sync.rs
··· 66 66 client: &PdsClient, 67 67 did: &str, 68 68 rkey: &str, 69 - project_name: &str, 70 69 config: &SyncConfig, 71 70 ) -> Result<Vec<SyncCycleResult>, String> { 72 71 let dir = Path::new(&config.dir); ··· 80 79 } 81 80 } 82 81 83 - let result = sync_cycle(client, did, rkey, project_name, dir, cycle, config).await?; 82 + let result = sync_cycle(client, did, rkey, dir, cycle, config).await?; 84 83 85 84 if config.verbose { 86 85 eprintln!( ··· 114 113 client: &PdsClient, 115 114 did: &str, 116 115 rkey: &str, 117 - project_name: &str, 118 116 dir: &Path, 119 117 cycle: u32, 120 118 config: &SyncConfig, ··· 159 157 } else { 160 158 // Save local changes with filters 161 159 let save_result = 162 - save::save_filtered(dir, client, did, rkey, project_name, None, inc, exc, config.verbose).await?; 160 + save::save_filtered(dir, client, did, rkey, inc, exc, config.verbose).await?; 163 161 files_uploaded = save_result.files_uploaded; 164 162 165 163 // Determine if this cycle should materialize
+86 -102
src/types.rs
··· 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::collections::{HashMap, HashSet}; 5 5 6 - /// Collection name for yrs repo records. 7 - pub const COLLECTION: &str = "net.commoninternet.yrsrepo"; 6 + /// Collection name for project-level repo records. 7 + pub const REPO_COLLECTION: &str = "net.commoninternet.yrsrepo"; 8 + 9 + /// Collection name for per-device branch records. 10 + pub const BRANCH_COLLECTION: &str = "net.commoninternet.yrsbranch"; 8 11 9 12 /// Key for the manifest FileEntry in the YrsRepo. 10 13 pub const MANIFEST_KEY: &str = "pdsyrs_manifest"; ··· 19 22 Binary, 20 23 } 21 24 22 - /// A collaborator reference — points to another device's rkey, optionally on a different PDS. 23 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 24 - pub struct Collaborator { 25 - /// The rkey of the collaborator's repo record. 26 - pub rkey: String, 27 - /// PDS URL if different from the current PDS (for cross-PDS collaboration). 28 - #[serde(skip_serializing_if = "Option::is_none")] 29 - pub pds: Option<String>, 30 - } 31 - 32 25 /// The file index — maps relative paths to FileEntry metadata. 33 26 /// Stored as a PDS blob (not inline in the record) to avoid record size limits. 34 27 pub type FileIndex = HashMap<String, FileEntry>; 35 28 36 - /// A repo stored on PDS with Yrs CRDT state per file. 29 + /// A project-level record stored at `yrsrepo/<project-name>`. 37 30 /// 38 - /// `name` is the project name — shared across all devices/writers for the same project. 39 - /// Each device gets its own rkey (auto-generated), while `name` identifies the project. 40 - /// `collaborators` lists other device rkeys for the same project, enabling merge 41 - /// without needing to list all records. 31 + /// One per project. The rkey IS the project name (deterministic). 32 + /// Lists all branches (devices) that belong to this project. 33 + #[derive(Debug, Clone, Serialize, Deserialize)] 34 + pub struct YrsRepo { 35 + pub name: String, 36 + #[serde(default, skip_serializing_if = "Vec::is_empty")] 37 + pub branches: Vec<BranchRef>, 38 + #[serde(rename = "updatedAt")] 39 + pub updated_at: String, 40 + } 41 + 42 + /// A reference to a branch within a YrsRepo. 43 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 44 + pub struct BranchRef { 45 + /// The rkey of the YrsBranch record. 46 + pub rkey: String, 47 + /// Human-readable label (e.g. device name, or the handle). 48 + #[serde(default, skip_serializing_if = "Option::is_none")] 49 + pub label: Option<String>, 50 + } 51 + 52 + /// A per-device branch record stored at `yrsbranch/<project>-<random8>`. 53 + /// 54 + /// Pure data — no back-reference to project name. The YrsRepo record 55 + /// is the only link between branches and their project. 42 56 /// 43 57 /// File metadata is stored in a separate index blob (pointed to by `index`). 44 58 /// `blobs` lists all blob refs that must be kept alive to prevent PDS garbage collection. 45 59 #[derive(Debug, Clone, Serialize, Deserialize)] 46 - pub struct YrsRepo { 47 - pub name: String, 60 + pub struct YrsBranch { 48 61 /// Pointer to the index within the PDS blob containing the pack (contains serialized FileIndex). 49 62 pub index: PackItemRef, 50 - /// All blob CIDs referenced by this repo — prevents PDS garbage collection. 63 + /// All blob CIDs referenced by this branch — prevents PDS garbage collection. 51 64 pub blobs: Vec<BlobRef>, 52 65 #[serde(rename = "updatedAt")] 53 66 pub updated_at: String, 54 - #[serde(default, skip_serializing_if = "Vec::is_empty")] 55 - pub collaborators: Vec<Collaborator>, 56 67 } 57 68 58 69 /// A single file's state, stored as Yrs CRDT. ··· 63 74 /// For text: hash of UTF-8 content. For binary: hash of raw bytes. 64 75 #[serde(rename = "contentHash")] 65 76 pub content_hash: String, 66 - /// Blob reference for GC — CID of the PDS blob containing the BaseYrsUpdate. 67 - #[serde(rename = "baseBlob")] 68 - pub base_blob: BlobRef, 69 77 /// State vector bytes, base64-encoded for inline storage. 70 78 #[serde(rename = "stateVector")] 71 79 pub state_vector: String, ··· 83 91 /// PackItemRef to BaseYrsUpdate data within a pack. 84 92 #[serde(rename = "base")] 85 93 pub base: Option<PackItemRef>, 86 - /// For binary conflict files, the original path before conflict split. 87 - #[serde(rename = "conflictSource", skip_serializing_if = "Option::is_none")] 88 - pub conflict_source: Option<String>, 89 94 } 90 95 91 96 92 - /// Reference to a pack item within a PDS blob. 97 + /// Reference to a pack item within one or more PDS blobs. 98 + /// 99 + /// A pack may be stored as a single PDS blob or split across multiple blobs 100 + /// (when it exceeds the ~50MB ATProto limit). `blobs` always contains the 101 + /// ordered list of PDS blobs — reassemble them to get the full pack data. 93 102 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 94 103 pub struct PackItemRef { 95 - /// The PDS blob containing the pack. 96 - pub blob: BlobRef, 104 + /// The PDS blob(s) containing the pack. Length 1 for normal packs, 105 + /// length N for packs split across multiple blobs. 106 + pub blobs: Vec<BlobRef>, 97 107 /// Byte offset within the pack data section. 98 108 pub offset: u64, 99 109 /// Length of data within the pack data section. ··· 101 111 /// Whether the pack is gzip-compressed. 102 112 #[serde(default, skip_serializing_if = "std::ops::Not::not")] 103 113 pub compressed: bool, 104 - /// For chunked packs (>40MB), ordered list of chunk blob refs. 105 - /// When present, `blob` is unused — reassemble from chunks instead. 106 - #[serde(skip_serializing_if = "Option::is_none")] 107 - pub chunks: Option<Vec<BlobRef>>, 108 114 } 109 115 110 116 /// A chunked blob — multiple BlobRefs that form a single logical blob. ··· 162 168 }; 163 169 164 170 for entry in entries.values() { 165 - add(&entry.base_blob, &mut seen, &mut refs); 166 171 if let Some(ref pr) = entry.base { 167 - add(&pr.blob, &mut seen, &mut refs); 168 - if let Some(ref chunks) = pr.chunks { 169 - for chunk in chunks { 170 - add(chunk, &mut seen, &mut refs); 171 - } 172 + for blob in &pr.blobs { 173 + add(blob, &mut seen, &mut refs); 172 174 } 173 175 } 174 176 for update in &entry.updates { 175 - add(&update.blob, &mut seen, &mut refs); 176 - if let Some(ref chunks) = update.chunks { 177 - for chunk in chunks { 178 - add(chunk, &mut seen, &mut refs); 179 - } 177 + for blob in &update.blobs { 178 + add(blob, &mut seen, &mut refs); 180 179 } 181 180 } 182 181 } ··· 220 219 fn file_entry_serialization() { 221 220 let entry = FileEntry { 222 221 content_hash: "abc123def456".to_string(), 223 - base_blob: BlobRef::new( 224 - "bafysnap".to_string(), 225 - "application/octet-stream".to_string(), 226 - 100, 227 - ), 228 222 state_vector: "AQID".to_string(), 229 223 updates: vec![], 230 224 updates_count: 0, 231 225 base_at: "2026-03-13T00:00:00Z".to_string(), 232 226 kind: FileKind::Text, 233 227 base: None, 234 - conflict_source: None, 235 - }; 228 + }; 236 229 let json = serde_json::to_string(&entry).unwrap(); 237 - assert!(json.contains("\"baseBlob\"")); 238 230 assert!(json.contains("\"stateVector\"")); 239 - assert!(!json.contains("updatesBlob")); // skipped when None 240 231 assert!(json.contains("\"kind\":\"text\"")); // always serialized 241 232 let deserialized: FileEntry = serde_json::from_str(&json).unwrap(); 242 233 assert_eq!(deserialized.content_hash, "abc123def456"); ··· 247 238 fn binary_file_entry_serialization() { 248 239 let entry = FileEntry { 249 240 content_hash: String::new(), 250 - base_blob: BlobRef::new( 251 - "bafybin".to_string(), 252 - "application/octet-stream".to_string(), 253 - 5000, 254 - ), 255 241 state_vector: String::new(), 256 242 updates: vec![], 257 243 updates_count: 0, 258 244 base_at: "2026-03-13T00:00:00Z".to_string(), 259 245 kind: FileKind::Binary, 260 246 base: None, 261 - conflict_source: None, 262 - }; 247 + }; 263 248 let json = serde_json::to_string(&entry).unwrap(); 264 249 assert!(json.contains("\"binary\"")); // kind is serialized for binary 265 250 let deserialized: FileEntry = serde_json::from_str(&json).unwrap(); ··· 268 253 269 254 #[test] 270 255 fn yrs_repo_serialization() { 256 + let record = YrsRepo { 257 + name: "my-site".to_string(), 258 + branches: vec![BranchRef { 259 + rkey: "my-site-a1b2c3d4".to_string(), 260 + label: None, 261 + }], 262 + updated_at: "2026-03-13T00:00:00Z".to_string(), 263 + }; 264 + let json = serde_json::to_string(&record).unwrap(); 265 + assert!(json.contains("\"branches\"")); // branches field present 266 + assert!(json.contains("my-site-a1b2c3d4")); 267 + let deserialized: YrsRepo = serde_json::from_str(&json).unwrap(); 268 + assert_eq!(deserialized.name, "my-site"); 269 + assert_eq!(deserialized.branches.len(), 1); 270 + } 271 + 272 + #[test] 273 + fn yrs_branch_serialization() { 271 274 let pack_blob = BlobRef::new( 272 275 "bafypack".to_string(), 273 276 "application/octet-stream".to_string(), 274 277 5000, 275 278 ); 276 279 let index_ref = PackItemRef { 277 - blob: pack_blob.clone(), 280 + blobs: vec![pack_blob.clone()], 278 281 offset: 0, 279 282 length: 200, 280 283 compressed: false, 281 - chunks: None, 282 284 }; 283 - let record = YrsRepo { 284 - name: "my-site".to_string(), 285 + let record = YrsBranch { 285 286 index: index_ref, 286 287 blobs: vec![pack_blob], 287 288 updated_at: "2026-03-13T00:00:00Z".to_string(), 288 - collaborators: vec![], 289 289 }; 290 290 let json = serde_json::to_string(&record).unwrap(); 291 - assert!(!json.contains("collaborators")); // empty vec is skipped 292 291 assert!(json.contains("\"index\"")); // index field present 293 292 assert!(json.contains("\"blobs\"")); // blobs field present 294 - let deserialized: YrsRepo = serde_json::from_str(&json).unwrap(); 295 - assert_eq!(deserialized.name, "my-site"); 293 + let deserialized: YrsBranch = serde_json::from_str(&json).unwrap(); 296 294 assert_eq!(deserialized.blobs.len(), 1); 297 - assert!(deserialized.collaborators.is_empty()); 298 295 } 299 296 300 297 #[test] 301 298 fn collect_blob_refs_deduplicates() { 302 299 let blob = BlobRef::new("bafyshared".to_string(), "application/octet-stream".to_string(), 100); 303 300 let item_ref = PackItemRef { 304 - blob: blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 301 + blobs: vec![blob.clone()], offset: 0, length: 50, compressed: false, 305 302 }; 306 303 let mut index = FileIndex::new(); 307 304 index.insert("a.txt".to_string(), FileEntry { 308 305 content_hash: String::new(), 309 - base_blob: blob.clone(), 310 306 state_vector: String::new(), 311 307 updates: vec![], updates_count: 0, 312 308 base_at: String::new(), kind: FileKind::Text, 313 - base: Some(item_ref.clone()), conflict_source: None, 314 - }); 309 + base: Some(item_ref.clone()), }); 315 310 index.insert("b.txt".to_string(), FileEntry { 316 311 content_hash: String::new(), 317 - base_blob: blob.clone(), 318 312 state_vector: String::new(), 319 313 updates: vec![], updates_count: 0, 320 314 base_at: String::new(), kind: FileKind::Text, 321 - base: Some(item_ref), conflict_source: None, 322 - }); 315 + base: Some(item_ref), }); 323 316 let refs = collect_blob_refs(&index); 324 317 assert_eq!(refs.len(), 1, "same blob CID should be deduplicated"); 325 318 assert_eq!(refs[0].cid(), "bafyshared"); ··· 332 325 let mut index = FileIndex::new(); 333 326 index.insert("a.txt".to_string(), FileEntry { 334 327 content_hash: String::new(), 335 - base_blob: pack_blob.clone(), 336 328 state_vector: String::new(), 337 329 updates: vec![PackItemRef { 338 - blob: update_blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 330 + blobs: vec![update_blob.clone()], offset: 0, length: 50, compressed: false, 339 331 }], 340 332 updates_count: 1, 341 333 base_at: String::new(), kind: FileKind::Text, 342 334 base: Some(PackItemRef { 343 - blob: pack_blob.clone(), offset: 0, length: 100, compressed: false, chunks: None, 335 + blobs: vec![pack_blob.clone()], offset: 0, length: 100, compressed: false, 344 336 }), 345 - conflict_source: None, 346 - }); 337 + }); 347 338 let refs = collect_blob_refs(&index); 348 339 assert_eq!(refs.len(), 2); 349 340 let cids: Vec<&str> = refs.iter().map(|r| r.cid()).collect(); ··· 354 345 #[test] 355 346 fn pack_item_ref_serialization() { 356 347 let item_ref = PackItemRef { 357 - blob: BlobRef::new( 348 + blobs: vec![BlobRef::new( 358 349 "bafypack".to_string(), 359 350 "application/octet-stream".to_string(), 360 351 5000, 361 - ), 352 + )], 362 353 offset: 100, 363 354 length: 200, 364 355 compressed: true, 365 - chunks: None, 366 356 }; 367 357 let json = serde_json::to_string(&item_ref).unwrap(); 368 358 assert!(json.contains("\"compressed\":true")); ··· 370 360 assert_eq!(deserialized.offset, 100); 371 361 assert_eq!(deserialized.length, 200); 372 362 assert!(deserialized.compressed); 373 - assert!(deserialized.chunks.is_none()); 363 + assert_eq!(deserialized.blobs.len(), 1); 374 364 } 375 365 376 366 #[test] 377 367 fn pack_item_ref_compressed_false_omitted() { 378 368 // compressed=false should be skipped in serialization 379 369 let item_ref = PackItemRef { 380 - blob: BlobRef::new( 370 + blobs: vec![BlobRef::new( 381 371 "bafypack".to_string(), 382 372 "application/octet-stream".to_string(), 383 373 1000, 384 - ), 374 + )], 385 375 offset: 0, 386 376 length: 100, 387 377 compressed: false, 388 - chunks: None, 389 378 }; 390 379 let json = serde_json::to_string(&item_ref).unwrap(); 391 380 assert!( ··· 395 384 } 396 385 397 386 #[test] 398 - fn pack_item_ref_with_chunks() { 387 + fn pack_item_ref_with_multiple_blobs() { 399 388 let item_ref = PackItemRef { 400 - blob: BlobRef::new( 401 - "bafychunk0".to_string(), 402 - "application/octet-stream".to_string(), 403 - 40000000, 404 - ), 405 - offset: 0, 406 - length: 500, 407 - compressed: true, 408 - chunks: Some(vec![ 389 + blobs: vec![ 409 390 BlobRef::new( 410 391 "bafychunk0".to_string(), 411 392 "application/octet-stream".to_string(), ··· 416 397 "application/octet-stream".to_string(), 417 398 10000000, 418 399 ), 419 - ]), 400 + ], 401 + offset: 0, 402 + length: 500, 403 + compressed: true, 420 404 }; 421 405 let json = serde_json::to_string(&item_ref).unwrap(); 422 406 assert!(json.contains("bafychunk1")); 423 407 let deserialized: PackItemRef = serde_json::from_str(&json).unwrap(); 424 - assert_eq!(deserialized.chunks.as_ref().unwrap().len(), 2); 408 + assert_eq!(deserialized.blobs.len(), 2); 425 409 } 426 410 427 411 #[test]
+25 -44
src/yrs_pds.rs
··· 7 7 use yrs::{Doc, GetString, ReadTxn, Text, Transact}; 8 8 9 9 use crate::pds_client::PdsClient; 10 - use crate::types::{FileEntry, FileIndex, FileKind, YrsRepo}; 10 + use crate::types::{FileEntry, FileIndex, FileKind, YrsBranch}; 11 11 12 12 /// Create a Yrs Doc from text content. 13 13 pub fn doc_from_text(content: &str) -> Doc { ··· 78 78 Ok(()) 79 79 } 80 80 81 - /// Upload a Doc as a FileEntry to PDS. 82 - pub async fn doc_to_file_entry( 83 - doc: &Doc, 84 - client: &PdsClient, 85 - did: &str, 86 - ) -> Result<FileEntry, String> { 87 - let base_update = encode_base_update(doc); 88 - let sv = encode_state_vector(doc); 89 - 90 - // Upload base update as a PDS blob 91 - let base_blob = client.upload_blob(base_update.clone()).await?; 92 - 93 - // We need to reference the blob in a record for it to persist, 94 - // so we return the FileEntry which will be embedded in a YrsRepo. 95 - 96 - let now = chrono::Utc::now().to_rfc3339(); 97 - let _ = did; // used by caller for the record 98 - 99 - Ok(FileEntry { 100 - content_hash: String::new(), 101 - base_blob, 102 - state_vector: base64_encode(&sv), 103 - updates: vec![], 104 - updates_count: 0, 105 - base_at: now, 106 - kind: FileKind::Text, 107 - base: None, 108 - conflict_source: None, 109 - }) 110 - } 111 - 112 81 /// Reconstruct a Doc from a FileEntry by downloading blobs from PDS. 113 82 /// 114 83 /// Downloads the BaseYrsUpdate via base, then applies incremental updates. ··· 130 99 } 131 100 132 101 /// Extract data from a PackItemRef by downloading and parsing the pack from PDS. 102 + /// 103 + /// Downloads all blobs listed in the PackItemRef, reassembles them into 104 + /// the full pack, then extracts the item at the given offset/length. 133 105 pub async fn fetch_pack_item( 134 106 item_ref: &crate::types::PackItemRef, 135 107 client: &PdsClient, 136 108 did: &str, 137 109 ) -> Result<Vec<u8>, String> { 138 - let pack_data = if let Some(ref chunks) = item_ref.chunks { 139 - let mut chunk_data = Vec::new(); 140 - for chunk_ref in chunks { 141 - chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 142 - } 143 - crate::pack::reassemble_chunks(&chunk_data) 144 - } else { 145 - client.get_blob(did, item_ref.blob.cid()).await? 146 - }; 110 + let pack_data = download_pack_blobs(&item_ref.blobs, client, did).await?; 147 111 let (_, data_section) = crate::pack::parse_pack_auto(&pack_data)?; 148 112 let start = item_ref.offset as usize; 149 113 let end = start + item_ref.length as usize; ··· 156 120 Ok(data_section[start..end].to_vec()) 157 121 } 158 122 123 + /// Download and reassemble pack blobs into a single byte vector. 124 + async fn download_pack_blobs( 125 + blobs: &[crate::types::BlobRef], 126 + client: &PdsClient, 127 + did: &str, 128 + ) -> Result<Vec<u8>, String> { 129 + if blobs.len() == 1 { 130 + client.get_blob(did, blobs[0].cid()).await 131 + } else { 132 + let mut chunks = Vec::new(); 133 + for blob in blobs { 134 + chunks.push(client.get_blob(did, blob.cid()).await?); 135 + } 136 + Ok(crate::pack::reassemble_chunks(&chunks)) 137 + } 138 + } 139 + 159 140 /// Get the raw data for a FileEntry's base update, via its base PackItemRef. 160 141 pub async fn fetch_file_data( 161 142 entry: &FileEntry, ··· 167 148 fetch_pack_item(item_ref, client, did).await 168 149 } 169 150 170 - /// Load the FileIndex from a YrsRepo's index blob. 151 + /// Load the FileIndex from a YrsBranch's index blob. 171 152 pub async fn load_file_index( 172 - repo: &YrsRepo, 153 + branch: &YrsBranch, 173 154 client: &PdsClient, 174 155 did: &str, 175 156 ) -> Result<FileIndex, String> { 176 - let data = fetch_pack_item(&repo.index, client, did).await?; 157 + let data = fetch_pack_item(&branch.index, client, did).await?; 177 158 serde_json::from_slice(&data).map_err(|e| format!("parse FileIndex: {}", e)) 178 159 } 179 160