atproto user agency toolkit for individuals and groups
/**
 * Subscribe to the AT Protocol firehose (com.atproto.sync.subscribeRepos)
 * to receive real-time updates for serviced DIDs.
 *
 * The firehose streams CBOR-encoded frames over WebSocket. Each frame
 * consists of two concatenated CBOR values: a header and a body.
 * The header contains { op, t } where t identifies the event type.
 * Commit events contain blocks (as CAR bytes) and ops (record changes);
 * account events report status changes for a DID.
 */
10
11import { WebSocket } from "ws";
12import { decode as cborDecode } from "../cbor-compat.js";
13
/**
 * One record-level change carried inside a firehose commit event.
 */
export interface FirehoseCommitOp {
  /** Kind of change applied at `path`. */
  action: "create" | "update" | "delete";
  /** Record path inside the repository (presumably `collection/rkey` — confirm against the relay). */
  path: string;
  /** CID value exactly as CBOR-decoded (left opaque), or null when the op carried none. */
  cid: unknown | null;
}
20
/**
 * A decoded `#commit` frame for one of the tracked repositories.
 *
 * The subscription normalises missing wire fields when it builds this
 * object:
 * - `since`/`commit` become `null`.
 * - `time` becomes the local time of receipt.
 * - The boolean flags become `false`.
 * - `blocks`/`ops` become empty.
 */
export interface FirehoseCommitEvent {
  /** Firehose sequence number of the frame. */
  seq: number;
  /** DID of the repository the commit belongs to. */
  repo: string;
  /** Revision string carried by the commit. */
  rev: string;
  /** `since` value from the frame (presumably the previous revision), or null when absent. */
  since: string | null;
  /** Block bytes attached to the commit (CAR-encoded). */
  blocks: Uint8Array;
  /** Record-level changes contained in the commit. */
  ops: FirehoseCommitOp[];
  /** Raw decoded `commit` field (left opaque), or null when absent. */
  commit: unknown;
  /** Timestamp string of the event. */
  time: string;
  /** `tooBig` flag from the frame. */
  tooBig: boolean;
  /** `rebase` flag from the frame. */
  rebase: boolean;
}
34
/**
 * A decoded `#account` frame for one of the tracked DIDs.
 *
 * When the frame omits them, the subscription fills in `time` with the
 * local time of receipt and `active` with `true`.
 */
export interface FirehoseAccountEvent {
  /** Firehose sequence number of the frame. */
  seq: number;
  /** DID whose account status changed. */
  did: string;
  /** Timestamp string of the event. */
  time: string;
  /** Whether the account is reported as active. */
  active: boolean;
  /** Optional status string, e.g. "takendown", "suspended", "deleted", "deactivated". */
  status?: string;
}
43
/**
 * Listener for commit events on tracked repositories. It may return a
 * promise; the subscription does not await it and only logs a rejection.
 */
export type CommitHandler = (event: FirehoseCommitEvent) => Promise<void> | void;
46
/**
 * Listener for account status events on tracked DIDs. It may return a
 * promise; the subscription does not await it and only logs a rejection.
 */
