A full-stack app for indexing standard.site documents
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 121 lines 3.5 kB view raw
1import { Hono } from "hono"; 2import { cors } from "hono/cors"; 3import type { Bindings } from "./types"; 4import { 5 health, 6 webhook, 7 feed, 8 stats, 9 records, 10 admin, 11 rss, 12 jetstream, 13} from "./routes"; 14import { processDocument } from "./utils"; 15 16const app = new Hono<{ Bindings: Bindings }>(); 17 18// Middleware 19app.use("*", cors()); 20 21// Mount routes 22app.route("/health", health); 23app.route("/webhook", webhook); 24app.route("/feed", feed); 25app.route("/stats", stats); 26app.route("/records", records); 27app.route("/admin", admin); 28app.route("/rss.xml", rss); 29app.route("/jetstream", jetstream); 30 31// 404 handler 32app.notFound((c) => { 33 return c.json({ error: "Not found" }, 404); 34}); 35 36// Export Durable Object class 37export { JetstreamConsumer } from "./durable-objects/jetstream-consumer"; 38 39// Export for Cloudflare Workers 40export default { 41 fetch: app.fetch, 42 async scheduled(_event: ScheduledEvent, env: Bindings, _ctx: ExecutionContext) { 43 const batchSize = 50; 44 // Select stale documents 45 const { results } = await env.DB.prepare( 46 `SELECT did, rkey FROM resolved_documents 47 WHERE stale_at < datetime('now') OR stale_at IS NULL 48 LIMIT ?`, 49 ) 50 .bind(batchSize) 51 .all<{ did: string; rkey: string }>(); 52 53 if (results && results.length > 0) { 54 const messages = results.map((row) => ({ 55 body: { 56 did: row.did, 57 collection: "site.standard.document", 58 rkey: row.rkey, 59 }, 60 })); 61 62 // Send to queue 63 await env.RESOLUTION_QUEUE.sendBatch(messages); 64 console.log(`Queued ${messages.length} documents for resolution`); 65 } 66 67 // Cleanup: keep only the 300 most recent verified documents, delete everything else 68 const deletedVerified = await env.DB.prepare( 69 `DELETE FROM resolved_documents WHERE verified = 1 AND uri NOT IN ( 70 SELECT uri FROM resolved_documents WHERE verified = 1 71 ORDER BY published_at DESC LIMIT 300 72 )`, 73 ).run(); 74 if (deletedVerified.meta.changes > 0) { 75 
console.log(`Cleaned up ${deletedVerified.meta.changes} old verified documents`); 76 } 77 78 // Delete unverified/stale documents older than 24 hours 79 const deletedUnverified = await env.DB.prepare( 80 `DELETE FROM resolved_documents WHERE (verified IS NULL OR verified = 0) 81 AND resolved_at < datetime('now', '-24 hours')`, 82 ).run(); 83 if (deletedUnverified.meta.changes > 0) { 84 console.log(`Cleaned up ${deletedUnverified.meta.changes} unverified documents`); 85 } 86 87 // Clean up orphaned repo_records 88 if (deletedVerified.meta.changes > 0 || deletedUnverified.meta.changes > 0) { 89 const orphaned = await env.DB.prepare( 90 `DELETE FROM repo_records WHERE NOT EXISTS ( 91 SELECT 1 FROM resolved_documents WHERE resolved_documents.did = repo_records.did 92 AND resolved_documents.rkey = repo_records.rkey 93 )`, 94 ).run(); 95 if (orphaned.meta.changes > 0) { 96 console.log(`Cleaned up ${orphaned.meta.changes} orphaned repo_records`); 97 } 98 } 99 100 // Ensure Jetstream consumer DO is alive 101 try { 102 const id = env.JETSTREAM_CONSUMER.idFromName("singleton"); 103 const stub = env.JETSTREAM_CONSUMER.get(id); 104 await stub.fetch(new Request("http://do/start")); 105 } catch (error) { 106 console.error("Failed to ping Jetstream consumer:", error); 107 } 108 }, 109 async queue(batch: MessageBatch<any>, env: Bindings) { 110 for (const message of batch.messages) { 111 try { 112 const { did, collection, rkey } = message.body; 113 await processDocument(env.DB, did, collection, rkey); 114 message.ack(); 115 } catch (error) { 116 console.error("Queue processing error:", error); 117 message.retry(); 118 } 119 } 120 }, 121};