See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add firehose cursor persistence

Introduce a dedicated SQLite singleton row (firehose_cursor) and
BigInt-safe read/write helpers so the firehose worker can checkpoint
its jetstream cursor independently from the main jetstream consumer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+91
+38
app/services/firehose_cursor_io.ts
··· 1 + import db from '@adonisjs/lucid/services/db' 2 + 3 + /** 4 + * Reads the last durable firehose cursor from SQLite. 5 + * 6 + * Uses `options({ safeIntegers: true })` on the underlying knex raw query so 7 + * that better-sqlite3 returns the INTEGER column as a native BigInt — avoiding 8 + * the precision loss that occurs when a 64-bit microsecond timestamp is 9 + * silently cast to a JS `number` (which only has 53 bits of mantissa). 10 + * 11 + * Returns null if no cursor row exists yet. 12 + * 13 + * The firehose worker uses its own independent cursor row (separate table 14 + * from jetstream_cursor) so that the two workers can checkpoint independently. 15 + */ 16 + export async function readCursor(): Promise<bigint | null> { 17 + const rows = (await db 18 + .connection() 19 + .knexRawQuery('SELECT cursor_us FROM firehose_cursor WHERE id = 1 LIMIT 1') 20 + .options({ safeIntegers: true })) as Array<{ cursor_us: bigint }> 21 + 22 + if (!rows || rows.length === 0) return null 23 + return rows[0].cursor_us 24 + } 25 + 26 + /** 27 + * Durably writes the firehose cursor to SQLite. 28 + * 29 + * Passes `cursor` as a BigInt binding directly — better-sqlite3 accepts BigInt 30 + * bind parameters natively, which avoids precision loss for values above 2^53. 31 + */ 32 + export async function writeCursor(cursor: bigint): Promise<void> { 33 + const nowMs = Date.now() 34 + await db.rawQuery( 35 + 'INSERT OR REPLACE INTO firehose_cursor (id, cursor_us, updated_at) VALUES (1, ?, ?)', 36 + [cursor, nowMs] 37 + ) 38 + }
+19
database/migrations/1776168721386_create_firehose_cursor.ts
··· 1 + import { BaseSchema } from '@adonisjs/lucid/schema' 2 + 3 + export default class extends BaseSchema { 4 + protected tableName = 'firehose_cursor' 5 + 6 + async up() { 7 + this.schema.createTable(this.tableName, (table) => { 8 + // Singleton table: only one row with id=1 is ever allowed. 9 + table.integer('id').primary() 10 + table.check('?? = 1', ['id'], 'firehose_cursor_singleton') 11 + table.integer('cursor_us').notNullable() 12 + table.integer('updated_at').notNullable() 13 + }) 14 + } 15 + 16 + async down() { 17 + this.schema.dropTable(this.tableName) 18 + } 19 + }
+34
tests/unit/services/firehose_cursor_io.spec.ts
··· 1 + /** 2 + * Unit tests for firehose_cursor_io — the SQLite cursor read/write helpers 3 + * for the firehose worker. 4 + * 5 + * Mirrors tests/unit/services/jetstream_cursor_io.spec.ts — the firehose 6 + * cursor uses an independent SQLite row so it can be checkpointed without 7 + * interfering with the existing jetstream consumer cursor. 8 + */ 9 + import { test } from '@japa/runner' 10 + import testUtils from '@adonisjs/core/services/test_utils' 11 + import { readCursor, writeCursor } from '#services/firehose_cursor_io' 12 + 13 + test.group('FirehoseCursorIO — BigInt round-trip', (group) => { 14 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 15 + 16 + test('returns null when no cursor row exists', async ({ assert }) => { 17 + const result = await readCursor() 18 + assert.isNull(result) 19 + }) 20 + 21 + test('cursor round-trips a normal value', async ({ assert }) => { 22 + const value = 1725911162329308n 23 + await writeCursor(value) 24 + const read = await readCursor() 25 + assert.equal(read, value) 26 + }) 27 + 28 + test('cursor round-trips BigInt values above 2^53', async ({ assert }) => { 29 + const big = (1n << 60n) + 42n 30 + await writeCursor(big) 31 + const read = await readCursor() 32 + assert.equal(read, big) 33 + }) 34 + })