atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 481 lines 14 kB view raw
1/** 2 * Subscribe to the AT Protocol firehose (com.atproto.sync.subscribeRepos) 3 * to receive real-time updates for serviced DIDs. 4 * 5 * The firehose streams CBOR-encoded frames over WebSocket. Each frame 6 * consists of two concatenated CBOR values: a header and a body. 7 * The header contains { op, t } where t identifies the event type. 8 * Commit events contain blocks (as CAR bytes) and ops (record changes). 9 */ 10 11import { WebSocket } from "ws"; 12import { decode as cborDecode } from "../cbor-compat.js"; 13 14/** Parsed commit operation from a firehose event. */ 15export interface FirehoseCommitOp { 16 action: "create" | "update" | "delete"; 17 path: string; 18 cid: unknown | null; 19} 20 21/** Parsed commit event from the firehose. */ 22export interface FirehoseCommitEvent { 23 seq: number; 24 repo: string; 25 rev: string; 26 since: string | null; 27 blocks: Uint8Array; 28 ops: FirehoseCommitOp[]; 29 commit: unknown; 30 time: string; 31 tooBig: boolean; 32 rebase: boolean; 33} 34 35/** Parsed account status event from the firehose. */ 36export interface FirehoseAccountEvent { 37 seq: number; 38 did: string; 39 time: string; 40 active: boolean; 41 status?: string; // "takendown", "suspended", "deleted", "deactivated" 42} 43 44/** Callback for commit events. */ 45export type CommitHandler = (event: FirehoseCommitEvent) => void | Promise<void>; 46 47/** Callback for account status events. */ 48export type AccountHandler = (event: FirehoseAccountEvent) => void | Promise<void>; 49 50/** Configuration for the firehose subscription. */ 51export interface FirehoseSubscriptionConfig { 52 /** WebSocket URL for the firehose relay. */ 53 firehoseUrl: string; 54 /** Initial reconnect delay in ms (doubles on each failure, capped at maxReconnectDelayMs). */ 55 reconnectDelayMs?: number; 56 /** Maximum reconnect delay in ms. */ 57 maxReconnectDelayMs?: number; 58} 59 60const DEFAULT_RECONNECT_DELAY_MS = 1000; 61const DEFAULT_MAX_RECONNECT_DELAY_MS = 60_000; 62 63/** 64 * FirehoseSubscription connects to an AT Protocol firehose relay and 65 * dispatches commit events for a configured set of DIDs. 66 * 67 * Features: 68 * - Filters events to only process commits for serviced DIDs (Set lookup) 69 * - Cursor-based resumption: saves last processed seq for restart 70 * - Auto-reconnect with exponential backoff 71 * - Graceful shutdown 72 */ 73export class FirehoseSubscription { 74 private ws: WebSocket | null = null; 75 private dids: Set<string> = new Set(); 76 private commitHandlers: CommitHandler[] = []; 77 private accountHandlers: AccountHandler[] = []; 78 private cursor: number | null = null; 79 private running = false; 80 private reconnectTimer: ReturnType<typeof setTimeout> | null = null; 81 private currentReconnectDelay: number; 82 private config: Required<FirehoseSubscriptionConfig>; 83 84 /** Count of events received (all DIDs, before filtering). */ 85 private _eventsReceived = 0; 86 /** Count of events processed (matching serviced DIDs). */ 87 private _eventsProcessed = 0; 88 89 constructor(config: FirehoseSubscriptionConfig) { 90 this.config = { 91 firehoseUrl: config.firehoseUrl, 92 reconnectDelayMs: config.reconnectDelayMs ?? DEFAULT_RECONNECT_DELAY_MS, 93 maxReconnectDelayMs: config.maxReconnectDelayMs ?? DEFAULT_MAX_RECONNECT_DELAY_MS, 94 }; 95 this.currentReconnectDelay = this.config.reconnectDelayMs; 96 } 97 98 /** 99 * Start subscribing to the firehose for the given set of DIDs. 100 * If a cursor is provided, events will be replayed from that point. 101 */ 102 start(dids: Set<string>, cursor?: number | null): void { 103 if (this.running) return; 104 this.running = true; 105 this.dids = dids; 106 if (cursor !== undefined && cursor !== null) { 107 this.cursor = cursor; 108 } 109 this.connect(); 110 } 111 112 /** 113 * Stop the firehose subscription and disconnect cleanly. 114 */ 115 stop(): void { 116 this.running = false; 117 if (this.reconnectTimer) { 118 clearTimeout(this.reconnectTimer); 119 this.reconnectTimer = null; 120 } 121 if (this.ws) { 122 // Close with normal closure code 123 try { 124 this.ws.close(1000, "Shutting down"); 125 } catch { 126 // Ignore errors during close 127 } 128 this.ws = null; 129 } 130 } 131 132 /** 133 * Register a handler for commit events matching serviced DIDs. 134 */ 135 onCommit(handler: CommitHandler): void { 136 this.commitHandlers.push(handler); 137 } 138 139 /** 140 * Register a handler for account status events matching serviced DIDs. 141 */ 142 onAccount(handler: AccountHandler): void { 143 this.accountHandlers.push(handler); 144 } 145 146 /** 147 * Update the set of tracked DIDs without reconnecting. 148 */ 149 updateDids(dids: Set<string>): void { 150 this.dids = dids; 151 } 152 153 /** 154 * Get the last successfully processed cursor (sequence number). 155 */ 156 getCursor(): number | null { 157 return this.cursor; 158 } 159 160 /** 161 * Get subscription statistics. 162 */ 163 getStats(): { 164 connected: boolean; 165 cursor: number | null; 166 eventsReceived: number; 167 eventsProcessed: number; 168 trackedDids: number; 169 } { 170 return { 171 connected: this.ws !== null && this.ws.readyState === WebSocket.OPEN, 172 cursor: this.cursor, 173 eventsReceived: this._eventsReceived, 174 eventsProcessed: this._eventsProcessed, 175 trackedDids: this.dids.size, 176 }; 177 } 178 179 // ============================================ 180 // Internal: Connection management 181 // ============================================ 182 183 private connect(): void { 184 if (!this.running) return; 185 186 const url = new URL(this.config.firehoseUrl); 187 if (this.cursor !== null) { 188 url.searchParams.set("cursor", String(this.cursor)); 189 } 190 191 const ws = new WebSocket(url.toString()); 192 this.ws = ws; 193 194 ws.binaryType = "nodebuffer"; 195 196 ws.on("open", () => { 197 // Reset reconnect delay on successful connection 198 this.currentReconnectDelay = this.config.reconnectDelayMs; 199 }); 200 201 ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => { 202 try { 203 const bytes = toUint8Array(data); 204 this.handleFrame(bytes); 205 } catch (err) { 206 // Log but don't crash on individual frame errors 207 console.error("[firehose-subscription] Frame parse error:", err); 208 } 209 }); 210 211 ws.on("close", (code: number, reason: Buffer) => { 212 this.ws = null; 213 if (this.running) { 214 const reasonStr = reason.toString("utf8"); 215 console.warn( 216 `[firehose-subscription] Disconnected (code=${code}, reason="${reasonStr}"). Reconnecting in ${this.currentReconnectDelay}ms...`, 217 ); 218 this.scheduleReconnect(); 219 } 220 }); 221 222 ws.on("error", (err: Error) => { 223 console.error("[firehose-subscription] WebSocket error:", err.message); 224 // The 'close' event will fire after this, triggering reconnection 225 }); 226 } 227 228 private scheduleReconnect(): void { 229 if (!this.running) return; 230 if (this.reconnectTimer) return; 231 232 this.reconnectTimer = setTimeout(() => { 233 this.reconnectTimer = null; 234 if (this.running) { 235 this.connect(); 236 } 237 }, this.currentReconnectDelay); 238 239 // Exponential backoff 240 this.currentReconnectDelay = Math.min( 241 this.currentReconnectDelay * 2, 242 this.config.maxReconnectDelayMs, 243 ); 244 } 245 246 // ============================================ 247 // Internal: Frame parsing 248 // ============================================ 249 250 /** 251 * Parse a firehose frame (two concatenated CBOR values: header + body). 252 * 253 * The AT Protocol firehose sends binary WebSocket frames containing: 254 * 1. A CBOR-encoded header: { op: number, t: string } 255 * - op=1 means a normal event, t identifies the type (#commit, #identity, etc.) 256 * - op=-1 means an error frame 257 * 2. A CBOR-encoded body (the event payload) 258 */ 259 private handleFrame(bytes: Uint8Array): void { 260 // Decode the header (first CBOR value) 261 const { value: header, bytesConsumed } = decodeCborWithLength(bytes); 262 const headerObj = header as Record<string, unknown>; 263 264 const op = headerObj.op as number; 265 const type = headerObj.t as string | undefined; 266 267 // Error frame 268 if (op === -1) { 269 const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>; 270 console.error( 271 `[firehose-subscription] Error from relay: ${body.error}: ${body.message}`, 272 ); 273 return; 274 } 275 276 // Only process normal events 277 if (op !== 1) return; 278 279 // Process commit and account events 280 if (type !== "#commit" && type !== "#account") return; 281 282 const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>; 283 this._eventsReceived++; 284 285 // Handle account status events 286 if (type === "#account") { 287 const did = body.did as string | undefined; 288 if (!did || !this.dids.has(did)) return; 289 290 const event: FirehoseAccountEvent = { 291 seq: body.seq as number, 292 did, 293 time: (body.time as string) ?? new Date().toISOString(), 294 active: (body.active as boolean) ?? true, 295 status: body.status as string | undefined, 296 }; 297 298 this.cursor = event.seq; 299 this._eventsProcessed++; 300 301 for (const handler of this.accountHandlers) { 302 try { 303 const result = handler(event); 304 if (result && typeof result === "object" && "catch" in result) { 305 (result as Promise<void>).catch((err) => { 306 console.error("[firehose-subscription] Account handler error:", err); 307 }); 308 } 309 } catch (err) { 310 console.error("[firehose-subscription] Account handler error:", err); 311 } 312 } 313 return; 314 } 315 316 // Filter: only process events for our serviced DIDs 317 const repo = body.repo as string | undefined; 318 if (!repo || !this.dids.has(repo)) return; 319 320 const event: FirehoseCommitEvent = { 321 seq: body.seq as number, 322 repo: repo, 323 rev: body.rev as string, 324 since: (body.since as string) ?? null, 325 blocks: body.blocks instanceof Uint8Array 326 ? body.blocks 327 : new Uint8Array(0), 328 ops: Array.isArray(body.ops) 329 ? (body.ops as Array<Record<string, unknown>>).map((op) => ({ 330 action: op.action as "create" | "update" | "delete", 331 path: op.path as string, 332 cid: op.cid ?? null, 333 })) 334 : [], 335 commit: body.commit ?? null, 336 time: (body.time as string) ?? new Date().toISOString(), 337 tooBig: (body.tooBig as boolean) ?? false, 338 rebase: (body.rebase as boolean) ?? false, 339 }; 340 341 // Update cursor to this event's sequence number 342 this.cursor = event.seq; 343 this._eventsProcessed++; 344 345 // Dispatch to handlers 346 for (const handler of this.commitHandlers) { 347 try { 348 const result = handler(event); 349 // If handler returns a promise, catch errors but don't await 350 if (result && typeof result === "object" && "catch" in result) { 351 (result as Promise<void>).catch((err) => { 352 console.error("[firehose-subscription] Commit handler error:", err); 353 }); 354 } 355 } catch (err) { 356 console.error("[firehose-subscription] Commit handler error:", err); 357 } 358 } 359 } 360} 361 362// ============================================ 363// Utility functions 364// ============================================ 365 366/** 367 * Convert WebSocket message data to Uint8Array. 368 */ 369function toUint8Array(data: Buffer | ArrayBuffer | Buffer[]): Uint8Array { 370 if (data instanceof Uint8Array) { 371 return data; 372 } 373 if (data instanceof ArrayBuffer) { 374 return new Uint8Array(data); 375 } 376 if (Array.isArray(data)) { 377 // Buffer[] — concatenate 378 const totalLength = data.reduce((acc, buf) => acc + buf.length, 0); 379 const result = new Uint8Array(totalLength); 380 let offset = 0; 381 for (const buf of data) { 382 result.set(buf, offset); 383 offset += buf.length; 384 } 385 return result; 386 } 387 return new Uint8Array(data as ArrayBuffer); 388} 389 390/** 391 * Decode a single CBOR value from bytes and return the value plus 392 * how many bytes were consumed. 393 * 394 * This is needed because firehose frames contain two concatenated CBOR 395 * values and we need to split them. We use a simple approach: decode the 396 * first value using the full buffer, then figure out where it ended 397 * by re-encoding it (since CBOR is deterministic for our use case). 398 * 399 * For robustness we use a manual CBOR header parser to determine the 400 * length of the first CBOR item (the header is always a small map). 401 */ 402function decodeCborWithLength(bytes: Uint8Array): { value: unknown; bytesConsumed: number } { 403 // The header is always a small CBOR map (2-3 entries). 404 // We parse the CBOR manually to find where the first value ends. 405 const consumed = cborItemLength(bytes, 0); 406 const headerBytes = bytes.subarray(0, consumed); 407 const value = cborDecode(headerBytes); 408 return { value, bytesConsumed: consumed }; 409} 410 411/** 412 * Calculate the byte length of a CBOR item starting at `offset` in `bytes`. 413 * Supports the subset of CBOR types used in firehose headers: 414 * unsigned int, negative int, byte string, text string, array, map. 415 */ 416function cborItemLength(bytes: Uint8Array, offset: number): number { 417 const initial = bytes[offset]!; 418 const majorType = initial >> 5; 419 const additionalInfo = initial & 0x1f; 420 421 // Get the argument value and how many bytes the argument header takes 422 let argValue: number; 423 let headerSize: number; 424 425 if (additionalInfo < 24) { 426 argValue = additionalInfo; 427 headerSize = 1; 428 } else if (additionalInfo === 24) { 429 argValue = bytes[offset + 1]!; 430 headerSize = 2; 431 } else if (additionalInfo === 25) { 432 argValue = (bytes[offset + 1]! << 8) | bytes[offset + 2]!; 433 headerSize = 3; 434 } else if (additionalInfo === 26) { 435 argValue = 436 (bytes[offset + 1]! << 24) | 437 (bytes[offset + 2]! << 16) | 438 (bytes[offset + 3]! << 8) | 439 bytes[offset + 4]!; 440 headerSize = 5; 441 } else { 442 // 27 = 8-byte (not expected for headers), or special values 443 throw new Error(`Unsupported CBOR additional info: ${additionalInfo}`); 444 } 445 446 switch (majorType) { 447 case 0: // unsigned integer 448 case 1: // negative integer 449 return headerSize; 450 case 2: // byte string 451 case 3: // text string 452 return headerSize + argValue; 453 case 4: { // array 454 let pos = offset + headerSize; 455 for (let i = 0; i < argValue; i++) { 456 pos += cborItemLength(bytes, pos); 457 } 458 return pos - offset; 459 } 460 case 5: { // map 461 let pos = offset + headerSize; 462 for (let i = 0; i < argValue; i++) { 463 pos += cborItemLength(bytes, pos); // key 464 pos += cborItemLength(bytes, pos); // value 465 } 466 return pos - offset; 467 } 468 case 6: { // tag 469 return headerSize + cborItemLength(bytes, offset + headerSize); 470 } 471 case 7: // simple/float 472 if (additionalInfo === 20 || additionalInfo === 21) return 1; // false/true 473 if (additionalInfo === 22) return 1; // null 474 if (additionalInfo === 25) return 3; // float16 475 if (additionalInfo === 26) return 5; // float32 476 if (additionalInfo === 27) return 9; // float64 477 return headerSize; 478 default: 479 throw new Error(`Unknown CBOR major type: ${majorType}`); 480 } 481}