A Product Hunt Clone for AtProto with an emphasis on on-proto community
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Node Jetstream counter script with ws dependency

Replace Bun TypeScript script with ESM (.mjs), WebSocket via ws, and
process.hrtime.bigint() for parse/type/handler timing. Ignore node_modules.

Made-with: Cursor

+196
+1
.gitignore
··· 1 1 /harvester 2 2 *.log 3 + node_modules/
+37
package-lock.json
··· 1 + { 2 + "name": "harvester.blue", 3 + "lockfileVersion": 3, 4 + "requires": true, 5 + "packages": { 6 + "": { 7 + "name": "harvester.blue", 8 + "dependencies": { 9 + "ws": "^8.18.0" 10 + }, 11 + "engines": { 12 + "node": ">=18" 13 + } 14 + }, 15 + "node_modules/ws": { 16 + "version": "8.20.0", 17 + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", 18 + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", 19 + "license": "MIT", 20 + "engines": { 21 + "node": ">=10.0.0" 22 + }, 23 + "peerDependencies": { 24 + "bufferutil": "^4.0.1", 25 + "utf-8-validate": ">=5.0.2" 26 + }, 27 + "peerDependenciesMeta": { 28 + "bufferutil": { 29 + "optional": true 30 + }, 31 + "utf-8-validate": { 32 + "optional": true 33 + } 34 + } 35 + } 36 + } 37 + }
+14
package.json
··· 1 + { 2 + "name": "harvester.blue", 3 + "private": true, 4 + "type": "module", 5 + "scripts": { 6 + "count-records-24h": "node scripts/count-records-24h.mjs" 7 + }, 8 + "engines": { 9 + "node": ">=18" 10 + }, 11 + "dependencies": { 12 + "ws": "^8.18.0" 13 + } 14 + }
+144
scripts/count-records-24h.mjs
··· 1 + import WebSocket from "ws"; 2 + 3 + const nowNs = () => process.hrtime.bigint(); 4 + 5 + const nowUs = Date.now() * 1000; 6 + const startUs = nowUs - 24 * 60 * 60 * 1_000_000; 7 + const endUs = nowUs; 8 + const windowUs = endUs - startUs; 9 + 10 + const wsUrl = `wss://jetstream2.us-east.bsky.network/subscribe?cursor=${startUs}`; 11 + 12 + let recordEvents = 0; 13 + let lastTimeUs = 0; 14 + 15 + const startedAt = Date.now(); 16 + let lastPrintedAt = 0; 17 + 18 + const types = new Set(); 19 + 20 + let parseNsTotal = 0n; 21 + let typeNsTotal = 0n; 22 + let handlerNsTotal = 0n; 23 + let parseCount = 0; 24 + let typeCount = 0; 25 + 26 + /** Average nanoseconds total → microseconds per sample */ 27 + const avgNsToUs = (totalNs, count) => 28 + count > 0 ? ((Number(totalNs) / count) / 1000).toFixed(1) : "0.0"; 29 + 30 + const printProgress = (final = false) => { 31 + const elapsedSec = Math.max(1, Math.floor((Date.now() - startedAt) / 1000)); 32 + const recordsPerSec = recordEvents / elapsedSec; 33 + const progressedUs = Math.max(0, lastTimeUs - startUs); 34 + const catchupPct = windowUs > 0 ? (progressedUs / windowUs) * 100 : 0; 35 + const eventHoursPerSec = progressedUs / 1_000_000 / 60 / 60 / elapsedSec; 36 + const lagHours = lastTimeUs ? ((endUs - lastTimeUs) / 1_000_000 / 60 / 60).toFixed(2) : "n/a"; 37 + const avgParseUs = avgNsToUs(parseNsTotal, parseCount); 38 + const avgTypeUs = avgNsToUs(typeNsTotal, typeCount); 39 + const avgHandlerUs = avgNsToUs(handlerNsTotal, recordEvents); 40 + const line = 41 + `records=${recordEvents} unique_types=${types.size} ` + 42 + `catchup=${catchupPct.toFixed(2)}% lag=${lagHours}h event_time_rate=${eventHoursPerSec.toFixed(3)}h/s ` + 43 + `records_rate=${recordsPerSec.toFixed(1)}/s avg_parse=${avgParseUs}us avg_type=${avgTypeUs}us avg_handler=${avgHandlerUs}us`; 44 + 45 + if (final) { 46 + process.stdout.write(`\n${line}\n`); 47 + return; 48 + } 49 + 50 + process.stdout.write(`\r${line}`); 51 + }; 52 + 53 + const done = (reason) => { 54 + printProgress(true); 55 + console.log(`done: ${reason}`); 56 + process.exit(0); 57 + }; 58 + 59 + console.log(`connecting: ${wsUrl}`); 60 + console.log("counting records from the last 24h window..."); 61 + 62 + const get$types = (record) => { 63 + if (typeof record === "object" && record !== null) { 64 + if (Array.isArray(record)) { 65 + for (const item of record) { 66 + get$types(item); 67 + } 68 + } else if ("$type" in record && typeof record["$type"] === "string") { 69 + types.add(record["$type"]); 70 + } 71 + } 72 + }; 73 + 74 + const messageDataToString = (data) => { 75 + if (typeof data === "string") return data; 76 + if (Buffer.isBuffer(data)) return data.toString("utf8"); 77 + if (data instanceof ArrayBuffer) return Buffer.from(data).toString("utf8"); 78 + return String(data); 79 + }; 80 + 81 + const ws = new WebSocket(wsUrl); 82 + 83 + ws.on("open", () => { 84 + console.log("connected"); 85 + }); 86 + 87 + ws.on("message", (data) => { 88 + const handlerStartNs = nowNs(); 89 + const raw = messageDataToString(data); 90 + 91 + let evt; 92 + const parseStartNs = nowNs(); 93 + try { 94 + evt = JSON.parse(raw); 95 + } catch { 96 + return; 97 + } 98 + parseNsTotal += nowNs() - parseStartNs; 99 + parseCount++; 100 + 101 + if (evt.kind === "commit" && evt.commit) { 102 + if (evt.commit.record !== undefined && evt.commit.record !== null) { 103 + const typeStartNs = nowNs(); 104 + recordEvents++; 105 + get$types(evt.commit.record); 106 + typeNsTotal += nowNs() - typeStartNs; 107 + typeCount++; 108 + handlerNsTotal += nowNs() - handlerStartNs; 109 + } 110 + } 111 + 112 + if (typeof evt.time_us === "number") { 113 + lastTimeUs = evt.time_us; 114 + if (lastTimeUs >= endUs) { 115 + done("caught up to last 24h window end"); 116 + return; 117 + } 118 + } 119 + 120 + if (Date.now() - lastPrintedAt >= 1000) { 121 + printProgress(false); 122 + lastPrintedAt = Date.now(); 123 + } 124 + }); 125 + 126 + ws.on("error", () => { 127 + printProgress(true); 128 + console.error("websocket error"); 129 + process.exit(1); 130 + }); 131 + 132 + ws.on("close", (code, reason) => { 133 + printProgress(true); 134 + const reasonStr = reason && reason.length ? reason.toString("utf8") : "n/a"; 135 + console.log(`socket closed: code=${code} reason=${reasonStr}`); 136 + if (lastTimeUs < endUs) { 137 + process.exit(1); 138 + } 139 + process.exit(0); 140 + }); 141 + 142 + process.on("SIGINT", () => { 143 + done("interrupted"); 144 + });