// A full-stack app for indexing standard.site documents.
1import { Hono } from "hono";
2import { cors } from "hono/cors";
3import type { Bindings } from "./types";
4import {
5 health,
6 webhook,
7 feed,
8 stats,
9 records,
10 admin,
11 rss,
12 jetstream,
13} from "./routes";
14import { processDocument } from "./utils";
15
/**
 * Root Hono application. Each feature is a sub-app from `./routes`, mounted under
 * its own path prefix; the Worker's default export hands HTTP requests to `app.fetch`.
 */
const app = new Hono<{ Bindings: Bindings }>()
  // CORS middleware with default options, registered on every path before any route.
  .use("*", cors())
  .route("/health", health)
  .route("/webhook", webhook)
  .route("/feed", feed)
  .route("/stats", stats)
  .route("/records", records)
  .route("/admin", admin)
  .route("/rss.xml", rss)
  .route("/jetstream", jetstream);

// Unmatched paths answer with a JSON error body and HTTP 404.
app.notFound((c) => c.json({ error: "Not found" }, 404));
35
// Export Durable Object class
export { JetstreamConsumer } from "./durable-objects/jetstream-consumer";

/** Maximum number of stale documents enqueued for re-resolution per cron tick. */
const STALE_BATCH_SIZE = 50;

/** How many verified documents (newest first by `published_at`) survive each cleanup pass. */
const VERIFIED_RETENTION = 300;

/** Collection name stamped on every message produced by the stale-document sweep. */
const DOCUMENT_COLLECTION = "site.standard.document";

/** Queue payload identifying one record to (re)resolve. */
interface ResolutionMessage {
  did: string;
  collection: string;
  rkey: string;
}

/** Runtime check for queue payloads, which arrive untyped. */
function isResolutionMessage(body: unknown): body is ResolutionMessage {
  if (typeof body !== "object" || body === null) return false;
  const { did, collection, rkey } = body as Record<string, unknown>;
  return (
    typeof did === "string" &&
    typeof collection === "string" &&
    typeof rkey === "string"
  );
}

/**
 * Enqueue up to STALE_BATCH_SIZE documents whose `stale_at` has passed or was never set.
 *
 * NOTE(review): rows are not marked as queued here, so a row stays eligible (and is
 * queued again on later ticks) until something updates its `stale_at` — presumably
 * processDocument; confirm.
 * NOTE(review): `stale_at < datetime('now')` compares text; it assumes `stale_at` is
 * stored as 'YYYY-MM-DD HH:MM:SS'. An ISO-8601 value with a 'T' separator would
 * compare incorrectly within the same day — confirm the stored format.
 */
async function queueStaleDocuments(env: Bindings): Promise<void> {
  const { results } = await env.DB.prepare(
    `SELECT did, rkey FROM resolved_documents
     WHERE stale_at < datetime('now') OR stale_at IS NULL
     LIMIT ?`,
  )
    .bind(STALE_BATCH_SIZE)
    .all<{ did: string; rkey: string }>();

  if (!results || results.length === 0) return;

  const messages: { body: ResolutionMessage }[] = results.map((row) => ({
    body: {
      did: row.did,
      collection: DOCUMENT_COLLECTION,
      rkey: row.rkey,
    },
  }));

  await env.RESOLUTION_QUEUE.sendBatch(messages);
  console.log(`Queued ${messages.length} documents for resolution`);
}

/**
 * Cleanup pass:
 * - keep only the VERIFIED_RETENTION most recent verified documents;
 * - delete unverified documents resolved more than 24 hours ago;
 * - if either delete removed rows, drop repo_records with no resolved_documents row
 *   matching on (did, rkey).
 */
