See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add tests for Task 10 worker extensions (posts, deletes, accounts, quotes)

16 new tests covering:
- post create by tracked author → snapshot buffered with zero counts
- post create by untracked author → no snapshot
- quote by untracked author embedding tracked author → quote engagement
- quote by tracked author embedding tracked author → snapshot AND quote
- quote embedding untracked author → no quote engagement
- post delete for tracked author → tombstonePost called
- post delete for untracked author → dropped
- account deleted → markUserDeleted + tombstoneUserSnapshots + DID removed
- account takendown → same as deleted
- account deactivated/suspended → dropped
- identity event for tracked user → handle updated
- identity event for untracked user → dropped
- DID removed from tracked set after account deletion

Plus tombstoneUserSnapshots integration test in clickhouse_store.spec.ts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+563 -12
+90 -1
apps/web/tests/unit/clickhouse_store.spec.ts
··· 501 501 }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 502 502 503 503 // ------------------------------------------------------------------------- 504 - // Test 13: insertPostSnapshots wraps errors with cause 504 + // Test 13: tombstoneUserSnapshots bulk-tombstones all posts for an author 505 + // ------------------------------------------------------------------------- 506 + 507 + test('tombstoneUserSnapshots marks all non-deleted posts as deleted', async ({ assert }) => { 508 + const author = 'did:plc:author_bulk_tombstone' 509 + const otherAuthor = 'did:plc:other_author_bulk' 510 + 511 + // Insert 3 posts for the target author 512 + await store.insertPostSnapshots([ 513 + aSnapshot({ 514 + postAuthorDid: author, 515 + postUri: `at://${author}/app.bsky.feed.post/p1`, 516 + snapshotLikes: 5, 517 + }), 518 + aSnapshot({ 519 + postAuthorDid: author, 520 + postUri: `at://${author}/app.bsky.feed.post/p2`, 521 + snapshotLikes: 10, 522 + }), 523 + aSnapshot({ 524 + postAuthorDid: author, 525 + postUri: `at://${author}/app.bsky.feed.post/p3`, 526 + snapshotLikes: 15, 527 + }), 528 + ]) 529 + 530 + // Insert 1 post for a different author (should NOT be affected) 531 + await store.insertPostSnapshots([ 532 + aSnapshot({ 533 + postAuthorDid: otherAuthor, 534 + postUri: `at://${otherAuthor}/app.bsky.feed.post/q1`, 535 + snapshotLikes: 99, 536 + }), 537 + ]) 538 + 539 + // Confirm all 3 author posts appear before tombstoning 540 + const before = await store.getTopPosts({ authorDid: author, kind: 'likes' }) 541 + assert.equal(before.length, 3) 542 + 543 + // Bulk tombstone 544 + await store.tombstoneUserSnapshots(author) 545 + 546 + // FINAL query should return 0 results for the author (all tombstoned) 547 + const afterAuthor = await store.getTopPosts({ authorDid: author, kind: 'likes' }) 548 + assert.equal(afterAuthor.length, 0, 'all author posts should be tombstoned') 549 + 550 + // The other author's post should be unaffected 551 + const afterOther = await store.getTopPosts({ authorDid: otherAuthor, kind: 'likes' }) 552 + assert.equal(afterOther.length, 1, 'other author post should be unaffected') 553 + assert.equal(afterOther[0].likes, 99) 554 + 555 + // Verify tombstone rows exist in the raw table (original + tombstone = 6 total for author) 556 + const rawCount = await store.client.query({ 557 + query: `SELECT count() AS n FROM post_snapshots WHERE post_author_did = {authorDid:String}`, 558 + query_params: { authorDid: author }, 559 + format: 'JSONEachRow', 560 + }) 561 + const rawCountRows = await rawCount.json<{ n: string }>() 562 + // 3 original rows + 3 tombstone rows = 6 563 + assert.equal(Number(rawCountRows[0].n), 6, 'should have 3 original + 3 tombstone rows') 564 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 565 + 566 + // ------------------------------------------------------------------------- 567 + // Test 14: tombstoneUserSnapshots wraps errors with cause 568 + // ------------------------------------------------------------------------- 569 + 570 + test('tombstoneUserSnapshots wraps errors with cause', async ({ assert }) => { 571 + const badStore = new ClickHouseStore({ 572 + url: 'http://127.0.0.1:19999', 573 + database: 'irrelevant', 574 + username: 'default', 575 + password: '', 576 + }) 577 + 578 + let thrown: unknown 579 + try { 580 + await badStore.tombstoneUserSnapshots('did:plc:someauthor') 581 + } catch (err) { 582 + thrown = err 583 + } finally { 584 + await badStore.close() 585 + } 586 + 587 + assert.instanceOf(thrown, Error) 588 + const error = thrown as Error & { cause?: unknown } 589 + assert.exists(error.cause, 'Error.cause should be set') 590 + }).skip(async () => !(await isClickHouseAvailable()), 'ClickHouse not available') 591 + 592 + // ------------------------------------------------------------------------- 593 + // Test 15: insertPostSnapshots wraps errors with cause 505 594 // ------------------------------------------------------------------------- 506 595 507 596 test('insertPostSnapshots wraps insert errors with cause', async ({ assert }) => {
+473 -11
apps/web/tests/unit/services/jetstream_consumer.spec.ts
··· 7 7 import { test } from '@japa/runner' 8 8 import type { ClickHouseStore } from '@skystar/clickhouse' 9 9 import type { EngagementEventRow } from '@skystar/clickhouse' 10 + import type { PostSnapshot } from '@skystar/clickhouse' 10 11 import { 11 12 JetstreamConsumer, 12 13 type WebSocketLike, ··· 56 57 shouldThrow?: boolean 57 58 } 58 59 59 - function makeFakeStore(config: FakeStoreConfig = {}): ClickHouseStore & { 60 + interface FakeStore extends ClickHouseStore { 60 61 insertCalls: EngagementEventRow[][] 61 - } { 62 + snapshotInsertCalls: PostSnapshot[][] 63 + tombstonePostCalls: Array<{ postUri: string; postAuthorDid: string }> 64 + tombstoneUserSnapshotsCalls: string[] 65 + } 66 + 67 + function makeFakeStore(config: FakeStoreConfig = {}): FakeStore { 62 68 const insertCalls: EngagementEventRow[][] = [] 69 + const snapshotInsertCalls: PostSnapshot[][] = [] 70 + const tombstonePostCalls: Array<{ postUri: string; postAuthorDid: string }> = [] 71 + const tombstoneUserSnapshotsCalls: string[] = [] 63 72 64 73 return { 65 74 insertCalls, 75 + snapshotInsertCalls, 76 + tombstonePostCalls, 77 + tombstoneUserSnapshotsCalls, 66 78 async insertEngagementEvents(events: EngagementEventRow[]) { 67 79 if (config.shouldThrow) throw new Error('ClickHouse unavailable') 68 80 insertCalls.push([...events]) 69 81 }, 70 - } as unknown as ClickHouseStore & { insertCalls: EngagementEventRow[][] } 82 + async insertPostSnapshots(snapshots: PostSnapshot[]) { 83 + if (config.shouldThrow) throw new Error('ClickHouse unavailable') 84 + snapshotInsertCalls.push([...snapshots]) 85 + }, 86 + async tombstonePost(postUri: string, postAuthorDid: string) { 87 + tombstonePostCalls.push({ postUri, postAuthorDid }) 88 + }, 89 + async tombstoneUserSnapshots(authorDid: string) { 90 + tombstoneUserSnapshotsCalls.push(authorDid) 91 + }, 92 + } as unknown as FakeStore 71 93 } 72 94 73 95 // --------------------------------------------------------------------------- ··· 138 160 } 139 161 } 140 162 141 - function makePostEvent(authorDid: string) { 163 + function makePostEvent(authorDid: string, timeUs = 1725911162329308) { 142 164 return { 143 165 did: authorDid, 144 - time_us: 1725911162329308, 166 + time_us: timeUs, 145 167 kind: 'commit', 146 168 commit: { 147 169 rev: 'rev4', ··· 158 180 } 159 181 } 160 182 183 + function makeQuotePostEvent( 184 + quoterDid: string, 185 + embeddedPostUri: string, 186 + quoterRkey = 'quoterkeyabc', 187 + timeUs = 1725911162329309 188 + ) { 189 + return { 190 + did: quoterDid, 191 + time_us: timeUs, 192 + kind: 'commit', 193 + commit: { 194 + rev: 'rev5', 195 + operation: 'create', 196 + collection: 'app.bsky.feed.post', 197 + rkey: quoterRkey, 198 + record: { 199 + $type: 'app.bsky.feed.post', 200 + text: 'I am quoting this post', 201 + createdAt: '2024-09-09T19:49:00.000Z', 202 + embed: { 203 + $type: 'app.bsky.embed.record', 204 + record: { 205 + uri: embeddedPostUri, 206 + cid: 'bafyembedcid', 207 + }, 208 + }, 209 + }, 210 + cid: 'bafyquotecid', 211 + }, 212 + } 213 + } 214 + 215 + function makePostDeleteEvent(authorDid: string, rkey = 'postrkeyxyz', timeUs = 1725911162329310) { 216 + return { 217 + did: authorDid, 218 + time_us: timeUs, 219 + kind: 'commit', 220 + commit: { 221 + rev: 'rev6', 222 + operation: 'delete', 223 + collection: 'app.bsky.feed.post', 224 + rkey, 225 + }, 226 + } 227 + } 228 + 229 + function makeAccountEvent( 230 + did: string, 231 + status: 'deleted' | 'takendown' | 'deactivated' | 'suspended' | 'active', 232 + timeUs = 1725911162329311 233 + ) { 234 + if (status === 'active') { 235 + return { 236 + did, 237 + time_us: timeUs, 238 + kind: 'account', 239 + account: { 240 + did, 241 + active: true, 242 + }, 243 + } 244 + } 245 + return { 246 + did, 247 + time_us: timeUs, 248 + kind: 'account', 249 + account: { 250 + did, 251 + active: false, 252 + status, 253 + }, 254 + } 255 + } 256 + 257 + function makeIdentityEvent(did: string, handle: string, timeUs = 1725911162329312) { 258 + return { 259 + did, 260 + time_us: timeUs, 261 + kind: 'identity', 262 + identity: { 263 + did, 264 + handle, 265 + }, 266 + } 267 + } 268 + 161 269 // --------------------------------------------------------------------------- 162 270 // Consumer factory 163 271 // --------------------------------------------------------------------------- 164 272 165 273 interface ConsumerSetup { 166 274 fakeWs: FakeWebSocket 167 - store: ClickHouseStore & { insertCalls: EngagementEventRow[][] } 275 + store: FakeStore 168 276 consumer: JetstreamConsumer 169 277 writeCursorCalls: bigint[] 278 + markUserDeletedCalls: string[] 279 + updateUserHandleCalls: Array<{ did: string; handle: string }> 170 280 lastConnectedUrl: { value: string } 171 281 connectedUrls: string[] 172 282 } ··· 189 299 const fakeWs = new FakeWebSocket() 190 300 const store = makeFakeStore({ shouldThrow: storeShouldThrow }) 191 301 const writeCursorCalls: bigint[] = [] 302 + const markUserDeletedCalls: string[] = [] 303 + const updateUserHandleCalls: Array<{ did: string; handle: string }> = [] 192 304 const lastConnectedUrl: { value: string } = { value: '' } 193 305 const connectedUrls: string[] = [] 194 306 ··· 207 319 writeCursorCalls.push(cursor) 208 320 }, 209 321 now: () => new Date('2024-09-09T19:46:00.000Z'), 322 + markUserDeleted: async (did: string) => { 323 + markUserDeletedCalls.push(did) 324 + }, 325 + updateUserHandle: async (did: string, handle: string) => { 326 + updateUserHandleCalls.push({ did, handle }) 327 + }, 210 328 ...(instantReconnect ? { reconnectDelay: (_ms: number) => Promise.resolve() } : {}), 211 329 } 212 330 ··· 217 335 currentTrackedDids = newSet 218 336 } 219 337 220 - return { fakeWs, store, consumer, writeCursorCalls, lastConnectedUrl, connectedUrls } 338 + return { 339 + fakeWs, 340 + store, 341 + consumer, 342 + writeCursorCalls, 343 + markUserDeletedCalls, 344 + updateUserHandleCalls, 345 + lastConnectedUrl, 346 + connectedUrls, 347 + } 221 348 } 222 349 223 350 // --------------------------------------------------------------------------- ··· 307 434 }) 308 435 309 436 test.group('JetstreamConsumer — post events do not crash', () => { 310 - test('post create event for tracked author does not crash and inserts nothing (Task 9)', async ({ 437 + test('post create event for untracked author does not crash and inserts nothing', async ({ 311 438 assert, 312 439 }) => { 313 440 const { fakeWs, store, consumer } = makeConsumer() 314 441 315 442 await startConsumerInBackground(consumer) 316 443 317 - // Should not throw 318 - fakeWs.emit(makePostEvent(TRACKED_AUTHOR_DID)) 444 + // Untracked author — should not throw and should produce no inserts 445 + fakeWs.emit(makePostEvent(UNTRACKED_AUTHOR_DID)) 319 446 320 447 await consumer.flushBuffer() 321 448 322 - // Task 9: no insert for post events 323 449 assert.equal(store.insertCalls.length, 0) 450 + assert.equal(store.snapshotInsertCalls.length, 0) 324 451 325 452 await consumer.shutdown() 326 453 }) ··· 528 655 await consumer.shutdown() 529 656 }) 530 657 }) 658 + 659 + // --------------------------------------------------------------------------- 660 + // Task 10 — new tests 661 + // --------------------------------------------------------------------------- 662 + 663 + test.group('JetstreamConsumer — post create event for tracked author → snapshot insert', () => { 664 + test('post create by tracked author buffers a snapshot with zero counts', async ({ assert }) => { 665 + const { fakeWs, store, consumer } = makeConsumer() 666 + 667 + await startConsumerInBackground(consumer) 668 + 669 + fakeWs.emit(makePostEvent(TRACKED_AUTHOR_DID)) 670 + 671 + await consumer.flushBuffer() 672 + 673 + assert.equal(store.snapshotInsertCalls.length, 1, 'insertPostSnapshots should be called once') 674 + assert.equal(store.snapshotInsertCalls[0].length, 1, 'one snapshot per post event') 675 + 676 + const snapshot = store.snapshotInsertCalls[0][0] 677 + assert.equal(snapshot.postAuthorDid, TRACKED_AUTHOR_DID) 678 + assert.equal(snapshot.postUri, `at://${TRACKED_AUTHOR_DID}/app.bsky.feed.post/postrkeyxyz`) 679 + assert.equal(snapshot.postText, 'Hello Bluesky!') 680 + assert.equal(snapshot.snapshotLikes, 0) 681 + assert.equal(snapshot.snapshotReposts, 0) 682 + assert.equal(snapshot.snapshotQuotes, 0) 683 + // snapshotTakenAt should be set to `deps.now()` (injected as 2024-09-09T19:46:00Z) 684 + assert.deepEqual(snapshot.snapshotTakenAt, new Date('2024-09-09T19:46:00.000Z')) 685 + 686 + await consumer.shutdown() 687 + }) 688 + }) 689 + 690 + test.group('JetstreamConsumer — post create event for untracked author → no snapshot', () => { 691 + test('post create by untracked author produces no snapshot insert', async ({ assert }) => { 692 + const { fakeWs, store, consumer } = makeConsumer() 693 + 694 + await startConsumerInBackground(consumer) 695 + 696 + fakeWs.emit(makePostEvent(UNTRACKED_AUTHOR_DID)) 697 + 698 + await consumer.flushBuffer() 699 + 700 + assert.equal(store.snapshotInsertCalls.length, 0, 'insertPostSnapshots should NOT be called') 701 + 702 + await consumer.shutdown() 703 + }) 704 + }) 705 + 706 + test.group( 707 + 'JetstreamConsumer — quote post by untracked author embedding tracked author → quote engagement', 708 + () => { 709 + test('untracked quoter quoting tracked author buffers a quote engagement event', async ({ 710 + assert, 711 + }) => { 712 + const embeddedPostUri = `at://${TRACKED_AUTHOR_DID}/app.bsky.feed.post/postrkeyabc` 713 + 714 + const { fakeWs, store, consumer } = makeConsumer() 715 + 716 + await startConsumerInBackground(consumer) 717 + 718 + fakeWs.emit(makeQuotePostEvent(UNTRACKED_AUTHOR_DID, embeddedPostUri)) 719 + 720 + await consumer.flushBuffer() 721 + 722 + // No snapshot insert (quoter is untracked) 723 + assert.equal(store.snapshotInsertCalls.length, 0) 724 + 725 + // One engagement event with kind='quote' 726 + assert.equal(store.insertCalls.length, 1) 727 + assert.equal(store.insertCalls[0].length, 1) 728 + 729 + const quoteEvent = store.insertCalls[0][0] 730 + assert.equal(quoteEvent.kind, 'quote') 731 + assert.equal(quoteEvent.postUri, embeddedPostUri) 732 + assert.equal(quoteEvent.postAuthorDid, TRACKED_AUTHOR_DID) 733 + assert.equal(quoteEvent.actorDid, UNTRACKED_AUTHOR_DID) 734 + assert.equal(quoteEvent.rkey, 'quoterkeyabc') 735 + 736 + await consumer.shutdown() 737 + }) 738 + } 739 + ) 740 + 741 + test.group( 742 + 'JetstreamConsumer — quote post by tracked author embedding tracked author → snapshot AND quote', 743 + () => { 744 + test('tracked author quoting tracked author fires both snapshot insert and quote engagement', async ({ 745 + assert, 746 + }) => { 747 + const secondTrackedDid = 'did:plc:secondtracked789' 748 + const embeddedPostUri = `at://${secondTrackedDid}/app.bsky.feed.post/postrkeydef` 749 + 750 + const { fakeWs, store, consumer } = makeConsumer({ 751 + trackedDids: new Set([TRACKED_AUTHOR_DID, secondTrackedDid]), 752 + }) 753 + 754 + await startConsumerInBackground(consumer) 755 + 756 + // TRACKED_AUTHOR_DID quotes secondTrackedDid's post 757 + fakeWs.emit(makeQuotePostEvent(TRACKED_AUTHOR_DID, embeddedPostUri)) 758 + 759 + await consumer.flushBuffer() 760 + 761 + // Snapshot for the quoter's own post 762 + assert.equal(store.snapshotInsertCalls.length, 1) 763 + assert.equal(store.snapshotInsertCalls[0][0].postAuthorDid, TRACKED_AUTHOR_DID) 764 + 765 + // Quote engagement event for the embedded author's post 766 + assert.equal(store.insertCalls.length, 1) 767 + assert.equal(store.insertCalls[0].length, 1) 768 + const quoteEvent = store.insertCalls[0][0] 769 + assert.equal(quoteEvent.kind, 'quote') 770 + assert.equal(quoteEvent.postUri, embeddedPostUri) 771 + assert.equal(quoteEvent.postAuthorDid, secondTrackedDid) 772 + assert.equal(quoteEvent.actorDid, TRACKED_AUTHOR_DID) 773 + 774 + await consumer.shutdown() 775 + }) 776 + } 777 + ) 778 + 779 + test.group( 780 + 'JetstreamConsumer — quote post embedding untracked author → no quote engagement', 781 + () => { 782 + test('quoting an untracked author produces no engagement event', async ({ assert }) => { 783 + const embeddedPostUri = `at://${UNTRACKED_AUTHOR_DID}/app.bsky.feed.post/postrkeyzzz` 784 + 785 + const { fakeWs, store, consumer } = makeConsumer() 786 + 787 + await startConsumerInBackground(consumer) 788 + 789 + // Tracked author quotes an untracked author — no quote event 790 + fakeWs.emit(makeQuotePostEvent(TRACKED_AUTHOR_DID, embeddedPostUri)) 791 + 792 + await consumer.flushBuffer() 793 + 794 + // Snapshot for the tracked author's own post (since they ARE tracked) 795 + assert.equal(store.snapshotInsertCalls.length, 1) 796 + 797 + // No quote engagement event (embedded author is untracked) 798 + assert.equal(store.insertCalls.length, 0) 799 + 800 + await consumer.shutdown() 801 + }) 802 + } 803 + ) 804 + 805 + test.group( 806 + 'JetstreamConsumer — post delete event for tracked author → tombstonePost called', 807 + () => { 808 + test('post delete for tracked author calls tombstonePost with correct URI', async ({ 809 + assert, 810 + }) => { 811 + const { fakeWs, store, consumer } = makeConsumer() 812 + 813 + await startConsumerInBackground(consumer) 814 + 815 + fakeWs.emit(makePostDeleteEvent(TRACKED_AUTHOR_DID, 'postrkeyxyz')) 816 + 817 + // tombstonePost is called directly (not batched), but it's async via void + catch 818 + await new Promise((r) => setTimeout(r, 10)) 819 + 820 + assert.equal(store.tombstonePostCalls.length, 1) 821 + assert.equal( 822 + store.tombstonePostCalls[0].postUri, 823 + `at://${TRACKED_AUTHOR_DID}/app.bsky.feed.post/postrkeyxyz` 824 + ) 825 + assert.equal(store.tombstonePostCalls[0].postAuthorDid, TRACKED_AUTHOR_DID) 826 + 827 + await consumer.shutdown() 828 + }) 829 + } 830 + ) 831 + 832 + test.group( 833 + 'JetstreamConsumer — post delete event for untracked author → tombstonePost NOT called', 834 + () => { 835 + test('post delete for untracked author is dropped', async ({ assert }) => { 836 + const { fakeWs, store, consumer } = makeConsumer() 837 + 838 + await startConsumerInBackground(consumer) 839 + 840 + fakeWs.emit(makePostDeleteEvent(UNTRACKED_AUTHOR_DID, 'postrkeyxyz')) 841 + 842 + await new Promise((r) => setTimeout(r, 10)) 843 + 844 + assert.equal(store.tombstonePostCalls.length, 0) 845 + 846 + await consumer.shutdown() 847 + }) 848 + } 849 + ) 850 + 851 + test.group( 852 + 'JetstreamConsumer — account deleted event → markUserDeleted, remove from tracked set, tombstoneUserSnapshots', 853 + () => { 854 + test('account status=deleted fires all three side effects', async ({ assert }) => { 855 + const { fakeWs, store, consumer, markUserDeletedCalls } = makeConsumer() 856 + 857 + await startConsumerInBackground(consumer) 858 + 859 + fakeWs.emit(makeAccountEvent(TRACKED_AUTHOR_DID, 'deleted')) 860 + 861 + await new Promise((r) => setTimeout(r, 10)) 862 + 863 + // markUserDeleted called 864 + assert.equal(markUserDeletedCalls.length, 1) 865 + assert.equal(markUserDeletedCalls[0], TRACKED_AUTHOR_DID) 866 + 867 + // tombstoneUserSnapshots called 868 + assert.equal(store.tombstoneUserSnapshotsCalls.length, 1) 869 + assert.equal(store.tombstoneUserSnapshotsCalls[0], TRACKED_AUTHOR_DID) 870 + 871 + // DID removed from in-memory tracked set — subsequent likes should be dropped 872 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 1725911162339999, 'rkey-after-delete')) 873 + await consumer.flushBuffer() 874 + assert.equal(store.insertCalls.length, 0) 875 + 876 + await consumer.shutdown() 877 + }) 878 + } 879 + ) 880 + 881 + test.group('JetstreamConsumer — account takendown event → same behaviour as deleted', () => { 882 + test('account status=takendown fires all three side effects', async ({ assert }) => { 883 + const { fakeWs, store, consumer, markUserDeletedCalls } = makeConsumer() 884 + 885 + await startConsumerInBackground(consumer) 886 + 887 + fakeWs.emit(makeAccountEvent(TRACKED_AUTHOR_DID, 'takendown')) 888 + 889 + await new Promise((r) => setTimeout(r, 10)) 890 + 891 + assert.equal(markUserDeletedCalls.length, 1) 892 + assert.equal(markUserDeletedCalls[0], TRACKED_AUTHOR_DID) 893 + assert.equal(store.tombstoneUserSnapshotsCalls.length, 1) 894 + 895 + await consumer.shutdown() 896 + }) 897 + }) 898 + 899 + test.group('JetstreamConsumer — account deactivated or suspended event → no action', () => { 900 + test('account status=deactivated is dropped', async ({ assert }) => { 901 + const { fakeWs, store, consumer, markUserDeletedCalls } = makeConsumer() 902 + 903 + await startConsumerInBackground(consumer) 904 + 905 + fakeWs.emit(makeAccountEvent(TRACKED_AUTHOR_DID, 'deactivated')) 906 + 907 + await new Promise((r) => setTimeout(r, 10)) 908 + 909 + assert.equal(markUserDeletedCalls.length, 0) 910 + assert.equal(store.tombstoneUserSnapshotsCalls.length, 0) 911 + 912 + await consumer.shutdown() 913 + }) 914 + 915 + test('account status=suspended is dropped', async ({ assert }) => { 916 + const { fakeWs, store, consumer, markUserDeletedCalls } = makeConsumer() 917 + 918 + await startConsumerInBackground(consumer) 919 + 920 + fakeWs.emit(makeAccountEvent(TRACKED_AUTHOR_DID, 'suspended')) 921 + 922 + await new Promise((r) => setTimeout(r, 10)) 923 + 924 + assert.equal(markUserDeletedCalls.length, 0) 925 + assert.equal(store.tombstoneUserSnapshotsCalls.length, 0) 926 + 927 + await consumer.shutdown() 928 + }) 929 + }) 930 + 931 + test.group('JetstreamConsumer — identity event for tracked user → handle updated', () => { 932 + test('identity event for tracked user calls updateUserHandle', async ({ assert }) => { 933 + const { fakeWs, consumer, updateUserHandleCalls } = makeConsumer() 934 + 935 + await startConsumerInBackground(consumer) 936 + 937 + fakeWs.emit(makeIdentityEvent(TRACKED_AUTHOR_DID, 'newhandle.bsky.social')) 938 + 939 + await new Promise((r) => setTimeout(r, 10)) 940 + 941 + assert.equal(updateUserHandleCalls.length, 1) 942 + assert.equal(updateUserHandleCalls[0].did, TRACKED_AUTHOR_DID) 943 + assert.equal(updateUserHandleCalls[0].handle, 'newhandle.bsky.social') 944 + 945 + await consumer.shutdown() 946 + }) 947 + }) 948 + 949 + test.group('JetstreamConsumer — identity event for untracked user → no DB write', () => { 950 + test('identity event for untracked user is dropped', async ({ assert }) => { 951 + const { fakeWs, consumer, updateUserHandleCalls } = makeConsumer() 952 + 953 + await startConsumerInBackground(consumer) 954 + 955 + fakeWs.emit(makeIdentityEvent(UNTRACKED_AUTHOR_DID, 'someone.bsky.social')) 956 + 957 + await new Promise((r) => setTimeout(r, 10)) 958 + 959 + assert.equal(updateUserHandleCalls.length, 0) 960 + 961 + await consumer.shutdown() 962 + }) 963 + }) 964 + 965 + test.group( 966 + 'JetstreamConsumer — account deletion removes DID from tracked set (in-memory check)', 967 + () => { 968 + test('after account deletion, likes for that DID are dropped', async ({ assert }) => { 969 + const { fakeWs, store, consumer } = makeConsumer() 970 + 971 + await startConsumerInBackground(consumer) 972 + 973 + // First confirm the DID is tracked: a like should be buffered 974 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 1725911162329308, 'rkey-before')) 975 + await consumer.flushBuffer() 976 + assert.equal(store.insertCalls.length, 1) 977 + 978 + // Trigger account deletion 979 + fakeWs.emit(makeAccountEvent(TRACKED_AUTHOR_DID, 'deleted')) 980 + await new Promise((r) => setTimeout(r, 10)) 981 + 982 + // Now a like should be dropped (DID removed from tracked set) 983 + fakeWs.emit(makeLikeEvent(TRACKED_AUTHOR_DID, 1725911162339999, 'rkey-after')) 984 + await consumer.flushBuffer() 985 + 986 + // Still only 1 insert call from before 987 + assert.equal(store.insertCalls.length, 1) 988 + 989 + await consumer.shutdown() 990 + }) 991 + } 992 + )