/**
 * Subscribe to the AT Protocol firehose (com.atproto.sync.subscribeRepos)
 * to receive real-time updates for serviced DIDs.
 *
 * The firehose streams CBOR-encoded frames over WebSocket. Each frame
 * consists of two concatenated CBOR values: a header and a body.
 * The header contains { op, t } where t identifies the event type.
 * Commit events contain blocks (as CAR bytes) and ops (record changes).
 */

import { WebSocket } from "ws";
import { decode as cborDecode } from "../cbor-compat.js";

/** Parsed commit operation from a firehose event. */
export interface FirehoseCommitOp {
  action: "create" | "update" | "delete";
  path: string;
  cid: unknown | null;
}

/** Parsed commit event from the firehose. */
export interface FirehoseCommitEvent {
  seq: number;
  repo: string;
  rev: string;
  since: string | null;
  blocks: Uint8Array;
  ops: FirehoseCommitOp[];
  commit: unknown;
  time: string;
  tooBig: boolean;
  rebase: boolean;
}

/** Parsed account status event from the firehose. */
export interface FirehoseAccountEvent {
  seq: number;
  did: string;
  time: string;
  active: boolean;
  status?: string; // "takendown", "suspended", "deleted", "deactivated"
}

/** Callback for commit events. */
export type CommitHandler = (event: FirehoseCommitEvent) => void | Promise<void>;

/** Callback for account status events. */
export type AccountHandler = (event: FirehoseAccountEvent) => void | Promise<void>;

/** Configuration for the firehose subscription. */
export interface FirehoseSubscriptionConfig {
  /** WebSocket URL for the firehose relay. */
  firehoseUrl: string;
  /** Initial reconnect delay in ms (doubles on each failure, capped at maxReconnectDelayMs). */
  reconnectDelayMs?: number;
  /** Maximum reconnect delay in ms. */
  maxReconnectDelayMs?: number;
}

const DEFAULT_RECONNECT_DELAY_MS = 1000;
const DEFAULT_MAX_RECONNECT_DELAY_MS = 60_000;

/**
 * FirehoseSubscription connects to an AT Protocol firehose relay and
 * dispatches commit and account events for a configured set of DIDs.
 *
 * Features:
 * - Filters events to only process those for serviced DIDs (Set lookup)
 * - Cursor-based resumption: remembers the seq of the last dispatched event
 * - Auto-reconnect with exponential backoff
 * - Graceful shutdown
 */
export class FirehoseSubscription {
  private ws: WebSocket | null = null;
  private dids: Set<string> = new Set();
  private commitHandlers: CommitHandler[] = [];
  private accountHandlers: AccountHandler[] = [];
  private cursor: number | null = null;
  private running = false;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private currentReconnectDelay: number;
  private readonly config: Required<FirehoseSubscriptionConfig>;

  /** Count of commit/account events received (all DIDs, before filtering). */
  private _eventsReceived = 0;
  /** Count of events processed (matching serviced DIDs). */
  private _eventsProcessed = 0;

  /**
   * @param config Relay URL plus optional reconnect tuning; omitted delays
   *   fall back to the module defaults.
   */
  constructor(config: FirehoseSubscriptionConfig) {
    this.config = {
      firehoseUrl: config.firehoseUrl,
      reconnectDelayMs: config.reconnectDelayMs ?? DEFAULT_RECONNECT_DELAY_MS,
      maxReconnectDelayMs: config.maxReconnectDelayMs ?? DEFAULT_MAX_RECONNECT_DELAY_MS,
    };
    this.currentReconnectDelay = this.config.reconnectDelayMs;
  }

  /**
   * Start subscribing to the firehose for the given set of DIDs.
   * If a cursor is provided, it is sent to the relay as the `cursor`
   * query parameter when connecting. Calling start() while already
   * running is a no-op.
   *
   * @param dids Set of DIDs whose events should be dispatched.
   * @param cursor Optional sequence number to resume from.
   */
  start(dids: Set<string>, cursor?: number | null): void {
    if (this.running) return;
    this.running = true;
    this.dids = dids;
    if (cursor !== undefined && cursor !== null) {
      this.cursor = cursor;
    }
    this.connect();
  }

  /**
   * Stop the firehose subscription and disconnect cleanly.
   * Cancels any pending reconnect and closes the socket with code 1000.
   */
  stop(): void {
    this.running = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    const ws = this.ws;
    if (ws) {
      // Detach first so late events from this socket are ignored by the
      // per-socket guards installed in connect().
      this.ws = null;
      try {
        // Close with normal closure code
        ws.close(1000, "Shutting down");
      } catch {
        // Ignore errors during close
      }
    }
  }

