open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-j5szwjdy-l8-blobs-relay'

# Conflicts:
# src/index.ts
# src/sync.ts

+695 -4
+8
src/config.ts
··· 5 5 handleDomain: string; 6 6 plcUrl: string; 7 7 dbPath: string; 8 + blobDir: string; 9 + relayHosts: string[]; 8 10 port: number; 9 11 tosText: string; 10 12 } ··· 50 52 handleDomain: process.env.ROOKERY_HANDLE_DOMAIN!, 51 53 plcUrl: process.env.ROOKERY_PLC_URL!, 52 54 dbPath: process.env.ROOKERY_DB_PATH ?? "./rookery.db", 55 + blobDir: process.env.ROOKERY_BLOB_DIR ?? "./data/blobs", 56 + relayHosts: process.env.ROOKERY_RELAY_HOSTS 57 + ? process.env.ROOKERY_RELAY_HOSTS.split(",") 58 + .map((h) => h.trim()) 59 + .filter(Boolean) 60 + : [], 53 61 port, 54 62 tosText, 55 63 };
+5 -1
src/index.ts
··· 1 + import { mkdirSync } from "node:fs"; 1 2 import { serve } from "@hono/node-server"; 2 3 import { createNodeWebSocket } from "@hono/node-ws"; 3 4 import { createApp } from "./app.js"; 4 5 import { loadConfig } from "./config.js"; 5 6 import { initDatabase } from "./db.js"; 7 + import { announceToRelays } from "./relay.js"; 6 8 import { createRepoRoutes } from "./repo.js"; 7 9 import { Sequencer } from "./sequencer.js"; 8 10 import { createSyncRoutes } from "./sync.js"; 9 11 10 12 const config = loadConfig(); 11 13 const db = initDatabase(config.dbPath); 14 + mkdirSync(config.blobDir, { recursive: true }); 12 15 const sequencer = new Sequencer(db); 13 16 const app = createApp(config, db, sequencer); 14 17 const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 15 18 16 - app.route("/", createSyncRoutes(db, sequencer, upgradeWebSocket)); 19 + app.route("/", createSyncRoutes(db, config, sequencer, upgradeWebSocket)); 17 20 app.route("/", createRepoRoutes(db, config, sequencer)); 18 21 19 22 const server = serve({ fetch: app.fetch, port: config.port }, (info) => { 20 23 console.log(`rookery listening on port ${info.port}`); 24 + announceToRelays(config); 21 25 }); 22 26 injectWebSocket(server);
+13
src/relay.ts
··· 1 + import type { Config } from "./config.js"; 2 + 3 + export function announceToRelays(config: Config) { 4 + for (const host of config.relayHosts) { 5 + fetch(`https://${host}/xrpc/com.atproto.sync.requestCrawl`, { 6 + method: "POST", 7 + headers: { "Content-Type": "application/json" }, 8 + body: JSON.stringify({ hostname: config.hostname }), 9 + }).catch((err) => { 10 + console.error(`relay announcement to ${host} failed:`, err); 11 + }); 12 + } 13 + }
+40
src/repo.ts
··· 1 1 import { Hono, type Context } from "hono"; 2 + import { bodyLimit } from "hono/body-limit"; 3 + import { mkdirSync, writeFileSync } from "node:fs"; 4 + import path from "node:path"; 2 5 import { CID } from "@atproto/lex-data"; 3 6 import { Secp256k1Keypair } from "@atproto/crypto"; 7 + import { create as createCid, format as formatCid } from "@atcute/cid"; 4 8 import { 5 9 Repo, 6 10 WriteOpAction, ··· 652 656 results, 653 657 }); 654 658 }); 659 + 660 + app.post( 661 + "/xrpc/com.atproto.repo.uploadBlob", 662 + bodyLimit({ 663 + maxSize: 60 * 1024 * 1024, 664 + onError: (c) => 665 + c.json({ error: "InvalidRequest", message: "blob too large (max 60MB)" }, 400), 666 + }), 667 + authMiddleware, 668 + async (c) => { 669 + const account = c.get("account"); 670 + const mimeType = c.req.header("content-type") ?? "application/octet-stream"; 671 + const arrayBuf = await c.req.arrayBuffer(); 672 + const bytes = new Uint8Array(arrayBuf); 673 + 674 + const cid = await createCid(0x55, bytes); 675 + const cidStr = formatCid(cid); 676 + 677 + const dirPath = path.join(config.blobDir, account.did); 678 + mkdirSync(dirPath, { recursive: true }); 679 + writeFileSync(path.join(dirPath, cidStr), bytes); 680 + 681 + db.prepare( 682 + "INSERT INTO blobs (account_id, cid, mime_type, size) VALUES (?, ?, ?, ?) ON CONFLICT(account_id, cid) DO UPDATE SET mime_type = excluded.mime_type, size = excluded.size", 683 + ).run(account.id, cidStr, mimeType, bytes.length); 684 + 685 + return c.json({ 686 + blob: { 687 + $type: "blob", 688 + ref: { $link: cidStr }, 689 + mimeType, 690 + size: bytes.length, 691 + }, 692 + }); 693 + }, 694 + ); 655 695 656 696 return app; 657 697 }
+83
src/sync.ts
··· 1 1 import { Hono, type Context } from "hono"; 2 2 import { stream } from "hono/streaming"; 3 + import { readFileSync } from "node:fs"; 4 + import path from "node:path"; 3 5 import { CID } from "@atproto/lex-data"; 4 6 import { BlockMap, blocksToCarStream } from "@atproto/repo"; 5 7 import type Database from "better-sqlite3"; 8 + import type { Config } from "./config.js"; 6 9 import type { Sequencer } from "./sequencer.js"; 7 10 8 11 type AccountRow = { ··· 54 57 55 58 export function createSyncRoutes( 56 59 db: Database.Database, 60 + config: Config, 57 61 sequencer?: Sequencer, 58 62 upgradeWebSocket?: Function, 59 63 ): Hono { ··· 230 234 }), 231 235 ); 232 236 } 237 + 238 + app.get("/xrpc/com.atproto.sync.getBlob", (c) => { 239 + const did = c.req.query("did"); 240 + const cid = c.req.query("cid"); 241 + if (!did || !cid) { 242 + return xrpcError(c, 400, "InvalidRequest", "missing did or cid parameter"); 243 + } 244 + 245 + const row = db 246 + .prepare( 247 + "SELECT b.cid, b.mime_type FROM blobs b JOIN accounts a ON a.id = b.account_id WHERE a.did = ? AND b.cid = ?", 248 + ) 249 + .get(did, cid) as { cid: string; mime_type: string | null } | undefined; 250 + 251 + if (!row) { 252 + return xrpcError(c, 404, "BlobNotFound", "blob not found"); 253 + } 254 + 255 + const blobPath = path.join(config.blobDir, did, cid); 256 + let bytes: Buffer; 257 + try { 258 + bytes = readFileSync(blobPath); 259 + } catch { 260 + return xrpcError(c, 404, "BlobNotFound", "blob not found"); 261 + } 262 + 263 + c.header("Content-Type", row.mime_type ?? "application/octet-stream"); 264 + return c.body(bytes); 265 + }); 266 + 267 + app.get("/xrpc/com.atproto.sync.listBlobs", (c) => { 268 + const did = c.req.query("did"); 269 + if (!did) { 270 + return xrpcError(c, 400, "InvalidRequest", "missing did parameter"); 271 + } 272 + 273 + const account = getAccountByDid(db, did); 274 + if (!account) { 275 + return xrpcError(c, 404, "RepoNotFound", "repo not found"); 276 + } 277 + 278 + const limit = Math.min( 279 + Math.max(Number.parseInt(c.req.query("limit") ?? "500", 10) || 500, 1), 280 + 1000, 281 + ); 282 + 283 + const cursor = c.req.query("cursor"); 284 + const since = c.req.query("since"); 285 + 286 + let query: string; 287 + const params: (number | string)[] = [account.id]; 288 + 289 + if (cursor && since) { 290 + query = 291 + "SELECT cid FROM blobs WHERE account_id = ? AND cid > ? AND created_at > ? ORDER BY cid ASC LIMIT ?"; 292 + params.push(cursor, since, limit + 1); 293 + } else if (cursor) { 294 + query = "SELECT cid FROM blobs WHERE account_id = ? AND cid > ? ORDER BY cid ASC LIMIT ?"; 295 + params.push(cursor, limit + 1); 296 + } else if (since) { 297 + query = "SELECT cid FROM blobs WHERE account_id = ? AND created_at > ? ORDER BY cid ASC LIMIT ?"; 298 + params.push(since, limit + 1); 299 + } else { 300 + query = "SELECT cid FROM blobs WHERE account_id = ? ORDER BY cid ASC LIMIT ?"; 301 + params.push(limit + 1); 302 + } 303 + 304 + const rows = db.prepare(query).all(...params) as { cid: string }[]; 305 + 306 + let nextCursor: string | undefined; 307 + if (rows.length === limit + 1) { 308 + rows.pop(); 309 + nextCursor = rows[rows.length - 1]?.cid; 310 + } 311 + 312 + const cids = rows.map((r) => r.cid); 313 + 314 + return c.json(nextCursor ? { cursor: nextCursor, cids } : { cids }); 315 + }); 233 316 234 317 return app; 235 318 }
+2
test/auth.test.ts
··· 124 124 handleDomain: "test.example.com", 125 125 plcUrl: "https://plc.example.com", 126 126 dbPath: ":memory:", 127 + blobDir: "/tmp/rookery-test-blobs", 128 + relayHosts: [], 127 129 port: 3000, 128 130 tosText: DEFAULT_TOS_TEXT, 129 131 };
+513
test/blob.test.ts
··· 1 + import crypto from "node:crypto"; 2 + import fs from "node:fs"; 3 + import os from "node:os"; 4 + import path from "node:path"; 5 + import { afterEach, describe, expect, it, vi } from "vitest"; 6 + import { create as createCid, format as formatCid } from "@atcute/cid"; 7 + import { createApp } from "../src/app.js"; 8 + import { DEFAULT_TOS_TEXT, type Config } from "../src/config.js"; 9 + import { initDatabase } from "../src/db.js"; 10 + import { announceToRelays } from "../src/relay.js"; 11 + import { createRepoRoutes } from "../src/repo.js"; 12 + import { createSyncRoutes } from "../src/sync.js"; 13 + 14 + function generateRsa4096() { 15 + return crypto.generateKeyPairSync("rsa", { 16 + modulusLength: 4096, 17 + publicKeyEncoding: { type: "spki", format: "pem" }, 18 + privateKeyEncoding: { type: "pkcs8", format: "pem" }, 19 + }); 20 + } 21 + 22 + function pemToJwk(publicKeyPem: string): { kty: string; n: string; e: string } { 23 + const key = crypto.createPublicKey(publicKeyPem); 24 + const jwk = key.export({ format: "jwk" }); 25 + if ( 26 + !("kty" in jwk) || 27 + typeof jwk.kty !== "string" || 28 + !("n" in jwk) || 29 + typeof jwk.n !== "string" || 30 + !("e" in jwk) || 31 + typeof jwk.e !== "string" 32 + ) { 33 + throw new Error("expected RSA JWK"); 34 + } 35 + return { kty: jwk.kty, n: jwk.n, e: jwk.e }; 36 + } 37 + 38 + function base64urlEncode(input: Buffer | Uint8Array | string): string { 39 + return Buffer.from(input).toString("base64url"); 40 + } 41 + 42 + function createJwt(header: object, payload: object, privateKeyPem: string): string { 43 + const headerB64 = base64urlEncode(Buffer.from(JSON.stringify(header))); 44 + const payloadB64 = base64urlEncode(Buffer.from(JSON.stringify(payload))); 45 + const signingInput = `${headerB64}.${payloadB64}`; 46 + const sign = crypto.createSign("SHA256"); 47 + sign.update(signingInput); 48 + const signature = sign.sign(privateKeyPem); 49 + return `${signingInput}.${base64urlEncode(signature)}`; 50 + } 51 + 52 + function createDpopProof( 53 + jwk: { kty: string; n: string; e: string }, 54 + privateKeyPem: string, 55 + method: string, 56 + htu: string, 57 + accessToken?: string, 58 + ) { 59 + const payload: Record<string, unknown> = { 60 + jti: crypto.randomUUID(), 61 + htm: method, 62 + htu, 63 + iat: Math.floor(Date.now() / 1000), 64 + }; 65 + if (accessToken) { 66 + const atHash = crypto.createHash("sha256").update(accessToken).digest(); 67 + payload.ath = base64urlEncode(atHash); 68 + } 69 + return createJwt( 70 + { typ: "dpop+jwt", alg: "RS256", jwk }, 71 + payload, 72 + privateKeyPem, 73 + ); 74 + } 75 + 76 + function signTos(tosText: string, privateKeyPem: string): string { 77 + const sign = crypto.createSign("SHA256"); 78 + sign.update(tosText); 79 + return base64urlEncode(sign.sign(privateKeyPem)); 80 + } 81 + 82 + function computeJwkThumbprint(jwk: { kty: string; n: string; e: string }): string { 83 + const canonical = JSON.stringify({ e: jwk.e, kty: "RSA", n: jwk.n }); 84 + return base64urlEncode(crypto.createHash("sha256").update(canonical).digest()); 85 + } 86 + 87 + function createAccessToken( 88 + tosText: string, 89 + privateKeyPem: string, 90 + jwk: { kty: string; n: string; e: string }, 91 + serviceOrigin: string, 92 + ): string { 93 + return createJwt( 94 + { typ: "wm+jwt", alg: "RS256" }, 95 + { 96 + jti: crypto.randomUUID(), 97 + tos_hash: base64urlEncode(crypto.createHash("sha256").update(tosText).digest()), 98 + aud: serviceOrigin, 99 + cnf: { jkt: computeJwkThumbprint(jwk) }, 100 + iat: Math.floor(Date.now() / 1000), 101 + }, 102 + privateKeyPem, 103 + ); 104 + } 105 + 106 + let blobDir = ""; 107 + 108 + function createTestApp() { 109 + blobDir = fs.mkdtempSync(path.join(os.tmpdir(), "rookery-blob-test-")); 110 + const db = initDatabase(":memory:"); 111 + const config: Config = { 112 + hostname: "test.example.com", 113 + handleDomain: "test.example.com", 114 + plcUrl: "https://plc.example.com", 115 + dbPath: ":memory:", 116 + blobDir, 117 + relayHosts: [], 118 + port: 3000, 119 + tosText: DEFAULT_TOS_TEXT, 120 + }; 121 + const app = createApp(config, db); 122 + app.route("/", createSyncRoutes(db, config)); 123 + app.route("/", createRepoRoutes(db, config)); 124 + return { app, db, config }; 125 + } 126 + 127 + async function performSignup( 128 + app: ReturnType<typeof createApp>, 129 + config: Config, 130 + handle = "agent", 131 + ) { 132 + const keys = generateRsa4096(); 133 + const jwk = pemToJwk(keys.publicKey); 134 + const accessToken = createAccessToken( 135 + config.tosText, 136 + keys.privateKey, 137 + jwk, 138 + `https://${config.hostname}`, 139 + ); 140 + const response = await app.request("http://localhost/api/signup", { 141 + method: "POST", 142 + headers: { 143 + DPoP: createDpopProof(jwk, keys.privateKey, "POST", "http://localhost/api/signup"), 144 + "Content-Type": "application/json", 145 + }, 146 + body: JSON.stringify({ 147 + handle, 148 + tos_signature: signTos(config.tosText, keys.privateKey), 149 + access_token: accessToken, 150 + }), 151 + }); 152 + 153 + return { response, accessToken, jwk, ...keys }; 154 + } 155 + 156 + async function authenticatedUpload( 157 + app: ReturnType<typeof createApp>, 158 + accessToken: string, 159 + privateKeyPem: string, 160 + jwk: { kty: string; n: string; e: string }, 161 + body: Uint8Array, 162 + contentType: string, 163 + ) { 164 + const url = "http://localhost/xrpc/com.atproto.repo.uploadBlob"; 165 + const dpop = createDpopProof(jwk, privateKeyPem, "POST", url, accessToken); 166 + return app.request(url, { 167 + method: "POST", 168 + headers: { 169 + Authorization: `DPoP ${accessToken}`, 170 + DPoP: dpop, 171 + "Content-Type": contentType, 172 + }, 173 + body, 174 + }); 175 + } 176 + 177 + describe("blob endpoints", () => { 178 + afterEach(() => { 179 + vi.restoreAllMocks(); 180 + vi.unstubAllGlobals(); 181 + if (blobDir && fs.existsSync(blobDir)) { 182 + fs.rmSync(blobDir, { recursive: true }); 183 + } 184 + blobDir = ""; 185 + }); 186 + 187 + describe("uploadBlob", () => { 188 + it("uploads a blob and returns correct blob ref", async () => { 189 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 190 + const { app, config } = createTestApp(); 191 + const signup = await performSignup(app, config); 192 + 193 + const content = new TextEncoder().encode("hello blob"); 194 + const res = await authenticatedUpload( 195 + app, 196 + signup.accessToken, 197 + signup.privateKey, 198 + signup.jwk, 199 + content, 200 + "text/plain", 201 + ); 202 + 203 + expect(res.status).toBe(200); 204 + const body = await res.json(); 205 + expect(body.blob.$type).toBe("blob"); 206 + expect(body.blob.mimeType).toBe("text/plain"); 207 + expect(body.blob.size).toBe(content.length); 208 + expect(typeof body.blob.ref.$link).toBe("string"); 209 + 210 + const expectedCid = formatCid(await createCid(0x55, content)); 211 + expect(body.blob.ref.$link).toBe(expectedCid); 212 + }); 213 + 214 + it("writes blob to filesystem at {blobDir}/{did}/{cid}", async () => { 215 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 216 + const { app, config } = createTestApp(); 217 + const signup = await performSignup(app, config); 218 + const signupBody = await signup.response.json(); 219 + 220 + const content = new TextEncoder().encode("fs check"); 221 + const res = await authenticatedUpload( 222 + app, 223 + signup.accessToken, 224 + signup.privateKey, 225 + signup.jwk, 226 + content, 227 + "application/octet-stream", 228 + ); 229 + 230 + const body = await res.json(); 231 + const cidStr = body.blob.ref.$link; 232 + const blobPath = path.join(blobDir, signupBody.did, cidStr); 233 + expect(fs.existsSync(blobPath)).toBe(true); 234 + expect(fs.readFileSync(blobPath)).toEqual(Buffer.from(content)); 235 + }); 236 + 237 + it("rejects unauthenticated upload with 401", async () => { 238 + const { app } = createTestApp(); 239 + const res = await app.request("http://localhost/xrpc/com.atproto.repo.uploadBlob", { 240 + method: "POST", 241 + headers: { "Content-Type": "text/plain" }, 242 + body: "hello", 243 + }); 244 + 245 + expect(res.status).toBe(401); 246 + }); 247 + 248 + it("rejects blob larger than 60MB", async () => { 249 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 250 + const { app, config } = createTestApp(); 251 + const signup = await performSignup(app, config); 252 + 253 + const url = "http://localhost/xrpc/com.atproto.repo.uploadBlob"; 254 + const dpop = createDpopProof(signup.jwk, signup.privateKey, "POST", url, signup.accessToken); 255 + const res = await app.request(url, { 256 + method: "POST", 257 + headers: { 258 + Authorization: `DPoP ${signup.accessToken}`, 259 + DPoP: dpop, 260 + "Content-Type": "application/octet-stream", 261 + "Content-Length": String(61 * 1024 * 1024), 262 + }, 263 + body: new Uint8Array(1024), 264 + }); 265 + 266 + expect(res.status).toBe(400); 267 + const body = await res.json(); 268 + expect(body.error).toBe("InvalidRequest"); 269 + expect(body.message).toContain("blob too large"); 270 + }); 271 + }); 272 + 273 + describe("getBlob", () => { 274 + it("retrieves an uploaded blob with correct content and Content-Type", async () => { 275 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 276 + const { app, config } = createTestApp(); 277 + const signup = await performSignup(app, config); 278 + const signupBody = await signup.response.json(); 279 + 280 + const content = new TextEncoder().encode("round trip"); 281 + const uploadRes = await authenticatedUpload( 282 + app, 283 + signup.accessToken, 284 + signup.privateKey, 285 + signup.jwk, 286 + content, 287 + "text/plain", 288 + ); 289 + const uploadBody = await uploadRes.json(); 290 + const cid = uploadBody.blob.ref.$link; 291 + 292 + const getRes = await app.request( 293 + `http://localhost/xrpc/com.atproto.sync.getBlob?did=${signupBody.did}&cid=${cid}`, 294 + ); 295 + 296 + expect(getRes.status).toBe(200); 297 + expect(getRes.headers.get("content-type")).toBe("text/plain"); 298 + const returnedBytes = new Uint8Array(await getRes.arrayBuffer()); 299 + expect(returnedBytes).toEqual(content); 300 + }); 301 + 302 + it("returns 404 for non-existent blob", async () => { 303 + const { app } = createTestApp(); 304 + const res = await app.request( 305 + "http://localhost/xrpc/com.atproto.sync.getBlob?did=did:plc:test&cid=bafkreinotreal", 306 + ); 307 + 308 + expect(res.status).toBe(404); 309 + const body = await res.json(); 310 + expect(body.error).toBe("BlobNotFound"); 311 + }); 312 + 313 + it("returns 400 when missing parameters", async () => { 314 + const { app } = createTestApp(); 315 + const res = await app.request("http://localhost/xrpc/com.atproto.sync.getBlob"); 316 + 317 + expect(res.status).toBe(400); 318 + }); 319 + }); 320 + 321 + describe("listBlobs", () => { 322 + it("lists all blobs for a DID", async () => { 323 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 324 + const { app, config } = createTestApp(); 325 + const signup = await performSignup(app, config); 326 + const signupBody = await signup.response.json(); 327 + 328 + const blob1 = new TextEncoder().encode("blob one"); 329 + const blob2 = new TextEncoder().encode("blob two"); 330 + await authenticatedUpload( 331 + app, 332 + signup.accessToken, 333 + signup.privateKey, 334 + signup.jwk, 335 + blob1, 336 + "text/plain", 337 + ); 338 + await authenticatedUpload( 339 + app, 340 + signup.accessToken, 341 + signup.privateKey, 342 + signup.jwk, 343 + blob2, 344 + "text/plain", 345 + ); 346 + 347 + const res = await app.request( 348 + `http://localhost/xrpc/com.atproto.sync.listBlobs?did=${signupBody.did}`, 349 + ); 350 + 351 + expect(res.status).toBe(200); 352 + const body = await res.json(); 353 + expect(body.cids).toHaveLength(2); 354 + expect(body.cursor).toBeUndefined(); 355 + }); 356 + 357 + it("paginates with cursor", async () => { 358 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 359 + const { app, config } = createTestApp(); 360 + const signup = await performSignup(app, config); 361 + const signupBody = await signup.response.json(); 362 + 363 + for (let i = 0; i < 3; i++) { 364 + const content = new TextEncoder().encode(`blob ${i}`); 365 + await authenticatedUpload( 366 + app, 367 + signup.accessToken, 368 + signup.privateKey, 369 + signup.jwk, 370 + content, 371 + "text/plain", 372 + ); 373 + } 374 + 375 + const res1 = await app.request( 376 + `http://localhost/xrpc/com.atproto.sync.listBlobs?did=${signupBody.did}&limit=2`, 377 + ); 378 + const body1 = await res1.json(); 379 + expect(body1.cids).toHaveLength(2); 380 + expect(body1.cursor).toBeDefined(); 381 + 382 + const res2 = await app.request( 383 + `http://localhost/xrpc/com.atproto.sync.listBlobs?did=${signupBody.did}&limit=2&cursor=${body1.cursor}`, 384 + ); 385 + const body2 = await res2.json(); 386 + expect(body2.cids).toHaveLength(1); 387 + expect(body2.cursor).toBeUndefined(); 388 + }); 389 + 390 + it("filters by since parameter", async () => { 391 + vi.stubGlobal("fetch", async () => new Response("", { status: 200 })); 392 + const { app, db, config } = createTestApp(); 393 + const signup = await performSignup(app, config); 394 + const signupBody = await signup.response.json(); 395 + 396 + const blob1 = new TextEncoder().encode("old blob"); 397 + await authenticatedUpload( 398 + app, 399 + signup.accessToken, 400 + signup.privateKey, 401 + signup.jwk, 402 + blob1, 403 + "text/plain", 404 + ); 405 + 406 + db.prepare("UPDATE blobs SET created_at = '2020-01-01 00:00:00'").run(); 407 + 408 + const blob2 = new TextEncoder().encode("new blob"); 409 + await authenticatedUpload( 410 + app, 411 + signup.accessToken, 412 + signup.privateKey, 413 + signup.jwk, 414 + blob2, 415 + "text/plain", 416 + ); 417 + 418 + const res = await app.request( 419 + `http://localhost/xrpc/com.atproto.sync.listBlobs?did=${signupBody.did}&since=2023-01-01 00:00:00`, 420 + ); 421 + const body = await res.json(); 422 + expect(body.cids).toHaveLength(1); 423 + }); 424 + 425 + it("returns 404 for unknown DID", async () => { 426 + const { app } = createTestApp(); 427 + const res = await app.request( 428 + "http://localhost/xrpc/com.atproto.sync.listBlobs?did=did:plc:unknown", 429 + ); 430 + expect(res.status).toBe(404); 431 + }); 432 + }); 433 + }); 434 + 435 + describe("relay announcement", () => { 436 + it("sends requestCrawl POST to each relay host", async () => { 437 + const mockFetch = vi.fn().mockResolvedValue(new Response("ok")); 438 + vi.stubGlobal("fetch", mockFetch); 439 + 440 + const config: Config = { 441 + hostname: "my-pds.example.com", 442 + handleDomain: "example.com", 443 + plcUrl: "https://plc.example.com", 444 + dbPath: ":memory:", 445 + blobDir: "/tmp/test", 446 + relayHosts: ["relay1.example.com", "relay2.example.com"], 447 + port: 3000, 448 + tosText: "tos", 449 + }; 450 + 451 + announceToRelays(config); 452 + await new Promise((resolve) => setTimeout(resolve, 10)); 453 + 454 + expect(mockFetch).toHaveBeenCalledTimes(2); 455 + expect(mockFetch).toHaveBeenCalledWith( 456 + "https://relay1.example.com/xrpc/com.atproto.sync.requestCrawl", 457 + expect.objectContaining({ 458 + method: "POST", 459 + body: JSON.stringify({ hostname: "my-pds.example.com" }), 460 + }), 461 + ); 462 + expect(mockFetch).toHaveBeenCalledWith( 463 + "https://relay2.example.com/xrpc/com.atproto.sync.requestCrawl", 464 + expect.objectContaining({ 465 + method: "POST", 466 + body: JSON.stringify({ hostname: "my-pds.example.com" }), 467 + }), 468 + ); 469 + }); 470 + 471 + it("logs errors but does not throw", async () => { 472 + const mockFetch = vi.fn().mockRejectedValue(new Error("network error")); 473 + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 474 + vi.stubGlobal("fetch", mockFetch); 475 + 476 + const config: Config = { 477 + hostname: "my-pds.example.com", 478 + handleDomain: "example.com", 479 + plcUrl: "https://plc.example.com", 480 + dbPath: ":memory:", 481 + blobDir: "/tmp/test", 482 + relayHosts: ["relay.example.com"], 483 + port: 3000, 484 + tosText: "tos", 485 + }; 486 + 487 + announceToRelays(config); 488 + await new Promise((resolve) => setTimeout(resolve, 10)); 489 + 490 + expect(errorSpy).toHaveBeenCalled(); 491 + errorSpy.mockRestore(); 492 + }); 493 + 494 + it("does nothing when relayHosts is empty", () => { 495 + const mockFetch = vi.fn(); 496 + vi.stubGlobal("fetch", mockFetch); 497 + 498 + const config: Config = { 499 + hostname: "my-pds.example.com", 500 + handleDomain: "example.com", 501 + plcUrl: "https://plc.example.com", 502 + dbPath: ":memory:", 503 + blobDir: "/tmp/test", 504 + relayHosts: [], 505 + port: 3000, 506 + tosText: "tos", 507 + }; 508 + 509 + announceToRelays(config); 510 + 511 + expect(mockFetch).not.toHaveBeenCalled(); 512 + }); 513 + });
+12 -1
test/firehose.test.ts
··· 7 7 import { Repo, WriteOpAction } from "@atproto/repo"; 8 8 import { Secp256k1Keypair } from "@atproto/crypto"; 9 9 import { decode as rawCborDecode } from "@atcute/cbor"; 10 + import type { Config } from "../src/config.js"; 10 11 import { initDatabase } from "../src/db.js"; 11 12 import { Sequencer } from "../src/sequencer.js"; 12 13 import { SqliteRepoStorage } from "../src/storage.js"; ··· 456 457 async function startServer() { 457 458 const app = new Hono(); 458 459 const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 459 - app.route("/", createSyncRoutes(db, sequencer, upgradeWebSocket)); 460 + const config: Config = { 461 + hostname: "test.example", 462 + handleDomain: "test.example", 463 + plcUrl: "https://plc.test", 464 + dbPath: ":memory:", 465 + blobDir: "/tmp/rookery-test-blobs", 466 + relayHosts: [], 467 + port: 3000, 468 + tosText: "test tos", 469 + }; 470 + app.route("/", createSyncRoutes(db, config, sequencer, upgradeWebSocket)); 460 471 461 472 return new Promise<{ port: number; close: () => void }>((resolve) => { 462 473 const server = serve({ fetch: app.fetch, port: 0 }, (info) => {
+2
test/identity.test.ts
··· 17 17 handleDomain: "test.example", 18 18 plcUrl: "https://plc.test", 19 19 dbPath: ":memory:", 20 + blobDir: "/tmp/rookery-test-blobs", 21 + relayHosts: [], 20 22 port: 3000, 21 23 tosText: DEFAULT_TOS_TEXT, 22 24 };
+5 -1
test/repo.test.ts
··· 18 18 handleDomain: "test.example", 19 19 plcUrl: "https://plc.test", 20 20 dbPath: ":memory:", 21 + blobDir: "/tmp/rookery-test-blobs", 22 + relayHosts: [], 21 23 port: 3000, 22 24 tosText: "test tos", 23 25 }; ··· 185 187 handleDomain: "test.example.com", 186 188 plcUrl: "https://plc.example.com", 187 189 dbPath: ":memory:", 190 + blobDir: "/tmp/rookery-test-blobs", 191 + relayHosts: [], 188 192 port: 3000, 189 193 tosText: DEFAULT_TOS_TEXT, 190 194 }; 191 195 const app = createApp(config, db); 192 - app.route("/", createSyncRoutes(db)); 196 + app.route("/", createSyncRoutes(db, config)); 193 197 app.route("/", createRepoRoutes(db, config)); 194 198 return { app, db, config }; 195 199 }
+12 -1
test/sync.test.ts
··· 3 3 import { Hono } from "hono"; 4 4 import { Repo, WriteOpAction, readCarWithRoot } from "@atproto/repo"; 5 5 import { Secp256k1Keypair } from "@atproto/crypto"; 6 + import type { Config } from "../src/config.js"; 6 7 import { initDatabase } from "../src/db.js"; 7 8 import { SqliteRepoStorage } from "../src/storage.js"; 8 9 import { createSyncRoutes } from "../src/sync.js"; ··· 43 44 44 45 beforeEach(() => { 45 46 db = initDatabase(":memory:"); 46 - app = createSyncRoutes(db); 47 + const config: Config = { 48 + hostname: "test.example", 49 + handleDomain: "test.example", 50 + plcUrl: "https://plc.test", 51 + dbPath: ":memory:", 52 + blobDir: "/tmp/rookery-test-blobs", 53 + relayHosts: [], 54 + port: 3000, 55 + tosText: "test tos", 56 + }; 57 + app = createSyncRoutes(db, config); 47 58 }); 48 59 49 60 describe("getRepo", () => {