open source is social v-it.org
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add explore/ Cloudflare Worker for indexing cap and vouch activity

Standalone Worker project that connects to Jetstream via cron,
indexes org.v-it.cap and org.v-it.vouch events into D1, resolves
DID handles via PLC directory, and serves a read-only JSON API
at explore.v-it.org.

+612
+4
.gitignore
··· 10 10 .agents/ 11 11 .claude/skills/ 12 12 skills-lock.json 13 + 14 + # Explore worker 15 + explore/node_modules/ 16 + explore/.wrangler/
+10
explore/Makefile
··· 1 + .PHONY: dev deploy schema 2 + 3 + dev: 4 + npx wrangler dev 5 + 6 + deploy: 7 + npx wrangler deploy 8 + 9 + schema: 10 + npx wrangler d1 execute vit-explore --file=schema.sql
+13
explore/package.json
··· 1 + { 2 + "name": "vit-explore", 3 + "version": "0.1.0", 4 + "private": true, 5 + "type": "module", 6 + "scripts": { 7 + "dev": "npx wrangler dev", 8 + "deploy": "npx wrangler deploy" 9 + }, 10 + "devDependencies": { 11 + "wrangler": "^4" 12 + } 13 + }
+1
explore/public/.gitkeep
··· 1 +
+50
explore/schema.sql
··· 1 + CREATE TABLE IF NOT EXISTS caps ( 2 + id INTEGER PRIMARY KEY AUTOINCREMENT, 3 + did TEXT NOT NULL, 4 + rkey TEXT NOT NULL, 5 + uri TEXT NOT NULL UNIQUE, 6 + cid TEXT, 7 + title TEXT NOT NULL, 8 + description TEXT, 9 + ref TEXT NOT NULL, 10 + beacon TEXT, 11 + record_json TEXT NOT NULL, 12 + created_at TEXT NOT NULL, 13 + indexed_at TEXT NOT NULL DEFAULT (datetime('now')), 14 + UNIQUE(did, rkey) 15 + ); 16 + 17 + CREATE INDEX IF NOT EXISTS idx_caps_beacon ON caps(beacon); 18 + CREATE INDEX IF NOT EXISTS idx_caps_created_at ON caps(created_at DESC); 19 + 20 + CREATE TABLE IF NOT EXISTS vouches ( 21 + id INTEGER PRIMARY KEY AUTOINCREMENT, 22 + did TEXT NOT NULL, 23 + rkey TEXT NOT NULL, 24 + uri TEXT NOT NULL UNIQUE, 25 + cid TEXT, 26 + cap_uri TEXT NOT NULL, 27 + ref TEXT NOT NULL, 28 + beacon TEXT NOT NULL, 29 + record_json TEXT NOT NULL, 30 + created_at TEXT NOT NULL, 31 + indexed_at TEXT NOT NULL DEFAULT (datetime('now')), 32 + UNIQUE(did, rkey) 33 + ); 34 + 35 + CREATE INDEX IF NOT EXISTS idx_vouches_cap_uri ON vouches(cap_uri); 36 + CREATE INDEX IF NOT EXISTS idx_vouches_beacon ON vouches(beacon); 37 + 38 + CREATE TABLE IF NOT EXISTS beacons ( 39 + id INTEGER PRIMARY KEY AUTOINCREMENT, 40 + name TEXT NOT NULL UNIQUE, 41 + cap_count INTEGER NOT NULL DEFAULT 0, 42 + vouch_count INTEGER NOT NULL DEFAULT 0, 43 + last_activity TEXT NOT NULL DEFAULT (datetime('now')) 44 + ); 45 + 46 + CREATE TABLE IF NOT EXISTS handles ( 47 + did TEXT PRIMARY KEY, 48 + handle TEXT NOT NULL, 49 + fetched_at TEXT NOT NULL DEFAULT (datetime('now')) 50 + );
+119
explore/src/api.js
··· 1 + // SPDX-License-Identifier: AGPL-3.0-only 2 + // Copyright (c) 2026 sol pbc 3 + 4 + const CORS_HEADERS = { 5 + 'Access-Control-Allow-Origin': '*', 6 + 'Access-Control-Allow-Methods': 'GET, OPTIONS', 7 + 'Access-Control-Allow-Headers': 'Content-Type', 8 + }; 9 + 10 + function json(data, status = 200) { 11 + return new Response(JSON.stringify(data), { 12 + status, 13 + headers: { 'Content-Type': 'application/json', ...CORS_HEADERS }, 14 + }); 15 + } 16 + 17 + function parseLimit(value) { 18 + const parsed = Number.parseInt(value ?? '50', 10); 19 + if (!Number.isFinite(parsed) || parsed <= 0) { 20 + return 50; 21 + } 22 + return Math.min(parsed, 100); 23 + } 24 + 25 + function parseCursor(value) { 26 + if (!value) { 27 + return null; 28 + } 29 + 30 + const parsed = Number.parseInt(value, 10); 31 + return Number.isFinite(parsed) && parsed > 0 ? parsed : null; 32 + } 33 + 34 + export async function handleRequest(request, env) { 35 + if (request.method === 'OPTIONS') { 36 + return new Response(null, { status: 204, headers: CORS_HEADERS }); 37 + } 38 + 39 + const url = new URL(request.url); 40 + const { pathname, searchParams } = url; 41 + 42 + if (request.method !== 'GET') { 43 + return json({ error: 'method not allowed' }, 405); 44 + } 45 + 46 + if (pathname === '/api/caps') { 47 + const cursor = parseCursor(searchParams.get('cursor')); 48 + const limit = parseLimit(searchParams.get('limit')); 49 + const beacon = searchParams.get('beacon'); 50 + 51 + const conditions = []; 52 + const bindings = []; 53 + 54 + if (beacon) { 55 + conditions.push('c.beacon = ?'); 56 + bindings.push(beacon); 57 + } 58 + 59 + if (cursor) { 60 + conditions.push('c.id < ?'); 61 + bindings.push(cursor); 62 + } 63 + 64 + let sql = 'SELECT c.*, h.handle FROM caps c LEFT JOIN handles h ON c.did = h.did'; 65 + if (conditions.length > 0) { 66 + sql += ` WHERE ${conditions.join(' AND ')}`; 67 + } 68 + sql += ' ORDER BY c.id DESC LIMIT ?'; 69 + bindings.push(limit); 70 + 71 + const { results } = await env.DB.prepare(sql).bind(...bindings).all(); 72 + return json({ 73 + caps: results, 74 + cursor: results.length > 0 ? results[results.length - 1].id : null, 75 + }); 76 + } 77 + 78 + if (pathname === '/api/vouches') { 79 + const capUri = searchParams.get('cap_uri'); 80 + if (!capUri) { 81 + return json({ error: 'cap_uri is required' }, 400); 82 + } 83 + 84 + const { results } = await env.DB.prepare( 85 + `SELECT v.*, h.handle 86 + FROM vouches v 87 + LEFT JOIN handles h ON v.did = h.did 88 + WHERE v.cap_uri = ? 89 + ORDER BY v.id DESC`, 90 + ) 91 + .bind(capUri) 92 + .all(); 93 + 94 + return json({ vouches: results }); 95 + } 96 + 97 + if (pathname === '/api/beacons') { 98 + const { results } = await env.DB.prepare('SELECT * FROM beacons ORDER BY last_activity DESC').all(); 99 + return json({ beacons: results }); 100 + } 101 + 102 + if (pathname === '/api/stats') { 103 + const [caps, vouches, beacons, dids] = await env.DB.batch([ 104 + env.DB.prepare('SELECT COUNT(*) as count FROM caps'), 105 + env.DB.prepare('SELECT COUNT(*) as count FROM vouches'), 106 + env.DB.prepare('SELECT COUNT(*) as count FROM beacons'), 107 + env.DB.prepare('SELECT COUNT(DISTINCT did) as count FROM caps'), 108 + ]); 109 + 110 + return json({ 111 + total_caps: caps.results[0]?.count ?? 0, 112 + total_vouches: vouches.results[0]?.count ?? 0, 113 + total_beacons: beacons.results[0]?.count ?? 0, 114 + active_dids: dids.results[0]?.count ?? 0, 115 + }); 116 + } 117 + 118 + return json({ error: 'not found' }, 404); 119 + }
+22
explore/src/cursor.js
··· 1 + // SPDX-License-Identifier: AGPL-3.0-only 2 + // Copyright (c) 2026 sol pbc 3 + 4 + export class CursorStore { 5 + constructor(state, env) { 6 + this.state = state; 7 + } 8 + 9 + async fetch(request) { 10 + if (request.method === 'GET') { 11 + return new Response((await this.state.storage.get('cursor')) || ''); 12 + } 13 + 14 + if (request.method === 'PUT') { 15 + const text = await request.text(); 16 + await this.state.storage.put('cursor', text); 17 + return new Response('ok'); 18 + } 19 + 20 + return new Response('method not allowed', { status: 405 }); 21 + } 22 + }
+39
explore/src/index.js
··· 1 + // SPDX-License-Identifier: AGPL-3.0-only 2 + // Copyright (c) 2026 sol pbc 3 + 4 + import { handleRequest } from './api.js'; 5 + import { streamEvents } from './jetstream.js'; 6 + 7 + export { CursorStore } from './cursor.js'; 8 + 9 + async function getCursor(env) { 10 + const id = env.CURSOR_STORE.idFromName('cursor'); 11 + const stub = env.CURSOR_STORE.get(id); 12 + const res = await stub.fetch('http://cursor/'); 13 + return (await res.text()) || null; 14 + } 15 + 16 + async function saveCursor(env, cursor) { 17 + const id = env.CURSOR_STORE.idFromName('cursor'); 18 + const stub = env.CURSOR_STORE.get(id); 19 + await stub.fetch('http://cursor/', { method: 'PUT', body: cursor }); 20 + } 21 + 22 + export default { 23 + async fetch(request, env) { 24 + const url = new URL(request.url); 25 + if (url.pathname.startsWith('/api/')) { 26 + return handleRequest(request, env); 27 + } 28 + 29 + return new Response('not found', { status: 404 }); 30 + }, 31 + 32 + async scheduled(event, env, ctx) { 33 + const cursor = await getCursor(env); 34 + const result = await streamEvents(env, cursor); 35 + if (result.latestCursor) { 36 + await saveCursor(env, result.latestCursor); 37 + } 38 + }, 39 + };
+269
explore/src/jetstream.js
··· 1 + // SPDX-License-Identifier: AGPL-3.0-only 2 + // Copyright (c) 2026 sol pbc 3 + 4 + import { resolveHandles } from './resolve.js'; 5 + 6 + const CAP_COLLECTION = 'org.v-it.cap'; 7 + const VOUCH_COLLECTION = 'org.v-it.vouch'; 8 + const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe'; 9 + const STREAM_DURATION_MS = 25_000; 10 + 11 + function beaconValue(value) { 12 + return typeof value === 'string' && value.length > 0 ? value : null; 13 + } 14 + 15 + function incrementCapBeaconStatements(env, beacon) { 16 + return [ 17 + env.DB.prepare( 18 + `INSERT INTO beacons (name, cap_count, last_activity) 19 + VALUES (?, 1, datetime('now')) 20 + ON CONFLICT(name) DO UPDATE SET 21 + cap_count = cap_count + 1, 22 + last_activity = datetime('now')`, 23 + ).bind(beacon), 24 + ]; 25 + } 26 + 27 + function incrementVouchBeaconStatements(env, beacon) { 28 + return [ 29 + env.DB.prepare( 30 + `INSERT INTO beacons (name, vouch_count, last_activity) 31 + VALUES (?, 1, datetime('now')) 32 + ON CONFLICT(name) DO UPDATE SET 33 + vouch_count = vouch_count + 1, 34 + last_activity = datetime('now')`, 35 + ).bind(beacon), 36 + ]; 37 + } 38 + 39 + function decrementCapBeaconStatement(env, beacon) { 40 + return env.DB.prepare( 41 + `UPDATE beacons 42 + SET cap_count = MAX(0, cap_count - 1), 43 + last_activity = datetime('now') 44 + WHERE name = ?`, 45 + ).bind(beacon); 46 + } 47 + 48 + function decrementVouchBeaconStatement(env, beacon) { 49 + return env.DB.prepare( 50 + `UPDATE beacons 51 + SET vouch_count = MAX(0, vouch_count - 1), 52 + last_activity = datetime('now') 53 + WHERE name = ?`, 54 + ).bind(beacon); 55 + } 56 + 57 + async function processCapEvent(env, did, commit) { 58 + const { operation, rkey, record, cid } = commit; 59 + const uri = `at://${did}/${CAP_COLLECTION}/${rkey}`; 60 + 61 + if (operation === 'create' || operation === 'update') { 62 + const nextBeacon = beaconValue(record?.beacon); 63 + const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?') 64 + .bind(did, rkey) 65 + .first(); 66 + const prevBeacon = beaconValue(existing?.beacon); 67 + 68 + const stmts = [ 69 + env.DB.prepare( 70 + `INSERT INTO caps (did, rkey, uri, cid, title, description, ref, beacon, record_json, created_at) 71 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 72 + ON CONFLICT(did, rkey) DO UPDATE SET 73 + cid = excluded.cid, 74 + title = excluded.title, 75 + description = excluded.description, 76 + ref = excluded.ref, 77 + beacon = excluded.beacon, 78 + record_json = excluded.record_json, 79 + created_at = excluded.created_at`, 80 + ).bind( 81 + did, 82 + rkey, 83 + uri, 84 + cid ?? null, 85 + record.title, 86 + record.description || '', 87 + record.ref, 88 + nextBeacon, 89 + JSON.stringify(record), 90 + record.createdAt, 91 + ), 92 + ]; 93 + 94 + if (!existing && nextBeacon) { 95 + stmts.push(...incrementCapBeaconStatements(env, nextBeacon)); 96 + } else if (existing && prevBeacon !== nextBeacon) { 97 + if (prevBeacon) { 98 + stmts.push(decrementCapBeaconStatement(env, prevBeacon)); 99 + } 100 + if (nextBeacon) { 101 + stmts.push(...incrementCapBeaconStatements(env, nextBeacon)); 102 + } 103 + } 104 + 105 + await env.DB.batch(stmts); 106 + return; 107 + } 108 + 109 + if (operation === 'delete') { 110 + const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?') 111 + .bind(did, rkey) 112 + .first(); 113 + 114 + const stmts = [ 115 + env.DB.prepare('DELETE FROM caps WHERE did = ? AND rkey = ?').bind(did, rkey), 116 + ]; 117 + 118 + const prevBeacon = beaconValue(existing?.beacon); 119 + if (prevBeacon) { 120 + stmts.unshift(decrementCapBeaconStatement(env, prevBeacon)); 121 + } 122 + 123 + await env.DB.batch(stmts); 124 + } 125 + } 126 + 127 + async function processVouchEvent(env, did, commit) { 128 + const { operation, rkey, record, cid } = commit; 129 + const uri = `at://${did}/${VOUCH_COLLECTION}/${rkey}`; 130 + 131 + if (operation === 'create' || operation === 'update') { 132 + const nextBeacon = beaconValue(record?.beacon); 133 + const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?') 134 + .bind(did, rkey) 135 + .first(); 136 + const prevBeacon = beaconValue(existing?.beacon); 137 + 138 + const stmts = [ 139 + env.DB.prepare( 140 + `INSERT INTO vouches (did, rkey, uri, cid, cap_uri, ref, beacon, record_json, created_at) 141 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 142 + ON CONFLICT(did, rkey) DO UPDATE SET 143 + cid = excluded.cid, 144 + cap_uri = excluded.cap_uri, 145 + ref = excluded.ref, 146 + beacon = excluded.beacon, 147 + record_json = excluded.record_json, 148 + created_at = excluded.created_at`, 149 + ).bind( 150 + did, 151 + rkey, 152 + uri, 153 + cid ?? null, 154 + record.subject?.uri, 155 + record.ref, 156 + record.beacon, 157 + JSON.stringify(record), 158 + record.createdAt, 159 + ), 160 + ]; 161 + 162 + if (!existing && nextBeacon) { 163 + stmts.push(...incrementVouchBeaconStatements(env, nextBeacon)); 164 + } else if (existing && prevBeacon !== nextBeacon) { 165 + if (prevBeacon) { 166 + stmts.push(decrementVouchBeaconStatement(env, prevBeacon)); 167 + } 168 + if (nextBeacon) { 169 + stmts.push(...incrementVouchBeaconStatements(env, nextBeacon)); 170 + } 171 + } 172 + 173 + await env.DB.batch(stmts); 174 + return; 175 + } 176 + 177 + if (operation === 'delete') { 178 + const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?') 179 + .bind(did, rkey) 180 + .first(); 181 + 182 + const stmts = [ 183 + env.DB.prepare('DELETE FROM vouches WHERE did = ? AND rkey = ?').bind(did, rkey), 184 + ]; 185 + 186 + const prevBeacon = beaconValue(existing?.beacon); 187 + if (prevBeacon) { 188 + stmts.unshift(decrementVouchBeaconStatement(env, prevBeacon)); 189 + } 190 + 191 + await env.DB.batch(stmts); 192 + } 193 + } 194 + 195 + export async function streamEvents(env, cursor) { 196 + const url = new URL(JETSTREAM_URL); 197 + url.searchParams.append('wantedCollections', CAP_COLLECTION); 198 + url.searchParams.append('wantedCollections', VOUCH_COLLECTION); 199 + if (cursor) { 200 + url.searchParams.set('cursor', cursor); 201 + } 202 + 203 + return await new Promise((resolve) => { 204 + let latestCursor = cursor || null; 205 + const newDids = new Set(); 206 + const pending = new Set(); 207 + const ws = new WebSocket(url.toString()); 208 + 209 + const timeout = setTimeout(() => { 210 + ws.close(); 211 + }, STREAM_DURATION_MS); 212 + 213 + const finish = async () => { 214 + clearTimeout(timeout); 215 + if (pending.size > 0) { 216 + await Promise.allSettled([...pending]); 217 + } 218 + if (newDids.size > 0) { 219 + await resolveHandles([...newDids], env); 220 + } 221 + resolve({ latestCursor }); 222 + }; 223 + 224 + ws.addEventListener('message', (event) => { 225 + const task = (async () => { 226 + let msg; 227 + try { 228 + msg = JSON.parse(event.data); 229 + } catch { 230 + return; 231 + } 232 + 233 + if (msg.kind !== 'commit') { 234 + return; 235 + } 236 + 237 + if (msg.time_us) { 238 + latestCursor = String(msg.time_us); 239 + } 240 + 241 + if (msg.did) { 242 + newDids.add(msg.did); 243 + } 244 + 245 + const commit = msg.commit; 246 + if (!commit) { 247 + return; 248 + } 249 + 250 + if (commit.collection === CAP_COLLECTION) { 251 + await processCapEvent(env, msg.did, commit); 252 + } else if (commit.collection === VOUCH_COLLECTION) { 253 + await processVouchEvent(env, msg.did, commit); 254 + } 255 + })(); 256 + 257 + pending.add(task); 258 + task.finally(() => pending.delete(task)); 259 + }); 260 + 261 + ws.addEventListener('close', () => { 262 + void finish(); 263 + }); 264 + 265 + ws.addEventListener('error', () => { 266 + ws.close(); 267 + }); 268 + }); 269 + }
+57
explore/src/resolve.js
··· 1 + // SPDX-License-Identifier: AGPL-3.0-only 2 + // Copyright (c) 2026 sol pbc 3 + 4 + export async function resolveHandle(did, env) { 5 + try { 6 + const cached = await env.DB.prepare( 7 + "SELECT handle, fetched_at FROM handles WHERE did = ? AND fetched_at > datetime('now', '-24 hours')", 8 + ) 9 + .bind(did) 10 + .first(); 11 + 12 + if (cached?.handle) { 13 + return cached.handle; 14 + } 15 + 16 + const res = await fetch(`https://plc.directory/${did}`); 17 + if (!res.ok) { 18 + return null; 19 + } 20 + 21 + const data = await res.json(); 22 + const handle = Array.isArray(data?.alsoKnownAs) 23 + ? data.alsoKnownAs.find(value => typeof value === 'string' && value.startsWith('at://'))?.slice(5) ?? null 24 + : null; 25 + 26 + if (!handle) { 27 + return null; 28 + } 29 + 30 + await env.DB.prepare( 31 + `INSERT INTO handles (did, handle, fetched_at) 32 + VALUES (?, ?, datetime('now')) 33 + ON CONFLICT(did) DO UPDATE SET 34 + handle = excluded.handle, 35 + fetched_at = excluded.fetched_at`, 36 + ) 37 + .bind(did, handle) 38 + .run(); 39 + 40 + return handle; 41 + } catch { 42 + return null; 43 + } 44 + } 45 + 46 + export async function resolveHandles(dids, env) { 47 + const handles = new Map(); 48 + 49 + for (const did of dids) { 50 + const handle = await resolveHandle(did, env); 51 + if (handle) { 52 + handles.set(did, handle); 53 + } 54 + } 55 + 56 + return handles; 57 + }
+28
explore/wrangler.toml
··· 1 + name = "vit-explore" 2 + main = "src/index.js" 3 + compatibility_date = "2024-12-01" 4 + account_id = "3f2c1528c7d4d9685819ea9e9e307c92" 5 + 6 + [assets] 7 + directory = "public" 8 + 9 + [[d1_databases]] 10 + binding = "DB" 11 + database_name = "vit-explore" 12 + database_id = "placeholder" 13 + 14 + [durable_objects] 15 + bindings = [ 16 + { name = "CURSOR_STORE", class_name = "CursorStore" } 17 + ] 18 + 19 + [[migrations]] 20 + tag = "v1" 21 + new_classes = ["CursorStore"] 22 + 23 + [triggers] 24 + crons = ["*/1 * * * *"] 25 + 26 + [[routes]] 27 + pattern = "explore.v-it.org" 28 + custom_domain = true