[package]
name = "pds-git-remote"
version = "0.1.0"
edition = "2024"

[workspace]

[dependencies]
reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tempfile = "3.13.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"

[dev-dependencies]
tempfile = "3.13.0"
+186
plan.md
# pds-git-remote

A Rust crate providing PDS-backed git remote functionality. It uses an AT Protocol PDS as a git backup backend via incremental bundles.

See `pds-git-plan.md` for the full architectural rationale and design document.

---

## Overview

`pds-git-remote` is a new crate in the lichen workspace (`crates/pds-git-remote`). It provides:

1. **A library**: core types, PDS API client, bundle operations, and push/pull logic that lichen-cms (and other crates) can call directly
2. **A git remote helper binary** (`git-remote-pds`), so developers can `git push pds main` from the CLI

The crate shells out to the `git` binary for bundle creation/application (consistent with how `lichen-git` already works). PDS interaction uses HTTP via `reqwest`.

---

## Phase 1: Core types and PDS client

Foundation layer: types that model the PDS state record and an HTTP client for the PDS XRPC API.

- [x] create crate skeleton (`crates/pds-git-remote`, add to workspace `Cargo.toml`)
- [x] define core types in `types.rs`:
  - `GitRef { name, sha }`
  - `BundleEntry { parts (blob CIDs), prerequisites, tips, total_size, created_at }`
  - `RepoState { name, refs, bundles, updated_at }`, mirroring the `sh.pdsbackup.git.state` lexicon
- [x] implement PDS XRPC client in `pds_client.rs`:
  - `get_record(did, collection, rkey)` → fetch a record
  - `put_record(did, collection, rkey, record)` → create or update a record
  - `upload_blob(data)` → upload bytes, return blob ref/CID
  - `get_blob(did, cid)` → download blob bytes (unauthenticated)
  - auth: accept an optional bearer token (access JWT) via `PdsClient::with_auth` / `set_auth`; `PdsClient::new` is unauthenticated
- [x] implement AT Protocol identity resolution in `identity.rs`:
  - resolve handle → DID (via `com.atproto.identity.resolveHandle` or DNS)
  - resolve DID → PDS endpoint (via DID document / PLC directory)
- [x] add unit tests for type serialization and identity resolution

**Dependencies to add:** `reqwest`, `serde`, `serde_json`, `tokio`
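
As a rough illustration of the core types listed above, the following sketch uses plain structs. The field names come from the plan; the field types (`String`, `u64`, `Vec<...>`), the timestamp format, and the `sha_of` helper are assumptions made for the example, and the real `types.rs` would also carry `serde` derives.

```rust
/// A git ref and the commit it points to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GitRef {
    pub name: String,
    pub sha: String,
}

/// One bundle in the chain (field types are assumed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BundleEntry {
    /// blob CIDs of the bundle parts, in upload order
    pub parts: Vec<String>,
    /// commits the receiver must already have
    pub prerequisites: Vec<String>,
    /// commits this bundle provides up to
    pub tips: Vec<String>,
    /// size in bytes of the reassembled bundle
    pub total_size: u64,
    /// creation timestamp (assumed to be an RFC 3339 string)
    pub created_at: String,
}

/// The mutable state record for one repository.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepoState {
    pub name: String,
    pub refs: Vec<GitRef>,
    pub bundles: Vec<BundleEntry>,
    pub updated_at: String,
}

impl RepoState {
    /// Hypothetical helper: looks up the SHA recorded for a ref name.
    pub fn sha_of(&self, ref_name: &str) -> Option<&str> {
        self.refs
            .iter()
            .find(|r| r.name == ref_name)
            .map(|r| r.sha.as_str())
    }
}
```

Keeping `refs` and `bundles` in one record means a single `put_record` call can update both together.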

---

## Phase 2: Bundle operations

Wrap git bundle commands and handle chunking for large bundles.

- [x] implement bundle operations in `bundle.rs`:
  - `create_full_bundle(repo_path)` → create a full bundle of the entire repo, return bytes
  - `create_incremental_bundle(repo_path, refs, since_commits)` → create bundle with only new commits
  - `apply_bundle(repo_path, bundle_bytes)` → unbundle into a repo
  - `verify_bundle(repo_path, bundle_bytes)` → check bundle prerequisites are satisfied
- [x] implement chunking in `chunk.rs`:
  - `chunk_bytes(data, chunk_size)` → split into parts of at most `chunk_size` bytes (default 40 MiB)
  - `reassemble_chunks(parts)` → concatenate in order
- [x] add integration tests (create temp repos, make commits, bundle/unbundle, verify content)
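
The difference between a full and an incremental bundle comes down to the arguments passed to `git bundle create`. The sketch below builds that argument list as a pure function. The function name `bundle_create_args` and the "no refs means `--all`" fallback are assumptions for illustration, not the crate's actual API.

```rust
/// Builds the argument list for `git bundle create`.
///
/// With no `refs`, falls back to `--all` (a full bundle). Otherwise the
/// bundle covers `refs` minus everything reachable from `since_commits`.
pub fn bundle_create_args(
    bundle_file: &str,
    refs: &[&str],
    since_commits: &[&str],
) -> Vec<String> {
    let mut args = vec![
        "bundle".to_string(),
        "create".to_string(),
        bundle_file.to_string(),
    ];
    if refs.is_empty() {
        args.push("--all".to_string());
        return args;
    }
    args.extend(refs.iter().map(|r| r.to_string()));
    // a leading `^` excludes commits reachable from that SHA
    args.extend(since_commits.iter().map(|sha| format!("^{}", sha)));
    args
}
```

Separating argument construction from process spawning keeps this part testable without a git repository.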

---

## Phase 3: Push flow

Combine the PDS client and bundle operations into a complete push.

- [ ] implement push logic in `push.rs`:
  - read current state record from PDS (or handle first-push where none exists)
  - determine what's new: compare local refs to remote refs
  - create incremental bundle (or full bundle on first push)
  - chunk if needed, upload blob(s) via `upload_blob`
  - append new `BundleEntry` to state, update refs
  - write updated state record via `put_record`
- [ ] handle edge cases:
  - first push (no existing state record) → create full bundle + new record
  - nothing to push (refs match) → no-op
  - non-fast-forward → reject with error (no force push for now)
- [ ] add integration tests with a mock PDS server (or test against local PDS)
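
The decision part of the push flow (first push, no-op, incremental, or reject) can be sketched as a pure function over ref maps. Everything named here (`PushPlan`, `plan_push`, the `is_ancestor` callback) is hypothetical. In a real implementation the ancestry check would likely shell out to something like `git merge-base --is-ancestor`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// What a push should do, given local and remote refs.
#[derive(Debug, PartialEq, Eq)]
pub enum PushPlan {
    /// no state record exists yet: upload a full bundle
    FullBundle,
    /// all local refs already match the remote
    NothingToPush,
    /// bundle `refs`, excluding history reachable from `since`
    Incremental { refs: Vec<String>, since: Vec<String> },
    /// the remote tip is not an ancestor of the local tip
    RejectNonFastForward { ref_name: String },
}

/// `local` and `remote` map ref names to commit SHAs. `remote` is `None`
/// when no state record exists. `is_ancestor(old, new)` reports whether
/// `old` is an ancestor of `new`.
pub fn plan_push(
    local: &BTreeMap<String, String>,
    remote: Option<&BTreeMap<String, String>>,
    is_ancestor: impl Fn(&str, &str) -> bool,
) -> PushPlan {
    let remote = match remote {
        Some(r) => r,
        None => return PushPlan::FullBundle,
    };
    let mut refs = Vec::new();
    for (name, local_sha) in local {
        match remote.get(name) {
            // ref unchanged since the last push
            Some(remote_sha) if remote_sha == local_sha => {}
            Some(remote_sha) if !is_ancestor(remote_sha, local_sha) => {
                return PushPlan::RejectNonFastForward { ref_name: name.clone() };
            }
            // new ref, or a fast-forward of an existing one
            _ => refs.push(name.clone()),
        }
    }
    if refs.is_empty() {
        return PushPlan::NothingToPush;
    }
    // every remote tip is already on the PDS, so use them all as prerequisites
    let since: BTreeSet<String> = remote.values().cloned().collect();
    PushPlan::Incremental { refs, since: since.into_iter().collect() }
}
```

Returning a plan instead of performing the upload directly makes the edge cases listed above easy to unit test.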

---

## Phase 3.1: Local PDS test environment

Set up a local PDS server via Docker for integration testing. Scripts live in `scripts/pds-dev/`.

- [ ] create `scripts/pds-dev/compose.yaml`:
  - single service: `ghcr.io/bluesky-social/pds:0.4` on port 3000
  - volume mount `./pds-data:/pds` for persistence
  - env_file pointing to `pds.env`
- [ ] create `scripts/pds-dev/setup.sh`:
  - generate secrets (`PDS_JWT_SECRET`, `PDS_ADMIN_PASSWORD`, rotation key)
  - write `pds.env` with local-dev defaults (`PDS_HOSTNAME=localhost`, `PDS_DEV_MODE=true`, `PDS_INVITE_REQUIRED=false`)
  - create data directory
  - print admin password for reference
- [ ] create `scripts/pds-dev/start.sh`:
  - run `setup.sh` if `pds.env` doesn't exist yet
  - `docker compose up -d`
  - wait for health check (`/xrpc/_health`) with timeout
- [ ] create `scripts/pds-dev/create-account.sh`:
  - accept handle and password as args (defaults: `test.localhost` / `test-password-123`)
  - call `com.atproto.server.createAccount` XRPC endpoint
  - print the DID and access token
- [ ] create `scripts/pds-dev/login.sh`:
  - call `com.atproto.server.createSession` for a given handle/password
  - print `accessJwt` for use in manual testing or piping to other scripts
- [ ] create `scripts/pds-dev/stop.sh`:
  - `docker compose down`
- [ ] create `scripts/pds-dev/reset.sh`:
  - `docker compose down -v` and `rm -rf pds-data` to wipe all state
- [ ] add `scripts/pds-dev/README.md` with quick-start instructions
- [ ] add `scripts/pds-dev/pds-data/` and `scripts/pds-dev/pds.env` (generated secrets) to `.gitignore`

---

## Phase 4: Clone and fetch flow

Download bundle chain from PDS and apply to local repo.

- [ ] implement fetch/clone logic in `fetch.rs`:
  - resolve `pds://handle/repo-name` into DID + PDS endpoint + rkey
  - read state record → get bundle chain and refs
  - for clone: download all bundles in order (oldest first), apply each
  - for fetch: compare local refs to remote, skip bundles whose tips we already have, download and apply only new ones
  - set up local refs from state record
- [ ] handle chunked bundles (reassemble parts before unbundling)
- [ ] add integration tests
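
Two small pieces of the fetch flow lend themselves to a sketch: splitting a `pds://handle/repo-name` URL, and finding where in the bundle chain a fetch has to start. The names `PdsRemote`, `parse_pds_url`, and `first_missing_bundle` are hypothetical. The "a bundle is already applied if all its tips are present" rule is the heuristic stated in the plan, not a verified guarantee.

```rust
use std::collections::HashSet;

/// The two components of a `pds://handle/repo-name` URL.
#[derive(Debug, PartialEq, Eq)]
pub struct PdsRemote {
    pub handle: String,
    pub repo_name: String,
}

/// Splits a `pds://` URL into handle and repo name.
pub fn parse_pds_url(url: &str) -> Result<PdsRemote, String> {
    let rest = url
        .strip_prefix("pds://")
        .ok_or_else(|| format!("not a pds:// URL: {}", url))?;
    let mut parts = rest.splitn(2, '/');
    let handle = parts.next().unwrap_or("");
    let repo_name = parts.next().unwrap_or("").trim_end_matches('/');
    if handle.is_empty() || repo_name.is_empty() || repo_name.contains('/') {
        return Err(format!("expected pds://handle/repo-name, got: {}", url));
    }
    Ok(PdsRemote {
        handle: handle.to_string(),
        repo_name: repo_name.to_string(),
    })
}

/// Returns the index of the first bundle (oldest first) whose tips are
/// not all present locally. Bundles from that index onward need to be
/// downloaded. Returns `chain_tips.len()` when nothing is missing.
pub fn first_missing_bundle(chain_tips: &[Vec<String>], have: &HashSet<String>) -> usize {
    chain_tips
        .iter()
        .position(|tips| !tips.iter().all(|t| have.contains(t)))
        .unwrap_or(chain_tips.len())
}
```

For a clone, `have` is empty, so the index is 0 and the whole chain is downloaded.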

---

## Phase 5: Git remote helper binary

Implement the `git-remote-pds` binary that speaks git's remote helper protocol on stdin/stdout.

- [ ] add `[[bin]]` target to `Cargo.toml` for `git-remote-pds`
- [ ] implement remote helper protocol in `remote_helper.rs`:
  - `capabilities` → respond with `push` and `fetch`
  - `list` / `list for-push` → read refs from PDS state record
  - `fetch <sha> <ref>` → download and apply bundle chain
  - `push <src>:<dst>` → create bundle, upload, update state
- [ ] implement CLI auth in `auth.rs`:
  - `pds-git auth login` → open browser for atproto OAuth, cache token locally
  - token storage in `~/.config/pds-git-remote/` or platform-appropriate config dir
  - token refresh on expiry
- [ ] add `clap`-based CLI for auth subcommands
- [ ] end-to-end test: init repo, add remote, push, clone elsewhere, verify content matches
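
Git drives a remote helper with line-oriented commands on stdin, and `fetch` and `push` lines arrive in batches terminated by a blank line. A minimal parser for the commands listed above might look like the sketch below. `HelperCommand` and `parse_command` are hypothetical names, and only the commands the plan mentions are handled.

```rust
/// One command line received from git on stdin.
#[derive(Debug, PartialEq, Eq)]
pub enum HelperCommand {
    Capabilities,
    List { for_push: bool },
    Fetch { sha: String, name: String },
    Push { src: String, dst: String, force: bool },
    /// a blank line, which terminates a batch of fetch/push commands
    EndOfBatch,
}

/// Parses a single line of the remote helper protocol.
pub fn parse_command(line: &str) -> Result<HelperCommand, String> {
    let line = line.trim_end_matches(|c: char| c == '\n' || c == '\r');
    if line.is_empty() {
        return Ok(HelperCommand::EndOfBatch);
    }
    let mut words = line.split(' ');
    match words.next() {
        Some("capabilities") => Ok(HelperCommand::Capabilities),
        Some("list") => Ok(HelperCommand::List {
            for_push: words.next() == Some("for-push"),
        }),
        Some("fetch") => match (words.next(), words.next()) {
            (Some(sha), Some(name)) => Ok(HelperCommand::Fetch {
                sha: sha.to_string(),
                name: name.to_string(),
            }),
            _ => Err(format!("malformed fetch command: {}", line)),
        },
        Some("push") => {
            let spec = words
                .next()
                .ok_or_else(|| format!("malformed push command: {}", line))?;
            // a leading `+` on the refspec requests a forced update
            let (force, spec) = match spec.strip_prefix('+') {
                Some(rest) => (true, rest),
                None => (false, spec),
            };
            match spec.split_once(':') {
                Some((src, dst)) => Ok(HelperCommand::Push {
                    src: src.to_string(),
                    dst: dst.to_string(),
                    force,
                }),
                None => Err(format!("malformed push refspec: {}", spec)),
            }
        }
        _ => Err(format!("unsupported command: {}", line)),
    }
}
```

Since the plan rejects force pushes for now, a caller could turn `force: true` into an error for that ref.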

---

## Phase 6: Library interface for lichen-cms

Expose a high-level API that lichen-cms can call with an existing OAuth session.

- [ ] implement library API in `lib.rs` (public interface):
  - `PdsBackup::new(pds_endpoint, auth_token, repo_path, repo_name)`: constructor
  - `push()`: push new commits to PDS
  - `pull()`: pull latest from PDS into local repo
  - `status()` → `{ ahead, behind, last_push }`: compare local vs remote state
  - `compact()`: replace bundle chain with single full bundle
- [ ] integrate with `lichen-git`:
  - `git_push` placeholder in `lichen-git/src/git.rs` can call into `pds-git-remote`
  - auto-commit flow triggers `PdsBackup::push()` on a debounce timer
- [ ] add settings support:
  - extend `lichen-git` settings or add `[pds_backup]` section to site config
  - fields: `handle`, `repo_name`, `enabled`

---

## Phase 7: Compaction and polish

- [ ] implement bundle chain compaction:
  - `compact()` → create full bundle (`git bundle create --all`), upload, replace state
  - auto-compaction trigger when chain length exceeds threshold (e.g. 25 entries)
- [ ] error recovery:
  - resume interrupted uploads (track which blobs were uploaded before record write)
  - handle PDS downtime gracefully (queue pushes, retry with backoff)
- [ ] progress reporting:
  - callback/channel-based progress for UI integration (upload %, bundle creation, etc.)
- [ ] documentation and examples
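
The compaction trigger and the retry delay are both simple calculations. The sketch below shows one possible shape. The constant, the function names, and the choice of capped exponential backoff are assumptions, since the plan only says "exceeds threshold (e.g. 25 entries)" and "retry with backoff".

```rust
use std::time::Duration;

/// Assumed threshold, taken from the "e.g. 25 entries" in the plan.
pub const COMPACTION_THRESHOLD: usize = 25;

/// Returns true when the bundle chain is longer than the threshold.
pub fn should_compact(chain_len: usize) -> bool {
    chain_len > COMPACTION_THRESHOLD
}

/// Exponential backoff: `base * 2^attempt`, capped at `max`.
/// `attempt` is zero-based, so the first retry waits `base`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // saturate instead of overflowing for large attempt counts
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}
```

A production version would probably add random jitter to the delay so that queued pushes do not retry in lockstep.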

---

## Open questions

Carried forward from `pds-git-plan.md`, to be resolved as we go:

- **Lexicon NSID**: `sh.pdsbackup.git.state` is a placeholder. Needs a domain we control for the real NSID.
- **Private repos**: all PDS data is currently public. Only suitable for public repos until atproto ships private data.
- **Force push**: currently planned as "reject". May revisit (full re-upload approach).
- **Multiple branches**: current design tracks all refs in one state record. Should work naturally but needs testing.
+206
src/bundle.rs
//! Git bundle operations.
//!
//! Wraps `git bundle` CLI commands to create and apply bundles.
//! Bundles are the serialization format used to store repository
//! snapshots as PDS blobs.

use std::path::Path;
use tokio::process::Command;

/// Result of creating a bundle, including the raw bytes and metadata
/// needed to build a `BundleEntry`.
#[derive(Debug)]
pub struct CreatedBundle {
    /// raw bundle file bytes
    pub data: Vec<u8>,
    /// commit SHAs that the receiver must already have
    pub prerequisites: Vec<String>,
    /// commit SHAs this bundle provides up to (the tip commits)
    pub tips: Vec<String>,
}

/// Creates a full bundle containing the entire repository history.
///
/// Equivalent to `git bundle create <file> --all`. The bundle includes
/// all branches and tags, and has no prerequisites. The returned `tips`
/// list only the branch heads (`refs/heads/`), not tags.
pub async fn create_full_bundle(repo_path: &Path) -> Result<CreatedBundle, String> {
    let tmp =
        tempfile::NamedTempFile::new().map_err(|e| format!("failed to create temp file: {}", e))?;
    let bundle_path = tmp.path();

    // create a full bundle with all refs; the path is passed as an OsStr
    // so a non-UTF-8 temp path cannot cause a panic
    let output = Command::new("git")
        .args(["bundle", "create"])
        .arg(bundle_path)
        .arg("--all")
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| format!("failed to run git bundle create: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("git bundle create failed: {}", stderr.trim()));
    }

    let data = tokio::fs::read(bundle_path)
        .await
        .map_err(|e| format!("failed to read bundle file: {}", e))?;

    // get the tip commits (all branch heads)
    let tips = get_branch_tips(repo_path).await?;

    Ok(CreatedBundle {
        data,
        prerequisites: vec![],
        tips,
    })
}

/// Creates an incremental bundle containing only commits since the given SHAs.
///
/// `refs` is the list of refs to include (e.g. `["refs/heads/main"]`).
/// `since_commits` are the prerequisite commits the receiver must already
/// have. The bundle contains everything reachable from `refs` that is
/// not reachable from `since_commits`.
pub async fn create_incremental_bundle(
    repo_path: &Path,
    refs: &[&str],
    since_commits: &[&str],
) -> Result<CreatedBundle, String> {
    let tmp =
        tempfile::NamedTempFile::new().map_err(|e| format!("failed to create temp file: {}", e))?;
    let bundle_path = tmp.path();

    // build the command: git bundle create <file> <refs...> ^<since...>
    let mut cmd = Command::new("git");
    cmd.args(["bundle", "create"]).arg(bundle_path).args(refs);
    for sha in since_commits {
        // a leading `^` excludes everything reachable from that commit
        cmd.arg(format!("^{}", sha));
    }

    let output = cmd
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| format!("failed to run git bundle create: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!(
            "git bundle create (incremental) failed: {}",
            stderr.trim()
        ));
    }

    let data = tokio::fs::read(bundle_path)
        .await
        .map_err(|e| format!("failed to read bundle file: {}", e))?;

    // resolve ref names to their current SHAs for the tips
    let tips = resolve_refs_to_shas(repo_path, refs).await?;

    Ok(CreatedBundle {
        data,
        prerequisites: since_commits.iter().map(|s| s.to_string()).collect(),
        tips,
    })
}

/// Applies a bundle to a repository.
///
/// Writes the bundle bytes to a temp file and runs `git bundle unbundle`.
/// This only adds objects to the repository, so the caller should set up
/// refs afterward if needed.
pub async fn apply_bundle(repo_path: &Path, bundle_bytes: &[u8]) -> Result<(), String> {
    let tmp =
        tempfile::NamedTempFile::new().map_err(|e| format!("failed to create temp file: {}", e))?;
    let bundle_path = tmp.path();

    tokio::fs::write(bundle_path, bundle_bytes)
        .await
        .map_err(|e| format!("failed to write bundle to temp file: {}", e))?;

    let output = Command::new("git")
        .args(["bundle", "unbundle"])
        .arg(bundle_path)
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| format!("failed to run git bundle unbundle: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("git bundle unbundle failed: {}", stderr.trim()));
    }

    Ok(())
}

/// Verifies that a bundle's prerequisites are satisfied by the local repo.
///
/// Returns `Ok(true)` if the bundle can be applied, `Ok(false)` if
/// `git bundle verify` fails (missing prerequisites or an invalid bundle).
pub async fn verify_bundle(repo_path: &Path, bundle_bytes: &[u8]) -> Result<bool, String> {
    let tmp =
        tempfile::NamedTempFile::new().map_err(|e| format!("failed to create temp file: {}", e))?;
    let bundle_path = tmp.path();

    tokio::fs::write(bundle_path, bundle_bytes)
        .await
        .map_err(|e| format!("failed to write bundle to temp file: {}", e))?;

    let output = Command::new("git")
        .args(["bundle", "verify"])
        .arg(bundle_path)
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| format!("failed to run git bundle verify: {}", e))?;

    Ok(output.status.success())
}

/// Returns the current tip commit SHAs for all local branches.
async fn get_branch_tips(repo_path: &Path) -> Result<Vec<String>, String> {
    let output = Command::new("git")
        .args(["for-each-ref", "--format=%(objectname)", "refs/heads/"])
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| format!("failed to run git for-each-ref: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("git for-each-ref failed: {}", stderr.trim()));
    }

    // one SHA per line; skip any blank lines
    let stdout = String::from_utf8_lossy(&output.stdout);
    let tips: Vec<String> = stdout
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty())
        .map(str::to_string)
        .collect();
    Ok(tips)
}

/// Resolves ref names to their current commit SHAs.
async fn resolve_refs_to_shas(repo_path: &Path, refs: &[&str]) -> Result<Vec<String>, String> {
    let mut shas = Vec::with_capacity(refs.len());
    for r in refs {
        // `--verify` makes rev-parse fail unless the name resolves to one object
        let output = Command::new("git")
            .args(["rev-parse", "--verify"])
            .arg(r)
            .current_dir(repo_path)
            .output()
            .await
            .map_err(|e| format!("failed to run git rev-parse {}: {}", r, e))?;

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(format!("git rev-parse {} failed: {}", r, stderr.trim()));
        }

        let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
        shas.push(sha);
    }
    Ok(shas)
}
+91
src/chunk.rs
//! Bundle chunking for large blobs.
//!
//! PDS blob uploads have a size limit (~50MB on Bluesky's PDS).
//! Bundles larger than the chunk size are split into parts, uploaded
//! as separate blobs, and reassembled on download.

/// Default chunk size: 40 MiB (headroom under the ~50MB PDS limit).
pub const DEFAULT_CHUNK_SIZE: usize = 40 * 1024 * 1024;

/// Splits data into chunks of at most `chunk_size` bytes.
///
/// Returns a vec of byte slices borrowed from `data`. If the data fits
/// in one chunk, returns a single-element vec (no copying for the common
/// case). Empty input yields one empty chunk.
///
/// # Panics
///
/// Panics if `chunk_size` is 0 and `data` is non-empty.
pub fn chunk_bytes(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    if data.is_empty() {
        return vec![data];
    }
    data.chunks(chunk_size).collect()
}

/// Reassembles chunks into a single contiguous byte vector.
///
/// Chunks must be in order (as returned by `chunk_bytes`).
pub fn reassemble_chunks(parts: &[Vec<u8>]) -> Vec<u8> {
    // `concat` sizes the output once and copies each part in order
    parts.concat()
}

/// Returns true if the data exceeds the given chunk size and needs splitting.
pub fn needs_chunking(data: &[u8], chunk_size: usize) -> bool {
    data.len() > chunk_size
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn small_data_single_chunk() {
        let data = b"hello world";
        let chunks = chunk_bytes(data, DEFAULT_CHUNK_SIZE);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], b"hello world");
    }

    #[test]
    fn exact_chunk_size() {
        let data = vec![0u8; 100];
        let chunks = chunk_bytes(&data, 100);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 100);
    }

    #[test]
    fn splits_into_multiple_chunks() {
        let data = vec![0u8; 250];
        let chunks = chunk_bytes(&data, 100);
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].len(), 100);
        assert_eq!(chunks[1].len(), 100);
        assert_eq!(chunks[2].len(), 50);
    }

    #[test]
    fn reassemble_round_trip() {
        let original = (0..255u8).cycle().take(1000).collect::<Vec<_>>();
        let chunks = chunk_bytes(&original, 300);
        let parts: Vec<Vec<u8>> = chunks.iter().map(|c| c.to_vec()).collect();
        let reassembled = reassemble_chunks(&parts);
        assert_eq!(original, reassembled);
    }

    #[test]
    fn empty_data() {
        let data = b"";
        let chunks = chunk_bytes(data, DEFAULT_CHUNK_SIZE);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 0);
    }

    #[test]
    fn needs_chunking_check() {
        assert!(!needs_chunking(b"small", DEFAULT_CHUNK_SIZE));
        let big = vec![0u8; DEFAULT_CHUNK_SIZE + 1];
        assert!(needs_chunking(&big, DEFAULT_CHUNK_SIZE));
    }
}
+201
src/identity.rs
//! AT Protocol identity resolution.
//!
//! Resolves atproto handles to DIDs and DIDs to PDS service endpoints,
//! following the standard AT Protocol identity resolution flow.

use serde::Deserialize;

/// Resolved identity: a DID and the PDS endpoint that hosts it.
#[derive(Debug, Clone)]
pub struct ResolvedIdentity {
    pub did: String,
    pub pds_url: String,
}

/// Response from `com.atproto.identity.resolveHandle`.
#[derive(Debug, Deserialize)]
struct ResolveHandleResponse {
    did: String,
}

/// A DID document, as returned by the PLC directory (`did:plc`) or
/// served from `/.well-known/did.json` (`did:web`).
#[derive(Debug, Deserialize)]
struct DidDocument {
    #[serde(default)]
    service: Vec<DidService>,
}

/// A service entry in a DID document.
#[derive(Debug, Deserialize)]
struct DidService {
    id: String,
    #[serde(rename = "type")]
    service_type: String,
    #[serde(rename = "serviceEndpoint")]
    service_endpoint: String,
}

/// Default PLC directory URL.
const DEFAULT_PLC_DIRECTORY: &str = "https://plc.directory";

/// Resolves an AT Protocol handle to a DID.
///
/// Uses the `com.atproto.identity.resolveHandle` XRPC endpoint on the
/// given PDS (or a public one). If `pds_url` is None, the request is sent
/// to `https://<handle>`, which only works when the handle's own domain
/// serves the XRPC API. DNS TXT and `/.well-known/atproto-did` lookups
/// are not implemented here.
pub async fn resolve_handle(handle: &str, pds_url: Option<&str>) -> Result<String, String> {
    let http = reqwest::Client::new();

    // try resolving via the provided PDS, or fall back to the handle's domain
    let base = match pds_url {
        Some(url) => url.trim_end_matches('/').to_string(),
        None => format!("https://{}", handle),
    };

    let url = format!("{}/xrpc/com.atproto.identity.resolveHandle", base);

    // pass the handle via `query` so it is percent-encoded
    let resp = http
        .get(&url)
        .query(&[("handle", handle)])
        .send()
        .await
        .map_err(|e| format!("resolveHandle request failed: {}", e))?;

    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        return Err(format!("resolveHandle failed ({}): {}", status, body));
    }

    let parsed: ResolveHandleResponse = resp
        .json()
        .await
        .map_err(|e| format!("failed to parse resolveHandle response: {}", e))?;

    Ok(parsed.did)
}

/// Resolves a DID to its PDS service endpoint.
///
/// Looks up the DID document from the PLC directory (for `did:plc`) or
/// fetches the DID document directly (for `did:web`), then extracts the
/// `#atproto_pds` service endpoint.
pub async fn resolve_pds_endpoint(
    did: &str,
    plc_directory: Option<&str>,
) -> Result<String, String> {
    let http = reqwest::Client::new();
    let doc = fetch_did_document(&http, did, plc_directory).await?;

    // find the atproto PDS service endpoint; the id may be written as a
    // bare fragment (`#atproto_pds`) or prefixed with the DID
    let pds_service = doc
        .service
        .iter()
        .find(|s| {
            s.id.ends_with("#atproto_pds") && s.service_type == "AtprotoPersonalDataServer"
        })
        .ok_or_else(|| format!("no #atproto_pds service found in DID document for {}", did))?;

    Ok(pds_service
        .service_endpoint
        .trim_end_matches('/')
        .to_string())
}

/// Resolves a handle to both its DID and PDS endpoint in one call.
pub async fn resolve_identity(
    handle: &str,
    pds_url: Option<&str>,
    plc_directory: Option<&str>,
) -> Result<ResolvedIdentity, String> {
    let did = resolve_handle(handle, pds_url).await?;
    let pds_url = resolve_pds_endpoint(&did, plc_directory).await?;
    Ok(ResolvedIdentity { did, pds_url })
}

/// Fetches a DID document for the given DID.
async fn fetch_did_document(
    http: &reqwest::Client,
    did: &str,
    plc_directory: Option<&str>,
) -> Result<DidDocument, String> {
    let url = if did.starts_with("did:plc:") {
        let plc = plc_directory.unwrap_or(DEFAULT_PLC_DIRECTORY);
        format!("{}/{}", plc.trim_end_matches('/'), did)
    } else if let Some(host) = did.strip_prefix("did:web:") {
        // did:web:example.com → https://example.com/.well-known/did.json
        // a port is percent-encoded in the DID (e.g. localhost%3A3000);
        // path-based did:web identifiers (extra `:` segments) are rejected
        if host.contains(':') {
            return Err(format!("path-based did:web is not supported: {}", did));
        }
        format!("https://{}/.well-known/did.json", host.replace("%3A", ":"))
    } else {
        return Err(format!("unsupported DID method: {}", did));
    };

    let resp = http
        .get(&url)
        .send()
        .await
        .map_err(|e| format!("failed to fetch DID document for {}: {}", did, e))?;

    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        return Err(format!(
            "failed to fetch DID document for {} ({}): {}",
            did, status, body
        ));
    }

    resp.json()
        .await
        .map_err(|e| format!("failed to parse DID document for {}: {}", did, e))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn did_plc_url_construction() {
        let did = "did:plc:abc123";
        let url = format!("{}/{}", DEFAULT_PLC_DIRECTORY, did);
        assert_eq!(url, "https://plc.directory/did:plc:abc123");
    }

    #[test]
    fn did_web_url_construction() {
        let did = "did:web:example.com";
        let domain = did.strip_prefix("did:web:").unwrap();
        let url = format!("https://{}/.well-known/did.json", domain);
        assert_eq!(url, "https://example.com/.well-known/did.json");
    }

    #[test]
    fn parse_did_document_with_pds_service() {
        let json = serde_json::json!({
            "id": "did:plc:abc123",
            "service": [
                {
                    "id": "#atproto_pds",
                    "type": "AtprotoPersonalDataServer",
                    "serviceEndpoint": "https://pds.example.com"
                }
            ]
        });

        let doc: DidDocument = serde_json::from_value(json).unwrap();
        assert_eq!(doc.service.len(), 1);
        assert_eq!(doc.service[0].id, "#atproto_pds");
        assert_eq!(doc.service[0].service_endpoint, "https://pds.example.com");
    }

    #[test]
    fn parse_did_document_without_pds_service() {
        let json = serde_json::json!({
            "id": "did:plc:abc123",
            "service": []
        });

        let doc: DidDocument = serde_json::from_value(json).unwrap();
        assert!(doc.service.is_empty());
    }
}
+11
src/lib.rs
//! pds-git-remote: PDS-backed git remote via incremental bundles.
//!
//! Uses AT Protocol PDS as a git backup backend. Stores repositories
//! as chains of incremental git bundles uploaded as PDS blobs, tracked
//! by a single mutable state record.

pub mod bundle;
pub mod chunk;
pub mod identity;
pub mod pds_client;
pub mod types;
+324
src/pds_client.rs
//! HTTP client for the PDS XRPC API.
//!
//! Wraps the AT Protocol XRPC endpoints needed for git backup:
//! record CRUD, blob upload/download, and session creation.

use crate::types::{BlobRef, CidLink};
use serde::{Deserialize, Serialize};

/// Client for interacting with a PDS server over XRPC.
///
/// Supports both authenticated (push) and unauthenticated (clone/fetch)
/// operations. Use `PdsClient::new` for unauthenticated access and
/// `PdsClient::with_auth` when a bearer token is available.
#[derive(Debug, Clone)]
pub struct PdsClient {
    /// base URL of the PDS, e.g. "https://bsky.social"
    base_url: String,
    /// bearer token for authenticated requests (access JWT)
    auth_token: Option<String>,
    http: reqwest::Client,
}

/// Response from `com.atproto.repo.getRecord`.
#[derive(Debug, Deserialize)]
pub struct GetRecordResponse {
    pub uri: String,
    pub cid: Option<String>,
    pub value: serde_json::Value,
}

/// Response from `com.atproto.repo.uploadBlob`.
#[derive(Debug, Deserialize)]
pub struct UploadBlobResponse {
    pub blob: UploadedBlob,
}

/// The blob object returned inside an `uploadBlob` response.
#[derive(Debug, Deserialize)]
pub struct UploadedBlob {
    #[serde(rename = "$type")]
    pub blob_type: String,
    #[serde(rename = "ref")]
    pub link: CidLink,
    #[serde(rename = "mimeType")]
    pub mime_type: String,
    pub size: u64,
}

/// Request body for `com.atproto.repo.putRecord`.
#[derive(Debug, Serialize)]
pub struct PutRecordRequest {
    pub repo: String,
    pub collection: String,
    pub rkey: String,
    pub record: serde_json::Value,
    /// swap with the current CID to prevent race conditions
    #[serde(rename = "swapRecord", skip_serializing_if = "Option::is_none")]
    pub swap_record: Option<String>,
}

/// Response from `com.atproto.repo.putRecord`.
#[derive(Debug, Deserialize)]
pub struct PutRecordResponse {
    pub uri: String,
    pub cid: String,
}

/// Response from `com.atproto.server.createSession`.
#[derive(Debug, Deserialize)]
pub struct CreateSessionResponse {
    pub did: String,
    #[serde(rename = "accessJwt")]
    pub access_jwt: String,
    #[serde(rename = "refreshJwt")]
    pub refresh_jwt: String,
    pub handle: String,
}

/// Error response from PDS XRPC endpoints.
#[derive(Debug, Deserialize)]
pub struct XrpcError {
    pub error: String,
    #[serde(default)]
    pub message: String,
}

impl std::fmt::Display for XrpcError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.error, self.message)
    }
}

impl PdsClient {
    /// Creates an unauthenticated client for the given PDS base URL.
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into().trim_end_matches('/').to_string(),
            auth_token: None,
            http: reqwest::Client::new(),
        }
    }

    /// Creates an authenticated client with a bearer token.
    pub fn with_auth(base_url: impl Into<String>, token: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into().trim_end_matches('/').to_string(),
            auth_token: Some(token.into()),
            http: reqwest::Client::new(),
        }
    }

    /// Sets or replaces the auth token.
    pub fn set_auth(&mut self, token: impl Into<String>) {
        self.auth_token = Some(token.into());
    }

    /// Returns the base URL of the PDS.
    pub fn base_url(&self) -> &str {
        &self.base_url
    }

    /// Logs in with handle/password and stores the resulting access token.
    ///
    /// Calls `com.atproto.server.createSession`.
    pub async fn login(
        &mut self,
        identifier: &str,
        password: &str,
    ) -> Result<CreateSessionResponse, String> {
        let url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);

        let body = serde_json::json!({
            "identifier": identifier,
            "password": password,
        });

        let resp = self
            .http
            .post(&url)
            .json(&body)
            .send()
            .await
            .map_err(|e| format!("createSession request failed: {}", e))?;

        if !resp.status().is_success() {
            let err = parse_xrpc_error(resp).await;
            return Err(format!("createSession failed: {}", err));
        }

        let session: CreateSessionResponse = resp
            .json()
            .await
            .map_err(|e| format!("failed to parse createSession response: {}", e))?;

        self.auth_token = Some(session.access_jwt.clone());
        Ok(session)
    }
158158+159159+ /// Fetches a record from the PDS.
160160+ ///
161161+ /// Calls `com.atproto.repo.getRecord`. Returns `Ok(None)` if the
162162+ /// record does not exist (RecordNotFound).
163163+ pub async fn get_record(
164164+ &self,
165165+ did: &str,
166166+ collection: &str,
167167+ rkey: &str,
168168+ ) -> Result<Option<GetRecordResponse>, String> {
169169+ let url = format!(
170170+ "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
171171+ self.base_url, did, collection, rkey
172172+ );
173173+174174+ let resp = self
175175+ .http
176176+ .get(&url)
177177+ .send()
178178+ .await
179179+ .map_err(|e| format!("getRecord request failed: {}", e))?;
180180+181181+ // 400 with an XRPC error code of RecordNotFound means the record doesn't exist yet
182182+ if resp.status() == reqwest::StatusCode::BAD_REQUEST {
183183+ let body = resp.text().await.unwrap_or_default();
184184+ if serde_json::from_str::<XrpcError>(&body).is_ok_and(|e| e.error == "RecordNotFound") {
185185+ return Ok(None);
186186+ }
187187+ return Err(format!("getRecord failed: {}", body));
188188+ }
189189+190190+ if !resp.status().is_success() {
191191+ let err = parse_xrpc_error(resp).await;
192192+ return Err(format!("getRecord failed: {}", err));
193193+ }
194194+195195+ let record: GetRecordResponse = resp
196196+ .json()
197197+ .await
198198+ .map_err(|e| format!("failed to parse getRecord response: {}", e))?;
199199+200200+ Ok(Some(record))
201201+ }
202202+203203+ /// Creates or updates a record on the PDS.
204204+ ///
205205+ /// Calls `com.atproto.repo.putRecord`. Requires authentication.
206206+ pub async fn put_record(
207207+ &self,
208208+ did: &str,
209209+ collection: &str,
210210+ rkey: &str,
211211+ record: serde_json::Value,
212212+ swap_record: Option<String>,
213213+ ) -> Result<PutRecordResponse, String> {
214214+ let token = self
215215+ .auth_token
216216+ .as_ref()
217217+ .ok_or("putRecord requires authentication")?;
218218+219219+ let url = format!("{}/xrpc/com.atproto.repo.putRecord", self.base_url);
220220+221221+ let body = PutRecordRequest {
222222+ repo: did.to_string(),
223223+ collection: collection.to_string(),
224224+ rkey: rkey.to_string(),
225225+ record,
226226+ swap_record,
227227+ };
228228+229229+ let resp = self
230230+ .http
231231+ .post(&url)
232232+ .bearer_auth(token)
233233+ .json(&body)
234234+ .send()
235235+ .await
236236+ .map_err(|e| format!("putRecord request failed: {}", e))?;
237237+238238+ if !resp.status().is_success() {
239239+ let err = parse_xrpc_error(resp).await;
240240+ return Err(format!("putRecord failed: {}", err));
241241+ }
242242+243243+ resp.json()
244244+ .await
245245+ .map_err(|e| format!("failed to parse putRecord response: {}", e))
246246+ }
247247+248248+ /// Uploads a blob to the PDS.
249249+ ///
250250+ /// Calls `com.atproto.repo.uploadBlob`. Requires authentication.
251251+ /// Returns a `BlobRef` that can be embedded in a record.
252252+ pub async fn upload_blob(&self, data: Vec<u8>) -> Result<BlobRef, String> {
253253+ let token = self
254254+ .auth_token
255255+ .as_ref()
256256+ .ok_or("uploadBlob requires authentication")?;
257257+258258+ let url = format!("{}/xrpc/com.atproto.repo.uploadBlob", self.base_url);
259259+260260+ let resp = self
261261+ .http
262262+ .post(&url)
263263+ .bearer_auth(token)
264264+ .header("Content-Type", "application/octet-stream")
265265+ .body(data)
266266+ .send()
267267+ .await
268268+ .map_err(|e| format!("uploadBlob request failed: {}", e))?;
269269+270270+ if !resp.status().is_success() {
271271+ let err = parse_xrpc_error(resp).await;
272272+ return Err(format!("uploadBlob failed: {}", err));
273273+ }
274274+275275+ let upload: UploadBlobResponse = resp
276276+ .json()
277277+ .await
278278+ .map_err(|e| format!("failed to parse uploadBlob response: {}", e))?;
279279+280280+ Ok(BlobRef {
281281+ blob_type: upload.blob.blob_type,
282282+ link: upload.blob.link,
283283+ mime_type: upload.blob.mime_type,
284284+ size: upload.blob.size,
285285+ })
286286+ }
287287+288288+ /// Downloads a blob from the PDS.
289289+ ///
290290+ /// Calls `com.atproto.sync.getBlob`. This is unauthenticated —
291291+ /// anyone can download blobs if they know the DID and CID.
292292+ pub async fn get_blob(&self, did: &str, cid: &str) -> Result<Vec<u8>, String> {
293293+ let url = format!(
294294+ "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
295295+ self.base_url, did, cid
296296+ );
297297+298298+ let resp = self
299299+ .http
300300+ .get(&url)
301301+ .send()
302302+ .await
303303+ .map_err(|e| format!("getBlob request failed: {}", e))?;
304304+305305+ if !resp.status().is_success() {
306306+ let err = parse_xrpc_error(resp).await;
307307+ return Err(format!("getBlob failed: {}", err));
308308+ }
309309+310310+ resp.bytes()
311311+ .await
312312+ .map(|b| b.to_vec())
313313+ .map_err(|e| format!("failed to read getBlob response body: {}", e))
314314+ }
315315+}
316316+317317+/// Extracts an error message from a non-success XRPC response.
318318+async fn parse_xrpc_error(resp: reqwest::Response) -> String {
319319+ let status = resp.status();
320320+ match resp.json::<XrpcError>().await {
321321+ Ok(err) => format!("{} ({})", err, status),
322322+ Err(_) => format!("HTTP {}", status),
323323+ }
324324+}
+172
src/types.rs
11+//! Core types mirroring the `sh.pdsbackup.git.state` lexicon.
22+//!
33+//! These types represent the PDS record and its nested structures for
44+//! tracking git repository backup state.
55+66+use serde::{Deserialize, Serialize};
77+88+/// A git ref (branch or tag) with its current commit SHA.
99+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1010+pub struct GitRef {
1111+ /// full ref name, e.g. "refs/heads/main"
1212+ pub name: String,
1313+ /// commit SHA the ref points to
1414+ pub sha: String,
1515+}
1616+1717+/// A reference to a blob stored on the PDS.
1818+///
1919+/// When reading from the PDS, the `ref` field contains a `$link` (CID).
2020+/// When writing, we include the blob reference returned by `uploadBlob`.
2121+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2222+pub struct BlobRef {
2323+ #[serde(rename = "$type")]
2424+ pub blob_type: String,
2525+ #[serde(rename = "ref")]
2626+ pub link: CidLink,
2727+ #[serde(rename = "mimeType")]
2828+ pub mime_type: String,
2929+ pub size: u64,
3030+}
3131+3232+/// A CID link as used in AT Protocol blob references.
3333+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
3434+pub struct CidLink {
3535+ #[serde(rename = "$link")]
3636+ pub link: String,
3737+}
3838+3939+/// A single entry in the bundle chain.
4040+///
4141+/// Each push appends one of these. The chain is ordered oldest-first.
4242+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
4343+pub struct BundleEntry {
4444+ /// bundle blob(s) — usually one, multiple if chunked (>40MB)
4545+ pub parts: Vec<BlobRef>,
4646+ /// commit SHAs the receiver must have before applying this bundle
4747+ pub prerequisites: Vec<String>,
4848+ /// commit SHAs this bundle provides up to
4949+ pub tips: Vec<String>,
5050+ /// total size in bytes across all parts
5151+ #[serde(rename = "totalSize", skip_serializing_if = "Option::is_none")]
5252+ pub total_size: Option<u64>,
5353+ /// when this bundle was created
5454+ #[serde(rename = "createdAt")]
5555+ pub created_at: String,
5656+}
5757+5858+/// Top-level state record for a git repository backup on PDS.
5959+///
6060+/// Stored at `sh.pdsbackup.git.state/<repo-name>`.
6161+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6262+pub struct RepoState {
6363+ /// human-readable repo name, e.g. "my-site"
6464+ #[serde(skip_serializing_if = "Option::is_none")]
6565+ pub name: Option<String>,
6666+ /// current branch and tag refs
6767+ pub refs: Vec<GitRef>,
6868+ /// ordered bundle chain, oldest first
6969+ pub bundles: Vec<BundleEntry>,
7070+ /// when the record was last updated
7171+ #[serde(rename = "updatedAt")]
7272+ pub updated_at: String,
7373+}
7474+7575+/// The lexicon collection NSID for git backup state records.
7676+pub const COLLECTION: &str = "sh.pdsbackup.git.state";
7777+7878+impl BlobRef {
7979+ /// Creates a new blob reference from a CID, MIME type, and size (e.g. the values returned by `uploadBlob`).
8080+ pub fn new(cid: String, mime_type: String, size: u64) -> Self {
8181+ Self {
8282+ blob_type: "blob".to_string(),
8383+ link: CidLink { link: cid },
8484+ mime_type,
8585+ size,
8686+ }
8787+ }
8888+8989+ /// Returns the CID string for this blob.
9090+ pub fn cid(&self) -> &str {
9191+ &self.link.link
9292+ }
9393+}
9494+9595+impl GitRef {
9696+ /// Creates a new git ref.
9797+ pub fn new(name: impl Into<String>, sha: impl Into<String>) -> Self {
9898+ Self {
9999+ name: name.into(),
100100+ sha: sha.into(),
101101+ }
102102+ }
103103+}
104104+105105+#[cfg(test)]
106106+mod tests {
107107+ use super::*;
108108+109109+ #[test]
110110+ fn repo_state_round_trip() {
111111+ let state = RepoState {
112112+ name: Some("my-site".to_string()),
113113+ refs: vec![
114114+ GitRef::new("refs/heads/main", "abc123"),
115115+ GitRef::new("refs/heads/draft", "def456"),
116116+ ],
117117+ bundles: vec![BundleEntry {
118118+ parts: vec![BlobRef::new(
119119+ "bafkreiabc".to_string(),
120120+ "application/octet-stream".to_string(),
121121+ 1024,
122122+ )],
123123+ prerequisites: vec![],
124124+ tips: vec!["abc123".to_string()],
125125+ total_size: Some(1024),
126126+ created_at: "2026-02-11T00:00:00Z".to_string(),
127127+ }],
128128+ updated_at: "2026-02-11T00:00:00Z".to_string(),
129129+ };
130130+131131+ let json = serde_json::to_string(&state).unwrap();
132132+ let parsed: RepoState = serde_json::from_str(&json).unwrap();
133133+ assert_eq!(state, parsed);
134134+ }
135135+136136+ #[test]
137137+ fn blob_ref_serializes_with_dollar_fields() {
138138+ let blob = BlobRef::new(
139139+ "bafkreixyz".to_string(),
140140+ "application/octet-stream".to_string(),
141141+ 2048,
142142+ );
143143+144144+ let json = serde_json::to_value(&blob).unwrap();
145145+ assert_eq!(json["$type"], "blob");
146146+ assert_eq!(json["ref"]["$link"], "bafkreixyz");
147147+ assert_eq!(json["mimeType"], "application/octet-stream");
148148+ assert_eq!(json["size"], 2048);
149149+ }
150150+151151+ #[test]
152152+ fn git_ref_round_trip() {
153153+ let r = GitRef::new("refs/heads/main", "deadbeef");
154154+ let json = serde_json::to_string(&r).unwrap();
155155+ let parsed: GitRef = serde_json::from_str(&json).unwrap();
156156+ assert_eq!(r, parsed);
157157+ }
158158+159159+ #[test]
160160+ fn bundle_entry_omits_none_total_size() {
161161+ let entry = BundleEntry {
162162+ parts: vec![],
163163+ prerequisites: vec![],
164164+ tips: vec![],
165165+ total_size: None,
166166+ created_at: "2026-02-11T00:00:00Z".to_string(),
167167+ };
168168+169169+ let json = serde_json::to_value(&entry).unwrap();
170170+ assert!(!json.as_object().unwrap().contains_key("totalSize"));
171171+ }
172172+}
+293
tests/bundle_tests.rs
11+//! Integration tests for bundle creation, application, and chunking.
22+//!
33+//! Each test creates temporary git repos, makes commits, creates bundles,
44+//! and verifies that applying them reproduces the correct content.
55+66+use std::path::Path;
77+use tokio::fs;
88+99+use pds_git_remote::bundle::{
1010+ apply_bundle, create_full_bundle, create_incremental_bundle, verify_bundle,
1111+};
1212+use pds_git_remote::chunk::{chunk_bytes, reassemble_chunks};
1313+1414+/// Helper: write a file inside a directory.
1515+async fn write_file(dir: &Path, name: &str, content: &str) {
1616+ let path = dir.join(name);
1717+ if let Some(parent) = path.parent() {
1818+ fs::create_dir_all(parent).await.unwrap();
1919+ }
2020+ fs::write(&path, content).await.unwrap();
2121+}
2222+2323+/// Helper: read a file, returning None if missing.
2424+async fn read_file(dir: &Path, name: &str) -> Option<String> {
2525+ fs::read_to_string(dir.join(name)).await.ok()
2626+}
2727+2828+/// Helper: configure git author so commits work in CI.
2929+async fn configure_git(dir: &Path) {
3030+ tokio::process::Command::new("git")
3131+ .args(["config", "user.email", "test@test.com"])
3232+ .current_dir(dir)
3333+ .output()
3434+ .await
3535+ .unwrap();
3636+ tokio::process::Command::new("git")
3737+ .args(["config", "user.name", "Test"])
3838+ .current_dir(dir)
3939+ .output()
4040+ .await
4141+ .unwrap();
4242+}
4343+4444+/// Helper: init a git repo in a temp dir.
4545+async fn init_repo() -> tempfile::TempDir {
4646+ let tmp = tempfile::tempdir().unwrap();
4747+ tokio::process::Command::new("git")
4848+ .args(["init"])
4949+ .current_dir(tmp.path())
5050+ .output()
5151+ .await
5252+ .unwrap();
5353+ configure_git(tmp.path()).await;
5454+ tmp
5555+}
5656+5757+/// Helper: stage all and commit.
5858+async fn commit(dir: &Path, message: &str) {
5959+ tokio::process::Command::new("git")
6060+ .args(["add", "-A"])
6161+ .current_dir(dir)
6262+ .output()
6363+ .await
6464+ .unwrap();
6565+ let output = tokio::process::Command::new("git")
6666+ .args(["commit", "-m", message])
6767+ .current_dir(dir)
6868+ .output()
6969+ .await
7070+ .unwrap();
7171+ assert!(
7272+ output.status.success(),
7373+ "commit failed: {}",
7474+ String::from_utf8_lossy(&output.stderr)
7575+ );
7676+}
7777+7878+/// Helper: get the HEAD commit SHA.
7979+async fn head_sha(dir: &Path) -> String {
8080+ let output = tokio::process::Command::new("git")
8181+ .args(["rev-parse", "HEAD"])
8282+ .current_dir(dir)
8383+ .output()
8484+ .await
8585+ .unwrap();
8686+ String::from_utf8_lossy(&output.stdout).trim().to_string()
8787+}
8888+8989+/// Helper: get the current branch name (e.g. "master" or "main").
9090+async fn current_branch(dir: &Path) -> String {
9191+ let output = tokio::process::Command::new("git")
9292+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
9393+ .current_dir(dir)
9494+ .output()
9595+ .await
9696+ .unwrap();
9797+ String::from_utf8_lossy(&output.stdout).trim().to_string()
9898+}
9999+100100+/// Helper: init a bare repo for receiving bundles.
101101+async fn init_bare_repo() -> tempfile::TempDir {
102102+ let tmp = tempfile::tempdir().unwrap();
103103+ tokio::process::Command::new("git")
104104+ .args(["init", "--bare"])
105105+ .current_dir(tmp.path())
106106+ .output()
107107+ .await
108108+ .unwrap();
109109+ tmp
110110+}
111111+112112+/// Helper: clone a bare repo into a working directory so we can inspect files.
113113+async fn clone_bare_to_working(bare_path: &Path) -> tempfile::TempDir {
114114+ let tmp = tempfile::tempdir().unwrap();
115115+ let output = tokio::process::Command::new("git")
116116+ .args([
117117+ "clone",
118118+ bare_path.to_str().unwrap(),
119119+ tmp.path().to_str().unwrap(),
120120+ ])
121121+ .output()
122122+ .await
123123+ .unwrap();
124124+ assert!(
125125+ output.status.success(),
126126+ "clone failed: {}",
127127+ String::from_utf8_lossy(&output.stderr)
128128+ );
129129+ tmp
130130+}
131131+132132+#[tokio::test]
133133+async fn full_bundle_round_trip() {
134134+ let src = init_repo().await;
135135+136136+ // make a couple of commits
137137+ write_file(src.path(), "readme.md", "# Hello").await;
138138+ commit(src.path(), "initial commit").await;
139139+140140+ write_file(src.path(), "page.md", "content here").await;
141141+ commit(src.path(), "add page").await;
142142+143143+ let branch = current_branch(src.path()).await;
144144+145145+ // create full bundle
146146+ let bundle = create_full_bundle(src.path()).await.unwrap();
147147+ assert!(!bundle.data.is_empty());
148148+ assert!(bundle.prerequisites.is_empty());
149149+ assert!(!bundle.tips.is_empty());
150150+151151+ // apply to a fresh bare repo
152152+ let dst = init_bare_repo().await;
153153+ apply_bundle(dst.path(), &bundle.data).await.unwrap();
154154+155155+ // update the branch ref so we can clone it
156156+ let tip = &bundle.tips[0];
157157+ let refname = format!("refs/heads/{}", branch);
158158+ tokio::process::Command::new("git")
159159+ .args(["update-ref", &refname, tip])
160160+ .current_dir(dst.path())
161161+ .output()
162162+ .await
163163+ .unwrap();
164164+165165+ // clone the bare repo and verify content
166166+ let working = clone_bare_to_working(dst.path()).await;
167167+ assert_eq!(
168168+ read_file(working.path(), "readme.md").await.unwrap(),
169169+ "# Hello"
170170+ );
171171+ assert_eq!(
172172+ read_file(working.path(), "page.md").await.unwrap(),
173173+ "content here"
174174+ );
175175+}
176176+177177+#[tokio::test]
178178+async fn incremental_bundle_round_trip() {
179179+ let src = init_repo().await;
180180+181181+ // first commit
182182+ write_file(src.path(), "file1.txt", "version 1").await;
183183+ commit(src.path(), "first").await;
184184+ let sha_after_first = head_sha(src.path()).await;
185185+ let branch = current_branch(src.path()).await;
186186+ let refspec = format!("refs/heads/{}", branch);
187187+188188+ // create a full bundle of the initial state
189189+ let full_bundle = create_full_bundle(src.path()).await.unwrap();
190190+191191+ // second commit
192192+ write_file(src.path(), "file2.txt", "new file").await;
193193+ commit(src.path(), "second").await;
194194+195195+ // third commit
196196+ write_file(src.path(), "file1.txt", "version 2").await;
197197+ commit(src.path(), "third").await;
198198+199199+ // create incremental bundle since the first commit
200200+ let incr_bundle = create_incremental_bundle(src.path(), &[&refspec], &[&sha_after_first])
201201+ .await
202202+ .unwrap();
203203+ assert!(!incr_bundle.data.is_empty());
204204+ assert_eq!(incr_bundle.prerequisites, vec![sha_after_first.clone()]);
205205+ assert!(!incr_bundle.tips.is_empty());
206206+207207+ // apply both bundles to a fresh bare repo
208208+ let dst = init_bare_repo().await;
209209+ apply_bundle(dst.path(), &full_bundle.data).await.unwrap();
210210+ apply_bundle(dst.path(), &incr_bundle.data).await.unwrap();
211211+212212+ // set the branch ref to the incremental tip
213213+ let tip = &incr_bundle.tips[0];
214214+ tokio::process::Command::new("git")
215215+ .args(["update-ref", &refspec, tip])
216216+ .current_dir(dst.path())
217217+ .output()
218218+ .await
219219+ .unwrap();
220220+221221+ // clone and verify
222222+ let working = clone_bare_to_working(dst.path()).await;
223223+ assert_eq!(
224224+ read_file(working.path(), "file1.txt").await.unwrap(),
225225+ "version 2"
226226+ );
227227+ assert_eq!(
228228+ read_file(working.path(), "file2.txt").await.unwrap(),
229229+ "new file"
230230+ );
231231+}
232232+233233+#[tokio::test]
234234+async fn verify_bundle_checks_prerequisites() {
235235+ let src = init_repo().await;
236236+237237+ // two commits
238238+ write_file(src.path(), "a.txt", "aaa").await;
239239+ commit(src.path(), "first").await;
240240+ let sha1 = head_sha(src.path()).await;
241241+ let branch = current_branch(src.path()).await;
242242+ let refspec = format!("refs/heads/{}", branch);
243243+244244+ write_file(src.path(), "b.txt", "bbb").await;
245245+ commit(src.path(), "second").await;
246246+247247+ // incremental bundle requiring sha1
248248+ let incr = create_incremental_bundle(src.path(), &[&refspec], &[&sha1])
249249+ .await
250250+ .unwrap();
251251+252252+ // empty bare repo should fail verification (missing prerequisite)
253253+ let empty = init_bare_repo().await;
254254+ let ok = verify_bundle(empty.path(), &incr.data).await.unwrap();
255255+ assert!(!ok, "should fail: prerequisite not present");
256256+257257+ // after applying the full bundle, verification should pass
258258+ let full = create_full_bundle(src.path()).await.unwrap();
259259+ let dst = init_bare_repo().await;
260260+ apply_bundle(dst.path(), &full.data).await.unwrap();
261261+ let ok = verify_bundle(dst.path(), &incr.data).await.unwrap();
262262+ assert!(ok, "should pass: prerequisite is present");
263263+}
264264+265265+#[tokio::test]
266266+async fn full_bundle_on_empty_repo_fails() {
267267+ let empty = init_repo().await;
268268+ // no commits — bundle create should fail
269269+ let result = create_full_bundle(empty.path()).await;
270270+ assert!(result.is_err());
271271+}
272272+273273+#[tokio::test]
274274+async fn chunk_and_reassemble_bundle_data() {
275275+ let src = init_repo().await;
276276+ write_file(src.path(), "f.txt", "data").await;
277277+ commit(src.path(), "commit").await;
278278+279279+ let bundle = create_full_bundle(src.path()).await.unwrap();
280280+281281+ // chunk at a very small size to force splitting
282282+ let chunks = chunk_bytes(&bundle.data, 64);
283283+ assert!(chunks.len() > 1, "should split into multiple chunks");
284284+285285+ // reassemble
286286+ let parts: Vec<Vec<u8>> = chunks.iter().map(|c| c.to_vec()).collect();
287287+ let reassembled = reassemble_chunks(&parts);
288288+ assert_eq!(bundle.data, reassembled);
289289+290290+ // the reassembled data should still be a valid bundle
291291+ let dst = init_bare_repo().await;
292292+ apply_bundle(dst.path(), &reassembled).await.unwrap();
293293+}
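Note on chunking: the last test relies on `chunk_bytes` and `reassemble_chunks` from the `chunk` module, which is not shown in this part of the diff. The sketch below is one plausible shape for them, inferred only from how the test calls them (`chunk_bytes(&data, 64)` yielding items with `.to_vec()`, and `reassemble_chunks(&parts)` returning bytes comparable to the original). The signatures and bodies are assumptions, not the crate's actual implementation.

```rust
/// Splits `data` into slices of at most `max_size` bytes.
/// Hypothetical stand-in for `chunk::chunk_bytes`.
fn chunk_bytes(data: &[u8], max_size: usize) -> Vec<&[u8]> {
    // `slice::chunks` panics on zero; fail with a clearer message instead.
    assert!(max_size > 0, "chunk size must be non-zero");
    data.chunks(max_size).collect()
}

/// Concatenates parts back into one buffer, in order.
/// Hypothetical stand-in for `chunk::reassemble_chunks`.
fn reassemble_chunks(parts: &[Vec<u8>]) -> Vec<u8> {
    parts.concat()
}

fn main() {
    // 200 bytes split at 64 gives chunks of 64, 64, 64, and 8 bytes.
    let data: Vec<u8> = (0..200u8).collect();
    let chunks = chunk_bytes(&data, 64);
    assert_eq!(chunks.len(), 4);
    assert_eq!(chunks[3].len(), 8);

    // Round trip: owned copies of each chunk reassemble to the input.
    let parts: Vec<Vec<u8>> = chunks.iter().map(|c| c.to_vec()).collect();
    assert_eq!(reassemble_chunks(&parts), data);
}
```

Order matters when reassembling: the parts must be concatenated in the same order they were produced, which is why `BundleEntry::parts` is a `Vec` rather than a set.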