atproto user agency toolkit for individuals and groups
1import { WebSocketServer, WebSocket } from "ws";
2import type { IncomingMessage } from "node:http";
3import { encode as cborEncode } from "./cbor-compat.js";
4import type { RepoManager } from "./repo-manager.js";
5import type {
6 SeqEvent,
7 SeqCommitEvent,
8 SeqIdentityEvent,
9} from "./sequencer.js";
10
/**
 * Firehose manages WebSocket connections for com.atproto.sync.subscribeRepos.
 * Replaces Cloudflare's hibernatable WebSocket API with the `ws` library.
 *
 * Every connected socket is tracked in an in-memory set. When a repo manager
 * is supplied, the firehose installs itself as that manager's
 * `onFirehoseEvent` callback and fans each event out to all open sockets.
 * A connection may pass `?cursor=<seq>` to have stored events replayed first.
 */
export class Firehose {
  /** Second argument passed to `sequencer.getEventsSince` for one backfill query. */
  private static readonly BACKFILL_LIMIT = 1000;

  /** Sockets that currently receive live broadcasts. */
  private readonly clients = new Set<WebSocket>();

  /**
   * @param repoManager Optional source of events. When present, its
   *   `onFirehoseEvent` property is overwritten so events reach `broadcast`.
   */
  constructor(private readonly repoManager?: RepoManager) {
    // Register as the event handler on the repo manager (if present)
    if (this.repoManager) {
      this.repoManager.onFirehoseEvent = (event) => {
        this.broadcast(event);
      };
    }
  }

  /**
   * Get current subscriber count.
   */
  get subscriberCount(): number {
    return this.clients.size;
  }

  /**
   * Handle a new WebSocket connection.
   *
   * Attaches close/error handlers, validates the optional `cursor` query
   * parameter, registers the socket for live broadcasts and, if a cursor was
   * given, replays stored events.
   *
   * A malformed cursor (or request URL) is answered with an `InvalidRequest`
   * error frame and close code 1008. A failure while backfilling is logged and
   * the socket is closed with code 1011; the returned promise does not reject
   * in either case.
   *
   * @param ws  The accepted socket.
   * @param req The HTTP upgrade request; only `req.url` is read.
   */
  async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
    // Attach lifecycle handlers before anything that can fail, so the socket
    // is always removed from the client set once it goes away.
    ws.on("close", () => {
      this.clients.delete(ws);
    });

    ws.on("error", (err) => {
      console.error("WebSocket error:", err);
      this.clients.delete(ws);
    });

    const cursor = this.parseCursor(req);
    if (cursor === "invalid") {
      this.rejectConnection(ws, "InvalidRequest", "Cursor must be an integer");
      return;
    }

    // The socket joins the broadcast set before backfill runs, so a live event
    // emitted while backfill is awaiting can be delivered ahead of older
    // backfilled events.
    // NOTE(review): de-duplicating/ordering that window needs the event
    // sequence number, which is not visible from this file.
    this.clients.add(ws);

    // Backfill if cursor provided
    if (cursor === null) {
      return;
    }

