[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

Make ingester resume with a cursor on restart (#10)

* record processed events

* save cursor for resuming on reconnection

* remove processedevent and make cursor save periodcally

* Update jetstream-client.ts

authored by

Davi Rodrigues and committed by
GitHub
04fc4d4a e2301ccc

+113 -12
+45 -2
services/ingester/src/db/connection.ts
··· 12 12 lookSchema, 13 13 generatorSchema, 14 14 actorSchema, 15 + cursorStateSchema, 15 16 } from './models.js' 16 17 import { env } from '../utils/env.js' 17 18 import { pino } from 'pino' ··· 35 36 Look: this.connection.model('Look', lookSchema), 36 37 Generator: this.connection.model('Generator', generatorSchema), 37 38 Actor: this.connection.model('Actor', actorSchema), 39 + CursorState: this.connection.model('CursorState', cursorStateSchema), 38 40 } 39 41 } 40 42 ··· 42 44 const { DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME } = env 43 45 const uri = `mongodb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/?appName=ingester` 44 46 45 - this.logger.info(`Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=ingester`) 47 + this.logger.info( 48 + `Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=ingester`, 49 + ) 46 50 47 51 try { 48 52 await this.connection.openUri(uri, { ··· 63 67 this.logger.info('Disconnected from MongoDB') 64 68 } 65 69 } 66 - } 70 + 71 + private readonly CURSOR_IDENTIFIER = 'last_processed_cursor' 72 + 73 + async getCursorState(): Promise<number | null> { 74 + try { 75 + const state = await this.models.CursorState.findOne({ 76 + identifier: this.CURSOR_IDENTIFIER, 77 + }).exec() 78 + if (state) { 79 + this.logger.info( 80 + { cursorValue: state.cursorValue }, 81 + 'Loaded cursor state', 82 + ) 83 + return state.cursorValue 84 + } 85 + this.logger.info('No cursor state found in database.') 86 + return null 87 + } catch (error) { 88 + this.logger.error({ error }, 'Failed to get cursor state') 89 + return null // Or rethrow, depending on desired error handling 90 + } 91 + } 92 + 93 + async saveCursorState(cursorValue: number): Promise<void> { 94 + try { 95 + await this.models.CursorState.updateOne( 96 + { identifier: this.CURSOR_IDENTIFIER }, 97 + { 98 + $set: { cursorValue, updatedAt: new Date() }, 99 + $setOnInsert: { identifier: this.CURSOR_IDENTIFIER }, 100 + }, 101 + { upsert: true }, 102 + ).exec() 103 + this.logger.debug({ cursorValue }, 'Saved cursor state') 104 + } catch (error) { 105 + this.logger.error({ cursorValue, error }, 'Failed to save cursor state') 106 + // Depending on desired behavior, you might want to rethrow or handle more gracefully 107 + } 108 + } 109 + }
+15 -3
services/ingester/src/db/models.ts
··· 345 345 generatorSchema.index({ authorDid: 1, createdAt: -1 }) 346 346 generatorSchema.index({ did: 1, createdAt: -1 }) 347 347 348 - 349 348 export interface ActorDocument extends Document { 350 349 did: string 351 350 handle: string | null 352 351 indexedAt: string 353 - takedownRef: string | null 352 + takedownRef: string | null 354 353 upstreamStatus: string | null 355 354 } 356 355 357 356 export const actorSchema = new Schema<ActorDocument>({ 358 - did: { type: String, required: true, index: true }, 357 + did: { type: String, required: true }, 359 358 handle: { type: String, required: false, index: true }, 360 359 indexedAt: { type: String, required: true }, 361 360 takedownRef: { type: String, required: false }, ··· 366 365 actorSchema.index({ handle: 'text' }) 367 366 actorSchema.index({ did: 1 }, { unique: true }) 368 367 368 + export interface CursorStateDocument extends Document { 369 + identifier: string // To ensure a single document, e.g., 'last_processed_cursor' 370 + cursorValue: number 371 + updatedAt: Date 372 + } 373 + 374 + export const cursorStateSchema = new Schema<CursorStateDocument>({ 375 + identifier: { type: String, required: true, unique: true, index: true }, 376 + cursorValue: { type: Number, required: true }, 377 + updatedAt: { type: Date, default: Date.now }, 378 + }) 379 + 369 380 export interface DatabaseModels { 370 381 Like: Model<LikeDocument> 371 382 Post: Model<PostDocument> ··· 378 389 Look: Model<LookDocument> 379 390 Generator: Model<GeneratorDocument> 380 391 Actor: Model<ActorDocument> 392 + CursorState: Model<CursorStateDocument> 381 393 }
+1 -1
services/ingester/src/index.ts
··· 30 30 const bidirectionalResolver = createBidirectionalResolver(resolver) 31 31 32 32 // Create and start Jetstream client 33 - const jetstreamClient = createJetstreamClient(db, bidirectionalResolver) 33 + const jetstreamClient = await createJetstreamClient(db, bidirectionalResolver) 34 34 const connection = jetstreamClient.connect({ 35 35 filterCollections: ['so.sprk.*'], 36 36 })
+2
services/ingester/src/types/events.ts
··· 8 8 rkey: string 9 9 record: any 10 10 cid: string 11 + rev: string 11 12 } 12 13 } 13 14 ··· 20 21 rkey: string 21 22 record: any 22 23 cid: string 24 + rev: string 23 25 } 24 26 time_us: number 25 27 event: 'create' | 'update' | 'delete'
+50 -6
services/ingester/src/utils/jetstream-client.ts
··· 13 13 initialCursor?: number | null 14 14 } 15 15 16 - export function createJetstreamClient( 16 + export async function createJetstreamClient( 17 17 db: Database, 18 18 resolver: BidirectionalResolver, 19 19 ) { 20 - let cursorPosition: number | null = null 20 + // Load initial cursor from DB 21 + let cursorPosition: number | null = await db.getCursorState() 22 + if (cursorPosition) { 23 + logger.info( 24 + { initialCursor: cursorPosition }, 25 + 'Loaded initial cursor from DB', 26 + ) 27 + } else { 28 + logger.info('No initial cursor found in DB, will start from live feed.') 29 + } 30 + 21 31 let wsConnection: WebSocket | null = null 22 32 let heartbeatInterval: Timer | null = null 33 + let saveCursorInterval: Timer | null = null // Added for periodic cursor saving 23 34 24 35 function connect(options: JetstreamClientOptions = {}): { 25 36 close: () => void 26 37 } { 27 - const { filterCollections = ['so.sprk.*'], initialCursor = null } = options 38 + // Use the loaded cursorPosition as default if no initialCursor is provided in options 39 + const { 40 + filterCollections = ['so.sprk.*'], 41 + initialCursor = cursorPosition, 42 + } = options 28 43 44 + // Update cursorPosition if an initialCursor was explicitly passed in options or from DB 29 45 if (initialCursor) { 30 46 cursorPosition = initialCursor 31 47 } 32 48 33 49 let url = filterCollections.reduce((acc, collection) => { 34 - return `${acc}&wantedCollections=${collection}` 50 + return `${acc}wantedCollections=${collection}&` 35 51 }, `${env.JETSTREAM_URL}?`) 36 52 37 53 if (cursorPosition) { 38 54 // Subtract a few seconds (in microseconds) to ensure no gaps 39 55 const rewindCursor = parseInt(cursorPosition.toString()) - 5000000 // 5 seconds in microseconds 40 - url += `&cursor=${rewindCursor}` 56 + url += `cursor=${rewindCursor}` 41 57 } 42 58 43 59 logger.info(`Connecting to Jetstream: ${url}`) ··· 46 62 47 63 wsConnection.on('open', () => { 48 64 logger.info('Connected to Jetstream') 65 + // Start periodic cursor saving only after successful connection 66 + startPeriodicCursorSave() 49 67 }) 50 68 51 69 wsConnection.on('message', async (data) => { ··· 86 104 return { 87 105 close: () => { 88 106 clearHeartbeatInterval() 107 + clearSaveCursorInterval() // Clear cursor save interval on close/error 89 108 if (wsConnection && wsConnection.readyState === WebSocket.OPEN) { 90 109 wsConnection.close() 91 110 wsConnection = null ··· 101 120 } 102 121 } 103 122 123 + function clearSaveCursorInterval() { 124 + if (saveCursorInterval) { 125 + clearInterval(saveCursorInterval) 126 + saveCursorInterval = null 127 + } 128 + } 129 + 130 + function startPeriodicCursorSave() { 131 + clearSaveCursorInterval() // Clear any existing interval first 132 + saveCursorInterval = setInterval(async () => { 133 + if (cursorPosition !== null) { 134 + try { 135 + await db.saveCursorState(cursorPosition) 136 + logger.debug({ cursorPosition }, 'Periodically saved cursor state') 137 + } catch (error) { 138 + logger.error( 139 + { cursorPosition, error }, 140 + 'Failed to periodically save cursor state', 141 + ) 142 + } 143 + } 144 + }, 30000) // Save every 30 seconds 145 + } 146 + 104 147 function handleReconnect(options: JetstreamClientOptions) { 105 148 clearHeartbeatInterval() 149 + clearSaveCursorInterval() // Clear cursor save interval before reconnecting 106 150 107 151 if (wsConnection) { 108 152 wsConnection = null ··· 119 163 } 120 164 121 165 const { did, time_us } = event 122 - const { operation, collection, rkey, record, cid } = event.commit 166 + const { operation, collection, rkey, record } = event.commit 123 167 124 168 logger.debug( 125 169 `Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`,