See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(task-9): address review findings for JetstreamConsumer

Fix 1 (Important): Test 13 now actually exercises the reconnect path by
adding a `reconnectDelay` seam to JetstreamConsumerDeps (defaults to a
real setTimeout-based delay; tests can override it with a function that resolves immediately).
The test drives a full reconnect cycle and asserts URL[1] uses
`lastFlushedCursor` (200), not `pendingCursor` (300).

Fix 2 (Minor): Extract cursor read/write into `jetstream_cursor_io.ts`
so they're testable without importing from a commands/ file. `readCursor`
uses `knexRawQuery(...).options({ safeIntegers: true })` so better-sqlite3
returns INTEGER as BigInt. `writeCursor` passes the BigInt binding directly
(not `.toString()`). Adds a round-trip test verifying 2^60+42 survives
the write/read cycle without precision loss.

Fix 3 (Minor): Rename `cursorRefreshTimer` → `didRefreshTimer` — it has
nothing to do with cursors; it refreshes the tracked-DID set.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+122 -48
+16 -7
apps/web/app/services/jetstream_consumer.ts
··· 58 58 * Returns the current time. Injected so tests can control time. 59 59 */ 60 60 now: () => Date 61 + 62 + /** 63 + * Delays for the given number of milliseconds before reconnecting. 64 + * In production: real setTimeout-based delay. 65 + * In tests: override with (_ms) => Promise.resolve() for instant reconnect. 66 + */ 67 + reconnectDelay?: (ms: number) => Promise<void> 61 68 } 62 69 63 70 // --------------------------------------------------------------------------- ··· 86 93 private eventBuffer: EngagementEventRow[] = [] 87 94 private shutdownRequested = false 88 95 private ws: WebSocketLike | null = null 89 - private cursorRefreshTimer: ReturnType<typeof setInterval> | null = null 96 + private didRefreshTimer: ReturnType<typeof setInterval> | null = null 90 97 private bufferFlushTimer: ReturnType<typeof setInterval> | null = null 91 98 92 99 /** ··· 133 140 await this.connect() 134 141 135 142 // 3. Set up 1s tracked-DID refresh 136 - this.cursorRefreshTimer = setInterval(() => { 143 + this.didRefreshTimer = setInterval(() => { 137 144 void this.refreshTrackedDids() 138 145 }, 1000) 139 146 ··· 153 160 async shutdown(): Promise<void> { 154 161 this.shutdownRequested = true 155 162 156 - if (this.cursorRefreshTimer !== null) { 157 - clearInterval(this.cursorRefreshTimer) 158 - this.cursorRefreshTimer = null 163 + if (this.didRefreshTimer !== null) { 164 + clearInterval(this.didRefreshTimer) 165 + this.didRefreshTimer = null 159 166 } 160 167 if (this.bufferFlushTimer !== null) { 161 168 clearInterval(this.bufferFlushTimer) ··· 216 223 const delay = this.reconnectBackoffMs 217 224 this.reconnectBackoffMs = Math.min(this.reconnectBackoffMs * 2, 30_000) 218 225 219 - setTimeout(() => { 226 + const delayFn = 227 + this.deps.reconnectDelay ?? 
((ms: number) => new Promise<void>((r) => setTimeout(r, ms))) 228 + void delayFn(delay).then(() => { 220 229 if (!this.shutdownRequested) { 221 230 void this.connect() 222 231 } 223 - }, delay) 232 + }) 224 233 } 225 234 226 235 ws.onerror = (err) => {
+37
apps/web/app/services/jetstream_cursor_io.ts
import db from '@adonisjs/lucid/services/db'

/**
 * Narrows a raw `cursor_us` column value to a `bigint` without ever accepting
 * a value that may already have lost precision.
 *
 * - `null` / `undefined`  → `null` (treated as "no cursor stored").
 * - `bigint`              → returned unchanged.
 * - safe-integer `number` → converted exactly via `BigInt()`.
 * - all-digit `string`    → converted exactly via `BigInt()`.
 *
 * @throws TypeError for anything else (e.g. a `number` outside the safe-integer
 *   range), because resuming from a rounded cursor would silently skip or
 *   replay events.
 */
function toCursorBigInt(raw: unknown): bigint | null {
  if (raw === null || raw === undefined) return null
  if (typeof raw === 'bigint') return raw
  if (typeof raw === 'number' && Number.isSafeInteger(raw)) return BigInt(raw)
  if (typeof raw === 'string' && /^-?\d+$/.test(raw)) return BigInt(raw)
  throw new TypeError(
    `jetstream_cursor.cursor_us is not a lossless integer (got ${typeof raw}: ${String(raw)})`
  )
}

/**
 * Reads the last durable Jetstream cursor from SQLite.
 *
 * The query goes through the underlying knex raw query with
 * `options({ safeIntegers: true })` so that better-sqlite3 returns the INTEGER
 * column as a native BigInt — a 64-bit microsecond timestamp does not fit in
 * the 53-bit mantissa of a JS `number`.
 *
 * The driver result is narrowed at runtime rather than trusted via a type
 * assertion: if the option is not honoured and a `number` comes back, it is
 * accepted only when it is a safe integer; otherwise this function throws
 * instead of returning a rounded cursor.
 *
 * @returns the stored cursor, or `null` if no cursor row exists yet.
 * @throws TypeError if the stored value cannot be represented losslessly.
 */
export async function readCursor(): Promise<bigint | null> {
  const rows = (await db
    .connection()
    .knexRawQuery('SELECT cursor_us FROM jetstream_cursor WHERE id = 1 LIMIT 1')
    .options({ safeIntegers: true })) as ReadonlyArray<{ cursor_us?: unknown }> | null | undefined

  if (!rows || rows.length === 0) return null
  return toCursorBigInt(rows[0]?.cursor_us)
}

/**
 * Durably writes the Jetstream cursor to SQLite.
 *
 * The singleton row (id = 1) is replaced on every call. `cursor` is passed as a
 * BigInt binding directly (not stringified) so values above 2^53 are bound
 * without precision loss; `updated_at` is stamped with `Date.now()`.
 *
 * @param cursor the cursor value to persist.
 */
export async function writeCursor(cursor: bigint): Promise<void> {
  const updatedAtMs = Date.now()
  await db.rawQuery(
    'INSERT OR REPLACE INTO jetstream_cursor (id, cursor_us, updated_at) VALUES (1, ?, ?)',
    [cursor, updatedAtMs]
  )
}
+3 -23
apps/web/commands/jetstream_consume.ts
··· 2 2 import db from '@adonisjs/lucid/services/db' 3 3 import { ClickHouseStore } from '@skystar/clickhouse' 4 4 import { JetstreamConsumer } from '#services/jetstream_consumer' 5 + import { readCursor, writeCursor } from '#services/jetstream_cursor_io' 5 6 6 7 /** 7 8 * Ace command: node ace jetstream:consume ··· 55 56 return new Set<string>(rows.map((r: { did: string }) => r.did)) 56 57 }, 57 58 58 - // Read the persisted Jetstream cursor from SQLite via raw query. 59 - // We use a raw query (not the Lucid model) because the cursor_us column 60 - // is a 64-bit integer and Lucid returns JS numbers that lose precision 61 - // above 2^53. The raw @adonisjs/lucid db.rawQuery returns the value as 62 - // a JS number from better-sqlite3; we convert via BigInt to preserve it. 63 - readCursor: async () => { 64 - const result = await db.rawQuery( 65 - 'SELECT cursor_us FROM jetstream_cursor WHERE id = 1 LIMIT 1' 66 - ) 67 - const rows = result as unknown as Array<{ cursor_us: number | bigint }> 68 - if (!rows || rows.length === 0) return null 69 - const raw = rows[0].cursor_us 70 - return typeof raw === 'bigint' ? raw : BigInt(Math.floor(Number(raw))) 71 - }, 72 - 73 - // Write the cursor to SQLite via raw query (INSERT OR REPLACE singleton row). 74 - writeCursor: async (cursor: bigint) => { 75 - const nowMs = Date.now() 76 - await db.rawQuery( 77 - 'INSERT OR REPLACE INTO jetstream_cursor (id, cursor_us, updated_at) VALUES (1, ?, ?)', 78 - [cursor.toString(), nowMs] 79 - ) 80 - }, 59 + readCursor, 60 + writeCursor, 81 61 82 62 now: () => new Date(), 83 63 })
+30 -18
apps/web/tests/unit/services/jetstream_consumer.spec.ts
··· 168 168 consumer: JetstreamConsumer 169 169 writeCursorCalls: bigint[] 170 170 lastConnectedUrl: { value: string } 171 + connectedUrls: string[] 171 172 } 172 173 173 174 function makeConsumer( ··· 175 176 trackedDids?: Set<string> 176 177 initialCursor?: bigint | null 177 178 storeShouldThrow?: boolean 179 + instantReconnect?: boolean 178 180 } = {} 179 181 ): ConsumerSetup { 180 182 const { 181 183 trackedDids = new Set([TRACKED_AUTHOR_DID]), 182 184 initialCursor = null, 183 185 storeShouldThrow = false, 186 + instantReconnect = false, 184 187 } = options 185 188 186 189 const fakeWs = new FakeWebSocket() 187 190 const store = makeFakeStore({ shouldThrow: storeShouldThrow }) 188 191 const writeCursorCalls: bigint[] = [] 189 192 const lastConnectedUrl: { value: string } = { value: '' } 193 + const connectedUrls: string[] = [] 190 194 191 195 let currentTrackedDids = trackedDids 192 196 ··· 194 198 createWebSocket(url: string) { 195 199 lastConnectedUrl.value = url 196 200 fakeWs.constructedUrl = url 201 + connectedUrls.push(url) 197 202 return fakeWs 198 203 }, 199 204 readTrackedDids: async () => currentTrackedDids, ··· 202 207 writeCursorCalls.push(cursor) 203 208 }, 204 209 now: () => new Date('2024-09-09T19:46:00.000Z'), 210 + ...(instantReconnect ? 
{ reconnectDelay: (_ms: number) => Promise.resolve() } : {}), 205 211 } 206 212 207 213 const consumer = new JetstreamConsumer(store, deps) ··· 211 217 currentTrackedDids = newSet 212 218 } 213 219 214 - return { fakeWs, store, consumer, writeCursorCalls, lastConnectedUrl } 220 + return { fakeWs, store, consumer, writeCursorCalls, lastConnectedUrl, connectedUrls } 215 221 } 216 222 217 223 // --------------------------------------------------------------------------- ··· 487 493 test('reconnect URL contains the last durable cursor, not the in-memory pending one', async ({ 488 494 assert, 489 495 }) => { 490 - const durableCursor = 999_000_000n 491 - const { fakeWs, consumer, lastConnectedUrl } = makeConsumer({ 492 - initialCursor: durableCursor, 496 + // Use instantReconnect so the reconnect fires without a real 1s delay 497 + const { fakeWs, consumer, connectedUrls } = makeConsumer({ 498 + initialCursor: 100n, 499 + instantReconnect: true, 493 500 }) 494 501 495 502 await startConsumerInBackground(consumer) 496 503 497 - // Verify initial connection URL contains the durable cursor 498 - assert.include(lastConnectedUrl.value, `cursor=${durableCursor.toString()}`) 504 + // URL[0]: initial connect should use the initial cursor (100) 505 + assert.equal(connectedUrls.length, 1) 506 + assert.include(connectedUrls[0], 'cursor=100') 499 507 500 - // Emit an event with a new time_us (in-memory cursor advances, but not yet flushed) 501 - fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 1725911162329308, 'rkey1')) 508 + // Flush an event at time_us=200 — this advances lastFlushedCursor to 200n 509 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 200, 'rkey-flushed')) 510 + await consumer.flushBuffer() 502 511 503 - // Simulate unexpected WebSocket close BEFORE flushing 504 - // (so lastFlushedCursor is still durableCursor) 505 - const urlBeforeReconnect = lastConnectedUrl.value 512 + // Buffer another event at time_us=300 but DON'T flush — pendingCursor=300n, 513 + // but 
lastFlushedCursor is still 200n 514 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 300, 'rkey-pending')) 515 + 516 + // Simulate unexpected WebSocket disconnect 506 517 fakeWs.emitClose(1006) 507 518 508 - // Give the reconnect setTimeout(0 delay would be 1s, but we can check the 509 - // URL was set from the initial connect) — in this test we just verify that 510 - // the initial URL contained the durable cursor value, and that if we called 511 - // buildUrl again it would still use lastFlushedCursor (durableCursor). 512 - // The actual reconnect with backoff uses setTimeout(1000) so we can't easily 513 - // test it without fake timers. Instead, verify the initial URL. 514 - assert.include(urlBeforeReconnect, `cursor=${durableCursor.toString()}`) 519 + // With instantReconnect, the reconnect fires immediately. 520 + // Give the microtask queue a tick to let the Promise chain resolve. 521 + await new Promise((r) => setTimeout(r, 0)) 522 + 523 + // URL[1]: reconnect must use lastFlushedCursor (200), NOT pendingCursor (300) 524 + assert.equal(connectedUrls.length, 2, 'WebSocket factory should have been called twice') 525 + assert.include(connectedUrls[1], 'cursor=200', 'reconnect should resume from lastFlushedCursor') 526 + assert.notInclude(connectedUrls[1], 'cursor=300', 'reconnect must NOT use pendingCursor') 515 527 516 528 await consumer.shutdown() 517 529 })
+36
apps/web/tests/unit/services/jetstream_cursor_io.spec.ts
/**
 * Unit tests for jetstream_cursor_io — the SQLite cursor read/write helpers.
 *
 * These tests use the real Adonis SQLite database (via testUtils migrations)
 * to verify that the cursor round-trips correctly, including for BigInt values
 * above 2^53 that would lose precision if handled as JS numbers.
 *
 * Round-trip assertions use `strictEqual` on purpose: loose equality treats
 * `123 == 123n` as true, so `assert.equal` would not notice `readCursor`
 * handing back a `number` instead of a `bigint`.
 */
import { test } from '@japa/runner'
import testUtils from '@adonisjs/core/services/test_utils'
import { readCursor, writeCursor } from '#services/jetstream_cursor_io'

test.group('JetstreamCursorIO — BigInt round-trip', (group) => {
  // Each test runs inside a global transaction set up by the Adonis test utils.
  group.each.setup(() => testUtils.db().withGlobalTransaction())

  test('returns null when no cursor row exists', async ({ assert }) => {
    const stored = await readCursor()
    assert.isNull(stored)
  })

  test('cursor round-trips a normal value', async ({ assert }) => {
    const expected = 1725911162329308n
    await writeCursor(expected)

    const actual = await readCursor()
    // Strict: must be the same value AND a bigint, not an equal-looking number.
    assert.strictEqual(actual, expected)
  })

  test('cursor round-trips BigInt values above 2^53', async ({ assert }) => {
    // 2^60 + 42 — way above Number.MAX_SAFE_INTEGER (2^53 - 1).
    // Doubles near 2^60 are spaced 256 apart, so any trip through a JS number
    // would round this to exactly 2^60 and drop the trailing 42.
    const expected = (1n << 60n) + 42n
    await writeCursor(expected)

    const actual = await readCursor()
    assert.strictEqual(actual, expected)
  })
})