import { WebSocketServer, WebSocket } from "ws";
import type { IncomingMessage } from "node:http";
import { encode as cborEncode } from "./cbor-compat.js";
import type { RepoManager } from "./repo-manager.js";
import type {
  SeqEvent,
  SeqCommitEvent,
  SeqIdentityEvent,
} from "./sequencer.js";

/**
 * Firehose manages WebSocket connections for com.atproto.sync.subscribeRepos.
 * Replaces Cloudflare's hibernatable WebSocket API with the `ws` library.
 *
 * Every accepted socket is tracked in an in-memory set; events handed to
 * `repoManager.onFirehoseEvent` are encoded once and sent to every tracked
 * socket that is currently open.
 */
export class Firehose {
  /** Second argument passed to `sequencer.getEventsSince` during backfill. */
  private static readonly BACKFILL_LIMIT = 1000;

  /** Sockets that currently receive broadcast events. */
  private readonly clients = new Set<WebSocket>();

  /**
   * @param repoManager Optional source of events. When provided, its
   *   `onFirehoseEvent` hook is overwritten so that every event it emits is
   *   broadcast to the connected clients. Without it, the firehose accepts
   *   connections but never backfills.
   */
  constructor(private repoManager?: RepoManager) {
    // Register as the event handler on the repo manager (if present)
    if (this.repoManager) {
      this.repoManager.onFirehoseEvent = (event) => {
        this.broadcast(event);
      };
    }
  }

  /**
   * Get current subscriber count.
   */
  get subscriberCount(): number {
    return this.clients.size;
  }

  /**
   * Handle a new WebSocket connection.
   *
   * The socket is tracked immediately, cleanup handlers are attached, and if
   * the request URL carries a usable `cursor` query parameter the stored
   * events after that cursor are replayed before the promise resolves.
   *
   * A backfill failure is logged and the socket is closed with status 1011;
   * the returned promise does not reject for that reason, so it is safe to
   * call from a `connection` event callback without a `.catch()`.
   *
   * NOTE(review): the socket joins the broadcast set before backfill has
   * finished, so an event broadcast while the backfill query is pending can
   * reach the client ahead of older replayed events — confirm consumers
   * tolerate this.
   *
   * @param ws  The accepted WebSocket.
   * @param req The HTTP upgrade request; only `req.url` is read.
   */
  async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
    this.clients.add(ws);

    // Attach cleanup before doing anything that could fail, so the socket can
    // never remain in `clients` without a way to be removed.
    ws.on("close", () => {
      this.clients.delete(ws);
    });

    ws.on("error", (err) => {
      console.error("WebSocket error:", err);
      this.clients.delete(ws);
    });

    // Parse cursor from query string
    const cursor = Firehose.parseCursor(req.url);
    if (cursor === null) {
      return;
    }

