atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 616 lines 17 kB view raw
1import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 2import { mkdtempSync, rmSync } from "node:fs"; 3import { tmpdir } from "node:os"; 4import { join } from "node:path"; 5import Database from "better-sqlite3"; 6import { WebSocketServer, WebSocket } from "ws"; 7import { createServer, type Server } from "node:http"; 8import { encode as cborEncode } from "../cbor-compat.js"; 9import { SyncStorage } from "./sync-storage.js"; 10import { 11 FirehoseSubscription, 12 type FirehoseCommitEvent, 13} from "./firehose-subscription.js"; 14 15// ============================================ 16// Helpers 17// ============================================ 18 19/** Encode a firehose frame (header + body) as concatenated CBOR. */ 20function encodeFrame(header: object, body: object): Buffer { 21 const headerBytes = cborEncode(header); 22 const bodyBytes = cborEncode(body); 23 const frame = new Uint8Array(headerBytes.length + bodyBytes.length); 24 frame.set(headerBytes, 0); 25 frame.set(bodyBytes, headerBytes.length); 26 return Buffer.from(frame); 27} 28 29/** Create a mock commit frame for a given DID. */ 30function makeCommitFrame( 31 seq: number, 32 repo: string, 33 ops: Array<{ action: string; path: string; cid: null }> = [], 34): Buffer { 35 const header = { op: 1, t: "#commit" }; 36 const body = { 37 seq, 38 repo, 39 rev: `rev${seq}`, 40 since: null, 41 blocks: new Uint8Array(0), 42 ops, 43 commit: null, 44 time: new Date().toISOString(), 45 tooBig: false, 46 rebase: false, 47 }; 48 return encodeFrame(header, body); 49} 50 51/** Create a mock identity frame. */ 52function makeIdentityFrame(seq: number, did: string): Buffer { 53 const header = { op: 1, t: "#identity" }; 54 const body = { seq, did, handle: "test.bsky.social", time: new Date().toISOString() }; 55 return encodeFrame(header, body); 56} 57 58/** Create an error frame. */ 59function makeErrorFrame(error: string, message: string): Buffer { 60 const header = { op: -1 }; 61 const body = { error, message }; 62 return encodeFrame(header, body); 63} 64 65/** Start a local WebSocket server that simulates a firehose relay. */ 66function startMockRelay(): Promise<{ 67 server: Server; 68 wss: WebSocketServer; 69 url: string; 70 getClients: () => Set<WebSocket>; 71 close: () => Promise<void>; 72}> { 73 return new Promise((resolve) => { 74 const server = createServer(); 75 const wss = new WebSocketServer({ server }); 76 77 server.listen(0, "127.0.0.1", () => { 78 const addr = server.address() as { port: number }; 79 const url = `ws://127.0.0.1:${addr.port}/xrpc/com.atproto.sync.subscribeRepos`; 80 resolve({ 81 server, 82 wss, 83 url, 84 getClients: () => wss.clients, 85 close: () => 86 new Promise<void>((res) => { 87 for (const client of wss.clients) { 88 client.close(); 89 } 90 wss.close(() => { 91 server.close(() => res()); 92 }); 93 }), 94 }); 95 }); 96 }); 97} 98 99/** Wait for a condition to be true, with timeout. */ 100async function waitFor( 101 condition: () => boolean, 102 timeoutMs = 5000, 103 intervalMs = 50, 104): Promise<void> { 105 const deadline = Date.now() + timeoutMs; 106 while (!condition()) { 107 if (Date.now() > deadline) { 108 throw new Error(`waitFor timed out after ${timeoutMs}ms`); 109 } 110 await new Promise((r) => setTimeout(r, intervalMs)); 111 } 112} 113 114// ============================================ 115// CBOR Frame Parsing 116// ============================================ 117 118describe("FirehoseSubscription: frame parsing", () => { 119 let relay: Awaited<ReturnType<typeof startMockRelay>>; 120 let sub: FirehoseSubscription; 121 122 beforeEach(async () => { 123 relay = await startMockRelay(); 124 }); 125 126 afterEach(async () => { 127 sub?.stop(); 128 await relay.close(); 129 }); 130 131 it("parses commit events for tracked DIDs", async () => { 132 const events: FirehoseCommitEvent[] = []; 133 134 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 135 sub.onCommit((event) => { 136 events.push(event); 137 }); 138 139 const trackedDid = "did:plc:tracked1"; 140 sub.start(new Set([trackedDid])); 141 142 // Wait for client to connect 143 await waitFor(() => relay.getClients().size > 0); 144 145 // Send a commit for the tracked DID 146 const frame = makeCommitFrame(1, trackedDid, [ 147 { action: "create", path: "app.bsky.feed.post/abc", cid: null }, 148 ]); 149 for (const client of relay.getClients()) { 150 client.send(frame); 151 } 152 153 // Wait for event to be processed 154 await waitFor(() => events.length > 0); 155 156 expect(events).toHaveLength(1); 157 expect(events[0]!.seq).toBe(1); 158 expect(events[0]!.repo).toBe(trackedDid); 159 expect(events[0]!.ops).toHaveLength(1); 160 expect(events[0]!.ops[0]!.action).toBe("create"); 161 expect(events[0]!.ops[0]!.path).toBe("app.bsky.feed.post/abc"); 162 }); 163 164 it("filters out events for non-tracked DIDs", async () => { 165 const events: FirehoseCommitEvent[] = []; 166 167 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 168 sub.onCommit((event) => { 169 events.push(event); 170 }); 171 172 const trackedDid = "did:plc:tracked1"; 173 const untrackedDid = "did:plc:untracked1"; 174 sub.start(new Set([trackedDid])); 175 176 await waitFor(() => relay.getClients().size > 0); 177 178 // Send events for both tracked and untracked DIDs 179 for (const client of relay.getClients()) { 180 client.send(makeCommitFrame(1, untrackedDid)); 181 client.send(makeCommitFrame(2, trackedDid)); 182 client.send(makeCommitFrame(3, untrackedDid)); 183 } 184 185 await waitFor(() => events.length >= 1); 186 // Small delay to ensure no extra events arrive 187 await new Promise((r) => setTimeout(r, 200)); 188 189 // Only the tracked DID's event should have been dispatched 190 expect(events).toHaveLength(1); 191 expect(events[0]!.repo).toBe(trackedDid); 192 expect(events[0]!.seq).toBe(2); 193 }); 194 195 it("ignores identity events (only processes commits)", async () => { 196 const events: FirehoseCommitEvent[] = []; 197 198 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 199 sub.onCommit((event) => { 200 events.push(event); 201 }); 202 203 const trackedDid = "did:plc:tracked1"; 204 sub.start(new Set([trackedDid])); 205 206 await waitFor(() => relay.getClients().size > 0); 207 208 for (const client of relay.getClients()) { 209 client.send(makeIdentityFrame(1, trackedDid)); 210 client.send(makeCommitFrame(2, trackedDid)); 211 } 212 213 await waitFor(() => events.length >= 1); 214 await new Promise((r) => setTimeout(r, 200)); 215 216 // Only commit events should be dispatched 217 expect(events).toHaveLength(1); 218 expect(events[0]!.seq).toBe(2); 219 }); 220 221 it("handles error frames without crashing", async () => { 222 const events: FirehoseCommitEvent[] = []; 223 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 224 225 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 226 sub.onCommit((event) => { 227 events.push(event); 228 }); 229 230 const trackedDid = "did:plc:tracked1"; 231 sub.start(new Set([trackedDid])); 232 233 await waitFor(() => relay.getClients().size > 0); 234 235 for (const client of relay.getClients()) { 236 client.send(makeErrorFrame("ConsumerTooSlow", "Consumer is too slow")); 237 client.send(makeCommitFrame(1, trackedDid)); 238 } 239 240 await waitFor(() => events.length >= 1); 241 242 // Error was logged (single string argument containing the prefix) 243 expect(consoleSpy).toHaveBeenCalledWith( 244 expect.stringContaining("[firehose-subscription] Error from relay"), 245 ); 246 247 // Commit event still processed 248 expect(events).toHaveLength(1); 249 250 consoleSpy.mockRestore(); 251 }); 252}); 253 254// ============================================ 255// Cursor Management 256// ============================================ 257 258describe("FirehoseSubscription: cursor management", () => { 259 let relay: Awaited<ReturnType<typeof startMockRelay>>; 260 let sub: FirehoseSubscription; 261 262 beforeEach(async () => { 263 relay = await startMockRelay(); 264 }); 265 266 afterEach(async () => { 267 sub?.stop(); 268 await relay.close(); 269 }); 270 271 it("updates cursor as events are processed", async () => { 272 const events: FirehoseCommitEvent[] = []; 273 274 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 275 sub.onCommit((event) => { 276 events.push(event); 277 }); 278 279 const trackedDid = "did:plc:tracked1"; 280 sub.start(new Set([trackedDid])); 281 282 await waitFor(() => relay.getClients().size > 0); 283 284 expect(sub.getCursor()).toBeNull(); 285 286 for (const client of relay.getClients()) { 287 client.send(makeCommitFrame(10, trackedDid)); 288 } 289 await waitFor(() => events.length >= 1); 290 expect(sub.getCursor()).toBe(10); 291 292 for (const client of relay.getClients()) { 293 client.send(makeCommitFrame(20, trackedDid)); 294 } 295 await waitFor(() => events.length >= 2); 296 expect(sub.getCursor()).toBe(20); 297 }); 298 299 it("passes cursor to relay URL on connection", async () => { 300 let receivedUrl: string | undefined; 301 302 relay.wss.on("connection", (_ws, req) => { 303 receivedUrl = req.url; 304 }); 305 306 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 307 sub.start(new Set(["did:plc:test"]), 12345); 308 309 await waitFor(() => relay.getClients().size > 0); 310 311 expect(receivedUrl).toContain("cursor=12345"); 312 }); 313 314 it("resumes from cursor on reconnect", async () => { 315 const events: FirehoseCommitEvent[] = []; 316 const urls: string[] = []; 317 318 relay.wss.on("connection", (_ws, req) => { 319 urls.push(req.url ?? ""); 320 }); 321 322 sub = new FirehoseSubscription({ 323 firehoseUrl: relay.url, 324 reconnectDelayMs: 100, 325 }); 326 sub.onCommit((event) => { 327 events.push(event); 328 }); 329 330 const trackedDid = "did:plc:tracked1"; 331 sub.start(new Set([trackedDid])); 332 333 await waitFor(() => relay.getClients().size > 0); 334 335 // Send an event to set the cursor 336 for (const client of relay.getClients()) { 337 client.send(makeCommitFrame(42, trackedDid)); 338 } 339 await waitFor(() => events.length >= 1); 340 expect(sub.getCursor()).toBe(42); 341 342 // Close all clients to trigger reconnect 343 for (const client of relay.getClients()) { 344 client.close(); 345 } 346 347 // Wait for reconnect 348 await waitFor(() => urls.length >= 2, 5000); 349 350 // The reconnection URL should include the cursor 351 expect(urls[1]).toContain("cursor=42"); 352 }); 353}); 354 355// ============================================ 356// SyncStorage: Firehose Cursor Persistence 357// ============================================ 358 359describe("SyncStorage: firehose cursor", () => { 360 let tmpDir: string; 361 let db: InstanceType<typeof Database>; 362 let storage: SyncStorage; 363 364 beforeEach(() => { 365 tmpDir = mkdtempSync(join(tmpdir(), "firehose-cursor-test-")); 366 db = new Database(join(tmpDir, "test.db")); 367 storage = new SyncStorage(db); 368 storage.initSchema(); 369 }); 370 371 afterEach(() => { 372 db.close(); 373 rmSync(tmpDir, { recursive: true, force: true }); 374 }); 375 376 it("returns null when no cursor saved", () => { 377 expect(storage.getFirehoseCursor()).toBeNull(); 378 }); 379 380 it("saves and retrieves cursor", () => { 381 storage.saveFirehoseCursor(12345); 382 expect(storage.getFirehoseCursor()).toBe(12345); 383 }); 384 385 it("updates cursor on subsequent saves", () => { 386 storage.saveFirehoseCursor(100); 387 expect(storage.getFirehoseCursor()).toBe(100); 388 389 storage.saveFirehoseCursor(200); 390 expect(storage.getFirehoseCursor()).toBe(200); 391 }); 392 393 it("clears cursor", () => { 394 storage.saveFirehoseCursor(100); 395 storage.clearFirehoseCursor(); 396 expect(storage.getFirehoseCursor()).toBeNull(); 397 }); 398}); 399 400// ============================================ 401// Reconnection Logic 402// ============================================ 403 404describe("FirehoseSubscription: reconnection", () => { 405 let relay: Awaited<ReturnType<typeof startMockRelay>>; 406 let sub: FirehoseSubscription; 407 408 beforeEach(async () => { 409 relay = await startMockRelay(); 410 }); 411 412 afterEach(async () => { 413 sub?.stop(); 414 await relay.close(); 415 }); 416 417 it("reconnects after disconnection", async () => { 418 let connectionCount = 0; 419 relay.wss.on("connection", () => { 420 connectionCount++; 421 }); 422 423 sub = new FirehoseSubscription({ 424 firehoseUrl: relay.url, 425 reconnectDelayMs: 100, 426 maxReconnectDelayMs: 500, 427 }); 428 sub.start(new Set(["did:plc:test"])); 429 430 await waitFor(() => connectionCount >= 1); 431 432 // Close server-side connections to trigger reconnect 433 for (const client of relay.getClients()) { 434 client.close(); 435 } 436 437 await waitFor(() => connectionCount >= 2, 5000); 438 expect(connectionCount).toBeGreaterThanOrEqual(2); 439 }); 440 441 it("stop() prevents reconnection", async () => { 442 let connectionCount = 0; 443 relay.wss.on("connection", () => { 444 connectionCount++; 445 }); 446 447 sub = new FirehoseSubscription({ 448 firehoseUrl: relay.url, 449 reconnectDelayMs: 100, 450 }); 451 sub.start(new Set(["did:plc:test"])); 452 453 await waitFor(() => connectionCount >= 1); 454 455 // Stop before disconnection 456 sub.stop(); 457 458 // Close server-side connections 459 for (const client of relay.getClients()) { 460 client.close(); 461 } 462 463 // Wait and verify no reconnection 464 await new Promise((r) => setTimeout(r, 500)); 465 expect(connectionCount).toBe(1); 466 }); 467}); 468 469// ============================================ 470// Stats and DID Updates 471// ============================================ 472 473describe("FirehoseSubscription: stats and DID updates", () => { 474 let relay: Awaited<ReturnType<typeof startMockRelay>>; 475 let sub: FirehoseSubscription; 476 477 beforeEach(async () => { 478 relay = await startMockRelay(); 479 }); 480 481 afterEach(async () => { 482 sub?.stop(); 483 await relay.close(); 484 }); 485 486 it("reports correct stats", async () => { 487 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 488 sub.onCommit(() => {}); 489 490 const trackedDid = "did:plc:tracked1"; 491 const otherDid = "did:plc:other"; 492 sub.start(new Set([trackedDid])); 493 494 await waitFor(() => relay.getClients().size > 0); 495 496 // Send events 497 for (const client of relay.getClients()) { 498 client.send(makeCommitFrame(1, otherDid)); // filtered out 499 client.send(makeCommitFrame(2, trackedDid)); // processed 500 client.send(makeCommitFrame(3, trackedDid)); // processed 501 } 502 503 await waitFor(() => sub.getStats().eventsProcessed >= 2); 504 505 const stats = sub.getStats(); 506 expect(stats.connected).toBe(true); 507 expect(stats.cursor).toBe(3); 508 expect(stats.eventsReceived).toBe(3); // all commit events received 509 expect(stats.eventsProcessed).toBe(2); // only tracked DID 510 expect(stats.trackedDids).toBe(1); 511 }); 512 513 it("updateDids changes which DIDs are tracked", async () => { 514 const events: FirehoseCommitEvent[] = []; 515 516 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 517 sub.onCommit((event) => { 518 events.push(event); 519 }); 520 521 const did1 = "did:plc:first"; 522 const did2 = "did:plc:second"; 523 sub.start(new Set([did1])); 524 525 await waitFor(() => relay.getClients().size > 0); 526 527 // Send event for did1 528 for (const client of relay.getClients()) { 529 client.send(makeCommitFrame(1, did1)); 530 } 531 await waitFor(() => events.length >= 1); 532 533 // Switch tracking to did2 534 sub.updateDids(new Set([did2])); 535 536 for (const client of relay.getClients()) { 537 client.send(makeCommitFrame(2, did1)); // should be filtered 538 client.send(makeCommitFrame(3, did2)); // should be processed 539 } 540 541 await waitFor(() => events.length >= 2); 542 await new Promise((r) => setTimeout(r, 200)); 543 544 expect(events).toHaveLength(2); 545 expect(events[0]!.repo).toBe(did1); 546 expect(events[1]!.repo).toBe(did2); 547 }); 548}); 549 550// ============================================ 551// Multiple handlers 552// ============================================ 553 554describe("FirehoseSubscription: multiple handlers", () => { 555 let relay: Awaited<ReturnType<typeof startMockRelay>>; 556 let sub: FirehoseSubscription; 557 558 beforeEach(async () => { 559 relay = await startMockRelay(); 560 }); 561 562 afterEach(async () => { 563 sub?.stop(); 564 await relay.close(); 565 }); 566 567 it("dispatches to all registered handlers", async () => { 568 const events1: FirehoseCommitEvent[] = []; 569 const events2: FirehoseCommitEvent[] = []; 570 571 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 572 sub.onCommit((event) => { events1.push(event); }); 573 sub.onCommit((event) => { events2.push(event); }); 574 575 const trackedDid = "did:plc:tracked1"; 576 sub.start(new Set([trackedDid])); 577 578 await waitFor(() => relay.getClients().size > 0); 579 580 for (const client of relay.getClients()) { 581 client.send(makeCommitFrame(1, trackedDid)); 582 } 583 584 await waitFor(() => events1.length >= 1 && events2.length >= 1); 585 586 expect(events1).toHaveLength(1); 587 expect(events2).toHaveLength(1); 588 }); 589 590 it("handler errors do not prevent other handlers from running", async () => { 591 const events: FirehoseCommitEvent[] = []; 592 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 593 594 sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 595 sub.onCommit(() => { 596 throw new Error("Handler 1 fails"); 597 }); 598 sub.onCommit((event) => { events.push(event); }); 599 600 const trackedDid = "did:plc:tracked1"; 601 sub.start(new Set([trackedDid])); 602 603 await waitFor(() => relay.getClients().size > 0); 604 605 for (const client of relay.getClients()) { 606 client.send(makeCommitFrame(1, trackedDid)); 607 } 608 609 await waitFor(() => events.length >= 1); 610 611 expect(events).toHaveLength(1); 612 expect(consoleSpy).toHaveBeenCalled(); 613 614 consoleSpy.mockRestore(); 615 }); 616});