The Statusphere demo, reworked into a Vite/React app in a monorepo.
1import { IdResolver } from '@atproto/identity'
2import { Firehose, MemoryRunner, type Event } from '@atproto/sync'
3import { XyzStatusphereStatus } from '@statusphere/lexicon'
4import pino from 'pino'
5
6import type { Database } from '#/db'
7
/**
 * Builds the firehose consumer that mirrors `xyz.statusphere.status` records
 * into the local database.
 *
 * The stored sequence number is read from row `id = 1` of the `cursor` table
 * and handed to a `MemoryRunner` as its starting cursor. While running, the
 * runner's cursor callback writes the latest sequence back to that same row,
 * but no more often than once every 10 seconds.
 *
 * Event handling:
 * - `create` / `update` of a valid status record: upsert into `status`,
 *   keyed on `uri` (an existing row gets its `status` and `indexedAt`
 *   refreshed).
 * - `delete` in the status collection: remove the row with the matching `uri`.
 * - anything else is ignored.
 *
 * @param db - Database handle used for cursor persistence and status rows.
 * @param idResolver - Identity resolver passed straight through to `Firehose`.
 * @returns The configured `Firehose` instance; this function only constructs
 *   it and does not call any method on it.
 */
export async function createIngester(db: Database, idResolver: IdResolver) {
  const STATUS_COLLECTION = 'xyz.statusphere.status'
  // Minimum gap between two cursor writes, in milliseconds.
  const CURSOR_WRITE_INTERVAL_MS = 10000

  const logger = pino({ name: 'firehose ingestion' })

  const savedCursor = await db
    .selectFrom('cursor')
    .where('id', '=', 1)
    .select('seq')
    .executeTakeFirst()
  const savedSeq = savedCursor?.seq

  logger.info(`start cursor: ${savedSeq}`)

  // Timestamp (ms since epoch) of the most recent cursor write attempt.
  let lastCursorWrite = 0

  const runner = new MemoryRunner({
    // A falsy sequence (missing row, or 0) means "no start cursor".
    startCursor: savedSeq || undefined,
    setCursor: async (seq) => {
      const timestamp = Date.now()
      const elapsed = timestamp - lastCursorWrite

      // Still inside the throttle window: skip this write.
      if (elapsed < CURSOR_WRITE_INTERVAL_MS) return

      // Recorded before the write is awaited, so calls arriving while the
      // update is in flight are throttled as well.
      lastCursorWrite = timestamp
      await db
        .updateTable('cursor')
        .set({ seq })
        .where('id', '=', 1)
        .execute()
    },
  })

  return new Firehose({
    idResolver,
    runner,
    handleEvent: async (evt: Event) => {
      // Deletions: drop the matching status row from our SQLite.
      if (evt.event === 'delete') {
        if (evt.collection === STATUS_COLLECTION) {
          await db
            .deleteFrom('status')
            .where('uri', '=', evt.uri.toString())
            .execute()
        }
        return
      }

      // From here on only write events are of interest.
      if (evt.event !== 'create' && evt.event !== 'update') return
      if (evt.collection !== STATUS_COLLECTION) return

      const payload = evt.record
      if (!XyzStatusphereStatus.isRecord(payload)) return

      const validation = XyzStatusphereStatus.validateRecord(payload)
      if (!validation.success) return

      const statusRecord = validation.value
      const indexedAt = new Date().toISOString()

      // Store the status in our SQLite; a repeated uri refreshes the
      // existing row instead of inserting a duplicate.
      await db
        .insertInto('status')
        .values({
          uri: evt.uri.toString(),
          authorDid: evt.did,
          status: statusRecord.status,
          createdAt: statusRecord.createdAt,
          indexedAt,
        })
        .onConflict((conflict) =>
          conflict.column('uri').doUpdateSet({
            status: statusRecord.status,
            indexedAt,
          }),
        )
        .execute()
    },
    onError: (err: Error) => {
      logger.error({ err }, 'error on firehose ingestion')
    },
    filterCollections: [STATUS_COLLECTION],
    excludeIdentity: true,
    excludeAccount: true,
  })
}