    // Backfill if cursor provided
    try {
      await this.backfill(ws, cursor);
    } catch (err: unknown) {
      console.error("Error backfilling WebSocket:", err);
      this.clients.delete(ws);
      if (ws.readyState === WebSocket.OPEN) {
        // 1011: the server hit an unexpected condition serving this client.
        ws.close(1011, "BackfillFailed");
      }
    }
  }

  /**
   * Extract the `cursor` query parameter from a request URL.
   *
   * @param rawUrl The request URL (path and query), possibly undefined.
   * @returns The cursor parsed as a base-10 integer, or `null` when the
   *   parameter is missing, empty, not numeric, or the URL cannot be parsed.
   */
  private static parseCursor(rawUrl: string | undefined): number | null {
    let cursorParam: string | null;
    try {
      // The base is only there to make the relative request URL parseable.
      const url = new URL(rawUrl ?? "/", "http://localhost");
      cursorParam = url.searchParams.get("cursor");
    } catch {
      return null;
    }

    if (!cursorParam) {
      return null;
    }

    const cursor = parseInt(cursorParam, 10);
    // Never hand NaN to the sequencer; treat an unparseable cursor as absent.
    return Number.isNaN(cursor) ? null : cursor;
  }

  /**
   * Broadcast a firehose event to all connected clients.
   *
   * The event is encoded once. Sockets that are not open are skipped; a
   * socket whose `send` throws is logged and dropped from the client set.
   */
  private broadcast(event: SeqEvent): void {
    const frame = this.encodeEventFrame(event);
    for (const ws of this.clients) {
      if (ws.readyState !== WebSocket.OPEN) {
        continue;
      }
      try {
        ws.send(frame);
      } catch (e: unknown) {
        console.error("Error broadcasting to WebSocket:", e);
        this.clients.delete(ws);
      }
    }
  }

  /**
   * Backfill events from a cursor.
   *
   * Does nothing when there is no repo manager. When the cursor is greater
   * than the sequencer's latest sequence number, a `FutureCursor` error frame
   * is sent and the socket is closed with status 1008. Otherwise the events
   * returned by `getEventsSince` are sent in order, stopping early if the
   * socket stops being open.
   *
   * NOTE(review): only one `getEventsSince` call is made; if its second
   * argument is a row limit (presumably — confirm), a client further behind
   * than that will see a gap between the replay and live events.
   */
  private async backfill(ws: WebSocket, cursor: number): Promise<void> {
    if (!this.repoManager) {
      // No repo manager — no local events to backfill
      return;
    }

    const { sequencer } = this.repoManager;

    const latestSeq = sequencer.getLatestSeq();
    if (cursor > latestSeq) {
      const frame = this.encodeErrorFrame(
        "FutureCursor",
        "Cursor is in the future",
      );
      ws.send(frame);
      ws.close(1008, "FutureCursor");
      return;
    }

    const events = await sequencer.getEventsSince(
      cursor,
      Firehose.BACKFILL_LIMIT,
    );

    for (const event of events) {
      // The client may have disconnected while the query was pending.
      if (ws.readyState !== WebSocket.OPEN) break;
      ws.send(this.encodeEventFrame(event));
    }
  }

  // ============================================
  // Frame Encoding
  // ============================================

  /**
   * Build a wire frame: the CBOR encoding of `header` immediately followed by
   * the CBOR encoding of `body`, in a single byte array.
   */
  private encodeFrame(header: object, body: object): Uint8Array {
    const headerBytes = cborEncode(header);
    const bodyBytes = cborEncode(body);

    const frame = new Uint8Array(headerBytes.length + bodyBytes.length);
    frame.set(headerBytes, 0);
    frame.set(bodyBytes, headerBytes.length);
    return frame;
  }

  /** Encode a commit event as a frame with header `{ op: 1, t: "#commit" }`. */
  private encodeCommitFrame(event: SeqCommitEvent): Uint8Array {
    const header = { op: 1, t: "#commit" };
    return this.encodeFrame(header, event.event);
  }

  /** Encode an identity event as a frame with header `{ op: 1, t: "#identity" }`. */
  private encodeIdentityFrame(event: SeqIdentityEvent): Uint8Array {
    const header = { op: 1, t: "#identity" };
    return this.encodeFrame(header, event.event);
  }

  /**
   * Encode any sequenced event, dispatching on `event.type`: `"identity"`
   * events get an identity frame, everything else a commit frame.
   */
  private encodeEventFrame(event: SeqEvent): Uint8Array {
    if (event.type === "identity") {
      return this.encodeIdentityFrame(event);
    }
    return this.encodeCommitFrame(event);
  }

  /**
   * Encode an error frame: header `{ op: -1 }` with body `{ error, message }`.
   *
   * @param error   Short error name (e.g. `"FutureCursor"`).
   * @param message Human-readable description.
   */
  private encodeErrorFrame(error: string, message: string): Uint8Array {
    const header = { op: -1 };
    const body = { error, message };
    return this.encodeFrame(header, body);
  }
}