    try {
      await this.backfill(ws, cursor);
    } catch (err: unknown) {
      // Do not leave a half-initialised subscriber behind or surface an
      // unhandled rejection to a caller that does not await this method.
      console.error("Error during firehose backfill:", err);
      this.clients.delete(ws);
      if (ws.readyState === WebSocket.OPEN) {
        ws.close(1011, "InternalError");
      }
    }
  }

  /**
   * Extract the `cursor` query parameter from the upgrade request.
   *
   * @returns The cursor as a safe integer, `null` when the parameter is absent
   *   or empty, or `"invalid"` when the URL cannot be parsed or the value is
   *   not an integer string.
   */
  private parseCursor(req: IncomingMessage): number | null | "invalid" {
    let raw: string | undefined;
    try {
      // req.url is path + query only, so a dummy origin is supplied.
      const url = new URL(req.url ?? "/", "http://localhost");
      raw = url.searchParams.get("cursor")?.trim();
    } catch {
      return "invalid";
    }

    if (!raw) {
      return null;
    }

    // parseInt would silently accept "12abc" and yield NaN for "abc";
    // require the whole string to be an integer instead.
    if (!/^-?\d+$/.test(raw)) {
      return "invalid";
    }
    const value = Number(raw);
    return Number.isSafeInteger(value) ? value : "invalid";
  }

  /**
   * Send an error frame (if the socket is still open), close the socket with
   * the given code and stop tracking it.
   */
  private rejectConnection(
    ws: WebSocket,
    error: string,
    message: string,
    closeCode = 1008,
  ): void {
    this.clients.delete(ws);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(this.encodeErrorFrame(error, message));
    }
    ws.close(closeCode, error);
  }

  /**
   * Broadcast a firehose event to all connected clients.
   *
   * The frame is encoded once. Sockets that are not OPEN are skipped; a socket
   * whose `send` throws is logged and dropped from the client set.
   */
  private broadcast(event: SeqEvent): void {
    const frame = this.encodeEventFrame(event);

    for (const ws of this.clients) {
      if (ws.readyState !== WebSocket.OPEN) {
        continue;
      }
      try {
        ws.send(frame);
      } catch (e: unknown) {
        console.error("Error broadcasting to WebSocket:", e);
        // Deleting the current entry while iterating a Set is safe.
        this.clients.delete(ws);
      }
    }
  }

  /**
   * Backfill events from a cursor.
   *
   * Does nothing without a repo manager. A cursor greater than the sequencer's
   * latest sequence number is rejected with a `FutureCursor` error frame and
   * close code 1008. Otherwise the events returned by the sequencer are sent
   * in order until the socket stops being OPEN.
   */
  private async backfill(ws: WebSocket, cursor: number): Promise<void> {
    if (!this.repoManager) {
      // No repo manager — no local events to backfill
      return;
    }

    const latestSeq = this.repoManager.sequencer.getLatestSeq();

    if (cursor > latestSeq) {
      this.rejectConnection(ws, "FutureCursor", "Cursor is in the future");
      return;
    }

    // NOTE(review): only one query is issued. Assuming the second argument is
    // a row limit, a consumer more than BACKFILL_LIMIT events behind would not
    // receive the remainder — confirm whether paging is required.
    const events = await this.repoManager.sequencer.getEventsSince(
      cursor,
      Firehose.BACKFILL_LIMIT,
    );

    for (const event of events) {
      if (ws.readyState !== WebSocket.OPEN) break;
      ws.send(this.encodeEventFrame(event));
    }
  }

  // ============================================
  // Frame Encoding
  // ============================================

  /**
   * Build a wire frame: the CBOR encoding of `header` immediately followed by
   * the CBOR encoding of `body`, in a single byte array.
   */
  private encodeFrame(header: object, body: object): Uint8Array {
    const headerBytes = cborEncode(header);
    const bodyBytes = cborEncode(body);

    const frame = new Uint8Array(headerBytes.length + bodyBytes.length);
    frame.set(headerBytes, 0);
    frame.set(bodyBytes, headerBytes.length);

    return frame;
  }

  /** Encode a commit event as a message frame (`op: 1`, `t: "#commit"`). */
  private encodeCommitFrame(event: SeqCommitEvent): Uint8Array {
    const header = { op: 1, t: "#commit" };
    return this.encodeFrame(header, event.event);
  }

  /** Encode an identity event as a message frame (`op: 1`, `t: "#identity"`). */
  private encodeIdentityFrame(event: SeqIdentityEvent): Uint8Array {
    const header = { op: 1, t: "#identity" };
    return this.encodeFrame(header, event.event);
  }

  /**
   * Pick the frame encoder from the event's `type`: `"identity"` events get an
   * identity frame, everything else is encoded as a commit.
   */
  private encodeEventFrame(event: SeqEvent): Uint8Array {
    if (event.type === "identity") {
      return this.encodeIdentityFrame(event);
    }
    return this.encodeCommitFrame(event);
  }

  /** Encode an error frame: header `{ op: -1 }`, body `{ error, message }`. */
  private encodeErrorFrame(error: string, message: string): Uint8Array {
    const header = { op: -1 };
    const body = { error, message };
    return this.encodeFrame(header, body);
  }
}