  /**
   * Register a handler for commit events matching serviced DIDs.
   */
  onCommit(handler: CommitHandler): void {
    this.commitHandlers.push(handler);
  }

  /**
   * Register a handler for account status events matching serviced DIDs.
   */
  onAccount(handler: AccountHandler): void {
    this.accountHandlers.push(handler);
  }

  /**
   * Update the set of tracked DIDs without reconnecting.
   */
  updateDids(dids: Set<string>): void {
    this.dids = dids;
  }

  /**
   * Get the last dispatched cursor (sequence number).
   * Only events for tracked DIDs advance the cursor; it is set just
   * before the handlers for that event are invoked.
   */
  getCursor(): number | null {
    return this.cursor;
  }

  /**
   * Get subscription statistics.
   */
  getStats(): {
    connected: boolean;
    cursor: number | null;
    eventsReceived: number;
    eventsProcessed: number;
    trackedDids: number;
  } {
    return {
      connected: this.ws !== null && this.ws.readyState === WebSocket.OPEN,
      cursor: this.cursor,
      eventsReceived: this._eventsReceived,
      eventsProcessed: this._eventsProcessed,
      trackedDids: this.dids.size,
    };
  }

  // ============================================
  // Internal: Connection management
  // ============================================

  /**
   * Open a new WebSocket to the relay (appending the cursor, if any) and
   * wire up its event handlers. Does nothing when not running.
   */
  private connect(): void {
    if (!this.running) return;

    const url = new URL(this.config.firehoseUrl);
    if (this.cursor !== null) {
      url.searchParams.set("cursor", String(this.cursor));
    }

    const ws = new WebSocket(url.toString());
    this.ws = ws;
    ws.binaryType = "nodebuffer";

    ws.on("open", () => {
      // Reset reconnect delay on successful connection
      this.currentReconnectDelay = this.config.reconnectDelayMs;
    });

    ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
      // Drop frames from a socket that has been stopped or superseded.
      if (this.ws !== ws) return;
      try {
        this.handleFrame(toUint8Array(data));
      } catch (err) {
        // Log but don't crash on individual frame errors
        console.error("[firehose-subscription] Frame parse error:", err);
      }
    });

    ws.on("close", (code: number, reason: Buffer) => {
      // A close event from an old socket (e.g. after stop() followed by
      // start()) must not clobber the current socket or trigger a second,
      // parallel connection.
      if (this.ws !== ws) return;
      this.ws = null;
      if (this.running) {
        const reasonStr = reason.toString("utf8");
        console.warn(
          `[firehose-subscription] Disconnected (code=${code}, reason="${reasonStr}"). Reconnecting in ${this.currentReconnectDelay}ms...`,
        );
        this.scheduleReconnect();
      }
    });

    ws.on("error", (err: Error) => {
      console.error("[firehose-subscription] WebSocket error:", err.message);
      // The 'close' event will fire after this, triggering reconnection
    });
  }

  /**
   * Arm a single reconnect timer using the current backoff delay, then
   * double the delay (capped at maxReconnectDelayMs) for the next failure.
   */
  private scheduleReconnect(): void {
    if (!this.running) return;
    if (this.reconnectTimer) return;

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (this.running) {
        this.connect();
      }
    }, this.currentReconnectDelay);

    // Exponential backoff
    this.currentReconnectDelay = Math.min(
      this.currentReconnectDelay * 2,
      this.config.maxReconnectDelayMs,
    );
  }

  // ============================================
  // Internal: Frame parsing
  // ============================================

  /**
   * Parse a firehose frame (two concatenated CBOR values: header + body).
   *
   * The AT Protocol firehose sends binary WebSocket frames containing:
   * 1. A CBOR-encoded header: { op: number, t: string }
   *    - op=1 means a normal event, t identifies the type (#commit, #identity, etc.)
   *    - op=-1 means an error frame
   * 2. A CBOR-encoded body (the event payload)
   *
   * Only `#commit` and `#account` events are decoded; everything else is
   * ignored. Events for DIDs outside the tracked set are counted as
   * received but not dispatched.
   *
   * @throws Error if the frame is truncated, the header/body is not a map,
   *   or a dispatched event lacks a valid `seq`. The caller logs and drops
   *   the frame.
   */
  private handleFrame(bytes: Uint8Array): void {
    // Decode the header (first CBOR value)
    const { value: header, bytesConsumed } = decodeCborWithLength(bytes);
    const headerObj = asRecord(header, "header");
    const op = headerObj.op;
    const type = headerObj.t;

    // Error frame
    if (op === -1) {
      const errBody = asRecord(cborDecode(bytes.subarray(bytesConsumed)), "error body");
      console.error(
        `[firehose-subscription] Error from relay: ${errBody.error}: ${errBody.message}`,
      );
      return;
    }

    // Only process normal events
    if (op !== 1) return;

    // Process commit and account events
    if (type !== "#commit" && type !== "#account") return;

    const body = asRecord(cborDecode(bytes.subarray(bytesConsumed)), "body");
    this._eventsReceived++;

    // Handle account status events
    if (type === "#account") {
      const did = body.did;
      if (typeof did !== "string" || !did || !this.dids.has(did)) return;

      const event: FirehoseAccountEvent = {
        seq: readSeq(body),
        did,
        time: typeof body.time === "string" ? body.time : new Date().toISOString(),
        active: typeof body.active === "boolean" ? body.active : true,
        status: typeof body.status === "string" ? body.status : undefined,
      };

      this.cursor = event.seq;
      this._eventsProcessed++;
      this.dispatch(this.accountHandlers, event, "Account");
      return;
    }

    // Filter: only process events for our serviced DIDs
    const repo = body.repo;
    if (typeof repo !== "string" || !repo || !this.dids.has(repo)) return;

    const rawOps = Array.isArray(body.ops) ? (body.ops as Array<Record<string, unknown>>) : [];
    const event: FirehoseCommitEvent = {
      seq: readSeq(body),
      repo,
      rev: body.rev as string,
      since: typeof body.since === "string" ? body.since : null,
      blocks: body.blocks instanceof Uint8Array ? body.blocks : new Uint8Array(0),
      ops: rawOps.map((rawOp) => ({
        action: rawOp.action as "create" | "update" | "delete",
        path: rawOp.path as string,
        cid: rawOp.cid ?? null,
      })),
      commit: body.commit ?? null,
      time: typeof body.time === "string" ? body.time : new Date().toISOString(),
      tooBig: typeof body.tooBig === "boolean" ? body.tooBig : false,
      rebase: typeof body.rebase === "boolean" ? body.rebase : false,
    };

    // Update cursor to this event's sequence number
    this.cursor = event.seq;
    this._eventsProcessed++;
    this.dispatch(this.commitHandlers, event, "Commit");
  }

  /**
   * Invoke every handler with the event. Synchronous throws and rejected
   * promises are logged and never propagate; promises are not awaited.
   *
   * @param handlers Registered callbacks for this event kind.
   * @param event The parsed event to deliver.
   * @param label "Commit" or "Account", used only in the log message.
   */
  private dispatch<T>(
    handlers: ReadonlyArray<(event: T) => void | Promise<void>>,
    event: T,
    label: string,
  ): void {
    for (const handler of handlers) {
      try {
        const result = handler(event);
        // If handler returns a promise, catch errors but don't await
        if (result && typeof result === "object" && "catch" in result) {
          (result as Promise<void>).catch((err: unknown) => {
            console.error(`[firehose-subscription] ${label} handler error:`, err);
          });
        }
      } catch (err) {
        console.error(`[firehose-subscription] ${label} handler error:`, err);
      }
    }
  }
}

