// atproto user agency toolkit for individuals and groups
1import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
2import { mkdtempSync, rmSync } from "node:fs";
3import { tmpdir } from "node:os";
4import { join } from "node:path";
5import Database from "better-sqlite3";
6import { WebSocketServer, WebSocket } from "ws";
7import { createServer, type Server } from "node:http";
8import { encode as cborEncode } from "../cbor-compat.js";
9import { SyncStorage } from "./sync-storage.js";
10import {
11 FirehoseSubscription,
12 type FirehoseCommitEvent,
13} from "./firehose-subscription.js";
14
15// ============================================
16// Helpers
17// ============================================
18
/**
 * Build a single firehose wire frame.
 *
 * The frame is the CBOR encoding of `header` immediately followed by the CBOR
 * encoding of `body`, with no separator between the two.
 *
 * @param header - Frame header object.
 * @param body - Frame payload object.
 * @returns A Buffer holding both encodings back to back.
 */
function encodeFrame(header: object, body: object): Buffer {
  // Header is encoded first, then body, matching their order on the wire.
  const parts = [cborEncode(header), cborEncode(body)];
  const totalLength = parts.reduce((sum, part) => sum + part.length, 0);

  const joined = new Uint8Array(totalLength);
  let offset = 0;
  for (const part of parts) {
    joined.set(part, offset);
    offset += part.length;
  }
  return Buffer.from(joined);
}
28
/**
 * Build a mock `#commit` frame for one repo.
 *
 * Only `seq`, `repo` and `ops` vary. `rev` is derived from `seq`, `blocks` is
 * empty, `since` and `commit` are null, and `time` is the current time.
 *
 * @param seq - Sequence number carried by the event.
 * @param repo - DID of the repo the commit belongs to.
 * @param ops - Record operations to include; defaults to none.
 * @returns The encoded frame.
 */
function makeCommitFrame(
  seq: number,
  repo: string,
  ops: Array<{ action: string; path: string; cid: null }> = [],
): Buffer {
  return encodeFrame(
    { op: 1, t: "#commit" },
    {
      seq,
      repo,
      rev: `rev${seq}`,
      since: null,
      blocks: new Uint8Array(0),
      ops,
      commit: null,
      time: new Date().toISOString(),
      tooBig: false,
      rebase: false,
    },
  );
}
50
/**
 * Build a mock `#identity` frame.
 *
 * The handle is fixed to "test.bsky.social" and `time` is the current time.
 *
 * @param seq - Sequence number carried by the event.
 * @param did - DID the identity event refers to.
 * @returns The encoded frame.
 */
function makeIdentityFrame(seq: number, did: string): Buffer {
  return encodeFrame(
    { op: 1, t: "#identity" },
    { seq, did, handle: "test.bsky.social", time: new Date().toISOString() },
  );
}
57
/**
 * Build a mock error frame: a header with `op: -1` (and no `t` field) followed
 * by a body carrying the error name and message.
 *
 * @param error - Error name placed in the body.
 * @param message - Error description placed in the body.
 * @returns The encoded frame.
 */
function makeErrorFrame(error: string, message: string): Buffer {
  return encodeFrame({ op: -1 }, { error, message });
}
64
/**
 * Start a local WebSocket server that simulates a firehose relay.
 *
 * An HTTP server is bound to an ephemeral port on 127.0.0.1 and a
 * WebSocketServer is attached to it.
 *
 * @returns A promise that resolves once the server is listening, with:
 *   - `server` / `wss`: the underlying HTTP and WebSocket servers,
 *   - `url`: the `ws://` subscribeRepos URL clients should connect to,
 *   - `getClients`: accessor for the sockets currently attached to `wss`,
 *   - `close`: closes every client, then the WebSocket server, then the HTTP
 *     server, resolving when the HTTP server has closed.
 *   The promise rejects if the server fails to start listening or does not
 *   report a TCP port.
 */
