See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add FirehoseConsumer service

Subscribes to the Bluesky jetstream app.bsky.feed.like collection with
no DID filter; buffers like creates into like_events_lookup + a +1
delta into like_counts_daily, and resolves unlikes against the lookup
table to emit matching -1 deltas (silent drop on miss). Flush cadence,
cursor semantics, and backoff mirror JetstreamConsumer so operational
behaviour is consistent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+760
+418
app/services/firehose_consumer.ts
··· 1 + import { parseJetstreamEvent } from '#lib/atproto/index' 2 + import type { LikeCountsDailyDeltaRow, LikeEventLookupRow } from '#lib/clickhouse/index' 3 + 4 + // --------------------------------------------------------------------------- 5 + // WebSocket-like interface for dependency injection in tests 6 + // --------------------------------------------------------------------------- 7 + 8 + export interface WebSocketLike { 9 + onmessage?: ((ev: any) => void) | null 10 + onclose?: ((ev: any) => void) | null 11 + onerror?: ((ev: any) => void) | null 12 + onopen?: ((ev?: any) => void) | null 13 + close(): void 14 + } 15 + 16 + // --------------------------------------------------------------------------- 17 + // Deps 18 + // --------------------------------------------------------------------------- 19 + 20 + export interface FirehoseConsumerDeps { 21 + /** Factory that creates a WebSocket connected to the given URL. */ 22 + createWebSocket: (url: string) => WebSocketLike 23 + 24 + /** Reads the last durable cursor from storage. */ 25 + readCursor: () => Promise<bigint | null> 26 + 27 + /** Durably writes the cursor to storage. */ 28 + writeCursor: (cursor: bigint) => Promise<void> 29 + 30 + /** 31 + * Looks up the subject URI for a like URI. Returns null when the like is 32 + * not in the lookup table (e.g. the original like aged out). 33 + */ 34 + lookupSubjectUri: (likeUri: string) => Promise<string | null> 35 + 36 + /** Batched insert for like_events_lookup. */ 37 + insertLookupRows: (rows: LikeEventLookupRow[]) => Promise<void> 38 + 39 + /** Batched insert for like_counts_daily deltas. */ 40 + insertCountsDeltas: (rows: LikeCountsDailyDeltaRow[]) => Promise<void> 41 + 42 + /** Injected clock — used to compute the UTC day column for count deltas. */ 43 + now: () => Date 44 + 45 + /** Override reconnect delay for tests. */ 46 + reconnectDelay?: (ms: number) => Promise<void> 47 + } 48 + 49 + // --------------------------------------------------------------------------- 50 + // Firehose URL 51 + // --------------------------------------------------------------------------- 52 + 53 + /** 54 + * Default jetstream URL — matches the existing jetstream consumer default so 55 + * that both workers point at the same endpoint by default. The firehose 56 + * worker does not filter by DID, which turns the subscription into a whole- 57 + * network like/unlike stream. 58 + */ 59 + const DEFAULT_JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe' 60 + 61 + /** 62 + * Resolves the jetstream URL to subscribe to. 63 + * Preference order: FIREHOSE_JETSTREAM_URL → JETSTREAM_URL → default. 64 + */ 65 + export function resolveFirehoseJetstreamUrl( 66 + env: NodeJS.ProcessEnv = process.env 67 + ): string { 68 + return env.FIREHOSE_JETSTREAM_URL ?? env.JETSTREAM_URL ?? DEFAULT_JETSTREAM_URL 69 + } 70 + 71 + // --------------------------------------------------------------------------- 72 + // Helpers 73 + // --------------------------------------------------------------------------- 74 + 75 + /** 76 + * Returns midnight of the given instant in UTC as a Date. Matches the 77 + * semantics of ClickHouse `today()` so the per-day partition column is 78 + * consistent regardless of the server's local TZ. 79 + */ 80 + function utcDay(now: Date): Date { 81 + return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())) 82 + } 83 + 84 + // --------------------------------------------------------------------------- 85 + // FirehoseConsumer 86 + // --------------------------------------------------------------------------- 87 + 88 + /** 89 + * Subscribes to the Bluesky jetstream `app.bsky.feed.like` collection without 90 + * a DID filter, and writes per-post like/unlike deltas into ClickHouse. 91 + * 92 + * Structured to mirror JetstreamConsumer: 93 + * - Two buffers (lookup rows, count deltas) drained on a 500ms/1000-row 94 + * cadence. The cursor is only advanced when both buffers flush cleanly. 95 + * - Per-event processing is sequential so unlike-lookup ordering is 96 + * preserved; sub-ms ClickHouse PK lookups keep this cheap at ~5-20/sec. 97 + * - Exponential backoff on WebSocket reconnect, capped at 30s. 98 + */ 99 + export class FirehoseConsumer { 100 + private lookupBuffer: LikeEventLookupRow[] = [] 101 + private countsBuffer: LikeCountsDailyDeltaRow[] = [] 102 + 103 + private shutdownRequested = false 104 + private ws: WebSocketLike | null = null 105 + private bufferFlushTimer: ReturnType<typeof setInterval> | null = null 106 + 107 + private pendingCursor: bigint | null = null 108 + private lastFlushedCursor: bigint | null = null 109 + private reconnectBackoffMs = 1000 110 + private shutdownResolve: (() => void) | null = null 111 + 112 + /** 113 + * Queue of unlike events currently awaiting a ClickHouse subject lookup. 114 + * Sequential processing: the head of the queue is resolved before we 115 + * dequeue the next message. The tail is advanced as each lookup resolves. 116 + */ 117 + private processing: Promise<void> = Promise.resolve() 118 + 119 + constructor(private readonly deps: FirehoseConsumerDeps) {} 120 + 121 + // ------------------------------------------------------------------------- 122 + // Public API 123 + // ------------------------------------------------------------------------- 124 + 125 + async start(): Promise<void> { 126 + this.lastFlushedCursor = await this.deps.readCursor() 127 + 128 + await this.connect() 129 + 130 + this.bufferFlushTimer = setInterval(() => { 131 + void this.flushIfNeeded() 132 + }, 500) 133 + this.bufferFlushTimer.unref() 134 + 135 + await this.waitForShutdown() 136 + } 137 + 138 + async shutdown(): Promise<void> { 139 + this.shutdownRequested = true 140 + 141 + if (this.bufferFlushTimer !== null) { 142 + clearInterval(this.bufferFlushTimer) 143 + this.bufferFlushTimer = null 144 + } 145 + 146 + // Let any in-flight event processing drain before the final flush 147 + await this.processing 148 + 149 + await this.flushBuffers() 150 + 151 + if (this.ws !== null) { 152 + this.ws.close() 153 + this.ws = null 154 + } 155 + 156 + if (this.shutdownResolve !== null) { 157 + this.shutdownResolve() 158 + this.shutdownResolve = null 159 + } 160 + } 161 + 162 + // ------------------------------------------------------------------------- 163 + // Connection 164 + // ------------------------------------------------------------------------- 165 + 166 + private buildUrl(baseUrl: string): string { 167 + const url = new URL(baseUrl) 168 + 169 + // Only care about like create/delete events 170 + url.searchParams.set('wantedCollections', 'app.bsky.feed.like') 171 + 172 + // compress=false — same rationale as JetstreamConsumer: avoids zstd 173 + // dictionary bootstrap for simpler wire handling. 174 + url.searchParams.set('compress', 'false') 175 + 176 + if (this.lastFlushedCursor !== null) { 177 + url.searchParams.set('cursor', this.lastFlushedCursor.toString()) 178 + } 179 + 180 + return url.toString() 181 + } 182 + 183 + private async connect(): Promise<void> { 184 + const url = this.buildUrl(resolveFirehoseJetstreamUrl()) 185 + 186 + const ws = this.deps.createWebSocket(url) 187 + this.ws = ws 188 + 189 + ws.onmessage = (ev) => { 190 + this.handleMessage(ev.data) 191 + } 192 + 193 + ws.onclose = (ev) => { 194 + if (this.shutdownRequested) return 195 + 196 + console.error( 197 + `[FirehoseConsumer] WebSocket closed (code=${ev.code}, reason=${ev.reason}). Reconnecting in ${this.reconnectBackoffMs}ms...` 198 + ) 199 + 200 + const delay = this.reconnectBackoffMs 201 + this.reconnectBackoffMs = Math.min(this.reconnectBackoffMs * 2, 30_000) 202 + 203 + const delayFn = 204 + this.deps.reconnectDelay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms))) 205 + void delayFn(delay).then(() => { 206 + if (!this.shutdownRequested) { 207 + void this.connect() 208 + } 209 + }) 210 + } 211 + 212 + ws.onerror = (err) => { 213 + console.error('[FirehoseConsumer] WebSocket error:', err) 214 + } 215 + 216 + ws.onopen = () => { 217 + this.reconnectBackoffMs = 1000 218 + } 219 + } 220 + 221 + // ------------------------------------------------------------------------- 222 + // Message handling 223 + // ------------------------------------------------------------------------- 224 + 225 + private handleMessage(data: string | Buffer | ArrayBuffer): void { 226 + let text: string 227 + if (typeof data === 'string') { 228 + text = data 229 + } else if (Buffer.isBuffer(data)) { 230 + text = data.toString('utf8') 231 + } else { 232 + text = Buffer.from(data).toString('utf8') 233 + } 234 + 235 + let rawJson: unknown 236 + try { 237 + rawJson = JSON.parse(text) 238 + } catch { 239 + return 240 + } 241 + 242 + const event = parseJetstreamEvent(rawJson) 243 + if (event === null) return 244 + 245 + let timeUs: bigint | null = null 246 + if (typeof rawJson === 'object' && rawJson !== null && 'time_us' in rawJson) { 247 + const raw = (rawJson as Record<string, unknown>)['time_us'] 248 + if (typeof raw === 'number') { 249 + timeUs = BigInt(Math.floor(raw)) 250 + } 251 + } 252 + 253 + if (event.kind === 'like') { 254 + // Compute the UTC day on the Node side (spec requirement: consistent 255 + // regardless of server TZ). 256 + const day = utcDay(this.deps.now()) 257 + this.lookupBuffer.push({ 258 + likeUri: `at://${event.actorDid}/app.bsky.feed.like/${event.rkey}`, 259 + subjectUri: event.postUri, 260 + createdAt: event.createdAt, 261 + }) 262 + this.countsBuffer.push({ 263 + subjectUri: event.postUri, 264 + day, 265 + count: 1, 266 + }) 267 + this.advancePendingCursor(timeUs) 268 + 269 + if (this.lookupBuffer.length >= 1000 || this.countsBuffer.length >= 1000) { 270 + void this.flushBuffers() 271 + } 272 + return 273 + } 274 + 275 + if (event.kind === 'like-delete') { 276 + // Unlike — resolve the subject via ClickHouse. We chain on 277 + // `processing` so that sequential events are resolved in order; 278 + // this keeps subject lookups correct if a create+delete pair arrive 279 + // back-to-back for the same like_uri. 280 + const likeUri = event.likeUri 281 + const capturedTimeUs = timeUs 282 + this.processing = this.processing.then(() => this.handleUnlike(likeUri, capturedTimeUs)) 283 + return 284 + } 285 + 286 + // Ignore all other events (account/identity/post/etc.) 287 + } 288 + 289 + private async handleUnlike(likeUri: string, timeUs: bigint | null): Promise<void> { 290 + try { 291 + const subjectUri = await this.deps.lookupSubjectUri(likeUri) 292 + if (subjectUri === null) { 293 + // Original like is not in our window — drop the unlike and still 294 + // advance the cursor so we don't replay this event forever. 295 + this.advancePendingCursor(timeUs) 296 + return 297 + } 298 + 299 + const day = utcDay(this.deps.now()) 300 + this.countsBuffer.push({ 301 + subjectUri, 302 + day, 303 + count: -1, 304 + }) 305 + this.advancePendingCursor(timeUs) 306 + 307 + if (this.countsBuffer.length >= 1000) { 308 + await this.flushBuffers() 309 + } 310 + } catch (err) { 311 + // Lookup failures are non-fatal — log and skip. Not advancing the 312 + // cursor here means a reconnect could replay; that's the design noise 313 + // the spec acknowledges. 314 + console.error('[FirehoseConsumer] lookupSubjectUri failed for', likeUri, err) 315 + } 316 + } 317 + 318 + private advancePendingCursor(timeUs: bigint | null): void { 319 + if (timeUs !== null) { 320 + if (this.pendingCursor === null || timeUs > this.pendingCursor) { 321 + this.pendingCursor = timeUs 322 + } 323 + } 324 + } 325 + 326 + // ------------------------------------------------------------------------- 327 + // Buffer flush 328 + // ------------------------------------------------------------------------- 329 + 330 + private async flushIfNeeded(): Promise<void> { 331 + if (this.lookupBuffer.length > 0 || this.countsBuffer.length > 0) { 332 + await this.flushBuffers() 333 + } 334 + } 335 + 336 + /** 337 + * Drains both buffers and checkpoints the cursor only if both inserts 338 + * succeed. If either insert fails, the affected rows are re-queued at 339 + * the front of their buffer and the cursor is NOT advanced. 340 + */ 341 + async flushBuffers(): Promise<void> { 342 + // Drain any pending unlike-lookup work so the -1 deltas it produces 343 + // are part of this flush cycle. 344 + await this.processing 345 + 346 + if (this.lookupBuffer.length === 0 && this.countsBuffer.length === 0) return 347 + 348 + const lookupToFlush = this.lookupBuffer.splice(0) 349 + const countsToFlush = this.countsBuffer.splice(0) 350 + const cursorAtFlushTime = this.pendingCursor 351 + this.pendingCursor = null 352 + 353 + let lookupOk = true 354 + let countsOk = true 355 + 356 + if (lookupToFlush.length > 0) { 357 + try { 358 + await this.deps.insertLookupRows(lookupToFlush) 359 + } catch (err) { 360 + console.error( 361 + `[FirehoseConsumer] lookup flush failed (${lookupToFlush.length} rows); will retry:`, 362 + err 363 + ) 364 + this.lookupBuffer.unshift(...lookupToFlush) 365 + lookupOk = false 366 + } 367 + } 368 + 369 + if (countsToFlush.length > 0) { 370 + try { 371 + await this.deps.insertCountsDeltas(countsToFlush) 372 + } catch (err) { 373 + console.error( 374 + `[FirehoseConsumer] counts flush failed (${countsToFlush.length} rows); will retry:`, 375 + err 376 + ) 377 + this.countsBuffer.unshift(...countsToFlush) 378 + countsOk = false 379 + } 380 + } 381 + 382 + if (lookupOk && countsOk) { 383 + if (cursorAtFlushTime !== null) { 384 + this.lastFlushedCursor = cursorAtFlushTime 385 + await this.checkpointCursor(cursorAtFlushTime) 386 + } 387 + } else { 388 + if (cursorAtFlushTime !== null) { 389 + if (this.pendingCursor === null || cursorAtFlushTime > this.pendingCursor) { 390 + this.pendingCursor = cursorAtFlushTime 391 + } 392 + } 393 + } 394 + } 395 + 396 + // ------------------------------------------------------------------------- 397 + // Cursor checkpointing 398 + // ------------------------------------------------------------------------- 399 + 400 + private async checkpointCursor(cursor: bigint): Promise<void> { 401 + try { 402 + await this.deps.writeCursor(cursor) 403 + } catch (err) { 404 + console.error('[FirehoseConsumer] cursor checkpoint failed:', err) 405 + } 406 + } 407 + 408 + // ------------------------------------------------------------------------- 409 + // Shutdown coordination 410 + // ------------------------------------------------------------------------- 411 + 412 + private waitForShutdown(): Promise<void> { 413 + if (this.shutdownRequested) return Promise.resolve() 414 + return new Promise((resolve) => { 415 + this.shutdownResolve = resolve 416 + }) 417 + } 418 + }
+342
tests/unit/services/firehose_consumer.spec.ts
··· 1 + /** 2 + * Unit tests for FirehoseConsumer. 3 + * 4 + * All dependencies (WebSocket, cursor IO, ClickHouse) are injected as fakes 5 + * so these tests run without external services. 6 + */ 7 + import { test } from '@japa/runner' 8 + import { 9 + FirehoseConsumer, 10 + type WebSocketLike, 11 + type FirehoseConsumerDeps, 12 + } from '#services/firehose_consumer' 13 + import type { LikeCountsDailyDeltaRow, LikeEventLookupRow } from '#lib/clickhouse/index' 14 + 15 + // --------------------------------------------------------------------------- 16 + // FakeWebSocket 17 + // --------------------------------------------------------------------------- 18 + 19 + class FakeWebSocket implements WebSocketLike { 20 + onmessage?: ((ev: any) => void) | null 21 + onclose?: ((ev: any) => void) | null 22 + onerror?: ((ev: any) => void) | null 23 + onopen?: (() => void) | null 24 + 25 + closeCallCount = 0 26 + 27 + close() { 28 + this.closeCallCount++ 29 + } 30 + 31 + emit(event: unknown) { 32 + this.onmessage?.({ data: JSON.stringify(event) }) 33 + } 34 + 35 + emitClose(code = 1006) { 36 + this.onclose?.({ code, reason: 'test close' }) 37 + } 38 + 39 + emitOpen() { 40 + this.onopen?.() 41 + } 42 + } 43 + 44 + // --------------------------------------------------------------------------- 45 + // Event fixtures 46 + // --------------------------------------------------------------------------- 47 + 48 + const AUTHOR_DID = 'did:plc:author001' 49 + const ACTOR_DID = 'did:plc:actor001' 50 + 51 + function makeLikeCreate( 52 + actorDid = ACTOR_DID, 53 + authorDid = AUTHOR_DID, 54 + rkey = 'likerkeyaaa', 55 + timeUs = 1725911162329308 56 + ) { 57 + return { 58 + did: actorDid, 59 + time_us: timeUs, 60 + kind: 'commit', 61 + commit: { 62 + operation: 'create', 63 + collection: 'app.bsky.feed.like', 64 + rkey, 65 + record: { 66 + $type: 'app.bsky.feed.like', 67 + createdAt: '2025-01-15T12:00:00Z', 68 + subject: { 69 + cid: 'bafycid', 70 + uri: `at://${authorDid}/app.bsky.feed.post/postrkey001`, 71 + }, 72 + }, 73 + }, 74 + } 75 + } 76 + 77 + function makeLikeDelete(actorDid = ACTOR_DID, rkey = 'likerkeyaaa', timeUs = 1725911162329309) { 78 + return { 79 + did: actorDid, 80 + time_us: timeUs, 81 + kind: 'commit', 82 + commit: { 83 + operation: 'delete', 84 + collection: 'app.bsky.feed.like', 85 + rkey, 86 + }, 87 + } 88 + } 89 + 90 + // --------------------------------------------------------------------------- 91 + // Consumer factory 92 + // --------------------------------------------------------------------------- 93 + 94 + interface Setup { 95 + fakeWs: FakeWebSocket 96 + consumer: FirehoseConsumer 97 + lookupInserts: LikeEventLookupRow[][] 98 + countsInserts: LikeCountsDailyDeltaRow[][] 99 + writeCursorCalls: bigint[] 100 + lookupSubjectCalls: string[] 101 + lookupSubjectMap: Map<string, string | null> 102 + connectedUrls: string[] 103 + } 104 + 105 + function makeConsumer( 106 + options: { 107 + initialCursor?: bigint | null 108 + subjectLookup?: Map<string, string | null> 109 + insertLookupShouldThrow?: boolean 110 + insertCountsShouldThrow?: boolean 111 + instantReconnect?: boolean 112 + now?: Date 113 + } = {} 114 + ): Setup { 115 + const { 116 + initialCursor = null, 117 + subjectLookup = new Map<string, string | null>(), 118 + insertLookupShouldThrow = false, 119 + insertCountsShouldThrow = false, 120 + instantReconnect = false, 121 + now = new Date('2026-04-14T12:00:00Z'), 122 + } = options 123 + 124 + const fakeWs = new FakeWebSocket() 125 + const lookupInserts: LikeEventLookupRow[][] = [] 126 + const countsInserts: LikeCountsDailyDeltaRow[][] = [] 127 + const writeCursorCalls: bigint[] = [] 128 + const lookupSubjectCalls: string[] = [] 129 + const connectedUrls: string[] = [] 130 + 131 + const deps: FirehoseConsumerDeps = { 132 + createWebSocket(url: string) { 133 + connectedUrls.push(url) 134 + return fakeWs 135 + }, 136 + readCursor: async () => initialCursor, 137 + writeCursor: async (cursor: bigint) => { 138 + writeCursorCalls.push(cursor) 139 + }, 140 + lookupSubjectUri: async (likeUri: string) => { 141 + lookupSubjectCalls.push(likeUri) 142 + return subjectLookup.has(likeUri) ? subjectLookup.get(likeUri)! : null 143 + }, 144 + insertLookupRows: async (rows) => { 145 + if (insertLookupShouldThrow) throw new Error('ClickHouse unavailable (lookup)') 146 + lookupInserts.push([...rows]) 147 + }, 148 + insertCountsDeltas: async (rows) => { 149 + if (insertCountsShouldThrow) throw new Error('ClickHouse unavailable (counts)') 150 + countsInserts.push([...rows]) 151 + }, 152 + now: () => now, 153 + ...(instantReconnect ? { reconnectDelay: (_ms: number) => Promise.resolve() } : {}), 154 + } 155 + 156 + const consumer = new FirehoseConsumer(deps) 157 + 158 + return { 159 + fakeWs, 160 + consumer, 161 + lookupInserts, 162 + countsInserts, 163 + writeCursorCalls, 164 + lookupSubjectCalls, 165 + lookupSubjectMap: subjectLookup, 166 + connectedUrls, 167 + } 168 + } 169 + 170 + async function startInBackground(consumer: FirehoseConsumer): Promise<void> { 171 + void consumer.start() 172 + await new Promise((r) => setTimeout(r, 0)) 173 + } 174 + 175 + // --------------------------------------------------------------------------- 176 + // Tests 177 + // --------------------------------------------------------------------------- 178 + 179 + test.group('FirehoseConsumer — like create', () => { 180 + test('creates a lookup row and a +1 delta', async ({ assert }) => { 181 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer() 182 + await startInBackground(consumer) 183 + 184 + fakeWs.emit(makeLikeCreate()) 185 + await consumer.flushBuffers() 186 + 187 + assert.equal(lookupInserts.length, 1) 188 + assert.equal(lookupInserts[0].length, 1) 189 + assert.equal( 190 + lookupInserts[0][0].likeUri, 191 + `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa` 192 + ) 193 + assert.equal( 194 + lookupInserts[0][0].subjectUri, 195 + `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 196 + ) 197 + 198 + assert.equal(countsInserts.length, 1) 199 + assert.equal(countsInserts[0].length, 1) 200 + assert.equal( 201 + countsInserts[0][0].subjectUri, 202 + `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 203 + ) 204 + assert.equal(countsInserts[0][0].count, 1) 205 + 206 + await consumer.shutdown() 207 + }) 208 + }) 209 + 210 + test.group('FirehoseConsumer — like delete', () => { 211 + test('with matching lookup produces a -1 delta', async ({ assert }) => { 212 + const likeUri = `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa` 213 + const subjectUri = `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 214 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 215 + subjectLookup: new Map([[likeUri, subjectUri]]), 216 + }) 217 + await startInBackground(consumer) 218 + 219 + fakeWs.emit(makeLikeDelete()) 220 + await consumer.flushBuffers() 221 + 222 + assert.equal(lookupInserts.length, 0, 'no new lookup row on delete') 223 + assert.equal(countsInserts.length, 1) 224 + assert.equal(countsInserts[0].length, 1) 225 + assert.equal(countsInserts[0][0].subjectUri, subjectUri) 226 + assert.equal(countsInserts[0][0].count, -1) 227 + 228 + await consumer.shutdown() 229 + }) 230 + 231 + test('with no matching lookup is dropped silently', async ({ assert }) => { 232 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 233 + subjectLookup: new Map(), 234 + }) 235 + await startInBackground(consumer) 236 + 237 + fakeWs.emit(makeLikeDelete()) 238 + await consumer.flushBuffers() 239 + 240 + assert.equal(lookupInserts.length, 0) 241 + assert.equal(countsInserts.length, 0) 242 + 243 + await consumer.shutdown() 244 + }) 245 + }) 246 + 247 + test.group('FirehoseConsumer — buffering', () => { 248 + test('flushes when buffer reaches 1000 rows', async ({ assert }) => { 249 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer() 250 + await startInBackground(consumer) 251 + 252 + for (let i = 0; i < 1000; i++) { 253 + fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, `rkey${i}`, 1725911162329308 + i)) 254 + } 255 + 256 + // Let microtasks settle for the automatic flush 257 + await new Promise((r) => setTimeout(r, 10)) 258 + 259 + assert.isAbove(lookupInserts.length, 0, 'lookup auto-flushed at size threshold') 260 + assert.isAbove(countsInserts.length, 0, 'counts auto-flushed at size threshold') 261 + 262 + await consumer.shutdown() 263 + }) 264 + }) 265 + 266 + test.group('FirehoseConsumer — cursor', () => { 267 + test('advances cursor only after successful flush', async ({ assert }) => { 268 + const { fakeWs, consumer, writeCursorCalls } = makeConsumer() 269 + await startInBackground(consumer) 270 + 271 + fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 272 + await consumer.flushBuffers() 273 + 274 + assert.equal(writeCursorCalls.length, 1) 275 + assert.equal(writeCursorCalls[0], 1725911162329308n) 276 + 277 + await consumer.shutdown() 278 + }) 279 + 280 + test('cursor is NOT advanced if the lookup flush fails', async ({ assert }) => { 281 + const { fakeWs, consumer, writeCursorCalls } = makeConsumer({ 282 + insertLookupShouldThrow: true, 283 + }) 284 + await startInBackground(consumer) 285 + 286 + fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 287 + await consumer.flushBuffers() 288 + 289 + assert.equal(writeCursorCalls.length, 0) 290 + 291 + await consumer.shutdown() 292 + }) 293 + 294 + test('failed flush re-queues rows at the front of the buffer', async ({ assert }) => { 295 + const { fakeWs, consumer, lookupInserts } = makeConsumer({ 296 + insertLookupShouldThrow: true, 297 + }) 298 + await startInBackground(consumer) 299 + 300 + fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 301 + await consumer.flushBuffers() 302 + 303 + assert.equal(lookupInserts.length, 0, 'first attempt failed so no successful insert') 304 + 305 + // Now flip the flag so the retry succeeds 306 + ;(consumer as any).deps.insertLookupRows = async (rows: LikeEventLookupRow[]) => { 307 + lookupInserts.push([...rows]) 308 + } 309 + 310 + await consumer.flushBuffers() 311 + assert.equal(lookupInserts.length, 1, 'retry picked up the re-queued rows') 312 + assert.equal(lookupInserts[0].length, 1) 313 + 314 + await consumer.shutdown() 315 + }) 316 + }) 317 + 318 + test.group('FirehoseConsumer — URL construction', () => { 319 + test('subscribes only to app.bsky.feed.like and omits wantedDids', async ({ assert }) => { 320 + const { consumer, connectedUrls } = makeConsumer() 321 + await startInBackground(consumer) 322 + 323 + assert.equal(connectedUrls.length, 1) 324 + const url = new URL(connectedUrls[0]) 325 + const collections = url.searchParams.getAll('wantedCollections') 326 + assert.deepEqual(collections, ['app.bsky.feed.like']) 327 + assert.isNull(url.searchParams.get('wantedDids')) 328 + 329 + await consumer.shutdown() 330 + }) 331 + 332 + test('includes cursor query param when readCursor returns a value', async ({ assert }) => { 333 + const cursor = 1725911162329308n 334 + const { consumer, connectedUrls } = makeConsumer({ initialCursor: cursor }) 335 + await startInBackground(consumer) 336 + 337 + const url = new URL(connectedUrls[0]) 338 + assert.equal(url.searchParams.get('cursor'), cursor.toString()) 339 + 340 + await consumer.shutdown() 341 + }) 342 + })