A full-stack app for indexing standard.site documents.
1import type { Bindings, JetstreamEvent } from "../types";
2import { resolvePds } from "../utils/resolver";
3import { ingestDocument, deleteDocument } from "../utils/ingest";
4
/** Jetstream subscription endpoint, filtered server-side to `site.standard.document` records. */
const JETSTREAM_URL =
  "wss://jetstream2.us-east.bsky.network/subscribe" +
  "?wantedCollections=site.standard.document";
/** Period (ms) of the Durable Object alarm that checks the connection and reconnects when needed. */
const ALARM_INTERVAL_MS = 30 * 1000;
/** Elapsed time (ms) since the last cursor write after which the cursor is persisted again. */
const CURSOR_SAVE_INTERVAL_MS = 10 * 1000;
/** Message-count threshold that also triggers persisting the cursor. */
const CURSOR_SAVE_MESSAGE_COUNT = 100;
10
/**
 * Durable Object that keeps one Jetstream WebSocket subscription for
 * `site.standard.document` records and applies create/update/delete commits
 * to the index (via `ingestDocument` / `deleteDocument`).
 *
 * HTTP control surface, routed by path:
 * - `/start`  — open the connection (unless one is open or opening) and arm the alarm.
 * - `/status` — report whether the socket is open, the cursor, and the processed count.
 * - `/stop`   — close the connection, persist the cursor, and cancel the alarm.
 *
 * The cursor is the `time_us` of the last handled event. It is kept in DO
 * storage under the key "cursor", so a reconnect (including one after this
 * object is rebuilt) resumes from there.
 */
export class JetstreamConsumer implements DurableObject {
  private state: DurableObjectState;
  private env: Bindings;
  /** The current socket, or null when disconnected. Replaced sockets never write this field. */
  private ws: WebSocket | null = null;
  /** `time_us` of the most recently handled event, stringified; null until known. */
  private cursor: string | null = null;
  /** Number of document commits that passed all filters (reported by `/status`). */
  private messageCount = 0;
  /** Events handled since the cursor was last written to storage. */
  private eventsSinceCursorSave = 0;
  /** `Date.now()` of the last cursor write (0 = never in this instance). */
  private lastCursorSave = 0;
  /** In-memory DID -> PDS cache; not persisted, so it starts empty for each new instance. */
  private pdsCache: Map<string, string | null> = new Map();
  /**
   * Tail of the message-processing chain. Messages are handled one at a time,
   * in arrival order. This keeps the cursor from advancing past an event that
   * is still being processed, and stops operations on the same record from
   * racing each other.
   */
  private processing: Promise<void> = Promise.resolve();