export type AccountHandler = (event: FirehoseAccountEvent) => Promise<void> | void;
49
/** Options accepted by the FirehoseSubscription constructor. */
export interface FirehoseSubscriptionConfig {
  /**
   * WebSocket URL of the relay's firehose endpoint. A `cursor` query
   * parameter is added on connect when resuming.
   */
  firehoseUrl: string;
  /**
   * Delay in ms before the first reconnect attempt. It doubles after each
   * scheduled reconnect, is capped at `maxReconnectDelayMs`, and resets once
   * a connection opens. Defaults to 1000.
   */
  reconnectDelayMs?: number;
  /** Ceiling for the reconnect delay in ms. Defaults to 60000. */
  maxReconnectDelayMs?: number;
}
59
/** Backoff used for the first reconnect attempt, in milliseconds (one second). */
const DEFAULT_RECONNECT_DELAY_MS = 1_000;
/** Upper bound on the reconnect backoff, in milliseconds (one minute). */
const DEFAULT_MAX_RECONNECT_DELAY_MS = 60 * 1_000;
62
/**
 * FirehoseSubscription connects to an AT Protocol firehose relay and
 * dispatches commit and account events for a configured set of DIDs.
 *
 * Features:
 * - Filters events so only those for serviced DIDs are dispatched (Set lookup).
 * - Cursor-based resumption: remembers the seq of the latest commit/account
 *   frame decoded (including frames for untracked DIDs) and resumes from it
 *   on reconnect.
 * - Auto-reconnect with exponential backoff.
 * - Graceful shutdown: callbacks from a socket that was stopped or replaced
 *   are ignored. A stop()/start() cycle therefore cannot spawn a second
 *   connection or dispatch events twice.
 */
export class FirehoseSubscription {
  private ws: WebSocket | null = null;
  private dids: Set<string> = new Set();
  private commitHandlers: CommitHandler[] = [];
  private accountHandlers: AccountHandler[] = [];
  private cursor: number | null = null;
  private running = false;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private currentReconnectDelay: number;
  private config: Required<FirehoseSubscriptionConfig>;

  /** Count of commit/account events received (all DIDs, before filtering). */
  private _eventsReceived = 0;
  /** Count of events dispatched (matching serviced DIDs). */
  private _eventsProcessed = 0;

  constructor(config: FirehoseSubscriptionConfig) {
    this.config = {
      firehoseUrl: config.firehoseUrl,
      reconnectDelayMs: config.reconnectDelayMs ?? DEFAULT_RECONNECT_DELAY_MS,
      maxReconnectDelayMs: config.maxReconnectDelayMs ?? DEFAULT_MAX_RECONNECT_DELAY_MS,
    };
    this.currentReconnectDelay = this.config.reconnectDelayMs;
  }

  /**
   * Start subscribing to the firehose for the given set of DIDs.
   * If a cursor is provided, events will be replayed from that point.
   * Calling start() while already running is a no-op.
   */
  start(dids: Set<string>, cursor?: number | null): void {
    if (this.running) return;
    this.running = true;
    this.dids = dids;
    if (cursor !== undefined && cursor !== null) {
      this.cursor = cursor;
    }
    this.connect();
  }

  /**
   * Stop the firehose subscription and disconnect cleanly.
   * Any pending reconnect is cancelled. Frames still in flight on the old
   * socket are not dispatched.
   */
  stop(): void {
    this.running = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    const ws = this.ws;
    if (ws) {
      // Detach first so this socket's own callbacks treat it as stale.
      this.ws = null;
      try {
        // Close with normal closure code
        ws.close(1000, "Shutting down");
      } catch {
        // Ignore errors during close
      }
    }
  }

  /**
   * Register a handler for commit events matching serviced DIDs.
   */
  onCommit(handler: CommitHandler): void {
    this.commitHandlers.push(handler);
  }

  /**
   * Register a handler for account status events matching serviced DIDs.
   */
  onAccount(handler: AccountHandler): void {
    this.accountHandlers.push(handler);
  }

  /**
   * Update the set of tracked DIDs without reconnecting.
   */
  updateDids(dids: Set<string>): void {
    this.dids = dids;
  }

  /**
   * Get the resume cursor: the seq of the most recent commit/account frame
   * decoded, whether or not it matched a serviced DID. Async handlers for
   * that event may still be running.
   */
  getCursor(): number | null {
    return this.cursor;
  }

  /**
   * Get subscription statistics.
   */
  getStats(): {
    connected: boolean;
    cursor: number | null;
    eventsReceived: number;
    eventsProcessed: number;
    trackedDids: number;
  } {
    return {
      connected: this.ws !== null && this.ws.readyState === WebSocket.OPEN,
      cursor: this.cursor,
      eventsReceived: this._eventsReceived,
      eventsProcessed: this._eventsProcessed,
      trackedDids: this.dids.size,
    };
  }

