wip: you can use your pds as a yrs-relay if you want to
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fileindex as blob

notplants c2ef5c71 559081ed

+310 -217
+31 -9
src/export.rs
··· 1 1 //! Export repo content from PDS as plain text files. 2 2 //! 3 - //! This is the data portability escape hatch — reads the `content` field 4 - //! from each FileEntry without requiring Yrs decoding. 3 + //! Reconstructs files from Yrs CRDT snapshots and writes to disk. 4 + //! Functionally equivalent to load, but positioned as the 5 + //! data portability escape hatch. 5 6 6 7 use std::path::Path; 7 8 8 9 use crate::pds_client::PdsClient; 9 - use crate::types::{YrsRepo, COLLECTION}; 10 + use crate::types::{FileKind, YrsRepo, COLLECTION}; 11 + use crate::yrs_pds; 10 12 11 13 /// Export a repo from PDS to plain text files. 12 14 /// 13 - /// Reads only the `content` field from each FileEntry — no Yrs 14 - /// decoding or blob downloads needed. This works even if the Yrs 15 - /// library is unavailable. 15 + /// Reconstructs content from Yrs snapshots (for text files) or 16 + /// downloads raw blobs (for binary files). 16 17 pub async fn export( 17 18 client: &PdsClient, 18 19 did: &str, ··· 29 30 let repo: YrsRepo = 30 31 serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 31 32 33 + // Load file index from blob 34 + let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 35 + 32 36 let mut files_exported = 0; 33 37 34 - for (rel_path, entry) in &repo.files { 38 + for (rel_path, entry) in &file_index { 39 + // Skip internal entries 40 + if rel_path.starts_with("pdsyrs_") { 41 + continue; 42 + } 35 43 if verbose { 36 44 eprintln!("pds-yrs: export {}", rel_path); 37 45 } ··· 41 49 std::fs::create_dir_all(parent) 42 50 .map_err(|e| format!("create dir {:?}: {}", parent, e))?; 43 51 } 44 - std::fs::write(&output_path, &entry.content) 45 - .map_err(|e| format!("write {:?}: {}", output_path, e))?; 52 + 53 + match entry.kind { 54 + FileKind::Text => { 55 + let doc = yrs_pds::file_entry_to_doc(entry, client, did).await?; 56 + let content = yrs_pds::materialize(&doc); 57 + std::fs::write(&output_path, &content) 58 + .map_err(|e| format!("write {:?}: {}", output_path, e))?; 59 + } 60 + FileKind::Binary => { 61 + let pack_ref = entry.pack_ref.as_ref() 62 + .ok_or_else(|| format!("missing pack_ref for binary file: {}", rel_path))?; 63 + let data = yrs_pds::get_pack_ref_data(pack_ref, client, did).await?; 64 + std::fs::write(&output_path, &data) 65 + .map_err(|e| format!("write {:?}: {}", output_path, e))?; 66 + } 67 + } 46 68 47 69 files_exported += 1; 48 70 }
+1 -1
src/lib.rs
··· 26 26 pub use pds_client::PdsClient; 27 27 pub use save::{save, save_filtered}; 28 28 pub use sync::{sync_loop, SyncConfig, SyncCycleResult}; 29 - pub use types::{Collaborator, FileKind, COLLECTION, MANIFEST_KEY}; 29 + pub use types::{collect_blob_refs, Collaborator, FileIndex, FileKind, COLLECTION, MANIFEST_KEY};
+55 -67
src/load.rs
··· 5 5 6 6 use crate::pds_client::PdsClient; 7 7 use crate::types::{FileKind, LoadResult, YrsRepo, COLLECTION, MANIFEST_KEY}; 8 - use crate::yrs_pds; 8 + use crate::yrs_pds::{self}; 9 9 10 10 /// Load a repo from PDS into a directory. 11 11 /// ··· 28 28 let repo: YrsRepo = 29 29 serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 30 30 31 + // Load file index from blob 32 + let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 33 + 31 34 // Determine which files to load 32 - let file_list = if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 35 + let file_list = if let Some(manifest_entry) = file_index.get(MANIFEST_KEY) { 33 36 let manifest_doc = yrs_pds::file_entry_to_doc(manifest_entry, client, did).await?; 34 37 let entries = yrs_pds::manifest_entries(&manifest_doc); 35 38 if verbose { ··· 46 49 // Cache for pack blobs (keyed by CID) to avoid re-downloading 47 50 let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); 48 51 49 - for (rel_path, entry) in &repo.files { 52 + for (rel_path, entry) in &file_index { 50 53 if rel_path == MANIFEST_KEY { 51 54 continue; 52 55 } ··· 84 87 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 85 88 } 86 89 FileKind::Text => { 87 - // If pack_ref exists, use cached pack blob extraction 88 - if entry.pack_ref.is_some() { 89 - let snapshot_data = get_blob_data_cached( 90 - entry, 90 + let snapshot_data = get_blob_data_cached( 91 + entry, 92 + client, 93 + did, 94 + &mut pack_cache, 95 + &mut blobs_downloaded, 96 + ) 97 + .await?; 98 + let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 99 + // Apply incremental updates from pack refs 100 + for update_ref in &entry.updates { 101 + let update_data = get_pack_ref_data_cached( 102 + update_ref, 91 103 client, 92 104 did, 93 105 &mut pack_cache, 94 106 &mut blobs_downloaded, 95 107 ) 96 108 .await?; 97 - let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 98 - // Apply incremental updates from pack refs 99 - for update_ref in &entry.updates { 100 - let update_data = get_pack_ref_data_cached( 101 - update_ref, 102 - client, 103 - did, 104 - &mut pack_cache, 105 - &mut blobs_downloaded, 106 - ) 107 - .await?; 108 - yrs_pds::apply_update(&doc, &update_data)?; 109 - } 110 - let content = yrs_pds::materialize(&doc); 111 - std::fs::write(&output_path, &content) 112 - .map_err(|e| format!("write {:?}: {}", output_path, e))?; 113 - } else { 114 - // Direct blob download (no pack ref) 115 - let doc = yrs_pds::file_entry_to_doc(entry, client, did).await?; 116 - blobs_downloaded += 1; 117 - blobs_downloaded += entry.updates.len(); 118 - let content = yrs_pds::materialize(&doc); 119 - std::fs::write(&output_path, &content) 120 - .map_err(|e| format!("write {:?}: {}", output_path, e))?; 109 + yrs_pds::apply_update(&doc, &update_data)?; 121 110 } 111 + let content = yrs_pds::materialize(&doc); 112 + std::fs::write(&output_path, &content) 113 + .map_err(|e| format!("write {:?}: {}", output_path, e))?; 122 114 } 123 115 } 124 116 ··· 187 179 pack_cache: &mut HashMap<String, Vec<u8>>, 188 180 blobs_downloaded: &mut usize, 189 181 ) -> Result<Vec<u8>, String> { 190 - if let Some(ref pack_ref) = entry.pack_ref { 191 - let cid = pack_ref.blob.cid().to_string(); 182 + let pack_ref = entry.pack_ref.as_ref() 183 + .ok_or("missing pack_ref on FileEntry")?; 184 + let cid = pack_ref.blob.cid().to_string(); 192 185 193 - // Fetch pack blob (or use cache), handling chunked packs 194 - if !pack_cache.contains_key(&cid) { 195 - let data = if let Some(ref chunks) = pack_ref.chunks { 196 - // Reassemble chunked pack 197 - let mut chunk_data = Vec::new(); 198 - for chunk_ref in chunks { 199 - let chunk = client.get_blob(did, chunk_ref.cid()).await?; 200 - *blobs_downloaded += 1; 201 - chunk_data.push(chunk); 202 - } 203 - crate::pack::reassemble_chunks(&chunk_data) 204 - } else { 205 - let d = client.get_blob(did, &cid).await?; 186 + // Fetch pack blob (or use cache), handling chunked packs 187 + if !pack_cache.contains_key(&cid) { 188 + let data = if let Some(ref chunks) = pack_ref.chunks { 189 + // Reassemble chunked pack 190 + let mut chunk_data = Vec::new(); 191 + for chunk_ref in chunks { 192 + let chunk = client.get_blob(did, chunk_ref.cid()).await?; 206 193 *blobs_downloaded += 1; 207 - d 208 - }; 209 - pack_cache.insert(cid.clone(), data); 210 - } 194 + chunk_data.push(chunk); 195 + } 196 + crate::pack::reassemble_chunks(&chunk_data) 197 + } else { 198 + let d = client.get_blob(did, &cid).await?; 199 + *blobs_downloaded += 1; 200 + d 201 + }; 202 + pack_cache.insert(cid.clone(), data); 203 + } 211 204 212 - let pack_data = pack_cache.get(&cid).unwrap(); 213 - let (_, blob_data) = crate::pack::parse_pack_auto(pack_data)?; 205 + let pack_data = pack_cache.get(&cid).unwrap(); 206 + let (_, blob_data) = crate::pack::parse_pack_auto(pack_data)?; 214 207 215 - let start = pack_ref.offset as usize; 216 - let end = start + pack_ref.length as usize; 217 - if end > blob_data.len() { 218 - return Err(format!( 219 - "pack_ref out of bounds: {}..{} in {} bytes", 220 - start, 221 - end, 222 - blob_data.len() 223 - )); 224 - } 225 - Ok(blob_data[start..end].to_vec()) 226 - } else { 227 - let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 228 - *blobs_downloaded += 1; 229 - Ok(data) 208 + let start = pack_ref.offset as usize; 209 + let end = start + pack_ref.length as usize; 210 + if end > blob_data.len() { 211 + return Err(format!( 212 + "pack_ref out of bounds: {}..{} in {} bytes", 213 + start, 214 + end, 215 + blob_data.len() 216 + )); 230 217 } 218 + Ok(blob_data[start..end].to_vec()) 231 219 }
+2
src/local_state.rs
··· 329 329 did: "did:plc:abc123".to_string(), 330 330 project: "my-site".to_string(), 331 331 repo_rkey: "my-site-a1b2c3d4".to_string(), 332 + access_token: None, 333 + refresh_token: None, 332 334 }; 333 335 state.save_config(&config).unwrap(); 334 336
+3 -7
src/main.rs
··· 580 580 .next() 581 581 .unwrap_or("?") 582 582 .to_string(); 583 - let file_count = repo 584 - .files 585 - .keys() 586 - .filter(|k| !k.starts_with("pdsyrs_")) 587 - .count(); 583 + let blob_count = repo.blobs.len(); 588 584 let collab_count = repo.collaborators.len(); 589 - entries.push((repo.name, rkey, repo.updated_at, file_count, collab_count)); 585 + entries.push((repo.name, rkey, repo.updated_at, blob_count, collab_count)); 590 586 } 591 587 } 592 588 ··· 595 591 return Ok(()); 596 592 } 597 593 598 - println!("{:<20} {:<30} {:<25} {:<6} {}", "PROJECT", "RKEY", "UPDATED", "FILES", "COLLABS"); 594 + println!("{:<20} {:<30} {:<25} {:<6} {}", "PROJECT", "RKEY", "UPDATED", "BLOBS", "COLLABS"); 599 595 for (name, rkey, updated, files, collabs) in &entries { 600 596 println!("{:<20} {:<30} {:<25} {:<6} {}", name, rkey, updated, files, collabs); 601 597 }
+21 -23
src/merge.rs
··· 8 8 9 9 use crate::pack; 10 10 use crate::pds_client::PdsClient; 11 - use crate::types::{FileEntry, FileKind, PackRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 11 + use crate::types::{FileEntry, FileIndex, FileKind, PackRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 12 12 use crate::yrs_pds; 13 13 14 14 /// Merge all repos for a project. ··· 87 87 output_dir: &Path, 88 88 verbose: bool, 89 89 ) -> Result<(), String> { 90 - // Fetch all repo records 91 - let mut repos: Vec<(String, YrsRepo)> = Vec::new(); 90 + // Fetch all repo records and load their file indices 91 + let mut repos: Vec<(String, YrsRepo, FileIndex)> = Vec::new(); 92 92 for rkey in rkeys { 93 93 let record = client 94 94 .get_record(did, COLLECTION, rkey) ··· 96 96 .ok_or_else(|| format!("repo record not found: {}", rkey))?; 97 97 let repo: YrsRepo = serde_json::from_value(record.value) 98 98 .map_err(|e| format!("parse YrsRepo for {}: {}", rkey, e))?; 99 - repos.push((rkey.to_string(), repo)); 99 + let file_index = yrs_pds::load_file_index(&repo, client, did).await?; 100 + repos.push((rkey.to_string(), repo, file_index)); 100 101 } 101 102 102 103 // Pack cache: keyed by CID, avoids redundant blob downloads. ··· 119 120 for (rel_path, kind) in &manifest_entries { 120 121 // Collect which repos have this file 121 122 let mut repo_indices: Vec<usize> = Vec::new(); 122 - for (i, (_, repo)) in repos.iter().enumerate() { 123 - if repo.files.contains_key(rel_path) { 123 + for (i, (_, _, file_index)) in repos.iter().enumerate() { 124 + if file_index.contains_key(rel_path) { 124 125 repo_indices.push(i); 125 126 } 126 127 } ··· 179 180 180 181 /// CRDT-merge manifest Maps from all repos. 181 182 async fn merge_manifests( 182 - repos: &[(String, YrsRepo)], 183 + repos: &[(String, YrsRepo, FileIndex)], 183 184 client: &PdsClient, 184 185 did: &str, 185 186 pack_cache: &mut HashMap<String, Vec<u8>>, 186 187 ) -> Result<Doc, String> { 187 188 let mut manifest_docs: Vec<Doc> = Vec::new(); 188 189 189 - for (_rkey, repo) in repos { 190 - let doc = if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 190 + for (_rkey, _repo, file_index) in repos { 191 + let doc = if let Some(manifest_entry) = file_index.get(MANIFEST_KEY) { 191 192 file_entry_to_doc_cached(manifest_entry, client, did, pack_cache).await? 192 193 } else { 193 - // Legacy repo — create manifest from its files 194 194 let doc = yrs_pds::new_manifest_doc(); 195 - for (path, entry) in &repo.files { 195 + for (path, entry) in file_index { 196 196 if path != MANIFEST_KEY { 197 197 yrs_pds::manifest_insert(&doc, path, &entry.kind); 198 198 } ··· 223 223 async fn merge_text_file( 224 224 rel_path: &str, 225 225 repo_indices: &[usize], 226 - repos: &[(String, YrsRepo)], 226 + repos: &[(String, YrsRepo, FileIndex)], 227 227 client: &PdsClient, 228 228 did: &str, 229 229 pack_cache: &mut HashMap<String, Vec<u8>>, 230 230 ) -> Result<String, String> { 231 231 if repo_indices.len() == 1 { 232 - let entry = &repos[repo_indices[0]].1.files[rel_path]; 232 + let entry = &repos[repo_indices[0]].2[rel_path]; 233 233 let doc = file_entry_to_doc_cached(entry, client, did, pack_cache).await?; 234 234 return Ok(yrs_pds::materialize(&doc)); 235 235 } 236 236 237 237 let mut docs: Vec<Doc> = Vec::new(); 238 238 for &idx in repo_indices { 239 - let entry = &repos[idx].1.files[rel_path]; 239 + let entry = &repos[idx].2[rel_path]; 240 240 let doc = file_entry_to_doc_cached(entry, client, did, pack_cache).await?; 241 241 docs.push(doc); 242 242 } ··· 258 258 async fn merge_binary_file( 259 259 rel_path: &str, 260 260 repo_indices: &[usize], 261 - repos: &[(String, YrsRepo)], 261 + repos: &[(String, YrsRepo, FileIndex)], 262 262 rkeys: &[&str], 263 263 client: &PdsClient, 264 264 did: &str, ··· 266 266 ) -> Result<(), String> { 267 267 if repo_indices.len() == 1 { 268 268 // Only one repo has this binary file — just download it 269 - let entry = &repos[repo_indices[0]].1.files[rel_path]; 269 + let entry = &repos[repo_indices[0]].2[rel_path]; 270 270 let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 271 271 let output_path = output_dir.join(rel_path); 272 272 std::fs::write(&output_path, &data) ··· 277 277 // Collect CIDs from all repos 278 278 let mut cid_repo: HashMap<String, Vec<usize>> = HashMap::new(); 279 279 for &idx in repo_indices { 280 - let entry = &repos[idx].1.files[rel_path]; 280 + let entry = &repos[idx].2[rel_path]; 281 281 cid_repo 282 282 .entry(entry.snapshot_blob.cid().to_string()) 283 283 .or_default() ··· 286 286 287 287 if cid_repo.len() == 1 { 288 288 // All repos have the same CID — no conflict 289 - let entry = &repos[repo_indices[0]].1.files[rel_path]; 289 + let entry = &repos[repo_indices[0]].2[rel_path]; 290 290 let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 291 291 let output_path = output_dir.join(rel_path); 292 292 std::fs::write(&output_path, &data) ··· 344 344 did: &str, 345 345 pack_cache: &mut HashMap<String, Vec<u8>>, 346 346 ) -> Result<Vec<u8>, String> { 347 - if let Some(ref pack_ref) = entry.pack_ref { 348 - get_pack_ref_data_cached(pack_ref, client, did, pack_cache).await 349 - } else { 350 - client.get_blob(did, entry.snapshot_blob.cid()).await 351 - } 347 + let pack_ref = entry.pack_ref.as_ref() 348 + .ok_or("missing pack_ref on FileEntry")?; 349 + get_pack_ref_data_cached(pack_ref, client, did, pack_cache).await 352 350 } 353 351 354 352 /// Extract data from a PackRef, using pack cache to avoid redundant downloads.
+54 -24
src/save.rs
··· 6 6 use crate::pack::{self, PackDataType}; 7 7 use crate::pds_client::PdsClient; 8 8 use crate::types::{ 9 - BlobRef, Collaborator, FileEntry, FileKind, PackRef, SaveResult, YrsRepo, COLLECTION, 10 - MANIFEST_KEY, 9 + collect_blob_refs, BlobRef, Collaborator, FileEntry, FileIndex, FileKind, PackRef, SaveResult, 10 + YrsRepo, COLLECTION, MANIFEST_KEY, 11 11 }; 12 12 use crate::yrs_pds; 13 13 ··· 88 88 .and_then(|r| serde_json::from_value(r.value.clone()).ok()); 89 89 let swap_cid = existing.as_ref().and_then(|r| r.cid.clone()); 90 90 91 + // Load existing file index from blob 92 + let existing_index: Option<FileIndex> = if let Some(ref repo) = existing_repo { 93 + Some(yrs_pds::load_file_index(repo, client, did).await?) 94 + } else { 95 + None 96 + }; 97 + 91 98 // Reconstruct or create manifest 92 - let manifest_doc = if let Some(ref repo) = existing_repo { 93 - if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 99 + let manifest_doc = if let Some(ref index) = existing_index { 100 + if let Some(manifest_entry) = index.get(MANIFEST_KEY) { 94 101 yrs_pds::file_entry_to_doc(manifest_entry, client, did).await? 95 102 } else { 96 103 let doc = yrs_pds::new_manifest_doc(); 97 - for (path, entry) in &repo.files { 104 + for (path, entry) in index { 98 105 if path != MANIFEST_KEY { 99 106 yrs_pds::manifest_insert(&doc, path, &entry.kind); 100 107 } ··· 140 147 }; 141 148 142 149 // Check if file changed since last save 143 - if let Some(ref repo) = existing_repo { 144 - if let Some(existing_entry) = repo.files.get(rel_path) { 145 - if existing_entry.content == content { 150 + if let Some(ref index) = existing_index { 151 + if let Some(existing_entry) = index.get(rel_path) { 152 + if existing_entry.content_hash == hex_hash(content.as_bytes()) { 146 153 file_entries.insert(rel_path.clone(), existing_entry.clone()); 147 154 files_skipped += 1; 148 155 if verbose { ··· 155 162 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Text); 156 163 157 164 // Incremental update path: compute diff only 158 - if existing_entry.pack_ref.is_some() 159 - && existing_entry.updates_count + 1 < COMPACTION_THRESHOLD 165 + if existing_entry.updates_count + 1 < COMPACTION_THRESHOLD 160 166 { 161 167 if let Ok(doc) = 162 168 reconstruct_and_diff(existing_entry, content, client, did).await ··· 176 182 file_entries.insert( 177 183 rel_path.clone(), 178 184 FileEntry { 179 - content: materialized, 185 + content_hash: hex_hash(materialized.as_bytes()), 180 186 snapshot_blob: existing_entry.snapshot_blob.clone(), 181 187 state_vector: yrs_pds::base64_encode(&sv), 182 188 updates: existing_entry.updates.clone(), ··· 223 229 file_entries.insert( 224 230 rel_path.clone(), 225 231 FileEntry { 226 - content: content.to_string(), 232 + content_hash: hex_hash(content.as_bytes()), 227 233 snapshot_blob: placeholder_blob_ref(), 228 234 state_vector: yrs_pds::base64_encode(&sv), 229 235 updates: vec![], ··· 240 246 } 241 247 } 242 248 FileKind::Binary => { 243 - if let Some(ref repo) = existing_repo { 244 - if let Some(existing_entry) = repo.files.get(rel_path) { 249 + if let Some(ref index) = existing_index { 250 + if let Some(existing_entry) = index.get(rel_path) { 245 251 if existing_entry.kind == FileKind::Binary { 246 252 let hash = hex_hash(file_data); 247 - if existing_entry.content == hash { 253 + if existing_entry.content_hash == hash { 248 254 file_entries.insert(rel_path.clone(), existing_entry.clone()); 249 255 files_skipped += 1; 250 256 if verbose { ··· 307 313 } 308 314 }; 309 315 // Reconstruct full doc if we have existing state, else create fresh 310 - let doc = if let Some(ref repo) = existing_repo { 311 - if let Some(existing_entry) = repo.files.get(rel_path) { 316 + let doc = if let Some(ref index) = existing_index { 317 + if let Some(existing_entry) = index.get(rel_path) { 312 318 if let Ok(d) = reconstruct_and_diff(existing_entry, content, client, did).await { 313 319 d 314 320 } else { ··· 332 338 file_entries.insert( 333 339 rel_path.clone(), 334 340 FileEntry { 335 - content: materialized, 341 + content_hash: hex_hash(materialized.as_bytes()), 336 342 snapshot_blob: placeholder_blob_ref(), 337 343 state_vector: yrs_pds::base64_encode(&sv), 338 344 updates: vec![], ··· 444 450 }; 445 451 446 452 if entry.path == MANIFEST_KEY { 447 - let manifest_content = yrs_pds::materialize_manifest_content(&manifest_doc); 448 453 file_entries.insert( 449 454 MANIFEST_KEY.to_string(), 450 455 FileEntry { 451 - content: manifest_content, 456 + content_hash: String::new(), 452 457 snapshot_blob: blob_ref.clone(), 453 458 state_vector: yrs_pds::base64_encode(&manifest_sv), 454 459 updates: vec![], ··· 477 482 } 478 483 } 479 484 485 + // Serialize file index as a blob entry in a separate index pack 486 + let index_json = serde_json::to_vec(&file_entries) 487 + .map_err(|e| format!("serialize FileIndex: {}", e))?; 488 + let index_items: Vec<(&str, &[u8], PackDataType)> = 489 + vec![("__index__", &index_json, PackDataType::Snapshot)]; 490 + let index_pack = pack::create_compressed_pack(&index_items); 491 + let index_is_compressed = pack::is_gzip(&index_pack.data); 492 + let index_blob_ref = client.upload_blob(index_pack.data).await?; 493 + let index_entry = &index_pack.entries[0]; 494 + let index_pack_ref = PackRef { 495 + blob: index_blob_ref.clone(), 496 + offset: index_entry.offset, 497 + length: index_entry.length, 498 + compressed: index_is_compressed, 499 + chunks: None, 500 + }; 501 + 502 + // Collect all blob refs for GC prevention 503 + let mut all_blobs = collect_blob_refs(&file_entries); 504 + // Also include the index blob itself 505 + if !all_blobs.iter().any(|b| b.cid() == index_blob_ref.cid()) { 506 + all_blobs.push(index_blob_ref); 507 + } 508 + 480 509 // Build YrsRepo — preserve existing collaborators, merge with any new ones 481 510 let mut collaborators = existing_repo 482 511 .as_ref() ··· 492 521 let now = chrono::Utc::now().to_rfc3339(); 493 522 let record = YrsRepo { 494 523 name: project_name.to_string(), 495 - files: file_entries, 524 + index: index_pack_ref, 525 + blobs: all_blobs, 496 526 updated_at: now, 497 527 collaborators, 498 528 }; ··· 523 553 /// Placeholder binary FileEntry. 524 554 fn placeholder_binary_entry(hash: &str) -> FileEntry { 525 555 FileEntry { 526 - content: hash.to_string(), 556 + content_hash: hash.to_string(), 527 557 snapshot_blob: placeholder_blob_ref(), 528 558 state_vector: String::new(), 529 559 updates: vec![], ··· 799 829 fn placeholder_binary_entry_has_correct_kind() { 800 830 let entry = placeholder_binary_entry("abc123"); 801 831 assert_eq!(entry.kind, FileKind::Binary); 802 - assert_eq!(entry.content, "abc123"); 832 + assert_eq!(entry.content_hash, "abc123"); 803 833 assert!(entry.state_vector.is_empty()); 804 834 assert!(entry.updates.is_empty()); 805 835 } ··· 864 894 fn placeholder_binary_entry_fields() { 865 895 let entry = placeholder_binary_entry("deadbeef"); 866 896 assert_eq!(entry.kind, FileKind::Binary); 867 - assert_eq!(entry.content, "deadbeef"); 897 + assert_eq!(entry.content_hash, "deadbeef"); 868 898 assert!(entry.pack_ref.is_none()); 869 899 assert!(entry.conflict_source.is_none()); 870 900 assert_eq!(entry.updates_count, 0);
+129 -55
src/types.rs
··· 1 1 //! ATProto types for CRDT-on-PDS storage. 2 2 3 3 use serde::{Deserialize, Serialize}; 4 - use std::collections::HashMap; 4 + use std::collections::{HashMap, HashSet}; 5 5 6 6 /// Collection name for yrs repo records. 7 7 pub const COLLECTION: &str = "net.commoninternet.yrsrepo"; ··· 29 29 pub pds: Option<String>, 30 30 } 31 31 32 + /// The file index — maps relative paths to FileEntry metadata. 33 + /// Stored as a blob (not inline in the record) to avoid record size limits. 34 + pub type FileIndex = HashMap<String, FileEntry>; 35 + 32 36 /// A repo stored on PDS with Yrs CRDT state per file. 33 37 /// 34 38 /// `name` is the project name — shared across all devices/writers for the same project. 35 39 /// Each device gets its own rkey (auto-generated), while `name` identifies the project. 36 40 /// `collaborators` lists other device rkeys for the same project, enabling merge 37 41 /// without needing to list all records. 42 + /// 43 + /// File metadata is stored in a separate index blob (pointed to by `index`). 44 + /// `blobs` lists all blob refs that must be kept alive to prevent PDS garbage collection. 38 45 #[derive(Debug, Clone, Serialize, Deserialize)] 39 46 pub struct YrsRepo { 40 47 pub name: String, 41 - pub files: HashMap<String, FileEntry>, 48 + /// Pointer to the index blob within the pack (contains serialized FileIndex). 49 + pub index: PackRef, 50 + /// All blob CIDs referenced by this repo — prevents PDS garbage collection. 51 + pub blobs: Vec<BlobRef>, 42 52 #[serde(rename = "updatedAt")] 43 53 pub updated_at: String, 44 54 #[serde(default, skip_serializing_if = "Vec::is_empty")] 45 55 pub collaborators: Vec<Collaborator>, 46 56 } 47 57 48 - /// A single file's state, stored as Yrs CRDT + plain text. 58 + /// A single file's state, stored as Yrs CRDT. 49 59 #[derive(Debug, Clone, Serialize, Deserialize)] 50 60 pub struct FileEntry { 51 - /// Plain text content (always current, for portability). 52 - /// For binary files, this is empty or a hex hash. 53 - pub content: String, 61 + /// Hash of current content (FNV-1a hex). Used for change detection 62 + /// during save — avoids re-uploading unchanged files. 63 + /// For text: hash of UTF-8 content. For binary: hash of raw bytes. 64 + #[serde(rename = "contentHash")] 65 + pub content_hash: String, 54 66 /// Full Yrs state blob reference (encode_state_as_update_v1). 55 67 #[serde(rename = "snapshotBlob")] 56 68 pub snapshot_blob: BlobRef, ··· 66 78 /// When the snapshot was taken. 67 79 #[serde(rename = "snapshotAt")] 68 80 pub snapshot_at: String, 69 - /// File kind (text or binary). Defaults to text for backward compat. 70 - #[serde(default = "default_file_kind", skip_serializing_if = "is_text_kind")] 81 + /// File kind (text or binary). 71 82 pub kind: FileKind, 72 - /// Pack blob reference (when using pack blob uploads). 73 - #[serde(rename = "packRef", skip_serializing_if = "Option::is_none")] 83 + /// Pack blob reference — points to data within a pack blob. 84 + #[serde(rename = "packRef")] 74 85 pub pack_ref: Option<PackRef>, 75 86 /// For binary conflict files, the original path before conflict split. 76 87 #[serde(rename = "conflictSource", skip_serializing_if = "Option::is_none")] 77 88 pub conflict_source: Option<String>, 78 89 } 79 90 80 - fn default_file_kind() -> FileKind { 81 - FileKind::Text 82 - } 83 - 84 - fn is_text_kind(kind: &FileKind) -> bool { 85 - matches!(kind, FileKind::Text) 86 - } 87 91 88 92 /// Reference to data within a pack blob. 89 93 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] ··· 146 150 } 147 151 } 148 152 153 + /// Collect all unique BlobRefs from a FileIndex for GC prevention. 154 + pub fn collect_blob_refs(entries: &FileIndex) -> Vec<BlobRef> { 155 + let mut seen = HashSet::new(); 156 + let mut refs = Vec::new(); 157 + 158 + let add = |blob: &BlobRef, seen: &mut HashSet<String>, refs: &mut Vec<BlobRef>| { 159 + if seen.insert(blob.cid().to_string()) { 160 + refs.push(blob.clone()); 161 + } 162 + }; 163 + 164 + for entry in entries.values() { 165 + add(&entry.snapshot_blob, &mut seen, &mut refs); 166 + if let Some(ref pr) = entry.pack_ref { 167 + add(&pr.blob, &mut seen, &mut refs); 168 + if let Some(ref chunks) = pr.chunks { 169 + for chunk in chunks { 170 + add(chunk, &mut seen, &mut refs); 171 + } 172 + } 173 + } 174 + for update in &entry.updates { 175 + add(&update.blob, &mut seen, &mut refs); 176 + if let Some(ref chunks) = update.chunks { 177 + for chunk in chunks { 178 + add(chunk, &mut seen, &mut refs); 179 + } 180 + } 181 + } 182 + } 183 + refs 184 + } 185 + 149 186 /// Result of a save operation. 150 187 #[derive(Debug)] 151 188 pub struct SaveResult { ··· 182 219 #[test] 183 220 fn file_entry_serialization() { 184 221 let entry = FileEntry { 185 - content: "Hello world".to_string(), 222 + content_hash: "abc123def456".to_string(), 186 223 snapshot_blob: BlobRef::new( 187 224 "bafysnap".to_string(), 188 225 "application/octet-stream".to_string(), ··· 200 237 assert!(json.contains("\"snapshotBlob\"")); 201 238 assert!(json.contains("\"stateVector\"")); 202 239 assert!(!json.contains("updatesBlob")); // skipped when None 203 - assert!(!json.contains("kind")); // skipped when Text (default) 240 + assert!(json.contains("\"kind\":\"text\"")); // always serialized 204 241 let deserialized: FileEntry = serde_json::from_str(&json).unwrap(); 205 - assert_eq!(deserialized.content, "Hello world"); 242 + assert_eq!(deserialized.content_hash, "abc123def456"); 206 243 assert_eq!(deserialized.kind, FileKind::Text); 207 244 } 208 245 209 246 #[test] 210 247 fn binary_file_entry_serialization() { 211 248 let entry = FileEntry { 212 - content: String::new(), 249 + content_hash: String::new(), 213 250 snapshot_blob: BlobRef::new( 214 251 "bafybin".to_string(), 215 252 "application/octet-stream".to_string(), ··· 231 268 232 269 #[test] 233 270 fn yrs_repo_serialization() { 234 - let mut files = HashMap::new(); 235 - files.insert( 236 - "index.md".to_string(), 237 - FileEntry { 238 - content: "# Home".to_string(), 239 - snapshot_blob: BlobRef::new( 240 - "bafyindex".to_string(), 241 - "application/octet-stream".to_string(), 242 - 50, 243 - ), 244 - state_vector: "AQID".to_string(), 245 - updates: vec![], 246 - updates_count: 0, 247 - snapshot_at: "2026-03-13T00:00:00Z".to_string(), 248 - kind: FileKind::Text, 249 - pack_ref: None, 250 - conflict_source: None, 251 - }, 271 + let pack_blob = BlobRef::new( 272 + "bafypack".to_string(), 273 + "application/octet-stream".to_string(), 274 + 5000, 252 275 ); 276 + let index_ref = PackRef { 277 + blob: pack_blob.clone(), 278 + offset: 0, 279 + length: 200, 280 + compressed: false, 281 + chunks: None, 282 + }; 253 283 let record = YrsRepo { 254 284 name: "my-site".to_string(), 255 - files, 285 + index: index_ref, 286 + blobs: vec![pack_blob], 256 287 updated_at: "2026-03-13T00:00:00Z".to_string(), 257 288 collaborators: vec![], 258 289 }; 259 290 let json = serde_json::to_string(&record).unwrap(); 260 291 assert!(!json.contains("collaborators")); // empty vec is skipped 292 + assert!(json.contains("\"index\"")); // index field present 293 + assert!(json.contains("\"blobs\"")); // blobs field present 261 294 let deserialized: YrsRepo = serde_json::from_str(&json).unwrap(); 262 295 assert_eq!(deserialized.name, "my-site"); 263 - assert!(deserialized.files.contains_key("index.md")); 296 + assert_eq!(deserialized.blobs.len(), 1); 264 297 assert!(deserialized.collaborators.is_empty()); 265 298 } 266 299 267 300 #[test] 301 + fn collect_blob_refs_deduplicates() { 302 + let blob = BlobRef::new("bafyshared".to_string(), "application/octet-stream".to_string(), 100); 303 + let pack_ref = PackRef { 304 + blob: blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 305 + }; 306 + let mut index = FileIndex::new(); 307 + index.insert("a.txt".to_string(), FileEntry { 308 + content_hash: String::new(), 309 + snapshot_blob: blob.clone(), 310 + state_vector: String::new(), 311 + updates: vec![], updates_count: 0, 312 + snapshot_at: String::new(), kind: FileKind::Text, 313 + pack_ref: Some(pack_ref.clone()), conflict_source: None, 314 + }); 315 + index.insert("b.txt".to_string(), FileEntry { 316 + content_hash: String::new(), 317 + snapshot_blob: blob.clone(), 318 + state_vector: String::new(), 319 + updates: vec![], updates_count: 0, 320 + snapshot_at: String::new(), kind: FileKind::Text, 321 + pack_ref: Some(pack_ref), conflict_source: None, 322 + }); 323 + let refs = collect_blob_refs(&index); 324 + assert_eq!(refs.len(), 1, "same blob CID should be deduplicated"); 325 + assert_eq!(refs[0].cid(), "bafyshared"); 326 + } 327 + 328 + #[test] 329 + fn collect_blob_refs_includes_updates() { 330 + let pack_blob = BlobRef::new("bafypack1".to_string(), "application/octet-stream".to_string(), 100); 331 + let update_blob = BlobRef::new("bafypack2".to_string(), "application/octet-stream".to_string(), 200); 332 + let mut index = FileIndex::new(); 333 + index.insert("a.txt".to_string(), FileEntry { 334 + content_hash: String::new(), 335 + snapshot_blob: pack_blob.clone(), 336 + state_vector: String::new(), 337 + updates: vec![PackRef { 338 + blob: update_blob.clone(), offset: 0, length: 50, compressed: false, chunks: None, 339 + }], 340 + updates_count: 1, 341 + snapshot_at: String::new(), kind: FileKind::Text, 342 + pack_ref: Some(PackRef { 343 + blob: pack_blob.clone(), offset: 0, length: 100, compressed: false, chunks: None, 344 + }), 345 + conflict_source: None, 346 + }); 347 + let refs = collect_blob_refs(&index); 348 + assert_eq!(refs.len(), 2); 349 + let cids: Vec<&str> = refs.iter().map(|r| r.cid()).collect(); 350 + assert!(cids.contains(&"bafypack1")); 351 + assert!(cids.contains(&"bafypack2")); 352 + } 353 + 354 + #[test] 268 355 fn pack_ref_serialization() { 269 356 let pack_ref = PackRef { 270 357 blob: BlobRef::new( ··· 337 424 assert_eq!(deserialized.chunks.as_ref().unwrap().len(), 2); 338 425 } 339 426 340 - #[test] 341 - fn pack_ref_backward_compat_no_compressed_field() { 342 - // JSON without compressed or chunks fields should deserialize with defaults 343 - let json = r#"{ 344 - "blob": {"$type":"blob","ref":{"$link":"bafyold"},"mimeType":"application/octet-stream","size":100}, 345 - "offset": 0, 346 - "length": 50 347 - }"#; 348 - let pack_ref: PackRef = serde_json::from_str(json).unwrap(); 349 - assert!(!pack_ref.compressed); 350 - assert!(pack_ref.chunks.is_none()); 351 - } 352 - 353 - #[test] 427 + #[test] 354 428 fn chunked_blob_serialization() { 355 429 let chunked = ChunkedBlob { 356 430 parts: vec![
+14 -31
src/yrs_pds.rs
··· 7 7 use yrs::{Doc, GetString, ReadTxn, Text, Transact}; 8 8 9 9 use crate::pds_client::PdsClient; 10 - use crate::types::{FileEntry, FileKind}; 10 + use crate::types::{FileEntry, FileIndex, FileKind, YrsRepo}; 11 11 12 12 /// Create a Yrs Doc from text content. 13 13 pub fn doc_from_text(content: &str) -> Doc { ··· 84 84 client: &PdsClient, 85 85 did: &str, 86 86 ) -> Result<FileEntry, String> { 87 - let content = materialize(doc); 88 87 let snapshot = encode_snapshot(doc); 89 88 let sv = encode_state_vector(doc); 90 89 ··· 98 97 let _ = did; // used by caller for the record 99 98 100 99 Ok(FileEntry { 101 - content, 100 + content_hash: String::new(), 102 101 snapshot_blob, 103 102 state_vector: base64_encode(&sv), 104 103 updates: vec![], ··· 164 163 client: &PdsClient, 165 164 did: &str, 166 165 ) -> Result<Vec<u8>, String> { 167 - if let Some(ref pack_ref) = entry.pack_ref { 168 - // Fetch pack blob, reassembling chunks if needed 169 - let pack_data = if let Some(ref chunks) = pack_ref.chunks { 170 - let mut chunk_data = Vec::new(); 171 - for chunk_ref in chunks { 172 - chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 173 - } 174 - crate::pack::reassemble_chunks(&chunk_data) 175 - } else { 176 - client.get_blob(did, pack_ref.blob.cid()).await? 177 - }; 178 - let start = pack_ref.offset as usize; 179 - let end = start + pack_ref.length as usize; 166 + let pack_ref = entry.pack_ref.as_ref() 167 + .ok_or("missing pack_ref on FileEntry")?; 168 + get_pack_ref_data(pack_ref, client, did).await 169 + } 180 170 181 - // Parse the pack (auto-detects gzip compression) 182 - let (_, blob_data) = crate::pack::parse_pack_auto(&pack_data)?; 183 - if end > blob_data.len() { 184 - return Err(format!( 185 - "pack_ref out of bounds: {}..{} in {} bytes", 186 - start, 187 - end, 188 - blob_data.len() 189 - )); 190 - } 191 - Ok(blob_data[start..end].to_vec()) 192 - } else { 193 - // Direct blob download 194 - client.get_blob(did, entry.snapshot_blob.cid()).await 195 - } 171 + /// Load the FileIndex from a YrsRepo's index blob. 172 + pub async fn load_file_index( 173 + repo: &YrsRepo, 174 + client: &PdsClient, 175 + did: &str, 176 + ) -> Result<FileIndex, String> { 177 + let data = get_pack_ref_data(&repo.index, client, did).await?; 178 + serde_json::from_slice(&data).map_err(|e| format!("parse FileIndex: {}", e)) 196 179 } 197 180 198 181 /// Base64 encode bytes.