See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Move parsePostEmbed to consumer; only parse embeds for tracked authors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+222 -102
+27 -22
app/lib/atproto/parsers/jetstream.ts
··· 71 71 /** 72 72 * Parse a post record's embed field into a PostEmbed. 73 73 * 74 - * Called from `parsePostCreate` for every post event. For quote-post embed 75 - * types (record / recordWithMedia), returns null — the consumer uses 76 - * `embeddedPostUri` to detect quotes instead. 74 + * NOT called from `parsePostCreate` — the consumer calls this directly, 75 + * only for tracked authors, to save CPU on the firehose-scale hot path. 76 + * 77 + * For quote-post embed types (record / recordWithMedia), returns 78 + * `{ skip: true }` so the consumer knows to skip the snapshot entirely. 79 + * For images/video/external, returns `{ skip: false, embed: ... }`. 80 + * For no embed or malformed data, returns `{ skip: false, embed: null }`. 77 81 * 78 82 * @param record - The raw post record object from the Jetstream commit 79 83 * @param authorDid - The DID of the post's author (needed for CDN URL construction) 80 - * @returns A PostEmbed or null 81 84 */ 82 - export function parsePostEmbed(record: unknown, authorDid: string): PostEmbed | null { 83 - if (!isObject(record)) return null 85 + export function parsePostEmbed( 86 + record: unknown, 87 + authorDid: string 88 + ): { skip: true } | { skip: false; embed: PostEmbed | null } { 89 + if (!isObject(record)) return { skip: false, embed: null } 84 90 85 91 const embed = getObject(record, 'embed') 86 - if (!embed) return null 92 + if (!embed) return { skip: false, embed: null } 87 93 88 94 const embedType = getString(embed, '$type') 89 - if (!embedType) return null 95 + if (!embedType) return { skip: false, embed: null } 90 96 91 - // Quote-post types — not rendered, handled by embeddedPostUri 97 + // Quote-post types — skip the snapshot entirely 92 98 if (embedType === 'app.bsky.embed.record' || embedType === 'app.bsky.embed.recordWithMedia') { 93 - return null 99 + return { skip: true } 94 100 } 95 101 96 102 // Images embed ··· 98 104 const rawImages = embed['images'] 99 105 if (!Array.isArray(rawImages) || rawImages.length === 0) { 100 106 console.warn('[parsePostEmbed] images embed has no images array') 101 - return null 107 + return { skip: false, embed: null } 102 108 } 103 109 104 110 const items: Array<{ ··· 147 153 148 154 if (items.length === 0) { 149 155 console.warn('[parsePostEmbed] images embed produced no valid items') 150 - return null 156 + return { skip: false, embed: null } 151 157 } 152 158 153 - return { type: 'images', items } 159 + return { skip: false, embed: { type: 'images', items } } 154 160 } 155 161 156 162 // Video embed ··· 158 164 const blob = getObject(embed, 'video') 159 165 if (!blob) { 160 166 console.warn('[parsePostEmbed] video embed missing video blob') 161 - return null 167 + return { skip: false, embed: null } 162 168 } 163 169 const cid = blobCid(blob) 164 170 if (!cid) { 165 171 console.warn('[parsePostEmbed] video blob missing ref.$link') 166 - return null 172 + return { skip: false, embed: null } 167 173 } 168 174 169 175 const alt = getString(embed, 'alt') ?? '' ··· 187 193 } 188 194 } 189 195 190 - return result 196 + return { skip: false, embed: result } 191 197 } 192 198 193 199 // External embed ··· 195 201 const external = getObject(embed, 'external') 196 202 if (!external) { 197 203 console.warn('[parsePostEmbed] external embed missing external object') 198 - return null 204 + return { skip: false, embed: null } 199 205 } 200 206 201 207 const uri = getString(external, 'uri') ··· 203 209 const description = getString(external, 'description') 204 210 if (uri === undefined || title === undefined || description === undefined) { 205 211 console.warn('[parsePostEmbed] external embed missing uri/title/description') 206 - return null 212 + return { skip: false, embed: null } 207 213 } 208 214 209 215 let thumb: string | null = null ··· 215 221 } 216 222 } 217 223 218 - return { type: 'external', uri, title, description, thumb } 224 + return { skip: false, embed: { type: 'external', uri, title, description, thumb } } 219 225 } 220 226 221 227 // Unknown embed type — not an error, just not supported 222 - return null 228 + return { skip: false, embed: null } 223 229 } 224 230 225 231 // --------------------------------------------------------------------------- ··· 347 353 348 354 const postUri = `at://${did}/${collection}/${rkey}` 349 355 const embeddedPostUri = extractEmbeddedPostUri(record) 350 - const embed = parsePostEmbed(record, did) 351 356 352 357 return { 353 358 kind: 'post', ··· 357 362 text, 358 363 createdAt: new Date(createdAtStr), 359 364 embeddedPostUri, 360 - embed, 365 + rawRecord: record, 361 366 ingestedAt: timeUsToDate(timeUs), 362 367 } 363 368 }
+4 -3
app/lib/atproto/types.ts
··· 70 70 */ 71 71 embeddedPostUri: string | null 72 72 /** 73 - * Parsed embed (images/video/external link) from the post record. 74 - * Null for posts with no embed or quote-post embed types. 73 + * The raw post record object from the Jetstream commit. 74 + * Passed through so the consumer can call parsePostEmbed() only for 75 + * tracked authors (saving CPU on the firehose-scale hot path). 75 76 */ 76 - embed: PostEmbed | null 77 + rawRecord: Record<string, unknown> 77 78 /** When the event was ingested (from top-level time_us, converted from µs) */ 78 79 ingestedAt: Date 79 80 }
+4 -5
app/services/jetstream_consumer.ts
··· 1 1 import type { EngagementEventRow } from '#lib/clickhouse/index' 2 2 import type { ClickHouseStore } from '#lib/clickhouse/index' 3 3 import type { PostSnapshot } from '#lib/clickhouse/index' 4 - import { parseJetstreamEvent, parseAtUri } from '#lib/atproto/index' 4 + import { parseJetstreamEvent, parseAtUri, parsePostEmbed } from '#lib/atproto/index' 5 5 import type { PostEvent } from '#lib/atproto/index' 6 6 7 7 // --------------------------------------------------------------------------- ··· 367 367 368 368 // Part A: snapshot insert for tracked authors 369 369 if (isTrackedAuthor) { 370 - if (event.embeddedPostUri !== null) { 371 - // Quote post — skip snapshot, don't advance cursor; Part B handles engagement 372 - } else { 370 + const embedResult = parsePostEmbed(event.rawRecord, event.authorDid) 371 + if (!embedResult.skip) { 373 372 this.snapshotBuffer.push({ 374 373 postUri: event.postUri, 375 374 postAuthorDid: event.authorDid, ··· 379 378 snapshotReposts: 0, 380 379 snapshotQuotes: 0, 381 380 snapshotTakenAt: this.deps.now(), 382 - embed: event.embed, 381 + embed: embedResult.embed, 383 382 }) 384 383 385 384 this.advancePendingCursor(timeUs)
+108 -72
tests/unit/atproto/jetstream.spec.ts
··· 506 506 507 507 const result = parsePostEmbed(record, TEST_DID) 508 508 509 - assert.isNotNull(result) 510 - assert.equal(result!.type, 'images') 511 - if (result?.type !== 'images') return 509 + assert.isFalse(result.skip) 510 + if (result.skip) return 512 511 513 - assert.equal(result.items.length, 2) 512 + assert.isNotNull(result.embed) 513 + assert.equal(result.embed!.type, 'images') 514 + if (result.embed?.type !== 'images') return 515 + 516 + assert.equal(result.embed.items.length, 2) 514 517 assert.equal( 515 - result.items[0].thumb, 518 + result.embed.items[0].thumb, 516 519 `https://cdn.bsky.app/img/feed_thumbnail/plain/${TEST_DID}/bafkreifirstcid` 517 520 ) 518 521 assert.equal( 519 - result.items[0].fullsize, 522 + result.embed.items[0].fullsize, 520 523 `https://cdn.bsky.app/img/feed_fullsize/plain/${TEST_DID}/bafkreifirstcid` 521 524 ) 522 - assert.equal(result.items[0].alt, 'first image') 523 - assert.deepEqual(result.items[0].aspectRatio, { width: 1200, height: 675 }) 525 + assert.equal(result.embed.items[0].alt, 'first image') 526 + assert.deepEqual(result.embed.items[0].aspectRatio, { width: 1200, height: 675 }) 524 527 525 528 assert.equal( 526 - result.items[1].thumb, 529 + result.embed.items[1].thumb, 527 530 `https://cdn.bsky.app/img/feed_thumbnail/plain/${TEST_DID}/bafkreisecondcid` 528 531 ) 529 - assert.equal(result.items[1].alt, 'second image') 532 + assert.equal(result.embed.items[1].alt, 'second image') 530 533 }) 531 534 532 535 test('parses single image without aspectRatio', ({ assert }) => { ··· 549 552 550 553 const result = parsePostEmbed(record, TEST_DID) 551 554 552 - assert.isNotNull(result) 553 - if (result?.type !== 'images') return 555 + assert.isFalse(result.skip) 556 + if (result.skip) return 557 + 558 + assert.isNotNull(result.embed) 559 + if (result.embed?.type !== 'images') return 554 560 555 - assert.equal(result.items.length, 1) 556 - assert.isUndefined(result.items[0].aspectRatio) 561 + assert.equal(result.embed.items.length, 1) 562 + assert.isUndefined(result.embed.items[0].aspectRatio) 557 563 }) 558 564 }) 559 565 ··· 575 581 576 582 const result = parsePostEmbed(record, TEST_DID) 577 583 578 - assert.isNotNull(result) 579 - assert.equal(result!.type, 'video') 580 - if (result?.type !== 'video') return 584 + assert.isFalse(result.skip) 585 + if (result.skip) return 586 + 587 + assert.isNotNull(result.embed) 588 + assert.equal(result.embed!.type, 'video') 589 + if (result.embed?.type !== 'video') return 581 590 582 591 const encodedDid = encodeURIComponent(TEST_DID) 583 592 assert.equal( 584 - result.thumbnail, 593 + result.embed.thumbnail, 585 594 `https://video.bsky.app/watch/${encodedDid}/bafkreibefbrio2cu7cydqjvgg46bx63zsu4i5t6eblohtjjk3j7sotgrim/thumbnail.jpg` 586 595 ) 587 - assert.equal(result.alt, 'my video') 588 - assert.deepEqual(result.aspectRatio, { width: 440, height: 912 }) 596 + assert.equal(result.embed.alt, 'my video') 597 + assert.deepEqual(result.embed.aspectRatio, { width: 440, height: 912 }) 589 598 }) 590 599 591 600 test('video embed with no alt defaults to empty string', ({ assert }) => { ··· 603 612 604 613 const result = parsePostEmbed(record, TEST_DID) 605 614 606 - assert.isNotNull(result) 607 - if (result?.type !== 'video') return 615 + assert.isFalse(result.skip) 616 + if (result.skip) return 608 617 609 - assert.equal(result.alt, '') 610 - assert.isUndefined(result.aspectRatio) 618 + assert.isNotNull(result.embed) 619 + if (result.embed?.type !== 'video') return 620 + 621 + assert.equal(result.embed.alt, '') 622 + assert.isUndefined(result.embed.aspectRatio) 611 623 }) 612 624 }) 613 625 ··· 632 644 633 645 const result = parsePostEmbed(record, TEST_DID) 634 646 635 - assert.isNotNull(result) 636 - assert.equal(result!.type, 'external') 637 - if (result?.type !== 'external') return 647 + assert.isFalse(result.skip) 648 + if (result.skip) return 649 + 650 + assert.isNotNull(result.embed) 651 + assert.equal(result.embed!.type, 'external') 652 + if (result.embed?.type !== 'external') return 638 653 639 - assert.equal(result.uri, 'https://example.com') 640 - assert.equal(result.title, 'Example') 641 - assert.equal(result.description, 'An example page') 654 + assert.equal(result.embed.uri, 'https://example.com') 655 + assert.equal(result.embed.title, 'Example') 656 + assert.equal(result.embed.description, 'An example page') 642 657 assert.equal( 643 - result.thumb, 658 + result.embed.thumb, 644 659 `https://cdn.bsky.app/img/feed_thumbnail/plain/${TEST_DID}/bafkreithumbcid` 645 660 ) 646 661 }) ··· 659 674 660 675 const result = parsePostEmbed(record, TEST_DID) 661 676 662 - assert.isNotNull(result) 663 - if (result?.type !== 'external') return 677 + assert.isFalse(result.skip) 678 + if (result.skip) return 679 + 680 + assert.isNotNull(result.embed) 681 + if (result.embed?.type !== 'external') return 664 682 665 - assert.isNull(result.thumb) 683 + assert.isNull(result.embed.thumb) 666 684 }) 667 685 }) 668 686 669 - test.group('parsePostEmbed — quote types return null', () => { 670 - test('record embed returns null', ({ assert }) => { 687 + test.group('parsePostEmbed — quote types return { skip: true }', () => { 688 + test('record embed returns { skip: true }', ({ assert }) => { 671 689 const record = { 672 690 embed: { 673 691 $type: 'app.bsky.embed.record', ··· 679 697 } 680 698 681 699 const result = parsePostEmbed(record, TEST_DID) 682 - assert.isNull(result) 700 + assert.deepEqual(result, { skip: true }) 683 701 }) 684 702 685 - test('recordWithMedia embed returns null', ({ assert }) => { 703 + test('recordWithMedia embed returns { skip: true }', ({ assert }) => { 686 704 const record = { 687 705 embed: { 688 706 $type: 'app.bsky.embed.recordWithMedia', ··· 692 710 } 693 711 694 712 const result = parsePostEmbed(record, TEST_DID) 695 - assert.isNull(result) 713 + assert.deepEqual(result, { skip: true }) 696 714 }) 697 715 }) 698 716 699 717 test.group('parsePostEmbed — no embed / malformed', () => { 700 - test('no embed field returns null', ({ assert }) => { 718 + test('no embed field returns { skip: false, embed: null }', ({ assert }) => { 701 719 const record = { 702 720 $type: 'app.bsky.feed.post', 703 721 text: 'just text', 704 722 } 705 723 706 724 const result = parsePostEmbed(record, TEST_DID) 707 - assert.isNull(result) 725 + assert.deepEqual(result, { skip: false, embed: null }) 708 726 }) 709 727 710 - test('malformed blob missing ref.$link returns null and warns', ({ assert }) => { 728 + test('malformed blob missing ref.$link returns { skip: false, embed: null }', ({ assert }) => { 711 729 const record = { 712 730 embed: { 713 731 $type: 'app.bsky.embed.images', ··· 726 744 } 727 745 728 746 const result = parsePostEmbed(record, TEST_DID) 729 - assert.isNull(result) 747 + assert.deepEqual(result, { skip: false, embed: null }) 730 748 }) 731 749 732 - test('non-object record returns null', ({ assert }) => { 750 + test('non-object record returns { skip: false, embed: null }', ({ assert }) => { 733 751 const result = parsePostEmbed('not an object', TEST_DID) 734 - assert.isNull(result) 752 + assert.deepEqual(result, { skip: false, embed: null }) 735 753 }) 736 754 }) 737 755 738 - test.group('parseJetstreamEvent — post create with embeds integration', () => { 739 - test('post create with images embed populates event.embed', ({ assert }) => { 756 + test.group('parseJetstreamEvent — post create carries rawRecord for deferred embed parsing', () => { 757 + test('post create with images embed carries rawRecord (embed parsed by consumer, not parser)', ({ 758 + assert, 759 + }) => { 760 + const record = { 761 + $type: 'app.bsky.feed.post', 762 + text: 'Check out these pics', 763 + createdAt: '2024-09-09T19:48:00.000Z', 764 + embed: { 765 + $type: 'app.bsky.embed.images', 766 + images: [ 767 + { 768 + alt: 'test image', 769 + image: { 770 + $type: 'blob', 771 + ref: { $link: 'bafkreitestcid' }, 772 + mimeType: 'image/jpeg', 773 + size: 50000, 774 + }, 775 + }, 776 + ], 777 + }, 778 + } 779 + 740 780 const event = parseJetstreamEvent({ 741 781 did: 'did:plc:author123', 742 782 time_us: 1725911162329308, ··· 746 786 operation: 'create', 747 787 collection: 'app.bsky.feed.post', 748 788 rkey: 'postrkeyimg', 749 - record: { 750 - $type: 'app.bsky.feed.post', 751 - text: 'Check out these pics', 752 - createdAt: '2024-09-09T19:48:00.000Z', 753 - embed: { 754 - $type: 'app.bsky.embed.images', 755 - images: [ 756 - { 757 - alt: 'test image', 758 - image: { 759 - $type: 'blob', 760 - ref: { $link: 'bafkreitestcid' }, 761 - mimeType: 'image/jpeg', 762 - size: 50000, 763 - }, 764 - }, 765 - ], 766 - }, 767 - }, 789 + record, 768 790 cid: 'bafypostcidimg', 769 791 }, 770 792 }) ··· 772 794 assert.isNotNull(event) 773 795 if (!event || event.kind !== 'post') return 774 796 775 - assert.isNotNull(event.embed) 776 - assert.equal(event.embed!.type, 'images') 797 + // rawRecord should be the same object reference as the record 798 + assert.strictEqual(event.rawRecord, record) 777 799 assert.isNull(event.embeddedPostUri) 800 + 801 + // Verify parsePostEmbed works when called separately on the rawRecord 802 + const embedResult = parsePostEmbed(event.rawRecord, event.authorDid) 803 + assert.isFalse(embedResult.skip) 804 + if (!embedResult.skip) { 805 + assert.isNotNull(embedResult.embed) 806 + assert.equal(embedResult.embed!.type, 'images') 807 + } 778 808 }) 779 809 780 - test('post create with no embed has null embed', ({ assert }) => { 810 + test('post create with no embed has rawRecord with no embed field', ({ assert }) => { 781 811 const event = parseJetstreamEvent(POST_CREATE_EVENT) 782 812 783 813 assert.isNotNull(event) 784 814 if (!event || event.kind !== 'post') return 785 815 786 - assert.isNull(event.embed) 816 + assert.isObject(event.rawRecord) 817 + const embedResult = parsePostEmbed(event.rawRecord, event.authorDid) 818 + assert.isFalse(embedResult.skip) 819 + if (!embedResult.skip) { 820 + assert.isNull(embedResult.embed) 821 + } 787 822 }) 788 823 789 - test('post create with record embed has null embed but non-null embeddedPostUri', ({ 824 + test('post create with record embed has non-null embeddedPostUri and skip=true from parsePostEmbed', ({ 790 825 assert, 791 826 }) => { 792 827 const event = parseJetstreamEvent(POST_CREATE_WITH_EMBED_RECORD_EVENT) ··· 794 829 assert.isNotNull(event) 795 830 if (!event || event.kind !== 'post') return 796 831 797 - assert.isNull(event.embed) 798 832 assert.isNotNull(event.embeddedPostUri) 833 + const embedResult = parsePostEmbed(event.rawRecord, event.authorDid) 834 + assert.deepEqual(embedResult, { skip: true }) 799 835 }) 800 836 })
+79
tests/unit/services/jetstream_consumer.spec.ts
··· 215 215 } 216 216 } 217 217 218 + function makePostEventWithRecordWithMediaEmbed( 219 + authorDid: string, 220 + embeddedPostUri: string, 221 + timeUs = 1725911162329308 222 + ) { 223 + return { 224 + did: authorDid, 225 + time_us: timeUs, 226 + kind: 'commit', 227 + commit: { 228 + rev: 'rev4rwm', 229 + operation: 'create', 230 + collection: 'app.bsky.feed.post', 231 + rkey: 'postrkeyrecordwithmedia', 232 + record: { 233 + $type: 'app.bsky.feed.post', 234 + text: 'Quote with media!', 235 + createdAt: '2024-09-09T19:48:00.000Z', 236 + embed: { 237 + $type: 'app.bsky.embed.recordWithMedia', 238 + record: { 239 + record: { 240 + uri: embeddedPostUri, 241 + cid: 'bafyembedcid', 242 + }, 243 + }, 244 + media: { 245 + $type: 'app.bsky.embed.images', 246 + images: [ 247 + { 248 + alt: 'media image', 249 + image: { 250 + $type: 'blob', 251 + ref: { $link: 'bafkreimediaimgcid' }, 252 + mimeType: 'image/jpeg', 253 + size: 50000, 254 + }, 255 + }, 256 + ], 257 + }, 258 + }, 259 + }, 260 + cid: 'bafypostcidrecordwithmedia', 261 + }, 262 + } 263 + } 264 + 218 265 function makeQuotePostEvent( 219 266 quoterDid: string, 220 267 embeddedPostUri: string, ··· 1054 1101 assert.equal(snapshot.embed.items[0].alt, 'test image') 1055 1102 assert.deepEqual(snapshot.embed.items[0].aspectRatio, { width: 800, height: 600 }) 1056 1103 } 1104 + 1105 + await consumer.shutdown() 1106 + }) 1107 + } 1108 + ) 1109 + 1110 + test.group( 1111 + 'JetstreamConsumer — tracked-author post with recordWithMedia embed → no snapshot buffered', 1112 + () => { 1113 + test('recordWithMedia embed (quote + media) skips snapshot for tracked author', async ({ 1114 + assert, 1115 + }) => { 1116 + const secondTrackedDid = 'did:plc:secondtracked789' 1117 + const embeddedPostUri = `at://${secondTrackedDid}/app.bsky.feed.post/postrkeydef` 1118 + 1119 + const { fakeWs, store, consumer } = makeConsumer({ 1120 + trackedDids: new Set([TRACKED_AUTHOR_DID, secondTrackedDid]), 1121 + }) 1122 + 1123 + await startConsumerInBackground(consumer) 1124 + 1125 + fakeWs.emit(makePostEventWithRecordWithMediaEmbed(TRACKED_AUTHOR_DID, embeddedPostUri)) 1126 + 1127 + await consumer.flushBuffer() 1128 + 1129 + // No snapshot buffered — recordWithMedia is a quote-post type, skip=true 1130 + assert.equal(store.snapshotInsertCalls.length, 0) 1131 + 1132 + // Part B fires: quote engagement event for the embedded author 1133 + assert.equal(store.insertCalls.length, 1) 1134 + assert.equal(store.insertCalls[0][0].kind, 'quote') 1135 + assert.equal(store.insertCalls[0][0].postUri, embeddedPostUri) 1057 1136 1058 1137 await consumer.shutdown() 1059 1138 })