async function pruneDocuments(env: Bindings): Promise<void> {
  const deletedVerified = await env.DB.prepare(
    `DELETE FROM resolved_documents WHERE verified = 1 AND uri NOT IN (
      SELECT uri FROM resolved_documents WHERE verified = 1
      ORDER BY published_at DESC LIMIT ?
    )`,
  )
    .bind(VERIFIED_RETENTION)
    .run();
  if (deletedVerified.meta.changes > 0) {
    console.log(`Cleaned up ${deletedVerified.meta.changes} old verified documents`);
  }

  const deletedUnverified = await env.DB.prepare(
    `DELETE FROM resolved_documents WHERE (verified IS NULL OR verified = 0)
      AND resolved_at < datetime('now', '-24 hours')`,
  ).run();
  if (deletedUnverified.meta.changes > 0) {
    console.log(`Cleaned up ${deletedUnverified.meta.changes} unverified documents`);
  }

  // Orphans can only appear when documents were just deleted, so skip the scan otherwise.
  if (deletedVerified.meta.changes === 0 && deletedUnverified.meta.changes === 0) return;

  const orphaned = await env.DB.prepare(
    `DELETE FROM repo_records WHERE NOT EXISTS (
      SELECT 1 FROM resolved_documents WHERE resolved_documents.did = repo_records.did
      AND resolved_documents.rkey = repo_records.rkey
    )`,
  ).run();
  if (orphaned.meta.changes > 0) {
    console.log(`Cleaned up ${orphaned.meta.changes} orphaned repo_records`);
  }
}

/**
 * Send a start request to the singleton Jetstream consumer Durable Object.
 * Throws if the request fails or the DO answers with a non-2xx status.
 */
async function ensureJetstreamConsumer(env: Bindings): Promise<void> {
  const id = env.JETSTREAM_CONSUMER.idFromName("singleton");
  const stub = env.JETSTREAM_CONSUMER.get(id);
  const response = await stub.fetch(new Request("http://do/start"));
  if (!response.ok) {
    throw new Error(`Jetstream consumer /start responded with HTTP ${response.status}`);
  }
}

// Export for Cloudflare Workers
export default {
  fetch: app.fetch,

  /**
   * Cron entry point. Runs three maintenance steps:
   * 1. enqueue stale documents;
   * 2. prune old or unverified documents;
   * 3. ping the Jetstream consumer Durable Object.
   *
   * Each step is isolated, so a failure in one (e.g. a D1 error during cleanup)
   * does not skip the steps after it — in particular the keep-alive ping.
   *
   * @throws Error after all steps have run, if enqueueing or cleanup failed, so the
   *   invocation is still reported as failed. A failed ping is only logged (best-effort).
   */
  async scheduled(_event: ScheduledEvent, env: Bindings, _ctx: ExecutionContext) {
    const failedSteps: string[] = [];

    try {
      await queueStaleDocuments(env);
    } catch (error) {
      console.error("Failed to queue stale documents:", error);
      failedSteps.push("queue stale documents");
    }

    try {
      await pruneDocuments(env);
    } catch (error) {
      console.error("Failed to clean up documents:", error);
      failedSteps.push("cleanup");
    }

    // Ensure Jetstream consumer DO is alive (best-effort: logged, never rethrown).
    try {
      await ensureJetstreamConsumer(env);
    } catch (error) {
      console.error("Failed to ping Jetstream consumer:", error);
    }

    if (failedSteps.length > 0) {
      throw new Error(`Scheduled maintenance failed: ${failedSteps.join(", ")}`);
    }
  },

  /**
   * Queue consumer (presumably bound to RESOLUTION_QUEUE, which `scheduled` feeds).
   * Each message's record is resolved via processDocument: acked on success, retried
   * on failure. Payloads without string did/collection/rkey fields are logged and acked,
   * since retrying can never make them valid.
   */
  async queue(batch: MessageBatch<unknown>, env: Bindings) {
    for (const message of batch.messages) {
      const body = message.body;
      if (!isResolutionMessage(body)) {
        console.error("Dropping malformed queue message:", body);
        message.ack();
        continue;
      }

      try {
        await processDocument(env.DB, body.did, body.collection, body.rkey);
        message.ack();
      } catch (error) {
        console.error(
          `Queue processing error for ${body.did}/${body.collection}/${body.rkey}:`,
          error,
        );
        message.retry();
      }
    }
  },
};
121};