See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge firehose consumer into jetstream consumer

Replace the two consumers with one WebSocket subscribed to the superset
of collections, one shared cursor, and one process. JetstreamConsumer now
runs both the tracked-DID engagement pipeline and the unfiltered virality
pipeline (like lookup rows plus per-day count deltas). Also drop the
firehose_cursor table and the firehose-worker container.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+473 -992
-424
app/services/firehose_consumer.ts
··· 1 - import { parseJetstreamEvent } from '#lib/atproto/index' 2 - import type { LikeCountsDailyDeltaRow, LikeEventLookupRow } from '#lib/clickhouse/index' 3 - 4 - // --------------------------------------------------------------------------- 5 - // WebSocket-like interface for dependency injection in tests 6 - // --------------------------------------------------------------------------- 7 - 8 - export interface WebSocketLike { 9 - onmessage?: ((ev: any) => void) | null 10 - onclose?: ((ev: any) => void) | null 11 - onerror?: ((ev: any) => void) | null 12 - onopen?: ((ev?: any) => void) | null 13 - close(): void 14 - } 15 - 16 - // --------------------------------------------------------------------------- 17 - // Deps 18 - // --------------------------------------------------------------------------- 19 - 20 - export interface FirehoseConsumerDeps { 21 - /** Factory that creates a WebSocket connected to the given URL. */ 22 - createWebSocket: (url: string) => WebSocketLike 23 - 24 - /** Reads the last durable cursor from storage. */ 25 - readCursor: () => Promise<bigint | null> 26 - 27 - /** Durably writes the cursor to storage. */ 28 - writeCursor: (cursor: bigint) => Promise<void> 29 - 30 - /** 31 - * Looks up the subject URI for a like URI. Returns null when the like is 32 - * not in the lookup table (e.g. the original like aged out). 33 - */ 34 - lookupSubjectUri: (likeUri: string) => Promise<string | null> 35 - 36 - /** Batched insert for like_events_lookup. */ 37 - insertLookupRows: (rows: LikeEventLookupRow[]) => Promise<void> 38 - 39 - /** Batched insert for like_counts_daily deltas. */ 40 - insertCountsDeltas: (rows: LikeCountsDailyDeltaRow[]) => Promise<void> 41 - 42 - /** Injected clock — used to compute the UTC day column for count deltas. */ 43 - now: () => Date 44 - 45 - /** Override reconnect delay for tests. 
*/ 46 - reconnectDelay?: (ms: number) => Promise<void> 47 - } 48 - 49 - // --------------------------------------------------------------------------- 50 - // Firehose URL 51 - // --------------------------------------------------------------------------- 52 - 53 - /** 54 - * Default jetstream URL — matches the existing jetstream consumer default so 55 - * that both workers point at the same endpoint by default. The firehose 56 - * worker does not filter by DID, which turns the subscription into a whole- 57 - * network like/unlike stream. 58 - */ 59 - const DEFAULT_JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe' 60 - 61 - /** 62 - * Resolves the jetstream URL to subscribe to. 63 - */ 64 - export function resolveFirehoseJetstreamUrl(env: NodeJS.ProcessEnv = process.env): string { 65 - return env.JETSTREAM_URL ?? DEFAULT_JETSTREAM_URL 66 - } 67 - 68 - // --------------------------------------------------------------------------- 69 - // Helpers 70 - // --------------------------------------------------------------------------- 71 - 72 - /** 73 - * Returns midnight of the given instant in UTC as a Date. Matches the 74 - * semantics of ClickHouse `today()` so the per-day partition column is 75 - * consistent regardless of the server's local TZ. 76 - */ 77 - function utcDay(now: Date): Date { 78 - return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())) 79 - } 80 - 81 - // --------------------------------------------------------------------------- 82 - // FirehoseConsumer 83 - // --------------------------------------------------------------------------- 84 - 85 - /** 86 - * Subscribes to the Bluesky jetstream `app.bsky.feed.like` collection without 87 - * a DID filter, and writes per-post like/unlike deltas into ClickHouse. 88 - * 89 - * Structured to mirror JetstreamConsumer: 90 - * - Two buffers (lookup rows, count deltas) drained on a 500ms/1000-row 91 - * cadence. 
The cursor is only advanced when both buffers flush cleanly. 92 - * - Per-event processing is sequential so unlike-lookup ordering is 93 - * preserved; sub-ms ClickHouse PK lookups keep this cheap at ~5-20/sec. 94 - * - Exponential backoff on WebSocket reconnect, capped at 30s. 95 - */ 96 - export class FirehoseConsumer { 97 - private lookupBuffer: LikeEventLookupRow[] = [] 98 - private countsBuffer: LikeCountsDailyDeltaRow[] = [] 99 - 100 - private shutdownRequested = false 101 - private ws: WebSocketLike | null = null 102 - private bufferFlushTimer: ReturnType<typeof setInterval> | null = null 103 - 104 - private pendingCursor: bigint | null = null 105 - private lastFlushedCursor: bigint | null = null 106 - private reconnectBackoffMs = 1000 107 - private shutdownResolve: (() => void) | null = null 108 - 109 - /** 110 - * Queue of unlike events currently awaiting a ClickHouse subject lookup. 111 - * Sequential processing: the head of the queue is resolved before we 112 - * dequeue the next message. The tail is advanced as each lookup resolves. 
113 - */ 114 - private processing: Promise<void> = Promise.resolve() 115 - 116 - constructor(private readonly deps: FirehoseConsumerDeps) {} 117 - 118 - // ------------------------------------------------------------------------- 119 - // Public API 120 - // ------------------------------------------------------------------------- 121 - 122 - async start(): Promise<void> { 123 - this.lastFlushedCursor = await this.deps.readCursor() 124 - 125 - await this.connect() 126 - 127 - this.bufferFlushTimer = setInterval(() => { 128 - void this.flushIfNeeded() 129 - }, 500) 130 - this.bufferFlushTimer.unref() 131 - 132 - await this.waitForShutdown() 133 - } 134 - 135 - async shutdown(): Promise<void> { 136 - this.shutdownRequested = true 137 - 138 - if (this.bufferFlushTimer !== null) { 139 - clearInterval(this.bufferFlushTimer) 140 - this.bufferFlushTimer = null 141 - } 142 - 143 - // Let any in-flight event processing drain before the final flush 144 - await this.processing 145 - 146 - await this.flushBuffers() 147 - 148 - if (this.ws !== null) { 149 - this.ws.close() 150 - this.ws = null 151 - } 152 - 153 - if (this.shutdownResolve !== null) { 154 - this.shutdownResolve() 155 - this.shutdownResolve = null 156 - } 157 - } 158 - 159 - // ------------------------------------------------------------------------- 160 - // Connection 161 - // ------------------------------------------------------------------------- 162 - 163 - private buildUrl(baseUrl: string): string { 164 - const url = new URL(baseUrl) 165 - 166 - // Only care about like create/delete events 167 - url.searchParams.set('wantedCollections', 'app.bsky.feed.like') 168 - 169 - // compress=false — same rationale as JetstreamConsumer: avoids zstd 170 - // dictionary bootstrap for simpler wire handling. 
171 - url.searchParams.set('compress', 'false') 172 - 173 - if (this.lastFlushedCursor !== null) { 174 - url.searchParams.set('cursor', this.lastFlushedCursor.toString()) 175 - } 176 - 177 - return url.toString() 178 - } 179 - 180 - private async connect(): Promise<void> { 181 - const url = this.buildUrl(resolveFirehoseJetstreamUrl()) 182 - 183 - const ws = this.deps.createWebSocket(url) 184 - this.ws = ws 185 - 186 - ws.onmessage = (ev) => { 187 - this.handleMessage(ev.data) 188 - } 189 - 190 - ws.onclose = (ev) => { 191 - if (this.shutdownRequested) return 192 - 193 - console.error( 194 - `[FirehoseConsumer] WebSocket closed (code=${ev.code}, reason=${ev.reason}). Reconnecting in ${this.reconnectBackoffMs}ms...` 195 - ) 196 - 197 - const delay = this.reconnectBackoffMs 198 - this.reconnectBackoffMs = Math.min(this.reconnectBackoffMs * 2, 30_000) 199 - 200 - const delayFn = 201 - this.deps.reconnectDelay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms))) 202 - void delayFn(delay).then(() => { 203 - if (!this.shutdownRequested) { 204 - void this.connect() 205 - } 206 - }) 207 - } 208 - 209 - ws.onerror = (err) => { 210 - console.error('[FirehoseConsumer] WebSocket error:', err) 211 - } 212 - 213 - ws.onopen = () => { 214 - this.reconnectBackoffMs = 1000 215 - } 216 - } 217 - 218 - // ------------------------------------------------------------------------- 219 - // Message handling 220 - // ------------------------------------------------------------------------- 221 - 222 - private handleMessage(data: string | Buffer | ArrayBuffer): void { 223 - let text: string 224 - if (typeof data === 'string') { 225 - text = data 226 - } else if (Buffer.isBuffer(data)) { 227 - text = data.toString('utf8') 228 - } else { 229 - text = Buffer.from(data).toString('utf8') 230 - } 231 - 232 - let rawJson: unknown 233 - try { 234 - rawJson = JSON.parse(text) 235 - } catch { 236 - return 237 - } 238 - 239 - const event = parseJetstreamEvent(rawJson) 240 - if (event === 
null) return 241 - 242 - let timeUs: bigint | null = null 243 - if (typeof rawJson === 'object' && rawJson !== null && 'time_us' in rawJson) { 244 - const raw = (rawJson as Record<string, unknown>)['time_us'] 245 - if (typeof raw === 'number') { 246 - timeUs = BigInt(Math.floor(raw)) 247 - } 248 - } 249 - 250 - if (event.kind === 'like') { 251 - // Compute the UTC day on the Node side (spec requirement: consistent 252 - // regardless of server TZ). 253 - const day = utcDay(this.deps.now()) 254 - this.lookupBuffer.push({ 255 - likeUri: `at://${event.actorDid}/app.bsky.feed.like/${event.rkey}`, 256 - subjectUri: event.postUri, 257 - createdAt: event.createdAt, 258 - }) 259 - this.countsBuffer.push({ 260 - subjectUri: event.postUri, 261 - day, 262 - count: 1, 263 - }) 264 - this.advancePendingCursor(timeUs) 265 - 266 - if (this.lookupBuffer.length >= 1000 || this.countsBuffer.length >= 1000) { 267 - void this.flushBuffers() 268 - } 269 - return 270 - } 271 - 272 - if (event.kind === 'like-delete') { 273 - // Unlike — resolve the subject via ClickHouse. We chain on 274 - // `processing` so that sequential events are resolved in order; 275 - // this keeps subject lookups correct if a create+delete pair arrive 276 - // back-to-back for the same like_uri. 277 - const likeUri = event.likeUri 278 - const capturedTimeUs = timeUs 279 - this.processing = this.processing 280 - .then(() => this.handleUnlike(likeUri, capturedTimeUs)) 281 - .catch((err) => { 282 - // Don't let a single handleUnlike failure poison the chain — 283 - // without this, every subsequent `.then()` would short-circuit 284 - // and all future unlikes would silently fail. 285 - console.error('[FirehoseConsumer] handleUnlike chain error:', err) 286 - }) 287 - return 288 - } 289 - 290 - // Ignore all other events (account/identity/post/etc.) 
291 - } 292 - 293 - private async handleUnlike(likeUri: string, timeUs: bigint | null): Promise<void> { 294 - try { 295 - const subjectUri = await this.deps.lookupSubjectUri(likeUri) 296 - if (subjectUri === null) { 297 - // Original like is not in our window — drop the unlike and still 298 - // advance the cursor so we don't replay this event forever. 299 - this.advancePendingCursor(timeUs) 300 - return 301 - } 302 - 303 - const day = utcDay(this.deps.now()) 304 - this.countsBuffer.push({ 305 - subjectUri, 306 - day, 307 - count: -1, 308 - }) 309 - this.advancePendingCursor(timeUs) 310 - 311 - if (this.countsBuffer.length >= 1000) { 312 - await this.flushBuffers() 313 - } 314 - } catch (err) { 315 - // Lookup failure forces a reconnect so we replay from the last 316 - // durable checkpoint. We do NOT advance the cursor here; a later 317 - // like-create in this session would otherwise push `pendingCursor` 318 - // past this event's time_us and silently drop the unlike. 319 - console.error('[FirehoseConsumer] lookupSubjectUri failed for', likeUri, err) 320 - this.ws?.close() 321 - } 322 - } 323 - 324 - private advancePendingCursor(timeUs: bigint | null): void { 325 - if (timeUs !== null) { 326 - if (this.pendingCursor === null || timeUs > this.pendingCursor) { 327 - this.pendingCursor = timeUs 328 - } 329 - } 330 - } 331 - 332 - // ------------------------------------------------------------------------- 333 - // Buffer flush 334 - // ------------------------------------------------------------------------- 335 - 336 - private async flushIfNeeded(): Promise<void> { 337 - if (this.lookupBuffer.length > 0 || this.countsBuffer.length > 0) { 338 - await this.flushBuffers() 339 - } 340 - } 341 - 342 - /** 343 - * Drains both buffers and checkpoints the cursor only if both inserts 344 - * succeed. If either insert fails, the affected rows are re-queued at 345 - * the front of their buffer and the cursor is NOT advanced. 
346 - */ 347 - async flushBuffers(): Promise<void> { 348 - // Drain any pending unlike-lookup work so the -1 deltas it produces 349 - // are part of this flush cycle. 350 - await this.processing 351 - 352 - if (this.lookupBuffer.length === 0 && this.countsBuffer.length === 0) return 353 - 354 - const lookupToFlush = this.lookupBuffer.splice(0) 355 - const countsToFlush = this.countsBuffer.splice(0) 356 - const cursorAtFlushTime = this.pendingCursor 357 - this.pendingCursor = null 358 - 359 - let lookupOk = true 360 - let countsOk = true 361 - 362 - if (lookupToFlush.length > 0) { 363 - try { 364 - await this.deps.insertLookupRows(lookupToFlush) 365 - } catch (err) { 366 - console.error( 367 - `[FirehoseConsumer] lookup flush failed (${lookupToFlush.length} rows); will retry:`, 368 - err 369 - ) 370 - this.lookupBuffer.unshift(...lookupToFlush) 371 - lookupOk = false 372 - } 373 - } 374 - 375 - if (countsToFlush.length > 0) { 376 - try { 377 - await this.deps.insertCountsDeltas(countsToFlush) 378 - } catch (err) { 379 - console.error( 380 - `[FirehoseConsumer] counts flush failed (${countsToFlush.length} rows); will retry:`, 381 - err 382 - ) 383 - this.countsBuffer.unshift(...countsToFlush) 384 - countsOk = false 385 - } 386 - } 387 - 388 - if (lookupOk && countsOk) { 389 - if (cursorAtFlushTime !== null) { 390 - this.lastFlushedCursor = cursorAtFlushTime 391 - await this.checkpointCursor(cursorAtFlushTime) 392 - } 393 - } else { 394 - if (cursorAtFlushTime !== null) { 395 - if (this.pendingCursor === null || cursorAtFlushTime > this.pendingCursor) { 396 - this.pendingCursor = cursorAtFlushTime 397 - } 398 - } 399 - } 400 - } 401 - 402 - // ------------------------------------------------------------------------- 403 - // Cursor checkpointing 404 - // ------------------------------------------------------------------------- 405 - 406 - private async checkpointCursor(cursor: bigint): Promise<void> { 407 - try { 408 - await this.deps.writeCursor(cursor) 409 - } catch 
(err) { 410 - console.error('[FirehoseConsumer] cursor checkpoint failed:', err) 411 - } 412 - } 413 - 414 - // ------------------------------------------------------------------------- 415 - // Shutdown coordination 416 - // ------------------------------------------------------------------------- 417 - 418 - private waitForShutdown(): Promise<void> { 419 - if (this.shutdownRequested) return Promise.resolve() 420 - return new Promise((resolve) => { 421 - this.shutdownResolve = resolve 422 - }) 423 - } 424 - }
-38
app/services/firehose_cursor_io.ts
··· 1 - import db from '@adonisjs/lucid/services/db' 2 - 3 - /** 4 - * Reads the last durable firehose cursor from SQLite. 5 - * 6 - * Uses `options({ safeIntegers: true })` on the underlying knex raw query so 7 - * that better-sqlite3 returns the INTEGER column as a native BigInt — avoiding 8 - * the precision loss that occurs when a 64-bit microsecond timestamp is 9 - * silently cast to a JS `number` (which only has 53 bits of mantissa). 10 - * 11 - * Returns null if no cursor row exists yet. 12 - * 13 - * The firehose worker uses its own independent cursor row (separate table 14 - * from jetstream_cursor) so that the two workers can checkpoint independently. 15 - */ 16 - export async function readCursor(): Promise<bigint | null> { 17 - const rows = (await db 18 - .connection() 19 - .knexRawQuery('SELECT cursor_us FROM firehose_cursor WHERE id = 1 LIMIT 1') 20 - .options({ safeIntegers: true })) as Array<{ cursor_us: bigint }> 21 - 22 - if (!rows || rows.length === 0) return null 23 - return rows[0].cursor_us 24 - } 25 - 26 - /** 27 - * Durably writes the firehose cursor to SQLite. 28 - * 29 - * Passes `cursor` as a BigInt binding directly — better-sqlite3 accepts BigInt 30 - * bind parameters natively, which avoids precision loss for values above 2^53. 31 - */ 32 - export async function writeCursor(cursor: bigint): Promise<void> { 33 - const nowMs = Date.now() 34 - await db.rawQuery( 35 - 'INSERT OR REPLACE INTO firehose_cursor (id, cursor_us, updated_at) VALUES (1, ?, ?)', 36 - [cursor, nowMs] 37 - ) 38 - }
+159 -26
app/services/jetstream_consumer.ts
··· 1 - import type { EngagementEventRow } from '#lib/clickhouse/index' 2 - import type { ClickHouseStore } from '#lib/clickhouse/index' 3 - import type { PostSnapshot } from '#lib/clickhouse/index' 1 + import type { 2 + ClickHouseStore, 3 + EngagementEventRow, 4 + LikeCountsDailyDeltaRow, 5 + LikeEventLookupRow, 6 + PostSnapshot, 7 + } from '#lib/clickhouse/index' 4 8 import { parseJetstreamEvent, parseAtUri, parsePostEmbed, parseFacets } from '#lib/atproto/index' 5 9 import type { PostEvent } from '#lib/atproto/index' 6 10 ··· 82 86 * In tests: records the call. 83 87 */ 84 88 updateUserHandle: (did: string, handle: string) => Promise<void> 89 + 90 + /** 91 + * Virality pipeline: looks up the subject URI for a like URI on unlike. 92 + * Returns null when the original like is not in the lookup table. 93 + */ 94 + lookupSubjectUri: (likeUri: string) => Promise<string | null> 95 + 96 + /** Virality pipeline: batched insert for like_events_lookup. */ 97 + insertLookupRows: (rows: LikeEventLookupRow[]) => Promise<void> 98 + 99 + /** Virality pipeline: batched insert for like_counts_daily deltas. */ 100 + insertCountsDeltas: (rows: LikeCountsDailyDeltaRow[]) => Promise<void> 101 + } 102 + 103 + /** 104 + * Returns midnight of the given instant in UTC. Matches ClickHouse `today()` 105 + * semantics so the per-day partition column is TZ-independent. 
106 + */ 107 + function utcDay(now: Date): Date { 108 + return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())) 85 109 } 86 110 87 111 // --------------------------------------------------------------------------- ··· 109 133 private trackedDids: Map<string, string> = new Map() 110 134 private eventBuffer: EngagementEventRow[] = [] 111 135 private snapshotBuffer: PostSnapshot[] = [] 136 + private lookupBuffer: LikeEventLookupRow[] = [] 137 + private countsBuffer: LikeCountsDailyDeltaRow[] = [] 112 138 private shutdownRequested = false 113 139 private ws: WebSocketLike | null = null 114 140 private didRefreshTimer: ReturnType<typeof setInterval> | null = null 115 141 private bufferFlushTimer: ReturnType<typeof setInterval> | null = null 116 142 117 143 /** 144 + * Serialized queue for async unlike-lookup work. Each new unlike chains on 145 + * the previous so create+delete pairs for the same like_uri resolve in 146 + * arrival order. 147 + */ 148 + private processing: Promise<void> = Promise.resolve() 149 + 150 + /** 118 151 * The largest time_us (as BigInt) across all events buffered in the current 119 152 * flush cycle. Updated whenever we push an event into either buffer. 120 153 */ ··· 190 223 clearInterval(this.bufferFlushTimer) 191 224 this.bufferFlushTimer = null 192 225 } 226 + 227 + // Let any in-flight unlike-lookup work drain before the final flush. 228 + await this.processing 193 229 194 230 // One final flush before closing 195 231 await this.flushBuffer() ··· 306 342 } 307 343 308 344 if (event.kind === 'like' || event.kind === 'repost') { 309 - if (!this.trackedDids.has(event.postAuthorDid)) return 345 + // Engagement pipeline — filtered by tracked DIDs. 
346 + if (this.trackedDids.has(event.postAuthorDid)) { 347 + this.eventBuffer.push({ 348 + postUri: event.postUri, 349 + postAuthorDid: event.postAuthorDid, 350 + actorDid: event.actorDid, 351 + rkey: event.rkey, 352 + kind: event.kind, 353 + eventCreatedAt: event.createdAt, 354 + }) 355 + } 310 356 311 - this.eventBuffer.push({ 312 - postUri: event.postUri, 313 - postAuthorDid: event.postAuthorDid, 314 - actorDid: event.actorDid, 315 - rkey: event.rkey, 316 - kind: event.kind, 317 - eventCreatedAt: event.createdAt, 318 - }) 357 + // Virality pipeline — every like on the network, regardless of author. 358 + if (event.kind === 'like') { 359 + const day = utcDay(this.deps.now()) 360 + this.lookupBuffer.push({ 361 + likeUri: `at://${event.actorDid}/app.bsky.feed.like/${event.rkey}`, 362 + subjectUri: event.postUri, 363 + createdAt: event.createdAt, 364 + }) 365 + this.countsBuffer.push({ 366 + subjectUri: event.postUri, 367 + day, 368 + count: 1, 369 + }) 370 + } 319 371 320 372 this.advancePendingCursor(timeUs) 321 373 322 - // Flush immediately if the buffer hits 1000 rows 323 - if (this.eventBuffer.length >= 1000) { 374 + if ( 375 + this.eventBuffer.length >= 1000 || 376 + this.lookupBuffer.length >= 1000 || 377 + this.countsBuffer.length >= 1000 378 + ) { 324 379 void this.flushBuffer() 325 380 } 326 381 return 327 382 } 328 383 384 + if (event.kind === 'like-delete') { 385 + // Virality pipeline only: resolve the subject via ClickHouse and emit a 386 + // -1 delta. Chained on `processing` so unlikes resolve in arrival order. 
387 + const likeUri = event.likeUri 388 + const capturedTimeUs = timeUs 389 + this.processing = this.processing 390 + .then(() => this.handleUnlike(likeUri, capturedTimeUs)) 391 + .catch((err) => { 392 + console.error('[JetstreamConsumer] handleUnlike chain error:', err) 393 + }) 394 + return 395 + } 396 + 329 397 if (event.kind === 'post') { 330 398 this.handlePostEvent(event, timeUs) 331 399 return ··· 443 511 } 444 512 } 445 513 514 + private async handleUnlike(likeUri: string, timeUs: bigint | null): Promise<void> { 515 + try { 516 + const subjectUri = await this.deps.lookupSubjectUri(likeUri) 517 + if (subjectUri === null) { 518 + // Original like aged out — drop it but still advance the cursor so we 519 + // don't replay this event forever. 520 + this.advancePendingCursor(timeUs) 521 + return 522 + } 523 + 524 + const day = utcDay(this.deps.now()) 525 + this.countsBuffer.push({ subjectUri, day, count: -1 }) 526 + this.advancePendingCursor(timeUs) 527 + 528 + if (this.countsBuffer.length >= 1000) { 529 + await this.flushBuffer() 530 + } 531 + } catch (err) { 532 + // Lookup failure forces a reconnect so we replay from the last durable 533 + // checkpoint. We do NOT advance the cursor — a later like-create would 534 + // otherwise push pendingCursor past this event's time_us and silently 535 + // drop the unlike. 536 + console.error('[JetstreamConsumer] lookupSubjectUri failed for', likeUri, err) 537 + this.ws?.close() 538 + } 539 + } 540 + 446 541 /** Advances pendingCursor if timeUs is larger than the current value. */ 447 542 private advancePendingCursor(timeUs: bigint | null): void { 448 543 if (timeUs !== null) { ··· 458 553 459 554 /** Called by the interval timer — only flushes if there's anything to flush. 
*/ 460 555 private async flushIfNeeded(): Promise<void> { 461 - if (this.eventBuffer.length > 0 || this.snapshotBuffer.length > 0) { 556 + if ( 557 + this.eventBuffer.length > 0 || 558 + this.snapshotBuffer.length > 0 || 559 + this.lookupBuffer.length > 0 || 560 + this.countsBuffer.length > 0 561 + ) { 462 562 await this.flushBuffer() 463 563 } 464 564 } 465 565 466 566 /** 467 - * Drains both the engagement-event buffer and the post-snapshot buffer, 468 - * then checkpoints the cursor once both inserts succeed. 567 + * Drains all four buffers (engagement events, post snapshots, virality 568 + * lookup rows, virality count deltas) and checkpoints the cursor only if 569 + * every insert succeeds. 469 570 * 470 - * Failure strategy: if either insert fails, the affected batch is 471 - * re-queued at the front of its buffer and the cursor is NOT advanced. 472 - * The caller will retry on the next flush interval. 571 + * Failure strategy: if any insert fails, the affected batch is re-queued at 572 + * the front of its buffer and the cursor is NOT advanced. The caller will 573 + * retry on the next flush interval. 
473 574 */ 474 575 async flushBuffer(): Promise<void> { 475 - if (this.eventBuffer.length === 0 && this.snapshotBuffer.length === 0) return 576 + if ( 577 + this.eventBuffer.length === 0 && 578 + this.snapshotBuffer.length === 0 && 579 + this.lookupBuffer.length === 0 && 580 + this.countsBuffer.length === 0 581 + ) 582 + return 476 583 477 584 const eventsToFlush = this.eventBuffer.splice(0) 478 585 const snapshotsToFlush = this.snapshotBuffer.splice(0) 586 + const lookupToFlush = this.lookupBuffer.splice(0) 587 + const countsToFlush = this.countsBuffer.splice(0) 479 588 const cursorAtFlushTime = this.pendingCursor 480 589 this.pendingCursor = null 481 590 482 591 let eventsOk = true 483 592 let snapshotsOk = true 593 + let lookupOk = true 594 + let countsOk = true 484 595 485 - // Flush engagement events 486 596 if (eventsToFlush.length > 0) { 487 597 try { 488 598 await this.clickHouseStore.insertEngagementEvents(eventsToFlush) ··· 496 606 } 497 607 } 498 608 499 - // Flush post snapshots 500 609 if (snapshotsToFlush.length > 0) { 501 610 try { 502 611 await this.clickHouseStore.insertPostSnapshots(snapshotsToFlush) ··· 510 619 } 511 620 } 512 621 513 - // Only checkpoint if both flushes succeeded 514 - if (eventsOk && snapshotsOk) { 622 + if (lookupToFlush.length > 0) { 623 + try { 624 + await this.deps.insertLookupRows(lookupToFlush) 625 + } catch (err) { 626 + console.error( 627 + `[JetstreamConsumer] virality lookup flush failed (${lookupToFlush.length} rows); will retry:`, 628 + err 629 + ) 630 + this.lookupBuffer.unshift(...lookupToFlush) 631 + lookupOk = false 632 + } 633 + } 634 + 635 + if (countsToFlush.length > 0) { 636 + try { 637 + await this.deps.insertCountsDeltas(countsToFlush) 638 + } catch (err) { 639 + console.error( 640 + `[JetstreamConsumer] virality counts flush failed (${countsToFlush.length} rows); will retry:`, 641 + err 642 + ) 643 + this.countsBuffer.unshift(...countsToFlush) 644 + countsOk = false 645 + } 646 + } 647 + 648 + if 
(eventsOk && snapshotsOk && lookupOk && countsOk) { 515 649 if (cursorAtFlushTime !== null) { 516 650 this.lastFlushedCursor = cursorAtFlushTime 517 651 await this.checkpointCursor(cursorAtFlushTime) 518 652 } 519 653 } else { 520 - // Restore pendingCursor so reconnect uses the right value 521 654 if (cursorAtFlushTime !== null) { 522 655 if (this.pendingCursor === null || cursorAtFlushTime > this.pendingCursor) { 523 656 this.pendingCursor = cursorAtFlushTime
-59
commands/firehose_watch.ts
··· 1 - import { BaseCommand } from '@adonisjs/core/ace' 2 - import { ClickHouseStore } from '#lib/clickhouse/index' 3 - import { FirehoseConsumer, resolveFirehoseJetstreamUrl } from '#services/firehose_consumer' 4 - import { readCursor, writeCursor } from '#services/firehose_cursor_io' 5 - 6 - /** 7 - * Ace command: node ace firehose:watch 8 - * 9 - * Connects to the Bluesky jetstream and streams the unfiltered 10 - * app.bsky.feed.like collection into ClickHouse so the virality-threshold 11 - * job can detect posts crossing configurable like counts. 12 - * 13 - * Long-lived daemon process — uses `staysAlive: true` so Adonis does not 14 - * exit when `run()` returns. The process remains alive until shutdown() 15 - * is called (on SIGTERM/SIGINT), which causes start() to resolve. 16 - */ 17 - export default class FirehoseWatch extends BaseCommand { 18 - static commandName = 'firehose:watch' 19 - static description = 20 - 'Consume the unfiltered app.bsky.feed.like jetstream into ClickHouse (virality)' 21 - 22 - static options = { startApp: true, staysAlive: true } 23 - 24 - async run() { 25 - const clickHouseStore = await this.app.container.make(ClickHouseStore) 26 - 27 - const consumer = new FirehoseConsumer({ 28 - createWebSocket: (url: string) => new WebSocket(url), 29 - readCursor, 30 - writeCursor, 31 - lookupSubjectUri: (likeUri: string) => clickHouseStore.lookupSubjectUri(likeUri), 32 - insertLookupRows: (rows) => clickHouseStore.insertLikeEventsLookup(rows), 33 - insertCountsDeltas: (rows) => clickHouseStore.insertLikeCountsDailyDeltas(rows), 34 - now: () => new Date(), 35 - }) 36 - 37 - const handleSignal = () => { 38 - this.logger.info('Shutdown signal received — flushing and closing...') 39 - consumer 40 - .shutdown() 41 - .then(() => { 42 - this.logger.info('FirehoseConsumer shut down cleanly') 43 - process.exit(0) 44 - }) 45 - .catch((err: unknown) => { 46 - const message = err instanceof Error ? 
err.message : String(err) 47 - this.logger.error(`Error during shutdown: ${message}`) 48 - process.exit(1) 49 - }) 50 - } 51 - 52 - process.on('SIGTERM', handleSignal) 53 - process.on('SIGINT', handleSignal) 54 - 55 - this.logger.info(`Starting FirehoseConsumer (URL=${resolveFirehoseJetstreamUrl()})`) 56 - 57 - await consumer.start() 58 - } 59 - }
+5
commands/jetstream_consume.ts
··· 76 76 updateUserHandle: async (did: string, handle: string) => { 77 77 await TrackedProfile.query().where('did', did).update({ handle }) 78 78 }, 79 + 80 + // Virality pipeline — merged from the old firehose:watch worker. 81 + lookupSubjectUri: (likeUri: string) => clickHouseStore.lookupSubjectUri(likeUri), 82 + insertLookupRows: (rows) => clickHouseStore.insertLikeEventsLookup(rows), 83 + insertCountsDeltas: (rows) => clickHouseStore.insertLikeCountsDailyDeltas(rows), 79 84 }) 80 85 81 86 // Graceful shutdown on SIGTERM (Docker stop, systemd stop, k8s eviction)
+18
database/migrations/1776250000000_drop_firehose_cursor.ts
··· 1 + import { BaseSchema } from '@adonisjs/lucid/schema' 2 + 3 + export default class extends BaseSchema { 4 + protected tableName = 'firehose_cursor' 5 + 6 + async up() { 7 + this.schema.dropTableIfExists(this.tableName) 8 + } 9 + 10 + async down() { 11 + this.schema.createTable(this.tableName, (table) => { 12 + table.integer('id').primary() 13 + table.check('?? = 1', ['id'], 'firehose_cursor_singleton') 14 + table.integer('cursor_us').notNullable() 15 + table.integer('updated_at').notNullable() 16 + }) 17 + } 18 + }
+6 -7
docker-compose.yml
··· 163 163 <<: *common-env 164 164 OTEL_SERVICE_NAME: favs-blue-worker 165 165 166 - firehose-worker: 166 + scheduler: 167 167 image: favs-blue:${GIT_SHA:-latest} 168 - command: node ace.js firehose:watch 169 - stop_grace_period: 30s 168 + command: node ace.js scheduler:run 170 169 volumes: 171 170 - sqlite-data:/data 172 171 deploy: 172 + replicas: 1 173 173 restart_policy: 174 174 condition: any 175 175 update_config: ··· 177 177 failure_action: rollback 178 178 resources: 179 179 limits: 180 - memory: 384M 181 - cpus: '0.5' 180 + memory: 256M 181 + cpus: '0.25' 182 182 logging: *default-logging 183 183 healthcheck: 184 184 test: ['CMD', 'node', '-e', 'process.exit(0)'] ··· 188 188 start_period: 10s 189 189 environment: 190 190 <<: *common-env 191 - JETSTREAM_URL: wss://jetstream2.us-east.bsky.network/subscribe 192 - OTEL_SERVICE_NAME: favs-blue-firehose 191 + OTEL_SERVICE_NAME: favs-blue-scheduler 193 192 194 193 volumes: 195 194 clickhouse-data:
+2 -2
package.json
··· 18 18 "start": "node bin/server.js", 19 19 "build": "node ace build", 20 20 "predev": "pnpm migrate", 21 - "dev": "concurrently --kill-others -n web,jetstream,queue -c blue,green,yellow \"node ace serve --hmr\" \"node ace jetstream:consume\" \"node ace queue:work\"", 21 + "dev": "concurrently --kill-others -n web,jetstream,queue,scheduler -c blue,green,yellow,cyan \"node ace serve --hmr\" \"node ace jetstream:consume\" \"node ace queue:work\" \"node ace scheduler:run\"", 22 22 "migrate": "node ace migration:run --force && node ace clickhouse:migrate", 23 23 "test": "node ace test", 24 24 "lint": "eslint .", ··· 89 89 "@resvg/resvg-js": "^2.6.2", 90 90 "@tailwindcss/vite": "^4.2.2", 91 91 "@vinejs/vine": "^4.3.0", 92 - "adonisjs-scheduler": "^2.7.0", 93 92 "better-sqlite3": "^12.8.0", 93 + "croner": "^10.0.1", 94 94 "edge.js": "^6.5.0", 95 95 "luxon": "^3.7.2", 96 96 "posthog-js": "^1.367.0",
-399
tests/unit/services/firehose_consumer.spec.ts
··· 1 - /** 2 - * Unit tests for FirehoseConsumer. 3 - * 4 - * All dependencies (WebSocket, cursor IO, ClickHouse) are injected as fakes 5 - * so these tests run without external services. 6 - */ 7 - import { test } from '@japa/runner' 8 - import { 9 - FirehoseConsumer, 10 - type WebSocketLike, 11 - type FirehoseConsumerDeps, 12 - } from '#services/firehose_consumer' 13 - import type { LikeCountsDailyDeltaRow, LikeEventLookupRow } from '#lib/clickhouse/index' 14 - 15 - // --------------------------------------------------------------------------- 16 - // FakeWebSocket 17 - // --------------------------------------------------------------------------- 18 - 19 - class FakeWebSocket implements WebSocketLike { 20 - onmessage?: ((ev: any) => void) | null 21 - onclose?: ((ev: any) => void) | null 22 - onerror?: ((ev: any) => void) | null 23 - onopen?: (() => void) | null 24 - 25 - closeCallCount = 0 26 - 27 - close() { 28 - this.closeCallCount++ 29 - } 30 - 31 - emit(event: unknown) { 32 - this.onmessage?.({ data: JSON.stringify(event) }) 33 - } 34 - 35 - emitClose(code = 1006) { 36 - this.onclose?.({ code, reason: 'test close' }) 37 - } 38 - 39 - emitOpen() { 40 - this.onopen?.() 41 - } 42 - } 43 - 44 - // --------------------------------------------------------------------------- 45 - // Event fixtures 46 - // --------------------------------------------------------------------------- 47 - 48 - const AUTHOR_DID = 'did:plc:author001' 49 - const ACTOR_DID = 'did:plc:actor001' 50 - 51 - function makeLikeCreate( 52 - actorDid = ACTOR_DID, 53 - authorDid = AUTHOR_DID, 54 - rkey = 'likerkeyaaa', 55 - timeUs = 1725911162329308 56 - ) { 57 - return { 58 - did: actorDid, 59 - time_us: timeUs, 60 - kind: 'commit', 61 - commit: { 62 - operation: 'create', 63 - collection: 'app.bsky.feed.like', 64 - rkey, 65 - record: { 66 - $type: 'app.bsky.feed.like', 67 - createdAt: '2025-01-15T12:00:00Z', 68 - subject: { 69 - cid: 'bafycid', 70 - uri: 
`at://${authorDid}/app.bsky.feed.post/postrkey001`, 71 - }, 72 - }, 73 - }, 74 - } 75 - } 76 - 77 - function makeLikeDelete(actorDid = ACTOR_DID, rkey = 'likerkeyaaa', timeUs = 1725911162329309) { 78 - return { 79 - did: actorDid, 80 - time_us: timeUs, 81 - kind: 'commit', 82 - commit: { 83 - operation: 'delete', 84 - collection: 'app.bsky.feed.like', 85 - rkey, 86 - }, 87 - } 88 - } 89 - 90 - // --------------------------------------------------------------------------- 91 - // Consumer factory 92 - // --------------------------------------------------------------------------- 93 - 94 - interface Setup { 95 - fakeWs: FakeWebSocket 96 - consumer: FirehoseConsumer 97 - lookupInserts: LikeEventLookupRow[][] 98 - countsInserts: LikeCountsDailyDeltaRow[][] 99 - writeCursorCalls: bigint[] 100 - lookupSubjectCalls: string[] 101 - lookupSubjectMap: Map<string, string | null> 102 - connectedUrls: string[] 103 - } 104 - 105 - function makeConsumer( 106 - options: { 107 - initialCursor?: bigint | null 108 - subjectLookup?: Map<string, string | null> 109 - insertLookupShouldThrow?: boolean 110 - insertCountsShouldThrow?: boolean 111 - instantReconnect?: boolean 112 - now?: Date 113 - } = {} 114 - ): Setup { 115 - const { 116 - initialCursor = null, 117 - subjectLookup = new Map<string, string | null>(), 118 - insertLookupShouldThrow = false, 119 - insertCountsShouldThrow = false, 120 - instantReconnect = false, 121 - now = new Date('2026-04-14T12:00:00Z'), 122 - } = options 123 - 124 - const fakeWs = new FakeWebSocket() 125 - const lookupInserts: LikeEventLookupRow[][] = [] 126 - const countsInserts: LikeCountsDailyDeltaRow[][] = [] 127 - const writeCursorCalls: bigint[] = [] 128 - const lookupSubjectCalls: string[] = [] 129 - const connectedUrls: string[] = [] 130 - 131 - const deps: FirehoseConsumerDeps = { 132 - createWebSocket(url: string) { 133 - connectedUrls.push(url) 134 - return fakeWs 135 - }, 136 - readCursor: async () => initialCursor, 137 - writeCursor: async 
(cursor: bigint) => { 138 - writeCursorCalls.push(cursor) 139 - }, 140 - lookupSubjectUri: async (likeUri: string) => { 141 - lookupSubjectCalls.push(likeUri) 142 - return subjectLookup.has(likeUri) ? subjectLookup.get(likeUri)! : null 143 - }, 144 - insertLookupRows: async (rows) => { 145 - if (insertLookupShouldThrow) throw new Error('ClickHouse unavailable (lookup)') 146 - lookupInserts.push([...rows]) 147 - }, 148 - insertCountsDeltas: async (rows) => { 149 - if (insertCountsShouldThrow) throw new Error('ClickHouse unavailable (counts)') 150 - countsInserts.push([...rows]) 151 - }, 152 - now: () => now, 153 - ...(instantReconnect ? { reconnectDelay: (_ms: number) => Promise.resolve() } : {}), 154 - } 155 - 156 - const consumer = new FirehoseConsumer(deps) 157 - 158 - return { 159 - fakeWs, 160 - consumer, 161 - lookupInserts, 162 - countsInserts, 163 - writeCursorCalls, 164 - lookupSubjectCalls, 165 - lookupSubjectMap: subjectLookup, 166 - connectedUrls, 167 - } 168 - } 169 - 170 - async function startInBackground(consumer: FirehoseConsumer): Promise<void> { 171 - void consumer.start() 172 - await new Promise((r) => setTimeout(r, 0)) 173 - } 174 - 175 - // --------------------------------------------------------------------------- 176 - // Tests 177 - // --------------------------------------------------------------------------- 178 - 179 - test.group('FirehoseConsumer — like create', () => { 180 - test('creates a lookup row and a +1 delta', async ({ assert }) => { 181 - const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer() 182 - await startInBackground(consumer) 183 - 184 - fakeWs.emit(makeLikeCreate()) 185 - await consumer.flushBuffers() 186 - 187 - assert.equal(lookupInserts.length, 1) 188 - assert.equal(lookupInserts[0].length, 1) 189 - assert.equal(lookupInserts[0][0].likeUri, `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa`) 190 - assert.equal( 191 - lookupInserts[0][0].subjectUri, 192 - 
`at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 193 - ) 194 - 195 - assert.equal(countsInserts.length, 1) 196 - assert.equal(countsInserts[0].length, 1) 197 - assert.equal( 198 - countsInserts[0][0].subjectUri, 199 - `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 200 - ) 201 - assert.equal(countsInserts[0][0].count, 1) 202 - 203 - await consumer.shutdown() 204 - }) 205 - }) 206 - 207 - test.group('FirehoseConsumer — like delete', () => { 208 - test('with matching lookup produces a -1 delta', async ({ assert }) => { 209 - const likeUri = `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa` 210 - const subjectUri = `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001` 211 - const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 212 - subjectLookup: new Map([[likeUri, subjectUri]]), 213 - }) 214 - await startInBackground(consumer) 215 - 216 - fakeWs.emit(makeLikeDelete()) 217 - await consumer.flushBuffers() 218 - 219 - assert.equal(lookupInserts.length, 0, 'no new lookup row on delete') 220 - assert.equal(countsInserts.length, 1) 221 - assert.equal(countsInserts[0].length, 1) 222 - assert.equal(countsInserts[0][0].subjectUri, subjectUri) 223 - assert.equal(countsInserts[0][0].count, -1) 224 - 225 - await consumer.shutdown() 226 - }) 227 - 228 - test('with no matching lookup is dropped silently', async ({ assert }) => { 229 - const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 230 - subjectLookup: new Map(), 231 - }) 232 - await startInBackground(consumer) 233 - 234 - fakeWs.emit(makeLikeDelete()) 235 - await consumer.flushBuffers() 236 - 237 - assert.equal(lookupInserts.length, 0) 238 - assert.equal(countsInserts.length, 0) 239 - 240 - await consumer.shutdown() 241 - }) 242 - }) 243 - 244 - test.group('FirehoseConsumer — buffering', () => { 245 - test('flushes when buffer reaches 1000 rows', async ({ assert }) => { 246 - const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer() 247 - await 
startInBackground(consumer) 248 - 249 - for (let i = 0; i < 1000; i++) { 250 - fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, `rkey${i}`, 1725911162329308 + i)) 251 - } 252 - 253 - // Let microtasks settle for the automatic flush 254 - await new Promise((r) => setTimeout(r, 10)) 255 - 256 - assert.isAbove(lookupInserts.length, 0, 'lookup auto-flushed at size threshold') 257 - assert.isAbove(countsInserts.length, 0, 'counts auto-flushed at size threshold') 258 - 259 - await consumer.shutdown() 260 - }) 261 - }) 262 - 263 - test.group('FirehoseConsumer — cursor', () => { 264 - test('advances cursor only after successful flush', async ({ assert }) => { 265 - const { fakeWs, consumer, writeCursorCalls } = makeConsumer() 266 - await startInBackground(consumer) 267 - 268 - fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 269 - await consumer.flushBuffers() 270 - 271 - assert.equal(writeCursorCalls.length, 1) 272 - assert.equal(writeCursorCalls[0], 1725911162329308n) 273 - 274 - await consumer.shutdown() 275 - }) 276 - 277 - test('cursor is NOT advanced if the lookup flush fails', async ({ assert }) => { 278 - const { fakeWs, consumer, writeCursorCalls } = makeConsumer({ 279 - insertLookupShouldThrow: true, 280 - }) 281 - await startInBackground(consumer) 282 - 283 - fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 284 - await consumer.flushBuffers() 285 - 286 - assert.equal(writeCursorCalls.length, 0) 287 - 288 - await consumer.shutdown() 289 - }) 290 - 291 - test('failed flush re-queues rows at the front of the buffer', async ({ assert }) => { 292 - const { fakeWs, consumer, lookupInserts } = makeConsumer({ 293 - insertLookupShouldThrow: true, 294 - }) 295 - await startInBackground(consumer) 296 - 297 - fakeWs.emit(makeLikeCreate(ACTOR_DID, AUTHOR_DID, 'r1', 1725911162329308)) 298 - await consumer.flushBuffers() 299 - 300 - assert.equal(lookupInserts.length, 0, 'first attempt failed so no successful insert') 301 - 
302 - // Now flip the flag so the retry succeeds 303 - ;(consumer as any).deps.insertLookupRows = async (rows: LikeEventLookupRow[]) => { 304 - lookupInserts.push([...rows]) 305 - } 306 - 307 - await consumer.flushBuffers() 308 - assert.equal(lookupInserts.length, 1, 'retry picked up the re-queued rows') 309 - assert.equal(lookupInserts[0].length, 1) 310 - 311 - await consumer.shutdown() 312 - }) 313 - }) 314 - 315 - test.group('FirehoseConsumer — lookup failure', () => { 316 - test('closes the websocket to force a reconnect when lookup throws', async ({ assert }) => { 317 - const { fakeWs, consumer, writeCursorCalls, countsInserts } = makeConsumer() 318 - // Swap in a throwing lookup after construction 319 - ;(consumer as any).deps.lookupSubjectUri = async () => { 320 - throw new Error('ClickHouse unavailable (lookup)') 321 - } 322 - await startInBackground(consumer) 323 - 324 - fakeWs.emit(makeLikeDelete(ACTOR_DID, 'likerkeyaaa', 1725911162329309)) 325 - 326 - // Let the processing chain settle 327 - await (consumer as any).processing 328 - await consumer.flushBuffers() 329 - 330 - assert.equal(fakeWs.closeCallCount, 1, 'ws.close() was called to force reconnect') 331 - assert.equal(countsInserts.length, 0, 'no delta recorded for the failed unlike') 332 - assert.equal(writeCursorCalls.length, 0, 'cursor was not advanced past the failed unlike') 333 - 334 - await consumer.shutdown() 335 - }) 336 - 337 - test('a rejecting handleUnlike does not poison the processing chain', async ({ assert }) => { 338 - const likeUri2 = `at://${ACTOR_DID}/app.bsky.feed.like/goodrkey` 339 - const subjectUri2 = `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey002` 340 - const { fakeWs, consumer, countsInserts } = makeConsumer({ 341 - subjectLookup: new Map([[likeUri2, subjectUri2]]), 342 - }) 343 - 344 - // Replace handleUnlike so the first invocation rejects — simulates a 345 - // future refactor where the method itself can throw outside the inner 346 - // try/catch. 
Subsequent invocations delegate to the original. 347 - const original = (consumer as any).handleUnlike.bind(consumer) 348 - let firstCall = true 349 - ;(consumer as any).handleUnlike = async (uri: string, timeUs: bigint | null) => { 350 - if (firstCall) { 351 - firstCall = false 352 - throw new Error('boom') 353 - } 354 - return original(uri, timeUs) 355 - } 356 - 357 - await startInBackground(consumer) 358 - 359 - // First unlike: handleUnlike rejects 360 - fakeWs.emit(makeLikeDelete(ACTOR_DID, 'likerkeyaaa', 1725911162329309)) 361 - // Second unlike: must still be processed despite the earlier rejection 362 - fakeWs.emit(makeLikeDelete(ACTOR_DID, 'goodrkey', 1725911162329310)) 363 - 364 - await (consumer as any).processing 365 - await consumer.flushBuffers() 366 - 367 - assert.equal(countsInserts.length, 1, 'second unlike still produced a delta') 368 - assert.equal(countsInserts[0][0].subjectUri, subjectUri2) 369 - assert.equal(countsInserts[0][0].count, -1) 370 - 371 - await consumer.shutdown() 372 - }) 373 - }) 374 - 375 - test.group('FirehoseConsumer — URL construction', () => { 376 - test('subscribes only to app.bsky.feed.like and omits wantedDids', async ({ assert }) => { 377 - const { consumer, connectedUrls } = makeConsumer() 378 - await startInBackground(consumer) 379 - 380 - assert.equal(connectedUrls.length, 1) 381 - const url = new URL(connectedUrls[0]) 382 - const collections = url.searchParams.getAll('wantedCollections') 383 - assert.deepEqual(collections, ['app.bsky.feed.like']) 384 - assert.isNull(url.searchParams.get('wantedDids')) 385 - 386 - await consumer.shutdown() 387 - }) 388 - 389 - test('includes cursor query param when readCursor returns a value', async ({ assert }) => { 390 - const cursor = 1725911162329308n 391 - const { consumer, connectedUrls } = makeConsumer({ initialCursor: cursor }) 392 - await startInBackground(consumer) 393 - 394 - const url = new URL(connectedUrls[0]) 395 - assert.equal(url.searchParams.get('cursor'), 
cursor.toString()) 396 - 397 - await consumer.shutdown() 398 - }) 399 - })
-34
tests/unit/services/firehose_cursor_io.spec.ts
··· 1 - /** 2 - * Unit tests for firehose_cursor_io — the SQLite cursor read/write helpers 3 - * for the firehose worker. 4 - * 5 - * Mirrors tests/unit/services/jetstream_cursor_io.spec.ts — the firehose 6 - * cursor uses an independent SQLite row so it can be checkpointed without 7 - * interfering with the existing jetstream consumer cursor. 8 - */ 9 - import { test } from '@japa/runner' 10 - import testUtils from '@adonisjs/core/services/test_utils' 11 - import { readCursor, writeCursor } from '#services/firehose_cursor_io' 12 - 13 - test.group('FirehoseCursorIO — BigInt round-trip', (group) => { 14 - group.each.setup(() => testUtils.db().withGlobalTransaction()) 15 - 16 - test('returns null when no cursor row exists', async ({ assert }) => { 17 - const result = await readCursor() 18 - assert.isNull(result) 19 - }) 20 - 21 - test('cursor round-trips a normal value', async ({ assert }) => { 22 - const value = 1725911162329308n 23 - await writeCursor(value) 24 - const read = await readCursor() 25 - assert.equal(read, value) 26 - }) 27 - 28 - test('cursor round-trips BigInt values above 2^53', async ({ assert }) => { 29 - const big = (1n << 60n) + 42n 30 - await writeCursor(big) 31 - const read = await readCursor() 32 - assert.equal(read, big) 33 - }) 34 - })
+283 -3
tests/unit/services/jetstream_consumer.spec.ts
··· 6 6 */ 7 7 import { test } from '@japa/runner' 8 8 import type { ClickHouseStore } from '#lib/clickhouse/index' 9 - import type { EngagementEventRow } from '#lib/clickhouse/index' 10 - import type { PostSnapshot } from '#lib/clickhouse/index' 9 + import type { 10 + EngagementEventRow, 11 + LikeCountsDailyDeltaRow, 12 + LikeEventLookupRow, 13 + PostSnapshot, 14 + } from '#lib/clickhouse/index' 11 15 import { 12 16 JetstreamConsumer, 13 17 type WebSocketLike, ··· 391 395 updateUserHandleCalls: Array<{ did: string; handle: string }> 392 396 lastConnectedUrl: { value: string } 393 397 connectedUrls: string[] 398 + lookupInserts: LikeEventLookupRow[][] 399 + countsInserts: LikeCountsDailyDeltaRow[][] 400 + lookupSubjectCalls: string[] 394 401 } 395 402 396 403 function makeConsumer( ··· 399 406 initialCursor?: bigint | null 400 407 storeShouldThrow?: boolean 401 408 instantReconnect?: boolean 409 + subjectLookup?: Map<string, string | null> 410 + insertLookupShouldThrow?: boolean 411 + insertCountsShouldThrow?: boolean 412 + now?: Date 402 413 } = {} 403 414 ): ConsumerSetup { 404 415 const { ··· 406 417 initialCursor = null, 407 418 storeShouldThrow = false, 408 419 instantReconnect = false, 420 + subjectLookup = new Map<string, string | null>(), 421 + insertLookupShouldThrow = false, 422 + insertCountsShouldThrow = false, 423 + now = new Date('2024-09-09T19:46:00.000Z'), 409 424 } = options 410 425 411 426 const fakeWs = new FakeWebSocket() ··· 415 430 const updateUserHandleCalls: Array<{ did: string; handle: string }> = [] 416 431 const lastConnectedUrl: { value: string } = { value: '' } 417 432 const connectedUrls: string[] = [] 433 + const lookupInserts: LikeEventLookupRow[][] = [] 434 + const countsInserts: LikeCountsDailyDeltaRow[][] = [] 435 + const lookupSubjectCalls: string[] = [] 418 436 419 437 let currentTrackedDids = trackedDids 420 438 ··· 430 448 writeCursor: async (cursor: bigint) => { 431 449 writeCursorCalls.push(cursor) 432 450 }, 433 - now: () 
=> new Date('2024-09-09T19:46:00.000Z'), 451 + now: () => now, 434 452 markUserDeleted: async (did: string) => { 435 453 markUserDeletedCalls.push(did) 436 454 }, 437 455 updateUserHandle: async (did: string, handle: string) => { 438 456 updateUserHandleCalls.push({ did, handle }) 439 457 }, 458 + lookupSubjectUri: async (likeUri: string) => { 459 + lookupSubjectCalls.push(likeUri) 460 + return subjectLookup.has(likeUri) ? subjectLookup.get(likeUri)! : null 461 + }, 462 + insertLookupRows: async (rows: LikeEventLookupRow[]) => { 463 + if (insertLookupShouldThrow) throw new Error('ClickHouse unavailable (lookup)') 464 + lookupInserts.push([...rows]) 465 + }, 466 + insertCountsDeltas: async (rows: LikeCountsDailyDeltaRow[]) => { 467 + if (insertCountsShouldThrow) throw new Error('ClickHouse unavailable (counts)') 468 + countsInserts.push([...rows]) 469 + }, 440 470 ...(instantReconnect ? { reconnectDelay: (_ms: number) => Promise.resolve() } : {}), 441 471 } 442 472 ··· 456 486 updateUserHandleCalls, 457 487 lastConnectedUrl, 458 488 connectedUrls, 489 + lookupInserts, 490 + countsInserts, 491 + lookupSubjectCalls, 459 492 } 460 493 } 461 494 ··· 1265 1298 now: () => new Date('2024-09-09T19:46:00.000Z'), 1266 1299 markUserDeleted: async (_did: string) => {}, 1267 1300 updateUserHandle: async (_did: string, _handle: string) => {}, 1301 + lookupSubjectUri: async (_uri: string) => null, 1302 + insertLookupRows: async (_rows: LikeEventLookupRow[]) => {}, 1303 + insertCountsDeltas: async (_rows: LikeCountsDailyDeltaRow[]) => {}, 1268 1304 } 1269 1305 1270 1306 const consumer = new JetstreamConsumer(store, deps) ··· 1369 1405 }) 1370 1406 } 1371 1407 ) 1408 + 1409 + // --------------------------------------------------------------------------- 1410 + // Virality pipeline (merged from firehose_consumer.spec.ts) 1411 + // 1412 + // Every like / unlike on the network — regardless of whether the author is 1413 + // tracked — must produce a lookup row and a per-day count delta 
so the 1414 + // threshold-scan job can detect virality spikes. 1415 + // --------------------------------------------------------------------------- 1416 + 1417 + const VIRALITY_AUTHOR_DID = 'did:plc:virality_author' 1418 + const VIRALITY_ACTOR_DID = 'did:plc:virality_actor' 1419 + 1420 + function makeViralityLikeCreate( 1421 + actorDid = VIRALITY_ACTOR_DID, 1422 + authorDid = VIRALITY_AUTHOR_DID, 1423 + rkey = 'viralitylike1', 1424 + timeUs = 1725911162329308 1425 + ) { 1426 + return { 1427 + did: actorDid, 1428 + time_us: timeUs, 1429 + kind: 'commit', 1430 + commit: { 1431 + operation: 'create', 1432 + collection: 'app.bsky.feed.like', 1433 + rkey, 1434 + record: { 1435 + $type: 'app.bsky.feed.like', 1436 + createdAt: '2025-01-15T12:00:00Z', 1437 + subject: { 1438 + cid: 'bafyvirality', 1439 + uri: `at://${authorDid}/app.bsky.feed.post/viralitypost1`, 1440 + }, 1441 + }, 1442 + }, 1443 + } 1444 + } 1445 + 1446 + function makeViralityLikeDelete( 1447 + actorDid = VIRALITY_ACTOR_DID, 1448 + rkey = 'viralitylike1', 1449 + timeUs = 1725911162329309 1450 + ) { 1451 + return { 1452 + did: actorDid, 1453 + time_us: timeUs, 1454 + kind: 'commit', 1455 + commit: { 1456 + operation: 'delete', 1457 + collection: 'app.bsky.feed.like', 1458 + rkey, 1459 + }, 1460 + } 1461 + } 1462 + 1463 + test.group('JetstreamConsumer — virality: like create', () => { 1464 + test('untracked author like produces a lookup row and a +1 delta', async ({ assert }) => { 1465 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer() 1466 + await startConsumerInBackground(consumer) 1467 + 1468 + fakeWs.emit(makeViralityLikeCreate()) 1469 + await consumer.flushBuffer() 1470 + 1471 + assert.equal(lookupInserts.length, 1) 1472 + assert.equal(lookupInserts[0].length, 1) 1473 + assert.equal( 1474 + lookupInserts[0][0].likeUri, 1475 + `at://${VIRALITY_ACTOR_DID}/app.bsky.feed.like/viralitylike1` 1476 + ) 1477 + assert.equal( 1478 + lookupInserts[0][0].subjectUri, 1479 + 
`at://${VIRALITY_AUTHOR_DID}/app.bsky.feed.post/viralitypost1` 1480 + ) 1481 + 1482 + assert.equal(countsInserts.length, 1) 1483 + assert.equal(countsInserts[0].length, 1) 1484 + assert.equal( 1485 + countsInserts[0][0].subjectUri, 1486 + `at://${VIRALITY_AUTHOR_DID}/app.bsky.feed.post/viralitypost1` 1487 + ) 1488 + assert.equal(countsInserts[0][0].count, 1) 1489 + 1490 + await consumer.shutdown() 1491 + }) 1492 + 1493 + test('tracked author like also produces virality rows alongside engagement row', async ({ 1494 + assert, 1495 + }) => { 1496 + const { fakeWs, store, consumer, lookupInserts, countsInserts } = makeConsumer() 1497 + await startConsumerInBackground(consumer) 1498 + 1499 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID)) 1500 + await consumer.flushBuffer() 1501 + 1502 + // Engagement pipeline still fires for tracked authors 1503 + assert.equal(store.insertCalls.length, 1) 1504 + assert.equal(store.insertCalls[0][0].kind, 'like') 1505 + 1506 + // Virality pipeline fires too — one lookup row and one +1 delta 1507 + assert.equal(lookupInserts.length, 1) 1508 + assert.equal(lookupInserts[0].length, 1) 1509 + assert.equal(countsInserts.length, 1) 1510 + assert.equal(countsInserts[0][0].count, 1) 1511 + 1512 + await consumer.shutdown() 1513 + }) 1514 + }) 1515 + 1516 + test.group('JetstreamConsumer — virality: like delete', () => { 1517 + test('with matching lookup produces a -1 delta', async ({ assert }) => { 1518 + const likeUri = `at://${VIRALITY_ACTOR_DID}/app.bsky.feed.like/viralitylike1` 1519 + const subjectUri = `at://${VIRALITY_AUTHOR_DID}/app.bsky.feed.post/viralitypost1` 1520 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 1521 + subjectLookup: new Map([[likeUri, subjectUri]]), 1522 + }) 1523 + await startConsumerInBackground(consumer) 1524 + 1525 + fakeWs.emit(makeViralityLikeDelete()) 1526 + await (consumer as any).processing 1527 + await consumer.flushBuffer() 1528 + 1529 + assert.equal(lookupInserts.length, 0, 'no new 
lookup row on delete') 1530 + assert.equal(countsInserts.length, 1) 1531 + assert.equal(countsInserts[0].length, 1) 1532 + assert.equal(countsInserts[0][0].subjectUri, subjectUri) 1533 + assert.equal(countsInserts[0][0].count, -1) 1534 + 1535 + await consumer.shutdown() 1536 + }) 1537 + 1538 + test('with no matching lookup is dropped silently', async ({ assert }) => { 1539 + const { fakeWs, consumer, lookupInserts, countsInserts } = makeConsumer({ 1540 + subjectLookup: new Map(), 1541 + }) 1542 + await startConsumerInBackground(consumer) 1543 + 1544 + fakeWs.emit(makeViralityLikeDelete()) 1545 + await (consumer as any).processing 1546 + await consumer.flushBuffer() 1547 + 1548 + assert.equal(lookupInserts.length, 0) 1549 + assert.equal(countsInserts.length, 0) 1550 + 1551 + await consumer.shutdown() 1552 + }) 1553 + }) 1554 + 1555 + test.group('JetstreamConsumer — virality: cursor', () => { 1556 + test('cursor is NOT advanced if the lookup flush fails', async ({ assert }) => { 1557 + const { fakeWs, consumer, writeCursorCalls } = makeConsumer({ 1558 + insertLookupShouldThrow: true, 1559 + }) 1560 + await startConsumerInBackground(consumer) 1561 + 1562 + fakeWs.emit(makeViralityLikeCreate(VIRALITY_ACTOR_DID, VIRALITY_AUTHOR_DID, 'r1')) 1563 + await consumer.flushBuffer() 1564 + 1565 + assert.equal(writeCursorCalls.length, 0) 1566 + 1567 + await consumer.shutdown() 1568 + }) 1569 + 1570 + test('failed virality flush re-queues rows at the front of the buffer', async ({ assert }) => { 1571 + const { fakeWs, consumer, lookupInserts } = makeConsumer({ 1572 + insertLookupShouldThrow: true, 1573 + }) 1574 + await startConsumerInBackground(consumer) 1575 + 1576 + fakeWs.emit(makeViralityLikeCreate(VIRALITY_ACTOR_DID, VIRALITY_AUTHOR_DID, 'r1')) 1577 + await consumer.flushBuffer() 1578 + 1579 + assert.equal(lookupInserts.length, 0, 'first attempt failed so no successful insert') 1580 + 1581 + // Flip the flag so the retry succeeds 1582 + ;(consumer as 
any).deps.insertLookupRows = async (rows: LikeEventLookupRow[]) => { 1583 + lookupInserts.push([...rows]) 1584 + } 1585 + 1586 + await consumer.flushBuffer() 1587 + assert.equal(lookupInserts.length, 1, 'retry picked up the re-queued rows') 1588 + assert.equal(lookupInserts[0].length, 1) 1589 + 1590 + await consumer.shutdown() 1591 + }) 1592 + }) 1593 + 1594 + test.group('JetstreamConsumer — virality: lookup failure forces reconnect', () => { 1595 + test('closes the websocket to force a reconnect when lookupSubjectUri throws', async ({ 1596 + assert, 1597 + }) => { 1598 + const { fakeWs, consumer, writeCursorCalls, countsInserts } = makeConsumer() 1599 + // Swap in a throwing lookup after construction 1600 + ;(consumer as any).deps.lookupSubjectUri = async () => { 1601 + throw new Error('ClickHouse unavailable (lookup)') 1602 + } 1603 + await startConsumerInBackground(consumer) 1604 + 1605 + fakeWs.emit(makeViralityLikeDelete()) 1606 + 1607 + // Let the processing chain settle 1608 + await (consumer as any).processing 1609 + await consumer.flushBuffer() 1610 + 1611 + assert.equal(fakeWs.closeCallCount, 1, 'ws.close() was called to force reconnect') 1612 + assert.equal(countsInserts.length, 0, 'no delta recorded for the failed unlike') 1613 + assert.equal(writeCursorCalls.length, 0, 'cursor was not advanced past the failed unlike') 1614 + 1615 + await consumer.shutdown() 1616 + }) 1617 + 1618 + test('a rejecting handleUnlike does not poison the processing chain', async ({ assert }) => { 1619 + const likeUri2 = `at://${VIRALITY_ACTOR_DID}/app.bsky.feed.like/goodrkey` 1620 + const subjectUri2 = `at://${VIRALITY_AUTHOR_DID}/app.bsky.feed.post/viralitypost2` 1621 + const { fakeWs, consumer, countsInserts } = makeConsumer({ 1622 + subjectLookup: new Map([[likeUri2, subjectUri2]]), 1623 + }) 1624 + 1625 + // Replace handleUnlike so the first invocation rejects — simulates a 1626 + // future refactor where the method itself throws outside the inner try. 
1627 + const original = (consumer as any).handleUnlike.bind(consumer) 1628 + let firstCall = true 1629 + ;(consumer as any).handleUnlike = async (uri: string, timeUs: bigint | null) => { 1630 + if (firstCall) { 1631 + firstCall = false 1632 + throw new Error('boom') 1633 + } 1634 + return original(uri, timeUs) 1635 + } 1636 + 1637 + await startConsumerInBackground(consumer) 1638 + 1639 + fakeWs.emit(makeViralityLikeDelete(VIRALITY_ACTOR_DID, 'viralitylike1', 1725911162329309)) 1640 + fakeWs.emit(makeViralityLikeDelete(VIRALITY_ACTOR_DID, 'goodrkey', 1725911162329310)) 1641 + 1642 + await (consumer as any).processing 1643 + await consumer.flushBuffer() 1644 + 1645 + assert.equal(countsInserts.length, 1, 'second unlike still produced a delta') 1646 + assert.equal(countsInserts[0][0].subjectUri, subjectUri2) 1647 + assert.equal(countsInserts[0][0].count, -1) 1648 + 1649 + await consumer.shutdown() 1650 + }) 1651 + })