A fullstack app for indexing standard.site documents
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 205 lines 5.2 kB view raw
import type { Bindings, JetstreamEvent } from "../types";
import { resolvePds } from "../utils/resolver";
import { ingestDocument, deleteDocument } from "../utils/ingest";

/** Jetstream subscription endpoint, restricted to `site.standard.document` records. */
const JETSTREAM_URL =
  "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=site.standard.document";
/** How often the alarm fires to check (and, if needed, re-establish) the connection. */
const ALARM_INTERVAL_MS = 30_000;
/** Persist the cursor if at least this much time has passed since the last save. */
const CURSOR_SAVE_INTERVAL_MS = 10_000;
/** Persist the cursor whenever the processed-message count is a multiple of this. */
const CURSOR_SAVE_MESSAGE_COUNT = 100;
/** Upper bound on the number of DID -> PDS entries kept in memory. */
const PDS_CACHE_MAX_ENTRIES = 10_000;

/**
 * Durable Object that holds a WebSocket subscription to Jetstream and feeds
 * `site.standard.document` commits into the ingest pipeline.
 *
 * HTTP surface (see {@link JetstreamConsumer.fetch}):
 * - `/start`  connect (if not already connected) and schedule the alarm
 * - `/status` report connection state, cursor and processed-message count
 * - `/stop`   close the socket and cancel the alarm
 *
 * The cursor (the `time_us` of the last event seen) is periodically written to
 * Durable Object storage under the key `"cursor"` and appended to the
 * subscription URL on (re)connect.
 */
export class JetstreamConsumer implements DurableObject {
  private readonly state: DurableObjectState;
  private readonly env: Bindings;
  /** The current socket, or `null` when no connection is held. */
  private ws: WebSocket | null = null;
  /** `time_us` of the most recent event, as a string; `null` until known. */
  private cursor: string | null = null;
  /** Number of commit events that were fully processed by this instance. */
  private messageCount = 0;
  /** `Date.now()` timestamp of the last cursor write (0 = never). */
  private lastCursorSave = 0;
  /** In-memory DID -> PDS cache; `null` values record "no PDS resolved". */
  private readonly pdsCache: Map<string, string | null> = new Map();

  constructor(state: DurableObjectState, env: Bindings) {
    this.state = state;
    this.env = env;
  }

