atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 148 lines 3.7 kB view raw
1import { WebSocketServer, WebSocket } from "ws"; 2import type { IncomingMessage } from "node:http"; 3import { encode as cborEncode } from "./cbor-compat.js"; 4import type { RepoManager } from "./repo-manager.js"; 5import type { 6 SeqEvent, 7 SeqCommitEvent, 8 SeqIdentityEvent, 9} from "./sequencer.js"; 10 11/** 12 * Firehose manages WebSocket connections for com.atproto.sync.subscribeRepos. 13 * Replaces Cloudflare's hibernatable WebSocket API with the `ws` library. 14 */ 15export class Firehose { 16 private clients = new Set<WebSocket>(); 17 18 constructor(private repoManager?: RepoManager) { 19 // Register as the event handler on the repo manager (if present) 20 if (this.repoManager) { 21 this.repoManager.onFirehoseEvent = (event) => { 22 this.broadcast(event); 23 }; 24 } 25 } 26 27 /** 28 * Get current subscriber count. 29 */ 30 get subscriberCount(): number { 31 return this.clients.size; 32 } 33 34 /** 35 * Handle a new WebSocket connection. 36 */ 37 async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> { 38 this.clients.add(ws); 39 40 // Parse cursor from query string 41 const url = new URL(req.url ?? "/", "http://localhost"); 42 const cursorParam = url.searchParams.get("cursor"); 43 const cursor = cursorParam ? parseInt(cursorParam, 10) : null; 44 45 ws.on("close", () => { 46 this.clients.delete(ws); 47 }); 48 49 ws.on("error", (err) => { 50 console.error("WebSocket error:", err); 51 this.clients.delete(ws); 52 }); 53 54 // Backfill if cursor provided 55 if (cursor !== null) { 56 await this.backfill(ws, cursor); 57 } 58 } 59 60 /** 61 * Broadcast a firehose event to all connected clients. 62 */ 63 private broadcast(event: SeqEvent): void { 64 const frame = this.encodeEventFrame(event); 65 66 for (const ws of this.clients) { 67 if (ws.readyState === WebSocket.OPEN) { 68 try { 69 ws.send(frame); 70 } catch (e) { 71 console.error("Error broadcasting to WebSocket:", e); 72 this.clients.delete(ws); 73 } 74 } 75 } 76 } 77 78 /** 79 * Backfill events from a cursor. 80 */ 81 private async backfill(ws: WebSocket, cursor: number): Promise<void> { 82 if (!this.repoManager) { 83 // No repo manager — no local events to backfill 84 return; 85 } 86 87 const latestSeq = this.repoManager.sequencer.getLatestSeq(); 88 89 if (cursor > latestSeq) { 90 const frame = this.encodeErrorFrame( 91 "FutureCursor", 92 "Cursor is in the future", 93 ); 94 ws.send(frame); 95 ws.close(1008, "FutureCursor"); 96 return; 97 } 98 99 const events = await this.repoManager.sequencer.getEventsSince( 100 cursor, 101 1000, 102 ); 103 104 for (const event of events) { 105 if (ws.readyState !== WebSocket.OPEN) break; 106 const frame = this.encodeEventFrame(event); 107 ws.send(frame); 108 } 109 } 110 111 // ============================================ 112 // Frame Encoding 113 // ============================================ 114 115 private encodeFrame(header: object, body: object): Uint8Array { 116 const headerBytes = cborEncode(header); 117 const bodyBytes = cborEncode(body); 118 119 const frame = new Uint8Array(headerBytes.length + bodyBytes.length); 120 frame.set(headerBytes, 0); 121 frame.set(bodyBytes, headerBytes.length); 122 123 return frame; 124 } 125 126 private encodeCommitFrame(event: SeqCommitEvent): Uint8Array { 127 const header = { op: 1, t: "#commit" }; 128 return this.encodeFrame(header, event.event); 129 } 130 131 private encodeIdentityFrame(event: SeqIdentityEvent): Uint8Array { 132 const header = { op: 1, t: "#identity" }; 133 return this.encodeFrame(header, event.event); 134 } 135 136 private encodeEventFrame(event: SeqEvent): Uint8Array { 137 if (event.type === "identity") { 138 return this.encodeIdentityFrame(event); 139 } 140 return this.encodeCommitFrame(event); 141 } 142 143 private encodeErrorFrame(error: string, message: string): Uint8Array { 144 const header = { op: -1 }; 145 const body = { error, message }; 146 return this.encodeFrame(header, body); 147 } 148}