  // ============================================
  // Internal: Connection management
  // ============================================

  /** Open a new socket (resuming from the cursor, if any) and wire its callbacks. */
  private connect(): void {
    if (!this.running) return;

    const url = new URL(this.config.firehoseUrl);
    if (this.cursor !== null) {
      url.searchParams.set("cursor", String(this.cursor));
    }

    const ws = new WebSocket(url.toString());
    this.ws = ws;
    ws.binaryType = "nodebuffer";

    // A socket is "current" only while it is the one stored on this.ws.
    // After stop() or a reconnect, callbacks from an old socket must not
    // touch shared state: clearing this.ws, scheduling another reconnect,
    // or dispatching events a second time.
    const isCurrent = (): boolean => this.ws === ws;

    ws.on("open", () => {
      if (!isCurrent()) return;
      // Reset reconnect delay on successful connection
      this.currentReconnectDelay = this.config.reconnectDelayMs;
    });

    ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
      if (!isCurrent()) return;
      try {
        this.handleFrame(toUint8Array(data));
      } catch (err) {
        // Log but don't crash on individual frame errors
        console.error("[firehose-subscription] Frame parse error:", err);
      }
    });

    ws.on("close", (code: number, reason: Buffer) => {
      if (!isCurrent()) return;
      this.ws = null;
      if (this.running) {
        const reasonStr = reason.toString("utf8");
        console.warn(
          `[firehose-subscription] Disconnected (code=${code}, reason="${reasonStr}"). Reconnecting in ${this.currentReconnectDelay}ms...`,
        );
        this.scheduleReconnect();
      }
    });

    ws.on("error", (err: Error) => {
      console.error("[firehose-subscription] WebSocket error:", err.message);
      // The 'close' event will fire after this, triggering reconnection
    });
  }

  /** Arm a single reconnect timer and double the backoff for next time. */
  private scheduleReconnect(): void {
    if (!this.running) return;
    if (this.reconnectTimer) return;

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (this.running) {
        this.connect();
      }
    }, this.currentReconnectDelay);

    // Exponential backoff
    this.currentReconnectDelay = Math.min(
      this.currentReconnectDelay * 2,
      this.config.maxReconnectDelayMs,
    );
  }

  // ============================================
  // Internal: Frame parsing
  // ============================================

  /**
   * Parse a firehose frame (two concatenated CBOR values: header + body).
   *
   * The AT Protocol firehose sends binary WebSocket frames containing:
   *   1. A CBOR-encoded header: { op: number, t: string }
   *      - op=1 means a normal event, t identifies the type (#commit, #identity, etc.)
   *      - op=-1 means an error frame
   *   2. A CBOR-encoded body (the event payload)
   *
   * Only #commit and #account bodies are decoded; other event types are skipped.
   */
  private handleFrame(bytes: Uint8Array): void {
    // Decode the header (first CBOR value)
    const { value: header, bytesConsumed } = decodeCborWithLength(bytes);
    const headerObj = header as Record<string, unknown>;

    const op = headerObj.op;
    const type = headerObj.t;

    // Error frame
    if (op === -1) {
      const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>;
      console.error(
        `[firehose-subscription] Error from relay: ${body.error}: ${body.message}`,
      );
      return;
    }

    // Only process normal commit and account events
    if (op !== 1) return;
    if (type !== "#commit" && type !== "#account") return;

    const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>;
    this._eventsReceived++;

    // Advance the cursor for every decoded event, tracked or not. Otherwise a
    // reconnect would replay every unrelated event since the last match.
    // A missing or non-numeric seq leaves the cursor unchanged rather than
    // poisoning it.
    if (typeof body.seq === "number") {
      this.cursor = body.seq;
    }

    if (type === "#account") {
      this.handleAccountEvent(body);
    } else {
      this.handleCommitEvent(body);
    }
  }

  /** Build and dispatch an account event if its DID is serviced. */
  private handleAccountEvent(body: Record<string, unknown>): void {
    const did = typeof body.did === "string" ? body.did : undefined;
    if (!did || !this.dids.has(did)) return;

    const event: FirehoseAccountEvent = {
      seq: body.seq as number,
      did,
      time: (body.time as string) ?? new Date().toISOString(),
      active: (body.active as boolean) ?? true,
      status: body.status as string | undefined,
    };

    this._eventsProcessed++;
    this.dispatch(this.accountHandlers, event, "Account");
  }

  /** Build and dispatch a commit event if its repo is serviced. */
  private handleCommitEvent(body: Record<string, unknown>): void {
    // Filter: only process events for our serviced DIDs
    const repo = typeof body.repo === "string" ? body.repo : undefined;
    if (!repo || !this.dids.has(repo)) return;

    const event: FirehoseCommitEvent = {
      seq: body.seq as number,
      repo,
      rev: body.rev as string,
      since: (body.since as string) ?? null,
      blocks: body.blocks instanceof Uint8Array ? body.blocks : new Uint8Array(0),
      ops: Array.isArray(body.ops)
        ? (body.ops as Array<Record<string, unknown>>).map((op) => ({
            action: op.action as "create" | "update" | "delete",
            path: op.path as string,
            cid: op.cid ?? null,
          }))
        : [],
      commit: body.commit ?? null,
      time: (body.time as string) ?? new Date().toISOString(),
      tooBig: (body.tooBig as boolean) ?? false,
      rebase: (body.rebase as boolean) ?? false,
    };

    this._eventsProcessed++;
    this.dispatch(this.commitHandlers, event, "Commit");
  }

  /**
   * Invoke every handler with `event`. Synchronous throws and promise
   * rejections are logged. Promises are not awaited, so one slow handler
   * never blocks frame processing.
   */
  private dispatch<E>(
    handlers: ReadonlyArray<(event: E) => void | Promise<void>>,
    event: E,
    label: "Commit" | "Account",
  ): void {
    for (const handler of handlers) {
      try {
        const result = handler(event);
        if (result && typeof result === "object" && "catch" in result) {
          (result as Promise<void>).catch((err: unknown) => {
            console.error(`[firehose-subscription] ${label} handler error:`, err);
          });
        }
      } catch (err) {
        console.error(`[firehose-subscription] ${label} handler error:`, err);
      }
    }
  }
}
361
362// ============================================
363// Utility functions
364// ============================================
365
/**
 * Normalise a `ws` message payload into one contiguous Uint8Array.
 *
 * - Byte views (Node's Buffer extends Uint8Array) are returned as-is.
 * - A fragment list is copied, in order, into a single new array.
 * - Anything else is treated as an ArrayBuffer and wrapped without copying.
 */
