tracks lexicons and how many times they appeared on the jetstream
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: use a separate thread for jetstream

dusk 866b7f05 39da75b0

+110 -64
+3 -4
bun.lock
··· 9 9 "devDependencies": { 10 10 "@eslint/compat": "^1.2.5", 11 11 "@eslint/js": "^9.18.0", 12 - "@sveltejs/adapter-auto": "^6.0.0", 13 12 "@sveltejs/kit": "^2.22.0", 14 13 "@sveltejs/vite-plugin-svelte": "^6.0.0", 15 14 "@tailwindcss/vite": "^4.0.0", 16 - "bun-types": "^1.2.18", 15 + "@types/bun": "^1.2.18", 17 16 "eslint": "^9.18.0", 18 17 "eslint-plugin-svelte": "^3.0.0", 19 18 "globals": "^16.0.0", ··· 180 179 181 180 "@sveltejs/acorn-typescript": ["@sveltejs/acorn-typescript@1.0.5", "", { "peerDependencies": { "acorn": "^8.9.0" } }, "sha512-IwQk4yfwLdibDlrXVE04jTZYlLnwsTT2PIOQQGNLWfjavGifnk1JD1LcZjZaBTRcxZu2FfPfNLOE04DSu9lqtQ=="], 182 181 183 - "@sveltejs/adapter-auto": ["@sveltejs/adapter-auto@6.0.1", "", { "peerDependencies": { "@sveltejs/kit": "^2.0.0" } }, "sha512-mcWud3pYGPWM2Pphdj8G9Qiq24nZ8L4LB7coCUckUEy5Y7wOWGJ/enaZ4AtJTcSm5dNK1rIkBRoqt+ae4zlxcQ=="], 184 - 185 182 "@sveltejs/kit": ["@sveltejs/kit@2.25.1", "", { "dependencies": { "@sveltejs/acorn-typescript": "^1.0.5", "@types/cookie": "^0.6.0", "acorn": "^8.14.1", "cookie": "^0.6.0", "devalue": "^5.1.0", "esm-env": "^1.2.2", "kleur": "^4.1.5", "magic-string": "^0.30.5", "mrmime": "^2.0.0", "sade": "^1.8.1", "set-cookie-parser": "^2.6.0", "sirv": "^3.0.0" }, "peerDependencies": { "@sveltejs/vite-plugin-svelte": "^3.0.0 || ^4.0.0-next.1 || ^5.0.0 || ^6.0.0-next.0", "svelte": "^4.0.0 || ^5.0.0-next.0", "vite": "^5.0.3 || ^6.0.0 || ^7.0.0-beta.0" }, "bin": { "svelte-kit": "svelte-kit.js" } }, "sha512-8H+fxDEp7Xq6tLFdrGdS5fLu6ONDQQ9DgyjboXpChubuFdfH9QoFX09ypssBpyNkJNZFt9eW3yLmXIc9CesPCA=="], 186 183 187 184 "@sveltejs/vite-plugin-svelte": ["@sveltejs/vite-plugin-svelte@6.1.0", "", { "dependencies": { "@sveltejs/vite-plugin-svelte-inspector": "^5.0.0-next.1", "debug": "^4.4.1", "deepmerge": "^4.3.1", "kleur": "^4.1.5", "magic-string": "^0.30.17", "vitefu": "^1.1.1" }, "peerDependencies": { "svelte": "^5.0.0", "vite": "^6.3.0 || ^7.0.0" } }, "sha512-+U6lz1wvGEG/BvQyL4z/flyNdQ9xDNv5vrh+vWBWTHaebqT0c9RNggpZTo/XSPoHsSCWBlYaTlRX8pZ9GATXCw=="], ··· 217 214 "@tailwindcss/oxide-win32-x64-msvc": ["@tailwindcss/oxide-win32-x64-msvc@4.1.11", "", { "os": "win32", "cpu": "x64" }, "sha512-YfHoggn1j0LK7wR82TOucWc5LDCguHnoS879idHekmmiR7g9HUtMw9MI0NHatS28u/Xlkfi9w5RJWgz2Dl+5Qg=="], 218 215 219 216 "@tailwindcss/vite": ["@tailwindcss/vite@4.1.11", "", { "dependencies": { "@tailwindcss/node": "4.1.11", "@tailwindcss/oxide": "4.1.11", "tailwindcss": "4.1.11" }, "peerDependencies": { "vite": "^5.2.0 || ^6 || ^7" } }, "sha512-RHYhrR3hku0MJFRV+fN2gNbDNEh3dwKvY8XJvTxCSXeMOsCRSr+uKvDWQcbizrHgjML6ZmTE5OwMrl5wKcujCw=="], 217 + 218 + "@types/bun": ["@types/bun@1.2.18", "", { "dependencies": { "bun-types": "1.2.18" } }, "sha512-Xf6RaWVheyemaThV0kUfaAUvCNokFr+bH8Jxp+tTZfx7dAPA8z9ePnP9S9+Vspzuxxx9JRAXhnyccRj3GyCMdQ=="], 220 219 221 220 "@types/cookie": ["@types/cookie@0.6.0", "", {}, "sha512-4Kh9a6B2bQciAhf7FSuMRRkUWecJgJu9nPnx3yzpsfXX/c50REIqpHY4C82bXP90qrLtXtkDxTZosYO3UpOwlA=="], 222 221
+1 -2
package.json
··· 15 15 "devDependencies": { 16 16 "@eslint/compat": "^1.2.5", 17 17 "@eslint/js": "^9.18.0", 18 - "@sveltejs/adapter-auto": "^6.0.0", 19 18 "@sveltejs/kit": "^2.22.0", 20 19 "@sveltejs/vite-plugin-svelte": "^6.0.0", 21 20 "@tailwindcss/vite": "^4.0.0", 22 - "bun-types": "^1.2.18", 21 + "@types/bun": "^1.2.18", 23 22 "eslint": "^9.18.0", 24 23 "eslint-plugin-svelte": "^3.0.0", 25 24 "globals": "^16.0.0",
+10 -6
src/hooks.server.ts
··· 1 - import { track, writeEvents } from "$lib/jetstream.js"; 1 + import { eventTracker } from "$lib/db"; 2 + import { exit as workerExit, start } from "$lib/worker_manager.js"; 2 3 3 - // Start tracking when the server starts 4 - track().catch(console.error); 5 - process.on("SIGINT", writeEvents); 6 - process.on("SIGTERM", writeEvents); 7 - process.on("SIGQUIT", writeEvents); 4 + const exit = () => { 5 + workerExit(); 6 + eventTracker.exit(); 7 + }; 8 + start(); 9 + 10 + process.on("SIGINT", exit); 11 + process.on("SIGTERM", exit);
+42 -25
src/lib/db.ts
··· 1 1 import { Database } from "bun:sqlite"; 2 - import {env} from 'process'; 2 + import { env } from "process"; 3 + import type { WorkerEventData } from "./types"; 3 4 4 5 export interface EventRecord { 5 6 nsid: string; ··· 17 18 private updateCountQuery; 18 19 private getNsidCountQuery; 19 20 21 + // private bufferedRecords: WorkerEventData[]; 22 + 20 23 constructor() { 24 + // this.bufferedRecords = []; 21 25 this.db = new Database(env.DB_PATH ?? "events.sqlite"); 22 26 // init db 23 27 this.db.run("PRAGMA journal_mode = WAL;"); ··· 82 86 this.insertNsidQuery.run(ALL_NSID); 83 87 } 84 88 85 - writeEvents = ( 86 - events: { nsid: string; timestamp: number; deleted: boolean }[], 87 - ) => { 88 - this.db.transaction(() => { 89 - for (const event of events) { 90 - this.insertEventQuery.run(event.nsid, event.timestamp, event.deleted); 91 - } 92 - })(); 93 - }; 89 + // private recordBufferedEvents = () => { 90 + // try { 91 + // this.db.transaction(() => { 92 + // for (const event of this.bufferedRecords) { 93 + // this.insertEventQuery.run(event.nsid, event.timestamp, event.deleted); 94 + // } 95 + // })(); 96 + // this.bufferedRecords = []; 97 + // } catch (e) { 98 + // console.error(`can't commit to db: ${e}`); 99 + // } 100 + // }; 94 101 95 - recordEvent = (nsid: string, timestamp: number, deleted: boolean) => { 96 - this.db.transaction(() => { 97 - this.insertNsidQuery.run(nsid); 98 - this.updateCountQuery.run({ 99 - $nsid: nsid, 100 - $deleted: deleted, 101 - $timestamp: timestamp, 102 - }); 103 - this.updateCountQuery.run({ 104 - $nsid: ALL_NSID, 105 - $deleted: deleted, 106 - $timestamp: timestamp, 107 - }); 108 - })(); 102 + recordEventHit = (data: WorkerEventData) => { 103 + try { 104 + this.db.transaction(() => { 105 + const { nsid, timestamp, deleted } = data; 106 + this.insertNsidQuery.run(nsid); 107 + this.insertEventQuery.run(nsid, timestamp, deleted); 108 + this.updateCountQuery.run({ 109 + $nsid: nsid, 110 + $deleted: deleted, 111 + $timestamp: timestamp, 112 + }); 113 + this.updateCountQuery.run({ 114 + $nsid: ALL_NSID, 115 + $deleted: deleted, 116 + $timestamp: timestamp, 117 + }); 118 + })(); 119 + // this.bufferedRecords.push(data); 120 + } catch (e) { 121 + console.error(`can't commit to db: ${e}`); 122 + } 123 + // commit buffered if at 10k 124 + // if (this.bufferedRecords.length > 9999) this.recordBufferedEvents(); 109 125 }; 110 126 111 127 getNsidCounts = (): EventRecord[] => { 112 128 return this.getNsidCountQuery.all() as EventRecord[]; 113 129 }; 114 130 115 - close = () => { 131 + exit = () => { 132 + // this.recordBufferedEvents(); 116 133 this.db.close(); 117 134 }; 118 135 }
+20 -22
src/lib/jetstream.ts src/lib/worker.ts
··· 1 1 import { JetstreamSubscription } from "@atcute/jetstream"; 2 - import { eventTracker } from "./db.js"; 2 + import type { WorkerEventData, WorkerCommand } from "./types.js"; 3 + import * as wt from "node:worker_threads"; 3 4 5 + const port = wt.parentPort!; 4 6 let subscription: JetstreamSubscription | null = null; 5 - let eventsToCommit: { 6 - nsid: string; 7 - timestamp: number; 8 - deleted: boolean; 9 - }[] = []; 10 7 11 - export const writeEvents = () => { 12 - eventTracker.writeEvents(eventsToCommit); 13 - eventsToCommit = []; 14 - }; 15 - 16 - const startTracking = async () => { 8 + const track = async () => { 17 9 subscription = new JetstreamSubscription({ 18 10 url: "wss://jetstream2.us-east.bsky.network", 19 11 validateEvents: false, // trust the jetstream :3 ··· 26 18 27 19 const { operation, collection } = event.commit; 28 20 29 - eventTracker.recordEvent(collection, event.time_us, operation === "delete"); 30 - eventsToCommit.push({ 21 + const data: WorkerEventData = { 31 22 nsid: collection, 32 23 timestamp: event.time_us, 33 24 deleted: operation === "delete", 34 - }); 25 + }; 35 26 36 - if (eventsToCommit.length > 10000) { 37 - writeEvents(); 38 - } 27 + port.postMessage(data); 39 28 } 29 + }; 40 30 41 - writeEvents(); 42 - }; 31 + const trackLoop = async () => { 32 + if (subscription !== null) { 33 + return; 34 + } 43 35 44 - export const track = async () => { 45 36 let retryCount = 0; 46 37 const baseDelay = 1000; // 1 second 47 38 const maxDelay = 60000; // 60 seconds 48 39 40 + // if the above fails we fall into here 49 41 while (true) { 50 42 try { 51 - await startTracking(); 43 + await track(); 52 44 retryCount = 0; // Reset on success 53 45 } catch (e) { 54 46 console.log(`tracking failed: ${e}`); ··· 61 53 } 62 54 } 63 55 }; 56 + 57 + port.on("message", (command: WorkerCommand) => { 58 + if (command === "exit") process.exit(); 59 + }); 60 + 61 + trackLoop().catch(console.error);
+6
src/lib/types.ts
··· 1 + export type WorkerEventData = { 2 + nsid: string; 3 + timestamp: number; 4 + deleted: boolean; 5 + }; 6 + export type WorkerCommand = "exit";
+20
src/lib/worker_manager.ts
··· 1 + import { eventTracker } from "./db.js"; 2 + import type { WorkerCommand } from "./types.js"; 3 + import * as wt from "node:worker_threads"; 4 + 5 + let worker: wt.Worker | null = null; 6 + 7 + const sendCommand = (command: WorkerCommand) => { 8 + worker?.postMessage(command); 9 + }; 10 + 11 + export const start = () => { 12 + worker = new wt.Worker("$lib/worker.js"); 13 + worker.on("message", eventTracker.recordEventHit); 14 + }; 15 + 16 + export const exit = () => { 17 + if (worker === null) return; 18 + sendCommand("exit"); 19 + worker = null; 20 + };
+1 -1
tsconfig.json
··· 10 10 "sourceMap": true, 11 11 "strict": true, 12 12 "moduleResolution": "bundler", 13 - "types": ["bun-types"] 13 + "types": ["@types/bun"] 14 14 } 15 15 // Path aliases are handled by https://svelte.dev/docs/kit/configuration#alias 16 16 // except $lib which is handled by https://svelte.dev/docs/kit/configuration#files
+7 -4
vite.config.ts
··· 1 - import tailwindcss from '@tailwindcss/vite'; 2 - import { sveltekit } from '@sveltejs/kit/vite'; 3 - import { defineConfig } from 'vite'; 1 + import tailwindcss from "@tailwindcss/vite"; 2 + import { sveltekit } from "@sveltejs/kit/vite"; 3 + import { defineConfig } from "vite"; 4 4 5 5 export default defineConfig({ 6 - plugins: [tailwindcss(), sveltekit()] 6 + plugins: [tailwindcss(), sveltekit()], 7 + define: { 8 + global: "globalThis", 9 + }, 7 10 });