audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: restore avatar on account reactivation, clear on deactivation (#1291)

jetstream ignored `kind=account` events entirely — deactivation left
stale cdn.bsky.app avatar URLs (dead 404s), and reactivation never
refreshed them. identity events also skipped avatar updates.

- handle `account` events in jetstream consumer (dispatch new
`ingest_account_status_change` task)
- on deactivation: clear avatar_url so frontend doesn't show broken img
- on reactivation: fetch fresh avatar from Bluesky profile
- add avatar refresh to `ingest_identity_update` (covers PDS migrations
and handle changes too)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6 (1M context)
and committed by
GitHub
524758ac 04109994

+257 -29
+18
backend/src/backend/_internal/jetstream.py
··· 25 25 26 26 from backend._internal.background import get_docket 27 27 from backend._internal.tasks.ingest import ( 28 + ingest_account_status_change, 28 29 ingest_comment_create, 29 30 ingest_comment_delete, 30 31 ingest_comment_update, ··· 145 146 "jetstream dispatched identity update", 146 147 did=did, 147 148 handle=handle, 149 + ) 150 + if time_us := event.get("time_us"): 151 + self._cursor = time_us 152 + return 153 + 154 + if kind == "account": 155 + did = event.get("did") 156 + account = event.get("account") or {} 157 + active = account.get("active", False) 158 + if did and did in self._known_dids: 159 + docket = get_docket() 160 + await docket.add(ingest_account_status_change)(did=did, active=active) 161 + logfire.info( 162 + "jetstream dispatched account status change", 163 + did=did, 164 + active=active, 165 + status=account.get("status"), 148 166 ) 149 167 if time_us := event.get("time_us"): 150 168 self._cursor = time_us
+3
backend/src/backend/_internal/tasks/__init__.py
··· 30 30 ) 31 31 from backend._internal.tasks.ingest import ( 32 32 SubjectNotFoundError, 33 + ingest_account_status_change, 33 34 ingest_comment_create, 34 35 ingest_comment_delete, 35 36 ingest_comment_update, ··· 108 109 ingest_list_update, 109 110 ingest_list_delete, 110 111 ingest_profile_update, 112 + ingest_account_status_change, 111 113 scan_image_moderation, 112 114 ] 113 115 ··· 135 137 "classify_genres", 136 138 "consume_jetstream", 137 139 "generate_embedding", 140 + "ingest_account_status_change", 138 141 "ingest_comment_create", 139 142 "ingest_comment_delete", 140 143 "ingest_comment_update",
+64 -5
backend/src/backend/_internal/tasks/ingest.py
··· 716 716 did: str, 717 717 handle: str, 718 718 ) -> None: 719 - """update artist handle and PDS URL when an identity event arrives. 719 + """update artist handle, PDS URL, and avatar when an identity event arrives. 720 720 721 - identity events fire on both handle changes and PDS migrations. 722 - resolving the DID gives us the current PDS URL, so we update both 723 - in one pass rather than lazily healing stale pds_url in every API 724 - endpoint that fetches ATProto records. 721 + identity events fire on handle changes, PDS migrations, and account 722 + reactivation. resolving the DID gives us the current PDS URL, and 723 + re-fetching the profile refreshes the avatar (which may have gone 724 + stale during account deactivation). 725 725 """ 726 726 from atproto_identity.did.resolver import AsyncDidResolver 727 + 728 + from backend._internal.atproto.profile import fetch_user_avatar 727 729 728 730 async with db_session() as db: 729 731 artist = await db.get(Artist, did) ··· 754 756 "ingest_identity_update: DID resolution failed for %s: %s", did, e 755 757 ) 756 758 759 + # refresh avatar from Bluesky profile 760 + try: 761 + fresh_avatar = await fetch_user_avatar(did) 762 + if fresh_avatar != artist.avatar_url: 763 + changes["avatar_url"] = (artist.avatar_url, fresh_avatar) 764 + artist.avatar_url = fresh_avatar 765 + except Exception as e: 766 + logger.warning( 767 + "ingest_identity_update: avatar fetch failed for %s: %s", did, e 768 + ) 769 + 757 770 if not changes: 758 771 return 759 772 ··· 763 776 did=did, 764 777 changes={k: {"old": v[0], "new": v[1]} for k, v in changes.items()}, 765 778 ) 779 + 780 + 781 + async def ingest_account_status_change( 782 + did: str, 783 + active: bool, 784 + ) -> None: 785 + """handle account activation/deactivation events. 786 + 787 + on reactivation (active=True): re-fetch avatar from Bluesky since 788 + the CDN URL goes dead during deactivation. 789 + 790 + on deactivation (active=False): clear the avatar URL so the frontend 791 + doesn't show a broken image pointing at a dead CDN URL. 792 + """ 793 + from backend._internal.atproto.profile import fetch_user_avatar 794 + 795 + async with db_session() as db: 796 + artist = await db.get(Artist, did) 797 + if not artist: 798 + logger.debug("ingest_account_status_change: unknown artist %s", did) 799 + return 800 + 801 + if active: 802 + # re-fetch avatar on reactivation 803 + try: 804 + fresh_avatar = await fetch_user_avatar(did) 805 + if fresh_avatar != artist.avatar_url: 806 + artist.avatar_url = fresh_avatar 807 + await db.commit() 808 + logfire.info( 809 + "ingest: avatar restored on reactivation", 810 + did=did, 811 + avatar_url=fresh_avatar, 812 + ) 813 + except Exception as e: 814 + logger.warning( 815 + "ingest_account_status_change: avatar fetch failed for %s: %s", 816 + did, 817 + e, 818 + ) 819 + else: 820 + # clear stale avatar on deactivation 821 + if artist.avatar_url: 822 + artist.avatar_url = None 823 + await db.commit() 824 + logfire.info("ingest: avatar cleared on deactivation", did=did)
+170 -22
backend/tests/test_jetstream.py
··· 13 13 from backend._internal.tasks.ingest import ( 14 14 SubjectNotFoundError, 15 15 _write_tombstone, 16 + ingest_account_status_change, 16 17 ingest_comment_create, 17 18 ingest_comment_delete, 18 19 ingest_identity_update, ··· 156 157 await consumer._process_event(event) 157 158 consumer._dispatch.assert_not_called() # type: ignore[union-attr] 158 159 159 - async def test_skips_non_commit_non_identity_events(self) -> None: 160 + async def test_skips_non_commit_non_identity_non_account_events(self) -> None: 160 161 consumer = JetstreamConsumer() 161 162 consumer._known_dids = {"did:plc:jetstream_test"} 162 163 consumer._dispatch = AsyncMock() # type: ignore[method-assign] 163 164 164 - event = {"kind": "account", "did": "did:plc:jetstream_test"} 165 + event = {"kind": "info", "did": "did:plc:jetstream_test"} 165 166 await consumer._process_event(event) 166 167 consumer._dispatch.assert_not_called() # type: ignore[union-attr] 168 + 169 + async def test_dispatches_account_event(self) -> None: 170 + consumer = JetstreamConsumer() 171 + consumer._known_dids = {"did:plc:jetstream_test"} 172 + 173 + mock_docket = MagicMock() 174 + dispatched: list[dict] = [] 175 + 176 + async def capture(**kwargs: object) -> None: 177 + dispatched.append(dict(kwargs)) 178 + 179 + mock_docket.add = MagicMock(return_value=capture) 180 + 181 + event = { 182 + "kind": "account", 183 + "did": "did:plc:jetstream_test", 184 + "time_us": 3000000, 185 + "account": {"active": True, "status": "active"}, 186 + } 187 + 188 + with patch("backend._internal.jetstream.get_docket", return_value=mock_docket): 189 + await consumer._process_event(event) 190 + 191 + assert len(dispatched) == 1 192 + assert dispatched[0]["did"] == "did:plc:jetstream_test" 193 + assert dispatched[0]["active"] is True 194 + assert consumer._cursor == 3000000 195 + 196 + async def test_account_event_skips_unknown_did(self) -> None: 197 + consumer = JetstreamConsumer() 198 + consumer._known_dids = {"did:plc:known"} 199 + 200 + mock_docket = MagicMock() 201 + mock_docket.add = MagicMock() 202 + 203 + event = { 204 + "kind": "account", 205 + "did": "did:plc:unknown", 206 + "account": {"active": False, "status": "deactivated"}, 207 + } 208 + 209 + with patch("backend._internal.jetstream.get_docket", return_value=mock_docket): 210 + await consumer._process_event(event) 211 + 212 + mock_docket.add.assert_not_called() 167 213 168 214 async def test_dispatches_identity_event(self) -> None: 169 215 consumer = JetstreamConsumer() ··· 1574 1620 return mock_resolver 1575 1621 1576 1622 1623 + def _patch_identity_update( 1624 + pds_url: str = "https://pds.example.com", 1625 + avatar_url: str | None = None, 1626 + ): 1627 + """context manager stack for patching DID resolution + avatar fetch.""" 1628 + from contextlib import ExitStack 1629 + 1630 + stack = ExitStack() 1631 + stack.enter_context( 1632 + patch( 1633 + "atproto_identity.did.resolver.AsyncDidResolver", 1634 + return_value=_mock_did_resolver(pds_url=pds_url), 1635 + ) 1636 + ) 1637 + stack.enter_context( 1638 + patch( 1639 + "backend._internal.atproto.profile.fetch_user_avatar", 1640 + new_callable=AsyncMock, 1641 + return_value=avatar_url, 1642 + ) 1643 + ) 1644 + return stack 1645 + 1646 + 1577 1647 class TestIngestIdentityUpdate: 1578 1648 async def test_updates_artist_handle( 1579 1649 self, db_session: AsyncSession, artist: Artist 1580 1650 ) -> None: 1581 1651 new_handle = "updated.handle.example" 1582 - with patch( 1583 - "atproto_identity.did.resolver.AsyncDidResolver", 1584 - return_value=_mock_did_resolver(), 1585 - ): 1652 + with _patch_identity_update(): 1586 1653 await ingest_identity_update(did=artist.did, handle=new_handle) 1587 1654 1588 1655 await db_session.refresh(artist) ··· 1601 1668 await db_session.commit() 1602 1669 1603 1670 new_handle = "updated.handle.example" 1604 - with patch( 1605 - "atproto_identity.did.resolver.AsyncDidResolver", 1606 - return_value=_mock_did_resolver(), 1607 - ): 1671 + with _patch_identity_update(): 1608 1672 await ingest_identity_update(did=artist.did, handle=new_handle) 1609 1673 1610 1674 await db_session.refresh(session) ··· 1615 1679 ) -> None: 1616 1680 """PDS migration updates the cached pds_url via DID resolution.""" 1617 1681 new_pds = "https://new-pds.example.com" 1618 - with patch( 1619 - "atproto_identity.did.resolver.AsyncDidResolver", 1620 - return_value=_mock_did_resolver(pds_url=new_pds), 1621 - ): 1682 + with _patch_identity_update(pds_url=new_pds): 1622 1683 await ingest_identity_update(did=artist.did, handle=artist.handle) 1623 1684 1624 1685 await db_session.refresh(artist) 1625 1686 assert artist.pds_url == new_pds 1626 1687 1627 - async def test_noop_when_handle_and_pds_unchanged( 1688 + async def test_updates_avatar( 1689 + self, db_session: AsyncSession, artist: Artist 1690 + ) -> None: 1691 + """identity event refreshes avatar from Bluesky profile.""" 1692 + new_avatar = "https://cdn.bsky.app/img/avatar/plain/did:plc:test/newcid@jpeg" 1693 + with _patch_identity_update(avatar_url=new_avatar): 1694 + await ingest_identity_update(did=artist.did, handle=artist.handle) 1695 + 1696 + await db_session.refresh(artist) 1697 + assert artist.avatar_url == new_avatar 1698 + 1699 + async def test_noop_when_handle_pds_and_avatar_unchanged( 1628 1700 self, db_session: AsyncSession, artist: Artist 1629 1701 ) -> None: 1630 1702 """no commit when nothing changed — idempotent.""" 1631 1703 original_handle = artist.handle 1632 1704 original_pds = artist.pds_url 1633 - with patch( 1634 - "atproto_identity.did.resolver.AsyncDidResolver", 1635 - return_value=_mock_did_resolver(pds_url=original_pds or ""), 1705 + original_avatar = artist.avatar_url 1706 + with _patch_identity_update( 1707 + pds_url=original_pds or "", 1708 + avatar_url=original_avatar, 1636 1709 ): 1637 1710 await ingest_identity_update(did=artist.did, handle=original_handle) 1638 1711 1639 1712 await db_session.refresh(artist) 1640 1713 assert artist.handle == original_handle 1641 1714 assert artist.pds_url == original_pds 1715 + assert artist.avatar_url == original_avatar 1642 1716 1643 1717 async def test_noop_for_unknown_did(self, db_session: AsyncSession) -> None: 1644 1718 """unknown DID is silently skipped (no DID resolution attempted).""" ··· 1647 1721 async def test_did_resolution_failure_still_updates_handle( 1648 1722 self, db_session: AsyncSession, artist: Artist 1649 1723 ) -> None: 1650 - """if DID resolution fails, handle is still updated.""" 1724 + """if DID resolution fails, handle and avatar are still updated.""" 1651 1725 new_handle = "updated.handle.example" 1652 1726 mock_resolver = AsyncMock() 1653 1727 mock_resolver.resolve_atproto_data = AsyncMock( 1654 1728 side_effect=Exception("resolution failed") 1655 1729 ) 1656 - with patch( 1657 - "atproto_identity.did.resolver.AsyncDidResolver", 1658 - return_value=mock_resolver, 1730 + with ( 1731 + patch( 1732 + "atproto_identity.did.resolver.AsyncDidResolver", 1733 + return_value=mock_resolver, 1734 + ), 1735 + patch( 1736 + "backend._internal.atproto.profile.fetch_user_avatar", 1737 + new_callable=AsyncMock, 1738 + return_value=None, 1739 + ), 1659 1740 ): 1660 1741 await ingest_identity_update(did=artist.did, handle=new_handle) 1661 1742 1662 1743 await db_session.refresh(artist) 1663 1744 assert artist.handle == new_handle 1745 + 1746 + 1747 + class TestIngestAccountStatusChange: 1748 + async def test_reactivation_restores_avatar( 1749 + self, db_session: AsyncSession, artist: Artist 1750 + ) -> None: 1751 + """reactivation fetches fresh avatar from Bluesky.""" 1752 + artist.avatar_url = None 1753 + await db_session.commit() 1754 + 1755 + new_avatar = "https://cdn.bsky.app/img/avatar/plain/did:plc:test/cid123@jpeg" 1756 + with patch( 1757 + "backend._internal.atproto.profile.fetch_user_avatar", 1758 + new_callable=AsyncMock, 1759 + return_value=new_avatar, 1760 + ): 1761 + await ingest_account_status_change(did=artist.did, active=True) 1762 + 1763 + await db_session.refresh(artist) 1764 + assert artist.avatar_url == new_avatar 1765 + 1766 + async def test_deactivation_clears_avatar( 1767 + self, db_session: AsyncSession, artist: Artist 1768 + ) -> None: 1769 + """deactivation clears avatar to avoid broken CDN URLs.""" 1770 + artist.avatar_url = ( 1771 + "https://cdn.bsky.app/img/avatar/plain/did:plc:test/old@jpeg" 1772 + ) 1773 + await db_session.commit() 1774 + 1775 + await ingest_account_status_change(did=artist.did, active=False) 1776 + 1777 + await db_session.refresh(artist) 1778 + assert artist.avatar_url is None 1779 + 1780 + async def test_deactivation_noop_when_no_avatar( 1781 + self, db_session: AsyncSession, artist: Artist 1782 + ) -> None: 1783 + """deactivation is a noop if avatar is already None.""" 1784 + artist.avatar_url = None 1785 + await db_session.commit() 1786 + 1787 + await ingest_account_status_change(did=artist.did, active=False) 1788 + 1789 + await db_session.refresh(artist) 1790 + assert artist.avatar_url is None 1791 + 1792 + async def test_unknown_did_is_skipped(self, db_session: AsyncSession) -> None: 1793 + """unknown DID is silently skipped.""" 1794 + await ingest_account_status_change(did="did:plc:nonexistent", active=True) 1795 + 1796 + async def test_avatar_fetch_failure_on_reactivation( 1797 + self, db_session: AsyncSession, artist: Artist 1798 + ) -> None: 1799 + """avatar fetch failure on reactivation doesn't raise.""" 1800 + artist.avatar_url = None 1801 + await db_session.commit() 1802 + 1803 + with patch( 1804 + "backend._internal.atproto.profile.fetch_user_avatar", 1805 + new_callable=AsyncMock, 1806 + side_effect=Exception("network error"), 1807 + ): 1808 + await ingest_account_status_change(did=artist.did, active=True) 1809 + 1810 + await db_session.refresh(artist) 1811 + assert artist.avatar_url is None
+2 -2
loq.toml
··· 212 212 213 213 [[rules]] 214 214 path = "backend/src/backend/_internal/tasks/ingest.py" 215 - max_lines = 765 215 + max_lines = 824 216 216 217 217 [[rules]] 218 218 path = "backend/tests/test_jetstream.py" 219 - max_lines = 1663 219 + max_lines = 1811 220 220 221 221 [[rules]] 222 222 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"