function toUint8Array(data: Buffer | ArrayBuffer | Buffer[]): Uint8Array {
  if (data instanceof Uint8Array) return data;

  if (!Array.isArray(data)) {
    return new Uint8Array(data as ArrayBuffer);
  }

  // Fragmented message: size the output once, then stitch chunks together.
  let size = 0;
  for (const chunk of data) size += chunk.length;
  const joined = new Uint8Array(size);
  data.reduce((pos, chunk) => {
    joined.set(chunk, pos);
    return pos + chunk.length;
  }, 0);
  return joined;
}
389
/**
 * Decode the first CBOR item in `bytes` and report how many bytes it spans.
 *
 * Firehose frames are two CBOR items back to back (header, then body), and
 * decoding the whole buffer cannot tell where the first item ends. The
 * item's extent is measured with the hand-written scanner `cborItemLength`,
 * and only that prefix is passed to the real decoder. The body then starts
 * at `bytesConsumed`.
 *
 * @param bytes - A complete frame; the header is expected to be a small map.
 * @returns The decoded first item and its encoded length in bytes.
 * @throws Error if the measured item extends past the end of `bytes`
 *   (truncated frame); errors from the scanner or decoder propagate.
 */
function decodeCborWithLength(bytes: Uint8Array): { value: unknown; bytesConsumed: number } {
  const consumed = cborItemLength(bytes, 0);
  // Fail loudly on a truncated frame instead of decoding a clipped prefix
  // and then handing an empty/garbage body to the caller.
  if (consumed > bytes.length) {
    throw new Error(
      `Truncated CBOR item: needs ${consumed} bytes, frame has ${bytes.length}`,
    );
  }
  const value = cborDecode(bytes.subarray(0, consumed));
  return { value, bytesConsumed: consumed };
}
410
/**
 * Calculate the byte length of the CBOR item starting at `offset` in `bytes`,
 * without decoding it.
 *
 * Supports definite-length items of every major type: unsigned and negative
 * integers, byte and text strings, arrays, maps, tags, and simple values and
 * floats. Arguments may be up to 8 bytes long. Indefinite-length items
 * (additional info 31) and the reserved additional-info values 28-30 are
 * rejected.
 *
 * @param bytes - Buffer holding the encoded item.
 * @param offset - Index of the item's initial byte.
 * @returns Number of bytes the item occupies, counted from `offset`.
 * @throws Error if the item uses an unsupported encoding, or if it runs past
 *   the end of `bytes`.
 */
