See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add post-event, post-delete, account, and identity handling to JetstreamConsumer

Part A: tracked author post creates buffer a PostSnapshot with zero counts
into a second buffer (snapshotBuffer), flushed alongside engagement events
on the same 500ms / 1000-row schedule with shared cursor checkpointing.

Part B: every post event inspects embeddedPostUri and buffers a
kind='quote' engagement event when the embedded author is tracked.

Part C: post-delete events for tracked authors call tombstonePost immediately.

Part D: account deleted/takendown events remove the DID from the in-memory
tracked set, mark the user deleted in SQLite, and bulk-tombstone all their
post snapshots.

Part E: identity events for tracked users update the handle in SQLite.

New deps added to JetstreamConsumerDeps: markUserDeleted, updateUserHandle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+198 -40
+187 -40
apps/web/app/services/jetstream_consumer.ts
··· 1 1 import type { EngagementEventRow } from '@skystar/clickhouse' 2 2 import type { ClickHouseStore } from '@skystar/clickhouse' 3 - import { parseJetstreamEvent } from '@skystar/atproto' 3 + import type { PostSnapshot } from '@skystar/clickhouse' 4 + import { parseJetstreamEvent, parseAtUri } from '@skystar/atproto' 5 + import type { PostEvent } from '@skystar/atproto' 4 6 5 7 // --------------------------------------------------------------------------- 6 8 // WebSocket-like interface for dependency injection in tests ··· 65 67 * In tests: override with (_ms) => Promise.resolve() for instant reconnect. 66 68 */ 67 69 reconnectDelay?: (ms: number) => Promise<void> 70 + 71 + /** 72 + * Marks a user as deleted/taken down in the metadata store. 73 + * In production: UPDATE users SET deleted_at = now() WHERE did = ? 74 + * In tests: records the call. 75 + */ 76 + markUserDeleted: (did: string) => Promise<void> 77 + 78 + /** 79 + * Updates the handle for a tracked user in the metadata store. 80 + * In production: UPDATE users SET handle = newHandle WHERE did = ? 81 + * In tests: records the call. 82 + */ 83 + updateUserHandle: (did: string, handle: string) => Promise<void> 68 84 } 69 85 70 86 // --------------------------------------------------------------------------- ··· 91 107 92 108 private trackedDids: Set<string> = new Set() 93 109 private eventBuffer: EngagementEventRow[] = [] 110 + private snapshotBuffer: PostSnapshot[] = [] 94 111 private shutdownRequested = false 95 112 private ws: WebSocketLike | null = null 96 113 private didRefreshTimer: ReturnType<typeof setInterval> | null = null ··· 98 115 99 116 /** 100 117 * The largest time_us (as BigInt) across all events buffered in the current 101 - * flush cycle. Updated whenever we push an event into the buffer. 118 + * flush cycle. Updated whenever we push an event into either buffer. 102 119 */ 103 120 private pendingCursor: bigint | null = null 104 121 ··· 267 284 const event = parseJetstreamEvent(rawJson) 268 285 if (event === null) return 269 286 287 + // Extract time_us from the raw message for cursor tracking. 288 + // The parsed event uses `ingestedAt` (a Date), but we need the raw 289 + // microsecond BigInt for cursor storage. 290 + let timeUs: bigint | null = null 291 + if (typeof rawJson === 'object' && rawJson !== null && 'time_us' in rawJson) { 292 + const raw = (rawJson as Record<string, unknown>)['time_us'] 293 + if (typeof raw === 'number') { 294 + timeUs = BigInt(Math.floor(raw)) 295 + } 296 + } 297 + 270 298 if (event.kind === 'like' || event.kind === 'repost') { 271 299 if (!this.trackedDids.has(event.postAuthorDid)) return 272 300 273 - // Extract time_us from the raw message for cursor tracking. 274 - // The parsed event uses `ingestedAt` (a Date), but we need the raw 275 - // microsecond BigInt for cursor storage. 276 - let timeUs: bigint | null = null 277 - if (typeof rawJson === 'object' && rawJson !== null && 'time_us' in rawJson) { 278 - const raw = (rawJson as Record<string, unknown>)['time_us'] 279 - if (typeof raw === 'number') { 280 - timeUs = BigInt(Math.floor(raw)) 281 - } 282 - } 283 - 284 301 this.eventBuffer.push({ 285 302 postUri: event.postUri, 286 303 postAuthorDid: event.postAuthorDid, ··· 290 307 eventCreatedAt: event.createdAt, 291 308 }) 292 309 293 - if (timeUs !== null) { 294 - if (this.pendingCursor === null || timeUs > this.pendingCursor) { 295 - this.pendingCursor = timeUs 296 - } 297 - } 310 + this.advancePendingCursor(timeUs) 298 311 299 312 // Flush immediately if the buffer hits 1000 rows 300 313 if (this.eventBuffer.length >= 1000) { 301 314 void this.flushBuffer() 302 315 } 316 + return 303 317 } 304 318 305 - // kind === 'post' | 'post-delete' | 'account' | 'identity': 306 - // Dropped in Task 9. Task 10 will add handling. 319 + if (event.kind === 'post') { 320 + this.handlePostEvent(event, timeUs) 321 + return 322 + } 323 + 324 + if (event.kind === 'post-delete') { 325 + if (this.trackedDids.has(event.authorDid)) { 326 + void this.clickHouseStore.tombstonePost(event.postUri, event.authorDid).catch((err) => { 327 + console.error('[JetstreamConsumer] tombstonePost failed:', err) 328 + }) 329 + } 330 + return 331 + } 332 + 333 + if (event.kind === 'account') { 334 + if ( 335 + (event.status === 'deleted' || event.status === 'takendown') && 336 + this.trackedDids.has(event.did) 337 + ) { 338 + this.trackedDids.delete(event.did) 339 + void this.deps.markUserDeleted(event.did).catch((err) => { 340 + console.error('[JetstreamConsumer] markUserDeleted failed:', err) 341 + }) 342 + void this.clickHouseStore.tombstoneUserSnapshots(event.did).catch((err) => { 343 + console.error('[JetstreamConsumer] tombstoneUserSnapshots failed:', err) 344 + }) 345 + } 346 + return 347 + } 348 + 349 + if (event.kind === 'identity') { 350 + if (this.trackedDids.has(event.did) && event.handle !== null) { 351 + void this.deps.updateUserHandle(event.did, event.handle).catch((err) => { 352 + console.error('[JetstreamConsumer] updateUserHandle failed:', err) 353 + }) 354 + } 355 + return 356 + } 357 + } 358 + 359 + /** 360 + * Handles a post create event: 361 + * 1. If the post's author is tracked, buffer a snapshot row with zero counts. 362 + * 2. Regardless of author, if the post embeds a tracked author's post, 363 + * buffer a quote engagement event. 364 + */ 365 + private handlePostEvent(event: PostEvent, timeUs: bigint | null): void { 366 + const isTrackedAuthor = this.trackedDids.has(event.authorDid) 367 + 368 + // Part A: snapshot insert for tracked authors 369 + if (isTrackedAuthor) { 370 + this.snapshotBuffer.push({ 371 + postUri: event.postUri, 372 + postAuthorDid: event.authorDid, 373 + postText: event.text, 374 + postCreatedAt: event.createdAt, 375 + snapshotLikes: 0, 376 + snapshotReposts: 0, 377 + snapshotQuotes: 0, 378 + snapshotTakenAt: this.deps.now(), 379 + }) 380 + 381 + this.advancePendingCursor(timeUs) 382 + 383 + // Flush immediately if the snapshot buffer hits 1000 rows 384 + if (this.snapshotBuffer.length >= 1000) { 385 + void this.flushBuffer() 386 + } 387 + } 388 + 389 + // Part B: quote detection — applies to ALL post events 390 + if (event.embeddedPostUri !== null) { 391 + let embeddedAuthorDid: string 392 + try { 393 + embeddedAuthorDid = parseAtUri(event.embeddedPostUri).did 394 + } catch { 395 + return 396 + } 397 + 398 + if (this.trackedDids.has(embeddedAuthorDid)) { 399 + this.eventBuffer.push({ 400 + postUri: event.embeddedPostUri, 401 + postAuthorDid: embeddedAuthorDid, 402 + actorDid: event.authorDid, 403 + rkey: event.rkey, 404 + kind: 'quote', 405 + eventCreatedAt: event.createdAt, 406 + }) 407 + 408 + this.advancePendingCursor(timeUs) 409 + 410 + // Flush immediately if the engagement buffer hits 1000 rows 411 + if (this.eventBuffer.length >= 1000) { 412 + void this.flushBuffer() 413 + } 414 + } 415 + } 416 + } 417 + 418 + /** Advances pendingCursor if timeUs is larger than the current value. */ 419 + private advancePendingCursor(timeUs: bigint | null): void { 420 + if (timeUs !== null) { 421 + if (this.pendingCursor === null || timeUs > this.pendingCursor) { 422 + this.pendingCursor = timeUs 423 + } 424 + } 307 425 } 308 426 309 427 // ------------------------------------------------------------------------- ··· 312 430 313 431 /** Called by the interval timer — only flushes if there's anything to flush. */ 314 432 private async flushIfNeeded(): Promise<void> { 315 - if (this.eventBuffer.length > 0) { 433 + if (this.eventBuffer.length > 0 || this.snapshotBuffer.length > 0) { 316 434 await this.flushBuffer() 317 435 } 318 436 } 319 437 320 438 /** 321 - * Drains the event buffer and inserts into ClickHouse. 322 - * On failure: re-buffers the events at the front so the next flush retries. 323 - * On success: checkpoints the cursor. 439 + * Drains both the engagement-event buffer and the post-snapshot buffer, 440 + * then checkpoints the cursor once both inserts succeed. 441 + * 442 + * Failure strategy: if either insert fails, the affected batch is 443 + * re-queued at the front of its buffer and the cursor is NOT advanced. 444 + * The caller will retry on the next flush interval. 324 445 */ 325 446 async flushBuffer(): Promise<void> { 326 - if (this.eventBuffer.length === 0) return 447 + if (this.eventBuffer.length === 0 && this.snapshotBuffer.length === 0) return 327 448 328 - const toFlush = this.eventBuffer.splice(0) 449 + const eventsToFlush = this.eventBuffer.splice(0) 450 + const snapshotsToFlush = this.snapshotBuffer.splice(0) 329 451 const cursorAtFlushTime = this.pendingCursor 330 452 this.pendingCursor = null 331 453 332 - try { 333 - await this.clickHouseStore.insertEngagementEvents(toFlush) 334 - } catch (err) { 335 - console.error(`[JetstreamConsumer] flush failed (${toFlush.length} events); will retry:`, err) 336 - // Re-buffer the failed batch at the front 337 - this.eventBuffer.unshift(...toFlush) 338 - // Restore pendingCursor 454 + let eventsOk = true 455 + let snapshotsOk = true 456 + 457 + // Flush engagement events 458 + if (eventsToFlush.length > 0) { 459 + try { 460 + await this.clickHouseStore.insertEngagementEvents(eventsToFlush) 461 + } catch (err) { 462 + console.error( 463 + `[JetstreamConsumer] engagement flush failed (${eventsToFlush.length} events); will retry:`, 464 + err 465 + ) 466 + this.eventBuffer.unshift(...eventsToFlush) 467 + eventsOk = false 468 + } 469 + } 470 + 471 + // Flush post snapshots 472 + if (snapshotsToFlush.length > 0) { 473 + try { 474 + await this.clickHouseStore.insertPostSnapshots(snapshotsToFlush) 475 + } catch (err) { 476 + console.error( 477 + `[JetstreamConsumer] snapshot flush failed (${snapshotsToFlush.length} snapshots); will retry:`, 478 + err 479 + ) 480 + this.snapshotBuffer.unshift(...snapshotsToFlush) 481 + snapshotsOk = false 482 + } 483 + } 484 + 485 + // Only checkpoint if both flushes succeeded 486 + if (eventsOk && snapshotsOk) { 487 + if (cursorAtFlushTime !== null) { 488 + this.lastFlushedCursor = cursorAtFlushTime 489 + await this.checkpointCursor(cursorAtFlushTime) 490 + } 491 + } else { 492 + // Restore pendingCursor so reconnect uses the right value 339 493 if (cursorAtFlushTime !== null) { 340 494 if (this.pendingCursor === null || cursorAtFlushTime > this.pendingCursor) { 341 495 this.pendingCursor = cursorAtFlushTime 342 496 } 343 497 } 344 - return 345 - } 346 - 347 - // Successful flush — checkpoint the cursor 348 - if (cursorAtFlushTime !== null) { 349 - this.lastFlushedCursor = cursorAtFlushTime 350 - await this.checkpointCursor(cursorAtFlushTime) 351 498 } 352 499 } 353 500
+11
apps/web/commands/jetstream_consume.ts
··· 3 3 import { ClickHouseStore } from '@skystar/clickhouse' 4 4 import { JetstreamConsumer } from '#services/jetstream_consumer' 5 5 import { readCursor, writeCursor } from '#services/jetstream_cursor_io' 6 + import User from '#models/user' 6 7 7 8 /** 8 9 * Ace command: node ace jetstream:consume ··· 60 61 writeCursor, 61 62 62 63 now: () => new Date(), 64 + 65 + // Mark a user as deleted/takendown in SQLite 66 + markUserDeleted: async (did: string) => { 67 + await User.query().where('did', did).update({ deletedAt: Date.now() }) 68 + }, 69 + 70 + // Update a user's handle in SQLite 71 + updateUserHandle: async (did: string, handle: string) => { 72 + await User.query().where('did', did).update({ handle }) 73 + }, 63 74 }) 64 75 65 76 // Graceful shutdown on SIGTERM (Docker stop, systemd stop, k8s eviction)