See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add firehose:watch ace command and fix lint

The ace command wires the FirehoseConsumer to the container-resolved
ClickHouseStore and cursor IO, then blocks on start() with SIGTERM/SIGINT
handlers that flush buffers cleanly before exit. URL resolution prefers
FIREHOSE_JETSTREAM_URL → JETSTREAM_URL → the default jetstream2.us-east
endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+67 -19
+1 -3
app/services/firehose_consumer.ts
··· 62 62 * Resolves the jetstream URL to subscribe to. 63 63 * Preference order: FIREHOSE_JETSTREAM_URL → JETSTREAM_URL → default. 64 64 */ 65 - export function resolveFirehoseJetstreamUrl( 66 - env: NodeJS.ProcessEnv = process.env 67 - ): string { 65 + export function resolveFirehoseJetstreamUrl(env: NodeJS.ProcessEnv = process.env): string { 68 66 return env.FIREHOSE_JETSTREAM_URL ?? env.JETSTREAM_URL ?? DEFAULT_JETSTREAM_URL 69 67 } 70 68
+59
commands/firehose_watch.ts
··· 1 + import { BaseCommand } from '@adonisjs/core/ace' 2 + import { ClickHouseStore } from '#lib/clickhouse/index' 3 + import { FirehoseConsumer, resolveFirehoseJetstreamUrl } from '#services/firehose_consumer' 4 + import { readCursor, writeCursor } from '#services/firehose_cursor_io' 5 + 6 + /** 7 + * Ace command: node ace firehose:watch 8 + * 9 + * Connects to the Bluesky jetstream and streams the unfiltered 10 + * app.bsky.feed.like collection into ClickHouse so the virality-threshold 11 + * job can detect posts crossing configurable like counts. 12 + * 13 + * Long-lived daemon process — uses `staysAlive: true` so Adonis does not 14 + * exit when `run()` returns. The process remains alive until shutdown() 15 + * is called (on SIGTERM/SIGINT), which causes start() to resolve. 16 + */ 17 + export default class FirehoseWatch extends BaseCommand { 18 + static commandName = 'firehose:watch' 19 + static description = 20 + 'Consume the unfiltered app.bsky.feed.like jetstream into ClickHouse (virality)' 21 + 22 + static options = { startApp: true, staysAlive: true } 23 + 24 + async run() { 25 + const clickHouseStore = await this.app.container.make(ClickHouseStore) 26 + 27 + const consumer = new FirehoseConsumer({ 28 + createWebSocket: (url: string) => new WebSocket(url), 29 + readCursor, 30 + writeCursor, 31 + lookupSubjectUri: (likeUri: string) => clickHouseStore.lookupSubjectUri(likeUri), 32 + insertLookupRows: (rows) => clickHouseStore.insertLikeEventsLookup(rows), 33 + insertCountsDeltas: (rows) => clickHouseStore.insertLikeCountsDailyDeltas(rows), 34 + now: () => new Date(), 35 + }) 36 + 37 + const handleSignal = () => { 38 + this.logger.info('Shutdown signal received — flushing and closing...') 39 + consumer 40 + .shutdown() 41 + .then(() => { 42 + this.logger.info('FirehoseConsumer shut down cleanly') 43 + process.exit(0) 44 + }) 45 + .catch((err: unknown) => { 46 + const message = err instanceof Error ? err.message : String(err) 47 + this.logger.error(`Error during shutdown: ${message}`) 48 + process.exit(1) 49 + }) 50 + } 51 + 52 + process.on('SIGTERM', handleSignal) 53 + process.on('SIGINT', handleSignal) 54 + 55 + this.logger.info(`Starting FirehoseConsumer (URL=${resolveFirehoseJetstreamUrl()})`) 56 + 57 + await consumer.start() 58 + } 59 + }
+3 -5
tests/unit/atproto/jetstream.spec.ts
··· 368 368 test('parses a like delete into a LikeDeleteEvent', ({ assert }) => { 369 369 const result = parseJetstreamEvent(LIKE_DELETE_EVENT) 370 370 if (result === null) return assert.fail('expected LikeDeleteEvent, got null') 371 - if (result.kind !== 'like-delete') return assert.fail(`expected kind=like-delete, got ${result.kind}`) 371 + if (result.kind !== 'like-delete') 372 + return assert.fail(`expected kind=like-delete, got ${result.kind}`) 372 373 assert.equal(result.actorDid, 'did:plc:unlikingactor') 373 374 assert.equal(result.rkey, '3l3likedeletedkey') 374 - assert.equal( 375 - result.likeUri, 376 - 'at://did:plc:unlikingactor/app.bsky.feed.like/3l3likedeletedkey' 377 - ) 375 + assert.equal(result.likeUri, 'at://did:plc:unlikingactor/app.bsky.feed.like/3l3likedeletedkey') 378 376 assert.instanceOf(result.ingestedAt, Date) 379 377 }) 380 378
+3 -7
tests/unit/clickhouse_store.spec.ts
··· 902 902 903 903 // FINAL forces the merge so we can assert on the net count 904 904 const rs = await store.client.query({ 905 - query: 906 - 'SELECT sum(count) AS c FROM like_counts_daily WHERE subject_uri = {s: String}', 905 + query: 'SELECT sum(count) AS c FROM like_counts_daily WHERE subject_uri = {s: String}', 907 906 format: 'JSONEachRow', 908 907 query_params: { s: subjectUri }, 909 908 }) ··· 918 917 const d = new Date( 919 918 Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate(), 23, 30, 0) 920 919 ) 921 - const expectedDay = `${d.getUTCFullYear().toString().padStart(4, '0')}-${( 922 - d.getUTCMonth() + 1 923 - ) 920 + const expectedDay = `${d.getUTCFullYear().toString().padStart(4, '0')}-${(d.getUTCMonth() + 1) 924 921 .toString() 925 922 .padStart(2, '0')}-${d.getUTCDate().toString().padStart(2, '0')}` 926 923 927 924 await store.insertLikeCountsDailyDeltas([{ subjectUri, day: d, count: 1 }]) 928 925 929 926 const rs = await store.client.query({ 930 - query: 931 - 'SELECT toString(day) AS day FROM like_counts_daily WHERE subject_uri = {s: String}', 927 + query: 'SELECT toString(day) AS day FROM like_counts_daily WHERE subject_uri = {s: String}', 932 928 format: 'JSONEachRow', 933 929 query_params: { s: subjectUri }, 934 930 })
+1 -4
tests/unit/services/firehose_consumer.spec.ts
··· 186 186 187 187 assert.equal(lookupInserts.length, 1) 188 188 assert.equal(lookupInserts[0].length, 1) 189 - assert.equal( 190 - lookupInserts[0][0].likeUri, 191 - `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa` 192 - ) 189 + assert.equal(lookupInserts[0][0].likeUri, `at://${ACTOR_DID}/app.bsky.feed.like/likerkeyaaa`) 193 190 assert.equal( 194 191 lookupInserts[0][0].subjectUri, 195 192 `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey001`