function cborItemLength(bytes: Uint8Array, offset: number): number {
  // Bounds-checked byte read. An out-of-range index would otherwise yield
  // `undefined`, which bitwise ops silently treat as 0.
  const readByte = (index: number): number => {
    const byte = bytes[index];
    if (byte === undefined) {
      throw new Error(`Truncated CBOR item: attempted to read byte ${index} of ${bytes.length}`);
    }
    return byte;
  };

  const initial = readByte(offset);
  const majorType = initial >> 5;
  const additionalInfo = initial & 0x1f;

  // Get the argument value and how many bytes the argument header takes
  let argValue: number;
  let headerSize: number;

  if (additionalInfo < 24) {
    argValue = additionalInfo;
    headerSize = 1;
  } else if (additionalInfo <= 27) {
    // 24..27 => 1, 2, 4 or 8 big-endian argument bytes follow.
    const argBytes = 1 << (additionalInfo - 24);
    argValue = 0;
    for (let i = 1; i <= argBytes; i++) {
      // Multiply rather than shift so 32-bit arguments stay unsigned.
      // 8-byte values above 2^53 lose precision; that only affects lengths
      // far larger than any buffer, which the bounds checks reject anyway.
      argValue = argValue * 256 + readByte(offset + i);
    }
    headerSize = 1 + argBytes;
  } else {
    // 28-30 are reserved; 31 introduces indefinite-length items.
    throw new Error(`Unsupported CBOR additional info: ${additionalInfo}`);
  }

  switch (majorType) {
    case 0: // unsigned integer
    case 1: // negative integer
    case 7: // simple value / float: the header bytes are the whole item
      return headerSize;
    case 2: // byte string
    case 3: { // text string
      const length = headerSize + argValue;
      if (offset + length > bytes.length) {
        throw new Error(
          `Truncated CBOR item: string needs ${length} bytes at offset ${offset}, buffer has ${bytes.length}`,
        );
      }
      return length;
    }
    case 4: // array: argValue items
    case 5: { // map: argValue key/value pairs
      const itemCount = majorType === 5 ? argValue * 2 : argValue;
      let pos = offset + headerSize;
      for (let i = 0; i < itemCount; i++) {
        // Each nested call bounds-checks, so a bogus huge count fails fast.
        pos += cborItemLength(bytes, pos);
      }
      return pos - offset;
    }
    case 6: // tag: header followed by exactly one tagged item
      return headerSize + cborItemLength(bytes, offset + headerSize);
    default:
      throw new Error(`Unknown CBOR major type: ${majorType}`);
  }
}