an atproto based link aggregator
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 95 lines 2.7 kB view raw
1/** 2 * Standalone ingester entry point 3 * 4 * This runs as a separate process on the LiteFS primary machine. 5 * It connects directly to SQLite without going through SvelteKit. 6 */ 7 8// Initialize tracing first (before other imports) 9import { initTracing, shutdownTracing } from './instrumentation'; 10initTracing(); 11 12import { createClient } from '@libsql/client'; 13import { drizzle } from 'drizzle-orm/libsql'; 14import * as contentSchema from '../lib/server/db/content-schema'; 15import { Jetstream, type JetstreamEvent } from './jetstream'; 16import { EventHandler } from './handler'; 17import { loadCursor, saveCursor, createCursorSetter } from './cursor'; 18 19// Database path - on Fly.io this will be the LiteFS mount 20const DB_PATH = process.env.CONTENT_DB_PATH || './data/content.db'; 21 22// Collections we want to subscribe to 23const WANTED_COLLECTIONS = ['one.papili.post', 'one.papili.comment']; 24 25// Create database connection 26const client = createClient({ 27 url: `file:${DB_PATH}` 28}); 29const db = drizzle(client, { schema: contentSchema }); 30 31// Graceful shutdown 32let jetstream: Jetstream | null = null; 33 34async function shutdown() { 35 console.log('[ingester] Shutting down...'); 36 37 if (jetstream) { 38 // Save final cursor before shutdown 39 const finalCursor = jetstream.getCursor(); 40 if (finalCursor !== undefined) { 41 console.log(`[ingester] Saving final cursor: ${finalCursor}`); 42 await saveCursor(db, contentSchema.ingestionCursor, finalCursor); 43 } 44 45 jetstream.destroy(); 46 } 47 48 // Shutdown tracing to flush any pending spans 49 await shutdownTracing(); 50 51 console.log('[ingester] Shutdown complete'); 52 process.exit(0); 53} 54 55process.on('SIGINT', shutdown); 56process.on('SIGTERM', shutdown); 57 58async function main() { 59 console.log('[ingester] Starting standalone ingester'); 60 console.log(`[ingester] Database path: ${DB_PATH}`); 61 62 // Load cursor from database 63 const cursor = await loadCursor(db, contentSchema.ingestionCursor); 64 65 // Create event handler with our db connection 66 const handler = new EventHandler(db); 67 68 // Create cursor setter 69 const setCursor = createCursorSetter(db, contentSchema.ingestionCursor); 70 71 // Create Jetstream client 72 jetstream = new Jetstream({ 73 wantedCollections: WANTED_COLLECTIONS, 74 cursor, 75 setCursor, 76 onEvent: async (event: JetstreamEvent) => { 77 try { 78 await handler.handle(event); 79 } catch (err) { 80 console.error('[ingester] Error handling event:', err); 81 } 82 }, 83 onError: (error: Error) => { 84 console.error('[ingester] Jetstream error:', error); 85 } 86 }); 87 88 jetstream.start(); 89 console.log('[ingester] Jetstream client started'); 90} 91 92main().catch((err) => { 93 console.error('[ingester] Fatal error:', err); 94 process.exit(1); 95});