  /**
   * Routes control requests by pathname.
   *
   * @param request Incoming request; only its URL pathname is inspected.
   * @returns The handler's JSON response, or a 404 for unknown paths.
   */
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case "/start":
        return this.handleStart();
      case "/status":
        return this.handleStatus();
      case "/stop":
        return this.handleStop();
      default:
        return new Response("Not found", { status: 404 });
    }
  }

  /**
   * Periodic watchdog: reconnects when the socket is missing or not open,
   * then schedules the next alarm.
   */
  async alarm(): Promise<void> {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.log("Alarm: WebSocket not connected, reconnecting...");
      await this.connect();
    }
    // Reschedule alarm
    await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);
  }

  /** Connects unless a socket is already open, and arms the watchdog alarm. */
  private async handleStart(): Promise<Response> {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      return Response.json({
        status: "already_connected",
        cursor: this.cursor,
      });
    }

    await this.connect();
    await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);

    return Response.json({ status: "started", cursor: this.cursor });
  }

  /** Reports whether the socket is open, the current cursor and the processed count. */
  private handleStatus(): Response {
    return Response.json({
      connected: this.ws?.readyState === WebSocket.OPEN,
      cursor: this.cursor,
      messageCount: this.messageCount,
    });
  }

  /** Closes the socket (if any) and cancels the watchdog alarm. */
  private async handleStop(): Promise<Response> {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
    await this.state.storage.deleteAlarm();
    return Response.json({ status: "stopped" });
  }

  /**
   * Best-effort close of a socket we are abandoning. Errors are ignored on
   * purpose: the socket is being discarded regardless of whether close succeeds.
   */
  private closeQuietly(ws: WebSocket): void {
    try {
      ws.close();
    } catch {
      // Deliberately ignored; see method comment.
    }
  }

  /**
   * Replaces any existing socket with a fresh Jetstream connection, resuming
   * from the in-memory cursor or, failing that, the one in storage.
   *
   * Construction failures are logged and leave `this.ws` as `null`; the alarm
   * retries later.
   */
  private async connect(): Promise<void> {
    // Close existing connection if any
    if (this.ws) {
      this.closeQuietly(this.ws);
      this.ws = null;
    }

    // Load cursor from storage
    if (!this.cursor) {
      this.cursor = (await this.state.storage.get<string>("cursor")) ?? null;
    }

    const url = this.cursor
      ? `${JETSTREAM_URL}&cursor=${this.cursor}`
      : JETSTREAM_URL;

    console.log(`Connecting to Jetstream: ${url}`);

    try {
      const ws = new WebSocket(url);

      ws.addEventListener("message", (event) => {
        // Ignore frames from a socket that has been replaced or stopped, so a
        // superseded connection cannot cause events to be processed twice.
        if (this.ws !== ws) return;

        const payload: unknown = event.data;
        if (typeof payload !== "string") {
          console.error("Jetstream WebSocket sent a non-text frame; ignoring");
          return;
        }
        // onMessage handles its own errors, so fire-and-forget is safe here.
        void this.onMessage(payload);
      });

      ws.addEventListener("close", () => {
        console.log("Jetstream WebSocket closed");
        // Only clear the reference if it still points at THIS socket. The close
        // event of an old socket fires after connect() has already installed
        // its replacement; clearing unconditionally would orphan the new one
        // and make the alarm open yet another connection.
        if (this.ws === ws) {
          this.ws = null;
        }
        // Alarm will handle reconnection
      });

      ws.addEventListener("error", (event) => {
        console.error("Jetstream WebSocket error:", event);
        this.closeQuietly(ws);
        // Same identity check as in the close handler above.
        if (this.ws === ws) {
          this.ws = null;
        }
      });

      this.ws = ws;
    } catch (error) {
      console.error("Failed to connect to Jetstream:", error);
      this.ws = null;
    }
  }

  /**
   * Handles one Jetstream frame: advances the cursor, then ingests or deletes
   * the referenced document for commit events in `site.standard.document`.
   *
   * Never rejects; any failure is logged and the event is dropped.
   *
   * @param data Raw JSON text of the event.
   */
  private async onMessage(data: string): Promise<void> {
    try {
      const event = JSON.parse(data) as JetstreamEvent;

      // Update cursor from every event. Skip payloads without a timestamp so
      // the cursor can never become the literal string "undefined".
      if (event.time_us != null) {
        this.cursor = String(event.time_us);
      }

      // Only process commit events
      if (event.kind !== "commit") return;

      const { commit } = event;
      if (commit.collection !== "site.standard.document") return;

      // PDS filter: skip bridgy noise
      const pds = await this.resolvePdsWithCache(event.did);
      if (!pds || pds.includes("brid.gy")) return;

      if (commit.operation === "create" || commit.operation === "update") {
        await ingestDocument(this.env.DB, this.env.RESOLUTION_QUEUE, {
          did: event.did,
          rkey: commit.rkey,
          collection: commit.collection,
          cid: commit.cid,
          record: commit.record,
        });
      } else if (commit.operation === "delete") {
        await deleteDocument(this.env.DB, {
          did: event.did,
          collection: commit.collection,
          rkey: commit.rkey,
        });
      }

      this.messageCount++;

      // Periodically save cursor
      await this.maybeSaveCursor();
    } catch (error) {
      console.error("Error processing Jetstream message:", error);
    }
  }

  /**
   * Resolves the PDS for a DID, consulting the in-memory cache first.
   *
   * Both hits and misses (`null`) are cached. The cache lives only in this
   * instance's memory and is capped at {@link PDS_CACHE_MAX_ENTRIES}; when the
   * cap is exceeded the earliest-inserted entry is dropped.
   *
   * @param did DID of the repository that produced the event.
   * @returns The value returned by `resolvePds`, or the cached copy of it.
   */
  private async resolvePdsWithCache(did: string): Promise<string | null> {
    // `undefined` means "not cached"; a cached `null` is a valid hit.
    const cached = this.pdsCache.get(did);
    if (cached !== undefined) {
      return cached;
    }

    const pds = await resolvePds(this.env.DB, did);
    this.pdsCache.set(did, pds);

    // Keep in-memory cache bounded (Map iterates in insertion order).
    if (this.pdsCache.size > PDS_CACHE_MAX_ENTRIES) {
      const oldestKey = this.pdsCache.keys().next().value;
      if (oldestKey !== undefined) {
        this.pdsCache.delete(oldestKey);
      }
    }

    return pds;
  }

  /**
   * Writes the cursor to storage when either the processed-message count is a
   * multiple of {@link CURSOR_SAVE_MESSAGE_COUNT} or more than
   * {@link CURSOR_SAVE_INTERVAL_MS} has elapsed since the last write.
   */
  private async maybeSaveCursor(): Promise<void> {
    if (!this.cursor) return;

    const now = Date.now();
    const shouldSave =
      this.messageCount % CURSOR_SAVE_MESSAGE_COUNT === 0 ||
      now - this.lastCursorSave > CURSOR_SAVE_INTERVAL_MS;

    if (shouldSave) {
      await this.state.storage.put("cursor", this.cursor);
      this.lastCursorSave = now;
    }
  }
}