open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add com.atproto.sync.* XRPC endpoints for CAR export and repo status

Implement four sync endpoints (getRepo, getLatestCommit, getRepoStatus,
listRepos) as the first XRPC routes in rookery. Routes are in a new
src/sync.ts module mounted from index.ts. Includes 18 tests covering
CAR round-trips, diff/since filtering, pagination, and error responses.

+518
+2
src/index.ts
··· 2 2 import { serve } from "@hono/node-server"; 3 3 import { loadConfig } from "./config.js"; 4 4 import { initDatabase } from "./db.js"; 5 + import { createSyncRoutes } from "./sync.js"; 5 6 6 7 const config = loadConfig(); 7 8 const db = initDatabase(config.dbPath); ··· 9 10 const app = new Hono(); 10 11 11 12 app.get("/", (c) => c.json({ status: "ok" })); 13 + app.route("/", createSyncRoutes(db)); 12 14 13 15 serve({ fetch: app.fetch, port: config.port }, (info) => { 14 16 console.log(`rookery listening on port ${info.port}`);
+178
src/sync.ts
··· 1 + import { Hono, type Context } from "hono"; 2 + import { stream } from "hono/streaming"; 3 + import { CID } from "@atproto/lex-data"; 4 + import { BlockMap, blocksToCarStream } from "@atproto/repo"; 5 + import type Database from "better-sqlite3"; 6 + 7 + type AccountRow = { 8 + id: number; 9 + did: string; 10 + root_cid: string | null; 11 + rev: string | null; 12 + active: number; 13 + }; 14 + 15 + type BlockRow = { 16 + cid: string; 17 + bytes: Buffer; 18 + }; 19 + 20 + function xrpcError( 21 + c: Context, 22 + status: 400 | 404, 23 + error: string, 24 + message: string, 25 + ) { 26 + return c.json({ error, message }, status); 27 + } 28 + 29 + function getAccountByDid( 30 + db: Database.Database, 31 + did: string, 32 + ): AccountRow | undefined { 33 + return db 34 + .prepare("SELECT id, did, root_cid, rev, active FROM accounts WHERE did = ?") 35 + .get(did) as AccountRow | undefined; 36 + } 37 + 38 + function getAllBlocks(db: Database.Database, accountId: number): BlockRow[] { 39 + return db 40 + .prepare("SELECT cid, bytes FROM blocks WHERE account_id = ?") 41 + .all(accountId) as BlockRow[]; 42 + } 43 + 44 + function getBlocksSince( 45 + db: Database.Database, 46 + accountId: number, 47 + since: string, 48 + ): BlockRow[] { 49 + return db 50 + .prepare("SELECT cid, bytes FROM blocks WHERE account_id = ? AND rev > ?") 51 + .all(accountId, since) as BlockRow[]; 52 + } 53 + 54 + export function createSyncRoutes(db: Database.Database): Hono { 55 + const app = new Hono(); 56 + 57 + app.get("/xrpc/com.atproto.sync.getRepo", (c) => { 58 + const did = c.req.query("did"); 59 + if (!did) { 60 + return xrpcError(c, 400, "InvalidRequest", "missing did parameter"); 61 + } 62 + 63 + const account = getAccountByDid(db, did); 64 + if (!account) { 65 + return xrpcError(c, 404, "RepoNotFound", "repo not found"); 66 + } 67 + if (!account.root_cid) { 68 + return xrpcError(c, 400, "RepoNotFound", "repo not initialized"); 69 + } 70 + 71 + const since = c.req.query("since"); 72 + const rows = since 73 + ? getBlocksSince(db, account.id, since) 74 + : getAllBlocks(db, account.id); 75 + 76 + const blocks = new BlockMap(); 77 + for (const row of rows) { 78 + blocks.set(CID.parse(row.cid), row.bytes); 79 + } 80 + 81 + const rootCid = CID.parse(account.root_cid); 82 + const carStream = blocksToCarStream(rootCid, blocks); 83 + 84 + c.header("Content-Type", "application/vnd.ipld.car"); 85 + return stream(c, async (s) => { 86 + for await (const chunk of carStream) { 87 + await s.write(chunk); 88 + } 89 + }); 90 + }); 91 + 92 + app.get("/xrpc/com.atproto.sync.getLatestCommit", (c) => { 93 + const did = c.req.query("did"); 94 + if (!did) { 95 + return xrpcError(c, 400, "InvalidRequest", "missing did parameter"); 96 + } 97 + 98 + const account = getAccountByDid(db, did); 99 + if (!account) { 100 + return xrpcError(c, 404, "RepoNotFound", "repo not found"); 101 + } 102 + if (!account.root_cid || !account.rev) { 103 + return xrpcError(c, 404, "RepoNotFound", "repo has no commits"); 104 + } 105 + 106 + return c.json({ cid: account.root_cid, rev: account.rev }); 107 + }); 108 + 109 + app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => { 110 + const did = c.req.query("did"); 111 + if (!did) { 112 + return xrpcError(c, 400, "InvalidRequest", "missing did parameter"); 113 + } 114 + 115 + const account = getAccountByDid(db, did); 116 + if (!account) { 117 + return xrpcError(c, 404, "RepoNotFound", "repo not found"); 118 + } 119 + 120 + const response: { 121 + did: string; 122 + active: boolean; 123 + rev?: string; 124 + status?: "deactivated"; 125 + } = { 126 + did: account.did, 127 + active: !!account.active, 128 + rev: account.rev ?? undefined, 129 + }; 130 + 131 + if (!account.active) { 132 + response.status = "deactivated"; 133 + } 134 + 135 + return c.json(response); 136 + }); 137 + 138 + app.get("/xrpc/com.atproto.sync.listRepos", (c) => { 139 + const limit = Math.min( 140 + Math.max(Number.parseInt(c.req.query("limit") ?? "500", 10) || 500, 1), 141 + 1000, 142 + ); 143 + 144 + const cursorParam = c.req.query("cursor"); 145 + let cursor: number | undefined; 146 + if (cursorParam !== undefined) { 147 + if (!/^\d+$/.test(cursorParam)) { 148 + return xrpcError(c, 400, "InvalidRequest", "invalid cursor"); 149 + } 150 + cursor = Number.parseInt(cursorParam, 10); 151 + } 152 + 153 + const rows = db 154 + .prepare( 155 + "SELECT id, did, root_cid, rev, active FROM accounts WHERE id > ? ORDER BY id ASC LIMIT ?", 156 + ) 157 + .all(cursor ?? 0, limit + 1) as AccountRow[]; 158 + 159 + let nextCursor: string | undefined; 160 + if (rows.length === limit + 1) { 161 + rows.pop(); 162 + nextCursor = rows[rows.length - 1]?.id.toString(); 163 + } 164 + 165 + const repos = rows 166 + .filter((row) => row.root_cid) 167 + .map((row) => ({ 168 + did: row.did, 169 + head: row.root_cid!, 170 + rev: row.rev!, 171 + active: !!row.active, 172 + })); 173 + 174 + return c.json(nextCursor ? { cursor: nextCursor, repos } : { repos }); 175 + }); 176 + 177 + return app; 178 + }
+338
test/sync.test.ts
··· 1 + import { beforeEach, describe, expect, it } from "vitest"; 2 + import Database from "better-sqlite3"; 3 + import { Hono } from "hono"; 4 + import { Repo, WriteOpAction, readCarWithRoot } from "@atproto/repo"; 5 + import { Secp256k1Keypair } from "@atproto/crypto"; 6 + import { initDatabase } from "../src/db.js"; 7 + import { SqliteRepoStorage } from "../src/storage.js"; 8 + import { createSyncRoutes } from "../src/sync.js"; 9 + 10 + function createAccount( 11 + db: Database.Database, 12 + did: string, 13 + ): { id: number; storage: SqliteRepoStorage } { 14 + const info = db.prepare("INSERT INTO accounts (did) VALUES (?)").run(did); 15 + const id = Number(info.lastInsertRowid); 16 + return { id, storage: new SqliteRepoStorage(db, id) }; 17 + } 18 + 19 + async function createRepoWithPost(db: Database.Database, did: string) { 20 + const { id, storage } = createAccount(db, did); 21 + const keypair = await Secp256k1Keypair.create(); 22 + const created = await Repo.create(storage, did, keypair); 23 + const repo = await created.applyWrites( 24 + { 25 + action: WriteOpAction.Create, 26 + collection: "app.bsky.feed.post", 27 + rkey: "test1", 28 + record: { 29 + text: "hello world", 30 + createdAt: new Date().toISOString(), 31 + $type: "app.bsky.feed.post", 32 + }, 33 + }, 34 + keypair, 35 + ); 36 + 37 + return { id, storage, repo, keypair }; 38 + } 39 + 40 + describe("createSyncRoutes", () => { 41 + let db: Database.Database; 42 + let app: Hono; 43 + 44 + beforeEach(() => { 45 + db = initDatabase(":memory:"); 46 + app = createSyncRoutes(db); 47 + }); 48 + 49 + describe("getRepo", () => { 50 + it("returns a valid CAR for an existing repo", async () => { 51 + const did = "did:plc:alice"; 52 + await createRepoWithPost(db, did); 53 + const account = db 54 + .prepare("SELECT root_cid FROM accounts WHERE did = ?") 55 + .get(did) as { root_cid: string }; 56 + 57 + const res = await app.request(`/xrpc/com.atproto.sync.getRepo?did=${did}`); 58 + const buf = await res.arrayBuffer(); 59 + const bytes = new Uint8Array(buf); 60 + const car = await readCarWithRoot(bytes); 61 + 62 + expect(res.status).toBe(200); 63 + expect(car.root.toString()).toBe(account.root_cid); 64 + expect(car.blocks.size).toBeGreaterThan(0); 65 + }); 66 + 67 + it("returns 404 for an unknown DID", async () => { 68 + const res = await app.request( 69 + "/xrpc/com.atproto.sync.getRepo?did=did:plc:missing", 70 + ); 71 + const body = await res.json(); 72 + 73 + expect(res.status).toBe(404); 74 + expect(body).toEqual({ 75 + error: "RepoNotFound", 76 + message: "repo not found", 77 + }); 78 + }); 79 + 80 + it("returns 400 when did is missing", async () => { 81 + const res = await app.request("/xrpc/com.atproto.sync.getRepo"); 82 + const body = await res.json(); 83 + 84 + expect(res.status).toBe(400); 85 + expect(body).toEqual({ 86 + error: "InvalidRequest", 87 + message: "missing did parameter", 88 + }); 89 + }); 90 + 91 + it("supports the since parameter", async () => { 92 + const did = "did:plc:alice"; 93 + const { keypair, repo } = await createRepoWithPost(db, did); 94 + const firstRev = db 95 + .prepare("SELECT rev FROM accounts WHERE did = ?") 96 + .get(did) as { rev: string }; 97 + 98 + await repo.applyWrites( 99 + { 100 + action: WriteOpAction.Create, 101 + collection: "app.bsky.feed.post", 102 + rkey: "test2", 103 + record: { 104 + text: "second post", 105 + createdAt: new Date().toISOString(), 106 + $type: "app.bsky.feed.post", 107 + }, 108 + }, 109 + keypair, 110 + ); 111 + 112 + const fullRes = await app.request(`/xrpc/com.atproto.sync.getRepo?did=${did}`); 113 + const fullBytes = new Uint8Array(await fullRes.arrayBuffer()); 114 + const fullCar = await readCarWithRoot(fullBytes); 115 + 116 + const diffRes = await app.request( 117 + `/xrpc/com.atproto.sync.getRepo?did=${did}&since=${firstRev.rev}`, 118 + ); 119 + const diffBytes = new Uint8Array(await diffRes.arrayBuffer()); 120 + const diffCar = await readCarWithRoot(diffBytes); 121 + 122 + expect(diffRes.status).toBe(200); 123 + expect(diffCar.blocks.size).toBeLessThan(fullCar.blocks.size); 124 + }); 125 + 126 + it("returns valid CAR with zero blocks when since equals latest rev", async () => { 127 + const did = "did:plc:alice"; 128 + await createRepoWithPost(db, did); 129 + const account = db 130 + .prepare("SELECT rev FROM accounts WHERE did = ?") 131 + .get(did) as { rev: string }; 132 + 133 + const res = await app.request( 134 + `/xrpc/com.atproto.sync.getRepo?did=${did}&since=${account.rev}`, 135 + ); 136 + const buf = await res.arrayBuffer(); 137 + const bytes = new Uint8Array(buf); 138 + const car = await readCarWithRoot(bytes); 139 + 140 + expect(res.status).toBe(200); 141 + expect(car.blocks.size).toBe(0); 142 + }); 143 + 144 + it("returns CAR content type", async () => { 145 + const did = "did:plc:alice"; 146 + await createRepoWithPost(db, did); 147 + 148 + const res = await app.request(`/xrpc/com.atproto.sync.getRepo?did=${did}`); 149 + 150 + expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); 151 + }); 152 + }); 153 + 154 + describe("getLatestCommit", () => { 155 + it("returns cid and rev for an existing repo", async () => { 156 + const did = "did:plc:alice"; 157 + await createRepoWithPost(db, did); 158 + const account = db 159 + .prepare("SELECT root_cid, rev FROM accounts WHERE did = ?") 160 + .get(did) as { root_cid: string; rev: string }; 161 + 162 + const res = await app.request( 163 + `/xrpc/com.atproto.sync.getLatestCommit?did=${did}`, 164 + ); 165 + const body = await res.json(); 166 + 167 + expect(res.status).toBe(200); 168 + expect(body).toEqual({ cid: account.root_cid, rev: account.rev }); 169 + }); 170 + 171 + it("returns 404 for an unknown DID", async () => { 172 + const res = await app.request( 173 + "/xrpc/com.atproto.sync.getLatestCommit?did=did:plc:missing", 174 + ); 175 + const body = await res.json(); 176 + 177 + expect(res.status).toBe(404); 178 + expect(body).toEqual({ 179 + error: "RepoNotFound", 180 + message: "repo not found", 181 + }); 182 + }); 183 + 184 + it("returns 404 for account with no commits", async () => { 185 + createAccount(db, "did:plc:nocommits"); 186 + 187 + const res = await app.request( 188 + "/xrpc/com.atproto.sync.getLatestCommit?did=did:plc:nocommits", 189 + ); 190 + const body = await res.json(); 191 + 192 + expect(res.status).toBe(404); 193 + expect(body).toEqual({ 194 + error: "RepoNotFound", 195 + message: "repo has no commits", 196 + }); 197 + }); 198 + 199 + it("returns 400 when did is missing", async () => { 200 + const res = await app.request("/xrpc/com.atproto.sync.getLatestCommit"); 201 + const body = await res.json(); 202 + 203 + expect(res.status).toBe(400); 204 + expect(body).toEqual({ 205 + error: "InvalidRequest", 206 + message: "missing did parameter", 207 + }); 208 + }); 209 + }); 210 + 211 + describe("getRepoStatus", () => { 212 + it("returns active repo status", async () => { 213 + const did = "did:plc:alice"; 214 + await createRepoWithPost(db, did); 215 + const account = db.prepare("SELECT rev FROM accounts WHERE did = ?").get(did) as { 216 + rev: string; 217 + }; 218 + 219 + const res = await app.request( 220 + `/xrpc/com.atproto.sync.getRepoStatus?did=${did}`, 221 + ); 222 + const body = await res.json(); 223 + 224 + expect(res.status).toBe(200); 225 + expect(body).toEqual({ did, active: true, rev: account.rev }); 226 + }); 227 + 228 + it("returns 404 for an unknown DID", async () => { 229 + const res = await app.request( 230 + "/xrpc/com.atproto.sync.getRepoStatus?did=did:plc:missing", 231 + ); 232 + const body = await res.json(); 233 + 234 + expect(res.status).toBe(404); 235 + expect(body).toEqual({ 236 + error: "RepoNotFound", 237 + message: "repo not found", 238 + }); 239 + }); 240 + 241 + it("returns deactivated status for inactive accounts", async () => { 242 + const did = "did:plc:alice"; 243 + await createRepoWithPost(db, did); 244 + db.prepare("UPDATE accounts SET active = 0 WHERE did = ?").run(did); 245 + 246 + const res = await app.request( 247 + `/xrpc/com.atproto.sync.getRepoStatus?did=${did}`, 248 + ); 249 + const body = await res.json(); 250 + 251 + expect(res.status).toBe(200); 252 + expect(body.did).toBe(did); 253 + expect(body.active).toBe(false); 254 + expect(body.status).toBe("deactivated"); 255 + }); 256 + }); 257 + 258 + describe("listRepos", () => { 259 + it("returns repos with the expected fields", async () => { 260 + const did = "did:plc:alice"; 261 + await createRepoWithPost(db, did); 262 + createAccount(db, "did:plc:empty"); 263 + 264 + const res = await app.request("/xrpc/com.atproto.sync.listRepos"); 265 + const body = await res.json(); 266 + 267 + expect(res.status).toBe(200); 268 + expect(body.repos).toHaveLength(1); 269 + expect(body.repos[0]).toMatchObject({ 270 + did, 271 + active: true, 272 + }); 273 + expect(body.repos[0].head).toEqual(expect.any(String)); 274 + expect(body.repos[0].rev).toEqual(expect.any(String)); 275 + }); 276 + 277 + it("paginates using cursor chaining", async () => { 278 + await createRepoWithPost(db, "did:plc:one"); 279 + await createRepoWithPost(db, "did:plc:two"); 280 + await createRepoWithPost(db, "did:plc:three"); 281 + 282 + const seen: string[] = []; 283 + let cursor: string | undefined; 284 + 285 + do { 286 + const query = cursor 287 + ? `/xrpc/com.atproto.sync.listRepos?limit=1&cursor=${cursor}` 288 + : "/xrpc/com.atproto.sync.listRepos?limit=1"; 289 + const res = await app.request(query); 290 + const body = await res.json(); 291 + 292 + expect(res.status).toBe(200); 293 + expect(body.repos).toHaveLength(1); 294 + seen.push(body.repos[0].did); 295 + cursor = body.cursor; 296 + } while (cursor); 297 + 298 + expect(seen).toEqual([ 299 + "did:plc:one", 300 + "did:plc:two", 301 + "did:plc:three", 302 + ]); 303 + }); 304 + 305 + it("returns an empty repos array when no accounts exist", async () => { 306 + const res = await app.request("/xrpc/com.atproto.sync.listRepos"); 307 + const body = await res.json(); 308 + 309 + expect(res.status).toBe(200); 310 + expect(body).toEqual({ repos: [] }); 311 + }); 312 + 313 + it("returns 400 for invalid cursor", async () => { 314 + const res = await app.request( 315 + "/xrpc/com.atproto.sync.listRepos?cursor=notanumber", 316 + ); 317 + const body = await res.json(); 318 + 319 + expect(res.status).toBe(400); 320 + expect(body).toEqual({ 321 + error: "InvalidRequest", 322 + message: "invalid cursor", 323 + }); 324 + }); 325 + 326 + it("respects the limit parameter", async () => { 327 + await createRepoWithPost(db, "did:plc:one"); 328 + await createRepoWithPost(db, "did:plc:two"); 329 + 330 + const res = await app.request("/xrpc/com.atproto.sync.listRepos?limit=1"); 331 + const body = await res.json(); 332 + 333 + expect(res.status).toBe(200); 334 + expect(body.repos).toHaveLength(1); 335 + expect(body.cursor).toEqual(expect.any(String)); 336 + }); 337 + }); 338 + });