See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix FirehoseConsumer unlike footguns: chain poisoning and silent drop

Two issues addressed on the unlike path:

1. `this.processing = this.processing.then(...)` had no `.catch()`, so a
future refactor that let handleUnlike reject would permanently poison
the chain and silently drop every subsequent unlike. Added a catch
that logs and absorbs the error.

2. On `lookupSubjectUri` failure, the previous code logged and returned
without advancing the cursor — but a later like-create in the same
session would push pendingCursor past the failed event's time_us,
so the unlike was silently dropped. Now we force a reconnect via
`this.ws?.close()`, which resumes from `lastFlushedCursor` and
replays the failed unlike.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+73 -4
+13 -4
app/services/firehose_consumer.ts
··· 277 277 // back-to-back for the same like_uri. 278 278 const likeUri = event.likeUri 279 279 const capturedTimeUs = timeUs 280 - this.processing = this.processing.then(() => this.handleUnlike(likeUri, capturedTimeUs)) 280 + this.processing = this.processing 281 + .then(() => this.handleUnlike(likeUri, capturedTimeUs)) 282 + .catch((err) => { 283 + // Don't let a single handleUnlike failure poison the chain — 284 + // without this, every subsequent `.then()` would short-circuit 285 + // and all future unlikes would silently fail. 286 + console.error('[FirehoseConsumer] handleUnlike chain error:', err) 287 + }) 281 288 return 282 289 } 283 290 ··· 306 313 await this.flushBuffers() 307 314 } 308 315 } catch (err) { 309 - // Lookup failures are non-fatal — log and skip. Not advancing the 310 - // cursor here means a reconnect could replay; that's the design noise 311 - // the spec acknowledges. 316 + // Lookup failure forces a reconnect so we replay from the last 317 + // durable checkpoint. We do NOT advance the cursor here; a later 318 + // like-create in this session would otherwise push `pendingCursor` 319 + // past this event's time_us and silently drop the unlike. 312 320 console.error('[FirehoseConsumer] lookupSubjectUri failed for', likeUri, err) 321 + this.ws?.close() 313 322 } 314 323 } 315 324
+60
tests/unit/services/firehose_consumer.spec.ts
··· 312 312 }) 313 313 }) 314 314 315 + test.group('FirehoseConsumer — lookup failure', () => { 316 + test('closes the websocket to force a reconnect when lookup throws', async ({ assert }) => { 317 + const { fakeWs, consumer, writeCursorCalls, countsInserts } = makeConsumer() 318 + // Swap in a throwing lookup after construction 319 + ;(consumer as any).deps.lookupSubjectUri = async () => { 320 + throw new Error('ClickHouse unavailable (lookup)') 321 + } 322 + await startInBackground(consumer) 323 + 324 + fakeWs.emit(makeLikeDelete(ACTOR_DID, 'likerkeyaaa', 1725911162329309)) 325 + 326 + // Let the processing chain settle 327 + await (consumer as any).processing 328 + await consumer.flushBuffers() 329 + 330 + assert.equal(fakeWs.closeCallCount, 1, 'ws.close() was called to force reconnect') 331 + assert.equal(countsInserts.length, 0, 'no delta recorded for the failed unlike') 332 + assert.equal(writeCursorCalls.length, 0, 'cursor was not advanced past the failed unlike') 333 + 334 + await consumer.shutdown() 335 + }) 336 + 337 + test('a rejecting handleUnlike does not poison the processing chain', async ({ assert }) => { 338 + const likeUri2 = `at://${ACTOR_DID}/app.bsky.feed.like/goodrkey` 339 + const subjectUri2 = `at://${AUTHOR_DID}/app.bsky.feed.post/postrkey002` 340 + const { fakeWs, consumer, countsInserts } = makeConsumer({ 341 + subjectLookup: new Map([[likeUri2, subjectUri2]]), 342 + }) 343 + 344 + // Replace handleUnlike so the first invocation rejects — simulates a 345 + // future refactor where the method itself can throw outside the inner 346 + // try/catch. Subsequent invocations delegate to the original. 347 + const original = (consumer as any).handleUnlike.bind(consumer) 348 + let firstCall = true 349 + ;(consumer as any).handleUnlike = async (uri: string, timeUs: bigint | null) => { 350 + if (firstCall) { 351 + firstCall = false 352 + throw new Error('boom') 353 + } 354 + return original(uri, timeUs) 355 + } 356 + 357 + await startInBackground(consumer) 358 + 359 + // First unlike: handleUnlike rejects 360 + fakeWs.emit(makeLikeDelete(ACTOR_DID, 'likerkeyaaa', 1725911162329309)) 361 + // Second unlike: must still be processed despite the earlier rejection 362 + fakeWs.emit(makeLikeDelete(ACTOR_DID, 'goodrkey', 1725911162329310)) 363 + 364 + await (consumer as any).processing 365 + await consumer.flushBuffers() 366 + 367 + assert.equal(countsInserts.length, 1, 'second unlike still produced a delta') 368 + assert.equal(countsInserts[0][0].subjectUri, subjectUri2) 369 + assert.equal(countsInserts[0][0].count, -1) 370 + 371 + await consumer.shutdown() 372 + }) 373 + }) 374 + 315 375 test.group('FirehoseConsumer — URL construction', () => { 316 376 test('subscribes only to app.bsky.feed.like and omits wantedDids', async ({ assert }) => { 317 377 const { consumer, connectedUrls } = makeConsumer()