wip: you can use your pds as a yrs-relay if you want to
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

working on refactor

notplants 2a42bcf1 c2ef5c71

+473 -395
+80
plans/ymap-file-index.md
··· 1 + # Plan: Move FileIndex from Opaque JSON Blob to Yrs YMap 2 + 3 + ## Context 4 + 5 + Currently the `FileIndex` (`HashMap<String, FileEntry>`) is 6 + serialized as a single opaque JSON blob and stored in a pack. 7 + This means: 8 + - No CRDT semantics on the index itself 9 + - The entire index must be replaced on every save 10 + - No per-file-entry merge capability 11 + 12 + ## Goal 13 + 14 + Store the FileIndex as a Yrs YMap (like the manifest), where 15 + each key is a file path and each value is the JSON-serialized 16 + FileEntry. This gives per-file last-writer-wins CRDT semantics. 17 + 18 + ## New Index Structure 19 + 20 + - Yrs Doc with a Map named `"index"` 21 + - Keys: relative file paths (e.g. `"docs/readme.md"`) 22 + - Values: JSON-serialized `FileEntry` strings 23 + 24 + ## File Changes 25 + 26 + ### 1. `src/yrs_pds.rs` 27 + Add index YMap helpers (mirroring manifest pattern): 28 + - `new_index_doc() -> Doc` — creates Doc with empty `"index"` 29 + Map 30 + - `index_set(doc: &Doc, path: &str, entry: &FileEntry)` — 31 + inserts JSON-serialized entry 32 + - `index_remove(doc: &Doc, path: &str)` — removes entry 33 + - `index_entries(doc: &Doc) -> FileIndex` — reads all entries, 34 + deserializes JSON values 35 + - `index_from_snapshot(data: &[u8]) -> Result<Doc>` — restore 36 + index Doc from snapshot 37 + 38 + Update `load_file_index`: 39 + - Download snapshot data from `repo.index` PackRef 40 + - Create Doc via `index_from_snapshot` 41 + - Return `index_entries(&doc)` 42 + 43 + Add `load_file_index_doc` for save.rs (returns the Doc for 44 + incremental updates). 45 + 46 + ### 2. `src/save.rs` 47 + - Load existing index as a Yrs Doc (not just HashMap) via 48 + `load_file_index_doc` 49 + - After building `file_entries`, update the index Doc: 50 + - `index_set()` for each entry in file_entries 51 + - `index_remove()` for deleted files 52 + - Encode index Doc as snapshot → store in pack as `__index__` 53 + - Rest of flow unchanged (PackRef, blob refs, YrsRepo 54 + construction) 55 + 56 + ### 3. `src/load.rs` 57 + - No change needed — `load_file_index` still returns 58 + `FileIndex` (HashMap) 59 + 60 + ### 4. `src/merge.rs` 61 + - No change needed — each repo's index is loaded independently 62 + - Future: could CRDT-merge index Docs directly 63 + 64 + ### 5. `src/export.rs` 65 + - No change needed — uses `load_file_index` which returns 66 + HashMap 67 + 68 + ### 6. `src/types.rs` 69 + - No structural changes — `YrsRepo.index` remains `PackRef` 70 + - `FileIndex` type alias unchanged 71 + 72 + ## Verification 73 + 74 + 1. `cargo build` — clean compile 75 + 2. `cargo test` — all tests pass 76 + 3. New test: `index_ymap_round_trip` — insert entries, encode 77 + snapshot, restore, verify entries match 78 + 4. New test: `index_ymap_crdt_merge` — concurrent adds from two 79 + Docs merge correctly 80 + 5. Run benchmarks to confirm no regression
+5 -5
src/export.rs
··· 12 12 13 13 /// Export a repo from PDS to plain text files. 14 14 /// 15 - /// Reconstructs content from Yrs snapshots (for text files) or 16 - /// downloads raw blobs (for binary files). 15 + /// Reconstructs content from BaseYrsUpdates (for text files) or 16 + /// downloads raw data (for binary files). 17 17 pub async fn export( 18 18 client: &PdsClient, 19 19 did: &str, ··· 58 58 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 59 59 } 60 60 FileKind::Binary => { 61 - let pack_ref = entry.pack_ref.as_ref() 62 - .ok_or_else(|| format!("missing pack_ref for binary file: {}", rel_path))?; 63 - let data = yrs_pds::get_pack_ref_data(pack_ref, client, did).await?; 61 + let item_ref = entry.base.as_ref() 62 + .ok_or_else(|| format!("missing base for binary file: {}", rel_path))?; 63 + let data = yrs_pds::fetch_pack_item(item_ref, client, did).await?; 64 64 std::fs::write(&output_path, &data) 65 65 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 66 66 }
+3 -3
src/lib.rs
··· 19 19 pub use local_state::LocalState; 20 20 pub use merge::{merge_project, merge_repos}; 21 21 pub use pack::{ 22 - chunk_data, compress, create_compressed_pack, create_pack, decompress, extract_entry, is_gzip, 23 - is_precompressed_extension, parse_pack, parse_pack_auto, reassemble_chunks, PackBlob, 24 - PackDataType, PackEntry, CHUNK_SIZE, 22 + chunk_data, compress, create_compressed_pack, create_pack, decompress, extract_item, is_gzip, 23 + is_precompressed_extension, parse_pack, parse_pack_auto, reassemble_chunks, Pack, 24 + PackDataType, PackItem, CHUNK_SIZE, 25 25 }; 26 26 pub use pds_client::PdsClient; 27 27 pub use save::{save, save_filtered};
+32 -32
src/load.rs
··· 75 75 76 76 match entry.kind { 77 77 FileKind::Binary => { 78 - let data = get_blob_data_cached( 78 + let data = fetch_file_data_cached( 79 79 entry, 80 80 client, 81 81 did, ··· 87 87 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 88 88 } 89 89 FileKind::Text => { 90 - let snapshot_data = get_blob_data_cached( 90 + let base_data = fetch_file_data_cached( 91 91 entry, 92 92 client, 93 93 did, ··· 95 95 &mut blobs_downloaded, 96 96 ) 97 97 .await?; 98 - let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 99 - // Apply incremental updates from pack refs 98 + let doc = yrs_pds::doc_from_base_update(&base_data)?; 99 + // Apply incremental updates from pack item refs 100 100 for update_ref in &entry.updates { 101 - let update_data = get_pack_ref_data_cached( 101 + let update_data = fetch_pack_item_cached( 102 102 update_ref, 103 103 client, 104 104 did, ··· 130 130 }) 131 131 } 132 132 133 - /// Extract data from a PackRef, using pack cache when available. 134 - async fn get_pack_ref_data_cached( 135 - pack_ref: &crate::types::PackRef, 133 + /// Extract data from a PackItemRef, using pack cache when available. 134 + async fn fetch_pack_item_cached( 135 + item_ref: &crate::types::PackItemRef, 136 136 client: &PdsClient, 137 137 did: &str, 138 138 pack_cache: &mut HashMap<String, Vec<u8>>, 139 139 blobs_downloaded: &mut usize, 140 140 ) -> Result<Vec<u8>, String> { 141 - let cid = pack_ref.blob.cid().to_string(); 141 + let cid = item_ref.blob.cid().to_string(); 142 142 143 143 if !pack_cache.contains_key(&cid) { 144 - let data = if let Some(ref chunks) = pack_ref.chunks { 144 + let data = if let Some(ref chunks) = item_ref.chunks { 145 145 let mut chunk_data = Vec::new(); 146 146 for chunk_ref in chunks { 147 147 let chunk = client.get_blob(did, chunk_ref.cid()).await?; ··· 157 157 pack_cache.insert(cid.clone(), data); 158 158 } 159 159 160 - let pack_data = pack_cache.get(&cid).unwrap(); 161 - let (_, blob_data) = crate::pack::parse_pack_auto(pack_data)?; 160 + let raw_pack = pack_cache.get(&cid).unwrap(); 161 + let (_, pack_data) = crate::pack::parse_pack_auto(raw_pack)?; 162 162 163 - let start = pack_ref.offset as usize; 164 - let end = start + pack_ref.length as usize; 165 - if end > blob_data.len() { 163 + let start = item_ref.offset as usize; 164 + let end = start + item_ref.length as usize; 165 + if end > pack_data.len() { 166 166 return Err(format!( 167 - "pack_ref out of bounds: {}..{} in {} bytes", 168 - start, end, blob_data.len() 167 + "pack item ref out of bounds: {}..{} in {} bytes", 168 + start, end, pack_data.len() 169 169 )); 170 170 } 171 - Ok(blob_data[start..end].to_vec()) 171 + Ok(pack_data[start..end].to_vec()) 172 172 } 173 173 174 - /// Get blob data for a file entry, using pack cache when available. 175 - async fn get_blob_data_cached( 174 + /// Get data for a file entry's base update, using pack cache when available. 175 + async fn fetch_file_data_cached( 176 176 entry: &crate::types::FileEntry, 177 177 client: &PdsClient, 178 178 did: &str, 179 179 pack_cache: &mut HashMap<String, Vec<u8>>, 180 180 blobs_downloaded: &mut usize, 181 181 ) -> Result<Vec<u8>, String> { 182 - let pack_ref = entry.pack_ref.as_ref() 183 - .ok_or("missing pack_ref on FileEntry")?; 184 - let cid = pack_ref.blob.cid().to_string(); 182 + let item_ref = entry.base.as_ref() 183 + .ok_or("missing base on FileEntry")?; 184 + let cid = item_ref.blob.cid().to_string(); 185 185 186 186 // Fetch pack blob (or use cache), handling chunked packs 187 187 if !pack_cache.contains_key(&cid) { 188 - let data = if let Some(ref chunks) = pack_ref.chunks { 188 + let data = if let Some(ref chunks) = item_ref.chunks { 189 189 // Reassemble chunked pack 190 190 let mut chunk_data = Vec::new(); 191 191 for chunk_ref in chunks { ··· 202 202 pack_cache.insert(cid.clone(), data); 203 203 } 204 204 205 - let pack_data = pack_cache.get(&cid).unwrap(); 206 - let (_, blob_data) = crate::pack::parse_pack_auto(pack_data)?; 205 + let raw_pack = pack_cache.get(&cid).unwrap(); 206 + let (_, pack_data) = crate::pack::parse_pack_auto(raw_pack)?; 207 207 208 - let start = pack_ref.offset as usize; 209 - let end = start + pack_ref.length as usize; 210 - if end > blob_data.len() { 208 + let start = item_ref.offset as usize; 209 + let end = start + item_ref.length as usize; 210 + if end > pack_data.len() { 211 211 return Err(format!( 212 - "pack_ref out of bounds: {}..{} in {} bytes", 212 + "pack item ref out of bounds: {}..{} in {} bytes", 213 213 start, 214 214 end, 215 - blob_data.len() 215 + pack_data.len() 216 216 )); 217 217 } 218 - Ok(blob_data[start..end].to_vec()) 218 + Ok(pack_data[start..end].to_vec()) 219 219 }
+3 -3
src/local_state.rs
··· 91 91 std::fs::create_dir_all(parent) 92 92 .map_err(|e| format!("create dir for {}: {}", rel_path, e))?; 93 93 } 94 - let snapshot = yrs_pds::encode_snapshot(doc); 94 + let snapshot = yrs_pds::encode_base_update(doc); 95 95 std::fs::write(&yrs_path, &snapshot) 96 96 .map_err(|e| format!("write doc state {}: {}", rel_path, e))?; 97 97 ··· 112 112 } 113 113 let data = 114 114 std::fs::read(&yrs_path).map_err(|e| format!("read doc state {}: {}", rel_path, e))?; 115 - let doc = yrs_pds::doc_from_snapshot(&data)?; 115 + let doc = yrs_pds::doc_from_base_update(&data)?; 116 116 Ok(Some(doc)) 117 117 } 118 118 ··· 139 139 return Ok(None); 140 140 } 141 141 let data = std::fs::read(&yrs_path).map_err(|e| format!("read manifest state: {}", e))?; 142 - let doc = yrs_pds::manifest_from_snapshot(&data)?; 142 + let doc = yrs_pds::manifest_from_base_update(&data)?; 143 143 Ok(Some(doc)) 144 144 } 145 145
+30 -30
src/merge.rs
··· 8 8 9 9 use crate::pack; 10 10 use crate::pds_client::PdsClient; 11 - use crate::types::{FileEntry, FileIndex, FileKind, PackRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 11 + use crate::types::{FileEntry, FileIndex, FileKind, PackItemRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 12 12 use crate::yrs_pds; 13 13 14 14 /// Merge all repos for a project. ··· 101 101 } 102 102 103 103 // Pack cache: keyed by CID, avoids redundant blob downloads. 104 - // All files in a repo typically share 1-2 pack blobs, so this reduces 105 - // O(N × sites) blob downloads to O(pack_blobs × sites). 104 + // All files in a repo typically share 1-2 packs, so this reduces 105 + // O(N × sites) blob downloads to O(packs × sites). 106 106 let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); 107 107 108 108 // CRDT-merge manifests ··· 267 267 if repo_indices.len() == 1 { 268 268 // Only one repo has this binary file — just download it 269 269 let entry = &repos[repo_indices[0]].2[rel_path]; 270 - let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 270 + let data = client.get_blob(did, entry.base_blob.cid()).await?; 271 271 let output_path = output_dir.join(rel_path); 272 272 std::fs::write(&output_path, &data) 273 273 .map_err(|e| format!("write {:?}: {}", output_path, e))?; ··· 279 279 for &idx in repo_indices { 280 280 let entry = &repos[idx].2[rel_path]; 281 281 cid_repo 282 - .entry(entry.snapshot_blob.cid().to_string()) 282 + .entry(entry.base_blob.cid().to_string()) 283 283 .or_default() 284 284 .push(idx); 285 285 } ··· 287 287 if cid_repo.len() == 1 { 288 288 // All repos have the same CID — no conflict 289 289 let entry = &repos[repo_indices[0]].2[rel_path]; 290 - let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 290 + let data = client.get_blob(did, entry.base_blob.cid()).await?; 291 291 let output_path = output_dir.join(rel_path); 292 292 std::fs::write(&output_path, &data) 293 293 .map_err(|e| format!("write {:?}: {}", output_path, e))?; ··· 325 325 did: &str, 326 326 pack_cache: &mut HashMap<String, Vec<u8>>, 327 327 ) -> Result<Doc, String> { 328 - let snapshot_data = get_blob_data_cached(entry, client, did, pack_cache).await?; 329 - let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 328 + let base_data = fetch_file_data_cached(entry, client, did, pack_cache).await?; 329 + let doc = yrs_pds::doc_from_base_update(&base_data)?; 330 330 331 331 for update_ref in &entry.updates { 332 332 let update_data = 333 - get_pack_ref_data_cached(update_ref, client, did, pack_cache).await?; 333 + fetch_pack_item_cached(update_ref, client, did, pack_cache).await?; 334 334 yrs_pds::apply_update(&doc, &update_data)?; 335 335 } 336 336 337 337 Ok(doc) 338 338 } 339 339 340 - /// Get blob data for a file entry's snapshot, using pack cache. 341 - async fn get_blob_data_cached( 340 + /// Get data for a file entry's base update, using pack cache. 341 + async fn fetch_file_data_cached( 342 342 entry: &FileEntry, 343 343 client: &PdsClient, 344 344 did: &str, 345 345 pack_cache: &mut HashMap<String, Vec<u8>>, 346 346 ) -> Result<Vec<u8>, String> { 347 - let pack_ref = entry.pack_ref.as_ref() 348 - .ok_or("missing pack_ref on FileEntry")?; 349 - get_pack_ref_data_cached(pack_ref, client, did, pack_cache).await 347 + let item_ref = entry.base.as_ref() 348 + .ok_or("missing base on FileEntry")?; 349 + fetch_pack_item_cached(item_ref, client, did, pack_cache).await 350 350 } 351 351 352 - /// Extract data from a PackRef, using pack cache to avoid redundant downloads. 353 - async fn get_pack_ref_data_cached( 354 - pack_ref: &PackRef, 352 + /// Extract data from a PackItemRef, using pack cache to avoid redundant downloads. 353 + async fn fetch_pack_item_cached( 354 + item_ref: &PackItemRef, 355 355 client: &PdsClient, 356 356 did: &str, 357 357 pack_cache: &mut HashMap<String, Vec<u8>>, 358 358 ) -> Result<Vec<u8>, String> { 359 - let cid = pack_ref.blob.cid().to_string(); 359 + let cid = item_ref.blob.cid().to_string(); 360 360 361 361 if !pack_cache.contains_key(&cid) { 362 - let data = if let Some(ref chunks) = pack_ref.chunks { 362 + let data = if let Some(ref chunks) = item_ref.chunks { 363 363 let mut chunk_data = Vec::new(); 364 364 for chunk_ref in chunks { 365 365 chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); ··· 371 371 pack_cache.insert(cid.clone(), data); 372 372 } 373 373 374 - let pack_data = pack_cache.get(&cid).unwrap(); 375 - let (_, blob_data) = pack::parse_pack_auto(pack_data)?; 374 + let raw_pack = pack_cache.get(&cid).unwrap(); 375 + let (_, pack_data) = pack::parse_pack_auto(raw_pack)?; 376 376 377 - let start = pack_ref.offset as usize; 378 - let end = start + pack_ref.length as usize; 379 - if end > blob_data.len() { 377 + let start = item_ref.offset as usize; 378 + let end = start + item_ref.length as usize; 379 + if end > pack_data.len() { 380 380 return Err(format!( 381 - "pack_ref out of bounds: {}..{} in {} bytes", 382 - start, end, blob_data.len() 381 + "pack item ref out of bounds: {}..{} in {} bytes", 382 + start, end, pack_data.len() 383 383 )); 384 384 } 385 - Ok(blob_data[start..end].to_vec()) 385 + Ok(pack_data[start..end].to_vec()) 386 386 } 387 387 388 388 /// Generate a conflict filename: stem.repo_name.ext ··· 487 487 488 488 // Simulate two repos editing the same text file 489 489 let base_doc = crate::yrs_pds::doc_from_text("Hello world"); 490 - let base_snapshot = crate::yrs_pds::encode_snapshot(&base_doc); 490 + let base_snapshot = crate::yrs_pds::encode_base_update(&base_doc); 491 491 492 492 // Repo A: adds " from Alice" at end 493 - let doc_a = crate::yrs_pds::doc_from_snapshot(&base_snapshot).unwrap(); 493 + let doc_a = crate::yrs_pds::doc_from_base_update(&base_snapshot).unwrap(); 494 494 { 495 495 let text = doc_a.get_or_insert_text("content"); 496 496 let mut txn = doc_a.transact_mut(); ··· 498 498 } 499 499 500 500 // Repo B: adds "Dear " at beginning 501 - let doc_b = crate::yrs_pds::doc_from_snapshot(&base_snapshot).unwrap(); 501 + let doc_b = crate::yrs_pds::doc_from_base_update(&base_snapshot).unwrap(); 502 502 { 503 503 let text = doc_b.get_or_insert_text("content"); 504 504 let mut txn = doc_b.transact_mut();
+108 -109
src/pack.rs
··· 1 - //! Pack blob format: bundle multiple file blobs into a single upload. 1 + //! Pack format: bundle multiple items into a single PDS blob upload. 2 2 //! 3 3 //! Format: 4 4 //! ```text 5 5 //! [4 bytes: index length (u32 LE)] 6 - //! [index: JSON array of PackEntry] 7 - //! [blob data: concatenated file data] 6 + //! [index: JSON array of PackItem] 7 + //! [pack data: concatenated item data] 8 8 //! ``` 9 9 10 10 use serde::{Deserialize, Serialize}; 11 11 12 - /// An entry in the pack index. 12 + /// An item in the pack index. 13 13 #[derive(Debug, Clone, Serialize, Deserialize)] 14 - pub struct PackEntry { 14 + pub struct PackItem { 15 15 /// Relative file path. 16 16 pub path: String, 17 - /// Byte offset within the blob data section. 17 + /// Byte offset within the pack data section. 18 18 pub offset: u64, 19 19 /// Length of data. 20 20 pub length: u64, ··· 22 22 pub data_type: PackDataType, 23 23 } 24 24 25 - /// Type of data in a pack entry. 25 + /// Type of data in a pack item. 26 26 #[derive(Debug, Clone, Serialize, Deserialize)] 27 - #[serde(rename_all = "lowercase")] 28 27 pub enum PackDataType { 29 - /// Full Yrs snapshot (encode_state_as_update_v1). 30 - Snapshot, 31 - /// Incremental Yrs update (encode_diff_v1). 32 - Update, 28 + /// BaseYrsUpdate — full Yrs state encoded against empty StateVector. 29 + BaseYrsUpdate, 30 + /// YrsUpdate — incremental diff encoded against a non-empty StateVector. 31 + YrsUpdate, 33 32 /// Raw binary file data. 34 33 Binary, 35 34 } 36 35 37 - /// A pack blob ready for upload. 38 - pub struct PackBlob { 39 - /// The complete pack data (index + blob data). 36 + /// A pack ready for upload as a PDS blob. 37 + pub struct Pack { 38 + /// The complete pack data (index + item data). 40 39 pub data: Vec<u8>, 41 - /// The index entries (for building PackRefs after upload). 42 - pub entries: Vec<PackEntry>, 40 + /// The index items (for building PackItemRefs after upload). 41 + pub items: Vec<PackItem>, 43 42 } 44 43 45 - /// Build a pack blob from a set of named data blobs. 46 - pub fn create_pack(items: &[(&str, &[u8], PackDataType)]) -> PackBlob { 47 - // Build index entries 48 - let mut entries = Vec::new(); 44 + /// Build a pack from a set of named data items. 45 + pub fn create_pack(items: &[(&str, &[u8], PackDataType)]) -> Pack { 46 + // Build index items 47 + let mut pack_items = Vec::new(); 49 48 let mut offset: u64 = 0; 50 49 for &(path, data, ref data_type) in items { 51 - entries.push(PackEntry { 50 + pack_items.push(PackItem { 52 51 path: path.to_string(), 53 52 offset, 54 53 length: data.len() as u64, ··· 58 57 } 59 58 60 59 // Serialize index 61 - let index_json = serde_json::to_vec(&entries).unwrap_or_default(); 60 + let index_json = serde_json::to_vec(&pack_items).unwrap_or_default(); 62 61 let index_len = index_json.len() as u32; 63 62 64 63 // Build pack data ··· 69 68 pack_data.extend_from_slice(data); 70 69 } 71 70 72 - PackBlob { 71 + Pack { 73 72 data: pack_data, 74 - entries, 73 + items: pack_items, 75 74 } 76 75 } 77 76 78 - /// Parse a pack blob, returning the index and a reference to the data section. 79 - pub fn parse_pack(data: &[u8]) -> Result<(Vec<PackEntry>, &[u8]), String> { 77 + /// Parse a pack, returning the index and a reference to the data section. 78 + pub fn parse_pack(data: &[u8]) -> Result<(Vec<PackItem>, &[u8]), String> { 80 79 if data.len() < 4 { 81 80 return Err("pack too small".to_string()); 82 81 } ··· 93 92 )); 94 93 } 95 94 96 - let entries: Vec<PackEntry> = serde_json::from_slice(&data[index_start..index_end]) 95 + let items: Vec<PackItem> = serde_json::from_slice(&data[index_start..index_end]) 97 96 .map_err(|e| format!("parse pack index: {}", e))?; 98 97 99 - let blob_data = &data[index_end..]; 100 - Ok((entries, blob_data)) 98 + let pack_data = &data[index_end..]; 99 + Ok((items, pack_data)) 101 100 } 102 101 103 - /// Extract a single entry's data from the blob data section. 104 - pub fn extract_entry<'a>(entry: &PackEntry, blob_data: &'a [u8]) -> Result<&'a [u8], String> { 105 - let start = entry.offset as usize; 106 - let end = start + entry.length as usize; 107 - if end > blob_data.len() { 102 + /// Extract a single item's data from the pack data section. 103 + pub fn extract_item<'a>(item: &PackItem, pack_data: &'a [u8]) -> Result<&'a [u8], String> { 104 + let start = item.offset as usize; 105 + let end = start + item.length as usize; 106 + if end > pack_data.len() { 108 107 return Err(format!( 109 - "pack entry {} out of bounds: {}..{} in {} bytes", 110 - entry.path, 108 + "pack item {} out of bounds: {}..{} in {} bytes", 109 + item.path, 111 110 start, 112 111 end, 113 - blob_data.len() 112 + pack_data.len() 114 113 )); 115 114 } 116 - Ok(&blob_data[start..end]) 115 + Ok(&pack_data[start..end]) 117 116 } 118 117 119 118 /// Compress data with gzip. ··· 194 193 ) 195 194 } 196 195 197 - /// Create a compressed pack blob. Compresses the entire pack with gzip. 198 - pub fn create_compressed_pack(items: &[(&str, &[u8], PackDataType)]) -> PackBlob { 196 + /// Create a compressed pack. Compresses the entire pack with gzip. 197 + pub fn create_compressed_pack(items: &[(&str, &[u8], PackDataType)]) -> Pack { 199 198 let pack = create_pack(items); 200 199 let compressed = compress(&pack.data); 201 200 202 201 // Only use compression if it actually saves space 203 202 if compressed.len() < pack.data.len() { 204 - PackBlob { 203 + Pack { 205 204 data: compressed, 206 - entries: pack.entries, 205 + items: pack.items, 207 206 } 208 207 } else { 209 208 pack 210 209 } 211 210 } 212 211 213 - /// Parse a pack blob, auto-detecting gzip compression. 214 - pub fn parse_pack_auto(data: &[u8]) -> Result<(Vec<PackEntry>, Vec<u8>), String> { 212 + /// Parse a pack, auto-detecting gzip compression. 213 + pub fn parse_pack_auto(data: &[u8]) -> Result<(Vec<PackItem>, Vec<u8>), String> { 215 214 let decompressed; 216 215 let actual_data = if is_gzip(data) { 217 216 decompressed = decompress(data)?; ··· 220 219 data 221 220 }; 222 221 223 - let (entries, blob_data) = parse_pack(actual_data)?; 224 - Ok((entries, blob_data.to_vec())) 222 + let (items, pack_data) = parse_pack(actual_data)?; 223 + Ok((items, pack_data.to_vec())) 225 224 } 226 225 227 226 #[cfg(test)] ··· 233 232 let items: Vec<(&str, &[u8], PackDataType)> = vec![ 234 233 ( 235 234 "docs/index.md", 236 - b"snapshot data for index", 237 - PackDataType::Snapshot, 235 + b"base update data for index", 236 + PackDataType::BaseYrsUpdate, 238 237 ), 239 238 ( 240 239 "docs/about.md", 241 - b"snapshot data for about", 242 - PackDataType::Snapshot, 240 + b"base update data for about", 241 + PackDataType::BaseYrsUpdate, 243 242 ), 244 243 ("images/logo.png", b"raw png bytes", PackDataType::Binary), 245 244 ]; 246 245 247 246 let pack = create_pack(&items); 248 247 assert!(!pack.data.is_empty()); 249 - assert_eq!(pack.entries.len(), 3); 248 + assert_eq!(pack.items.len(), 3); 250 249 251 250 // Parse it back 252 - let (entries, blob_data) = parse_pack(&pack.data).unwrap(); 253 - assert_eq!(entries.len(), 3); 251 + let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 252 + assert_eq!(parsed_items.len(), 3); 254 253 255 - // Extract each entry 256 - let index_data = extract_entry(&entries[0], blob_data).unwrap(); 257 - assert_eq!(index_data, b"snapshot data for index"); 254 + // Extract each item 255 + let index_data = extract_item(&parsed_items[0], pack_data).unwrap(); 256 + assert_eq!(index_data, b"base update data for index"); 258 257 259 - let about_data = extract_entry(&entries[1], blob_data).unwrap(); 260 - assert_eq!(about_data, b"snapshot data for about"); 258 + let about_data = extract_item(&parsed_items[1], pack_data).unwrap(); 259 + assert_eq!(about_data, b"base update data for about"); 261 260 262 - let logo_data = extract_entry(&entries[2], blob_data).unwrap(); 261 + let logo_data = extract_item(&parsed_items[2], pack_data).unwrap(); 263 262 assert_eq!(logo_data, b"raw png bytes"); 264 263 } 265 264 ··· 267 266 fn pack_empty() { 268 267 let items: Vec<(&str, &[u8], PackDataType)> = vec![]; 269 268 let pack = create_pack(&items); 270 - let (entries, _) = parse_pack(&pack.data).unwrap(); 271 - assert_eq!(entries.len(), 0); 269 + let (parsed_items, _) = parse_pack(&pack.data).unwrap(); 270 + assert_eq!(parsed_items.len(), 0); 272 271 } 273 272 274 273 #[test] 275 - fn pack_single_large_entry() { 274 + fn pack_single_large_item() { 276 275 let big_data = vec![42u8; 100_000]; 277 276 let items: Vec<(&str, &[u8], PackDataType)> = 278 277 vec![("big.bin", &big_data, PackDataType::Binary)]; 279 278 let pack = create_pack(&items); 280 - let (entries, blob_data) = parse_pack(&pack.data).unwrap(); 281 - let extracted = extract_entry(&entries[0], blob_data).unwrap(); 279 + let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 280 + let extracted = extract_item(&parsed_items[0], pack_data).unwrap(); 282 281 assert_eq!(extracted.len(), 100_000); 283 282 assert_eq!(extracted[0], 42); 284 283 } ··· 302 301 // Use repetitive data so compression actually helps 303 302 let text = "Hello world! ".repeat(100); 304 303 let items: Vec<(&str, &[u8], PackDataType)> = 305 - vec![("file.md", text.as_bytes(), PackDataType::Snapshot)]; 304 + vec![("file.md", text.as_bytes(), PackDataType::BaseYrsUpdate)]; 306 305 let pack = create_compressed_pack(&items); 307 306 // Compressed should be smaller for repetitive data 308 307 let uncompressed = create_pack(&items); 309 308 assert!(pack.data.len() < uncompressed.data.len()); 310 309 311 310 // Parse with auto-detection 312 - let (entries, blob_data) = parse_pack_auto(&pack.data).unwrap(); 313 - assert_eq!(entries.len(), 1); 314 - let extracted = extract_entry(&entries[0], &blob_data).unwrap(); 311 + let (parsed_items, pack_data) = parse_pack_auto(&pack.data).unwrap(); 312 + assert_eq!(parsed_items.len(), 1); 313 + let extracted = extract_item(&parsed_items[0], &pack_data).unwrap(); 315 314 assert_eq!(extracted, text.as_bytes()); 316 315 } 317 316 318 317 #[test] 319 318 fn uncompressed_pack_auto_parse() { 320 319 let items: Vec<(&str, &[u8], PackDataType)> = 321 - vec![("file.md", b"short", PackDataType::Snapshot)]; 320 + vec![("file.md", b"short", PackDataType::BaseYrsUpdate)]; 322 321 let pack = create_pack(&items); 323 322 // Should work fine with parse_pack_auto even without compression 324 - let (entries, blob_data) = parse_pack_auto(&pack.data).unwrap(); 325 - assert_eq!(entries.len(), 1); 326 - let extracted = extract_entry(&entries[0], &blob_data).unwrap(); 323 + let (parsed_items, pack_data) = parse_pack_auto(&pack.data).unwrap(); 324 + assert_eq!(parsed_items.len(), 1); 325 + let extracted = extract_item(&parsed_items[0], &pack_data).unwrap(); 327 326 assert_eq!(extracted, b"short"); 328 327 } 329 328 ··· 356 355 357 356 #[test] 358 357 fn pack_mixed_data_types() { 359 - // pack containing snapshot, update, and binary entries 358 + // pack containing base update, incremental update, and binary items 360 359 let items: Vec<(&str, &[u8], PackDataType)> = vec![ 361 - ("index.md", b"yrs snapshot bytes", PackDataType::Snapshot), 362 - ("index.md.update", b"yrs update diff", PackDataType::Update), 360 + ("index.md", b"yrs base update bytes", PackDataType::BaseYrsUpdate), 361 + ("index.md.update", b"yrs incremental update", PackDataType::YrsUpdate), 363 362 ("logo.png", b"\x89PNG raw data", PackDataType::Binary), 364 363 ]; 365 364 let pack = create_pack(&items); 366 - let (entries, blob_data) = parse_pack(&pack.data).unwrap(); 365 + let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 367 366 368 - assert_eq!(entries.len(), 3); 369 - assert!(matches!(entries[0].data_type, PackDataType::Snapshot)); 370 - assert!(matches!(entries[1].data_type, PackDataType::Update)); 371 - assert!(matches!(entries[2].data_type, PackDataType::Binary)); 367 + assert_eq!(parsed_items.len(), 3); 368 + assert!(matches!(parsed_items[0].data_type, PackDataType::BaseYrsUpdate)); 369 + assert!(matches!(parsed_items[1].data_type, PackDataType::YrsUpdate)); 370 + assert!(matches!(parsed_items[2].data_type, PackDataType::Binary)); 372 371 373 372 assert_eq!( 374 - extract_entry(&entries[0], blob_data).unwrap(), 375 - b"yrs snapshot bytes" 373 + extract_item(&parsed_items[0], pack_data).unwrap(), 374 + b"yrs base update bytes" 376 375 ); 377 376 assert_eq!( 378 - extract_entry(&entries[1], blob_data).unwrap(), 379 - b"yrs update diff" 377 + extract_item(&parsed_items[1], pack_data).unwrap(), 378 + b"yrs incremental update" 380 379 ); 381 380 assert_eq!( 382 - extract_entry(&entries[2], blob_data).unwrap(), 381 + extract_item(&parsed_items[2], pack_data).unwrap(), 383 382 b"\x89PNG raw data" 384 383 ); 385 384 } ··· 403 402 let uncompressed = create_pack(&items); 404 403 // For tiny data, create_compressed_pack should fall back to uncompressed 405 404 // (or use compressed if smaller — either way, parse_pack_auto handles both) 406 - let (entries, blob_data) = parse_pack_auto(&compressed.data).unwrap(); 407 - assert_eq!(entries.len(), 1); 408 - assert_eq!(extract_entry(&entries[0], &blob_data).unwrap(), b"x"); 405 + let (parsed_items, pack_data) = parse_pack_auto(&compressed.data).unwrap(); 406 + assert_eq!(parsed_items.len(), 1); 407 + assert_eq!(extract_item(&parsed_items[0], &pack_data).unwrap(), b"x"); 409 408 // Just verify uncompressed also works 410 - let (entries2, blob_data2) = parse_pack_auto(&uncompressed.data).unwrap(); 411 - assert_eq!(extract_entry(&entries2[0], &blob_data2).unwrap(), b"x"); 409 + let (parsed_items2, pack_data2) = parse_pack_auto(&uncompressed.data).unwrap(); 410 + assert_eq!(extract_item(&parsed_items2[0], &pack_data2).unwrap(), b"x"); 412 411 } 413 412 414 413 #[test] ··· 442 441 } 443 442 444 443 #[test] 445 - fn pack_ref_extraction_exact_offsets() { 444 + fn pack_item_extraction_exact_offsets() { 446 445 // verify offset/length slicing produces correct data 447 446 let items: Vec<(&str, &[u8], PackDataType)> = vec![ 448 - ("a.txt", b"AAAA", PackDataType::Snapshot), 449 - ("b.txt", b"BBBBBB", PackDataType::Snapshot), 450 - ("c.txt", b"CC", PackDataType::Snapshot), 447 + ("a.txt", b"AAAA", PackDataType::BaseYrsUpdate), 448 + ("b.txt", b"BBBBBB", PackDataType::BaseYrsUpdate), 449 + ("c.txt", b"CC", PackDataType::BaseYrsUpdate), 451 450 ]; 452 451 let pack = create_pack(&items); 453 - let (entries, blob_data) = parse_pack(&pack.data).unwrap(); 452 + let (parsed_items, pack_data) = parse_pack(&pack.data).unwrap(); 454 453 455 454 // check offsets are correct 456 - assert_eq!(entries[0].offset, 0); 457 - assert_eq!(entries[0].length, 4); 458 - assert_eq!(entries[1].offset, 4); 459 - assert_eq!(entries[1].length, 6); 460 - assert_eq!(entries[2].offset, 10); 461 - assert_eq!(entries[2].length, 2); 455 + assert_eq!(parsed_items[0].offset, 0); 456 + assert_eq!(parsed_items[0].length, 4); 457 + assert_eq!(parsed_items[1].offset, 4); 458 + assert_eq!(parsed_items[1].length, 6); 459 + assert_eq!(parsed_items[2].offset, 10); 460 + assert_eq!(parsed_items[2].length, 2); 462 461 463 462 // manual slice verification 464 - let start = entries[1].offset as usize; 465 - let end = start + entries[1].length as usize; 466 - assert_eq!(&blob_data[start..end], b"BBBBBB"); 463 + let start = parsed_items[1].offset as usize; 464 + let end = start + parsed_items[1].length as usize; 465 + assert_eq!(&pack_data[start..end], b"BBBBBB"); 467 466 } 468 467 469 468 #[test] 470 - fn extract_entry_out_of_bounds() { 471 - let entry = PackEntry { 469 + fn extract_item_out_of_bounds() { 470 + let item = PackItem { 472 471 path: "bad.bin".to_string(), 473 472 offset: 100, 474 473 length: 50, 475 474 data_type: PackDataType::Binary, 476 475 }; 477 - let blob_data = b"short"; 478 - assert!(extract_entry(&entry, blob_data).is_err()); 476 + let pack_data = b"short"; 477 + assert!(extract_item(&item, pack_data).is_err()); 479 478 } 480 479 }
+98 -98
src/save.rs
··· 6 6 use crate::pack::{self, PackDataType}; 7 7 use crate::pds_client::PdsClient; 8 8 use crate::types::{ 9 - collect_blob_refs, BlobRef, Collaborator, FileEntry, FileIndex, FileKind, PackRef, SaveResult, 9 + collect_blob_refs, BlobRef, Collaborator, FileEntry, FileIndex, FileKind, PackItemRef, SaveResult, 10 10 YrsRepo, COLLECTION, MANIFEST_KEY, 11 11 }; 12 12 use crate::yrs_pds; 13 13 14 14 /// Compaction threshold: when any file's updates_count reaches this, 15 - /// the entire repo is compacted (all files get fresh snapshots in one pack). 15 + /// the entire repo is compacted (all files get fresh BaseYrsUpdates in one pack). 16 16 const COMPACTION_THRESHOLD: u32 = 10; 17 17 18 - /// Pending blob to be packed into a single upload. 19 - struct PendingBlob { 18 + /// Pending item to be packed into a single upload. 19 + struct PendingItem { 20 20 path: String, 21 21 data: Vec<u8>, 22 22 data_type: PackDataType, 23 23 } 24 24 25 - /// Whether a pending blob is an incremental update (appended to updates list) 26 - /// or a snapshot (replaces pack_ref). 25 + /// Whether a pending item is a base update (replaces base) 26 + /// or an incremental update (appended to updates list). 27 27 #[derive(Clone, PartialEq)] 28 28 enum PendingKind { 29 - /// Full snapshot — will become the new pack_ref. 30 - Snapshot, 31 - /// Incremental update — will be appended to updates list. 32 - Update, 29 + /// BaseYrsUpdate — will become the new base. 30 + Base, 31 + /// Incremental YrsUpdate — will be appended to updates list. 32 + Incremental, 33 33 } 34 34 35 35 /// Save a directory to PDS. 36 36 /// 37 37 /// Maintains a CRDT manifest (Yrs Map) tracking all files. Supports both 38 38 /// text files (Yrs CRDT merge) and binary files (raw blob storage). 39 - /// All blobs are bundled into a single pack blob upload. 39 + /// All items are bundled into a single pack uploaded as a PDS blob. 40 40 pub async fn save( 41 41 dir: &Path, 42 42 client: &PdsClient, ··· 113 113 }; 114 114 115 115 let mut file_entries: HashMap<String, FileEntry> = HashMap::new(); 116 - let mut pending_blobs: Vec<PendingBlob> = Vec::new(); 117 - // Track whether each pending blob is a snapshot or incremental update 116 + let mut pending_items: Vec<PendingItem> = Vec::new(); 117 + // Track whether each pending item is a base update or incremental update 118 118 let mut pending_kinds: HashMap<String, PendingKind> = HashMap::new(); 119 119 let mut files_uploaded = 0; 120 120 let mut files_skipped = 0; ··· 124 124 let local_paths: std::collections::HashSet<String> = 125 125 local_files.iter().map(|(p, _, _)| p.clone()).collect(); 126 126 127 - // First pass: determine what needs uploading, collect blob data 127 + // First pass: determine what needs uploading, collect data 128 128 for (rel_path, file_data, kind) in &local_files { 129 129 match kind { 130 130 FileKind::Text => { ··· 134 134 // Not valid UTF-8 — treat as binary 135 135 let hash = hex_hash(file_data); 136 136 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Binary); 137 - pending_blobs.push(PendingBlob { 137 + pending_items.push(PendingItem { 138 138 path: rel_path.clone(), 139 139 data: file_data.clone(), 140 140 data_type: PackDataType::Binary, 141 141 }); 142 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 142 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 143 143 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 144 144 files_uploaded += 1; 145 145 continue; ··· 172 172 let diff = yrs_pds::encode_diff(&doc, &old_sv_bytes)?; 173 173 let sv = yrs_pds::encode_state_vector(&doc); 174 174 let materialized = yrs_pds::materialize(&doc); 175 - pending_blobs.push(PendingBlob { 175 + pending_items.push(PendingItem { 176 176 path: rel_path.clone(), 177 177 data: diff, 178 - data_type: PackDataType::Update, 178 + data_type: PackDataType::YrsUpdate, 179 179 }); 180 - pending_kinds.insert(rel_path.clone(), PendingKind::Update); 181 - // Keep existing snapshot/pack_ref, will append update 180 + pending_kinds.insert(rel_path.clone(), PendingKind::Incremental); 181 + // Keep existing base_blob/base, will append update 182 182 file_entries.insert( 183 183 rel_path.clone(), 184 184 FileEntry { 185 185 content_hash: hex_hash(materialized.as_bytes()), 186 - snapshot_blob: existing_entry.snapshot_blob.clone(), 186 + base_blob: existing_entry.base_blob.clone(), 187 187 state_vector: yrs_pds::base64_encode(&sv), 188 188 updates: existing_entry.updates.clone(), 189 189 updates_count: existing_entry.updates_count + 1, 190 - snapshot_at: existing_entry.snapshot_at.clone(), 190 + base_at: existing_entry.base_at.clone(), 191 191 kind: FileKind::Text, 192 - pack_ref: existing_entry.pack_ref.clone(), 192 + base: existing_entry.base.clone(), 193 193 conflict_source: None, 194 194 }, 195 195 ); ··· 207 207 } 208 208 209 209 if verbose { 210 - eprintln!("pds-yrs: full snapshot {}", rel_path); 210 + eprintln!("pds-yrs: full base update {}", rel_path); 211 211 } 212 212 } else { 213 213 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Text); ··· 216 216 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Text); 217 217 } 218 218 219 - // Full snapshot 219 + // Full base update 220 220 let doc = yrs_pds::doc_from_text(content); 221 - let snapshot = yrs_pds::encode_snapshot(&doc); 221 + let base_update = yrs_pds::encode_base_update(&doc); 222 222 let sv = yrs_pds::encode_state_vector(&doc); 223 - pending_blobs.push(PendingBlob { 223 + pending_items.push(PendingItem { 224 224 path: rel_path.clone(), 225 - data: snapshot, 226 - data_type: PackDataType::Snapshot, 225 + data: base_update, 226 + data_type: PackDataType::BaseYrsUpdate, 227 227 }); 228 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 228 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 229 229 file_entries.insert( 230 230 rel_path.clone(), 231 231 FileEntry { 232 232 content_hash: hex_hash(content.as_bytes()), 233 - snapshot_blob: placeholder_blob_ref(), 233 + base_blob: placeholder_blob_ref(), 234 234 state_vector: yrs_pds::base64_encode(&sv), 235 235 updates: vec![], 236 236 updates_count: 0, 237 - snapshot_at: chrono::Utc::now().to_rfc3339(), 237 + base_at: chrono::Utc::now().to_rfc3339(), 238 238 kind: FileKind::Text, 239 - pack_ref: None, 239 + base: None, 240 240 conflict_source: None, 241 241 }, 242 242 ); ··· 268 268 } 269 269 270 270 let hash = hex_hash(file_data); 271 - pending_blobs.push(PendingBlob { 271 + pending_items.push(PendingItem { 272 272 path: rel_path.clone(), 273 273 data: file_data.clone(), 274 274 data_type: PackDataType::Binary, 275 275 }); 276 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 276 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 277 277 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 278 278 files_uploaded += 1; 279 279 if verbose { ··· 283 283 } 284 284 } 285 285 286 - // Compaction: if any file crossed the threshold, re-snapshot ALL files 286 + // Compaction: if any file crossed the threshold, re-create BaseYrsUpdates for ALL files 287 287 if needs_compaction { 288 288 if verbose { 289 - eprintln!("pds-yrs: compaction triggered — re-snapshotting all files"); 289 + eprintln!("pds-yrs: compaction triggered — re-creating base updates for all files"); 290 290 } 291 - pending_blobs.clear(); 291 + pending_items.clear(); 292 292 pending_kinds.clear(); 293 293 file_entries.clear(); 294 294 files_uploaded = 0; ··· 301 301 Ok(s) => s, 302 302 Err(_) => { 303 303 let hash = hex_hash(file_data); 304 - pending_blobs.push(PendingBlob { 304 + pending_items.push(PendingItem { 305 305 path: rel_path.clone(), 306 306 data: file_data.clone(), 307 307 data_type: PackDataType::Binary, 308 308 }); 309 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 309 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 310 310 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 311 311 files_uploaded += 1; 312 312 continue; ··· 326 326 } else { 327 327 yrs_pds::doc_from_text(content) 328 328 }; 329 - let snapshot = yrs_pds::encode_snapshot(&doc); 329 + let base_update = yrs_pds::encode_base_update(&doc); 330 330 let sv = yrs_pds::encode_state_vector(&doc); 331 331 let materialized = yrs_pds::materialize(&doc); 332 - pending_blobs.push(PendingBlob { 332 + pending_items.push(PendingItem { 333 333 path: rel_path.clone(), 334 - data: snapshot, 335 - data_type: PackDataType::Snapshot, 334 + data: base_update, 335 + data_type: PackDataType::BaseYrsUpdate, 336 336 }); 337 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 337 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 338 338 file_entries.insert( 339 339 rel_path.clone(), 340 340 FileEntry { 341 341 content_hash: hex_hash(materialized.as_bytes()), 342 - snapshot_blob: placeholder_blob_ref(), 342 + base_blob: placeholder_blob_ref(), 343 343 state_vector: yrs_pds::base64_encode(&sv), 344 344 updates: vec![], 345 345 updates_count: 0, 346 - snapshot_at: chrono::Utc::now().to_rfc3339(), 346 + base_at: chrono::Utc::now().to_rfc3339(), 347 347 kind: FileKind::Text, 348 - pack_ref: None, 348 + base: None, 349 349 conflict_source: None, 350 350 }, 351 351 ); ··· 353 353 } 354 354 FileKind::Binary => { 355 355 let hash = hex_hash(file_data); 356 - pending_blobs.push(PendingBlob { 356 + pending_items.push(PendingItem { 357 357 path: rel_path.clone(), 358 358 data: file_data.clone(), 359 359 data_type: PackDataType::Binary, 360 360 }); 361 - pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 361 + pending_kinds.insert(rel_path.clone(), PendingKind::Base); 362 362 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 363 363 files_uploaded += 1; 364 364 } ··· 377 377 } 378 378 } 379 379 380 - // Add manifest snapshot to pending blobs 381 - let manifest_snapshot = yrs_pds::encode_snapshot(&manifest_doc); 380 + // Add manifest base update to pending items 381 + let manifest_base = yrs_pds::encode_base_update(&manifest_doc); 382 382 let manifest_sv = yrs_pds::encode_state_vector(&manifest_doc); 383 - pending_blobs.push(PendingBlob { 383 + pending_items.push(PendingItem { 384 384 path: MANIFEST_KEY.to_string(), 385 - data: manifest_snapshot, 386 - data_type: PackDataType::Snapshot, 385 + data: manifest_base, 386 + data_type: PackDataType::BaseYrsUpdate, 387 387 }); 388 - pending_kinds.insert(MANIFEST_KEY.to_string(), PendingKind::Snapshot); 388 + pending_kinds.insert(MANIFEST_KEY.to_string(), PendingKind::Base); 389 389 390 - // Upload all blobs as a single pack 390 + // Upload all items as a single pack (becomes a PDS blob) 391 391 let total_bytes; 392 - if pending_blobs.is_empty() { 392 + if pending_items.is_empty() { 393 393 total_bytes = 0; 394 394 let manifest_entry = yrs_pds::doc_to_file_entry(&manifest_doc, client, did).await?; 395 395 file_entries.insert(MANIFEST_KEY.to_string(), manifest_entry); 396 396 } else { 397 - // Build pack blob 398 - let items: Vec<(&str, &[u8], PackDataType)> = pending_blobs 397 + // Build pack 398 + let items: Vec<(&str, &[u8], PackDataType)> = pending_items 399 399 .iter() 400 - .map(|pb| (pb.path.as_str(), pb.data.as_slice(), pb.data_type.clone())) 400 + .map(|pi| (pi.path.as_str(), pi.data.as_slice(), pi.data_type.clone())) 401 401 .collect(); 402 - let pack_blob = pack::create_compressed_pack(&items); 403 - let is_compressed = pack::is_gzip(&pack_blob.data); 404 - total_bytes = pack_blob.data.len() as u64; 402 + let pack = pack::create_compressed_pack(&items); 403 + let is_compressed = pack::is_gzip(&pack.data); 404 + total_bytes = pack.data.len() as u64; 405 405 406 - // Upload pack blob — chunk if larger than ATProto limit 407 - let (blob_ref, chunk_refs) = if pack_blob.data.len() > pack::CHUNK_SIZE { 408 - let chunks = pack::chunk_data(&pack_blob.data); 406 + // Upload pack as PDS blob — chunk if larger than ATProto limit 407 + let (blob_ref, chunk_refs) = if pack.data.len() > pack::CHUNK_SIZE { 408 + let chunks = pack::chunk_data(&pack.data); 409 409 let mut refs = Vec::new(); 410 410 for (i, chunk) in chunks.iter().enumerate() { 411 411 let r = client.upload_blob(chunk.clone()).await?; ··· 422 422 let primary = refs[0].clone(); 423 423 (primary, Some(refs)) 424 424 } else { 425 - let r = client.upload_blob(pack_blob.data).await?; 425 + let r = client.upload_blob(pack.data).await?; 426 426 (r, None) 427 427 }; 428 428 429 429 if verbose { 430 430 eprintln!( 431 - "pds-yrs: uploaded pack blob ({} bytes, {} entries{})", 431 + "pds-yrs: uploaded pack ({} bytes, {} items{})", 432 432 total_bytes, 433 - pack_blob.entries.len(), 433 + pack.items.len(), 434 434 if chunk_refs.is_some() { 435 435 ", chunked" 436 436 } else { ··· 439 439 ); 440 440 } 441 441 442 - // Update file entries with pack refs 443 - for entry in &pack_blob.entries { 444 - let pack_ref = PackRef { 442 + // Update file entries with pack item refs 443 + for item in &pack.items { 444 + let item_ref = PackItemRef { 445 445 blob: blob_ref.clone(), 446 - offset: entry.offset, 447 - length: entry.length, 446 + offset: item.offset, 447 + length: item.length, 448 448 compressed: is_compressed, 449 449 chunks: chunk_refs.clone(), 450 450 }; 451 451 452 - if entry.path == MANIFEST_KEY { 452 + if item.path == MANIFEST_KEY { 453 453 file_entries.insert( 454 454 MANIFEST_KEY.to_string(), 455 455 FileEntry { 456 456 content_hash: String::new(), 457 - snapshot_blob: blob_ref.clone(), 457 + base_blob: blob_ref.clone(), 458 458 state_vector: yrs_pds::base64_encode(&manifest_sv), 459 459 updates: vec![], 460 460 updates_count: 0, 461 - snapshot_at: chrono::Utc::now().to_rfc3339(), 461 + base_at: chrono::Utc::now().to_rfc3339(), 462 462 kind: FileKind::Text, 463 - pack_ref: Some(pack_ref), 463 + base: Some(item_ref), 464 464 conflict_source: None, 465 465 }, 466 466 ); 467 - } else if let Some(fe) = file_entries.get_mut(&entry.path) { 468 - let kind = pending_kinds.get(&entry.path).cloned().unwrap_or(PendingKind::Snapshot); 467 + } else if let Some(fe) = file_entries.get_mut(&item.path) { 468 + let kind = pending_kinds.get(&item.path).cloned().unwrap_or(PendingKind::Base); 469 469 match kind { 470 - PendingKind::Snapshot => { 471 - // Full snapshot — replace pack_ref, clear updates 472 - fe.snapshot_blob = blob_ref.clone(); 473 - fe.pack_ref = Some(pack_ref); 470 + PendingKind::Base => { 471 + // BaseYrsUpdate — replace base, clear updates 472 + fe.base_blob = blob_ref.clone(); 473 + fe.base = Some(item_ref); 474 474 fe.updates.clear(); 475 475 } 476 - PendingKind::Update => { 476 + PendingKind::Incremental => { 477 477 // Incremental update — append to updates list 478 - fe.updates.push(pack_ref); 478 + fe.updates.push(item_ref); 479 479 } 480 480 } 481 481 } ··· 486 486 let index_json = serde_json::to_vec(&file_entries) 487 487 .map_err(|e| format!("serialize FileIndex: {}", e))?; 488 488 let index_items: Vec<(&str, &[u8], PackDataType)> = 489 - vec![("__index__", &index_json, PackDataType::Snapshot)]; 489 + vec![("__index__", &index_json, PackDataType::BaseYrsUpdate)]; 490 490 let index_pack = pack::create_compressed_pack(&index_items); 491 491 let index_is_compressed = pack::is_gzip(&index_pack.data); 492 492 let index_blob_ref = client.upload_blob(index_pack.data).await?; 493 - let index_entry = &index_pack.entries[0]; 494 - let index_pack_ref = PackRef { 493 + let index_item = &index_pack.items[0]; 494 + let index_ref = PackItemRef { 495 495 blob: index_blob_ref.clone(), 496 - offset: index_entry.offset, 497 - length: index_entry.length, 496 + offset: index_item.offset, 497 + length: index_item.length, 498 498 compressed: index_is_compressed, 499 499 chunks: None, 500 500 }; ··· 521 521 let now = chrono::Utc::now().to_rfc3339(); 522 522 let record = YrsRepo { 523 523 name: project_name.to_string(), 524 - index: index_pack_ref, 524 + index: index_ref, 525 525 blobs: all_blobs, 526 526 updated_at: now, 527 527 collaborators, ··· 541 541 }) 542 542 } 543 543 544 - /// Placeholder BlobRef — will be replaced with pack ref after upload. 544 + /// Placeholder BlobRef — will be replaced with pack item ref after upload. 545 545 fn placeholder_blob_ref() -> BlobRef { 546 546 BlobRef::new( 547 547 "pending".to_string(), ··· 554 554 fn placeholder_binary_entry(hash: &str) -> FileEntry { 555 555 FileEntry { 556 556 content_hash: hash.to_string(), 557 - snapshot_blob: placeholder_blob_ref(), 557 + base_blob: placeholder_blob_ref(), 558 558 state_vector: String::new(), 559 559 updates: vec![], 560 560 updates_count: 0, 561 - snapshot_at: chrono::Utc::now().to_rfc3339(), 561 + base_at: chrono::Utc::now().to_rfc3339(), 562 562 kind: FileKind::Binary, 563 - pack_ref: None, 563 + base: None, 564 564 conflict_source: None, 565 565 } 566 566 } ··· 895 895 let entry = placeholder_binary_entry("deadbeef"); 896 896 assert_eq!(entry.kind, FileKind::Binary); 897 897 assert_eq!(entry.content_hash, "deadbeef"); 898 - assert!(entry.pack_ref.is_none()); 898 + assert!(entry.base.is_none()); 899 899 assert!(entry.conflict_source.is_none()); 900 900 assert_eq!(entry.updates_count, 0); 901 901 }
+53 -53
src/types.rs
··· 30 30 } 31 31 32 32 /// The file index — maps relative paths to FileEntry metadata. 33 - /// Stored as a blob (not inline in the record) to avoid record size limits. 33 + /// Stored as a PDS blob (not inline in the record) to avoid record size limits. 34 34 pub type FileIndex = HashMap<String, FileEntry>; 35 35 36 36 /// A repo stored on PDS with Yrs CRDT state per file. ··· 45 45 #[derive(Debug, Clone, Serialize, Deserialize)] 46 46 pub struct YrsRepo { 47 47 pub name: String, 48 - /// Pointer to the index blob within the pack (contains serialized FileIndex). 49 - pub index: PackRef, 48 + /// Pointer to the index within the PDS blob containing the pack (contains serialized FileIndex). 49 + pub index: PackItemRef, 50 50 /// All blob CIDs referenced by this repo — prevents PDS garbage collection. 51 51 pub blobs: Vec<BlobRef>, 52 52 #[serde(rename = "updatedAt")] ··· 63 63 /// For text: hash of UTF-8 content. For binary: hash of raw bytes. 64 64 #[serde(rename = "contentHash")] 65 65 pub content_hash: String, 66 - /// Full Yrs state blob reference (encode_state_as_update_v1). 67 - #[serde(rename = "snapshotBlob")] 68 - pub snapshot_blob: BlobRef, 66 + /// Blob reference for GC — CID of the PDS blob containing the BaseYrsUpdate. 67 + #[serde(rename = "baseBlob")] 68 + pub base_blob: BlobRef, 69 69 /// State vector bytes, base64-encoded for inline storage. 70 70 #[serde(rename = "stateVector")] 71 71 pub state_vector: String, 72 - /// Incremental update packs since snapshot (ordered, each points into a pack blob). 72 + /// Incremental YrsUpdates since the base (ordered, each points into a pack). 73 73 #[serde(default, skip_serializing_if = "Vec::is_empty")] 74 - pub updates: Vec<PackRef>, 75 - /// Number of incremental updates applied since last snapshot. 74 + pub updates: Vec<PackItemRef>, 75 + /// Number of incremental updates applied since last base update. 76 76 #[serde(rename = "updatesCount", default)] 77 77 pub updates_count: u32, 78 - /// When the snapshot was taken. 79 - #[serde(rename = "snapshotAt")] 80 - pub snapshot_at: String, 78 + /// When the BaseYrsUpdate was created. 79 + #[serde(rename = "baseAt")] 80 + pub base_at: String, 81 81 /// File kind (text or binary). 82 82 pub kind: FileKind, 83 - /// Pack blob reference — points to data within a pack blob. 84 - #[serde(rename = "packRef")] 85 - pub pack_ref: Option<PackRef>, 83 + /// PackItemRef to BaseYrsUpdate data within a pack. 84 + #[serde(rename = "base")] 85 + pub base: Option<PackItemRef>, 86 86 /// For binary conflict files, the original path before conflict split. 87 87 #[serde(rename = "conflictSource", skip_serializing_if = "Option::is_none")] 88 88 pub conflict_source: Option<String>, 89 89 } 90 90 91 91 92 - /// Reference to data within a pack blob. 92 + /// Reference to a pack item within a PDS blob. 93 93 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 94 - pub struct PackRef { 95 - /// The pack blob reference. 94 + pub struct PackItemRef { 95 + /// The PDS blob containing the pack. 96 96 pub blob: BlobRef, 97 - /// Byte offset within the pack blob. 97 + /// Byte offset within the pack data section. 98 98 pub offset: u64, 99 - /// Length of data within the pack blob. 99 + /// Length of data within the pack data section. 100 100 pub length: u64, 101 - /// Whether the pack blob is gzip-compressed. 101 + /// Whether the pack is gzip-compressed. 102 102 #[serde(default, skip_serializing_if = "std::ops::Not::not")] 103 103 pub compressed: bool, 104 104 /// For chunked packs (>40MB), ordered list of chunk blob refs. ··· 162 162 }; 163 163 164 164 for entry in entries.values() { 165 - add(&entry.snapshot_blob, &mut seen, &mut refs); 166 - if let Some(ref pr) = entry.pack_ref { 165 + add(&entry.base_blob, &mut seen, &mut refs); 166 + if let Some(ref pr) = entry.base { 167 167 add(&pr.blob, &mut seen, &mut refs); 168 168 if let Some(ref chunks) = pr.chunks { 169 169 for chunk in chunks { ··· 220 220 fn file_entry_serialization() { 221 221 let entry = FileEntry { 222 222 content_hash: "abc123def456".to_string(), 223 - snapshot_blob: BlobRef::new( 223 + base_blob: BlobRef::new( 224 224 "bafysnap".to_string(), 225 225 "application/octet-stream".to_string(), 226 226 100, ··· 228 228 state_vector: "AQID".to_string(), 229 229 updates: vec![], 230 230 updates_count: 0, 231 - snapshot_at: "2026-03-13T00:00:00Z".to_string(), 231 + base_at: "2026-03-13T00:00:00Z".to_string(), 232 232 kind: FileKind::Text, 233 - pack_ref: None, 233 + base: None, 234 234 conflict_source: None, 235 235 }; 236 236 let json = serde_json::to_string(&entry).unwrap(); 237 - assert!(json.contains("\"snapshotBlob\"")); 237 + assert!(json.contains("\"baseBlob\"")); 238 238 assert!(json.contains("\"stateVector\"")); 239 239 assert!(!json.contains("updatesBlob")); // skipped when None 240 240 assert!(json.contains("\"kind\":\"text\"")); // always serialized ··· 247 247 fn binary_file_entry_serialization() { 248 248 let entry = FileEntry { 249 249 content_hash: String::new(), 250 - snapshot_blob: BlobRef::new( 250 + base_blob: BlobRef::new( 251 251 "bafybin".to_string(), 252 252 "application/octet-stream".to_string(), 253 253 5000, ··· 255 255 state_vector: String::new(), 256 256 updates: vec![], 257 257 updates_count: 0, 258 - snapshot_at: "2026-03-13T00:00:00Z".to_string(), 258 + base_at: "2026-03-13T00:00:00Z".to_string(), 259 259 kind: FileKind::Binary, 260 - pack_ref: None, 260 + base: None, 261 261 conflict_source: None, 262 262 }; 263 263 let json = serde_json::to_string(&entry).unwrap(); ··· 273 273 "application/octet-stream".to_string(), 274 274 5000, 275 275 ); 276 - let index_ref = PackRef { 276 + let index_ref = PackItemRef { 277 277 blob: pack_blob.clone(), 278 278 offset: 0, 279 279 length: 200, ··· 300 300 #[test] 301 301 fn collect_blob_refs_deduplicates() { 302 302 let blob = BlobRef::new("bafyshared".to_string(), "application/octet-stream".to_string(), 100); 303 - let pack_ref = PackRef { 303 + let item_ref = PackItemRef { 304 304 blob: blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 305 305 }; 306 306 let mut index = FileIndex::new(); 307 307 index.insert("a.txt".to_string(), FileEntry { 308 308 content_hash: String::new(), 309 - snapshot_blob: blob.clone(), 309 + base_blob: blob.clone(), 310 310 state_vector: String::new(), 311 311 updates: vec![], updates_count: 0, 312 - snapshot_at: String::new(), kind: FileKind::Text, 313 - pack_ref: Some(pack_ref.clone()), conflict_source: None, 312 + base_at: String::new(), kind: FileKind::Text, 313 + base: Some(item_ref.clone()), conflict_source: None, 314 314 }); 315 315 index.insert("b.txt".to_string(), FileEntry { 316 316 content_hash: String::new(), 317 - snapshot_blob: blob.clone(), 317 + base_blob: blob.clone(), 318 318 state_vector: String::new(), 319 319 updates: vec![], updates_count: 0, 320 - snapshot_at: String::new(), kind: FileKind::Text, 321 - pack_ref: Some(pack_ref), conflict_source: None, 320 + base_at: String::new(), kind: FileKind::Text, 321 + base: Some(item_ref), conflict_source: None, 322 322 }); 323 323 let refs = collect_blob_refs(&index); 324 324 assert_eq!(refs.len(), 1, "same blob CID should be deduplicated"); ··· 332 332 let mut index = FileIndex::new(); 333 333 index.insert("a.txt".to_string(), FileEntry { 334 334 content_hash: String::new(), 335 - snapshot_blob: pack_blob.clone(), 335 + base_blob: pack_blob.clone(), 336 336 state_vector: String::new(), 337 - updates: vec![PackRef { 337 + updates: vec![PackItemRef { 338 338 blob: update_blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 339 339 }], 340 340 updates_count: 1, 341 - snapshot_at: String::new(), kind: FileKind::Text, 342 - pack_ref: Some(PackRef { 341 + base_at: String::new(), kind: FileKind::Text, 342 + base: Some(PackItemRef { 343 343 blob: pack_blob.clone(), offset: 0, length: 100, compressed: false, chunks: None, 344 344 }), 345 345 conflict_source: None, ··· 352 352 } 353 353 354 354 #[test] 355 - fn pack_ref_serialization() { 356 - let pack_ref = PackRef { 355 + fn pack_item_ref_serialization() { 356 + let item_ref = PackItemRef { 357 357 blob: BlobRef::new( 358 358 "bafypack".to_string(), 359 359 "application/octet-stream".to_string(), ··· 364 364 compressed: true, 365 365 chunks: None, 366 366 }; 367 - let json = serde_json::to_string(&pack_ref).unwrap(); 367 + let json = serde_json::to_string(&item_ref).unwrap(); 368 368 assert!(json.contains("\"compressed\":true")); 369 - let deserialized: PackRef = serde_json::from_str(&json).unwrap(); 369 + let deserialized: PackItemRef = serde_json::from_str(&json).unwrap(); 370 370 assert_eq!(deserialized.offset, 100); 371 371 assert_eq!(deserialized.length, 200); 372 372 assert!(deserialized.compressed); ··· 374 374 } 375 375 376 376 #[test] 377 - fn pack_ref_compressed_false_omitted() { 377 + fn pack_item_ref_compressed_false_omitted() { 378 378 // compressed=false should be skipped in serialization 379 - let pack_ref = PackRef { 379 + let item_ref = PackItemRef { 380 380 blob: BlobRef::new( 381 381 "bafypack".to_string(), 382 382 "application/octet-stream".to_string(), ··· 387 387 compressed: false, 388 388 chunks: None, 389 389 }; 390 - let json = serde_json::to_string(&pack_ref).unwrap(); 390 + let json = serde_json::to_string(&item_ref).unwrap(); 391 391 assert!( 392 392 !json.contains("compressed"), 393 393 "compressed=false should be omitted" ··· 395 395 } 396 396 397 397 #[test] 398 - fn pack_ref_with_chunks() { 399 - let pack_ref = PackRef { 398 + fn pack_item_ref_with_chunks() { 399 + let item_ref = PackItemRef { 400 400 blob: BlobRef::new( 401 401 "bafychunk0".to_string(), 402 402 "application/octet-stream".to_string(), ··· 418 418 ), 419 419 ]), 420 420 }; 421 - let json = serde_json::to_string(&pack_ref).unwrap(); 421 + let json = serde_json::to_string(&item_ref).unwrap(); 422 422 assert!(json.contains("bafychunk1")); 423 - let deserialized: PackRef = serde_json::from_str(&json).unwrap(); 423 + let deserialized: PackItemRef = serde_json::from_str(&json).unwrap(); 424 424 assert_eq!(deserialized.chunks.as_ref().unwrap().len(), 2); 425 425 } 426 426
+53 -54
src/yrs_pds.rs
··· 38 38 text.get_string(&txn) 39 39 } 40 40 41 - /// Encode a Doc's full state as bytes. 42 - pub fn encode_snapshot(doc: &Doc) -> Vec<u8> { 41 + /// Encode a Doc as a BaseYrsUpdate (full state against empty StateVector). 42 + pub fn encode_base_update(doc: &Doc) -> Vec<u8> { 43 43 let txn = doc.transact(); 44 44 txn.encode_state_as_update_v1(&yrs::StateVector::default()) 45 45 } ··· 58 58 Ok(txn.encode_diff_v1(&sv)) 59 59 } 60 60 61 - /// Load a Doc from a snapshot blob. 62 - pub fn doc_from_snapshot(data: &[u8]) -> Result<Doc, String> { 61 + /// Load a Doc from a BaseYrsUpdate blob. 62 + pub fn doc_from_base_update(data: &[u8]) -> Result<Doc, String> { 63 63 let doc = Doc::new(); 64 64 let _text = doc.get_or_insert_text("content"); 65 - let update = yrs::Update::decode_v1(data).map_err(|e| format!("decode snapshot: {}", e))?; 65 + let update = yrs::Update::decode_v1(data).map_err(|e| format!("decode base update: {}", e))?; 66 66 doc.transact_mut() 67 67 .apply_update(update) 68 - .map_err(|e| format!("apply snapshot: {}", e))?; 68 + .map_err(|e| format!("apply base update: {}", e))?; 69 69 Ok(doc) 70 70 } 71 71 ··· 84 84 client: &PdsClient, 85 85 did: &str, 86 86 ) -> Result<FileEntry, String> { 87 - let snapshot = encode_snapshot(doc); 87 + let base_update = encode_base_update(doc); 88 88 let sv = encode_state_vector(doc); 89 89 90 - // Upload snapshot blob 91 - let snapshot_blob = client.upload_blob(snapshot.clone()).await?; 90 + // Upload base update as a PDS blob 91 + let base_blob = client.upload_blob(base_update.clone()).await?; 92 92 93 93 // We need to reference the blob in a record for it to persist, 94 94 // so we return the FileEntry which will be embedded in a YrsRepo. ··· 98 98 99 99 Ok(FileEntry { 100 100 content_hash: String::new(), 101 - snapshot_blob, 101 + base_blob, 102 102 state_vector: base64_encode(&sv), 103 103 updates: vec![], 104 104 updates_count: 0, 105 - snapshot_at: now, 105 + base_at: now, 106 106 kind: FileKind::Text, 107 - pack_ref: None, 107 + base: None, 108 108 conflict_source: None, 109 109 }) 110 110 } 111 111 112 112 /// Reconstruct a Doc from a FileEntry by downloading blobs from PDS. 113 113 /// 114 - /// If the entry has a pack_ref, extracts data from the pack blob. 115 - /// Otherwise downloads the snapshot blob directly. 114 + /// Downloads the BaseYrsUpdate via base, then applies incremental updates. 116 115 pub async fn file_entry_to_doc( 117 116 entry: &FileEntry, 118 117 client: &PdsClient, 119 118 did: &str, 120 119 ) -> Result<Doc, String> { 121 - let snapshot_data = get_file_blob_data(entry, client, did).await?; 122 - let doc = doc_from_snapshot(&snapshot_data)?; 120 + let base_data = fetch_file_data(entry, client, did).await?; 121 + let doc = doc_from_base_update(&base_data)?; 123 122 124 123 // Apply incremental updates if present 125 124 for update_ref in &entry.updates { 126 - let update_data = get_pack_ref_data(update_ref, client, did).await?; 125 + let update_data = fetch_pack_item(update_ref, client, did).await?; 127 126 apply_update(&doc, &update_data)?; 128 127 } 129 128 130 129 Ok(doc) 131 130 } 132 131 133 - /// Extract data from a PackRef by downloading and parsing the pack blob. 134 - pub async fn get_pack_ref_data( 135 - pack_ref: &crate::types::PackRef, 132 + /// Extract data from a PackItemRef by downloading and parsing the pack from PDS. 133 + pub async fn fetch_pack_item( 134 + item_ref: &crate::types::PackItemRef, 136 135 client: &PdsClient, 137 136 did: &str, 138 137 ) -> Result<Vec<u8>, String> { 139 - let pack_data = if let Some(ref chunks) = pack_ref.chunks { 138 + let pack_data = if let Some(ref chunks) = item_ref.chunks { 140 139 let mut chunk_data = Vec::new(); 141 140 for chunk_ref in chunks { 142 141 chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 143 142 } 144 143 crate::pack::reassemble_chunks(&chunk_data) 145 144 } else { 146 - client.get_blob(did, pack_ref.blob.cid()).await? 145 + client.get_blob(did, item_ref.blob.cid()).await? 147 146 }; 148 - let (_, blob_data) = crate::pack::parse_pack_auto(&pack_data)?; 149 - let start = pack_ref.offset as usize; 150 - let end = start + pack_ref.length as usize; 151 - if end > blob_data.len() { 147 + let (_, data_section) = crate::pack::parse_pack_auto(&pack_data)?; 148 + let start = item_ref.offset as usize; 149 + let end = start + item_ref.length as usize; 150 + if end > data_section.len() { 152 151 return Err(format!( 153 - "pack_ref out of bounds: {}..{} in {} bytes", 154 - start, end, blob_data.len() 152 + "pack item ref out of bounds: {}..{} in {} bytes", 153 + start, end, data_section.len() 155 154 )); 156 155 } 157 - Ok(blob_data[start..end].to_vec()) 156 + Ok(data_section[start..end].to_vec()) 158 157 } 159 158 160 - /// Get the raw blob data for a FileEntry, handling pack_ref extraction. 161 - pub async fn get_file_blob_data( 159 + /// Get the raw data for a FileEntry's base update, via its base PackItemRef. 160 + pub async fn fetch_file_data( 162 161 entry: &FileEntry, 163 162 client: &PdsClient, 164 163 did: &str, 165 164 ) -> Result<Vec<u8>, String> { 166 - let pack_ref = entry.pack_ref.as_ref() 167 - .ok_or("missing pack_ref on FileEntry")?; 168 - get_pack_ref_data(pack_ref, client, did).await 165 + let item_ref = entry.base.as_ref() 166 + .ok_or("missing base on FileEntry")?; 167 + fetch_pack_item(item_ref, client, did).await 169 168 } 170 169 171 170 /// Load the FileIndex from a YrsRepo's index blob. ··· 174 173 client: &PdsClient, 175 174 did: &str, 176 175 ) -> Result<FileIndex, String> { 177 - let data = get_pack_ref_data(&repo.index, client, did).await?; 176 + let data = fetch_pack_item(&repo.index, client, did).await?; 178 177 serde_json::from_slice(&data).map_err(|e| format!("parse FileIndex: {}", e)) 179 178 } 180 179 ··· 360 359 result 361 360 } 362 361 363 - /// Encode the manifest Doc as a snapshot (full state). 364 - pub fn encode_manifest_snapshot(doc: &Doc) -> Vec<u8> { 365 - encode_snapshot(doc) 362 + /// Encode the manifest Doc as a BaseYrsUpdate (full state). 363 + pub fn encode_manifest_base_update(doc: &Doc) -> Vec<u8> { 364 + encode_base_update(doc) 366 365 } 367 366 368 367 /// Materialize manifest content as a string (for the content field). ··· 382 381 lines.join("\n") 383 382 } 384 383 385 - /// Restore a manifest Doc from a snapshot. 386 - pub fn manifest_from_snapshot(data: &[u8]) -> Result<Doc, String> { 384 + /// Restore a manifest Doc from a BaseYrsUpdate. 385 + pub fn manifest_from_base_update(data: &[u8]) -> Result<Doc, String> { 387 386 let doc = Doc::new(); 388 387 let _map = doc.get_or_insert_map("manifest"); 389 388 let update = 390 - yrs::Update::decode_v1(data).map_err(|e| format!("decode manifest snapshot: {}", e))?; 389 + yrs::Update::decode_v1(data).map_err(|e| format!("decode manifest base update: {}", e))?; 391 390 doc.transact_mut() 392 391 .apply_update(update) 393 - .map_err(|e| format!("apply manifest snapshot: {}", e))?; 392 + .map_err(|e| format!("apply manifest base update: {}", e))?; 394 393 Ok(doc) 395 394 } 396 395 ··· 461 460 let doc = doc_from_text(content); 462 461 assert_eq!(materialize(&doc), content); 463 462 464 - let snapshot = encode_snapshot(&doc); 465 - let restored = doc_from_snapshot(&snapshot).unwrap(); 463 + let snapshot = encode_base_update(&doc); 464 + let restored = doc_from_base_update(&snapshot).unwrap(); 466 465 assert_eq!(materialize(&restored), content); 467 466 } 468 467 469 468 #[test] 470 469 fn incremental_update() { 471 470 let doc = doc_from_text("Hello"); 472 - let snapshot = encode_snapshot(&doc); 471 + let snapshot = encode_base_update(&doc); 473 472 let sv = encode_state_vector(&doc); 474 473 475 474 // Apply an edit ··· 484 483 let diff = encode_diff(&doc, &sv).unwrap(); 485 484 486 485 // Apply diff to a copy restored from the same snapshot (same client history) 487 - let doc2 = doc_from_snapshot(&snapshot).unwrap(); 486 + let doc2 = doc_from_base_update(&snapshot).unwrap(); 488 487 assert_eq!(materialize(&doc2), "Hello"); 489 488 apply_update(&doc2, &diff).unwrap(); 490 489 assert_eq!(materialize(&doc2), "Hello world"); ··· 530 529 manifest_insert(&doc, "docs/readme.md", &FileKind::Text); 531 530 manifest_insert(&doc, "images/photo.jpg", &FileKind::Binary); 532 531 533 - let snapshot = encode_manifest_snapshot(&doc); 534 - let restored = manifest_from_snapshot(&snapshot).unwrap(); 532 + let snapshot = encode_manifest_base_update(&doc); 533 + let restored = manifest_from_base_update(&snapshot).unwrap(); 535 534 let entries = manifest_entries(&restored); 536 535 assert_eq!(entries.len(), 2); 537 536 assert_eq!(entries.get("docs/readme.md"), Some(&FileKind::Text)); ··· 542 541 // Two repos start from same base 543 542 let base = new_manifest_doc(); 544 543 manifest_insert(&base, "shared.md", &FileKind::Text); 545 - let base_snapshot = encode_manifest_snapshot(&base); 544 + let base_snapshot = encode_manifest_base_update(&base); 546 545 547 546 // Repo A adds a file 548 - let repo_a = manifest_from_snapshot(&base_snapshot).unwrap(); 547 + let repo_a = manifest_from_base_update(&base_snapshot).unwrap(); 549 548 manifest_insert(&repo_a, "page-a.md", &FileKind::Text); 550 549 551 550 // Repo B adds a different file 552 - let repo_b = manifest_from_snapshot(&base_snapshot).unwrap(); 551 + let repo_b = manifest_from_base_update(&base_snapshot).unwrap(); 553 552 manifest_insert(&repo_b, "page-b.md", &FileKind::Text); 554 553 555 554 // Merge B into A ··· 571 570 // Base has a file 572 571 let base = new_manifest_doc(); 573 572 manifest_insert(&base, "file.md", &FileKind::Text); 574 - let base_snapshot = encode_manifest_snapshot(&base); 573 + let base_snapshot = encode_manifest_base_update(&base); 575 574 576 575 // Repo A deletes the file 577 - let repo_a = manifest_from_snapshot(&base_snapshot).unwrap(); 576 + let repo_a = manifest_from_base_update(&base_snapshot).unwrap(); 578 577 manifest_remove(&repo_a, "file.md"); 579 578 580 579 // Repo B re-asserts the file (simulating an edit) 581 - let repo_b = manifest_from_snapshot(&base_snapshot).unwrap(); 580 + let repo_b = manifest_from_base_update(&base_snapshot).unwrap(); 582 581 manifest_insert(&repo_b, "file.md", &FileKind::Text); 583 582 584 583 // Merge B into A — set should win over delete
+8 -8
tests/e2e_tests.rs
··· 113 113 create_sample_site(source.path()).await; 114 114 115 115 // Save to PDS 116 - let result = pds_yrs::save(source.path(), &client, &did, &rkey, false) 116 + let result = pds_yrs::save(source.path(), &client, &did, &rkey, "e2e-test", None, false) 117 117 .await 118 118 .unwrap(); 119 119 assert_eq!(result.files_uploaded, 3); ··· 155 155 create_sample_site(site.path()).await; 156 156 157 157 // First save 158 - let result = pds_yrs::save(site.path(), &client, &did, &rkey, false) 158 + let result = pds_yrs::save(site.path(), &client, &did, &rkey, "e2e-test", None, false) 159 159 .await 160 160 .unwrap(); 161 161 assert_eq!(result.files_uploaded, 3); ··· 164 164 write_file(site.path(), "index.md", "# Home\n\nUpdated content.").await; 165 165 166 166 // Second save — should only upload 1 changed file 167 - let result = pds_yrs::save(site.path(), &client, &did, &rkey, false) 167 + let result = pds_yrs::save(site.path(), &client, &did, &rkey, "e2e-test", None, false) 168 168 .await 169 169 .unwrap(); 170 170 assert_eq!( ··· 217 217 .await; 218 218 219 219 // Save initial state for both 220 - pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, false) 220 + pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, "e2e-merge", None, false) 221 221 .await 222 222 .unwrap(); 223 - pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, false) 223 + pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, "e2e-merge", None, false) 224 224 .await 225 225 .unwrap(); 226 226 ··· 231 231 "# Shared\n\nOriginal content.\n\nAlice's addition.\n", 232 232 ) 233 233 .await; 234 - pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, false) 234 + pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, "e2e-merge", None, false) 235 235 .await 236 236 .unwrap(); 237 237 ··· 242 242 "# Shared\n\nBob's edit to original content.\n", 243 243 ) 244 244 .await; 245 - pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, false) 245 + pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, "e2e-merge", None, false) 246 246 .await 247 247 .unwrap(); 248 248 ··· 279 279 280 280 let source = tempfile::tempdir().unwrap(); 281 281 create_sample_site(source.path()).await; 282 - pds_yrs::save(source.path(), &client, &did, &rkey, false) 282 + pds_yrs::save(source.path(), &client, &did, &rkey, "e2e-test", None, false) 283 283 .await 284 284 .unwrap(); 285 285