  constructor(state: DurableObjectState, env: Bindings) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case "/start":
        return this.handleStart();
      case "/status":
        return this.handleStatus();
      case "/stop":
        return this.handleStop();
      default:
        return new Response("Not found", { status: 404 });
    }
  }

  /**
   * Watchdog alarm. Reconnects if the socket is not open, persists any
   * unsaved cursor progress, and re-arms itself even if reconnecting throws.
   */
  async alarm(): Promise<void> {
    try {
      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
        console.log("Alarm: WebSocket not connected, reconnecting...");
        await this.connect();
      }
      await this.saveCursor();
    } finally {
      // Reschedule alarm
      await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);
    }
  }

  private async handleStart(): Promise<Response> {
    const current = this.ws;
    // A socket that is still CONNECTING counts as connected, so repeated
    // /start calls do not open duplicate subscriptions.
    if (
      current &&
      (current.readyState === WebSocket.OPEN ||
        current.readyState === WebSocket.CONNECTING)
    ) {
      return Response.json({
        status: "already_connected",
        cursor: this.cursor,
      });
    }

    await this.connect();
    await this.state.storage.setAlarm(Date.now() + ALARM_INTERVAL_MS);

    return Response.json({ status: "started", cursor: this.cursor });
  }

  private handleStatus(): Response {
    return Response.json({
      connected: this.ws?.readyState === WebSocket.OPEN,
      cursor: this.cursor,
      messageCount: this.messageCount,
    });
  }

  private async handleStop(): Promise<Response> {
    const current = this.ws;
    // Detach first so the message listener stops enqueuing frames from this socket.
    this.ws = null;
    if (current) {
      try {
        current.close();
      } catch {}
    }
    await this.state.storage.deleteAlarm();
    // Let already-queued messages finish, then persist so a later /start resumes here.
    await this.processing;
    await this.saveCursor();
    return Response.json({ status: "stopped" });
  }

  /**
   * (Re)opens the Jetstream socket. Any existing socket is closed first.
   * The URL includes the in-memory cursor, or the stored one if there is none.
   */
  private async connect(): Promise<void> {
    // Close existing connection if any. Clear the field first so the old
    // socket's listeners treat it as stale.
    const previous = this.ws;
    this.ws = null;
    if (previous) {
      try {
        previous.close();
      } catch {}
    }

    // Drain messages still queued from the old socket so the resume cursor is exact.
    await this.processing;

    // Load cursor from storage
    if (!this.cursor) {
      this.cursor = (await this.state.storage.get<string>("cursor")) ?? null;
    }

    const url = this.cursor
      ? `${JETSTREAM_URL}&cursor=${this.cursor}`
      : JETSTREAM_URL;

    console.log(`Connecting to Jetstream: ${url}`);

    try {
      const ws = new WebSocket(url);

      ws.addEventListener("message", (event) => {
        // Ignore frames from a socket that has been replaced or stopped.
        if (this.ws !== ws) return;
        const data =
          typeof event.data === "string"
            ? event.data
            : new TextDecoder().decode(event.data);
        this.processing = this.processing.then(() => this.onMessage(data));
      });

      ws.addEventListener("close", () => {
        console.log("Jetstream WebSocket closed");
        // Only forget the socket if it is still the current one. A late close
        // from a replaced socket must not orphan the new connection.
        // Alarm will handle reconnection.
        if (this.ws === ws) this.ws = null;
      });

      ws.addEventListener("error", (event) => {
        console.error("Jetstream WebSocket error:", event);
        try {
          ws.close();
        } catch {}
        if (this.ws === ws) this.ws = null;
      });

      this.ws = ws;
    } catch (error) {
      console.error("Failed to connect to Jetstream:", error);
      this.ws = null;
    }
  }

  /**
   * Handles one raw Jetstream frame. Errors are logged, not thrown, so the
   * processing chain never rejects. The cursor advances only after the event
   * has been handled. A failed event is logged and skipped, not retried.
   */
  private async onMessage(data: string): Promise<void> {
    let event: JetstreamEvent;
    try {
      event = JSON.parse(data) as JetstreamEvent;
    } catch (error) {
      console.error("Error processing Jetstream message:", error);
      return;
    }

    try {
      await this.handleEvent(event);
    } catch (error) {
      console.error("Error processing Jetstream message:", error);
    }

    // Guard against malformed frames: a non-numeric cursor would break the reconnect URL.
    if (typeof event.time_us !== "number") return;
    this.cursor = String(event.time_us);
    this.eventsSinceCursorSave++;

    try {
      // Periodically save cursor
      await this.maybeSaveCursor();
    } catch (error) {
      console.error("Error processing Jetstream message:", error);
    }
  }

  /**
   * Applies one parsed event to the index. Only commits to
   * `site.standard.document` from a resolvable, non-Bridgy PDS are acted on;
   * everything else is ignored.
   */
  private async handleEvent(event: JetstreamEvent): Promise<void> {
    // Only process commit events
    if (event.kind !== "commit") return;

    const { commit } = event;
    if (commit.collection !== "site.standard.document") return;

    // PDS filter: skip bridgy noise
    const pds = await this.resolvePdsWithCache(event.did);
    if (!pds || pds.includes("brid.gy")) return;

    if (commit.operation === "create" || commit.operation === "update") {
      await ingestDocument(this.env.DB, this.env.RESOLUTION_QUEUE, {
        did: event.did,
        rkey: commit.rkey,
        collection: commit.collection,
        cid: commit.cid,
        record: commit.record,
      });
    } else if (commit.operation === "delete") {
      await deleteDocument(this.env.DB, {
        did: event.did,
        collection: commit.collection,
        rkey: commit.rkey,
      });
    }

    this.messageCount++;
  }

  /**
   * Resolves a DID's PDS through `resolvePds`, memoized in memory. The cache
   * is bounded at 10,000 entries and evicts the oldest-inserted one first.
   */
  private async resolvePdsWithCache(did: string): Promise<string | null> {
    // Stored values are string | null, so undefined means "not cached".
    const cached = this.pdsCache.get(did);
    if (cached !== undefined) return cached;

    const pds = await resolvePds(this.env.DB, did);
    // NOTE(review): a null result is cached too. If null can mean a transient
    // resolution failure, that DID stays filtered out until evicted — confirm.
    this.pdsCache.set(did, pds);

    // Keep in-memory cache bounded (Map iterates in insertion order)
    if (this.pdsCache.size > 10_000) {
      const oldest = this.pdsCache.keys().next().value;
      if (oldest !== undefined) this.pdsCache.delete(oldest);
    }

    return pds;
  }

  /**
   * Persists the cursor once CURSOR_SAVE_MESSAGE_COUNT events have been
   * handled since the last save, or once CURSOR_SAVE_INTERVAL_MS has elapsed.
   */
  private async maybeSaveCursor(): Promise<void> {
    const due =
      this.eventsSinceCursorSave >= CURSOR_SAVE_MESSAGE_COUNT ||
      Date.now() - this.lastCursorSave > CURSOR_SAVE_INTERVAL_MS;
    if (due) await this.saveCursor();
  }

  /** Writes the cursor to storage if one exists and it has advanced since the last write. */
  private async saveCursor(): Promise<void> {
    if (!this.cursor || this.eventsSinceCursorSave === 0) return;
    await this.state.storage.put("cursor", this.cursor);
    this.lastCursorSave = Date.now();
    this.eventsSinceCursorSave = 0;
  }
}