function startMockRelay(): Promise<{
  server: Server;
  wss: WebSocketServer;
  url: string;
  getClients: () => Set<WebSocket>;
  close: () => Promise<void>;
}> {
  return new Promise((resolve, reject) => {
    const server = createServer();
    const wss = new WebSocketServer({ server });

    // `ws` re-emits errors of the underlying HTTP server on the
    // WebSocketServer, so a failed bind shows up here. Routing it to `reject`
    // lets the caller see the failure instead of waiting on a promise that
    // never settles.
    const onStartupError = (err: Error): void => reject(err);
    wss.once("error", onStartupError);

    server.listen(0, "127.0.0.1", () => {
      // Startup succeeded: later errors must not be absorbed by a listener
      // that only exists to reject an already-settled promise.
      wss.off("error", onStartupError);

      // address() is null for a closed server and a string for pipe/socket
      // paths; only the object form carries a port.
      const addr = server.address();
      if (addr === null || typeof addr === "string") {
        server.close();
        reject(new Error("mock relay did not report a TCP port"));
        return;
      }

      const url = `ws://127.0.0.1:${addr.port}/xrpc/com.atproto.sync.subscribeRepos`;
      resolve({
        server,
        wss,
        url,
        getClients: () => wss.clients,
        close: () =>
          new Promise<void>((res) => {
            for (const client of wss.clients) {
              client.close();
            }
            wss.close(() => {
              server.close(() => res());
            });
          }),
      });
    });
  });
}
98
/**
 * Poll `condition` until it returns true.
 *
 * The condition is evaluated immediately and then again after every
 * `intervalMs` pause. The deadline is only checked after a failed evaluation,
 * so the condition always gets one more look after the final pause.
 *
 * @param condition - Synchronous predicate to poll.
 * @param timeoutMs - Time budget in milliseconds before giving up.
 * @param intervalMs - Pause between evaluations in milliseconds.
 * @throws Error when the condition is still false after the deadline.
 */
async function waitFor(
  condition: () => boolean,
  timeoutMs = 5000,
  intervalMs = 50,
): Promise<void> {
  const expiresAt = Date.now() + timeoutMs;
  const pause = (): Promise<void> =>
    new Promise((wake) => setTimeout(wake, intervalMs));

  for (;;) {
    if (condition()) {
      return;
    }
    if (Date.now() > expiresAt) {
      throw new Error(`waitFor timed out after ${timeoutMs}ms`);
    }
    await pause();
  }
}
113
114// ============================================
115// CBOR Frame Parsing
116// ============================================
117
/**
 * Frame parsing: runs a FirehoseSubscription against a local mock relay and
 * checks which incoming frames reach the registered commit handlers.
 */
