open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

L7: firehose — event sequencer + subscribeRepos WebSocket

Implements the firehose event system for rookery:

- Event sequencer (src/sequencer.ts): sequenceCommit, sequenceIdentity,
sequenceAccount with CBOR-encoded Sync v1.1 frame format, cursor-based
backfill via getEventsSince, event pruning, and in-process subscriber
broadcast
- WebSocket endpoint: GET /xrpc/com.atproto.sync.subscribeRepos with
cursor-based backfill and live streaming via @hono/node-ws
- Write integration: createRecord, putRecord, deleteRecord, applyWrites
all emit commit events with correct ops, blocks (CAR), rev, prevData
- Signup integration: identity + account events emitted on signup
- Storage: captures last CommitData for sequencer access
- 27 new tests (119 total) covering all acceptance criteria

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+1064 -8
+11 -1
src/app.ts
··· 13 13 } from "./auth.js"; 14 14 import type { Config } from "./config.js"; 15 15 import { createDidPlc } from "./identity.js"; 16 + import type { Sequencer } from "./sequencer.js"; 16 17 import { SqliteRepoStorage } from "./storage.js"; 17 18 18 - export function createApp(config: Config, db: Database.Database): Hono<AuthEnv> { 19 + export function createApp( 20 + config: Config, 21 + db: Database.Database, 22 + sequencer?: Sequencer, 23 + ): Hono<AuthEnv> { 19 24 const app = new Hono<AuthEnv>(); 20 25 const authMiddleware = createAuthMiddleware(db, config); 21 26 const serviceOrigin = `https://${config.hostname}`; ··· 196 201 db.prepare("DELETE FROM blocks WHERE account_id = ?").run(accountId); 197 202 db.prepare("DELETE FROM accounts WHERE id = ?").run(accountId); 198 203 throw error; 204 + } 205 + 206 + if (sequencer) { 207 + sequencer.sequenceIdentity(did, fullHandle); 208 + sequencer.sequenceAccount(did, true); 199 209 } 200 210 201 211 return c.json({
+10 -4
src/index.ts
··· 1 1 import { serve } from "@hono/node-server"; 2 + import { createNodeWebSocket } from "@hono/node-ws"; 2 3 import { createApp } from "./app.js"; 3 4 import { loadConfig } from "./config.js"; 4 5 import { initDatabase } from "./db.js"; 5 6 import { createRepoRoutes } from "./repo.js"; 7 + import { Sequencer } from "./sequencer.js"; 6 8 import { createSyncRoutes } from "./sync.js"; 7 9 8 10 const config = loadConfig(); 9 11 const db = initDatabase(config.dbPath); 10 - const app = createApp(config, db); 11 - app.route("/", createSyncRoutes(db)); 12 - app.route("/", createRepoRoutes(db, config)); 12 + const sequencer = new Sequencer(db); 13 + const app = createApp(config, db, sequencer); 14 + const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 15 + 16 + app.route("/", createSyncRoutes(db, sequencer, upgradeWebSocket)); 17 + app.route("/", createRepoRoutes(db, config, sequencer)); 13 18 14 - serve({ fetch: app.fetch, port: config.port }, (info) => { 19 + const server = serve({ fetch: app.fetch, port: config.port }, (info) => { 15 20 console.log(`rookery listening on port ${info.port}`); 16 21 }); 22 + injectWebSocket(server);
+105 -2
src/repo.ts
··· 23 23 import type Database from "better-sqlite3"; 24 24 import { createAuthMiddleware, type AuthEnv } from "./auth.js"; 25 25 import type { Config } from "./config.js"; 26 + import type { FirehoseOp, Sequencer } from "./sequencer.js"; 26 27 import { SqliteRepoStorage } from "./storage.js"; 27 28 28 29 type AccountRow = { ··· 116 117 return { storage, repo, keypair }; 117 118 } 118 119 119 - export function createRepoRoutes(db: Database.Database, config: Config): Hono<AuthEnv> { 120 + export function createRepoRoutes( 121 + db: Database.Database, 122 + config: Config, 123 + sequencer?: Sequencer, 124 + ): Hono<AuthEnv> { 120 125 const app = new Hono<AuthEnv>(); 121 126 const authMiddleware = createAuthMiddleware(db, config); 122 127 ··· 311 316 ); 312 317 storage.addCollection(collectionResult.collection); 313 318 319 + if (sequencer && storage.lastCommit) { 320 + const prevDataCid = account.prev_data_cid 321 + ? CID.parse(account.prev_data_cid as string) 322 + : null; 323 + await sequencer.sequenceCommit({ 324 + did: account.did, 325 + commit: storage.lastCommit.cid, 326 + rev: storage.lastCommit.rev, 327 + since: storage.lastCommit.since, 328 + prevData: prevDataCid, 329 + newBlocks: storage.lastCommit.newBlocks, 330 + ops: [ 331 + { 332 + action: "create", 333 + path: `${collectionResult.collection}/${actualRkey}`, 334 + cid: recordCid!, 335 + }, 336 + ], 337 + }); 338 + } 339 + 314 340 return c.json({ 315 341 uri: `at://${account.did}/${collectionResult.collection}/${actualRkey}`, 316 342 cid: recordCid!.toString(), ··· 365 391 ); 366 392 storage.addCollection(collectionResult.collection); 367 393 394 + if (sequencer && storage.lastCommit) { 395 + const prevDataCid = account.prev_data_cid 396 + ? CID.parse(account.prev_data_cid as string) 397 + : null; 398 + await sequencer.sequenceCommit({ 399 + did: account.did, 400 + commit: storage.lastCommit.cid, 401 + rev: storage.lastCommit.rev, 402 + since: storage.lastCommit.since, 403 + prevData: prevDataCid, 404 + newBlocks: storage.lastCommit.newBlocks, 405 + ops: [ 406 + { 407 + action: existing ? "update" : "create", 408 + path: `${collectionResult.collection}/${rkeyResult.rkey}`, 409 + cid: recordCid!, 410 + }, 411 + ], 412 + }); 413 + } 414 + 368 415 return c.json({ 369 416 uri: `at://${account.did}/${collectionResult.collection}/${rkeyResult.rkey}`, 370 417 cid: recordCid!.toString(), ··· 393 440 const rkeyResult = validateRkey(c, body.rkey, true); 394 441 if (rkeyResult instanceof Response) return rkeyResult; 395 442 396 - const { repo, keypair } = await loadRepoState(db, account); 443 + const { storage, repo, keypair } = await loadRepoState(db, account); 397 444 const existing = await repo.getRecord(collectionResult.collection, rkeyResult.rkey!); 398 445 if (!existing) { 399 446 return c.json({ ··· 410 457 rkey: rkeyResult.rkey!, 411 458 }; 412 459 const updatedRepo = await repo.applyWrites([op], keypair); 460 + 461 + if (sequencer && storage.lastCommit) { 462 + const prevDataCid = account.prev_data_cid 463 + ? CID.parse(account.prev_data_cid as string) 464 + : null; 465 + await sequencer.sequenceCommit({ 466 + did: account.did, 467 + commit: storage.lastCommit.cid, 468 + rev: storage.lastCommit.rev, 469 + since: storage.lastCommit.since, 470 + prevData: prevDataCid, 471 + newBlocks: storage.lastCommit.newBlocks, 472 + ops: [ 473 + { 474 + action: "delete", 475 + path: `${collectionResult.collection}/${rkeyResult.rkey}`, 476 + cid: null, 477 + }, 478 + ], 479 + }); 480 + } 481 + 413 482 return c.json({ 414 483 commit: { 415 484 cid: updatedRepo.cid.toString(), ··· 523 592 const finalRepo = ops.length > 0 ? await repo.applyWrites(ops, keypair) : repo; 524 593 for (const collection of collectionsToTrack) { 525 594 storage.addCollection(collection); 595 + } 596 + 597 + if (sequencer && storage.lastCommit && ops.length > 0) { 598 + const prevDataCid = account.prev_data_cid 599 + ? CID.parse(account.prev_data_cid as string) 600 + : null; 601 + const firehoseOps: FirehoseOp[] = []; 602 + for (const op of ops) { 603 + if (op.action === WriteOpAction.Delete) { 604 + firehoseOps.push({ 605 + action: "delete", 606 + path: `${op.collection}/${op.rkey}`, 607 + cid: null, 608 + }); 609 + } else { 610 + const cid = await finalRepo.data.get( 611 + formatDataKey(op.collection, op.rkey), 612 + ); 613 + firehoseOps.push({ 614 + action: op.action === WriteOpAction.Create ? "create" : "update", 615 + path: `${op.collection}/${op.rkey}`, 616 + cid: cid ?? null, 617 + }); 618 + } 619 + } 620 + await sequencer.sequenceCommit({ 621 + did: account.did, 622 + commit: storage.lastCommit.cid, 623 + rev: storage.lastCommit.rev, 624 + since: storage.lastCommit.since, 625 + prevData: prevDataCid, 626 + newBlocks: storage.lastCommit.newBlocks, 627 + ops: firehoseOps, 628 + }); 526 629 } 527 630 528 631 const results = [];
+188
src/sequencer.ts
··· 1 + import { CID } from "@atproto/lex-data"; 2 + import { type BlockMap, blocksToCarFile } from "@atproto/repo"; 3 + import type Database from "better-sqlite3"; 4 + import { encode as cborEncode } from "./cbor-compat.js"; 5 + 6 + export interface FirehoseOp { 7 + action: "create" | "update" | "delete"; 8 + path: string; 9 + cid: CID | null; 10 + } 11 + 12 + export interface FirehoseCommitData { 13 + did: string; 14 + commit: CID; 15 + rev: string; 16 + since: string | null; 17 + prevData: CID | null; 18 + newBlocks: BlockMap; 19 + ops: FirehoseOp[]; 20 + } 21 + 22 + function concatBytes(a: Uint8Array, b: Uint8Array): Uint8Array { 23 + const result = new Uint8Array(a.length + b.length); 24 + result.set(a, 0); 25 + result.set(b, a.length); 26 + return result; 27 + } 28 + 29 + export class Sequencer { 30 + private subscribers = new Set<(frame: Uint8Array) => void>(); 31 + 32 + constructor(private db: Database.Database) {} 33 + 34 + async sequenceCommit(data: FirehoseCommitData): Promise<{ seq: number }> { 35 + const carBytes = await blocksToCarFile(data.commit, data.newBlocks); 36 + const time = new Date().toISOString(); 37 + 38 + const body: Record<string, unknown> = { 39 + repo: data.did, 40 + commit: data.commit, 41 + rev: data.rev, 42 + since: data.since, 43 + blocks: carBytes, 44 + ops: data.ops, 45 + prevData: data.prevData, 46 + rebase: false, 47 + tooBig: carBytes.length > 1_000_000, 48 + blobs: [], 49 + time, 50 + }; 51 + 52 + const { seq, frame } = this.db.transaction(() => { 53 + const info = this.db 54 + .prepare( 55 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, ?, ?)", 56 + ) 57 + .run(data.did, "commit", Buffer.alloc(0)); 58 + const seq = Number(info.lastInsertRowid); 59 + 60 + const header = cborEncode({ op: 1, t: "#commit" }); 61 + const bodyWithSeq = cborEncode({ ...body, seq }); 62 + const frame = concatBytes(header, bodyWithSeq); 63 + 64 + this.db 65 + .prepare("UPDATE firehose_events SET payload = ? WHERE seq = ?") 66 + .run(frame, seq); 67 + 68 + return { seq, frame }; 69 + })(); 70 + 71 + this.broadcast(frame); 72 + return { seq }; 73 + } 74 + 75 + sequenceIdentity(did: string, handle: string): { seq: number } { 76 + const time = new Date().toISOString(); 77 + 78 + const { seq, frame } = this.db.transaction(() => { 79 + const info = this.db 80 + .prepare( 81 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, ?, ?)", 82 + ) 83 + .run(did, "identity", Buffer.alloc(0)); 84 + const seq = Number(info.lastInsertRowid); 85 + 86 + const header = cborEncode({ op: 1, t: "#identity" }); 87 + const bodyBytes = cborEncode({ seq, did, handle, time }); 88 + const frame = concatBytes(header, bodyBytes); 89 + 90 + this.db 91 + .prepare("UPDATE firehose_events SET payload = ? WHERE seq = ?") 92 + .run(frame, seq); 93 + 94 + return { seq, frame }; 95 + })(); 96 + 97 + this.broadcast(frame); 98 + return { seq }; 99 + } 100 + 101 + sequenceAccount( 102 + did: string, 103 + active: boolean, 104 + status?: string, 105 + ): { seq: number } { 106 + const time = new Date().toISOString(); 107 + 108 + const { seq, frame } = this.db.transaction(() => { 109 + const info = this.db 110 + .prepare( 111 + "INSERT INTO firehose_events (did, event_type, payload) VALUES (?, ?, ?)", 112 + ) 113 + .run(did, "account", Buffer.alloc(0)); 114 + const seq = Number(info.lastInsertRowid); 115 + 116 + const header = cborEncode({ op: 1, t: "#account" }); 117 + const bodyBytes = cborEncode({ 118 + seq, 119 + did, 120 + active, 121 + status: status ?? null, 122 + time, 123 + }); 124 + const frame = concatBytes(header, bodyBytes); 125 + 126 + this.db 127 + .prepare("UPDATE firehose_events SET payload = ? WHERE seq = ?") 128 + .run(frame, seq); 129 + 130 + return { seq, frame }; 131 + })(); 132 + 133 + this.broadcast(frame); 134 + return { seq }; 135 + } 136 + 137 + getEventsSince(cursor: number, limit = 500): Uint8Array[] { 138 + const rows = this.db 139 + .prepare( 140 + "SELECT payload FROM firehose_events WHERE seq > ? ORDER BY seq ASC LIMIT ?", 141 + ) 142 + .all(cursor, limit) as { payload: Buffer }[]; 143 + return rows.map((r) => new Uint8Array(r.payload)); 144 + } 145 + 146 + getLatestSeq(): number { 147 + const row = this.db 148 + .prepare("SELECT MAX(seq) as seq FROM firehose_events") 149 + .get() as { seq: number | null }; 150 + return row?.seq ?? 0; 151 + } 152 + 153 + getOldestSeq(): number { 154 + const row = this.db 155 + .prepare("SELECT MIN(seq) as seq FROM firehose_events") 156 + .get() as { seq: number | null }; 157 + return row?.seq ?? 0; 158 + } 159 + 160 + pruneOldEvents(keepCount = 10000): void { 161 + this.db 162 + .prepare( 163 + "DELETE FROM firehose_events WHERE seq < (SELECT MAX(seq) - ? FROM firehose_events)", 164 + ) 165 + .run(keepCount); 166 + } 167 + 168 + subscribe(handler: (frame: Uint8Array) => void): () => void { 169 + this.subscribers.add(handler); 170 + return () => { 171 + this.subscribers.delete(handler); 172 + }; 173 + } 174 + 175 + get subscriberCount(): number { 176 + return this.subscribers.size; 177 + } 178 + 179 + private broadcast(frame: Uint8Array): void { 180 + for (const handler of this.subscribers) { 181 + try { 182 + handler(frame); 183 + } catch { 184 + this.subscribers.delete(handler); 185 + } 186 + } 187 + } 188 + }
+3
src/storage.ts
··· 12 12 extends ReadableBlockstore 13 13 implements RepoStorage 14 14 { 15 + lastCommit: CommitData | null = null; 16 + 15 17 constructor( 16 18 private db: Database.Database, 17 19 private accountId: number, ··· 88 90 } 89 91 90 92 async applyCommit(commit: CommitData): Promise<void> { 93 + this.lastCommit = commit; 91 94 const doCommit = this.db.transaction(() => { 92 95 const insertBlock = this.db.prepare( 93 96 "INSERT OR REPLACE INTO blocks (account_id, cid, bytes, rev) VALUES (?, ?, ?, ?)",
+58 -1
src/sync.ts
··· 3 3 import { CID } from "@atproto/lex-data"; 4 4 import { BlockMap, blocksToCarStream } from "@atproto/repo"; 5 5 import type Database from "better-sqlite3"; 6 + import type { Sequencer } from "./sequencer.js"; 6 7 7 8 type AccountRow = { 8 9 id: number; ··· 51 52 .all(accountId, since) as BlockRow[]; 52 53 } 53 54 54 - export function createSyncRoutes(db: Database.Database): Hono { 55 + export function createSyncRoutes( 56 + db: Database.Database, 57 + sequencer?: Sequencer, 58 + upgradeWebSocket?: Function, 59 + ): Hono { 55 60 const app = new Hono(); 56 61 57 62 app.get("/xrpc/com.atproto.sync.getRepo", (c) => { ··· 173 178 174 179 return c.json(nextCursor ? { cursor: nextCursor, repos } : { repos }); 175 180 }); 181 + 182 + if (sequencer && upgradeWebSocket) { 183 + app.get( 184 + "/xrpc/com.atproto.sync.subscribeRepos", 185 + (upgradeWebSocket as Function)((c: Context) => { 186 + const cursorParam = c.req.query("cursor"); 187 + let unsubscribe: (() => void) | null = null; 188 + 189 + return { 190 + onOpen( 191 + _evt: unknown, 192 + ws: { 193 + send: (data: string | ArrayBuffer | Uint8Array) => void; 194 + close: (code?: number, reason?: string) => void; 195 + }, 196 + ) { 197 + if (cursorParam !== undefined) { 198 + const cursor = Number.parseInt(cursorParam, 10); 199 + if (Number.isNaN(cursor) || cursor < 0) { 200 + ws.close(1008, "invalid cursor"); 201 + return; 202 + } 203 + 204 + const oldestSeq = sequencer.getOldestSeq(); 205 + if (oldestSeq > 0 && cursor < oldestSeq - 1) { 206 + ws.close(1008, "cursor too old"); 207 + return; 208 + } 209 + 210 + const events = sequencer.getEventsSince(cursor); 211 + for (const frame of events) { 212 + ws.send(frame); 213 + } 214 + } 215 + 216 + unsubscribe = sequencer.subscribe((frame) => { 217 + try { 218 + ws.send(frame); 219 + } catch { 220 + unsubscribe?.(); 221 + unsubscribe = null; 222 + } 223 + }); 224 + }, 225 + onClose() { 226 + unsubscribe?.(); 227 + unsubscribe = null; 228 + }, 229 + }; 230 + }), 231 + ); 232 + } 176 233 177 234 return app; 178 235 }
+689
test/firehose.test.ts
··· 1 + import { afterEach, beforeEach, describe, expect, it } from "vitest"; 2 + import Database from "better-sqlite3"; 3 + import { Hono } from "hono"; 4 + import { serve } from "@hono/node-server"; 5 + import { createNodeWebSocket } from "@hono/node-ws"; 6 + import { WebSocket } from "ws"; 7 + import { Repo, WriteOpAction } from "@atproto/repo"; 8 + import { Secp256k1Keypair } from "@atproto/crypto"; 9 + import { decode as rawCborDecode } from "@atcute/cbor"; 10 + import { initDatabase } from "../src/db.js"; 11 + import { Sequencer } from "../src/sequencer.js"; 12 + import { SqliteRepoStorage } from "../src/storage.js"; 13 + import { createSyncRoutes } from "../src/sync.js"; 14 + import { encode as cborEncode } from "../src/cbor-compat.js"; 15 + 16 + // --- Helpers --- 17 + 18 + function createAccount( 19 + db: Database.Database, 20 + did: string, 21 + ): { id: number; storage: SqliteRepoStorage } { 22 + const info = db.prepare("INSERT INTO accounts (did) VALUES (?)").run(did); 23 + const id = Number(info.lastInsertRowid); 24 + return { id, storage: new SqliteRepoStorage(db, id) }; 25 + } 26 + 27 + /** 28 + * Decode a firehose frame (two concatenated DAG-CBOR values: header + body). 29 + * Uses raw @atcute/cbor decode to avoid cbor-compat conversion issues. 30 + */ 31 + function decodeFrame(frame: Uint8Array): { 32 + header: Record<string, unknown>; 33 + body: Record<string, unknown>; 34 + } { 35 + const commitHeader = cborEncode({ op: 1, t: "#commit" }); 36 + const identityHeader = cborEncode({ op: 1, t: "#identity" }); 37 + const accountHeader = cborEncode({ op: 1, t: "#account" }); 38 + 39 + for (const knownHeader of [commitHeader, identityHeader, accountHeader]) { 40 + if (frame.length > knownHeader.length) { 41 + const prefix = frame.slice(0, knownHeader.length); 42 + if (Buffer.from(prefix).equals(Buffer.from(knownHeader))) { 43 + const header = rawCborDecode(prefix) as Record<string, unknown>; 44 + const body = rawCborDecode( 45 + frame.slice(knownHeader.length), 46 + ) as Record<string, unknown>; 47 + return { header, body }; 48 + } 49 + } 50 + } 51 + 52 + throw new Error("unknown frame header"); 53 + } 54 + 55 + // --- Sequencer unit tests --- 56 + 57 + describe("Sequencer", () => { 58 + let db: Database.Database; 59 + let sequencer: Sequencer; 60 + 61 + beforeEach(() => { 62 + db = initDatabase(":memory:"); 63 + sequencer = new Sequencer(db); 64 + }); 65 + 66 + describe("sequenceCommit", () => { 67 + it("produces monotonic seq numbers", async () => { 68 + const { storage } = createAccount(db, "did:plc:test"); 69 + const keypair = await Secp256k1Keypair.create(); 70 + let repo = await Repo.create(storage, "did:plc:test", keypair); 71 + 72 + const results = []; 73 + for (let i = 0; i < 3; i++) { 74 + repo = await repo.applyWrites( 75 + { 76 + action: WriteOpAction.Create, 77 + collection: "app.test.post", 78 + rkey: `r${i}`, 79 + record: { text: `post ${i}`, $type: "app.test.post" }, 80 + }, 81 + keypair, 82 + ); 83 + 84 + const commit = storage.lastCommit!; 85 + const result = await sequencer.sequenceCommit({ 86 + did: "did:plc:test", 87 + commit: commit.cid, 88 + rev: commit.rev, 89 + since: commit.since, 90 + prevData: null, 91 + newBlocks: commit.newBlocks, 92 + ops: [ 93 + { 94 + action: "create", 95 + path: `app.test.post/r${i}`, 96 + cid: commit.cid, 97 + }, 98 + ], 99 + }); 100 + results.push(result.seq); 101 + } 102 + 103 + expect(results[1]).toBeGreaterThan(results[0]); 104 + expect(results[2]).toBeGreaterThan(results[1]); 105 + }); 106 + 107 + it("stores events retrievable via getEventsSince", async () => { 108 + const { storage } = createAccount(db, "did:plc:test"); 109 + const keypair = await Secp256k1Keypair.create(); 110 + let repo = await Repo.create(storage, "did:plc:test", keypair); 111 + 112 + repo = await repo.applyWrites( 113 + { 114 + action: WriteOpAction.Create, 115 + collection: "app.test.post", 116 + rkey: "r1", 117 + record: { text: "hello", $type: "app.test.post" }, 118 + }, 119 + keypair, 120 + ); 121 + 122 + const commit = storage.lastCommit!; 123 + const { seq } = await sequencer.sequenceCommit({ 124 + did: "did:plc:test", 125 + commit: commit.cid, 126 + rev: commit.rev, 127 + since: commit.since, 128 + prevData: null, 129 + newBlocks: commit.newBlocks, 130 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 131 + }); 132 + 133 + const events = sequencer.getEventsSince(0); 134 + expect(events.length).toBe(1); 135 + 136 + const { header, body } = decodeFrame(events[0]); 137 + expect(header.op).toBe(1); 138 + expect(header.t).toBe("#commit"); 139 + expect(body.seq).toBe(seq); 140 + expect(body.repo).toBe("did:plc:test"); 141 + }); 142 + 143 + it("includes correct ops, rev, and blocks in commit events", async () => { 144 + const { storage } = createAccount(db, "did:plc:test"); 145 + const keypair = await Secp256k1Keypair.create(); 146 + let repo = await Repo.create(storage, "did:plc:test", keypair); 147 + 148 + repo = await repo.applyWrites( 149 + { 150 + action: WriteOpAction.Create, 151 + collection: "app.test.post", 152 + rkey: "r1", 153 + record: { text: "hello", $type: "app.test.post" }, 154 + }, 155 + keypair, 156 + ); 157 + 158 + const commit = storage.lastCommit!; 159 + await sequencer.sequenceCommit({ 160 + did: "did:plc:test", 161 + commit: commit.cid, 162 + rev: commit.rev, 163 + since: commit.since, 164 + prevData: null, 165 + newBlocks: commit.newBlocks, 166 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 167 + }); 168 + 169 + const events = sequencer.getEventsSince(0); 170 + const { body } = decodeFrame(events[0]); 171 + expect(body.rev).toBe(commit.rev); 172 + // blocks is a CBOR Bytes object (has $bytes property) 173 + expect(body.blocks).toBeDefined(); 174 + 175 + const ops = body.ops as Array<Record<string, unknown>>; 176 + expect(ops).toHaveLength(1); 177 + expect(ops[0].action).toBe("create"); 178 + expect(ops[0].path).toBe("app.test.post/r1"); 179 + }); 180 + 181 + it("includes prevData when provided", async () => { 182 + const { storage } = createAccount(db, "did:plc:test"); 183 + const keypair = await Secp256k1Keypair.create(); 184 + let repo = await Repo.create(storage, "did:plc:test", keypair); 185 + 186 + // Get prev_data_cid from DB after repo creation 187 + const prevDataRow = db 188 + .prepare("SELECT prev_data_cid FROM accounts WHERE did = ?") 189 + .get("did:plc:test") as { prev_data_cid: string | null }; 190 + 191 + repo = await repo.applyWrites( 192 + { 193 + action: WriteOpAction.Create, 194 + collection: "app.test.post", 195 + rkey: "r1", 196 + record: { text: "hello", $type: "app.test.post" }, 197 + }, 198 + keypair, 199 + ); 200 + 201 + const commit = storage.lastCommit!; 202 + const { CID } = await import("@atproto/lex-data"); 203 + const prevDataCid = prevDataRow.prev_data_cid 204 + ? CID.parse(prevDataRow.prev_data_cid) 205 + : null; 206 + 207 + await sequencer.sequenceCommit({ 208 + did: "did:plc:test", 209 + commit: commit.cid, 210 + rev: commit.rev, 211 + since: commit.since, 212 + prevData: prevDataCid, 213 + newBlocks: commit.newBlocks, 214 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 215 + }); 216 + 217 + const events = sequencer.getEventsSince(0); 218 + const { body } = decodeFrame(events[0]); 219 + 220 + // prevData should be present when provided 221 + if (prevDataCid) { 222 + // It's a CIDLink object in raw CBOR decode: { $link: string } 223 + expect(body.prevData).toBeDefined(); 224 + expect(body.prevData).not.toBeNull(); 225 + } 226 + }); 227 + 228 + it("frame format is two concatenated DAG-CBOR values", async () => { 229 + const { storage } = createAccount(db, "did:plc:test"); 230 + const keypair = await Secp256k1Keypair.create(); 231 + let repo = await Repo.create(storage, "did:plc:test", keypair); 232 + 233 + repo = await repo.applyWrites( 234 + { 235 + action: WriteOpAction.Create, 236 + collection: "app.test.post", 237 + rkey: "r1", 238 + record: { text: "hello", $type: "app.test.post" }, 239 + }, 240 + keypair, 241 + ); 242 + 243 + const commit = storage.lastCommit!; 244 + await sequencer.sequenceCommit({ 245 + did: "did:plc:test", 246 + commit: commit.cid, 247 + rev: commit.rev, 248 + since: commit.since, 249 + prevData: null, 250 + newBlocks: commit.newBlocks, 251 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 252 + }); 253 + 254 + const events = sequencer.getEventsSince(0); 255 + const frame = events[0]; 256 + 257 + // Should be decodable as header + body 258 + const { header, body } = decodeFrame(frame); 259 + expect(header).toBeDefined(); 260 + expect(body).toBeDefined(); 261 + expect(header.op).toBe(1); 262 + expect(header.t).toBe("#commit"); 263 + expect(typeof body.seq).toBe("number"); 264 + }); 265 + }); 266 + 267 + describe("sequenceIdentity", () => { 268 + it("stores identity events with correct fields", () => { 269 + const { seq } = sequencer.sequenceIdentity( 270 + "did:plc:test", 271 + "agent.test.example", 272 + ); 273 + expect(seq).toBeGreaterThan(0); 274 + 275 + const events = sequencer.getEventsSince(0); 276 + expect(events.length).toBe(1); 277 + 278 + const { header, body } = decodeFrame(events[0]); 279 + expect(header.op).toBe(1); 280 + expect(header.t).toBe("#identity"); 281 + expect(body.seq).toBe(seq); 282 + expect(body.did).toBe("did:plc:test"); 283 + expect(body.handle).toBe("agent.test.example"); 284 + expect(typeof body.time).toBe("string"); 285 + }); 286 + }); 287 + 288 + describe("sequenceAccount", () => { 289 + it("stores account events with active=true", () => { 290 + const { seq } = sequencer.sequenceAccount("did:plc:test", true); 291 + expect(seq).toBeGreaterThan(0); 292 + 293 + const events = sequencer.getEventsSince(0); 294 + const { header, body } = decodeFrame(events[0]); 295 + expect(header.op).toBe(1); 296 + expect(header.t).toBe("#account"); 297 + expect(body.seq).toBe(seq); 298 + expect(body.did).toBe("did:plc:test"); 299 + expect(body.active).toBe(true); 300 + }); 301 + 302 + it("stores account events with status", () => { 303 + sequencer.sequenceAccount("did:plc:test", false, "deactivated"); 304 + 305 + const events = sequencer.getEventsSince(0); 306 + const { body } = decodeFrame(events[0]); 307 + expect(body.active).toBe(false); 308 + expect(body.status).toBe("deactivated"); 309 + }); 310 + }); 311 + 312 + describe("getEventsSince", () => { 313 + it("returns events in order after cursor", () => { 314 + const seq1 = sequencer.sequenceIdentity("did:plc:a", "a.test").seq; 315 + const seq2 = sequencer.sequenceIdentity("did:plc:b", "b.test").seq; 316 + const seq3 = sequencer.sequenceIdentity("did:plc:c", "c.test").seq; 317 + 318 + // Get events after seq1 319 + const events = sequencer.getEventsSince(seq1); 320 + expect(events.length).toBe(2); 321 + 322 + const body1 = decodeFrame(events[0]).body; 323 + const body2 = decodeFrame(events[1]).body; 324 + expect(body1.seq).toBe(seq2); 325 + expect(body2.seq).toBe(seq3); 326 + }); 327 + 328 + it("returns empty array when no events after cursor", () => { 329 + const { seq } = sequencer.sequenceIdentity("did:plc:test", "test"); 330 + const events = sequencer.getEventsSince(seq); 331 + expect(events).toEqual([]); 332 + }); 333 + 334 + it("respects limit parameter", () => { 335 + for (let i = 0; i < 5; i++) { 336 + sequencer.sequenceIdentity(`did:plc:${i}`, `${i}.test`); 337 + } 338 + 339 + const events = sequencer.getEventsSince(0, 2); 340 + expect(events.length).toBe(2); 341 + }); 342 + }); 343 + 344 + describe("getLatestSeq / getOldestSeq", () => { 345 + it("returns 0 when empty", () => { 346 + expect(sequencer.getLatestSeq()).toBe(0); 347 + expect(sequencer.getOldestSeq()).toBe(0); 348 + }); 349 + 350 + it("returns correct values after events", () => { 351 + const s1 = sequencer.sequenceIdentity("did:plc:a", "a").seq; 352 + sequencer.sequenceIdentity("did:plc:b", "b"); 353 + const s3 = sequencer.sequenceIdentity("did:plc:c", "c").seq; 354 + 355 + expect(sequencer.getOldestSeq()).toBe(s1); 356 + expect(sequencer.getLatestSeq()).toBe(s3); 357 + }); 358 + }); 359 + 360 + describe("pruneOldEvents", () => { 361 + it("removes old events keeping recent ones", () => { 362 + for (let i = 0; i < 10; i++) { 363 + sequencer.sequenceIdentity(`did:plc:${i}`, `${i}.test`); 364 + } 365 + 366 + // pruneOldEvents(3) deletes where seq < MAX(seq) - 3 367 + // With seqs 1-10: deletes seq < 7, keeps 7,8,9,10 368 + sequencer.pruneOldEvents(3); 369 + const events = sequencer.getEventsSince(0); 370 + expect(events.length).toBe(4); 371 + }); 372 + }); 373 + 374 + describe("subscribe / broadcast", () => { 375 + it("broadcasts to subscribers", async () => { 376 + const received: Uint8Array[] = []; 377 + sequencer.subscribe((frame) => received.push(frame)); 378 + 379 + const { storage } = createAccount(db, "did:plc:test"); 380 + const keypair = await Secp256k1Keypair.create(); 381 + let repo = await Repo.create(storage, "did:plc:test", keypair); 382 + 383 + repo = await repo.applyWrites( 384 + { 385 + action: WriteOpAction.Create, 386 + collection: "app.test.post", 387 + rkey: "r1", 388 + record: { text: "hello", $type: "app.test.post" }, 389 + }, 390 + keypair, 391 + ); 392 + 393 + const commit = storage.lastCommit!; 394 + await sequencer.sequenceCommit({ 395 + did: "did:plc:test", 396 + commit: commit.cid, 397 + rev: commit.rev, 398 + since: commit.since, 399 + prevData: null, 400 + newBlocks: commit.newBlocks, 401 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 402 + }); 403 + 404 + expect(received.length).toBe(1); 405 + const { header } = decodeFrame(received[0]); 406 + expect(header.t).toBe("#commit"); 407 + }); 408 + 409 + it("multiple subscribers each receive all events", () => { 410 + const received1: Uint8Array[] = []; 411 + const received2: Uint8Array[] = []; 412 + sequencer.subscribe((frame) => received1.push(frame)); 413 + sequencer.subscribe((frame) => received2.push(frame)); 414 + 415 + sequencer.sequenceIdentity("did:plc:test", "test"); 416 + 417 + expect(received1.length).toBe(1); 418 + expect(received2.length).toBe(1); 419 + }); 420 + 421 + it("unsubscribe removes handler", () => { 422 + const received: Uint8Array[] = []; 423 + const unsub = sequencer.subscribe((frame) => received.push(frame)); 424 + 425 + sequencer.sequenceIdentity("did:plc:a", "a"); 426 + expect(received.length).toBe(1); 427 + 428 + unsub(); 429 + sequencer.sequenceIdentity("did:plc:b", "b"); 430 + expect(received.length).toBe(1); // no new events 431 + }); 432 + 433 + it("subscriberCount tracks active subscribers", () => { 434 + expect(sequencer.subscriberCount).toBe(0); 435 + 436 + const unsub1 = sequencer.subscribe(() => {}); 437 + const unsub2 = sequencer.subscribe(() => {}); 438 + expect(sequencer.subscriberCount).toBe(2); 439 + 440 + unsub1(); 441 + expect(sequencer.subscriberCount).toBe(1); 442 + 443 + unsub2(); 444 + expect(sequencer.subscriberCount).toBe(0); 445 + }); 446 + }); 447 + }); 448 + 449 + // --- WebSocket integration tests --- 450 + 451 + describe("subscribeRepos WebSocket", () => { 452 + let db: Database.Database; 453 + let sequencer: Sequencer; 454 + let serverInfo: { port: number; close: () => void }; 455 + 456 + async function startServer() { 457 + const app = new Hono(); 458 + const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 459 + app.route("/", createSyncRoutes(db, sequencer, upgradeWebSocket)); 460 + 461 + return new Promise<{ port: number; close: () => void }>((resolve) => { 462 + const server = serve({ fetch: app.fetch, port: 0 }, (info) => { 463 + resolve({ port: info.port, close: () => server.close() }); 464 + }); 465 + injectWebSocket(server); 466 + }); 467 + } 468 + 469 + function wsUrl(cursor?: number): string { 470 + return `ws://localhost:${serverInfo.port}/xrpc/com.atproto.sync.subscribeRepos${cursor !== undefined ? `?cursor=${cursor}` : ""}`; 471 + } 472 + 473 + function waitForMessage(ws: WebSocket, timeoutMs = 2000): Promise<Buffer> { 474 + return new Promise((resolve, reject) => { 475 + const timer = setTimeout( 476 + () => reject(new Error("timeout waiting for message")), 477 + timeoutMs, 478 + ); 479 + ws.once("message", (data: Buffer) => { 480 + clearTimeout(timer); 481 + resolve(data); 482 + }); 483 + }); 484 + } 485 + 486 + function collectMessages( 487 + ws: WebSocket, 488 + count: number, 489 + timeoutMs = 2000, 490 + ): Promise<Buffer[]> { 491 + return new Promise((resolve, reject) => { 492 + const messages: Buffer[] = []; 493 + const timer = setTimeout( 494 + () => 495 + reject( 496 + new Error(`timeout: got ${messages.length}/${count} messages`), 497 + ), 498 + timeoutMs, 499 + ); 500 + const handler = (data: Buffer) => { 501 + messages.push(data); 502 + if (messages.length === count) { 503 + clearTimeout(timer); 504 + ws.off("message", handler); 505 + resolve(messages); 506 + } 507 + }; 508 + ws.on("message", handler); 509 + }); 510 + } 511 + 512 + function connectAndWaitOpen(cursor?: number): Promise<WebSocket> { 513 + const ws = new WebSocket(wsUrl(cursor)); 514 + return new Promise((resolve, reject) => { 515 + ws.on("open", () => resolve(ws)); 516 + ws.on("error", reject); 517 + }); 518 + } 519 + 520 + beforeEach(async () => { 521 + db = initDatabase(":memory:"); 522 + sequencer = new Sequencer(db); 523 + serverInfo = await startServer(); 524 + }); 525 + 526 + afterEach(() => { 527 + serverInfo.close(); 528 + }); 529 + 530 + it("connects at /xrpc/com.atproto.sync.subscribeRepos", async () => { 531 + const ws = await connectAndWaitOpen(); 532 + expect(ws.readyState).toBe(WebSocket.OPEN); 533 + ws.close(); 534 + }); 535 + 536 + it("receives live commit events", async () => { 537 + const ws = await connectAndWaitOpen(); 538 + const msgPromise = waitForMessage(ws); 539 + 540 + // Create a commit event 541 + const { storage } = createAccount(db, "did:plc:live"); 542 + const keypair = await Secp256k1Keypair.create(); 543 + let repo = await Repo.create(storage, "did:plc:live", keypair); 544 + repo = await repo.applyWrites( 545 + { 546 + action: WriteOpAction.Create, 547 + collection: "app.test.post", 548 + rkey: "r1", 549 + record: { text: "live event", $type: "app.test.post" }, 550 + }, 551 + keypair, 552 + ); 553 + 554 + const commit = storage.lastCommit!; 555 + await sequencer.sequenceCommit({ 556 + did: "did:plc:live", 557 + commit: commit.cid, 558 + rev: commit.rev, 559 + since: commit.since, 560 + prevData: null, 561 + newBlocks: commit.newBlocks, 562 + ops: [{ action: "create", path: "app.test.post/r1", cid: commit.cid }], 563 + }); 564 + 565 + const msg = await msgPromise; 566 + const { header, body } = decodeFrame(new Uint8Array(msg)); 567 + expect(header.t).toBe("#commit"); 568 + expect(body.repo).toBe("did:plc:live"); 569 + 570 + ws.close(); 571 + }); 572 + 573 + it("receives live identity events", async () => { 574 + const ws = await connectAndWaitOpen(); 575 + const msgPromise = waitForMessage(ws); 576 + 577 + sequencer.sequenceIdentity("did:plc:signup", "agent.test.example"); 578 + 579 + const msg = await msgPromise; 580 + const { header, body } = decodeFrame(new Uint8Array(msg)); 581 + expect(header.t).toBe("#identity"); 582 + expect(body.did).toBe("did:plc:signup"); 583 + expect(body.handle).toBe("agent.test.example"); 584 + 585 + ws.close(); 586 + }); 587 + 588 + it("cursor-based backfill replays events in order", async () => { 589 + // Create events before connecting 590 + sequencer.sequenceIdentity("did:plc:a", "a.test"); 591 + sequencer.sequenceIdentity("did:plc:b", "b.test"); 592 + sequencer.sequenceIdentity("did:plc:c", "c.test"); 593 + 594 + // Attach message handler BEFORE open to catch backfill messages 595 + const ws = new WebSocket(wsUrl(1)); 596 + const messages = await collectMessages(ws, 2); 597 + 598 + const body1 = decodeFrame(new Uint8Array(messages[0])).body; 599 + const body2 = decodeFrame(new Uint8Array(messages[1])).body; 600 + expect(body1.did).toBe("did:plc:b"); 601 + expect(body2.did).toBe("did:plc:c"); 602 + 603 + ws.close(); 604 + }); 605 + 606 + it("cursor=0 replays all events", async () => { 607 + sequencer.sequenceIdentity("did:plc:a", "a.test"); 608 + sequencer.sequenceIdentity("did:plc:b", "b.test"); 609 + 610 + // Attach message handler BEFORE open to catch backfill messages 611 + const ws = new WebSocket(wsUrl(0)); 612 + const messages = await collectMessages(ws, 2); 613 + 614 + const body1 = decodeFrame(new Uint8Array(messages[0])).body; 615 + const body2 = decodeFrame(new Uint8Array(messages[1])).body; 616 + expect(body1.did).toBe("did:plc:a"); 617 + expect(body2.did).toBe("did:plc:b"); 618 + 619 + ws.close(); 620 + }); 621 + 622 + it("backfill then live: receives both", async () => { 623 + // Pre-existing event 624 + sequencer.sequenceIdentity("did:plc:old", "old.test"); 625 + 626 + // Connect with cursor=0, set up handler before open 627 + const ws = new WebSocket(wsUrl(0)); 628 + const allMessages = collectMessages(ws, 2); 629 + 630 + // Wait for connection to be open and backfill sent 631 + await new Promise<void>((resolve) => ws.on("open", resolve)); 632 + // Small delay to ensure subscription is set up after backfill 633 + await new Promise((r) => setTimeout(r, 50)); 634 + sequencer.sequenceIdentity("did:plc:new", "new.test"); 635 + 636 + const messages = await allMessages; 637 + const body1 = decodeFrame(new Uint8Array(messages[0])).body; 638 + const body2 = decodeFrame(new Uint8Array(messages[1])).body; 639 + expect(body1.did).toBe("did:plc:old"); 640 + expect(body2.did).toBe("did:plc:new"); 641 + 642 + ws.close(); 643 + }); 644 + 645 + it("multiple simultaneous subscribers each receive all events", async () => { 646 + const ws1 = await connectAndWaitOpen(); 647 + const ws2 = await connectAndWaitOpen(); 648 + 649 + const msg1Promise = waitForMessage(ws1); 650 + const msg2Promise = waitForMessage(ws2); 651 + 652 + sequencer.sequenceIdentity("did:plc:multi", "multi.test"); 653 + 654 + const [msg1, msg2] = await Promise.all([msg1Promise, msg2Promise]); 655 + const body1 = decodeFrame(new Uint8Array(msg1)).body; 656 + const body2 = decodeFrame(new Uint8Array(msg2)).body; 657 + expect(body1.did).toBe("did:plc:multi"); 658 + expect(body2.did).toBe("did:plc:multi"); 659 + 660 + ws1.close(); 661 + ws2.close(); 662 + }); 663 + 664 + it("subscriber disconnect is handled cleanly", async () => { 665 + const ws = await connectAndWaitOpen(); 666 + expect(sequencer.subscriberCount).toBe(1); 667 + 668 + ws.close(); 669 + // Wait for close to propagate 670 + await new Promise((r) => setTimeout(r, 100)); 671 + expect(sequencer.subscriberCount).toBe(0); 672 + }); 673 + 674 + it("no cursor streams only live events", async () => { 675 + // Pre-existing events should not be sent 676 + sequencer.sequenceIdentity("did:plc:old", "old.test"); 677 + 678 + const ws = await connectAndWaitOpen(); // no cursor 679 + const msgPromise = waitForMessage(ws); 680 + 681 + sequencer.sequenceIdentity("did:plc:live", "live.test"); 682 + 683 + const msg = await msgPromise; 684 + const { body } = decodeFrame(new Uint8Array(msg)); 685 + expect(body.did).toBe("did:plc:live"); 686 + 687 + ws.close(); 688 + }); 689 + });