// ============================================
// Utility functions
// ============================================

/**
 * Narrow a decoded CBOR value to a string-keyed record.
 *
 * @param value The decoded value.
 * @param what Short description of the frame part, used in the error message.
 * @throws Error if the value is not a non-null object.
 */
function asRecord(value: unknown, what: string): Record<string, unknown> {
  if (typeof value !== "object" || value === null) {
    throw new Error(`Expected a CBOR map for firehose frame ${what}`);
  }
  return value as Record<string, unknown>;
}

/**
 * Read and validate the `seq` field of an event body.
 * Accepts a number, or a bigint that fits in a safe integer.
 *
 * @throws Error if `seq` is missing or not a safe integer, so that an
 *   invalid value can never be stored as the resume cursor.
 */
function readSeq(body: Record<string, unknown>): number {
  const raw = body.seq;
  const seq = typeof raw === "bigint" ? Number(raw) : raw;
  if (typeof seq !== "number" || !Number.isSafeInteger(seq)) {
    throw new Error(`Invalid or missing seq in firehose event: ${String(raw)}`);
  }
  return seq;
}

/**
 * Convert WebSocket message data to Uint8Array.
 * A Buffer[] (fragmented message) is concatenated into one array.
 */
function toUint8Array(data: Buffer | ArrayBuffer | Buffer[]): Uint8Array {
  if (data instanceof Uint8Array) {
    return data;
  }
  if (data instanceof ArrayBuffer) {
    return new Uint8Array(data);
  }
  if (Array.isArray(data)) {
    // Buffer[] — concatenate
    const totalLength = data.reduce((acc, buf) => acc + buf.length, 0);
    const result = new Uint8Array(totalLength);
    let offset = 0;
    for (const buf of data) {
      result.set(buf, offset);
      offset += buf.length;
    }
    return result;
  }
  return new Uint8Array(data as ArrayBuffer);
}

