A Product Hunt Clone for AtProto with an emphasis on on-proto community
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Deno Jetstream counter script and deno task

Mirror count-records-24h.mjs using native WebSocket, Deno.stdout, and
performance.now()-based timing; deno.json task with --allow-net.

Made-with: Cursor

+170
+5
deno.json
··· 1 + { 2 + "tasks": { 3 + "count-records-24h-deno": "deno run --allow-net ./scripts/count-records-24h.deno.ts" 4 + } 5 + }
+165
scripts/count-records-24h.deno.ts
··· 1 + /** 2 + * Deno port of `count-records-24h.mjs`: Jetstream last-24h replay with parse/type timing. 3 + * 4 + * Run: deno task count-records-24h-deno 5 + * Or: deno run --allow-net ./scripts/count-records-24h.deno.ts 6 + */ 7 + const encoder = new TextEncoder(); 8 + 9 + /** Monotonic time in nanoseconds (from `performance.now()`, sub-ms resolution where available). */ 10 + const nowNs = () => performance.now() * 1e6; 11 + 12 + const nowUs = Date.now() * 1000; 13 + const startUs = nowUs - 24 * 60 * 60 * 1_000_000; 14 + const endUs = nowUs; 15 + const windowUs = endUs - startUs; 16 + 17 + const wsUrl = `wss://jetstream2.us-east.bsky.network/subscribe?cursor=${startUs}`; 18 + 19 + type JetstreamEvent = { 20 + kind?: string; 21 + time_us?: number; 22 + commit?: { 23 + operation?: string; 24 + record?: unknown; 25 + }; 26 + }; 27 + 28 + let recordEvents = 0; 29 + let lastTimeUs = 0; 30 + 31 + const startedAt = Date.now(); 32 + let lastPrintedAt = 0; 33 + 34 + const types = new Set<string>(); 35 + 36 + let parseNsTotal = 0; 37 + let typeNsTotal = 0; 38 + let handlerNsTotal = 0; 39 + let parseCount = 0; 40 + let typeCount = 0; 41 + 42 + const avgNsToUs = (totalNs: number, count: number) => 43 + count > 0 ? ((totalNs / count) / 1000).toFixed(1) : "0.0"; 44 + 45 + const printProgress = async (final = false) => { 46 + const elapsedSec = Math.max(1, Math.floor((Date.now() - startedAt) / 1000)); 47 + const recordsPerSec = recordEvents / elapsedSec; 48 + const progressedUs = Math.max(0, lastTimeUs - startUs); 49 + const catchupPct = windowUs > 0 ? (progressedUs / windowUs) * 100 : 0; 50 + const eventHoursPerSec = progressedUs / 1_000_000 / 60 / 60 / elapsedSec; 51 + const lagHours = lastTimeUs ? ((endUs - lastTimeUs) / 1_000_000 / 60 / 60).toFixed(2) : "n/a"; 52 + const avgParseUs = avgNsToUs(parseNsTotal, parseCount); 53 + const avgTypeUs = avgNsToUs(typeNsTotal, typeCount); 54 + const avgHandlerUs = avgNsToUs(handlerNsTotal, recordEvents); 55 + const line = 56 + `records=${recordEvents} unique_types=${types.size} ` + 57 + `catchup=${catchupPct.toFixed(2)}% lag=${lagHours}h event_time_rate=${eventHoursPerSec.toFixed(3)}h/s ` + 58 + `records_rate=${recordsPerSec.toFixed(1)}/s avg_parse=${avgParseUs}us avg_type=${avgTypeUs}us avg_handler=${avgHandlerUs}us`; 59 + 60 + if (final) { 61 + await Deno.stdout.write(encoder.encode(`\n${line}\n`)); 62 + return; 63 + } 64 + 65 + await Deno.stdout.write(encoder.encode(`\r${line}`)); 66 + }; 67 + 68 + const done = async (reason: string) => { 69 + await printProgress(true); 70 + console.log(`done: ${reason}`); 71 + Deno.exit(0); 72 + }; 73 + 74 + async function messageDataToString(data: unknown): Promise<string> { 75 + if (typeof data === "string") return data; 76 + if (data instanceof Blob) return await data.text(); 77 + if (data instanceof ArrayBuffer) return new TextDecoder().decode(data); 78 + if (ArrayBuffer.isView(data)) { 79 + const view = data as ArrayBufferView; 80 + return new TextDecoder().decode( 81 + new Uint8Array(view.buffer, view.byteOffset, view.byteLength), 82 + ); 83 + } 84 + return String(data); 85 + } 86 + 87 + const get$types = (record: unknown) => { 88 + if (typeof record === "object" && record !== null) { 89 + if (Array.isArray(record)) { 90 + for (const item of record) { 91 + get$types(item); 92 + } 93 + } else if ("$type" in record && typeof (record as Record<string, unknown>)["$type"] === "string") { 94 + types.add((record as Record<string, string>)["$type"]); 95 + } 96 + } 97 + }; 98 + 99 + console.log(`connecting: ${wsUrl}`); 100 + console.log("counting records from the last 24h window..."); 101 + 102 + const ws = new WebSocket(wsUrl); 103 + 104 + ws.addEventListener("open", () => { 105 + console.log("connected"); 106 + }); 107 + 108 + ws.addEventListener("message", async (ev) => { 109 + const handlerStartNs = nowNs(); 110 + const raw = await messageDataToString(ev.data); 111 + 112 + let evt: JetstreamEvent; 113 + const parseStartNs = nowNs(); 114 + try { 115 + evt = JSON.parse(raw) as JetstreamEvent; 116 + } catch { 117 + return; 118 + } 119 + parseNsTotal += nowNs() - parseStartNs; 120 + parseCount++; 121 + 122 + if (evt.kind === "commit" && evt.commit) { 123 + if (evt.commit.record !== undefined && evt.commit.record !== null) { 124 + const typeStartNs = nowNs(); 125 + recordEvents++; 126 + get$types(evt.commit.record); 127 + typeNsTotal += nowNs() - typeStartNs; 128 + typeCount++; 129 + handlerNsTotal += nowNs() - handlerStartNs; 130 + } 131 + } 132 + 133 + if (typeof evt.time_us === "number") { 134 + lastTimeUs = evt.time_us; 135 + if (lastTimeUs >= endUs) { 136 + await done("caught up to last 24h window end"); 137 + return; 138 + } 139 + } 140 + 141 + if (Date.now() - lastPrintedAt >= 1000) { 142 + await printProgress(false); 143 + lastPrintedAt = Date.now(); 144 + } 145 + }); 146 + 147 + ws.addEventListener("error", async () => { 148 + await printProgress(true); 149 + console.error("websocket error"); 150 + Deno.exit(1); 151 + }); 152 + 153 + ws.addEventListener("close", async (ev) => { 154 + await printProgress(true); 155 + const reasonStr = ev.reason || "n/a"; 156 + console.log(`socket closed: code=${ev.code} reason=${reasonStr}`); 157 + if (lastTimeUs < endUs) { 158 + Deno.exit(1); 159 + } 160 + Deno.exit(0); 161 + }); 162 + 163 + Deno.addSignalListener("SIGINT", () => { 164 + void done("interrupted"); 165 + });