the statusphere demo reworked into a vite/react app in a monorepo
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8a3a1fc3e9be24e8cce58526f02b5b8f7d9482cd 90 lines 2.6 kB view raw
import { IdResolver } from '@atproto/identity'
import { Firehose, MemoryRunner, type Event } from '@atproto/sync'
import { XyzStatusphereStatus } from '@statusphere/lexicon'
import pino from 'pino'

import type { Database } from '#/db'

/**
 * Builds the firehose ingester that mirrors status records into the local
 * database.
 *
 * The last stored cursor (row `id = 1` of the `cursor` table) is read once and
 * handed to the runner as its starting position. While running, create/update
 * events for the status collection are upserted into the `status` table and
 * delete events remove the matching row.
 *
 * @param db - Database handle used for the `cursor` and `status` tables.
 * @param idResolver - Identity resolver passed through to the firehose.
 * @returns The configured `Firehose`; it is constructed here but not started.
 */
export async function createIngester(
  db: Database,
  idResolver: IdResolver,
): Promise<Firehose> {
  // Collection NSID of the records this ingester cares about.
  const STATUS_COLLECTION = 'xyz.statusphere.status'
  // Minimum spacing between two cursor writes, in milliseconds.
  const CURSOR_WRITE_INTERVAL_MS = 10000

  const logger = pino({ name: 'firehose ingestion' })

  const cursor = await db
    .selectFrom('cursor')
    .where('id', '=', 1)
    .select('seq')
    .executeTakeFirst()

  logger.info(`start cursor: ${cursor?.seq}`)

  // For throttling cursor writes
  let lastCursorWrite = 0

  const runner = new MemoryRunner({
    // A missing row or a falsy seq both result in no explicit start cursor.
    startCursor: cursor?.seq || undefined,
    setCursor: async (seq) => {
      const now = Date.now()

      // Skip the write unless the interval has elapsed since the last one.
      if (now - lastCursorWrite >= CURSOR_WRITE_INTERVAL_MS) {
        // Recorded before the await, so the throttle window starts when the
        // write is issued rather than when it completes.
        lastCursorWrite = now
        await db
          .updateTable('cursor')
          .set({ seq })
          .where('id', '=', 1)
          .execute()
      }
    },
  })

  return new Firehose({
    idResolver,
    runner,
    handleEvent: async (evt: Event) => {
      // Watch for write events
      if (evt.event === 'create' || evt.event === 'update') {
        const record = evt.record

        // If the write is a valid status update
        if (
          evt.collection === STATUS_COLLECTION &&
          XyzStatusphereStatus.isRecord(record)
        ) {
          const validatedRecord = XyzStatusphereStatus.validateRecord(record)
          if (!validatedRecord.success) return

          // One timestamp shared by the insert and the conflict-update path.
          const indexedAt = new Date().toISOString()

          // Store the status in our SQLite
          await db
            .insertInto('status')
            .values({
              uri: evt.uri.toString(),
              authorDid: evt.did,
              status: validatedRecord.value.status,
              createdAt: validatedRecord.value.createdAt,
              indexedAt,
            })
            // An existing row for the same uri keeps its authorDid/createdAt;
            // only status and indexedAt are refreshed.
            .onConflict((oc) =>
              oc.column('uri').doUpdateSet({
                status: validatedRecord.value.status,
                indexedAt,
              }),
            )
            .execute()
        }
      } else if (
        evt.event === 'delete' &&
        evt.collection === STATUS_COLLECTION
      ) {
        // Remove the status from our SQLite
        await db
          .deleteFrom('status')
          .where('uri', '=', evt.uri.toString())
          .execute()
      }
    },
    onError: (err: Error) => {
      logger.error({ err }, 'error on firehose ingestion')
    },
    filterCollections: [STATUS_COLLECTION],
    excludeIdentity: true,
    excludeAccount: true,
  })
}