describe("FirehoseSubscription: frame parsing", () => {
  let relay: Awaited<ReturnType<typeof startMockRelay>>;
  let sub: FirehoseSubscription;

  beforeEach(async () => {
    relay = await startMockRelay();
  });

  afterEach(async () => {
    // Restore spies here rather than at the end of a test body: a test that
    // fails midway never reaches its own cleanup, and a leaked console.error
    // mock could then stay active for the tests that follow.
    vi.restoreAllMocks();
    sub?.stop();
    await relay.close();
  });

  it("parses commit events for tracked DIDs", async () => {
    const events: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((event) => {
      events.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    sub.start(new Set([trackedDid]));

    // Wait for client to connect
    await waitFor(() => relay.getClients().size > 0);

    // Send a commit for the tracked DID
    const frame = makeCommitFrame(1, trackedDid, [
      { action: "create", path: "app.bsky.feed.post/abc", cid: null },
    ]);
    for (const client of relay.getClients()) {
      client.send(frame);
    }

    // Wait for event to be processed
    await waitFor(() => events.length > 0);

    expect(events).toHaveLength(1);
    expect(events[0]!.seq).toBe(1);
    expect(events[0]!.repo).toBe(trackedDid);
    expect(events[0]!.ops).toHaveLength(1);
    expect(events[0]!.ops[0]!.action).toBe("create");
    expect(events[0]!.ops[0]!.path).toBe("app.bsky.feed.post/abc");
  });

  it("filters out events for non-tracked DIDs", async () => {
    const events: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((event) => {
      events.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    const untrackedDid = "did:plc:untracked1";
    sub.start(new Set([trackedDid]));

    await waitFor(() => relay.getClients().size > 0);

    // Send events for both tracked and untracked DIDs
    for (const client of relay.getClients()) {
      client.send(makeCommitFrame(1, untrackedDid));
      client.send(makeCommitFrame(2, trackedDid));
      client.send(makeCommitFrame(3, untrackedDid));
    }

    await waitFor(() => events.length >= 1);
    // Small delay to ensure no extra events arrive
    await new Promise((r) => setTimeout(r, 200));

    // Only the tracked DID's event should have been dispatched
    expect(events).toHaveLength(1);
    expect(events[0]!.repo).toBe(trackedDid);
    expect(events[0]!.seq).toBe(2);
  });

  it("ignores identity events (only processes commits)", async () => {
    const events: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((event) => {
      events.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    sub.start(new Set([trackedDid]));

    await waitFor(() => relay.getClients().size > 0);

    for (const client of relay.getClients()) {
      client.send(makeIdentityFrame(1, trackedDid));
      client.send(makeCommitFrame(2, trackedDid));
    }

    await waitFor(() => events.length >= 1);
    // Small delay to ensure no extra events arrive
    await new Promise((r) => setTimeout(r, 200));

    // Only commit events should be dispatched
    expect(events).toHaveLength(1);
    expect(events[0]!.seq).toBe(2);
  });

  it("handles error frames without crashing", async () => {
    const events: FirehoseCommitEvent[] = [];
    // Silences the expected log line; the spy is restored in afterEach.
    const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((event) => {
      events.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    sub.start(new Set([trackedDid]));

    await waitFor(() => relay.getClients().size > 0);

    // The error frame goes first, so it has been handled by the time the
    // commit that follows it is observed.
    for (const client of relay.getClients()) {
      client.send(makeErrorFrame("ConsumerTooSlow", "Consumer is too slow"));
      client.send(makeCommitFrame(1, trackedDid));
    }

    await waitFor(() => events.length >= 1);

    // Error was logged (single string argument containing the prefix)
    expect(consoleSpy).toHaveBeenCalledWith(
      expect.stringContaining("[firehose-subscription] Error from relay"),
    );

    // Commit event still processed
    expect(events).toHaveLength(1);
  });
});
253
254// ============================================
255// Cursor Management
256// ============================================
257
/**
 * Cursor management: the in-memory cursor follows processed events and is
 * sent to the relay as a `cursor` query parameter on connect and reconnect.
 */
describe("FirehoseSubscription: cursor management", () => {
  let relay: Awaited<ReturnType<typeof startMockRelay>>;
  let sub: FirehoseSubscription;

  /** Deliver `frame` to every socket currently attached to the mock relay. */
  const broadcast = (frame: Buffer): void => {
    for (const socket of relay.getClients()) {
      socket.send(frame);
    }
  };

  /** Resolve once at least one client is attached to the mock relay. */
  const clientAttached = (): Promise<void> =>
    waitFor(() => relay.getClients().size > 0);

  beforeEach(async () => {
    relay = await startMockRelay();
  });

  afterEach(async () => {
    sub?.stop();
    await relay.close();
  });

  it("updates cursor as events are processed", async () => {
    const did = "did:plc:tracked1";
    const seen: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((commit) => {
      seen.push(commit);
    });
    sub.start(new Set([did]));

    await clientAttached();

    // Nothing has been processed yet, so there is no cursor.
    expect(sub.getCursor()).toBeNull();

    broadcast(makeCommitFrame(10, did));
    await waitFor(() => seen.length >= 1);
    expect(sub.getCursor()).toBe(10);

    broadcast(makeCommitFrame(20, did));
    await waitFor(() => seen.length >= 2);
    expect(sub.getCursor()).toBe(20);
  });

  it("passes cursor to relay URL on connection", async () => {
    let requestUrl: string | undefined;
    relay.wss.on("connection", (_socket, request) => {
      requestUrl = request.url;
    });

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.start(new Set(["did:plc:test"]), 12345);

    await clientAttached();

    expect(requestUrl).toContain("cursor=12345");
  });

  it("resumes from cursor on reconnect", async () => {
    const did = "did:plc:tracked1";
    const seen: FirehoseCommitEvent[] = [];
    const requestUrls: string[] = [];

    // Record the request URL of every connection the relay accepts.
    relay.wss.on("connection", (_socket, request) => {
      requestUrls.push(request.url ?? "");
    });

    sub = new FirehoseSubscription({
      firehoseUrl: relay.url,
      reconnectDelayMs: 100,
    });
    sub.onCommit((commit) => {
      seen.push(commit);
    });
    sub.start(new Set([did]));

    await clientAttached();

    // One processed event is enough to establish a cursor.
    broadcast(makeCommitFrame(42, did));
    await waitFor(() => seen.length >= 1);
    expect(sub.getCursor()).toBe(42);

    // Dropping the connection from the relay side triggers a reconnect.
    for (const socket of relay.getClients()) {
      socket.close();
    }

    await waitFor(() => requestUrls.length >= 2, 5000);

    // The second connection must carry the cursor from the first.
    expect(requestUrls[1]).toContain("cursor=42");
  });
});
354
355// ============================================
356// SyncStorage: Firehose Cursor Persistence
357// ============================================
358
/**
 * SyncStorage firehose cursor persistence, exercised against a fresh SQLite
 * database file in a temporary directory for every test.
 */
describe("SyncStorage: firehose cursor", () => {
  let scratchDir: string;
  let db: InstanceType<typeof Database>;
  let storage: SyncStorage;

  beforeEach(() => {
    scratchDir = mkdtempSync(join(tmpdir(), "firehose-cursor-test-"));
    const dbFile = join(scratchDir, "test.db");
    db = new Database(dbFile);
    storage = new SyncStorage(db);
    storage.initSchema();
  });

  afterEach(() => {
    // Close the handle before deleting the directory that holds the file.
    db.close();
    rmSync(scratchDir, { recursive: true, force: true });
  });

  it("returns null when no cursor saved", () => {
    const cursor = storage.getFirehoseCursor();
    expect(cursor).toBeNull();
  });

  it("saves and retrieves cursor", () => {
    storage.saveFirehoseCursor(12345);
    const cursor = storage.getFirehoseCursor();
    expect(cursor).toBe(12345);
  });

  it("updates cursor on subsequent saves", () => {
    // Each save must replace the previously stored value.
    for (const value of [100, 200]) {
      storage.saveFirehoseCursor(value);
      expect(storage.getFirehoseCursor()).toBe(value);
    }
  });

  it("clears cursor", () => {
    storage.saveFirehoseCursor(100);
    storage.clearFirehoseCursor();
    const cursor = storage.getFirehoseCursor();
    expect(cursor).toBeNull();
  });
});
399
400// ============================================
401// Reconnection Logic
402// ============================================
403
/**
 * Reconnection: the subscription reconnects after the relay drops it, unless
 * stop() was called first.
 */
describe("FirehoseSubscription: reconnection", () => {
  let relay: Awaited<ReturnType<typeof startMockRelay>>;
  let sub: FirehoseSubscription;

  /** Close every socket the mock relay currently holds. */
  const dropClients = (): void => {
    for (const socket of relay.getClients()) {
      socket.close();
    }
  };

  beforeEach(async () => {
    relay = await startMockRelay();
  });

  afterEach(async () => {
    sub?.stop();
    await relay.close();
  });

  it("reconnects after disconnection", async () => {
    let accepted = 0;
    relay.wss.on("connection", () => {
      accepted += 1;
    });

    sub = new FirehoseSubscription({
      firehoseUrl: relay.url,
      reconnectDelayMs: 100,
      maxReconnectDelayMs: 500,
    });
    sub.start(new Set(["did:plc:test"]));

    await waitFor(() => accepted >= 1);

    // Dropping the connection from the relay side should trigger a reconnect.
    dropClients();

    await waitFor(() => accepted >= 2, 5000);
    expect(accepted).toBeGreaterThanOrEqual(2);
  });

  it("stop() prevents reconnection", async () => {
    let accepted = 0;
    relay.wss.on("connection", () => {
      accepted += 1;
    });

    sub = new FirehoseSubscription({
      firehoseUrl: relay.url,
      reconnectDelayMs: 100,
    });
    sub.start(new Set(["did:plc:test"]));

    await waitFor(() => accepted >= 1);

    // Stop first, then drop the connection from the relay side.
    sub.stop();
    dropClients();

    // Several reconnect delays pass; no second connection may appear.
    await new Promise((done) => setTimeout(done, 500));
    expect(accepted).toBe(1);
  });
});
468
469// ============================================
470// Stats and DID Updates
471// ============================================
472
/**
 * Stats and DID updates: getStats() counters and the effect of updateDids()
 * on which repos are dispatched to handlers.
 */
describe("FirehoseSubscription: stats and DID updates", () => {
  let relay: Awaited<ReturnType<typeof startMockRelay>>;
  let sub: FirehoseSubscription;

  /** Send `frames`, in order, to every socket attached to the mock relay. */
  const broadcast = (...frames: Buffer[]): void => {
    for (const socket of relay.getClients()) {
      for (const frame of frames) {
        socket.send(frame);
      }
    }
  };

  /** Resolve once at least one client is attached to the mock relay. */
  const clientAttached = (): Promise<void> =>
    waitFor(() => relay.getClients().size > 0);

  beforeEach(async () => {
    relay = await startMockRelay();
  });

  afterEach(async () => {
    sub?.stop();
    await relay.close();
  });

  it("reports correct stats", async () => {
    const trackedDid = "did:plc:tracked1";
    const otherDid = "did:plc:other";

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit(() => {});
    sub.start(new Set([trackedDid]));

    await clientAttached();

    broadcast(
      makeCommitFrame(1, otherDid), // filtered out
      makeCommitFrame(2, trackedDid), // processed
      makeCommitFrame(3, trackedDid), // processed
    );

    await waitFor(() => sub.getStats().eventsProcessed >= 2);

    const { connected, cursor, eventsReceived, eventsProcessed, trackedDids } =
      sub.getStats();
    expect(connected).toBe(true);
    expect(cursor).toBe(3);
    expect(eventsReceived).toBe(3); // all commit events received
    expect(eventsProcessed).toBe(2); // only tracked DID
    expect(trackedDids).toBe(1);
  });

  it("updateDids changes which DIDs are tracked", async () => {
    const firstDid = "did:plc:first";
    const secondDid = "did:plc:second";
    const seen: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((commit) => {
      seen.push(commit);
    });
    sub.start(new Set([firstDid]));

    await clientAttached();

    // While the first DID is tracked, its commit is delivered.
    broadcast(makeCommitFrame(1, firstDid));
    await waitFor(() => seen.length >= 1);

    // Switch tracking to the second DID only.
    sub.updateDids(new Set([secondDid]));

    broadcast(
      makeCommitFrame(2, firstDid), // should be filtered
      makeCommitFrame(3, secondDid), // should be processed
    );

    await waitFor(() => seen.length >= 2);
    // Grace period so a wrongly dispatched event would have time to show up.
    await new Promise((done) => setTimeout(done, 200));

    expect(seen).toHaveLength(2);
    expect(seen[0]!.repo).toBe(firstDid);
    expect(seen[1]!.repo).toBe(secondDid);
  });
});
549
550// ============================================
551// Multiple handlers
552// ============================================
553
/**
 * Multiple handlers: every registered commit handler receives each event, and
 * one handler throwing does not stop the others.
 */
describe("FirehoseSubscription: multiple handlers", () => {
  let relay: Awaited<ReturnType<typeof startMockRelay>>;
  let sub: FirehoseSubscription;

  beforeEach(async () => {
    relay = await startMockRelay();
  });

  afterEach(async () => {
    // Restore spies here rather than at the end of a test body: a test that
    // fails midway never reaches its own cleanup, and a leaked console.error
    // mock could then stay active for the tests that follow.
    vi.restoreAllMocks();
    sub?.stop();
    await relay.close();
  });

  it("dispatches to all registered handlers", async () => {
    const events1: FirehoseCommitEvent[] = [];
    const events2: FirehoseCommitEvent[] = [];

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    sub.onCommit((event) => {
      events1.push(event);
    });
    sub.onCommit((event) => {
      events2.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    sub.start(new Set([trackedDid]));

    await waitFor(() => relay.getClients().size > 0);

    for (const client of relay.getClients()) {
      client.send(makeCommitFrame(1, trackedDid));
    }

    // Both handlers must have seen the single commit.
    await waitFor(() => events1.length >= 1 && events2.length >= 1);

    expect(events1).toHaveLength(1);
    expect(events2).toHaveLength(1);
  });

  it("handler errors do not prevent other handlers from running", async () => {
    const events: FirehoseCommitEvent[] = [];
    // Silences the log output; the spy is restored in afterEach.
    const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});

    sub = new FirehoseSubscription({ firehoseUrl: relay.url });
    // Registered first, so it throws before the second handler is reached.
    sub.onCommit(() => {
      throw new Error("Handler 1 fails");
    });
    sub.onCommit((event) => {
      events.push(event);
    });

    const trackedDid = "did:plc:tracked1";
    sub.start(new Set([trackedDid]));

    await waitFor(() => relay.getClients().size > 0);

    for (const client of relay.getClients()) {
      client.send(makeCommitFrame(1, trackedDid));
    }

    await waitFor(() => events.length >= 1);

    expect(events).toHaveLength(1);
    expect(consoleSpy).toHaveBeenCalled();
  });
});