wip: you can use your pds as a yrs-relay if you want to
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

efficiency improvements

notplants 559081ed 46b32727

+1468 -263
+60
Cargo.lock
··· 999 999 "chrono", 1000 1000 "clap", 1001 1001 "flate2", 1002 + "rand", 1002 1003 "reqwest", 1003 1004 "serde", 1004 1005 "serde_json", ··· 1039 1040 checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" 1040 1041 dependencies = [ 1041 1042 "zerovec", 1043 + ] 1044 + 1045 + [[package]] 1046 + name = "ppv-lite86" 1047 + version = "0.2.21" 1048 + source = "registry+https://github.com/rust-lang/crates.io-index" 1049 + checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" 1050 + dependencies = [ 1051 + "zerocopy", 1042 1052 ] 1043 1053 1044 1054 [[package]] ··· 1074 1084 version = "6.0.0" 1075 1085 source = "registry+https://github.com/rust-lang/crates.io-index" 1076 1086 checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" 1087 + 1088 + [[package]] 1089 + name = "rand" 1090 + version = "0.8.5" 1091 + source = "registry+https://github.com/rust-lang/crates.io-index" 1092 + checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 1093 + dependencies = [ 1094 + "libc", 1095 + "rand_chacha", 1096 + "rand_core", 1097 + ] 1098 + 1099 + [[package]] 1100 + name = "rand_chacha" 1101 + version = "0.3.1" 1102 + source = "registry+https://github.com/rust-lang/crates.io-index" 1103 + checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 1104 + dependencies = [ 1105 + "ppv-lite86", 1106 + "rand_core", 1107 + ] 1108 + 1109 + [[package]] 1110 + name = "rand_core" 1111 + version = "0.6.4" 1112 + source = "registry+https://github.com/rust-lang/crates.io-index" 1113 + checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 1114 + dependencies = [ 1115 + "getrandom 0.2.17", 1116 + ] 1077 1117 1078 1118 [[package]] 1079 1119 name = "redox_syscall" ··· 2067 2107 "smallstr", 2068 2108 "smallvec", 2069 2109 "thiserror", 2110 + ] 2111 + 2112 + [[package]] 2113 + name = "zerocopy" 2114 + version = "0.8.42" 2115 + source = "registry+https://github.com/rust-lang/crates.io-index" 2116 + checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" 2117 + dependencies = [ 2118 + "zerocopy-derive", 2119 + ] 2120 + 2121 + [[package]] 2122 + name = "zerocopy-derive" 2123 + version = "0.8.42" 2124 + source = "registry+https://github.com/rust-lang/crates.io-index" 2125 + checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" 2126 + dependencies = [ 2127 + "proc-macro2", 2128 + "quote", 2129 + "syn", 2070 2130 ] 2071 2131 2072 2132 [[package]]
+2 -1
Cargo.toml
··· 4 4 name = "pds-yrs" 5 5 version = "0.1.0" 6 6 edition = "2021" 7 - description = "Sync Yrs CRDT documents via AT Protocol PDS" 7 + description = "Sync Yrs CRDT documents via ATProto PDS" 8 8 license = "MIT" 9 9 10 10 [[bin]] ··· 21 21 chrono = { version = "0.4", features = ["serde"] } 22 22 similar = "2.6" 23 23 flate2 = "1.0" 24 + rand = "0.8" 24 25 25 26 [dev-dependencies] 26 27 tempfile = "3"
+180
architecture.md
··· 1 + # pds-yrs Architecture 2 + 3 + ## Overview 4 + 5 + pds-yrs stores [Yrs](https://docs.rs/yrs) CRDT documents on an ATProto PDS (Personal Data Server). Each file is tracked as a Yrs Doc, enabling conflict-free collaborative editing across multiple devices and writers — no git involved. 6 + 7 + ## Key Concepts 8 + 9 + ### Device-Aware Model 10 + 11 + Each device gets its own **rkey** (auto-generated as `{project}-{random8}`), stored in `.yrs/pdsyrs_config.json`. The `YrsRepo.name` field is the shared **project name**, not the rkey. This means two devices editing the same project create separate records that won't overwrite each other. 12 + 13 + On first initialization (`pds-yrs save --project my-wiki`): 14 + 1. A new device rkey is generated (e.g. `my-wiki-a3f7k2m9`) 15 + 2. `listRecords` discovers any existing repos for the same project 16 + 3. Our rkey is added as a **collaborator** to each existing repo's record 17 + 4. Those existing rkeys become our initial collaborators 18 + 19 + Subsequent saves skip `listRecords` entirely — the rkey is read from local config. 20 + 21 + ### YrsRepo 22 + 23 + A **YrsRepo** (`net.commoninternet.yrsrepo`) is a single ATProto record that holds one device/writer's CRDT state. It contains: 24 + 25 + - A `name` — project name (shared across all devices for the same project) 26 + - A `files` map of `FileEntry` structs (one per tracked file) 27 + - An `updatedAt` timestamp 28 + - A `collaborators` list of `Collaborator` references (rkey + optional PDS URL) 29 + 30 + A YrsRepo is **not** a complete picture of a collaborative project. To materialize the full state, you merge YrsRepos from all collaborators. 31 + 32 + ### Collaborators 33 + 34 + Each `Collaborator` has: 35 + - **`rkey`** — the rkey of another device's repo record 36 + - **`pds`** — optional PDS URL for cross-PDS collaboration (omitted when on the same PDS) 37 + 38 + Collaborators are registered automatically on first initialization and used by merge to discover peer rkeys without calling `listRecords`. Merge falls back to `listRecords` if no collaborators are found (e.g. when merging without a local config). 39 + 40 + ### FileEntry 41 + 42 + Each file in a YrsRepo has a `FileEntry` with: 43 + 44 + - **`content`** — Plain text of the current file contents. Always kept up-to-date so any client can read the data without Yrs decoding (the portability escape hatch — see Export). 45 + - **`snapshotBlob`** — BlobRef pointing to the full Yrs state (`encode_state_as_update_v1`). 46 + - **`stateVector`** — Base64-encoded Yrs state vector (reflects the state after snapshot + all updates applied). 47 + - **`updates`** — Ordered list of `PackRef`s, each pointing to an incremental Yrs update within a pack blob. These accumulate between compactions. 48 + - **`updatesCount`** — How many incremental saves have occurred since the last compaction. When any file in the repo reaches the compaction threshold (10), the entire repo is compacted. 49 + - **`kind`** — `text` (Yrs CRDT merge) or `binary` (raw blob, no CRDT). 50 + - **`packRef`** — Optional reference into a pack blob for the snapshot data (see below). 51 + 52 + ### Manifest 53 + 54 + The manifest is a special `FileEntry` stored at the key `pdsyrs_manifest`. It uses a Yrs Map (not Text) to track which files exist and their kinds. The manifest is itself CRDT-merged, so concurrent adds from different collaborators converge — "set wins over delete" semantics mean that if one writer deletes a file while another edits it, the edit wins. 55 + 56 + All internal keys use the `pdsyrs_` prefix to avoid collisions with user files. 57 + 58 + ### Local State (`.yrs/`) 59 + 60 + The `.yrs/` directory (analogous to `.git/`) caches Yrs Doc snapshots, state vectors, and snapshot CIDs between sync cycles. This enables incremental sync — only diffs since the last cycle need to be exchanged. 61 + 62 + Key files: 63 + - **`pdsyrs_config.json`** — device config: PDS URL, handle, DID, project name, and device rkey 64 + - **`pdsyrs_manifest.yrs`** / **`pdsyrs_manifest.sv`** — cached manifest CRDT state 65 + 66 + ## Pack Blobs 67 + 68 + ATProto limits the number of blob uploads per request. To work around this, pds-yrs bundles all file blobs from a single save into one **pack blob**. 69 + 70 + ### Format 71 + 72 + ``` 73 + [4 bytes: index length (u32 LE)] 74 + [index: JSON array of PackEntry] 75 + [blob data: concatenated file data] 76 + ``` 77 + 78 + Each `PackEntry` records a file's `path`, `offset`, `length`, and `data_type` (snapshot, update, or binary). After uploading the pack blob, each FileEntry gets a `PackRef` pointing into it — the blob CID plus offset/length to extract that file's data. 79 + 80 + ### Compression 81 + 82 + Packs are gzip-compressed before upload when beneficial. On load, `parse_pack_auto` detects gzip and decompresses transparently. 83 + 84 + ### Chunking 85 + 86 + ATProto has a per-blob size limit (~50MB). Packs larger than `CHUNK_SIZE` are split into chunks, each uploaded as a separate blob. The `PackRef.chunks` field holds the ordered list of chunk BlobRefs. On load, chunks are reassembled before parsing the pack. 87 + 88 + ### Pack Cache 89 + 90 + The loader and merger maintain a `pack_cache` keyed by CID. After compaction, all files share a single pack blob — one download serves all files. Between compactions, the repo may reference multiple pack blobs (the snapshot pack + update packs from subsequent saves). Each is downloaded once and cached. Since packs are immutable, previously cached packs remain valid across sync cycles. 91 + 92 + The merge path uses the same pack cache pattern, reducing O(N × sites) blob downloads to O(pack_blobs × sites). For a typical repo with one pack blob per site, merging 200 files × 2 sites requires ~4 blob downloads instead of ~400. 93 + 94 + ## Operations 95 + 96 + ### Save (incremental) 97 + 98 + 1. Resolve device rkey from `.yrs/pdsyrs_config.json` (or generate on first init). 99 + 2. On first init: discover peers via `listRecords`, register as collaborator. 100 + 3. Collect local files (with optional include/exclude glob filters). 101 + 4. Fetch the existing YrsRepo record from PDS (if any). 102 + 5. For each file: 103 + - **Unchanged**: reuse the existing FileEntry as-is (no upload). 104 + - **Changed text, below compaction threshold**: reconstruct the Yrs Doc from the existing snapshot + accumulated updates, apply a character-level diff (`similar` crate, Patience algorithm), then compute `encode_diff_v1(old_state_vector)` — only the new ops, not the full state. The diff goes into the pending pack as a `PackDataType::Update`. The FileEntry keeps its existing `packRef` (snapshot stays in the old pack) and a new `PackRef` is appended to `updates`. 105 + - **New text file**: full snapshot (no existing state to diff against). 106 + - **Binary**: store raw bytes; change detection uses FNV-1a hash. 107 + 6. Detect deletions: files in manifest but missing from disk are removed. 108 + 7. Bundle all pending diffs/snapshots into a single compressed pack, upload it, wire up `PackRef`s. 109 + 8. Preserve collaborators from existing record (merge with any new ones on first init). 110 + 9. Write the YrsRepo record to PDS via `putRecord` (with `swapCommit` for optimistic concurrency). 111 + 112 + Over successive saves, unchanged files keep pointing to their original pack blob. Only the changed files get new update entries in the new pack. 113 + 114 + #### Compaction 115 + 116 + When any file's `updatesCount` reaches the threshold (default 10), the entire repo is compacted: every file gets a fresh full snapshot in a single new pack blob. All `updates` lists are cleared, `updatesCount` resets to 0, and all old pack blobs become unreferenced (eligible for PDS garbage collection). 117 + 118 + ### Load 119 + 120 + 1. Resolve device rkey from `.yrs/pdsyrs_config.json`. 121 + 2. Fetch the YrsRepo record. 122 + 3. If a manifest exists, only load files listed in it. 123 + 4. For each text file: 124 + - Extract the snapshot data from its `packRef` pack blob (using the pack cache). 125 + - Reconstruct the Yrs Doc from the snapshot. 126 + - Apply each update in `updates` order — each may be in a different pack blob (all cached by CID). 127 + - Materialize text and write to disk. 128 + 5. For binary files: extract raw bytes from the pack and write to disk. 129 + 130 + ### Merge 131 + 132 + Merging combines YrsRepos from multiple devices/collaborators: 133 + 134 + 1. Resolve rkeys: if own rkey is known, read collaborators from our record. Otherwise fall back to `listRecords`. 135 + 2. Fetch all YrsRepo records by rkey. 136 + 3. CRDT-merge the manifest Maps (concurrent adds converge, set-wins-over-delete). 137 + 4. For each file in the merged manifest (using pack cache to avoid redundant downloads): 138 + - **Text**: CRDT-merge all Yrs Docs. Each doc's state is diffed against the merged state vector and applied — the result is conflict-free by construction. 139 + - **Binary**: compare CIDs. If all repos share the same CID, no conflict. If CIDs differ, create conflict files named `file.reponame.ext`. 140 + 141 + ### Sync 142 + 143 + A polling loop that alternates load and save cycles, using the PDS as a relay when peers can't connect directly (WebRTC fallback). Auto-generates device rkey on first init. Configurable poll interval and optional periodic materialization. 144 + 145 + ### Export 146 + 147 + The data portability escape hatch. Reads only the `content` field from each FileEntry — no Yrs decoding or blob downloads needed. Works even without the Yrs library. 148 + 149 + ### List 150 + 151 + Lists all repos stored on PDS, showing project name, rkey, last update time, file count, and collaborator count. Can be filtered by project name. 152 + 153 + ## CLI 154 + 155 + | Command | Key flags | Description | 156 + |---------|-----------|-------------| 157 + | `save` | `--project`, `--dir` | Save directory to PDS (auto-generates device rkey) | 158 + | `load` | `--project`, `--output` | Load from PDS using device rkey from local config | 159 + | `merge` | `--project`, `--output`, `--dir` (optional) | Merge all devices for a project | 160 + | `sync` | `--project`, `--dir`, `--interval` | Real-time sync loop | 161 + | `export` | `--project`, `--output` | Plain-text export | 162 + | `list` | `--project` (optional) | List repos on PDS | 163 + 164 + All commands use `--project` (the shared project name) instead of raw rkeys. Device rkeys are managed automatically. 165 + 166 + ## Module Map 167 + 168 + | Module | Responsibility | 169 + |--------|---------------| 170 + | `types.rs` | `YrsRepo`, `FileEntry`, `Collaborator`, `PackRef`, `BlobRef` structs | 171 + | `yrs_pds.rs` | Yrs Doc ↔ FileEntry conversion, manifest Map helpers, base64, file kind detection | 172 + | `pack.rs` | Pack blob creation, parsing, compression, chunking | 173 + | `save.rs` | Directory → PDS (with diffing, compaction, pack upload, collaborator merge) | 174 + | `load.rs` | PDS → directory (with pack cache) | 175 + | `merge.rs` | Multi-repo CRDT merge (with pack cache) | 176 + | `sync.rs` | Polling sync loop | 177 + | `export.rs` | Plain-text export (no Yrs needed) | 178 + | `local_state.rs` | `.yrs/` state directory, config, device rkey generation | 179 + | `pds_client.rs` | ATProto HTTP client (auth, records, blobs, listRecords) | 180 + | `main.rs` | CLI (`save`, `load`, `merge`, `sync`, `export`, `list` subcommands) |
+2 -2
plans/improvements-v1.md
··· 2 2 3 3 ## Context 4 4 5 - pds-yrs is a Rust crate that syncs Yrs CRDT documents to/from AT Protocol PDS. The benchmark work proved pure CRDT merge is both the fastest and most correct approach (2ms for 10 files, 81ms for 200 files with guaranteed conflicts, zero conflicts ever). Now we need to harden pds-yrs for real-world use. 5 + pds-yrs is a Rust crate that syncs Yrs CRDT documents to/from ATProto PDS. The benchmark work proved pure CRDT merge is both the fastest and most correct approach (2ms for 10 files, 81ms for 200 files with guaranteed conflicts, zero conflicts ever). Now we need to harden pds-yrs for real-world use. 6 6 7 7 Current limitations: text-only files, no binary support, no rate limit handling, no blob batching, no compression, no token refresh, no chunking for large files, no file deletion support, and batch-only (no real-time sync). 8 8 ··· 146 146 - FileEntry's `updates_blob` becomes actively used (not just for compaction) 147 147 - Add `last_synced_at: Option<String>` timestamp per file 148 148 149 - **Optimization — AT Protocol Event Stream**: 149 + **Optimization — ATProto Event Stream**: 150 150 - Instead of polling, subscribe to the PDS firehose for record changes 151 151 - `com.atproto.sync.subscribeRepos` WebSocket — get notified when the SiteRecord changes 152 152 - Reduces latency from poll-interval to near-real-time (~100-500ms)
+1 -1
plans/improvements-v3-crdt-manifest-final.md
··· 2 2 3 3 ## Context 4 4 5 - pds-yrs syncs Yrs CRDT documents to/from AT Protocol PDS. Benchmark proved pure CRDT merge is fastest and most correct. Now hardening for real-world use. 5 + pds-yrs syncs Yrs CRDT documents to/from ATProto PDS. Benchmark proved pure CRDT merge is fastest and most correct. Now hardening for real-world use. 6 6 7 7 Key design decisions: 8 8 - **Manifest**: Yrs Map (keys = file paths, values = file kind). CRDT operations at key granularity — no character interleaving.
+156
plans/incremental-packs.md
··· 1 + # Plan: Incremental Pack Saves 2 + 3 + ## Problem 4 + 5 + Currently, every save re-uploads **all changed files as full snapshots** in a single pack blob. Even if only one file out of 200 changed, that file's entire Yrs state is re-encoded and uploaded. The "incremental" path in the code (`updatesCount < COMPACTION_THRESHOLD`) still produces a full snapshot — it just preserves CRDT history by reconstructing the doc before diffing. Previous pack blobs become orphaned since no FileEntry references them anymore. 6 + 7 + This is wasteful for the common case: a user edits one file, saves, edits another file, saves, etc. Each save should only upload the diff for changed files. 8 + 9 + ## Design 10 + 11 + ### Core idea 12 + 13 + Each save uploads a **small pack containing only Yrs diffs** for changed files. Unchanged files keep their existing `pack_ref` pointing to the previous pack blob. Over time, a YrsRepo accumulates references to multiple pack blobs. Periodically, a **compaction** pass merges everything into a single pack with fresh full snapshots. 14 + 15 + ### Current data model (what we have) 16 + 17 + ```rust 18 + pub struct FileEntry { 19 + pub content: String, // plain text (always current) 20 + pub snapshot_blob: BlobRef, // full Yrs state blob 21 + pub state_vector: String, // base64 state vector 22 + pub updates_blob: Option<BlobRef>, // unused in practice 23 + pub updates_count: u32, // tracks edits, but always resets 24 + pub snapshot_at: String, 25 + pub kind: FileKind, 26 + pub pack_ref: Option<PackRef>, // single pack ref 27 + pub conflict_source: Option<String>, 28 + } 29 + ``` 30 + 31 + ### New data model 32 + 33 + ```rust 34 + pub struct FileEntry { 35 + pub content: String, 36 + pub snapshot_blob: BlobRef, // points to pack containing last full snapshot 37 + pub state_vector: String, // current state vector (after all updates applied) 38 + pub updates: Vec<PackRef>, // ordered list of incremental update packs since snapshot 39 + pub updates_count: u32, // total number of incremental saves since last compaction 40 + pub snapshot_at: String, 41 + pub kind: FileKind, 42 + pub pack_ref: Option<PackRef>, // where to find the snapshot within its pack blob 43 + pub conflict_source: Option<String>, 44 + } 45 + ``` 46 + 47 + Key change: `updates_blob: Option<BlobRef>` becomes `updates: Vec<PackRef>`. Each incremental save appends a new PackRef. The snapshot stays put until compaction. 48 + 49 + ### Save flow (new) 50 + 51 + 1. Collect local files, fetch existing YrsRepo record. 52 + 2. For each file: 53 + - **Unchanged**: reuse existing FileEntry as-is (no upload). 54 + - **Changed text**: 55 + - Reconstruct the Yrs Doc from the existing snapshot + all accumulated updates. 56 + - Apply the local text diff to get the new Doc state. 57 + - Compute `encode_diff_v1(existing_state_vector)` — this is just the new ops, not the full state. 58 + - Add the diff bytes to the pending pack. 59 + - Update the FileEntry: append a new PackRef to `updates`, increment `updates_count`, update `state_vector` and `content`. 60 + - Keep `snapshot_blob` and `pack_ref` pointing to the existing snapshot pack. 61 + - **New text file**: full snapshot (no existing state to diff against). Goes into pending pack. 62 + - **Changed binary**: full blob in pending pack (no CRDT diffing for binary). 63 + - **Deleted**: remove from manifest. 64 + 3. Bundle all pending diffs/blobs into a single new pack blob, upload it. 65 + 4. Wire up PackRefs for each changed file's new update entry. 66 + 5. Write the YrsRepo record via `putRecord`. 67 + 68 + ### Load flow (updated) 69 + 70 + 1. Fetch YrsRepo record. 71 + 2. For each file: 72 + - Download the snapshot pack (if not cached), extract snapshot data via `pack_ref`. 73 + - Reconstruct Doc from snapshot. 74 + - For each entry in `updates`: download that pack (if not cached), extract the update data, apply it to the Doc. 75 + - Materialize text and write to disk. 76 + 3. Pack cache keyed by CID avoids redundant downloads — multiple files may share the same pack blob, and previous packs stay referenced across saves. 77 + 78 + ### Compaction 79 + 80 + When `updates_count >= COMPACTION_THRESHOLD` for **any** file, the entire YrsRepo is compacted: 81 + 82 + 1. For every file: reconstruct full Doc (snapshot + all updates), encode a fresh full snapshot. 83 + 2. Bundle all fresh snapshots into a single new pack blob. 84 + 3. Reset all FileEntries: `pack_ref` points to the new pack, `updates` cleared, `updates_count` reset to 0. 85 + 4. Upload the compacted pack and write the YrsRepo record. 86 + 87 + After compaction, old pack blobs are no longer referenced and will be garbage-collected by the PDS. 88 + 89 + Compaction is **per-repo, not per-file**. When any file crosses the threshold, everything gets compacted together. This keeps the logic simple and ensures pack blob count stays bounded. 90 + 91 + ### Why per-repo compaction (not per-file) 92 + 93 + Per-file compaction would mean some files get new snapshots while others keep referencing old packs. The number of distinct pack blobs referenced by a YrsRepo would grow without bound — each old pack stays alive because at least one file still references it. Per-repo compaction produces exactly one pack blob, and all old packs become unreferenced simultaneously. 94 + 95 + ## Files to modify 96 + 97 + ### `src/types.rs` 98 + 99 + - Replace `updates_blob: Option<BlobRef>` with `updates: Vec<PackRef>` 100 + - Add `#[serde(default, skip_serializing_if = "Vec::is_empty")]` on `updates` 101 + - Remove the `updates_blob` serde attributes 102 + 103 + ### `src/save.rs` 104 + 105 + - **Incremental path** (changed file, below threshold): 106 + - Reconstruct Doc from snapshot + existing updates (not just snapshot) 107 + - Compute diff via `encode_diff_v1` instead of full `encode_snapshot` 108 + - Use `PackDataType::Update` for the pending blob 109 + - Preserve existing `pack_ref` and `snapshot_blob`; append new entry to `updates` 110 + - **Compaction path** (any file crosses threshold, or first save): 111 + - Encode fresh full snapshots for ALL files 112 + - Clear `updates` on all FileEntries 113 + - Reset `updates_count` to 0 114 + - **Wire-up after pack upload**: differentiate between snapshot entries and update entries when assigning PackRefs 115 + 116 + ### `src/load.rs` 117 + 118 + - After extracting snapshot data, iterate over `entry.updates` and apply each one 119 + - Each update may be in a different pack blob — use pack cache 120 + 121 + ### `src/yrs_pds.rs` 122 + 123 + - `file_entry_to_doc()`: apply all entries from `updates` (not just a single `updates_blob`) 124 + - `get_file_blob_data()`: no longer needs to handle `updates_blob` separately — caller handles the updates list 125 + 126 + ### `src/merge.rs` 127 + 128 + - `merge_text_file()` calls `file_entry_to_doc()` which handles updates — no direct changes needed 129 + - Verify that after merge, the output FileEntries have clean state (compacted) 130 + 131 + ### `src/export.rs` 132 + 133 + - No changes needed — export reads `content` field only 134 + 135 + ### `src/local_state.rs` 136 + 137 + - No changes needed — local state tracks full Doc snapshots independently 138 + 139 + ## Verification 140 + 141 + ### Unit tests 142 + 143 + 1. **Incremental save produces diff, not snapshot**: save a file, modify it, save again — verify the second save's pending blob is `PackDataType::Update` and is smaller than a full snapshot 144 + 2. **Multiple incremental saves accumulate updates**: save 3 times with edits — verify FileEntry has `updates.len() == 2` (first save is snapshot, next two are updates) 145 + 3. **Compaction resets everything**: push past threshold — verify `updates` is empty and `updates_count` is 0 after compaction 146 + 4. **Load with accumulated updates**: create a FileEntry with snapshot + 3 update PackRefs — verify the loaded Doc has all edits applied in order 147 + 5. **Round-trip**: save → edit → save → edit → save → load — verify final content matches 148 + 149 + ### Integration tests (with mock PDS) 150 + 151 + 6. **Incremental pack is small**: save 10 files, edit 1, save again — verify the second pack blob is much smaller than the first 152 + 7. **Pack cache hit**: load a repo with 10 files sharing one pack — verify only one blob download 153 + 8. **Compaction triggers correctly**: save 51 times with single-file edits — verify compaction happens on save 51 154 + ### Property tests 155 + 156 + 10. **Content equivalence**: for any sequence of edits, the materialized content after incremental saves must equal the content after a single full-snapshot save
+8 -8
src/export.rs
··· 1 - //! Export site content from PDS as plain text files. 1 + //! Export repo content from PDS as plain text files. 2 2 //! 3 3 //! This is the data portability escape hatch — reads the `content` field 4 4 //! from each FileEntry without requiring Yrs decoding. ··· 6 6 use std::path::Path; 7 7 8 8 use crate::pds_client::PdsClient; 9 - use crate::types::{SiteRecord, COLLECTION}; 9 + use crate::types::{YrsRepo, COLLECTION}; 10 10 11 - /// Export a site from PDS to plain text files. 11 + /// Export a repo from PDS to plain text files. 12 12 /// 13 13 /// Reads only the `content` field from each FileEntry — no Yrs 14 14 /// decoding or blob downloads needed. This works even if the Yrs ··· 20 20 output_dir: &Path, 21 21 verbose: bool, 22 22 ) -> Result<usize, String> { 23 - // Fetch site record 23 + // Fetch repo record 24 24 let record = client 25 25 .get_record(did, COLLECTION, rkey) 26 26 .await? 27 - .ok_or_else(|| format!("site record not found: {}", rkey))?; 27 + .ok_or_else(|| format!("repo record not found: {}", rkey))?; 28 28 29 - let site: SiteRecord = 30 - serde_json::from_value(record.value).map_err(|e| format!("parse SiteRecord: {}", e))?; 29 + let repo: YrsRepo = 30 + serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 31 31 32 32 let mut files_exported = 0; 33 33 34 - for (rel_path, entry) in &site.files { 34 + for (rel_path, entry) in &repo.files { 35 35 if verbose { 36 36 eprintln!("pds-yrs: export {}", rel_path); 37 37 }
+3 -3
src/lib.rs
··· 1 - //! pds-yrs: Sync Yrs CRDT documents via AT Protocol PDS. 1 + //! pds-yrs: Sync Yrs CRDT documents via ATProto PDS. 2 2 //! 3 3 //! No git involved — files are stored as Yrs Doc state on the PDS, 4 4 //! with plain text content alongside for portability. ··· 17 17 pub use export::export; 18 18 pub use load::load; 19 19 pub use local_state::LocalState; 20 - pub use merge::merge_sites; 20 + pub use merge::{merge_project, merge_repos}; 21 21 pub use pack::{ 22 22 chunk_data, compress, create_compressed_pack, create_pack, decompress, extract_entry, is_gzip, 23 23 is_precompressed_extension, parse_pack, parse_pack_auto, reassemble_chunks, PackBlob, ··· 26 26 pub use pds_client::PdsClient; 27 27 pub use save::{save, save_filtered}; 28 28 pub use sync::{sync_loop, SyncConfig, SyncCycleResult}; 29 - pub use types::{FileKind, COLLECTION, MANIFEST_KEY}; 29 + pub use types::{Collaborator, FileKind, COLLECTION, MANIFEST_KEY};
+62 -16
src/load.rs
··· 1 - //! Load a site from PDS into a local directory. 1 + //! Load a repo from PDS into a local directory. 2 2 3 3 use std::collections::HashMap; 4 4 use std::path::Path; 5 5 6 6 use crate::pds_client::PdsClient; 7 - use crate::types::{FileKind, LoadResult, SiteRecord, COLLECTION, MANIFEST_KEY}; 7 + use crate::types::{FileKind, LoadResult, YrsRepo, COLLECTION, MANIFEST_KEY}; 8 8 use crate::yrs_pds; 9 9 10 - /// Load a site from PDS into a directory. 10 + /// Load a repo from PDS into a directory. 11 11 /// 12 12 /// If a manifest exists, only loads files listed in the manifest. 13 13 /// Supports both text (Yrs CRDT) and binary files. ··· 19 19 output_dir: &Path, 20 20 verbose: bool, 21 21 ) -> Result<LoadResult, String> { 22 - // Fetch site record 22 + // Fetch repo record 23 23 let record = client 24 24 .get_record(did, COLLECTION, rkey) 25 25 .await? 26 - .ok_or_else(|| format!("site record not found: {}", rkey))?; 26 + .ok_or_else(|| format!("repo record not found: {}", rkey))?; 27 27 28 - let site: SiteRecord = 29 - serde_json::from_value(record.value).map_err(|e| format!("parse SiteRecord: {}", e))?; 28 + let repo: YrsRepo = 29 + serde_json::from_value(record.value).map_err(|e| format!("parse YrsRepo: {}", e))?; 30 30 31 31 // Determine which files to load 32 - let file_list = if let Some(manifest_entry) = site.files.get(MANIFEST_KEY) { 32 + let file_list = if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 33 33 let manifest_doc = yrs_pds::file_entry_to_doc(manifest_entry, client, did).await?; 34 34 let entries = yrs_pds::manifest_entries(&manifest_doc); 35 35 if verbose { ··· 46 46 // Cache for pack blobs (keyed by CID) to avoid re-downloading 47 47 let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); 48 48 49 - for (rel_path, entry) in &site.files { 49 + for (rel_path, entry) in &repo.files { 50 50 if rel_path == MANIFEST_KEY { 51 51 continue; 52 52 } ··· 95 95 ) 96 96 .await?; 97 97 let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 98 - if let Some(ref updates_blob) = entry.updates_blob { 99 - let updates_data = client.get_blob(did, updates_blob.cid()).await?; 100 - blobs_downloaded += 1; 101 - yrs_pds::apply_update(&doc, &updates_data)?; 98 + // Apply incremental updates from pack refs 99 + for update_ref in &entry.updates { 100 + let update_data = get_pack_ref_data_cached( 101 + update_ref, 102 + client, 103 + did, 104 + &mut pack_cache, 105 + &mut blobs_downloaded, 106 + ) 107 + .await?; 108 + yrs_pds::apply_update(&doc, &update_data)?; 102 109 } 103 110 let content = yrs_pds::materialize(&doc); 104 111 std::fs::write(&output_path, &content) ··· 107 114 // Direct blob download (no pack ref) 108 115 let doc = yrs_pds::file_entry_to_doc(entry, client, did).await?; 109 116 blobs_downloaded += 1; 110 - if entry.updates_blob.is_some() { 111 - blobs_downloaded += 1; 112 - } 117 + blobs_downloaded += entry.updates.len(); 113 118 let content = yrs_pds::materialize(&doc); 114 119 std::fs::write(&output_path, &content) 115 120 .map_err(|e| format!("write {:?}: {}", output_path, e))?; ··· 131 136 files_loaded, 132 137 blobs_downloaded, 133 138 }) 139 + } 140 + 141 + /// Extract data from a PackRef, using pack cache when available. 142 + async fn get_pack_ref_data_cached( 143 + pack_ref: &crate::types::PackRef, 144 + client: &PdsClient, 145 + did: &str, 146 + pack_cache: &mut HashMap<String, Vec<u8>>, 147 + blobs_downloaded: &mut usize, 148 + ) -> Result<Vec<u8>, String> { 149 + let cid = pack_ref.blob.cid().to_string(); 150 + 151 + if !pack_cache.contains_key(&cid) { 152 + let data = if let Some(ref chunks) = pack_ref.chunks { 153 + let mut chunk_data = Vec::new(); 154 + for chunk_ref in chunks { 155 + let chunk = client.get_blob(did, chunk_ref.cid()).await?; 156 + *blobs_downloaded += 1; 157 + chunk_data.push(chunk); 158 + } 159 + crate::pack::reassemble_chunks(&chunk_data) 160 + } else { 161 + let d = client.get_blob(did, &cid).await?; 162 + *blobs_downloaded += 1; 163 + d 164 + }; 165 + pack_cache.insert(cid.clone(), data); 166 + } 167 + 168 + let pack_data = pack_cache.get(&cid).unwrap(); 169 + let (_, blob_data) = crate::pack::parse_pack_auto(pack_data)?; 170 + 171 + let start = pack_ref.offset as usize; 172 + let end = start + pack_ref.length as usize; 173 + if end > blob_data.len() { 174 + return Err(format!( 175 + "pack_ref out of bounds: {}..{} in {} bytes", 176 + start, end, blob_data.len() 177 + )); 178 + } 179 + Ok(blob_data[start..end].to_vec()) 134 180 } 135 181 136 182 /// Get blob data for a file entry, using pack cache when available.
+85 -13
src/local_state.rs
··· 1 - //! Local state management for `.pds-yrs/` directory. 1 + //! Local state management for `.yrs/` directory. 2 2 //! 3 3 //! Persists Yrs Doc state and state vectors between save/load cycles, 4 4 //! enabling incremental sync (only exchange diffs since last sync). 5 5 6 6 use std::path::{Path, PathBuf}; 7 7 8 + use rand::Rng; 8 9 use yrs::Doc; 9 10 10 11 use crate::yrs_pds; 11 12 12 13 /// Local state directory name (like `.git/`). 13 - const STATE_DIR: &str = ".pds-yrs"; 14 + const STATE_DIR: &str = ".yrs"; 14 15 15 - /// Manages the `.pds-yrs/` local state directory. 16 + /// Manages the `.yrs/` local state directory. 16 17 pub struct LocalState { 17 18 state_dir: PathBuf, 18 19 } 19 20 20 - /// Configuration stored in `.pds-yrs/_config.json`. 21 + /// Configuration stored in `.yrs/pdsyrs_config.json`. 21 22 #[derive(Debug, serde::Serialize, serde::Deserialize)] 22 23 pub struct LocalConfig { 23 24 pub pds_url: String, 24 25 pub handle: String, 25 - pub site_rkey: String, 26 26 pub did: String, 27 + pub project: String, 28 + pub repo_rkey: String, 29 + /// Cached access token (avoids re-login on every invocation). 30 + #[serde(default, skip_serializing_if = "Option::is_none")] 31 + pub access_token: Option<String>, 32 + /// Cached refresh token (used to get a new access token when expired). 33 + #[serde(default, skip_serializing_if = "Option::is_none")] 34 + pub refresh_token: Option<String>, 27 35 } 28 36 29 37 impl LocalState { ··· 44 52 45 53 /// Save config. 46 54 pub fn save_config(&self, config: &LocalConfig) -> Result<(), String> { 47 - let path = self.state_dir.join("_config.json"); 55 + let path = self.state_dir.join("pdsyrs_config.json"); 48 56 let json = 49 57 serde_json::to_string_pretty(config).map_err(|e| format!("serialize config: {}", e))?; 50 58 std::fs::write(&path, json).map_err(|e| format!("write config: {}", e)) ··· 52 60 53 61 /// Load config. 54 62 pub fn load_config(&self) -> Result<Option<LocalConfig>, String> { 55 - let path = self.state_dir.join("_config.json"); 63 + let path = self.state_dir.join("pdsyrs_config.json"); 56 64 if !path.exists() { 57 65 return Ok(None); 58 66 } ··· 62 70 Ok(Some(config)) 63 71 } 64 72 73 + /// Save tokens to config (updates existing config in place). 74 + pub fn save_tokens( 75 + &self, 76 + access_token: &str, 77 + refresh_token: &str, 78 + ) -> Result<(), String> { 79 + if let Some(mut config) = self.load_config()? { 80 + config.access_token = Some(access_token.to_string()); 81 + config.refresh_token = Some(refresh_token.to_string()); 82 + self.save_config(&config)?; 83 + } 84 + Ok(()) 85 + } 86 + 65 87 /// Save a Yrs Doc's full state for a file path. 66 88 pub fn save_doc_state(&self, rel_path: &str, doc: &Doc) -> Result<(), String> { 67 89 let yrs_path = self.doc_path(rel_path); ··· 107 129 108 130 /// Save the manifest Doc state. 109 131 pub fn save_manifest(&self, doc: &Doc) -> Result<(), String> { 110 - self.save_doc_state("_manifest", doc) 132 + self.save_doc_state("pdsyrs_manifest", doc) 111 133 } 112 134 113 135 /// Load the manifest Doc. 114 136 pub fn load_manifest(&self) -> Result<Option<Doc>, String> { 115 - let yrs_path = self.doc_path("_manifest"); 137 + let yrs_path = self.doc_path("pdsyrs_manifest"); 116 138 if !yrs_path.exists() { 117 139 return Ok(None); 118 140 } ··· 136 158 Ok(()) 137 159 } 138 160 139 - /// List all file paths that have local state (excluding _manifest and _config). 161 + /// Ensure a device rkey exists for a project. 162 + /// 163 + /// If `pdsyrs_config.json` exists and the project matches, returns `(rkey, false)`. 164 + /// Otherwise generates `{project}-{random8}`, saves config, and returns `(rkey, true)`. 165 + /// The boolean indicates whether this is a newly created rkey (first initialization). 166 + pub fn ensure_device_rkey( 167 + &self, 168 + project: &str, 169 + pds_url: &str, 170 + handle: &str, 171 + did: &str, 172 + ) -> Result<(String, bool), String> { 173 + if let Some(config) = self.load_config()? { 174 + if config.project == project { 175 + return Ok((config.repo_rkey, false)); 176 + } 177 + } 178 + let suffix = generate_random_suffix(); 179 + let rkey = format!("{}-{}", project, suffix); 180 + let config = LocalConfig { 181 + pds_url: pds_url.to_string(), 182 + handle: handle.to_string(), 183 + did: did.to_string(), 184 + project: project.to_string(), 185 + repo_rkey: rkey.clone(), 186 + access_token: None, 187 + refresh_token: None, 188 + }; 189 + self.save_config(&config)?; 190 + Ok((rkey, true)) 191 + } 192 + 193 + /// List all file paths that have local state (excluding internal pdsyrs_ files). 140 194 pub fn list_tracked_files(&self) -> Result<Vec<String>, String> { 141 195 let mut files = Vec::new(); 142 196 self.collect_tracked(&self.state_dir, &self.state_dir, &mut files)?; ··· 197 251 // Strip .yrs extension 198 252 let rel = rel.trim_end_matches(".yrs").to_string(); 199 253 // Skip internal files 200 - if !rel.starts_with('_') { 254 + if !rel.starts_with("pdsyrs_") { 201 255 files.push(rel); 202 256 } 203 257 } ··· 205 259 } 206 260 Ok(()) 207 261 } 262 + } 263 + 264 + /// Generate 8 random lowercase alphanumeric characters for device rkey suffix. 265 + fn generate_random_suffix() -> String { 266 + let mut rng = rand::thread_rng(); 267 + let chars: Vec<char> = (0..8) 268 + .map(|_| { 269 + let idx = rng.gen_range(0..36); 270 + if idx < 10 { 271 + (b'0' + idx) as char 272 + } else { 273 + (b'a' + idx - 10) as char 274 + } 275 + }) 276 + .collect(); 277 + chars.into_iter().collect() 208 278 } 209 279 210 280 #[cfg(test)] ··· 256 326 let config = LocalConfig { 257 327 pds_url: "https://pds.example.com".to_string(), 258 328 handle: "user.example.com".to_string(), 259 - site_rkey: "my-site".to_string(), 260 329 did: "did:plc:abc123".to_string(), 330 + project: "my-site".to_string(), 331 + repo_rkey: "my-site-a1b2c3d4".to_string(), 261 332 }; 262 333 state.save_config(&config).unwrap(); 263 334 264 335 let loaded = state.load_config().unwrap().unwrap(); 265 336 assert_eq!(loaded.pds_url, "https://pds.example.com"); 266 - assert_eq!(loaded.site_rkey, "my-site"); 337 + assert_eq!(loaded.project, "my-site"); 338 + assert_eq!(loaded.repo_rkey, "my-site-a1b2c3d4"); 267 339 } 268 340 269 341 #[test]
+306 -45
src/main.rs
··· 4 4 #[derive(Parser)] 5 5 #[command( 6 6 name = "pds-yrs", 7 - about = "Sync Yrs CRDT documents via AT Protocol PDS" 7 + about = "Sync Yrs CRDT documents via ATProto PDS" 8 8 )] 9 9 struct Cli { 10 10 #[command(subcommand)] ··· 18 18 /// Directory to save 19 19 #[arg(long)] 20 20 dir: String, 21 - /// AT Protocol handle 21 + /// ATProto handle 22 22 #[arg(long)] 23 23 handle: String, 24 - /// Site name (used as rkey) 24 + /// Project name (shared across devices) 25 25 #[arg(long)] 26 - site: String, 26 + project: String, 27 27 /// Password for authentication 28 28 #[arg(long)] 29 29 password: String, ··· 40 40 #[arg(long)] 41 41 verbose: bool, 42 42 }, 43 - /// Load a site from PDS into a local directory 43 + /// Load a repo from PDS into a local directory 44 44 Load { 45 - /// AT Protocol handle 45 + /// ATProto handle 46 46 #[arg(long)] 47 47 handle: String, 48 - /// Site name (used as rkey) 48 + /// Project name (resolves device rkey from local config) 49 49 #[arg(long)] 50 - site: String, 50 + project: String, 51 51 /// Output directory 52 52 #[arg(long)] 53 53 output: String, ··· 61 61 #[arg(long)] 62 62 verbose: bool, 63 63 }, 64 - /// Merge sites from multiple collaborators 64 + /// Merge repos from all devices for a project 65 65 Merge { 66 - /// Comma-separated site rkeys to merge 66 + /// Project name (uses collaborators from own record, or falls back to listRecords) 67 67 #[arg(long)] 68 - sites: String, 69 - /// AT Protocol handle 68 + project: String, 69 + /// ATProto handle 70 70 #[arg(long)] 71 71 handle: String, 72 72 /// Output directory ··· 78 78 /// PDS URL 79 79 #[arg(long)] 80 80 pds: Option<String>, 81 + /// Working directory with .yrs/ config (to resolve own rkey for collaborator lookup) 82 + #[arg(long)] 83 + dir: Option<String>, 81 84 /// Show progress 82 85 #[arg(long)] 83 86 verbose: bool, ··· 87 90 /// Directory to sync 88 91 #[arg(long)] 89 92 dir: String, 90 - /// AT Protocol handle 93 + /// ATProto handle 91 94 #[arg(long)] 92 95 handle: String, 93 - /// Site name (used as rkey) 96 + /// Project name (shared across devices) 94 97 #[arg(long)] 95 - site: String, 98 + project: String, 96 99 /// Password for authentication 97 100 #[arg(long)] 98 101 password: String, ··· 112 115 #[arg(long)] 113 116 verbose: bool, 114 117 }, 115 - /// Export site content as plain text (no Yrs decoding needed) 118 + /// Export repo content as plain text (no Yrs decoding needed) 116 119 Export { 117 - /// AT Protocol handle 120 + /// ATProto handle 118 121 #[arg(long)] 119 122 handle: String, 120 - /// Site name (used as rkey) 123 + /// Project name (resolves device rkey from local config) 121 124 #[arg(long)] 122 - site: String, 125 + project: String, 123 126 /// Output directory 124 127 #[arg(long)] 125 128 output: String, ··· 133 136 #[arg(long)] 134 137 verbose: bool, 135 138 }, 139 + /// List repos stored on PDS 140 + List { 141 + /// ATProto handle 142 + #[arg(long)] 143 + handle: String, 144 + /// Filter by project name (optional, lists all if omitted) 145 + #[arg(long)] 146 + project: Option<String>, 147 + /// Password for authentication 148 + #[arg(long)] 149 + password: String, 150 + /// PDS URL 151 + #[arg(long)] 152 + pds: Option<String>, 153 + }, 136 154 } 137 155 138 156 #[tokio::main] ··· 143 161 Command::Save { 144 162 dir, 145 163 handle, 146 - site, 164 + project, 147 165 password, 148 166 pds, 149 167 include, ··· 153 171 run_save( 154 172 &dir, 155 173 &handle, 156 - &site, 174 + &project, 157 175 &password, 158 176 pds.as_deref(), 159 177 &include, ··· 164 182 } 165 183 Command::Load { 166 184 handle, 167 - site, 185 + project, 168 186 output, 169 187 password, 170 188 pds, 171 189 verbose, 172 - } => run_load(&handle, &site, &output, &password, pds.as_deref(), verbose).await, 190 + } => run_load(&handle, &project, &output, &password, pds.as_deref(), verbose).await, 173 191 Command::Merge { 174 - sites, 192 + project, 175 193 handle, 176 194 output, 177 195 password, 178 196 pds, 197 + dir, 179 198 verbose, 180 - } => run_merge(&sites, &handle, &output, &password, pds.as_deref(), verbose).await, 199 + } => run_merge(&project, &handle, &output, &password, pds.as_deref(), dir.as_deref(), verbose).await, 181 200 Command::Sync { 182 201 dir, 183 202 handle, 184 - site, 203 + project, 185 204 password, 186 205 pds, 187 206 interval, ··· 192 211 run_sync( 193 212 &dir, 194 213 &handle, 195 - &site, 214 + &project, 196 215 &password, 197 216 pds.as_deref(), 198 217 interval, ··· 204 223 } 205 224 Command::Export { 206 225 handle, 207 - site, 226 + project, 208 227 output, 209 228 password, 210 229 pds, 211 230 verbose, 212 - } => run_export(&handle, &site, &output, &password, pds.as_deref(), verbose).await, 231 + } => run_export(&handle, &project, &output, &password, pds.as_deref(), verbose).await, 232 + Command::List { 233 + handle, 234 + project, 235 + password, 236 + pds, 237 + } => run_list(&handle, project.as_deref(), &password, pds.as_deref()).await, 213 238 }; 214 239 215 240 if let Err(e) = result { ··· 229 254 Ok((client, session.did)) 230 255 } 231 256 257 + /// Login with token caching: try cached tokens from .yrs/ config first, 258 + /// fall back to password login, then save tokens for next time. 259 + async fn login_cached( 260 + dir: &std::path::Path, 261 + handle: &str, 262 + password: &str, 263 + pds_url: Option<&str>, 264 + ) -> Result<(pds_yrs::PdsClient, String), String> { 265 + let url = pds_url.unwrap_or("https://bluesky-pds.t1cc.commoninternet.net"); 266 + let local_state = pds_yrs::LocalState::open(dir)?; 267 + 268 + // Try cached tokens from config 269 + if let Some(config) = local_state.load_config()? { 270 + if let (Some(ref access), Some(ref refresh)) = 271 + (&config.access_token, &config.refresh_token) 272 + { 273 + let mut client = pds_yrs::PdsClient::new(url); 274 + client.set_tokens(access.clone(), refresh.clone()); 275 + 276 + // Validate with a lightweight call (getRecord for non-existent record is fine) 277 + // If it fails, the token is expired — try refresh, then fall back to login 278 + match client.refresh_session().await { 279 + Ok(()) => { 280 + // Save refreshed tokens 281 + if let (Some(at), Some(rt)) = (client.access_token(), client.refresh_token()) { 282 + let _ = local_state.save_tokens(at, rt); 283 + } 284 + return Ok((client, config.did)); 285 + } 286 + Err(_) => { 287 + // Refresh failed, fall through to password login 288 + } 289 + } 290 + } 291 + } 292 + 293 + // Fall back to password login 294 + let mut client = pds_yrs::PdsClient::new(url); 295 + let session = client.login(handle, password).await?; 296 + 297 + // Cache tokens 298 + if let (Some(at), Some(rt)) = (client.access_token(), client.refresh_token()) { 299 + let _ = local_state.save_tokens(at, rt); 300 + } 301 + 302 + Ok((client, session.did)) 303 + } 304 + 232 305 async fn run_save( 233 306 dir: &str, 234 307 handle: &str, 235 - site: &str, 308 + project: &str, 236 309 password: &str, 237 310 pds_url: Option<&str>, 238 311 include: &[String], 239 312 exclude: &[String], 240 313 verbose: bool, 241 314 ) -> Result<(), String> { 242 - let (client, did) = login(handle, password, pds_url).await?; 315 + let url = pds_url.unwrap_or("https://bluesky-pds.t1cc.commoninternet.net"); 316 + let dir_path = std::path::Path::new(dir); 317 + let (client, did) = login_cached(dir_path, handle, password, Some(url)).await?; 318 + let local_state = pds_yrs::LocalState::open(dir_path)?; 319 + let (rkey, is_new) = local_state.ensure_device_rkey(project, url, handle, &did)?; 320 + if verbose { 321 + eprintln!( 322 + "pds-yrs: project='{}', device rkey='{}' ({})", 323 + project, 324 + rkey, 325 + if is_new { "new" } else { "existing" } 326 + ); 327 + } 328 + 329 + // On first initialization, discover peers and register as collaborator 330 + let initial_collaborators = if is_new { 331 + discover_and_register(&client, &did, project, &rkey, verbose).await? 332 + } else { 333 + None 334 + }; 335 + 243 336 let inc = if include.is_empty() { 244 337 None 245 338 } else { ··· 251 344 Some(exclude) 252 345 }; 253 346 let result = pds_yrs::save_filtered( 254 - std::path::Path::new(dir), 347 + dir_path, 255 348 &client, 256 349 &did, 257 - site, 350 + &rkey, 351 + project, 352 + initial_collaborators.as_deref(), 258 353 inc, 259 354 exc, 260 355 verbose, ··· 267 362 Ok(()) 268 363 } 269 364 365 + /// On first initialization, discover existing repos for the same project, 366 + /// add our rkey as a collaborator to each, and return them as our initial collaborators. 367 + async fn discover_and_register( 368 + client: &pds_yrs::PdsClient, 369 + did: &str, 370 + project: &str, 371 + our_rkey: &str, 372 + verbose: bool, 373 + ) -> Result<Option<Vec<pds_yrs::types::Collaborator>>, String> { 374 + let records = client 375 + .list_all_records(did, pds_yrs::COLLECTION) 376 + .await?; 377 + 378 + let mut peer_collaborators: Vec<pds_yrs::types::Collaborator> = Vec::new(); 379 + 380 + for entry in &records { 381 + let peer_rkey = match entry.uri.rsplit('/').next() { 382 + Some(r) => r.to_string(), 383 + None => continue, 384 + }; 385 + if peer_rkey == our_rkey { 386 + continue; 387 + } 388 + let repo: pds_yrs::types::YrsRepo = match serde_json::from_value(entry.value.clone()) { 389 + Ok(r) => r, 390 + Err(_) => continue, 391 + }; 392 + if repo.name != project { 393 + continue; 394 + } 395 + 396 + // Add our rkey as collaborator to this peer's record 397 + if !repo.collaborators.iter().any(|c| c.rkey == our_rkey) { 398 + let mut updated = repo.clone(); 399 + updated.collaborators.push(pds_yrs::types::Collaborator { 400 + rkey: our_rkey.to_string(), 401 + pds: None, 402 + }); 403 + let updated_json = serde_json::to_value(&updated) 404 + .map_err(|e| format!("serialize updated peer: {}", e))?; 405 + client 406 + .put_record(did, pds_yrs::COLLECTION, &peer_rkey, updated_json, entry.cid.clone()) 407 + .await?; 408 + if verbose { 409 + eprintln!( 410 + "pds-yrs: registered as collaborator on peer '{}'", 411 + peer_rkey 412 + ); 413 + } 414 + } 415 + 416 + // Track this peer as our collaborator 417 + peer_collaborators.push(pds_yrs::types::Collaborator { 418 + rkey: peer_rkey, 419 + pds: None, 420 + }); 421 + } 422 + 423 + if peer_collaborators.is_empty() { 424 + Ok(None) 425 + } else { 426 + if verbose { 427 + let names: Vec<&str> = peer_collaborators.iter().map(|c| c.rkey.as_str()).collect(); 428 + eprintln!("pds-yrs: discovered {} peer(s): {}", names.len(), names.join(", ")); 429 + } 430 + Ok(Some(peer_collaborators)) 431 + } 432 + } 433 + 270 434 async fn run_load( 271 435 handle: &str, 272 - site: &str, 436 + project: &str, 273 437 output: &str, 274 438 password: &str, 275 439 pds_url: Option<&str>, 276 440 verbose: bool, 277 441 ) -> Result<(), String> { 278 442 let (client, did) = login(handle, password, pds_url).await?; 279 - let result = pds_yrs::load(&client, &did, site, std::path::Path::new(output), verbose).await?; 443 + let output_path = std::path::Path::new(output); 444 + let local_state = pds_yrs::LocalState::open(output_path)?; 445 + let rkey = match local_state.load_config()? { 446 + Some(config) if config.project == project => config.repo_rkey, 447 + _ => return Err(format!( 448 + "no device rkey found for project '{}' — run 'save' first to initialize", 449 + project 450 + )), 451 + }; 452 + if verbose { 453 + eprintln!("pds-yrs: loading project='{}', rkey='{}'", project, rkey); 454 + } 455 + let result = pds_yrs::load(&client, &did, &rkey, output_path, verbose).await?; 280 456 eprintln!( 281 457 "pds-yrs: loaded {} file(s), {} blob(s) downloaded", 282 458 result.files_loaded, result.blobs_downloaded ··· 285 461 } 286 462 287 463 async fn run_merge( 288 - sites: &str, 464 + project: &str, 289 465 handle: &str, 290 466 output: &str, 291 467 password: &str, 292 468 pds_url: Option<&str>, 469 + dir: Option<&str>, 293 470 verbose: bool, 294 471 ) -> Result<(), String> { 295 472 let (client, did) = login(handle, password, pds_url).await?; 296 - let rkeys: Vec<&str> = sites.split(',').collect(); 297 - pds_yrs::merge_sites(&client, &did, &rkeys, std::path::Path::new(output), verbose).await?; 473 + // Try to resolve own rkey from local config for collaborator-based merge 474 + let own_rkey = if let Some(d) = dir { 475 + let local_state = pds_yrs::LocalState::open(std::path::Path::new(d))?; 476 + local_state 477 + .load_config()? 478 + .filter(|c| c.project == project) 479 + .map(|c| c.repo_rkey) 480 + } else { 481 + None 482 + }; 483 + pds_yrs::merge_project( 484 + &client, 485 + &did, 486 + project, 487 + own_rkey.as_deref(), 488 + std::path::Path::new(output), 489 + verbose, 490 + ) 491 + .await?; 298 492 eprintln!("pds-yrs: merge complete"); 299 493 Ok(()) 300 494 } ··· 302 496 async fn run_sync( 303 497 dir: &str, 304 498 handle: &str, 305 - site: &str, 499 + project: &str, 306 500 password: &str, 307 501 pds_url: Option<&str>, 308 502 interval: u64, ··· 310 504 exclude: &[String], 311 505 verbose: bool, 312 506 ) -> Result<(), String> { 313 - let (client, did) = login(handle, password, pds_url).await?; 507 + let url = pds_url.unwrap_or("https://bluesky-pds.t1cc.commoninternet.net"); 508 + let dir_path = std::path::Path::new(dir); 509 + let (client, did) = login_cached(dir_path, handle, password, Some(url)).await?; 510 + let local_state = pds_yrs::LocalState::open(dir_path)?; 511 + let (rkey, is_new) = local_state.ensure_device_rkey(project, url, handle, &did)?; 512 + if is_new { 513 + discover_and_register(&client, &did, project, &rkey, verbose).await?; 514 + } 314 515 let config = pds_yrs::SyncConfig { 315 516 dir: dir.to_string(), 316 517 interval: std::time::Duration::from_secs(interval), ··· 321 522 verbose, 322 523 }; 323 524 eprintln!( 324 - "pds-yrs: starting sync (interval={}s, Ctrl+C to stop)", 325 - interval 525 + "pds-yrs: starting sync project='{}', rkey='{}' (interval={}s, Ctrl+C to stop)", 526 + project, rkey, interval 326 527 ); 327 - pds_yrs::sync_loop(&client, &did, site, &config).await?; 528 + pds_yrs::sync_loop(&client, &did, &rkey, project, &config).await?; 328 529 Ok(()) 329 530 } 330 531 331 532 async fn run_export( 332 533 handle: &str, 333 - site: &str, 534 + project: &str, 334 535 output: &str, 335 536 password: &str, 336 537 pds_url: Option<&str>, 337 538 verbose: bool, 338 539 ) -> Result<(), String> { 339 540 let (client, did) = login(handle, password, pds_url).await?; 340 - let count = pds_yrs::export(&client, &did, site, std::path::Path::new(output), verbose).await?; 541 + let output_path = std::path::Path::new(output); 542 + let local_state = pds_yrs::LocalState::open(output_path)?; 543 + let rkey = match local_state.load_config()? { 544 + Some(config) if config.project == project => config.repo_rkey, 545 + _ => return Err(format!( 546 + "no device rkey found for project '{}' — run 'save' first to initialize", 547 + project 548 + )), 549 + }; 550 + if verbose { 551 + eprintln!("pds-yrs: exporting project='{}', rkey='{}'", project, rkey); 552 + } 553 + let count = pds_yrs::export(&client, &did, &rkey, output_path, verbose).await?; 341 554 eprintln!("pds-yrs: exported {} file(s)", count); 342 555 Ok(()) 343 556 } 557 + 558 + async fn run_list( 559 + handle: &str, 560 + project: Option<&str>, 561 + password: &str, 562 + pds_url: Option<&str>, 563 + ) -> Result<(), String> { 564 + let (client, did) = login(handle, password, pds_url).await?; 565 + let records = client 566 + .list_all_records(&did, pds_yrs::COLLECTION) 567 + .await?; 568 + 569 + let mut entries: Vec<(String, String, String, usize, usize)> = Vec::new(); 570 + for entry in &records { 571 + if let Ok(repo) = serde_json::from_value::<pds_yrs::types::YrsRepo>(entry.value.clone()) { 572 + if let Some(filter) = project { 573 + if repo.name != filter { 574 + continue; 575 + } 576 + } 577 + let rkey = entry 578 + .uri 579 + .rsplit('/') 580 + .next() 581 + .unwrap_or("?") 582 + .to_string(); 583 + let file_count = repo 584 + .files 585 + .keys() 586 + .filter(|k| !k.starts_with("pdsyrs_")) 587 + .count(); 588 + let collab_count = repo.collaborators.len(); 589 + entries.push((repo.name, rkey, repo.updated_at, file_count, collab_count)); 590 + } 591 + } 592 + 593 + if entries.is_empty() { 594 + eprintln!("pds-yrs: no repos found"); 595 + return Ok(()); 596 + } 597 + 598 + println!("{:<20} {:<30} {:<25} {:<6} {}", "PROJECT", "RKEY", "UPDATED", "FILES", "COLLABS"); 599 + for (name, rkey, updated, files, collabs) in &entries { 600 + println!("{:<20} {:<30} {:<25} {:<6} {}", name, rkey, updated, files, collabs); 601 + } 602 + 603 + Ok(()) 604 + }
+216 -74
src/merge.rs
··· 1 - //! Merge multiple collaborators' sites via CRDT. 1 + //! Merge multiple collaborators' repos via CRDT. 2 2 3 3 use std::collections::HashMap; 4 4 use std::path::Path; ··· 6 6 use yrs::updates::decoder::Decode; 7 7 use yrs::{Doc, ReadTxn, Transact}; 8 8 9 + use crate::pack; 9 10 use crate::pds_client::PdsClient; 10 - use crate::types::{FileKind, SiteRecord, COLLECTION, MANIFEST_KEY}; 11 + use crate::types::{FileEntry, FileKind, PackRef, YrsRepo, COLLECTION, MANIFEST_KEY}; 11 12 use crate::yrs_pds; 12 13 13 - /// Merge sites from multiple rkeys into an output directory. 14 + /// Merge all repos for a project. 15 + /// 16 + /// If `own_rkey` is provided, uses the collaborators field from that record 17 + /// to discover peer rkeys (no `listRecords` needed). Otherwise falls back 18 + /// to `listRecords` to discover all repos for the project. 19 + pub async fn merge_project( 20 + client: &PdsClient, 21 + did: &str, 22 + project_name: &str, 23 + own_rkey: Option<&str>, 24 + output_dir: &Path, 25 + verbose: bool, 26 + ) -> Result<(), String> { 27 + let mut rkeys: Vec<String> = Vec::new(); 28 + 29 + // Try collaborators from own record first 30 + if let Some(rkey) = own_rkey { 31 + if let Some(record) = client.get_record(did, COLLECTION, rkey).await? { 32 + if let Ok(repo) = serde_json::from_value::<YrsRepo>(record.value) { 33 + if repo.name == project_name { 34 + rkeys.push(rkey.to_string()); 35 + for collab in &repo.collaborators { 36 + if !rkeys.iter().any(|r| r == &collab.rkey) { 37 + rkeys.push(collab.rkey.clone()); 38 + } 39 + } 40 + } 41 + } 42 + } 43 + } 44 + 45 + // Fall back to listRecords if no collaborators found 46 + if rkeys.is_empty() { 47 + if verbose { 48 + eprintln!("pds-yrs: no collaborators found, discovering via listRecords"); 49 + } 50 + let records = client.list_all_records(did, COLLECTION).await?; 51 + for entry in &records { 52 + if let Ok(repo) = serde_json::from_value::<YrsRepo>(entry.value.clone()) { 53 + if repo.name == project_name { 54 + if let Some(rkey) = entry.uri.rsplit('/').next() { 55 + rkeys.push(rkey.to_string()); 56 + } 57 + } 58 + } 59 + } 60 + } 61 + 62 + if rkeys.is_empty() { 63 + return Err(format!("no repos found for project: {}", project_name)); 64 + } 65 + if verbose { 66 + eprintln!( 67 + "pds-yrs: merging {} repo(s) for project '{}': {}", 68 + rkeys.len(), 69 + project_name, 70 + rkeys.join(", ") 71 + ); 72 + } 73 + let rkey_refs: Vec<&str> = rkeys.iter().map(|s| s.as_str()).collect(); 74 + merge_repos(client, did, &rkey_refs, output_dir, verbose).await 75 + } 76 + 77 + /// Merge repos from multiple rkeys into an output directory. 14 78 /// 15 79 /// For text files: CRDT merge all Yrs Docs (conflict-free). 16 80 /// For binary files: detect conflicts via CID comparison, create 17 81 /// `file.creator1.ext` + `file.creator2.ext` when CIDs differ. 18 82 /// Manifest Maps are CRDT-merged — "set wins over delete" for edit-wins. 19 - pub async fn merge_sites( 83 + pub async fn merge_repos( 20 84 client: &PdsClient, 21 85 did: &str, 22 86 rkeys: &[&str], 23 87 output_dir: &Path, 24 88 verbose: bool, 25 89 ) -> Result<(), String> { 26 - // Fetch all site records 27 - let mut sites: Vec<(String, SiteRecord)> = Vec::new(); 90 + // Fetch all repo records 91 + let mut repos: Vec<(String, YrsRepo)> = Vec::new(); 28 92 for rkey in rkeys { 29 93 let record = client 30 94 .get_record(did, COLLECTION, rkey) 31 95 .await? 32 - .ok_or_else(|| format!("site record not found: {}", rkey))?; 33 - let site: SiteRecord = serde_json::from_value(record.value) 34 - .map_err(|e| format!("parse SiteRecord for {}: {}", rkey, e))?; 35 - sites.push((rkey.to_string(), site)); 96 + .ok_or_else(|| format!("repo record not found: {}", rkey))?; 97 + let repo: YrsRepo = serde_json::from_value(record.value) 98 + .map_err(|e| format!("parse YrsRepo for {}: {}", rkey, e))?; 99 + repos.push((rkey.to_string(), repo)); 36 100 } 37 101 102 + // Pack cache: keyed by CID, avoids redundant blob downloads. 103 + // All files in a repo typically share 1-2 pack blobs, so this reduces 104 + // O(N × sites) blob downloads to O(pack_blobs × sites). 105 + let mut pack_cache: HashMap<String, Vec<u8>> = HashMap::new(); 106 + 38 107 // CRDT-merge manifests 39 - let merged_manifest = merge_manifests(&sites, client, did).await?; 108 + let merged_manifest = merge_manifests(&repos, client, did, &mut pack_cache).await?; 40 109 let manifest_entries = yrs_pds::manifest_entries(&merged_manifest); 41 110 42 111 if verbose { ··· 46 115 ); 47 116 } 48 117 49 - // For each file in the merged manifest, merge across sites 118 + // For each file in the merged manifest, merge across repos 50 119 for (rel_path, kind) in &manifest_entries { 51 - // Collect which sites have this file 52 - let mut site_indices: Vec<usize> = Vec::new(); 53 - for (i, (_, site)) in sites.iter().enumerate() { 54 - if site.files.contains_key(rel_path) { 55 - site_indices.push(i); 120 + // Collect which repos have this file 121 + let mut repo_indices: Vec<usize> = Vec::new(); 122 + for (i, (_, repo)) in repos.iter().enumerate() { 123 + if repo.files.contains_key(rel_path) { 124 + repo_indices.push(i); 56 125 } 57 126 } 58 127 59 - if site_indices.is_empty() { 128 + if repo_indices.is_empty() { 60 129 continue; 61 130 } 62 131 63 132 if verbose { 64 133 eprintln!( 65 - "pds-yrs: merging {} ({}, from {} site(s))", 134 + "pds-yrs: merging {} ({}, from {} repo(s))", 66 135 rel_path, 67 136 match kind { 68 137 FileKind::Text => "text", 69 138 FileKind::Binary => "binary", 70 139 }, 71 - site_indices.len() 140 + repo_indices.len() 72 141 ); 73 142 } 74 143 ··· 80 149 81 150 match kind { 82 151 FileKind::Text => { 83 - let content = merge_text_file(rel_path, &site_indices, &sites, client, did).await?; 152 + let content = 153 + merge_text_file(rel_path, &repo_indices, &repos, client, did, &mut pack_cache) 154 + .await?; 84 155 std::fs::write(&output_path, &content) 85 156 .map_err(|e| format!("write {:?}: {}", output_path, e))?; 86 157 } 87 158 FileKind::Binary => { 88 159 merge_binary_file( 89 160 rel_path, 90 - &site_indices, 91 - &sites, 161 + &repo_indices, 162 + &repos, 92 163 rkeys, 93 164 client, 94 165 did, ··· 106 177 Ok(()) 107 178 } 108 179 109 - /// CRDT-merge manifest Maps from all sites. 180 + /// CRDT-merge manifest Maps from all repos. 110 181 async fn merge_manifests( 111 - sites: &[(String, SiteRecord)], 182 + repos: &[(String, YrsRepo)], 112 183 client: &PdsClient, 113 184 did: &str, 185 + pack_cache: &mut HashMap<String, Vec<u8>>, 114 186 ) -> Result<Doc, String> { 115 187 let mut manifest_docs: Vec<Doc> = Vec::new(); 116 188 117 - for (_rkey, site) in sites { 118 - let doc = if let Some(manifest_entry) = site.files.get(MANIFEST_KEY) { 119 - yrs_pds::file_entry_to_doc(manifest_entry, client, did).await? 189 + for (_rkey, repo) in repos { 190 + let doc = if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 191 + file_entry_to_doc_cached(manifest_entry, client, did, pack_cache).await? 120 192 } else { 121 - // Legacy site — create manifest from its files 193 + // Legacy repo — create manifest from its files 122 194 let doc = yrs_pds::new_manifest_doc(); 123 - for (path, entry) in &site.files { 195 + for (path, entry) in &repo.files { 124 196 if path != MANIFEST_KEY { 125 197 yrs_pds::manifest_insert(&doc, path, &entry.kind); 126 198 } ··· 147 219 Ok(manifest_docs.into_iter().next().unwrap()) 148 220 } 149 221 150 - /// CRDT-merge a text file across multiple sites. 222 + /// CRDT-merge a text file across multiple repos. 151 223 async fn merge_text_file( 152 224 rel_path: &str, 153 - site_indices: &[usize], 154 - sites: &[(String, SiteRecord)], 225 + repo_indices: &[usize], 226 + repos: &[(String, YrsRepo)], 155 227 client: &PdsClient, 156 228 did: &str, 229 + pack_cache: &mut HashMap<String, Vec<u8>>, 157 230 ) -> Result<String, String> { 158 - if site_indices.len() == 1 { 159 - let entry = &sites[site_indices[0]].1.files[rel_path]; 160 - let doc = yrs_pds::file_entry_to_doc(entry, client, did).await?; 231 + if repo_indices.len() == 1 { 232 + let entry = &repos[repo_indices[0]].1.files[rel_path]; 233 + let doc = file_entry_to_doc_cached(entry, client, did, pack_cache).await?; 161 234 return Ok(yrs_pds::materialize(&doc)); 162 235 } 163 236 164 237 let mut docs: Vec<Doc> = Vec::new(); 165 - for &idx in site_indices { 166 - let entry = &sites[idx].1.files[rel_path]; 167 - let doc = yrs_pds::file_entry_to_doc(entry, client, did).await?; 238 + for &idx in repo_indices { 239 + let entry = &repos[idx].1.files[rel_path]; 240 + let doc = file_entry_to_doc_cached(entry, client, did, pack_cache).await?; 168 241 docs.push(doc); 169 242 } 170 243 ··· 184 257 /// Handle binary file merge — detect conflicts via CID comparison. 185 258 async fn merge_binary_file( 186 259 rel_path: &str, 187 - site_indices: &[usize], 188 - sites: &[(String, SiteRecord)], 260 + repo_indices: &[usize], 261 + repos: &[(String, YrsRepo)], 189 262 rkeys: &[&str], 190 263 client: &PdsClient, 191 264 did: &str, 192 265 output_dir: &Path, 193 266 ) -> Result<(), String> { 194 - if site_indices.len() == 1 { 195 - // Only one site has this binary file — just download it 196 - let entry = &sites[site_indices[0]].1.files[rel_path]; 267 + if repo_indices.len() == 1 { 268 + // Only one repo has this binary file — just download it 269 + let entry = &repos[repo_indices[0]].1.files[rel_path]; 197 270 let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 198 271 let output_path = output_dir.join(rel_path); 199 272 std::fs::write(&output_path, &data) ··· 201 274 return Ok(()); 202 275 } 203 276 204 - // Collect CIDs from all sites 205 - let mut cid_site: HashMap<String, Vec<usize>> = HashMap::new(); 206 - for &idx in site_indices { 207 - let entry = &sites[idx].1.files[rel_path]; 208 - cid_site 277 + // Collect CIDs from all repos 278 + let mut cid_repo: HashMap<String, Vec<usize>> = HashMap::new(); 279 + for &idx in repo_indices { 280 + let entry = &repos[idx].1.files[rel_path]; 281 + cid_repo 209 282 .entry(entry.snapshot_blob.cid().to_string()) 210 283 .or_default() 211 284 .push(idx); 212 285 } 213 286 214 - if cid_site.len() == 1 { 215 - // All sites have the same CID — no conflict 216 - let entry = &sites[site_indices[0]].1.files[rel_path]; 287 + if cid_repo.len() == 1 { 288 + // All repos have the same CID — no conflict 289 + let entry = &repos[repo_indices[0]].1.files[rel_path]; 217 290 let data = client.get_blob(did, entry.snapshot_blob.cid()).await?; 218 291 let output_path = output_dir.join(rel_path); 219 292 std::fs::write(&output_path, &data) ··· 225 298 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or(""); 226 299 let parent = path.parent().unwrap_or(std::path::Path::new("")); 227 300 228 - for (cid, indices) in &cid_site { 229 - let site_name = &rkeys[indices[0]]; 301 + for (cid, indices) in &cid_repo { 302 + let repo_name = &rkeys[indices[0]]; 230 303 let conflict_name = if ext.is_empty() { 231 - format!("{}.{}", stem, site_name) 304 + format!("{}.{}", stem, repo_name) 232 305 } else { 233 - format!("{}.{}.{}", stem, site_name, ext) 306 + format!("{}.{}.{}", stem, repo_name, ext) 234 307 }; 235 308 let conflict_path = output_dir.join(parent).join(&conflict_name); 236 309 if let Some(p) = conflict_path.parent() { ··· 245 318 Ok(()) 246 319 } 247 320 248 - /// Generate a conflict filename: stem.site_name.ext 249 - pub fn conflict_filename(rel_path: &str, site_name: &str) -> String { 321 + /// Reconstruct a Yrs Doc from a FileEntry, using pack cache to avoid redundant downloads. 322 + async fn file_entry_to_doc_cached( 323 + entry: &FileEntry, 324 + client: &PdsClient, 325 + did: &str, 326 + pack_cache: &mut HashMap<String, Vec<u8>>, 327 + ) -> Result<Doc, String> { 328 + let snapshot_data = get_blob_data_cached(entry, client, did, pack_cache).await?; 329 + let doc = yrs_pds::doc_from_snapshot(&snapshot_data)?; 330 + 331 + for update_ref in &entry.updates { 332 + let update_data = 333 + get_pack_ref_data_cached(update_ref, client, did, pack_cache).await?; 334 + yrs_pds::apply_update(&doc, &update_data)?; 335 + } 336 + 337 + Ok(doc) 338 + } 339 + 340 + /// Get blob data for a file entry's snapshot, using pack cache. 341 + async fn get_blob_data_cached( 342 + entry: &FileEntry, 343 + client: &PdsClient, 344 + did: &str, 345 + pack_cache: &mut HashMap<String, Vec<u8>>, 346 + ) -> Result<Vec<u8>, String> { 347 + if let Some(ref pack_ref) = entry.pack_ref { 348 + get_pack_ref_data_cached(pack_ref, client, did, pack_cache).await 349 + } else { 350 + client.get_blob(did, entry.snapshot_blob.cid()).await 351 + } 352 + } 353 + 354 + /// Extract data from a PackRef, using pack cache to avoid redundant downloads. 355 + async fn get_pack_ref_data_cached( 356 + pack_ref: &PackRef, 357 + client: &PdsClient, 358 + did: &str, 359 + pack_cache: &mut HashMap<String, Vec<u8>>, 360 + ) -> Result<Vec<u8>, String> { 361 + let cid = pack_ref.blob.cid().to_string(); 362 + 363 + if !pack_cache.contains_key(&cid) { 364 + let data = if let Some(ref chunks) = pack_ref.chunks { 365 + let mut chunk_data = Vec::new(); 366 + for chunk_ref in chunks { 367 + chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 368 + } 369 + pack::reassemble_chunks(&chunk_data) 370 + } else { 371 + client.get_blob(did, &cid).await? 372 + }; 373 + pack_cache.insert(cid.clone(), data); 374 + } 375 + 376 + let pack_data = pack_cache.get(&cid).unwrap(); 377 + let (_, blob_data) = pack::parse_pack_auto(pack_data)?; 378 + 379 + let start = pack_ref.offset as usize; 380 + let end = start + pack_ref.length as usize; 381 + if end > blob_data.len() { 382 + return Err(format!( 383 + "pack_ref out of bounds: {}..{} in {} bytes", 384 + start, end, blob_data.len() 385 + )); 386 + } 387 + Ok(blob_data[start..end].to_vec()) 388 + } 389 + 390 + /// Generate a conflict filename: stem.repo_name.ext 391 + pub fn conflict_filename(rel_path: &str, repo_name: &str) -> String { 250 392 let path = std::path::Path::new(rel_path); 251 393 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("file"); 252 394 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or(""); 253 395 let parent = path.parent().unwrap_or(std::path::Path::new("")); 254 396 255 397 let conflict_name = if ext.is_empty() { 256 - format!("{}.{}", stem, site_name) 398 + format!("{}.{}", stem, repo_name) 257 399 } else { 258 - format!("{}.{}.{}", stem, site_name, ext) 400 + format!("{}.{}.{}", stem, repo_name, ext) 259 401 }; 260 402 261 403 parent.join(&conflict_name).to_string_lossy().to_string() ··· 309 451 310 452 #[test] 311 453 fn binary_merge_no_conflict_same_cid() { 312 - // simulate: two sites have same binary CID → no conflict 454 + // simulate: two repos have same binary CID → no conflict 313 455 let mut cid_site: HashMap<String, Vec<usize>> = HashMap::new(); 314 456 let cid = "bafysame".to_string(); 315 457 cid_site.entry(cid.clone()).or_default().push(0); ··· 321 463 322 464 #[test] 323 465 fn binary_merge_conflict_different_cids() { 324 - // simulate: two sites modify same binary differently → conflict files 325 - let mut cid_site: HashMap<String, Vec<usize>> = HashMap::new(); 326 - cid_site.entry("bafyabc".to_string()).or_default().push(0); 327 - cid_site.entry("bafydef".to_string()).or_default().push(1); 466 + // simulate: two repos modify same binary differently → conflict files 467 + let mut cid_repo: HashMap<String, Vec<usize>> = HashMap::new(); 468 + cid_repo.entry("bafyabc".to_string()).or_default().push(0); 469 + cid_repo.entry("bafydef".to_string()).or_default().push(1); 328 470 329 - assert_eq!(cid_site.len(), 2); 471 + assert_eq!(cid_repo.len(), 2); 330 472 331 473 // verify conflict filenames 332 474 let rkeys = ["site-a", "site-b"]; 333 475 let mut conflict_files = Vec::new(); 334 - for (_, indices) in &cid_site { 335 - let site_name = rkeys[indices[0]]; 336 - conflict_files.push(conflict_filename("logo.png", site_name)); 476 + for (_, indices) in &cid_repo { 477 + let repo_name = rkeys[indices[0]]; 478 + conflict_files.push(conflict_filename("logo.png", repo_name)); 337 479 } 338 480 conflict_files.sort(); 339 481 assert!(conflict_files.contains(&"logo.site-a.png".to_string())); ··· 345 487 use yrs::updates::decoder::Decode; 346 488 use yrs::{Text, Transact}; 347 489 348 - // Simulate two sites editing the same text file 490 + // Simulate two repos editing the same text file 349 491 let base_doc = crate::yrs_pds::doc_from_text("Hello world"); 350 492 let base_snapshot = crate::yrs_pds::encode_snapshot(&base_doc); 351 493 352 - // Site A: adds " from Alice" at end 494 + // Repo A: adds " from Alice" at end 353 495 let doc_a = crate::yrs_pds::doc_from_snapshot(&base_snapshot).unwrap(); 354 496 { 355 497 let text = doc_a.get_or_insert_text("content"); ··· 357 499 text.insert(&mut txn, 11, " from Alice"); 358 500 } 359 501 360 - // Site B: adds "Dear " at beginning 502 + // Repo B: adds "Dear " at beginning 361 503 let doc_b = crate::yrs_pds::doc_from_snapshot(&base_snapshot).unwrap(); 362 504 { 363 505 let text = doc_b.get_or_insert_text("content"); ··· 373 515 374 516 let merged = crate::yrs_pds::materialize(&doc_a); 375 517 // CRDT should include both edits 376 - assert!(merged.contains("Dear "), "should have site B's prefix"); 377 - assert!(merged.contains("from Alice"), "should have site A's suffix"); 518 + assert!(merged.contains("Dear "), "should have repo B's prefix"); 519 + assert!(merged.contains("from Alice"), "should have repo A's suffix"); 378 520 } 379 521 }
+1 -1
src/pack.rs
··· 145 145 data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b 146 146 } 147 147 148 - /// Maximum blob size before chunking (40MB — AT Protocol limit is ~50MB but leave margin). 148 + /// Maximum blob size before chunking (40MB — ATProto limit is ~50MB but leave margin). 149 149 pub const CHUNK_SIZE: usize = 40 * 1024 * 1024; 150 150 151 151 /// Split data into chunks of at most CHUNK_SIZE bytes.
+90 -1
src/pds_client.rs
··· 1 - //! PDS client for AT Protocol XRPC calls. 1 + //! PDS client for ATProto XRPC calls. 2 2 //! 3 3 //! Simplified port from git-remote-pds — Bearer auth only. 4 4 ··· 66 66 size: u64, 67 67 } 68 68 69 + /// Response from `com.atproto.repo.listRecords`. 70 + #[derive(Debug, Deserialize)] 71 + pub struct ListRecordsResponse { 72 + pub records: Vec<ListRecordEntry>, 73 + pub cursor: Option<String>, 74 + } 75 + 76 + /// A single record entry from `listRecords`. 77 + #[derive(Debug, Deserialize)] 78 + pub struct ListRecordEntry { 79 + pub uri: String, 80 + pub cid: Option<String>, 81 + pub value: serde_json::Value, 82 + } 83 + 69 84 impl PdsClient { 70 85 pub fn new(base_url: impl Into<String>) -> Self { 71 86 Self { ··· 79 94 80 95 pub fn base_url(&self) -> &str { 81 96 &self.base_url 97 + } 98 + 99 + /// Set pre-existing tokens (e.g. from cached config). 100 + pub fn set_tokens(&mut self, access_token: String, refresh_token: String) { 101 + self.auth_token = Some(access_token); 102 + self.refresh_token = Some(refresh_token); 103 + self.token_expiry = 104 + Some(std::time::Instant::now() + std::time::Duration::from_secs(TOKEN_TTL_SECS)); 105 + } 106 + 107 + /// Get the current access token (if authenticated). 108 + pub fn access_token(&self) -> Option<&str> { 109 + self.auth_token.as_deref() 110 + } 111 + 112 + /// Get the current refresh token (if authenticated). 113 + pub fn refresh_token(&self) -> Option<&str> { 114 + self.refresh_token.as_deref() 82 115 } 83 116 84 117 pub async fn login( ··· 327 360 } 328 361 329 362 Ok(()) 363 + } 364 + 365 + /// List records in a collection (single page). 366 + pub async fn list_records( 367 + &self, 368 + did: &str, 369 + collection: &str, 370 + limit: u32, 371 + cursor: Option<&str>, 372 + ) -> Result<ListRecordsResponse, String> { 373 + let mut url = format!( 374 + "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit={}", 375 + self.base_url, did, collection, limit 376 + ); 377 + if let Some(c) = cursor { 378 + url.push_str(&format!("&cursor={}", c)); 379 + } 380 + 381 + let resp = self 382 + .http 383 + .get(&url) 384 + .send() 385 + .await 386 + .map_err(|e| format!("list_records request failed: {}", e))?; 387 + 388 + if !resp.status().is_success() { 389 + let status = resp.status(); 390 + let text = resp.text().await.unwrap_or_default(); 391 + return Err(format!("list_records failed ({}): {}", status, text)); 392 + } 393 + 394 + resp.json() 395 + .await 396 + .map_err(|e| format!("parse list_records response: {}", e)) 397 + } 398 + 399 + /// List all records in a collection (paginated, collects all pages). 400 + pub async fn list_all_records( 401 + &self, 402 + did: &str, 403 + collection: &str, 404 + ) -> Result<Vec<ListRecordEntry>, String> { 405 + let mut all = Vec::new(); 406 + let mut cursor: Option<String> = None; 407 + loop { 408 + let resp = self 409 + .list_records(did, collection, 100, cursor.as_deref()) 410 + .await?; 411 + let has_more = resp.cursor.is_some(); 412 + cursor = resp.cursor; 413 + all.extend(resp.records); 414 + if !has_more { 415 + break; 416 + } 417 + } 418 + Ok(all) 330 419 } 331 420 332 421 /// Delete a record with explicit DID.
+191 -42
src/save.rs
··· 1 - //! Save a directory of files to PDS as a SiteRecord with Yrs CRDT state. 1 + //! Save a directory of files to PDS as a YrsRepo record with Yrs CRDT state. 2 2 3 3 use std::collections::HashMap; 4 4 use std::path::Path; ··· 6 6 use crate::pack::{self, PackDataType}; 7 7 use crate::pds_client::PdsClient; 8 8 use crate::types::{ 9 - BlobRef, FileEntry, FileKind, PackRef, SaveResult, SiteRecord, COLLECTION, MANIFEST_KEY, 9 + BlobRef, Collaborator, FileEntry, FileKind, PackRef, SaveResult, YrsRepo, COLLECTION, 10 + MANIFEST_KEY, 10 11 }; 11 12 use crate::yrs_pds; 12 13 13 - /// Compaction threshold: create new snapshot when updates exceed this count. 14 - const COMPACTION_THRESHOLD: u32 = 50; 14 + /// Compaction threshold: when any file's updates_count reaches this, 15 + /// the entire repo is compacted (all files get fresh snapshots in one pack). 16 + const COMPACTION_THRESHOLD: u32 = 10; 15 17 16 18 /// Pending blob to be packed into a single upload. 17 19 struct PendingBlob { ··· 20 22 data_type: PackDataType, 21 23 } 22 24 25 + /// Whether a pending blob is an incremental update (appended to updates list) 26 + /// or a snapshot (replaces pack_ref). 27 + #[derive(Clone, PartialEq)] 28 + enum PendingKind { 29 + /// Full snapshot — will become the new pack_ref. 30 + Snapshot, 31 + /// Incremental update — will be appended to updates list. 32 + Update, 33 + } 34 + 23 35 /// Save a directory to PDS. 24 36 /// 25 37 /// Maintains a CRDT manifest (Yrs Map) tracking all files. Supports both ··· 30 42 client: &PdsClient, 31 43 did: &str, 32 44 rkey: &str, 45 + project_name: &str, 46 + new_collaborators: Option<&[Collaborator]>, 33 47 verbose: bool, 34 48 ) -> Result<SaveResult, String> { 35 - save_filtered(dir, client, did, rkey, None, None, verbose).await 49 + save_filtered( 50 + dir, 51 + client, 52 + did, 53 + rkey, 54 + project_name, 55 + new_collaborators, 56 + None, 57 + None, 58 + verbose, 59 + ) 60 + .await 36 61 } 37 62 38 63 /// Save a directory to PDS with optional include/exclude glob filters. 64 + /// 65 + /// `new_collaborators` is used on first initialization to set initial collaborators. 66 + /// On subsequent saves, collaborators are preserved from the existing record. 39 67 pub async fn save_filtered( 40 68 dir: &Path, 41 69 client: &PdsClient, 42 70 did: &str, 43 71 rkey: &str, 72 + project_name: &str, 73 + new_collaborators: Option<&[Collaborator]>, 44 74 include: Option<&[String]>, 45 75 exclude: Option<&[String]>, 46 76 verbose: bool, ··· 53 83 54 84 // Fetch existing record if present 55 85 let existing = client.get_record(did, COLLECTION, rkey).await?; 56 - let existing_site: Option<SiteRecord> = existing 86 + let existing_repo: Option<YrsRepo> = existing 57 87 .as_ref() 58 88 .and_then(|r| serde_json::from_value(r.value.clone()).ok()); 59 89 let swap_cid = existing.as_ref().and_then(|r| r.cid.clone()); 60 90 61 91 // Reconstruct or create manifest 62 - let manifest_doc = if let Some(ref site) = existing_site { 63 - if let Some(manifest_entry) = site.files.get(MANIFEST_KEY) { 92 + let manifest_doc = if let Some(ref repo) = existing_repo { 93 + if let Some(manifest_entry) = repo.files.get(MANIFEST_KEY) { 64 94 yrs_pds::file_entry_to_doc(manifest_entry, client, did).await? 65 95 } else { 66 96 let doc = yrs_pds::new_manifest_doc(); 67 - for (path, entry) in &site.files { 97 + for (path, entry) in &repo.files { 68 98 if path != MANIFEST_KEY { 69 99 yrs_pds::manifest_insert(&doc, path, &entry.kind); 70 100 } ··· 77 107 78 108 let mut file_entries: HashMap<String, FileEntry> = HashMap::new(); 79 109 let mut pending_blobs: Vec<PendingBlob> = Vec::new(); 110 + // Track whether each pending blob is a snapshot or incremental update 111 + let mut pending_kinds: HashMap<String, PendingKind> = HashMap::new(); 80 112 let mut files_uploaded = 0; 81 113 let mut files_skipped = 0; 114 + let mut needs_compaction = false; 82 115 83 116 // Track which local files exist (for deletion detection) 84 117 let local_paths: std::collections::HashSet<String> = ··· 99 132 data: file_data.clone(), 100 133 data_type: PackDataType::Binary, 101 134 }); 102 - // Placeholder entry — will be updated with pack_ref after upload 135 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 103 136 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 104 137 files_uploaded += 1; 105 138 continue; ··· 107 140 }; 108 141 109 142 // Check if file changed since last save 110 - if let Some(ref site) = existing_site { 111 - if let Some(existing_entry) = site.files.get(rel_path) { 143 + if let Some(ref repo) = existing_repo { 144 + if let Some(existing_entry) = repo.files.get(rel_path) { 112 145 if existing_entry.content == content { 113 146 file_entries.insert(rel_path.clone(), existing_entry.clone()); 114 147 files_skipped += 1; ··· 121 154 // Changed — re-assert in manifest 122 155 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Text); 123 156 124 - // Incremental update path 125 - if existing_entry.updates_count < COMPACTION_THRESHOLD { 157 + // Incremental update path: compute diff only 158 + if existing_entry.pack_ref.is_some() 159 + && existing_entry.updates_count + 1 < COMPACTION_THRESHOLD 160 + { 126 161 if let Ok(doc) = 127 162 reconstruct_and_diff(existing_entry, content, client, did).await 128 163 { 129 - let snapshot = yrs_pds::encode_snapshot(&doc); 164 + // Compute only the new ops (diff from previous state vector) 165 + let old_sv_bytes = yrs_pds::base64_decode(&existing_entry.state_vector)?; 166 + let diff = yrs_pds::encode_diff(&doc, &old_sv_bytes)?; 130 167 let sv = yrs_pds::encode_state_vector(&doc); 131 168 let materialized = yrs_pds::materialize(&doc); 132 169 pending_blobs.push(PendingBlob { 133 170 path: rel_path.clone(), 134 - data: snapshot, 135 - data_type: PackDataType::Snapshot, 171 + data: diff, 172 + data_type: PackDataType::Update, 136 173 }); 174 + pending_kinds.insert(rel_path.clone(), PendingKind::Update); 175 + // Keep existing snapshot/pack_ref, will append update 137 176 file_entries.insert( 138 177 rel_path.clone(), 139 178 FileEntry { 140 179 content: materialized, 141 - snapshot_blob: placeholder_blob_ref(), 180 + snapshot_blob: existing_entry.snapshot_blob.clone(), 142 181 state_vector: yrs_pds::base64_encode(&sv), 143 - updates_blob: None, 182 + updates: existing_entry.updates.clone(), 144 183 updates_count: existing_entry.updates_count + 1, 145 - snapshot_at: chrono::Utc::now().to_rfc3339(), 184 + snapshot_at: existing_entry.snapshot_at.clone(), 146 185 kind: FileKind::Text, 147 - pack_ref: None, 186 + pack_ref: existing_entry.pack_ref.clone(), 148 187 conflict_source: None, 149 188 }, 150 189 ); ··· 156 195 } 157 196 } 158 197 198 + // Check if this triggers compaction 199 + if existing_entry.updates_count + 1 >= COMPACTION_THRESHOLD { 200 + needs_compaction = true; 201 + } 202 + 159 203 if verbose { 160 - eprintln!("pds-yrs: full snapshot (compaction) {}", rel_path); 204 + eprintln!("pds-yrs: full snapshot {}", rel_path); 161 205 } 162 206 } else { 163 207 yrs_pds::manifest_insert(&manifest_doc, rel_path, &FileKind::Text); ··· 175 219 data: snapshot, 176 220 data_type: PackDataType::Snapshot, 177 221 }); 222 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 178 223 file_entries.insert( 179 224 rel_path.clone(), 180 225 FileEntry { 181 226 content: content.to_string(), 182 227 snapshot_blob: placeholder_blob_ref(), 183 228 state_vector: yrs_pds::base64_encode(&sv), 184 - updates_blob: None, 229 + updates: vec![], 185 230 updates_count: 0, 186 231 snapshot_at: chrono::Utc::now().to_rfc3339(), 187 232 kind: FileKind::Text, ··· 195 240 } 196 241 } 197 242 FileKind::Binary => { 198 - if let Some(ref site) = existing_site { 199 - if let Some(existing_entry) = site.files.get(rel_path) { 243 + if let Some(ref repo) = existing_repo { 244 + if let Some(existing_entry) = repo.files.get(rel_path) { 200 245 if existing_entry.kind == FileKind::Binary { 201 246 let hash = hex_hash(file_data); 202 247 if existing_entry.content == hash { ··· 222 267 data: file_data.clone(), 223 268 data_type: PackDataType::Binary, 224 269 }); 270 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 225 271 file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 226 272 files_uploaded += 1; 227 273 if verbose { ··· 231 277 } 232 278 } 233 279 280 + // Compaction: if any file crossed the threshold, re-snapshot ALL files 281 + if needs_compaction { 282 + if verbose { 283 + eprintln!("pds-yrs: compaction triggered — re-snapshotting all files"); 284 + } 285 + pending_blobs.clear(); 286 + pending_kinds.clear(); 287 + file_entries.clear(); 288 + files_uploaded = 0; 289 + files_skipped = 0; 290 + 291 + for (rel_path, file_data, kind) in &local_files { 292 + match kind { 293 + FileKind::Text => { 294 + let content = match std::str::from_utf8(file_data) { 295 + Ok(s) => s, 296 + Err(_) => { 297 + let hash = hex_hash(file_data); 298 + pending_blobs.push(PendingBlob { 299 + path: rel_path.clone(), 300 + data: file_data.clone(), 301 + data_type: PackDataType::Binary, 302 + }); 303 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 304 + file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 305 + files_uploaded += 1; 306 + continue; 307 + } 308 + }; 309 + // Reconstruct full doc if we have existing state, else create fresh 310 + let doc = if let Some(ref repo) = existing_repo { 311 + if let Some(existing_entry) = repo.files.get(rel_path) { 312 + if let Ok(d) = reconstruct_and_diff(existing_entry, content, client, did).await { 313 + d 314 + } else { 315 + yrs_pds::doc_from_text(content) 316 + } 317 + } else { 318 + yrs_pds::doc_from_text(content) 319 + } 320 + } else { 321 + yrs_pds::doc_from_text(content) 322 + }; 323 + let snapshot = yrs_pds::encode_snapshot(&doc); 324 + let sv = yrs_pds::encode_state_vector(&doc); 325 + let materialized = yrs_pds::materialize(&doc); 326 + pending_blobs.push(PendingBlob { 327 + path: rel_path.clone(), 328 + data: snapshot, 329 + data_type: PackDataType::Snapshot, 330 + }); 331 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 332 + file_entries.insert( 333 + rel_path.clone(), 334 + FileEntry { 335 + content: materialized, 336 + snapshot_blob: placeholder_blob_ref(), 337 + state_vector: yrs_pds::base64_encode(&sv), 338 + updates: vec![], 339 + updates_count: 0, 340 + snapshot_at: chrono::Utc::now().to_rfc3339(), 341 + kind: FileKind::Text, 342 + pack_ref: None, 343 + conflict_source: None, 344 + }, 345 + ); 346 + files_uploaded += 1; 347 + } 348 + FileKind::Binary => { 349 + let hash = hex_hash(file_data); 350 + pending_blobs.push(PendingBlob { 351 + path: rel_path.clone(), 352 + data: file_data.clone(), 353 + data_type: PackDataType::Binary, 354 + }); 355 + pending_kinds.insert(rel_path.clone(), PendingKind::Snapshot); 356 + file_entries.insert(rel_path.clone(), placeholder_binary_entry(&hash)); 357 + files_uploaded += 1; 358 + } 359 + } 360 + } 361 + } 362 + 234 363 // Detect deletions: files in manifest but not on disk 235 364 let manifest_entries = yrs_pds::manifest_entries(&manifest_doc); 236 365 for path in manifest_entries.keys() { ··· 250 379 data: manifest_snapshot, 251 380 data_type: PackDataType::Snapshot, 252 381 }); 382 + pending_kinds.insert(MANIFEST_KEY.to_string(), PendingKind::Snapshot); 253 383 254 - // Upload all blobs as a single pack (or individually if nothing to pack) 384 + // Upload all blobs as a single pack 255 385 let total_bytes; 256 386 if pending_blobs.is_empty() { 257 387 total_bytes = 0; 258 - // Still need manifest entry 259 388 let manifest_entry = yrs_pds::doc_to_file_entry(&manifest_doc, client, did).await?; 260 389 file_entries.insert(MANIFEST_KEY.to_string(), manifest_entry); 261 390 } else { ··· 268 397 let is_compressed = pack::is_gzip(&pack_blob.data); 269 398 total_bytes = pack_blob.data.len() as u64; 270 399 271 - // Upload pack blob — chunk if larger than AT Protocol limit 400 + // Upload pack blob — chunk if larger than ATProto limit 272 401 let (blob_ref, chunk_refs) = if pack_blob.data.len() > pack::CHUNK_SIZE { 273 402 let chunks = pack::chunk_data(&pack_blob.data); 274 403 let mut refs = Vec::new(); ··· 284 413 } 285 414 refs.push(r); 286 415 } 287 - // Use first chunk's ref as the primary blob ref 288 416 let primary = refs[0].clone(); 289 417 (primary, Some(refs)) 290 418 } else { ··· 316 444 }; 317 445 318 446 if entry.path == MANIFEST_KEY { 319 - // Manifest entry 320 447 let manifest_content = yrs_pds::materialize_manifest_content(&manifest_doc); 321 448 file_entries.insert( 322 449 MANIFEST_KEY.to_string(), ··· 324 451 content: manifest_content, 325 452 snapshot_blob: blob_ref.clone(), 326 453 state_vector: yrs_pds::base64_encode(&manifest_sv), 327 - updates_blob: None, 454 + updates: vec![], 328 455 updates_count: 0, 329 456 snapshot_at: chrono::Utc::now().to_rfc3339(), 330 457 kind: FileKind::Text, ··· 333 460 }, 334 461 ); 335 462 } else if let Some(fe) = file_entries.get_mut(&entry.path) { 336 - // Update placeholder blob ref with actual pack ref 337 - fe.snapshot_blob = blob_ref.clone(); 338 - fe.pack_ref = Some(pack_ref); 463 + let kind = pending_kinds.get(&entry.path).cloned().unwrap_or(PendingKind::Snapshot); 464 + match kind { 465 + PendingKind::Snapshot => { 466 + // Full snapshot — replace pack_ref, clear updates 467 + fe.snapshot_blob = blob_ref.clone(); 468 + fe.pack_ref = Some(pack_ref); 469 + fe.updates.clear(); 470 + } 471 + PendingKind::Update => { 472 + // Incremental update — append to updates list 473 + fe.updates.push(pack_ref); 474 + } 475 + } 339 476 } 340 477 } 341 478 } 342 479 343 - // Build SiteRecord 480 + // Build YrsRepo — preserve existing collaborators, merge with any new ones 481 + let mut collaborators = existing_repo 482 + .as_ref() 483 + .map(|r| r.collaborators.clone()) 484 + .unwrap_or_default(); 485 + if let Some(new_collabs) = new_collaborators { 486 + for c in new_collabs { 487 + if !collaborators.iter().any(|existing| existing.rkey == c.rkey) { 488 + collaborators.push(c.clone()); 489 + } 490 + } 491 + } 344 492 let now = chrono::Utc::now().to_rfc3339(); 345 - let record = SiteRecord { 346 - name: rkey.to_string(), 493 + let record = YrsRepo { 494 + name: project_name.to_string(), 347 495 files: file_entries, 348 496 updated_at: now, 497 + collaborators, 349 498 }; 350 499 351 500 let record_json = 352 - serde_json::to_value(&record).map_err(|e| format!("serialize SiteRecord: {}", e))?; 501 + serde_json::to_value(&record).map_err(|e| format!("serialize YrsRepo: {}", e))?; 353 502 354 503 client 355 504 .put_record(did, COLLECTION, rkey, record_json, swap_cid) ··· 377 526 content: hash.to_string(), 378 527 snapshot_blob: placeholder_blob_ref(), 379 528 state_vector: String::new(), 380 - updates_blob: None, 529 + updates: vec![], 381 530 updates_count: 0, 382 531 snapshot_at: chrono::Utc::now().to_rfc3339(), 383 532 kind: FileKind::Binary, ··· 563 712 fn collect_files_skips_hidden_and_pds_yrs() { 564 713 let tmp = tempfile::tempdir().unwrap(); 565 714 std::fs::write(tmp.path().join("visible.md"), "content").unwrap(); 566 - std::fs::create_dir_all(tmp.path().join(".pds-yrs")).unwrap(); 567 - std::fs::write(tmp.path().join(".pds-yrs/state.yrs"), "state").unwrap(); 715 + std::fs::create_dir_all(tmp.path().join(".yrs")).unwrap(); 716 + std::fs::write(tmp.path().join(".yrs/state.yrs"), "state").unwrap(); 568 717 std::fs::create_dir_all(tmp.path().join(".git")).unwrap(); 569 718 std::fs::write(tmp.path().join(".git/HEAD"), "ref").unwrap(); 570 719 ··· 652 801 assert_eq!(entry.kind, FileKind::Binary); 653 802 assert_eq!(entry.content, "abc123"); 654 803 assert!(entry.state_vector.is_empty()); 655 - assert!(entry.updates_blob.is_none()); 804 + assert!(entry.updates.is_empty()); 656 805 } 657 806 658 807 #[test]
+4 -2
src/sync.rs
··· 66 66 client: &PdsClient, 67 67 did: &str, 68 68 rkey: &str, 69 + project_name: &str, 69 70 config: &SyncConfig, 70 71 ) -> Result<Vec<SyncCycleResult>, String> { 71 72 let dir = Path::new(&config.dir); ··· 79 80 } 80 81 } 81 82 82 - let result = sync_cycle(client, did, rkey, dir, cycle, config).await?; 83 + let result = sync_cycle(client, did, rkey, project_name, dir, cycle, config).await?; 83 84 84 85 if config.verbose { 85 86 eprintln!( ··· 113 114 client: &PdsClient, 114 115 did: &str, 115 116 rkey: &str, 117 + project_name: &str, 116 118 dir: &Path, 117 119 cycle: u32, 118 120 config: &SyncConfig, ··· 157 159 } else { 158 160 // Save local changes with filters 159 161 let save_result = 160 - save::save_filtered(dir, client, did, rkey, inc, exc, config.verbose).await?; 162 + save::save_filtered(dir, client, did, rkey, project_name, None, inc, exc, config.verbose).await?; 161 163 files_uploaded = save_result.files_uploaded; 162 164 163 165 // Determine if this cycle should materialize
+37 -17
src/types.rs
··· 1 - //! AT Protocol types for CRDT-on-PDS storage. 1 + //! ATProto types for CRDT-on-PDS storage. 2 2 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::collections::HashMap; 5 5 6 - /// Collection name for site records. 7 - pub const COLLECTION: &str = "net.commoninternet.lichen.site"; 6 + /// Collection name for yrs repo records. 7 + pub const COLLECTION: &str = "net.commoninternet.yrsrepo"; 8 8 9 - /// Key for the manifest FileEntry in the SiteRecord. 10 - pub const MANIFEST_KEY: &str = "_manifest"; 9 + /// Key for the manifest FileEntry in the YrsRepo. 10 + pub const MANIFEST_KEY: &str = "pdsyrs_manifest"; 11 11 12 12 /// File kind — determines how the file is stored and merged. 13 13 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] ··· 19 19 Binary, 20 20 } 21 21 22 - /// A site stored on PDS with Yrs CRDT state per file. 22 + /// A collaborator reference — points to another device's rkey, optionally on a different PDS. 23 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 24 + pub struct Collaborator { 25 + /// The rkey of the collaborator's repo record. 26 + pub rkey: String, 27 + /// PDS URL if different from the current PDS (for cross-PDS collaboration). 28 + #[serde(skip_serializing_if = "Option::is_none")] 29 + pub pds: Option<String>, 30 + } 31 + 32 + /// A repo stored on PDS with Yrs CRDT state per file. 33 + /// 34 + /// `name` is the project name — shared across all devices/writers for the same project. 35 + /// Each device gets its own rkey (auto-generated), while `name` identifies the project. 36 + /// `collaborators` lists other device rkeys for the same project, enabling merge 37 + /// without needing to list all records. 23 38 #[derive(Debug, Clone, Serialize, Deserialize)] 24 - pub struct SiteRecord { 39 + pub struct YrsRepo { 25 40 pub name: String, 26 41 pub files: HashMap<String, FileEntry>, 27 42 #[serde(rename = "updatedAt")] 28 43 pub updated_at: String, 44 + #[serde(default, skip_serializing_if = "Vec::is_empty")] 45 + pub collaborators: Vec<Collaborator>, 29 46 } 30 47 31 48 /// A single file's state, stored as Yrs CRDT + plain text. ··· 40 57 /// State vector bytes, base64-encoded for inline storage. 41 58 #[serde(rename = "stateVector")] 42 59 pub state_vector: String, 43 - /// Incremental updates since snapshot. 44 - #[serde(rename = "updatesBlob", skip_serializing_if = "Option::is_none")] 45 - pub updates_blob: Option<BlobRef>, 60 + /// Incremental update packs since snapshot (ordered, each points into a pack blob). 61 + #[serde(default, skip_serializing_if = "Vec::is_empty")] 62 + pub updates: Vec<PackRef>, 46 63 /// Number of incremental updates applied since last snapshot. 47 64 #[serde(rename = "updatesCount", default)] 48 65 pub updates_count: u32, ··· 95 112 pub total_size: u64, 96 113 } 97 114 98 - /// AT Protocol blob reference. 115 + /// ATProto blob reference. 99 116 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 100 117 pub struct BlobRef { 101 118 #[serde(rename = "$type")] ··· 172 189 100, 173 190 ), 174 191 state_vector: "AQID".to_string(), 175 - updates_blob: None, 192 + updates: vec![], 176 193 updates_count: 0, 177 194 snapshot_at: "2026-03-13T00:00:00Z".to_string(), 178 195 kind: FileKind::Text, ··· 199 216 5000, 200 217 ), 201 218 state_vector: String::new(), 202 - updates_blob: None, 219 + updates: vec![], 203 220 updates_count: 0, 204 221 snapshot_at: "2026-03-13T00:00:00Z".to_string(), 205 222 kind: FileKind::Binary, ··· 213 230 } 214 231 215 232 #[test] 216 - fn site_record_serialization() { 233 + fn yrs_repo_serialization() { 217 234 let mut files = HashMap::new(); 218 235 files.insert( 219 236 "index.md".to_string(), ··· 225 242 50, 226 243 ), 227 244 state_vector: "AQID".to_string(), 228 - updates_blob: None, 245 + updates: vec![], 229 246 updates_count: 0, 230 247 snapshot_at: "2026-03-13T00:00:00Z".to_string(), 231 248 kind: FileKind::Text, ··· 233 250 conflict_source: None, 234 251 }, 235 252 ); 236 - let record = SiteRecord { 253 + let record = YrsRepo { 237 254 name: "my-site".to_string(), 238 255 files, 239 256 updated_at: "2026-03-13T00:00:00Z".to_string(), 257 + collaborators: vec![], 240 258 }; 241 259 let json = serde_json::to_string(&record).unwrap(); 242 - let deserialized: SiteRecord = serde_json::from_str(&json).unwrap(); 260 + assert!(!json.contains("collaborators")); // empty vec is skipped 261 + let deserialized: YrsRepo = serde_json::from_str(&json).unwrap(); 243 262 assert_eq!(deserialized.name, "my-site"); 244 263 assert!(deserialized.files.contains_key("index.md")); 264 + assert!(deserialized.collaborators.is_empty()); 245 265 } 246 266 247 267 #[test]
+53 -26
src/yrs_pds.rs
··· 92 92 let snapshot_blob = client.upload_blob(snapshot.clone()).await?; 93 93 94 94 // We need to reference the blob in a record for it to persist, 95 - // so we return the FileEntry which will be embedded in a SiteRecord. 95 + // so we return the FileEntry which will be embedded in a YrsRepo. 96 96 97 97 let now = chrono::Utc::now().to_rfc3339(); 98 98 let _ = did; // used by caller for the record ··· 101 101 content, 102 102 snapshot_blob, 103 103 state_vector: base64_encode(&sv), 104 - updates_blob: None, 104 + updates: vec![], 105 105 updates_count: 0, 106 106 snapshot_at: now, 107 107 kind: FileKind::Text, ··· 123 123 let doc = doc_from_snapshot(&snapshot_data)?; 124 124 125 125 // Apply incremental updates if present 126 - if let Some(ref updates_blob) = entry.updates_blob { 127 - let updates_data = client.get_blob(did, updates_blob.cid()).await?; 128 - apply_update(&doc, &updates_data)?; 126 + for update_ref in &entry.updates { 127 + let update_data = get_pack_ref_data(update_ref, client, did).await?; 128 + apply_update(&doc, &update_data)?; 129 129 } 130 130 131 131 Ok(doc) 132 + } 133 + 134 + /// Extract data from a PackRef by downloading and parsing the pack blob. 135 + pub async fn get_pack_ref_data( 136 + pack_ref: &crate::types::PackRef, 137 + client: &PdsClient, 138 + did: &str, 139 + ) -> Result<Vec<u8>, String> { 140 + let pack_data = if let Some(ref chunks) = pack_ref.chunks { 141 + let mut chunk_data = Vec::new(); 142 + for chunk_ref in chunks { 143 + chunk_data.push(client.get_blob(did, chunk_ref.cid()).await?); 144 + } 145 + crate::pack::reassemble_chunks(&chunk_data) 146 + } else { 147 + client.get_blob(did, pack_ref.blob.cid()).await? 148 + }; 149 + let (_, blob_data) = crate::pack::parse_pack_auto(&pack_data)?; 150 + let start = pack_ref.offset as usize; 151 + let end = start + pack_ref.length as usize; 152 + if end > blob_data.len() { 153 + return Err(format!( 154 + "pack_ref out of bounds: {}..{} in {} bytes", 155 + start, end, blob_data.len() 156 + )); 157 + } 158 + Ok(blob_data[start..end].to_vec()) 132 159 } 133 160 134 161 /// Get the raw blob data for a FileEntry, handling pack_ref extraction. ··· 529 556 530 557 #[test] 531 558 fn manifest_crdt_merge_concurrent_add() { 532 - // Two sites start from same base 559 + // Two repos start from same base 533 560 let base = new_manifest_doc(); 534 561 manifest_insert(&base, "shared.md", &FileKind::Text); 535 562 let base_snapshot = encode_manifest_snapshot(&base); 536 563 537 - // Site A adds a file 538 - let site_a = manifest_from_snapshot(&base_snapshot).unwrap(); 539 - manifest_insert(&site_a, "page-a.md", &FileKind::Text); 564 + // Repo A adds a file 565 + let repo_a = manifest_from_snapshot(&base_snapshot).unwrap(); 566 + manifest_insert(&repo_a, "page-a.md", &FileKind::Text); 540 567 541 - // Site B adds a different file 542 - let site_b = manifest_from_snapshot(&base_snapshot).unwrap(); 543 - manifest_insert(&site_b, "page-b.md", &FileKind::Text); 568 + // Repo B adds a different file 569 + let repo_b = manifest_from_snapshot(&base_snapshot).unwrap(); 570 + manifest_insert(&repo_b, "page-b.md", &FileKind::Text); 544 571 545 572 // Merge B into A 546 - let sv_a = site_a.transact().state_vector(); 547 - let diff_b = site_b.transact().encode_diff_v1(&sv_a); 573 + let sv_a = repo_a.transact().state_vector(); 574 + let diff_b = repo_b.transact().encode_diff_v1(&sv_a); 548 575 if let Ok(update) = yrs::Update::decode_v1(&diff_b) { 549 - let _ = site_a.transact_mut().apply_update(update); 576 + let _ = repo_a.transact_mut().apply_update(update); 550 577 } 551 578 552 - let entries = manifest_entries(&site_a); 579 + let entries = manifest_entries(&repo_a); 553 580 assert_eq!(entries.len(), 3); 554 581 assert!(entries.contains_key("shared.md")); 555 582 assert!(entries.contains_key("page-a.md")); ··· 563 590 manifest_insert(&base, "file.md", &FileKind::Text); 564 591 let base_snapshot = encode_manifest_snapshot(&base); 565 592 566 - // Site A deletes the file 567 - let site_a = manifest_from_snapshot(&base_snapshot).unwrap(); 568 - manifest_remove(&site_a, "file.md"); 593 + // Repo A deletes the file 594 + let repo_a = manifest_from_snapshot(&base_snapshot).unwrap(); 595 + manifest_remove(&repo_a, "file.md"); 569 596 570 - // Site B re-asserts the file (simulating an edit) 571 - let site_b = manifest_from_snapshot(&base_snapshot).unwrap(); 572 - manifest_insert(&site_b, "file.md", &FileKind::Text); 597 + // Repo B re-asserts the file (simulating an edit) 598 + let repo_b = manifest_from_snapshot(&base_snapshot).unwrap(); 599 + manifest_insert(&repo_b, "file.md", &FileKind::Text); 573 600 574 601 // Merge B into A — set should win over delete 575 - let sv_a = site_a.transact().state_vector(); 576 - let diff_b = site_b.transact().encode_diff_v1(&sv_a); 602 + let sv_a = repo_a.transact().state_vector(); 603 + let diff_b = repo_b.transact().encode_diff_v1(&sv_a); 577 604 if let Ok(update) = yrs::Update::decode_v1(&diff_b) { 578 - let _ = site_a.transact_mut().apply_update(update); 605 + let _ = repo_a.transact_mut().apply_update(update); 579 606 } 580 607 581 - let entries = manifest_entries(&site_a); 608 + let entries = manifest_entries(&repo_a); 582 609 assert!( 583 610 entries.contains_key("file.md"), 584 611 "edit wins: set should win over delete"
+11 -11
tests/e2e_tests.rs
··· 201 201 let rkey_b = unique_rkey("e2e-merge-b"); 202 202 203 203 // Both collaborators start from the same content 204 - let site_a = tempfile::tempdir().unwrap(); 205 - let site_b = tempfile::tempdir().unwrap(); 204 + let repo_a = tempfile::tempdir().unwrap(); 205 + let repo_b = tempfile::tempdir().unwrap(); 206 206 write_file( 207 - site_a.path(), 207 + repo_a.path(), 208 208 "shared.md", 209 209 "# Shared\n\nOriginal content.\n", 210 210 ) 211 211 .await; 212 212 write_file( 213 - site_b.path(), 213 + repo_b.path(), 214 214 "shared.md", 215 215 "# Shared\n\nOriginal content.\n", 216 216 ) 217 217 .await; 218 218 219 219 // Save initial state for both 220 - pds_yrs::save(site_a.path(), &client, &did, &rkey_a, false) 220 + pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, false) 221 221 .await 222 222 .unwrap(); 223 - pds_yrs::save(site_b.path(), &client, &did, &rkey_b, false) 223 + pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, false) 224 224 .await 225 225 .unwrap(); 226 226 227 227 // Collaborator A appends 228 228 write_file( 229 - site_a.path(), 229 + repo_a.path(), 230 230 "shared.md", 231 231 "# Shared\n\nOriginal content.\n\nAlice's addition.\n", 232 232 ) 233 233 .await; 234 - pds_yrs::save(site_a.path(), &client, &did, &rkey_a, false) 234 + pds_yrs::save(repo_a.path(), &client, &did, &rkey_a, false) 235 235 .await 236 236 .unwrap(); 237 237 238 238 // Collaborator B edits first paragraph 239 239 write_file( 240 - site_b.path(), 240 + repo_b.path(), 241 241 "shared.md", 242 242 "# Shared\n\nBob's edit to original content.\n", 243 243 ) 244 244 .await; 245 - pds_yrs::save(site_b.path(), &client, &did, &rkey_b, false) 245 + pds_yrs::save(repo_b.path(), &client, &did, &rkey_b, false) 246 246 .await 247 247 .unwrap(); 248 248 249 249 // Merge both 250 250 let merged = tempfile::tempdir().unwrap(); 251 - pds_yrs::merge_sites(&client, &did, &[&rkey_a, &rkey_b], merged.path(), false) 251 + pds_yrs::merge_repos(&client, &did, &[&rkey_a, &rkey_b], merged.path(), false) 252 252 .await 253 253 .unwrap(); 254 254