audio streaming app plyr.fm

fix(likes): tombstone cancelled URIs to prevent like resurrection race (#1338)

`test_cross_user_like` flakes intermittently in the staging integration
suite because of a real race in the like → unlike sequence:

1. user clicks LIKE → DB INSERT row R (atproto_like_uri=NULL),
   `pds_create_like(R.id)` enqueued via docket.
2. user clicks UNLIKE before pds_create_like runs. atproto_like_uri
   is still NULL so we just DELETE R; no PDS-delete is scheduled
   because there's no URI yet.
3. `pds_create_like(R.id)` finally runs:
   a. PDS create returns URI X.
   b. SELECT R.id → row gone → orphan-cleanup branch fires.
   c. `delete_record_by_uri(X)` is scheduled.
4. Jetstream emits the `app.bsky.feed.like` create event for X
   BEFORE the matching delete event from (3c) propagates.
5. `ingest_like_create` finds no existing row for (track, user)
   → INSERTS a fresh row with URI X. **the like just resurrected
   itself after the user explicitly unliked.**
6. eventually the delete event arrives and `ingest_like_delete`
   by URI X clears the resurrected row, but in the gap the user
   sees their unlike undone.
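
The sequence above can be replayed deterministically with a toy model. This is only a sketch: the dict standing in for the likes table and both ingest functions are hypothetical simplifications, not the plyr.fm code, and the event order is hard-coded to the unlucky one described in steps 4 to 6.

```python
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for the likes table: (track, user) -> atproto_like_uri.
likes: Dict[Tuple[str, str], Optional[str]] = {}


def ingest_like_create(track: str, user: str, uri: str) -> None:
    # Step 5: no existing row for (track, user), so a fresh row is inserted.
    likes.setdefault((track, user), uri)


def ingest_like_delete(uri: str) -> None:
    # Step 6: the delete event removes whatever row carries this URI.
    for key, value in list(likes.items()):
        if value == uri:
            del likes[key]


key = ("track-1", "did:plc:alice")
likes[key] = None   # 1. LIKE: row R inserted, URI still NULL
del likes[key]      # 2. UNLIKE: R deleted, nothing to delete on the PDS yet
uri_x = "at://did:plc:alice/fm.plyr.like/x"  # 3a. PDS create returns URI X
# 3b/3c. the row is gone, so the worker only schedules a PDS delete for X

ingest_like_create(key[0], key[1], uri_x)  # 4-5. create event lands first
resurrected = key in likes                 # the unliked row is back

ingest_like_delete(uri_x)                  # 6. delete event lands later
cleared = key not in likes
```

Between the two ingest calls the row exists even though the user's last action was an unlike, which is the window the fix closes.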

Fix: in (3c), tombstone the URI in Redis with a 5-minute TTL BEFORE
issuing the orphan PDS delete. `ingest_like_create` checks the
tombstone and drops the matching create event in (5). The TTL only
needs to cover Jetstream propagation; expiry is harmless because the
matching delete event still arrives shortly after.
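
A minimal sketch of the tombstone idea, assuming an in-memory store with an injectable clock in place of Redis. `TombstoneStore`, the fake clock, and the simplified `ingest_like_create` are hypothetical; only the `like_cancelled:` prefix and the 300-second TTL come from this commit.

```python
import time

LIKE_CANCELLED_PREFIX = "like_cancelled:"  # prefix used by this commit
TTL_SECONDS = 300                          # the 5-minute TTL


class TombstoneStore:
    """In-memory stand-in for a key store with per-key expiry."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._deadlines: dict[str, float] = {}

    def set(self, key: str, ttl: float) -> None:
        self._deadlines[key] = self._clock() + ttl

    def exists(self, key: str) -> bool:
        deadline = self._deadlines.get(key)
        return deadline is not None and self._clock() < deadline


now = [0.0]  # fake clock so expiry can be exercised without sleeping
store = TombstoneStore(clock=lambda: now[0])
likes: set[str] = set()


def ingest_like_create(uri: str) -> bool:
    """Insert the like unless its URI is tombstoned; report whether it was inserted."""
    if store.exists(LIKE_CANCELLED_PREFIX + uri):
        return False
    likes.add(uri)
    return True


uri_x = "at://did:plc:alice/fm.plyr.like/x"
store.set(LIKE_CANCELLED_PREFIX + uri_x, TTL_SECONDS)  # written BEFORE the PDS delete
inserted_during_ttl = ingest_like_create(uri_x)        # create event is dropped

now[0] = TTL_SECONDS + 1                               # tombstone has expired
inserted_after_ttl = ingest_like_create(uri_x)         # guard no longer applies
```

The second call shows why the TTL only has to outlast Jetstream propagation: once it expires the guard is gone, and cleanup relies on the delete event as before.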

Why Redis tombstone over a `cancelled_at` schema column: no migration,
no read-path filtering across ~15 query sites, scoped fix to the two
files actually involved in the race. Local Redis blip falls back to
the existing Jetstream-delete cleanup; user briefly sees the ghost
like but it's cleared seconds later.
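
The fallback behaviour can be illustrated with a synchronous sketch. `FlakyClient` is a hypothetical client that always fails, and the built-in `ConnectionError` stands in for the Redis client's own exception type; the real helpers in this commit are async.

```python
import logging

logger = logging.getLogger(__name__)


class FlakyClient:
    """Hypothetical client whose lookups always fail, mimicking an outage."""

    def exists(self, key: str) -> int:
        raise ConnectionError("store unavailable")


def is_like_uri_cancelled(client, like_uri: str) -> bool:
    """Fail open: a failed lookup is treated as 'not tombstoned'."""
    try:
        return client.exists(f"like_cancelled:{like_uri}") > 0
    except ConnectionError as exc:
        logger.warning("tombstone read failed for %s: %s", like_uri, exc)
        return False


cancelled = is_like_uri_cancelled(FlakyClient(), "at://did:plc:alice/fm.plyr.like/x")
```

Returning False here means ingest behaves exactly as it did before the fix, so an outage cannot block legitimate likes.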

Mirrors the existing track-tombstone pattern in `ingest.py` (which
prevents ghost tracks from cursor rewind) — same Redis primitive,
different prefix (`like_cancelled:` vs `plyr:tombstone:`) reflecting
the different concern (write race vs replay race).

Tests:
- tests/test_pds_create_like_tombstone.py — pds_create_like writes
the tombstone in the orphan branch and NOT on the happy path
(which would otherwise stall the user's own like indefinitely).
- tests/test_jetstream.py::TestIngestLikeCreate::test_skips_create_for_cancelled_uri
— ingest_like_create drops the create event when the URI is
tombstoned.
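
The assertions visible in this commit check that the tombstone is written and that the orphan delete fires. If the relative order also needs pinning down, a shared call log is one option. The sketch below uses hypothetical stand-ins (`orphan_cleanup`, `fake_set`, `fake_delete`) rather than the real task.

```python
import asyncio
from unittest.mock import AsyncMock

calls: list[str] = []  # shared log, appended to by both fakes in call order


async def fake_set(key: str, value: str, ex=None) -> None:
    calls.append(f"set:{key}")


async def fake_delete(uri: str) -> None:
    calls.append(f"delete:{uri}")


redis = AsyncMock()
redis.set = AsyncMock(side_effect=fake_set)
delete_record = AsyncMock(side_effect=fake_delete)


async def orphan_cleanup(uri: str) -> None:
    # Hypothetical stand-in for the orphan branch: tombstone first, then delete.
    await redis.set(f"like_cancelled:{uri}", "1", ex=300)
    await delete_record(uri)


uri = "at://did:plc:test/fm.plyr.like/x"
asyncio.run(orphan_cleanup(uri))
```

Comparing `calls` against the expected two-element list fails if the delete is ever issued before the tombstone write.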

447/447 backend tests pass; ruff + ty clean.

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by nate nowack and Claude Opus 4 (1M context), committed by GitHub
5980d3cc 1495c643

+325 -3
+15
backend/src/backend/_internal/tasks/ingest.py
···
     is_trusted_audio_origin,
     is_trusted_image_origin,
 )
+from backend._internal.tasks.pds import is_like_uri_cancelled
 from backend.config import settings
 from backend.models import Artist, Playlist, Track, TrackComment, TrackLike
 from backend.models.session import UserSession
···
     """create a like from a Jetstream event."""
     if errors := validate_record("fm.plyr.like", record):
         logfire.warn("ingest: invalid like record, skipping", uri=uri, errors=errors)
+        return
+
+    # `pds_create_like` tombstones URIs whose owning row was unliked while
+    # the PDS create was still in flight. Jetstream may emit the create
+    # event for that URI before our orphan-cleanup PDS delete propagates;
+    # without this guard, we'd insert a row the user already cancelled
+    # (the "like resurrects after unlike" race surfaced by
+    # `test_cross_user_like` in integration tests).
+    if await is_like_uri_cancelled(uri):
+        logfire.info(
+            "ingest: skipping cancelled like create event",
+            uri=uri,
+            user_did=did,
+        )
         return

     subject = record.get("subject", {})
+65 -1
backend/src/backend/_internal/tasks/pds.py
···
 from datetime import UTC, datetime

 import logfire
+from redis.exceptions import RedisError
 from sqlalchemy import select

 from backend._internal.atproto.records import (
···
 from backend._internal.background import get_docket
 from backend.models import TrackComment, TrackLike
 from backend.utilities.database import db_session
+from backend.utilities.redis import get_async_redis_client

 logger = logging.getLogger(__name__)


+# When `pds_create_like` discovers that the user already unliked before the
+# PDS record could be written, we tombstone the URI in Redis so the matching
+# Jetstream `app.bsky.feed.like` create event — which arrives after our local
+# delete and would otherwise resurrect the row in `ingest_like_create` —
+# is recognized as already-cancelled and skipped. the TTL only needs to
+# cover Jetstream propagation; 5 minutes is comfortably above observed
+# end-to-end latency. expiry is harmless: by then the matching delete event
+# (from our orphan-cleanup) will have arrived and any stray row would be
+# cleaned up via `ingest_like_delete` shortly thereafter.
+LIKE_CANCELLED_TOMBSTONE_PREFIX = "like_cancelled:"
+LIKE_CANCELLED_TOMBSTONE_TTL_SECONDS = 300
+
+
+async def mark_like_uri_cancelled(like_uri: str) -> None:
+    """tombstone a like URI so a still-in-flight Jetstream create event
+    for it is treated as already-cancelled and dropped before it can
+    re-insert the row that the user just unliked.
+
+    suppressed on Redis errors — the tombstone is an optimization to
+    close a race window, and a missed tombstone falls back to the
+    natural Jetstream-delete-event eventually clearing the resurrected
+    row. don't fail the cancellation on a transient Redis blip.
+    """
+    try:
+        redis = get_async_redis_client()
+        await redis.set(
+            f"{LIKE_CANCELLED_TOMBSTONE_PREFIX}{like_uri}",
+            "1",
+            ex=LIKE_CANCELLED_TOMBSTONE_TTL_SECONDS,
+        )
+    except RedisError as e:
+        logger.warning(
+            "failed to write like-cancelled tombstone for %s: %s", like_uri, e
+        )
+
+
+async def is_like_uri_cancelled(like_uri: str) -> bool:
+    """check whether a like URI has been tombstoned by `pds_create_like`.
+
+    suppressed on Redis errors — returning False on Redis trouble means
+    the ingest path falls through to its normal behavior; the
+    eventually-arriving delete event still cleans up.
+    """
+    try:
+        redis = get_async_redis_client()
+        return await redis.exists(f"{LIKE_CANCELLED_TOMBSTONE_PREFIX}{like_uri}") > 0
+    except RedisError as e:
+        logger.warning(
+            "failed to read like-cancelled tombstone for %s: %s", like_uri, e
+        )
+        return False
+
+
 async def pds_create_like(
     session_id: str,
     like_id: int,
···
                 await session.commit()
                 logger.info(f"pds_create_like: created like record {like_uri}")
             else:
-                # like was deleted before we could update it - clean up orphan
+                # the user unliked before we could write the PDS record. we
+                # already created it on PDS in `create_like_record` above,
+                # so we now have to delete it. but Jetstream has likely
+                # already emitted a `like` create event for the URI we just
+                # wrote — if it lands in `ingest_like_create` before our
+                # PDS delete propagates, the row gets resurrected (no local
+                # row exists for the existing-row dedup branch to catch).
+                # tombstone the URI in Redis BEFORE the PDS delete so the
+                # ingest path can recognize and drop the create event.
                 logger.warning(f"pds_create_like: like {like_id} no longer exists")
+                await mark_like_uri_cancelled(like_uri)
                 await delete_record_by_uri(auth_session, like_uri)

     except Exception as e:
+56
backend/tests/test_jetstream.py
···
                 uri="at://did:plc:jetstream_test/fm.plyr.like/like2",
             )

+    async def test_skips_create_for_cancelled_uri(
+        self, db_session: AsyncSession, artist: Artist, track: Track
+    ) -> None:
+        """regression for the like-resurrection race surfaced by
+        `test_cross_user_like` in the staging integration suite.
+
+        sequence: user clicks like (DB INSERT, atproto_like_uri=NULL),
+        then unlikes before `pds_create_like` finishes writing to PDS
+        (DB row deleted, no PDS URI to schedule a delete for). then
+        `pds_create_like` completes — PDS record IS written, but the
+        local row is gone, so it tombstones the URI and schedules an
+        orphan PDS delete. before that delete propagates through
+        Jetstream, the matching `app.bsky.feed.like` create event
+        arrives. without the tombstone check this re-inserts the row
+        the user already cancelled; with it, the event is dropped.
+        """
+        from backend._internal.tasks.pds import LIKE_CANCELLED_TOMBSTONE_PREFIX
+
+        cancelled_uri = "at://did:plc:jetstream_test/fm.plyr.like/cancelled"
+        store: dict[str, str] = {
+            f"{LIKE_CANCELLED_TOMBSTONE_PREFIX}{cancelled_uri}": "1"
+        }
+
+        async def fake_exists(key: str) -> int:
+            return 1 if key in store else 0
+
+        mock_redis = AsyncMock()
+        mock_redis.exists = AsyncMock(side_effect=fake_exists)
+
+        record = {
+            "subject": {
+                "uri": track.atproto_record_uri,
+                "cid": track.atproto_record_cid,
+            },
+            "createdAt": _recent_ts(),
+        }
+        with patch(
+            "backend._internal.tasks.pds.get_async_redis_client",
+            return_value=mock_redis,
+        ):
+            await ingest_like_create(
+                did=artist.did,
+                rkey="cancelled",
+                record=record,
+                uri=cancelled_uri,
+            )
+
+        result = await db_session.execute(
+            select(TrackLike).where(TrackLike.atproto_like_uri == cancelled_uri)
+        )
+        assert result.scalar_one_or_none() is None, (
+            "ingest_like_create must drop the create event when the URI is "
+            "tombstoned by pds_create_like's orphan-cleanup path; otherwise "
+            "the unlike-while-pending race resurrects the row."
+        )
+

 class TestIngestLikeDelete:
     async def test_deletes_by_uri(
+187
backend/tests/test_pds_create_like_tombstone.py
···
+"""regression: pds_create_like writes a tombstone when its row was unliked.
+
+paired with `TestIngestLikeCreate.test_skips_create_for_cancelled_uri` —
+together they cover both halves of the unlike-while-pending race fix:
+
+- this file: tombstone IS written by `pds_create_like` when the optimistic
+  TrackLike row no longer exists at the time the PDS create completes
+  (i.e. the user unliked between handler-return and worker-execution).
+- ingest test: `ingest_like_create` reads the tombstone and drops the
+  matching create event so the row is not resurrected.
+
+without both halves, the race in `test_cross_user_like` (integration
+suite) returns: PDS-create completes, Jetstream emits a create event,
+ingest re-inserts the row that the user already cancelled.
+"""
+
+from __future__ import annotations
+
+import uuid
+from unittest.mock import AsyncMock, patch
+
+import pytest
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from backend._internal.tasks.pds import (
+    LIKE_CANCELLED_TOMBSTONE_PREFIX,
+    LIKE_CANCELLED_TOMBSTONE_TTL_SECONDS,
+    pds_create_like,
+)
+from backend.models import Artist, Track
+
+
+@pytest.fixture
+async def artist(db_session: AsyncSession) -> Artist:
+    """test artist with a unique DID (xdist-safe)."""
+    a = Artist(
+        did=f"did:plc:liketomb_{uuid.uuid4().hex[:12]}",
+        handle="liketomb.test",
+        display_name="Like Tombstone Test",
+        pds_url="https://bsky.social",
+    )
+    db_session.add(a)
+    await db_session.commit()
+    return a
+
+
+@pytest.fixture
+async def track(db_session: AsyncSession, artist: Artist) -> Track:
+    """test track owned by `artist`."""
+    t = Track(
+        title="liketomb track",
+        file_id=f"file_{uuid.uuid4().hex[:12]}",
+        file_type="mp3",
+        artist_did=artist.did,
+        r2_url="https://r2.example.com/liketomb.mp3",
+        atproto_record_uri=f"at://{artist.did}/fm.plyr.track/liketomb",
+        atproto_record_cid="bafyliketomb",
+        audio_storage="r2",
+    )
+    db_session.add(t)
+    await db_session.commit()
+    return t
+
+
+def _mock_redis() -> tuple[AsyncMock, dict[str, str]]:
+    """in-memory redis double that records SETs so we can assert TTL + ordering."""
+    store: dict[str, str] = {}
+    set_calls: list[tuple[str, str, int | None]] = []
+
+    async def fake_set(key: str, value: str, ex: int | None = None) -> None:
+        store[key] = value
+        set_calls.append((key, value, ex))
+
+    async def fake_exists(key: str) -> int:
+        return 1 if key in store else 0
+
+    mock = AsyncMock()
+    mock.set = AsyncMock(side_effect=fake_set)
+    mock.exists = AsyncMock(side_effect=fake_exists)
+    mock._set_calls = set_calls
+    return mock, store
+
+
+class TestPdsCreateLikeTombstones:
+    """`pds_create_like` writes a tombstone before issuing the orphan PDS
+    delete, so a still-in-flight Jetstream create event for the same URI
+    is recognized as already-cancelled in `ingest_like_create`."""
+
+    async def test_writes_tombstone_when_local_row_already_unliked(
+        self, db_session: AsyncSession, artist: Artist, track: Track
+    ) -> None:
+        """if the optimistic TrackLike row is gone by the time PDS create
+        returns, tombstone the URI BEFORE calling delete_record_by_uri."""
+        # no row in the DB — user already unliked.
+        nonexistent_like_id = 99999999
+        created_uri = "at://did:plc:test/fm.plyr.like/cancelled-uri"
+
+        mock_redis, store = _mock_redis()
+        delete_call_args: list[str] = []
+
+        async def fake_delete_by_uri(_session: object, uri: str) -> None:
+            delete_call_args.append(uri)
+
+        with (
+            patch(
+                "backend._internal.tasks.pds.get_session",
+                return_value=AsyncMock(did=artist.did),
+            ),
+            patch(
+                "backend._internal.tasks.pds.create_like_record",
+                return_value=created_uri,
+            ),
+            patch(
+                "backend._internal.tasks.pds.delete_record_by_uri",
+                side_effect=fake_delete_by_uri,
+            ),
+            patch(
+                "backend._internal.tasks.pds.get_async_redis_client",
+                return_value=mock_redis,
+            ),
+        ):
+            await pds_create_like(
+                session_id="any-session",
+                like_id=nonexistent_like_id,
+                subject_uri=track.atproto_record_uri or "at://did/track/1",
+                subject_cid=track.atproto_record_cid or "bafy",
+            )
+
+        # tombstone is keyed by URI under the documented prefix, with the
+        # documented TTL — these constants are the contract `ingest.py`
+        # imports against, so a future drift breaks both halves at once.
+        tombstone_key = f"{LIKE_CANCELLED_TOMBSTONE_PREFIX}{created_uri}"
+        assert tombstone_key in store
+        ((called_key, _, ttl),) = mock_redis._set_calls
+        assert called_key == tombstone_key
+        assert ttl == LIKE_CANCELLED_TOMBSTONE_TTL_SECONDS
+
+        # and the orphan PDS delete still fires (the tombstone closes the
+        # ingest race; the actual PDS record still has to be removed).
+        assert delete_call_args == [created_uri]
+
+    async def test_no_tombstone_on_happy_path(
+        self, db_session: AsyncSession, artist: Artist, track: Track
+    ) -> None:
+        """the tombstone path is *only* for the orphan-cleanup branch; a
+        normal PDS create that finds its local row should not pollute
+        Redis with a tombstone (which would suppress the user's own
+        Jetstream create event and stall the like indefinitely)."""
+        from backend.models import TrackLike
+
+        like = TrackLike(track_id=track.id, user_did=artist.did, atproto_like_uri=None)
+        db_session.add(like)
+        await db_session.commit()
+        await db_session.refresh(like)
+
+        mock_redis, store = _mock_redis()
+
+        with (
+            patch(
+                "backend._internal.tasks.pds.get_session",
+                return_value=AsyncMock(did=artist.did),
+            ),
+            patch(
+                "backend._internal.tasks.pds.create_like_record",
+                return_value="at://did:plc:test/fm.plyr.like/happy",
+            ),
+            patch(
+                "backend._internal.tasks.pds.delete_record_by_uri",
+                new_callable=AsyncMock,
+            ) as mock_delete,
+            patch(
+                "backend._internal.tasks.pds.get_async_redis_client",
+                return_value=mock_redis,
+            ),
+        ):
+            await pds_create_like(
+                session_id="any-session",
+                like_id=like.id,
+                subject_uri=track.atproto_record_uri or "at://did/track/1",
+                subject_cid=track.atproto_record_cid or "bafy",
+            )
+
+        assert store == {}, (
+            "happy-path PDS create must not write a tombstone "
+            "(would suppress the user's own Jetstream create event)"
+        )
+        mock_delete.assert_not_called()
+2 -2
loq.toml
···

 [[rules]]
 path = "backend/src/backend/_internal/tasks/ingest.py"
-max_lines = 824
+max_lines = 839

 [[rules]]
 path = "backend/tests/test_jetstream.py"
-max_lines = 1811
+max_lines = 1867

 [[rules]]
 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"