open source is social v-it.org
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Jetstream firehose listener for org.v-it.hello events

Standalone Bun CLI tool that connects to the Bluesky Jetstream
WebSocket firehose, filters for custom record events, and prints
friendly console output. Supports --did, --collection, and --verbose
flags with auto-reconnect on disconnect.

+169
+169
firehose.js
··· 1 + #!/usr/bin/env bun 2 + // SPDX-License-Identifier: AGPL-3.0-only 3 + // Copyright (c) 2026 sol pbc 4 + 5 + import { readFileSync } from 'node:fs'; 6 + import { Command } from 'commander'; 7 + 8 + const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe'; 9 + const DEFAULT_COLLECTION = 'org.v-it.hello'; 10 + 11 + let ws = null; 12 + let shuttingDown = false; 13 + let backoff = 1000; 14 + 15 + function loadEnv() { 16 + const envPath = new URL('.env', import.meta.url).pathname; 17 + const vars = {}; 18 + let content; 19 + try { 20 + content = readFileSync(envPath, 'utf-8'); 21 + } catch { 22 + return vars; 23 + } 24 + for (const line of content.split('\n')) { 25 + const m = line.match(/^([A-Za-z_][A-Za-z0-9_]*)=(.*)/); 26 + if (m) vars[m[1]] = m[2]; 27 + } 28 + return vars; 29 + } 30 + 31 + function buildUrl(collection, did, cursor) { 32 + const url = new URL(JETSTREAM_URL); 33 + url.searchParams.set('wantedCollections', collection); 34 + if (did) url.searchParams.set('wantedDids', did); 35 + if (cursor) url.searchParams.set('cursor', cursor); 36 + return url.toString(); 37 + } 38 + 39 + function formatTime(timeUs) { 40 + return new Date(timeUs / 1000).toLocaleTimeString(); 41 + } 42 + 43 + function formatEvent(event) { 44 + const time = formatTime(event.time_us); 45 + const didShort = typeof event.did === 'string' ? event.did.slice(-12) : 'unknown'; 46 + 47 + if (event.kind === 'commit') { 48 + const operation = event.commit?.operation?.toUpperCase?.() ?? 'UNKNOWN'; 49 + const collection = event.commit?.collection ?? 'unknown'; 50 + const rkey = event.commit?.rkey ?? 'unknown'; 51 + 52 + if (operation === 'DELETE') { 53 + return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`; 54 + } 55 + 56 + const message = event.commit?.record?.message; 57 + if (typeof message === 'string') { 58 + return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey} — "${message}"`; 59 + } 60 + 61 + return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`; 62 + } 63 + 64 + if (event.kind === 'identity') { 65 + return `[${time}] IDENTITY ${didShort}`; 66 + } 67 + 68 + if (event.kind === 'account') { 69 + return `[${time}] ACCOUNT ${didShort} status=${event.account?.status}`; 70 + } 71 + 72 + return `[${time}] ${event.kind} from ${didShort}`; 73 + } 74 + 75 + function connect(opts, cursor) { 76 + const url = buildUrl(opts.collection, opts.did, cursor); 77 + let lastCursor = cursor; 78 + 79 + ws = new WebSocket(url); 80 + 81 + ws.onopen = () => { 82 + backoff = 1000; 83 + console.log(`Connected to ${url}`); 84 + }; 85 + 86 + ws.onmessage = (event) => { 87 + let msg; 88 + try { 89 + msg = JSON.parse(event.data); 90 + } catch { 91 + console.log('Warning: failed to parse message as JSON; skipping'); 92 + return; 93 + } 94 + 95 + if (msg.time_us) { 96 + lastCursor = String(msg.time_us); 97 + } 98 + 99 + if (opts.verbose) { 100 + console.log(JSON.stringify(msg, null, 2)); 101 + return; 102 + } 103 + 104 + console.log(formatEvent(msg)); 105 + }; 106 + 107 + ws.onclose = () => { 108 + if (shuttingDown) { 109 + return; 110 + } 111 + 112 + const delay = backoff; 113 + backoff = Math.min(backoff * 2, 30000); 114 + console.log(`Connection closed, reconnecting in ${delay}ms...`); 115 + setTimeout(() => connect(opts, lastCursor), delay); 116 + }; 117 + 118 + ws.onerror = (err) => { 119 + const message = err?.message ?? 'unknown error'; 120 + console.error(`WebSocket error: ${message}`); 121 + }; 122 + } 123 + 124 + async function main() { 125 + const program = new Command(); 126 + program 127 + .name('firehose') 128 + .description('Listen to Bluesky Jetstream firehose for custom record events') 129 + .option('-v, --verbose', 'Show full JSON for each event') 130 + .option('--did <did>', 'Filter by DID (reads BSKY_DID from .env if not provided)') 131 + .option('--collection <nsid>', 'Collection NSID to filter', DEFAULT_COLLECTION) 132 + .parse(); 133 + 134 + const opts = program.opts(); 135 + 136 + try { 137 + // Resolve DID from .env if not provided via flag 138 + if (!opts.did) { 139 + const env = loadEnv(); 140 + if (env.BSKY_DID) { 141 + opts.did = env.BSKY_DID; 142 + } 143 + } 144 + 145 + for (const sig of ['SIGINT', 'SIGTERM']) { 146 + process.on(sig, () => { 147 + shuttingDown = true; 148 + console.log('\nShutting down...'); 149 + if (ws) ws.close(); 150 + process.exit(0); 151 + }); 152 + } 153 + 154 + // Print startup banner 155 + const url = buildUrl(opts.collection, opts.did, null); 156 + console.log('Jetstream Firehose Listener'); 157 + console.log(` Collection: ${opts.collection}`); 158 + if (opts.did) console.log(` DID filter: ${opts.did}`); 159 + console.log(` Endpoint: ${url}`); 160 + console.log(' Ctrl+C to stop\n'); 161 + 162 + connect(opts, null); 163 + } catch (err) { 164 + console.error(err instanceof Error ? err.message : String(err)); 165 + process.exitCode = 1; 166 + } 167 + } 168 + 169 + await main();