open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add L14 SequencerDurableObject + subscribeRepos firehose

Singleton SequencerDurableObject provides global monotonic event ordering
for the AT Proto subscribeRepos firehose. Account DOs call the Sequencer
via RPC after each repo write; the Sequencer assigns a sequence number,
persists the CBOR-encoded frame, and broadcasts to WebSocket subscribers
with cursor-based backfill and hibernation support.

- New SequencerDurableObject with SQLite-backed firehose_events table
- RPC methods: sequenceCommit, sequenceIdentity, sequenceAccount
- WebSocket endpoint: GET /xrpc/com.atproto.sync.subscribeRepos
- Account DO emits commit events after record create/delete/applyWrites
- Signup emits identity + account events
- Binary frame encoding: two concatenated DAG-CBOR values (header + body)
- Cursor-based backfill on WebSocket connect
- WebSocket hibernation for idle subscribers

+677 -4
+76
src/account-do.ts
··· 83 83 return this.repo; 84 84 } 85 85 86 + private async emitCommitEvent( 87 + prevDataCid: string | null, 88 + ops: Array<{ 89 + action: "create" | "update" | "delete"; 90 + path: string; 91 + cid: string | null; 92 + }>, 93 + ): Promise<void> { 94 + const commit = this.storage!.lastCommit!; 95 + const carBytes = await blocksToCarFile(commit.cid, commit.newBlocks); 96 + const seqId = this.env.SEQUENCER.idFromName("sequencer"); 97 + const seqStub = this.env.SEQUENCER.get(seqId); 98 + await seqStub.sequenceCommit({ 99 + did: this.repo!.did, 100 + commit: commit.cid.toString(), 101 + rev: commit.rev, 102 + since: commit.since, 103 + prevData: prevDataCid, 104 + blocks: carBytes, 105 + ops, 106 + }); 107 + } 108 + 86 109 /** 87 110 * RPC: Provision a new account in this DO. 88 111 * Must be called before any repo operations. ··· 114 137 this.keypair = null; 115 138 116 139 await this.ensureRepoInitialized(); 140 + 141 + // Emit identity and account events for firehose 142 + const seqId = this.env.SEQUENCER.idFromName("sequencer"); 143 + const seqStub = this.env.SEQUENCER.get(seqId); 144 + await seqStub.sequenceIdentity(opts.did, opts.handle); 145 + await seqStub.sequenceAccount(opts.did, true, null); 117 146 118 147 return { did: opts.did, handle: opts.handle }; 119 148 } ··· 233 262 const repo = await this.getRepo(); 234 263 await this.ensureRepoInitialized(); 235 264 const keypair = this.keypair!; 265 + const prevDataCid = this.storage!.getState()?.prev_data_cid ?? null; 236 266 237 267 const actualRkey = rkey || nextRkey(); 238 268 const createOp: RecordCreateOp = { ··· 252 282 } 253 283 254 284 this.storage!.addCollection(collection); 285 + 286 + await this.emitCommitEvent(prevDataCid, [ 287 + { 288 + action: "create", 289 + path: `${collection}/${actualRkey}`, 290 + cid: recordCid.toString(), 291 + }, 292 + ]); 255 293 256 294 return { 257 295 uri: `at://${this.repo.did}/${collection}/${actualRkey}`, ··· 271 309 const repo = await this.getRepo(); 272 310 await this.ensureRepoInitialized(); 273 311 const keypair = this.keypair!; 312 + const prevDataCid = this.storage!.getState()?.prev_data_cid ?? null; 274 313 275 314 const deleteOp: RecordDeleteOp = { 276 315 action: WriteOpAction.Delete, ··· 280 319 281 320 const updatedRepo = await repo.applyWrites([deleteOp], keypair); 282 321 this.repo = updatedRepo; 322 + 323 + await this.emitCommitEvent(prevDataCid, [ 324 + { 325 + action: "delete", 326 + path: `${collection}/${rkey}`, 327 + cid: null, 328 + }, 329 + ]); 283 330 284 331 return { 285 332 commit: { ··· 304 351 const repo = await this.getRepo(); 305 352 await this.ensureRepoInitialized(); 306 353 const keypair = this.keypair!; 354 + const prevDataCid = this.storage!.getState()?.prev_data_cid ?? null; 307 355 308 356 const ops: RecordWriteOp[] = []; 309 357 const results: unknown[] = []; ··· 340 388 341 389 const updatedRepo = await repo.applyWrites(ops, keypair); 342 390 this.repo = updatedRepo; 391 + 392 + // Build firehose ops from the already-constructed ops array 393 + const firehoseOps: Array<{ 394 + action: "create" | "update" | "delete"; 395 + path: string; 396 + cid: string | null; 397 + }> = []; 398 + for (const op of ops) { 399 + if (op.action === WriteOpAction.Create) { 400 + const createOp = op as RecordCreateOp; 401 + const recordCid = await this.repo.data.get( 402 + `${createOp.collection}/${createOp.rkey}`, 403 + ); 404 + firehoseOps.push({ 405 + action: "create", 406 + path: `${createOp.collection}/${createOp.rkey}`, 407 + cid: recordCid?.toString() ?? null, 408 + }); 409 + } else if (op.action === WriteOpAction.Delete) { 410 + const deleteOp = op as RecordDeleteOp; 411 + firehoseOps.push({ 412 + action: "delete", 413 + path: `${deleteOp.collection}/${deleteOp.rkey}`, 414 + cid: null, 415 + }); 416 + } 417 + } 418 + await this.emitCommitEvent(prevDataCid, firehoseOps); 343 419 344 420 return { 345 421 commit: {
+250
src/sequencer-do.ts
··· 1 + import { DurableObject } from "cloudflare:workers"; 2 + import { CID } from "@atproto/lex-data"; 3 + import { encode as cborEncode } from "./cbor-compat"; 4 + import type { Env } from "./types"; 5 + 6 + const SCHEMA = ` 7 + CREATE TABLE IF NOT EXISTS firehose_events ( 8 + seq INTEGER PRIMARY KEY AUTOINCREMENT, 9 + did TEXT NOT NULL, 10 + event_type TEXT NOT NULL, 11 + payload BLOB NOT NULL, 12 + created_at TEXT NOT NULL DEFAULT (datetime('now')) 13 + ); 14 + `; 15 + 16 + function concatBytes(a: Uint8Array, b: Uint8Array): Uint8Array { 17 + const result = new Uint8Array(a.length + b.length); 18 + result.set(a, 0); 19 + result.set(b, a.length); 20 + return result; 21 + } 22 + 23 + export class SequencerDurableObject extends DurableObject<Env> { 24 + private initialized = false; 25 + 26 + constructor(ctx: DurableObjectState, env: Env) { 27 + super(ctx, env); 28 + } 29 + 30 + private async ensureInitialized(): Promise<void> { 31 + if (!this.initialized) { 32 + await this.ctx.blockConcurrencyWhile(async () => { 33 + if (this.initialized) return; 34 + this.ctx.storage.sql.exec(SCHEMA); 35 + this.initialized = true; 36 + }); 37 + } 38 + } 39 + 40 + async sequenceCommit(data: { 41 + did: string; 42 + commit: string; 43 + rev: string; 44 + since: string | null; 45 + prevData: string | null; 46 + blocks: Uint8Array; 47 + ops: Array<{ 48 + action: "create" | "update" | "delete"; 49 + path: string; 50 + cid: string | null; 51 + }>; 52 + }): Promise<{ seq: number }> { 53 + await this.ensureInitialized(); 54 + 55 + const result = this.ctx.storage.sql 56 + .exec( 57 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, 'commit', x'00') RETURNING seq", 58 + data.did, 59 + ) 60 + .one(); 61 + const seq = result.seq as number; 62 + 63 + const header = cborEncode({ op: 1, t: "#commit" }); 64 + const body = cborEncode({ 65 + seq, 66 + repo: data.did, 67 + commit: CID.parse(data.commit), 68 + rev: data.rev, 69 + since: data.since, 70 + blocks: data.blocks, 71 + ops: data.ops.map((op) => ({ 72 + action: op.action, 73 + path: op.path, 74 + cid: op.cid ? CID.parse(op.cid) : null, 75 + })), 76 + prevData: data.prevData ? CID.parse(data.prevData) : null, 77 + rebase: false, 78 + tooBig: data.blocks.length > 1_000_000, 79 + blobs: [], 80 + time: new Date().toISOString(), 81 + }); 82 + const frame = concatBytes(header, body); 83 + 84 + this.ctx.storage.sql.exec( 85 + "UPDATE firehose_events SET payload = ? WHERE seq = ?", 86 + frame, 87 + seq, 88 + ); 89 + 90 + this.broadcast(frame, seq); 91 + return { seq }; 92 + } 93 + 94 + async sequenceIdentity( 95 + did: string, 96 + handle: string, 97 + ): Promise<{ seq: number }> { 98 + await this.ensureInitialized(); 99 + 100 + const result = this.ctx.storage.sql 101 + .exec( 102 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, 'identity', x'00') RETURNING seq", 103 + did, 104 + ) 105 + .one(); 106 + const seq = result.seq as number; 107 + const time = new Date().toISOString(); 108 + 109 + const header = cborEncode({ op: 1, t: "#identity" }); 110 + const body = cborEncode({ seq, did, handle, time }); 111 + const frame = concatBytes(header, body); 112 + 113 + this.ctx.storage.sql.exec( 114 + "UPDATE firehose_events SET payload = ? WHERE seq = ?", 115 + frame, 116 + seq, 117 + ); 118 + 119 + this.broadcast(frame, seq); 120 + return { seq }; 121 + } 122 + 123 + async sequenceAccount( 124 + did: string, 125 + active: boolean, 126 + status: string | null, 127 + ): Promise<{ seq: number }> { 128 + await this.ensureInitialized(); 129 + 130 + const result = this.ctx.storage.sql 131 + .exec( 132 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, 'account', x'00') RETURNING seq", 133 + did, 134 + ) 135 + .one(); 136 + const seq = result.seq as number; 137 + const time = new Date().toISOString(); 138 + 139 + const header = cborEncode({ op: 1, t: "#account" }); 140 + const body = cborEncode({ seq, did, active, status, time }); 141 + const frame = concatBytes(header, body); 142 + 143 + this.ctx.storage.sql.exec( 144 + "UPDATE firehose_events SET payload = ? WHERE seq = ?", 145 + frame, 146 + seq, 147 + ); 148 + 149 + this.broadcast(frame, seq); 150 + return { seq }; 151 + } 152 + 153 + override async fetch(request: Request): Promise<Response> { 154 + await this.ensureInitialized(); 155 + 156 + const upgradeHeader = request.headers.get("Upgrade"); 157 + if (!upgradeHeader || upgradeHeader.toLowerCase() !== "websocket") { 158 + return new Response("Expected WebSocket upgrade", { status: 426 }); 159 + } 160 + 161 + const url = new URL(request.url); 162 + const cursorParam = url.searchParams.get("cursor"); 163 + const cursor = cursorParam ? parseInt(cursorParam, 10) : null; 164 + if (cursorParam !== null && (Number.isNaN(cursor) || cursor < 0)) { 165 + return new Response("Invalid cursor", { status: 400 }); 166 + } 167 + 168 + const pair = new WebSocketPair(); 169 + const [client, server] = [pair[0], pair[1]]; 170 + 171 + this.ctx.acceptWebSocket(server); 172 + server.serializeAttachment({ cursor: cursor ?? 0 }); 173 + 174 + if (cursor !== null) { 175 + await this.backfill(server, cursor); 176 + } 177 + 178 + return new Response(null, { status: 101, webSocket: client }); 179 + } 180 + 181 + override webSocketMessage( 182 + _ws: WebSocket, 183 + _message: string | ArrayBuffer, 184 + ): void {} 185 + 186 + override webSocketClose( 187 + _ws: WebSocket, 188 + _code: number, 189 + _reason: string, 190 + _wasClean: boolean, 191 + ): void {} 192 + 193 + private async backfill(ws: WebSocket, cursor: number): Promise<void> { 194 + const latestSeq = this.getLatestSeq(); 195 + 196 + if (cursor > latestSeq && latestSeq > 0) { 197 + const frame = concatBytes( 198 + cborEncode({ op: -1 }), 199 + cborEncode({ 200 + error: "FutureCursor", 201 + message: "Cursor is in the future", 202 + }), 203 + ); 204 + ws.send(frame); 205 + ws.close(1008, "FutureCursor"); 206 + return; 207 + } 208 + 209 + const rows = this.ctx.storage.sql 210 + .exec( 211 + "SELECT seq, payload FROM firehose_events WHERE seq > ? ORDER BY seq ASC LIMIT 1000", 212 + cursor, 213 + ) 214 + .toArray(); 215 + 216 + let lastSeq = cursor; 217 + for (const row of rows) { 218 + lastSeq = row.seq as number; 219 + ws.send(new Uint8Array(row.payload as ArrayBuffer)); 220 + } 221 + 222 + if (lastSeq > cursor) { 223 + const attachment = (ws.deserializeAttachment() ?? { 224 + cursor, 225 + }) as { cursor: number }; 226 + attachment.cursor = lastSeq; 227 + ws.serializeAttachment(attachment); 228 + } 229 + } 230 + 231 + private broadcast(frame: Uint8Array, seq: number): void { 232 + for (const ws of this.ctx.getWebSockets()) { 233 + try { 234 + ws.send(frame); 235 + const attachment = (ws.deserializeAttachment() ?? { 236 + cursor: 0, 237 + }) as { cursor: number }; 238 + attachment.cursor = seq; 239 + ws.serializeAttachment(attachment); 240 + } catch {} 241 + } 242 + } 243 + 244 + private getLatestSeq(): number { 245 + const result = this.ctx.storage.sql 246 + .exec("SELECT MAX(seq) as seq FROM firehose_events") 247 + .one(); 248 + return (result?.seq as number) ?? 0; 249 + } 250 + }
+3
src/types.ts
··· 1 1 import type { AccountDurableObject } from "./account-do"; 2 + import type { SequencerDurableObject } from "./sequencer-do"; 2 3 3 4 /** 4 5 * Environment bindings for the rookery Worker. ··· 7 8 export interface Env { 8 9 /** Durable Object namespace for account storage */ 9 10 ACCOUNT: DurableObjectNamespace<AccountDurableObject>; 11 + /** Durable Object namespace for firehose sequencing */ 12 + SEQUENCER: DurableObjectNamespace<SequencerDurableObject>; 10 13 /** D1 account directory for cross-account queries */ 11 14 DIRECTORY: D1Database; 12 15 /** Public hostname of the PDS */
+8
src/worker.ts
··· 1 1 export { AccountDurableObject } from "./account-do"; 2 + export { SequencerDurableObject } from "./sequencer-do"; 2 3 3 4 import { Hono } from "hono"; 4 5 import { ··· 182 183 } 183 184 throw err; 184 185 } 186 + }); 187 + 188 + // GET /xrpc/com.atproto.sync.subscribeRepos 189 + app.get("/xrpc/com.atproto.sync.subscribeRepos", async (c) => { 190 + const seqId = c.env.SEQUENCER.idFromName("sequencer"); 191 + const seqStub = c.env.SEQUENCER.get(seqId); 192 + return seqStub.fetch(c.req.raw); 185 193 }); 186 194 187 195 export default app;
+5 -1
test/fixtures/worker/index.ts
··· 1 - export { default, AccountDurableObject } from "../../../src/worker"; 1 + export { 2 + default, 3 + AccountDurableObject, 4 + SequencerDurableObject, 5 + } from "../../../src/worker";
+8 -1
test/fixtures/worker/wrangler.jsonc
··· 3 3 "name": "rookery-test", 4 4 "main": "index.ts", 5 5 "compatibility_date": "2025-01-01", 6 - "compatibility_flags": ["nodejs_compat"], 7 6 "durable_objects": { 8 7 "bindings": [ 9 8 { 10 9 "name": "ACCOUNT", 11 10 "class_name": "AccountDurableObject" 11 + }, 12 + { 13 + "name": "SEQUENCER", 14 + "class_name": "SequencerDurableObject" 12 15 } 13 16 ] 14 17 }, ··· 16 19 { 17 20 "tag": "v1", 18 21 "new_sqlite_classes": ["AccountDurableObject"] 22 + }, 23 + { 24 + "tag": "v2", 25 + "new_sqlite_classes": ["SequencerDurableObject"] 19 26 } 20 27 ], 21 28 "d1_databases": [
+267
test/sequencer-do.test.ts
··· 1 + import { beforeEach, describe, expect, it, vi } from "vitest"; 2 + import { BlockMap } from "@atproto/repo"; 3 + import { resolveRepo } from "../src/directory"; 4 + import { AccountDurableObject } from "../src/account-do"; 5 + import { SequencerDurableObject } from "../src/sequencer-do"; 6 + import { env, runInDurableObject, worker } from "./helpers"; 7 + 8 + function getSequencerStub() { 9 + const id = env.SEQUENCER.idFromName("sequencer"); 10 + return env.SEQUENCER.get(id); 11 + } 12 + 13 + async function createCid(data: unknown): Promise<string> { 14 + const blocks = new BlockMap(); 15 + const cid = await blocks.add(data as Record<string, unknown>); 16 + return cid.toString(); 17 + } 18 + 19 + async function resetSequencer(): Promise<void> { 20 + const stub = getSequencerStub(); 21 + await runInDurableObject(stub, async (instance: SequencerDurableObject) => { 22 + await instance.sequenceIdentity("did:plc:reset", "reset.rookery.test"); 23 + const ctx = (instance as unknown as { ctx: DurableObjectState }).ctx; 24 + ctx.storage.sql.exec("DELETE FROM firehose_events"); 25 + ctx.storage.sql.exec( 26 + "DELETE FROM sqlite_sequence WHERE name = 'firehose_events'", 27 + ); 28 + }); 29 + } 30 + 31 + async function getFirehoseRows(): Promise< 32 + Array<{ seq: number; did: string; event_type: string; payload: ArrayBuffer }> 33 + > { 34 + const stub = getSequencerStub(); 35 + return runInDurableObject(stub, async (instance: SequencerDurableObject) => { 36 + const ctx = (instance as unknown as { ctx: DurableObjectState }).ctx; 37 + return ctx.storage.sql 38 + .exec("SELECT seq, did, event_type, payload FROM firehose_events ORDER BY seq ASC") 39 + .toArray() as Array<{ 40 + seq: number; 41 + did: string; 42 + event_type: string; 43 + payload: ArrayBuffer; 44 + }>; 45 + }); 46 + } 47 + 48 + async function createAccountViaSignup(handle: string): Promise<{ did: string }> { 49 + const originalFetch = globalThis.fetch.bind(globalThis); 50 + const fetchSpy = vi 51 + .spyOn(globalThis, "fetch") 52 + .mockImplementation(async (input, init) => { 53 + const url = 54 + typeof input === "string" ? input : input instanceof Request ? input.url : input.url; 55 + if (url.startsWith("https://plc.directory/")) { 56 + return new Response(null, { status: 200 }); 57 + } 58 + return originalFetch(input as RequestInfo | URL, init); 59 + }); 60 + 61 + try { 62 + const response = await worker.fetch( 63 + new Request("http://localhost/api/signup", { 64 + method: "POST", 65 + headers: { "Content-Type": "application/json" }, 66 + body: JSON.stringify({ handle }), 67 + }), 68 + env, 69 + ); 70 + expect(response.status).toBe(200); 71 + return response.json<{ did: string }>(); 72 + } finally { 73 + fetchSpy.mockRestore(); 74 + } 75 + } 76 + 77 + async function waitForMessages( 78 + ws: WebSocket, 79 + count: number, 80 + ): Promise<Array<string | ArrayBuffer>> { 81 + const messages: Array<string | ArrayBuffer> = []; 82 + return new Promise((resolve, reject) => { 83 + const timeout = setTimeout(() => { 84 + cleanup(); 85 + reject(new Error(`Timed out waiting for ${count} messages`)); 86 + }, 1000); 87 + 88 + const onMessage = (event: MessageEvent<string | ArrayBuffer>) => { 89 + messages.push(event.data); 90 + if (messages.length >= count) { 91 + cleanup(); 92 + resolve(messages); 93 + } 94 + }; 95 + 96 + const cleanup = () => { 97 + clearTimeout(timeout); 98 + ws.removeEventListener("message", onMessage); 99 + }; 100 + 101 + ws.addEventListener("message", onMessage); 102 + }); 103 + } 104 + 105 + beforeEach(async () => { 106 + await resetSequencer(); 107 + }); 108 + 109 + describe("SequencerDurableObject", () => { 110 + it("initializes and assigns monotonic seq numbers", async () => { 111 + const stub = getSequencerStub(); 112 + 113 + await runInDurableObject(stub, async (instance: SequencerDurableObject) => { 114 + const first = await instance.sequenceIdentity( 115 + "did:plc:test-seq-1", 116 + "first.rookery.test", 117 + ); 118 + const second = await instance.sequenceIdentity( 119 + "did:plc:test-seq-2", 120 + "second.rookery.test", 121 + ); 122 + 123 + expect(first.seq).toBe(1); 124 + expect(second.seq).toBe(2); 125 + }); 126 + }); 127 + 128 + it("sequenceCommit stores an event and increments seq", async () => { 129 + const stub = getSequencerStub(); 130 + const commitCid = await createCid({ type: "commit" }); 131 + const prevDataCid = await createCid({ data: "prev" }); 132 + const recordCid = await createCid({ record: "create" }); 133 + 134 + await runInDurableObject(stub, async (instance: SequencerDurableObject) => { 135 + const identity = await instance.sequenceIdentity( 136 + "did:plc:test-identity", 137 + "identity.rookery.test", 138 + ); 139 + const commit = await instance.sequenceCommit({ 140 + did: "did:plc:test-identity", 141 + commit: commitCid, 142 + rev: "rev-1", 143 + since: null, 144 + prevData: prevDataCid, 145 + blocks: new Uint8Array([]), 146 + ops: [ 147 + { 148 + action: "create", 149 + path: "app.bsky.feed.post/test", 150 + cid: recordCid, 151 + }, 152 + ], 153 + }); 154 + 155 + expect(identity.seq).toBe(1); 156 + expect(commit.seq).toBe(2); 157 + }); 158 + 159 + const rows = await getFirehoseRows(); 160 + expect(rows.map((row) => row.event_type)).toEqual(["identity", "commit"]); 161 + }); 162 + 163 + it("sequenceAccount stores an account event", async () => { 164 + const stub = getSequencerStub(); 165 + 166 + await runInDurableObject(stub, async (instance: SequencerDurableObject) => { 167 + const result = await instance.sequenceAccount("did:plc:test", true, null); 168 + expect(result).toEqual({ seq: 1 }); 169 + }); 170 + 171 + const rows = await getFirehoseRows(); 172 + expect(rows).toHaveLength(1); 173 + expect(rows[0].event_type).toBe("account"); 174 + }); 175 + 176 + it("emits identity and account events on signup", async () => { 177 + const handle = `signup-${Date.now().toString(36)}`; 178 + await createAccountViaSignup(handle); 179 + 180 + const rows = await getFirehoseRows(); 181 + expect(rows).toHaveLength(2); 182 + expect(rows.map((row) => row.event_type)).toEqual(["identity", "account"]); 183 + }); 184 + 185 + it("emits a commit event when creating a record", async () => { 186 + const handle = `record-${Date.now().toString(36)}`; 187 + await createAccountViaSignup(handle); 188 + 189 + const resolved = await resolveRepo(`${handle}.rookery.test`, env); 190 + const doId = env.ACCOUNT.idFromString(resolved.doId); 191 + const stub = env.ACCOUNT.get(doId); 192 + 193 + await runInDurableObject(stub, async (instance: AccountDurableObject) => { 194 + await instance.rpcCreateRecord("app.bsky.feed.post", "test-rkey", { 195 + text: "hello", 196 + createdAt: new Date().toISOString(), 197 + }); 198 + }); 199 + 200 + const rows = await getFirehoseRows(); 201 + expect(rows.map((row) => row.event_type)).toEqual([ 202 + "identity", 203 + "account", 204 + "commit", 205 + ]); 206 + }); 207 + 208 + it("returns 101 for websocket upgrades", async () => { 209 + const stub = getSequencerStub(); 210 + const response = await stub.fetch( 211 + new Request("http://fake-host/xrpc/com.atproto.sync.subscribeRepos", { 212 + headers: { Upgrade: "websocket" }, 213 + }), 214 + ); 215 + 216 + expect(response.status).toBe(101); 217 + expect(response.webSocket).toBeDefined(); 218 + response.webSocket?.accept(); 219 + response.webSocket?.close(); 220 + }); 221 + 222 + it("rejects invalid cursor values", async () => { 223 + const stub = getSequencerStub(); 224 + const response = await stub.fetch( 225 + new Request( 226 + "http://fake-host/xrpc/com.atproto.sync.subscribeRepos?cursor=not-a-number", 227 + { 228 + headers: { Upgrade: "websocket" }, 229 + }, 230 + ), 231 + ); 232 + 233 + expect(response.status).toBe(400); 234 + expect(await response.text()).toBe("Invalid cursor"); 235 + }); 236 + 237 + it("backfills only events after the requested cursor", async () => { 238 + const stub = getSequencerStub(); 239 + 240 + await runInDurableObject(stub, async (instance: SequencerDurableObject) => { 241 + await instance.sequenceIdentity("did:plc:test-1", "one.rookery.test"); 242 + await instance.sequenceIdentity("did:plc:test-2", "two.rookery.test"); 243 + await instance.sequenceIdentity("did:plc:test-3", "three.rookery.test"); 244 + }); 245 + 246 + const response = await stub.fetch( 247 + new Request( 248 + "http://fake-host/xrpc/com.atproto.sync.subscribeRepos?cursor=2", 249 + { 250 + headers: { Upgrade: "websocket" }, 251 + }, 252 + ), 253 + ); 254 + 255 + expect(response.status).toBe(101); 256 + const ws = response.webSocket; 257 + expect(ws).toBeDefined(); 258 + 259 + ws!.accept(); 260 + const messagesPromise = waitForMessages(ws!, 1); 261 + const messages = await messagesPromise; 262 + 263 + expect(messages).toHaveLength(1); 264 + expect(messages[0]).toBeInstanceOf(ArrayBuffer); 265 + ws!.close(); 266 + }); 267 + });
+22
test/shims/atcute-multibase.ts
··· 1 + export { 2 + fromBase16, 3 + toBase16, 4 + } from "../../node_modules/@atcute/multibase/dist/bases/base16-web.js"; 5 + export { 6 + fromBase64, 7 + fromBase64Pad, 8 + fromBase64Url, 9 + fromBase64UrlPad, 10 + toBase64, 11 + toBase64Pad, 12 + toBase64Url, 13 + toBase64UrlPad, 14 + } from "../../node_modules/@atcute/multibase/dist/bases/base64-web.js"; 15 + export { 16 + fromBase32, 17 + toBase32, 18 + } from "../../node_modules/@atcute/multibase/dist/bases/base32.js"; 19 + export { 20 + fromBase58Btc, 21 + toBase58Btc, 22 + } from "../../node_modules/@atcute/multibase/dist/bases/base58.js";
+1
test/shims/atcute-time-ms.ts
··· 1 + export * from "../../node_modules/@atcute/time-ms/dist/index.js";
+1
test/shims/atcute-util-text.ts
··· 1 + export * from "../../node_modules/@atcute/util-text/dist/index.js";
+10
test/shims/node-process.ts
··· 1 + const processValue = (globalThis as { process?: NodeJS.Process }).process ?? { 2 + arch: "x64", 3 + platform: "linux", 4 + env: {}, 5 + }; 6 + 7 + export const arch = processValue.arch; 8 + export const platform = processValue.platform; 9 + export const env = processValue.env; 10 + export default processValue;
+20 -1
vitest.config.ts
··· 1 1 import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config"; 2 2 3 + const atcuteMultibaseShim = new URL("./test/shims/atcute-multibase.ts", import.meta.url) 4 + .pathname; 5 + const atcuteTimeMsShim = new URL("./test/shims/atcute-time-ms.ts", import.meta.url) 6 + .pathname; 7 + const atcuteUtilTextShim = new URL( 8 + "./test/shims/atcute-util-text.ts", 9 + import.meta.url, 10 + ).pathname; 11 + const nodeProcessShim = new URL("./test/shims/node-process.ts", import.meta.url) 12 + .pathname; 13 + 3 14 export default defineWorkersConfig({ 4 15 resolve: { 5 - conditions: ["worker", "browser", "node", "require"], 16 + conditions: ["worker", "browser", "require"], 6 17 alias: { 18 + "@atcute/multibase": atcuteMultibaseShim, 19 + "@atcute/time-ms": atcuteTimeMsShim, 20 + "@atcute/util-text": atcuteUtilTextShim, 21 + "node:process": nodeProcessShim, 7 22 pino: "pino/browser.js", 8 23 }, 9 24 }, ··· 15 30 optimizer: { 16 31 ssr: { 17 32 include: [ 33 + "@atcute/cbor", 34 + "@atcute/cid", 35 + "@atcute/multibase", 18 36 "@atcute/tid", 19 37 "@atcute/time-ms", 38 + "@atcute/util-text", 20 39 "@atproto/common", 21 40 "@atproto/repo", 22 41 "@atproto/crypto",
+6 -1
wrangler.toml
··· 5 5 6 6 [durable_objects] 7 7 bindings = [ 8 - { name = "ACCOUNT", class_name = "AccountDurableObject" } 8 + { name = "ACCOUNT", class_name = "AccountDurableObject" }, 9 + { name = "SEQUENCER", class_name = "SequencerDurableObject" } 9 10 ] 10 11 11 12 [[migrations]] 12 13 tag = "v1" 13 14 new_sqlite_classes = ["AccountDurableObject"] 15 + 16 + [[migrations]] 17 + tag = "v2" 18 + new_sqlite_classes = ["SequencerDurableObject"] 14 19 15 20 [[d1_databases]] 16 21 binding = "DIRECTORY"