/**
 * Decode a single CBOR value from the start of `bytes` and return the value
 * plus how many bytes it occupied.
 *
 * This is needed because firehose frames contain two concatenated CBOR
 * values and we need to split them. The length of the first item is found
 * by walking its CBOR structure with {@link cborItemLength}; only that
 * slice is then handed to the decoder.
 *
 * @throws Error if the first item is truncated or uses an unsupported encoding.
 */
function decodeCborWithLength(bytes: Uint8Array): { value: unknown; bytesConsumed: number } {
  const consumed = cborItemLength(bytes, 0);
  const value = cborDecode(bytes.subarray(0, consumed));
  return { value, bytesConsumed: consumed };
}

/**
 * Calculate the byte length of the CBOR item starting at `offset` in `bytes`.
 *
 * Handles definite-length items of every major type: unsigned/negative
 * integers, byte and text strings, arrays, maps, tags, and simple/float
 * values, with 1-, 2-, 4- and 8-byte arguments.
 *
 * @param bytes Buffer containing the encoded item.
 * @param offset Index of the item's initial byte.
 * @returns Total encoded length of the item, including nested items.
 * @throws Error if the item runs past the end of the buffer or uses a
 *   reserved/indefinite-length additional-info value (28-31).
 */
function cborItemLength(bytes: Uint8Array, offset: number): number {
  if (offset >= bytes.length) {
    throw new Error("Truncated CBOR item: unexpected end of input");
  }

  const initial = bytes[offset]!;
  const majorType = initial >> 5;
  const additionalInfo = initial & 0x1f;

  // Get the argument value and how many bytes the item head takes
  let argValue: number;
  let headerSize: number;

  if (additionalInfo < 24) {
    argValue = additionalInfo;
    headerSize = 1;
  } else if (additionalInfo <= 27) {
    // 24/25/26/27 => argument stored in the following 1/2/4/8 bytes
    const argBytes = 1 << (additionalInfo - 24);
    headerSize = 1 + argBytes;
    if (offset + headerSize > bytes.length) {
      throw new Error("Truncated CBOR item: incomplete argument");
    }
    // Accumulate with multiplication rather than `<<` so that values at or
    // above 2^31 do not wrap to negative numbers.
    argValue = 0;
    for (let i = 1; i <= argBytes; i++) {
      argValue = argValue * 256 + bytes[offset + i]!;
    }
  } else {
    // 28-30 are reserved, 31 is indefinite-length/break
    throw new Error(`Unsupported CBOR additional info: ${additionalInfo}`);
  }

  switch (majorType) {
    case 0: // unsigned integer
    case 1: // negative integer
      return headerSize;

    case 2: // byte string
    case 3: // text string
      if (offset + headerSize + argValue > bytes.length) {
        throw new Error("Truncated CBOR item: string exceeds input");
      }
      return headerSize + argValue;

    case 4: {
      // array: argValue nested items
      let pos = offset + headerSize;
      for (let i = 0; i < argValue; i++) {
        pos += cborItemLength(bytes, pos);
      }
      return pos - offset;
    }

    case 5: {
      // map: argValue key/value pairs
      let pos = offset + headerSize;
      for (let i = 0; i < argValue; i++) {
        pos += cborItemLength(bytes, pos); // key
        pos += cborItemLength(bytes, pos); // value
      }
      return pos - offset;
    }

    case 6: // tag: head followed by exactly one tagged item
      return headerSize + cborItemLength(bytes, offset + headerSize);

    case 7: // simple values and floats: the head is the whole item
      return headerSize;

    default:
      throw new Error(`Unknown CBOR major type: ${majorType}`);
  }
}