open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add L12 R2 blob storage with uploadBlob, getBlob, listBlobs XRPC endpoints

+411 -6
-1
README.md
··· 38 38 | `ROOKERY_PLC_URL` | yes | - | PLC directory URL (e.g. `https://plc.directory`) | 39 39 | `PORT` | no | `3000` | HTTP listen port | 40 40 | `ROOKERY_DB_PATH` | no | `./rookery.db` | SQLite database file path | 41 - | `ROOKERY_BLOB_DIR` | no | `./data/blobs` | Blob storage directory | 42 41 | `ROOKERY_RELAY_HOSTS` | no | - | Comma-separated relay hostnames | 43 42 | `ROOKERY_TOS_PATH` | no | built-in text | Path to custom terms-of-service file | 44 43
+42
src/account-do.ts
··· 1 1 import { DurableObject } from "cloudflare:workers"; 2 + import { create as createCid, format as formatCid } from "@atcute/cid"; 2 3 import { 3 4 Repo, 4 5 WriteOpAction, ··· 15 16 import { SqliteRepoStorage } from "./storage"; 16 17 import type { Env } from "./types"; 17 18 19 + const CODEC_RAW = 0x55; 20 + 18 21 let generatedRkeyCounter = 0; 19 22 20 23 function nextRkey(): string { 21 24 return `${Date.now().toString(36)}${(generatedRkeyCounter++).toString(36)}`; 25 + } 26 + 27 + export interface BlobRef { 28 + $type: "blob"; 29 + ref: { $link: string }; 30 + mimeType: string; 31 + size: number; 22 32 } 23 33 24 34 export class AccountDurableObject extends DurableObject<Env> { ··· 455 465 } 456 466 457 467 return blocksToCarFile(root, allBlocks); 468 + } 469 + 470 + /** RPC: Upload a blob to R2 and track in metadata */ 471 + async rpcUploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> { 472 + await this.ensureStorageInitialized(); 473 + const state = this.storage!.getState(); 474 + if (!state?.did) throw new Error("Account not provisioned"); 475 + 476 + const cidObj = await createCid(CODEC_RAW, bytes); 477 + const cidStr = formatCid(cidObj); 478 + 479 + const key = `${state.did}/${cidStr}`; 480 + await this.env.BLOBS.put(key, bytes, { 481 + httpMetadata: { contentType: mimeType }, 482 + }); 483 + 484 + this.storage!.insertBlob(cidStr, mimeType, bytes.length); 485 + 486 + return { 487 + $type: "blob", 488 + ref: { $link: cidStr }, 489 + mimeType, 490 + size: bytes.length, 491 + }; 492 + } 493 + 494 + /** RPC: List blob CIDs for this account */ 495 + async rpcListBlobs( 496 + opts?: { limit?: number; cursor?: string }, 497 + ): Promise<{ cids: string[]; cursor?: string }> { 498 + await this.ensureStorageInitialized(); 499 + return this.storage!.listBlobs(opts); 458 500 } 459 501 }
+21 -3
src/directory.ts
··· 5 5 did TEXT PRIMARY KEY, 6 6 handle TEXT NOT NULL UNIQUE, 7 7 do_id TEXT NOT NULL, 8 + jwk_thumbprint TEXT, 8 9 active INTEGER NOT NULL DEFAULT 1, 9 10 created_at TEXT NOT NULL DEFAULT (datetime('now')) 10 11 ); 11 12 `; 12 13 13 14 const INDEX = "CREATE INDEX IF NOT EXISTS idx_accounts_handle ON accounts(handle);"; 15 + const THUMBPRINT_INDEX = "CREATE INDEX IF NOT EXISTS idx_accounts_thumbprint ON accounts(jwk_thumbprint);"; 14 16 15 17 export async function initDirectory(db: D1Database): Promise<void> { 16 18 await db.batch([ 17 19 db.prepare(SCHEMA), 18 20 db.prepare(INDEX), 21 + db.prepare(THUMBPRINT_INDEX), 19 22 ]); 20 23 } 21 24 ··· 47 50 48 51 export async function insertAccount( 49 52 db: D1Database, 50 - account: { did: string; handle: string; doId: string }, 53 + account: { did: string; handle: string; doId: string; jwkThumbprint?: string }, 51 54 ): Promise<void> { 52 55 await db.prepare( 53 - "INSERT INTO accounts (did, handle, do_id) VALUES (?, ?, ?)", 54 - ).bind(account.did, account.handle, account.doId).run(); 56 + "INSERT INTO accounts (did, handle, do_id, jwk_thumbprint) VALUES (?, ?, ?, ?)", 57 + ).bind(account.did, account.handle, account.doId, account.jwkThumbprint ?? null).run(); 58 + } 59 + 60 + export async function resolveByThumbprint( 61 + db: D1Database, 62 + thumbprint: string, 63 + ): Promise<{ did: string; doId: string }> { 64 + const row = await db.prepare( 65 + "SELECT did, do_id FROM accounts WHERE jwk_thumbprint = ? AND active = 1", 66 + ).bind(thumbprint).first<{ did: string; do_id: string }>(); 67 + 68 + if (!row) { 69 + throw new RepoNotFoundError("No account found for thumbprint"); 70 + } 71 + 72 + return { did: row.did, doId: row.do_id }; 55 73 }
+36
src/storage.ts
··· 66 66 CREATE TABLE IF NOT EXISTS collections ( 67 67 collection TEXT PRIMARY KEY 68 68 ); 69 + 70 + CREATE TABLE IF NOT EXISTS blobs ( 71 + cid TEXT PRIMARY KEY, 72 + mime_type TEXT NOT NULL, 73 + size INTEGER NOT NULL, 74 + created_at TEXT NOT NULL DEFAULT (datetime('now')) 75 + ); 69 76 `); 70 77 } 71 78 ··· 241 248 .exec("SELECT collection FROM collections ORDER BY collection") 242 249 .toArray(); 243 250 return rows.map((row) => row.collection as string); 251 + } 252 + 253 + insertBlob(cid: string, mimeType: string, size: number): void { 254 + this.sql.exec( 255 + "INSERT OR IGNORE INTO blobs (cid, mime_type, size) VALUES (?, ?, ?)", 256 + cid, 257 + mimeType, 258 + size, 259 + ); 260 + } 261 + 262 + listBlobs(opts?: { limit?: number; cursor?: string }): { cids: string[]; cursor?: string } { 263 + const limit = opts?.limit ?? 500; 264 + let rows: Array<{ cid: string }>; 265 + if (opts?.cursor) { 266 + rows = this.sql 267 + .exec("SELECT cid FROM blobs WHERE cid > ? ORDER BY cid ASC LIMIT ?", opts.cursor, limit + 1) 268 + .toArray() as Array<{ cid: string }>; 269 + } else { 270 + rows = this.sql 271 + .exec("SELECT cid FROM blobs ORDER BY cid ASC LIMIT ?", limit + 1) 272 + .toArray() as Array<{ cid: string }>; 273 + } 274 + const hasMore = rows.length > limit; 275 + const results = hasMore ? rows.slice(0, limit) : rows; 276 + return { 277 + cids: results.map((r) => r.cid), 278 + cursor: hasMore ? results[results.length - 1].cid : undefined, 279 + }; 244 280 } 245 281 246 282 /**
+2
src/types.ts
··· 12 12 SEQUENCER: DurableObjectNamespace<SequencerDurableObject>; 13 13 /** D1 account directory for cross-account queries */ 14 14 DIRECTORY: D1Database; 15 + /** R2 bucket for blob storage */ 16 + BLOBS: R2Bucket; 15 17 /** Public hostname of the PDS */ 16 18 ROOKERY_HOSTNAME: string; 17 19 /** Handle domain suffix (e.g. ".pds.example.com") */
+109 -2
src/worker.ts
··· 7 7 initDirectory, 8 8 insertAccount, 9 9 resolveRepo, 10 + resolveByThumbprint, 10 11 } from "./directory"; 12 + import { 13 + extractBearerToken, 14 + validateDpopProof, 15 + } from "./auth"; 11 16 import type { Env } from "./types"; 12 17 13 18 const app = new Hono<{ Bindings: Env }>(); ··· 18 23 // POST /api/signup 19 24 app.post("/api/signup", async (c) => { 20 25 const env = c.env; 21 - let body: { handle?: string }; 26 + let body: { handle?: string; jwkThumbprint?: string }; 22 27 23 28 try { 24 - body = await c.req.json<{ handle?: string }>(); 29 + body = await c.req.json<{ handle?: string; jwkThumbprint?: string }>(); 25 30 } catch { 26 31 return c.json({ error: "InvalidRequest", message: "Invalid JSON body" }, 400); 27 32 } 28 33 29 34 if (!body.handle || typeof body.handle !== "string") { 30 35 return c.json({ error: "InvalidRequest", message: "Missing or invalid handle" }, 400); 36 + } 37 + if (body.jwkThumbprint !== undefined && typeof body.jwkThumbprint !== "string") { 38 + return c.json({ error: "InvalidRequest", message: "Invalid jwkThumbprint" }, 400); 31 39 } 32 40 33 41 // Always construct handle as name + configured domain ··· 74 82 signingKeyPub, 75 83 rotationKeyHex, 76 84 rotationKeyPub, 85 + jwkThumbprint: body.jwkThumbprint, 77 86 }); 78 87 79 88 await insertAccount(env.DIRECTORY, { 80 89 did, 81 90 handle, 82 91 doId: doId.toString(), 92 + jwkThumbprint: body.jwkThumbprint, 83 93 }); 84 94 85 95 return c.json({ did, handle }); ··· 183 193 } 184 194 throw err; 185 195 } 196 + }); 197 + 198 + // POST /xrpc/com.atproto.repo.uploadBlob (DPoP auth required) 199 + app.post("/xrpc/com.atproto.repo.uploadBlob", async (c) => { 200 + const contentLength = parseInt(c.req.header("content-length") || "0", 10); 201 + if (contentLength > 60 * 1024 * 1024) { 202 + return c.json({ error: "BlobTooLarge", message: "Blob exceeds 60MB limit" }, 400); 203 + } 204 + 205 + const accessToken = extractBearerToken(c.req.header("authorization") ?? null); 206 + if (!accessToken) { 207 + return c.json({ error: "AuthRequired", message: "Missing DPoP authorization" }, 401); 208 + } 209 + 210 + const dpopJwt = c.req.header("dpop"); 211 + if (!dpopJwt) { 212 + return c.json({ error: "AuthRequired", message: "Missing DPoP proof" }, 401); 213 + } 214 + 215 + let thumbprint: string; 216 + try { 217 + const result = await validateDpopProof(dpopJwt, "POST", c.req.url, accessToken); 218 + thumbprint = result.thumbprint; 219 + } catch (err) { 220 + return c.json({ error: "AuthFailed", message: (err as Error).message }, 401); 221 + } 222 + 223 + await initDirectory(c.env.DIRECTORY); 224 + 225 + let doId: string; 226 + try { 227 + const resolved = await resolveByThumbprint(c.env.DIRECTORY, thumbprint); 228 + doId = resolved.doId; 229 + } catch { 230 + return c.json({ error: "AccountNotFound", message: "No account for this key" }, 401); 231 + } 232 + 233 + const bytes = new Uint8Array(await c.req.arrayBuffer()); 234 + if (bytes.length > 60 * 1024 * 1024) { 235 + return c.json({ error: "BlobTooLarge", message: "Blob exceeds 60MB limit" }, 400); 236 + } 237 + 238 + const mimeType = c.req.header("content-type") || "application/octet-stream"; 239 + const stub = c.env.ACCOUNT.get(c.env.ACCOUNT.idFromString(doId)); 240 + const blob = await stub.rpcUploadBlob(bytes, mimeType); 241 + return c.json({ blob }); 242 + }); 243 + 244 + // GET /xrpc/com.atproto.sync.getBlob (public) 245 + app.get("/xrpc/com.atproto.sync.getBlob", async (c) => { 246 + const did = c.req.query("did"); 247 + const cid = c.req.query("cid"); 248 + if (!did || !cid) { 249 + return c.json({ error: "InvalidRequest", message: "Missing required parameters: did, cid" }, 400); 250 + } 251 + 252 + const key = `${did}/${cid}`; 253 + const object = await c.env.BLOBS.get(key); 254 + if (!object) { 255 + return c.json({ error: "BlobNotFound", message: "Blob not found" }, 404); 256 + } 257 + 258 + const headers = new Headers(); 259 + if (object.httpMetadata?.contentType) { 260 + headers.set("content-type", object.httpMetadata.contentType); 261 + } 262 + return new Response(object.body, { headers }); 263 + }); 264 + 265 + // GET /xrpc/com.atproto.sync.listBlobs (public) 266 + app.get("/xrpc/com.atproto.sync.listBlobs", async (c) => { 267 + const did = c.req.query("did"); 268 + if (!did) { 269 + return c.json({ error: "InvalidRequest", message: "Missing required parameter: did" }, 400); 270 + } 271 + 272 + await initDirectory(c.env.DIRECTORY); 273 + 274 + let doId: string; 275 + try { 276 + const resolved = await resolveRepo(did, c.env); 277 + doId = resolved.doId; 278 + } catch (err) { 279 + if (err instanceof RepoNotFoundError) { 280 + return c.json({ error: "RepoNotFound", message: "Repository not found" }, 404); 281 + } 282 + throw err; 283 + } 284 + 285 + const cursor = c.req.query("cursor"); 286 + let limit = parseInt(c.req.query("limit") || "500", 10); 287 + if (Number.isNaN(limit) || limit < 1) limit = 500; 288 + if (limit > 1000) limit = 1000; 289 + 290 + const stub = c.env.ACCOUNT.get(c.env.ACCOUNT.idFromString(doId)); 291 + const result = await stub.rpcListBlobs({ limit, cursor }); 292 + return c.json(result); 186 293 }); 187 294 188 295 // GET /xrpc/com.atproto.sync.subscribeRepos
+191
test/blobs.test.ts
··· 1 + import { describe, it, expect, beforeAll } from "vitest"; 2 + import { env, runInDurableObject, worker } from "./helpers"; 3 + import { AccountDurableObject, type BlobRef } from "../src/account-do"; 4 + import { initDirectory, insertAccount } from "../src/directory"; 5 + import { Secp256k1Keypair } from "@atproto/crypto"; 6 + import { toString } from "uint8arrays/to-string"; 7 + import { base64urlEncode, jwkThumbprint, sha256Base64url } from "../src/auth"; 8 + 9 + async function signJwt( 10 + header: Record<string, unknown>, 11 + payload: Record<string, unknown>, 12 + privateKey: CryptoKey, 13 + ): Promise<string> { 14 + const encode = (obj: Record<string, unknown>) => 15 + base64urlEncode(new TextEncoder().encode(JSON.stringify(obj))); 16 + const headerStr = encode(header); 17 + const payloadStr = encode(payload); 18 + const signingInput = `${headerStr}.${payloadStr}`; 19 + const signature = await crypto.subtle.sign( 20 + "RSASSA-PKCS1-v1_5", 21 + privateKey, 22 + new TextEncoder().encode(signingInput), 23 + ); 24 + return `${signingInput}.${base64urlEncode(signature)}`; 25 + } 26 + 27 + async function setupBlobTestAccount( 28 + testEnv: typeof env, 29 + opts?: { jwkThumbprint?: string }, 30 + ) { 31 + const signing = await Secp256k1Keypair.create({ exportable: true }); 32 + const rotation = await Secp256k1Keypair.create({ exportable: true }); 33 + 34 + const did = `did:plc:blobtest${Date.now().toString(36)}`; 35 + const handle = `blobtest-${Date.now().toString(36)}.rookery.test`; 36 + 37 + const doId = testEnv.ACCOUNT.newUniqueId(); 38 + const stub = testEnv.ACCOUNT.get(doId); 39 + 40 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 41 + await instance.rpcInitAccount({ 42 + did, 43 + handle, 44 + signingKeyHex: toString(await signing.export(), "hex"), 45 + signingKeyPub: signing.did().split(":").pop()!, 46 + rotationKeyHex: toString(await rotation.export(), "hex"), 47 + rotationKeyPub: rotation.did().split(":").pop()!, 48 + jwkThumbprint: opts?.jwkThumbprint, 49 + }); 50 + }); 51 + 52 + await initDirectory(testEnv.DIRECTORY); 53 + await insertAccount(testEnv.DIRECTORY, { 54 + did, 55 + handle, 56 + doId: doId.toString(), 57 + jwkThumbprint: opts?.jwkThumbprint, 58 + }); 59 + 60 + return { did, handle, doId, stub }; 61 + } 62 + 63 + describe("Blob storage", () => { 64 + beforeAll(async () => { 65 + await initDirectory(env.DIRECTORY); 66 + }); 67 + 68 + it("uploads and retrieves a blob via DO RPC", async () => { 69 + const { did, stub } = await setupBlobTestAccount(env); 70 + 71 + const content = new TextEncoder().encode("hello blob world"); 72 + let blobRef!: BlobRef; 73 + 74 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 75 + blobRef = await instance.rpcUploadBlob(content, "text/plain"); 76 + }); 77 + 78 + expect(blobRef.$type).toBe("blob"); 79 + expect(blobRef.ref.$link).toBeTruthy(); 80 + expect(blobRef.mimeType).toBe("text/plain"); 81 + expect(blobRef.size).toBe(content.length); 82 + 83 + const key = `${did}/${blobRef.ref.$link}`; 84 + const object = await env.BLOBS.get(key); 85 + expect(object).not.toBeNull(); 86 + expect(object!.httpMetadata?.contentType).toBe("text/plain"); 87 + 88 + const retrieved = new Uint8Array(await object!.arrayBuffer()); 89 + expect(retrieved).toEqual(content); 90 + }); 91 + 92 + it("lists blobs for an account", async () => { 93 + const { stub } = await setupBlobTestAccount(env); 94 + 95 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 96 + await instance.rpcUploadBlob(new TextEncoder().encode("blob1"), "text/plain"); 97 + await instance.rpcUploadBlob(new TextEncoder().encode("blob2"), "image/png"); 98 + 99 + const result = await instance.rpcListBlobs(); 100 + expect(result.cids).toHaveLength(2); 101 + expect(result.cids.every((c: string) => typeof c === "string" && c.length > 0)).toBe(true); 102 + }); 103 + }); 104 + 105 + it("retrieves blob with correct content-type via getBlob route", async () => { 106 + const { did, stub } = await setupBlobTestAccount(env); 107 + 108 + let blobRef!: BlobRef; 109 + const content = new TextEncoder().encode("image data here"); 110 + 111 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 112 + blobRef = await instance.rpcUploadBlob(content, "image/jpeg"); 113 + }); 114 + 115 + const response = await worker.fetch( 116 + `http://localhost/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(blobRef.ref.$link)}`, 117 + ); 118 + expect(response.status).toBe(200); 119 + expect(response.headers.get("content-type")).toBe("image/jpeg"); 120 + const retrieved = new Uint8Array(await response.arrayBuffer()); 121 + expect(retrieved).toEqual(content); 122 + }); 123 + 124 + it("returns content-addressed CIDs (same content = same CID)", async () => { 125 + const { stub } = await setupBlobTestAccount(env); 126 + 127 + const content = new TextEncoder().encode("duplicate content"); 128 + 129 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 130 + const ref1 = await instance.rpcUploadBlob(content, "text/plain"); 131 + const ref2 = await instance.rpcUploadBlob(content, "text/plain"); 132 + expect(ref1.ref.$link).toBe(ref2.ref.$link); 133 + }); 134 + }); 135 + 136 + it("uploads a blob via the DPoP-authenticated route", async () => { 137 + const keyPair = await crypto.subtle.generateKey( 138 + { 139 + name: "RSASSA-PKCS1-v1_5", 140 + modulusLength: 4096, 141 + publicExponent: new Uint8Array([1, 0, 1]), 142 + hash: "SHA-256", 143 + }, 144 + true, 145 + ["sign", "verify"], 146 + ); 147 + const publicJwk = await crypto.subtle.exportKey("jwk", keyPair.publicKey); 148 + const thumbprint = await jwkThumbprint(publicJwk as { kty: string; n: string; e: string }); 149 + const { did } = await setupBlobTestAccount(env, { jwkThumbprint: thumbprint }); 150 + 151 + const accessToken = "route-upload-token"; 152 + const body = new TextEncoder().encode("blob via route"); 153 + const dpopJwt = await signJwt( 154 + { 155 + typ: "dpop+jwt", 156 + alg: "RS256", 157 + jwk: publicJwk, 158 + }, 159 + { 160 + jti: `jti-${Date.now().toString(36)}`, 161 + htm: "POST", 162 + htu: "http://localhost/xrpc/com.atproto.repo.uploadBlob", 163 + iat: Math.floor(Date.now() / 1000), 164 + ath: await sha256Base64url(accessToken), 165 + }, 166 + keyPair.privateKey, 167 + ); 168 + 169 + const response = await worker.fetch( 170 + new Request("http://localhost/xrpc/com.atproto.repo.uploadBlob", { 171 + method: "POST", 172 + headers: { 173 + authorization: `DPoP ${accessToken}`, 174 + dpop: dpopJwt, 175 + "content-type": "text/plain", 176 + "content-length": String(body.byteLength), 177 + }, 178 + body, 179 + }), 180 + ); 181 + 182 + expect(response.status).toBe(200); 183 + const json = await response.json() as { blob: BlobRef }; 184 + expect(json.blob.mimeType).toBe("text/plain"); 185 + 186 + const stored = await env.BLOBS.get(`${did}/${json.blob.ref.$link}`); 187 + expect(stored).not.toBeNull(); 188 + const storedBytes = new Uint8Array(await stored!.arrayBuffer()); 189 + expect(storedBytes).toEqual(body); 190 + }); 191 + });
+6
test/fixtures/worker/wrangler.jsonc
··· 31 31 "database_name": "rookery-test-directory", 32 32 "database_id": "test-directory-id" 33 33 } 34 + ], 35 + "r2_buckets": [ 36 + { 37 + "binding": "BLOBS", 38 + "bucket_name": "rookery-test-blobs" 39 + } 34 40 ] 35 41 }
+4
wrangler.toml
··· 21 21 binding = "DIRECTORY" 22 22 database_name = "rookery-directory" 23 23 database_id = "placeholder-create-with-wrangler-d1-create" 24 + 25 + [[r2_buckets]] 26 + binding = "BLOBS" 27 + bucket_name = "rookery-blobs"