// An atproto-based link aggregator.
1/**
2 * Standalone ingester entry point
3 *
4 * This runs as a separate process on the LiteFS primary machine.
5 * It connects directly to SQLite without going through SvelteKit.
6 */
7
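// Initialize tracing before the rest of the ingester is set up.
// NOTE(review): static imports are hoisted under native ES modules, so this call
// only runs ahead of the imports below when the file is compiled to CommonJS —
// confirm the build target if instrumentation must be installed first.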
9import { initTracing, shutdownTracing } from './instrumentation';
10initTracing();
11
12import { createClient } from '@libsql/client';
13import { drizzle } from 'drizzle-orm/libsql';
14import * as contentSchema from '../lib/server/db/content-schema';
15import { Jetstream, type JetstreamEvent } from './jetstream';
16import { EventHandler } from './handler';
17import { loadCursor, saveCursor, createCursorSetter } from './cursor';
18
/**
 * Path of the content SQLite database file. Read from CONTENT_DB_PATH when
 * that is set to a non-empty value (on Fly.io this will be the LiteFS mount);
 * otherwise a local file under ./data is used.
 */
const DB_PATH = process.env.CONTENT_DB_PATH || './data/content.db';

/** Collections requested from Jetstream. */
const WANTED_COLLECTIONS: string[] = ['one.papili.post', 'one.papili.comment'];

/** Client opened directly on the database file. */
const client = createClient({ url: 'file:' + DB_PATH });

/** Drizzle instance over the client, bound to the content schema. */
const db = drizzle(client, { schema: contentSchema });
30
// Graceful shutdown

/** Running Jetstream client; stays null until main() has created it. */
let jetstream: Jetstream | null = null;

/** Set once teardown begins so that a second signal does not start another one. */
let shuttingDown = false;

/**
 * Stop the ingester and exit the process.
 *
 * Steps, in order: persist the Jetstream client's current cursor (if it has
 * one), destroy the client, then shut down tracing to flush pending spans.
 * A failure in one step is logged and does not prevent the remaining steps
 * from running.
 *
 * Calling this more than once is a no-op after the first call.
 *
 * @returns Never resolves in practice: the process exits with status 0 when
 *   every step succeeded and 1 when any step failed.
 */
async function shutdown(): Promise<void> {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;

  console.log('[ingester] Shutting down...');
  let exitCode = 0;

  // Snapshot the handle so the checks below apply to one fixed instance.
  const stream = jetstream;
  if (stream) {
    try {
      // Save final cursor before shutdown
      const finalCursor = stream.getCursor();
      if (finalCursor !== undefined) {
        console.log(`[ingester] Saving final cursor: ${finalCursor}`);
        await saveCursor(db, contentSchema.ingestionCursor, finalCursor);
      }
    } catch (err) {
      console.error('[ingester] Failed to save final cursor:', err);
      exitCode = 1;
    }

    // Tear the client down even when the cursor could not be saved.
    try {
      stream.destroy();
    } catch (err) {
      console.error('[ingester] Failed to destroy Jetstream client:', err);
      exitCode = 1;
    }
  }

  // Shutdown tracing to flush any pending spans
  try {
    await shutdownTracing();
  } catch (err) {
    console.error('[ingester] Failed to shut down tracing:', err);
    exitCode = 1;
  }

  console.log('[ingester] Shutdown complete');
  process.exit(exitCode);
}

// shutdown() handles its own errors; `void` marks the promise as intentionally not awaited.
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());
57
/**
 * Start the ingester.
 *
 * Loads the stored cursor, builds the event handler and cursor setter on the
 * module's database connection, creates the Jetstream client with them,
 * publishes it in the module-level `jetstream` variable and starts it.
 *
 * Errors thrown while handling a single event are logged and not rethrown.
 */
async function main(): Promise<void> {
  console.log('[ingester] Starting standalone ingester');
  console.log(`[ingester] Database path: ${DB_PATH}`);

  const resumeCursor = await loadCursor(db, contentSchema.ingestionCursor);
  const eventHandler = new EventHandler(db);
  const persistCursor = createCursorSetter(db, contentSchema.ingestionCursor);

  // Per-event callback: a failing event is logged and not rethrown.
  const processEvent = async (event: JetstreamEvent): Promise<void> => {
    try {
      await eventHandler.handle(event);
    } catch (failure) {
      console.error('[ingester] Error handling event:', failure);
    }
  };

  // Error callback passed to the Jetstream client.
  const reportStreamError = (error: Error): void => {
    console.error('[ingester] Jetstream error:', error);
  };

  const stream = new Jetstream({
    wantedCollections: WANTED_COLLECTIONS,
    cursor: resumeCursor,
    setCursor: persistCursor,
    onEvent: processEvent,
    onError: reportStreamError
  });
  // Publish the client through the module-level variable before starting it.
  jetstream = stream;

  stream.start();
  console.log('[ingester] Jetstream client started');
}
91
// Run the ingester; a rejection from main() is logged and the process exits with status 1.
main().then(undefined, (fatal: unknown) => {
  console.error('[ingester] Fatal error:', fatal);
  process.exit(1);
});
95});