audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(restore): fall back gracefully when PDS has GC'd the old blob (#1320)

* fix(restore): fall back when PDS has GC'd old blob

* test(integration): retry-poll the liked_tracks check in test_cross_user_like

test_cross_user_like failed in the #1319 post-deploy integration run: the
liked_tracks list still showed the track immediately after unlike. this is
a pre-existing eventual-consistency gap — the likes pipeline has a small
lag between the unlike write and the liked-list read (cache / read-replica).

the retry-poll matches the pattern test_upload_searchable already uses for
similar eventually-consistent reads: retry up to 5 times with a 1s sleep
after each miss, and fail with a clear message if the track is still there.

authored by

nate nowack and committed by
GitHub
e8463db2 f282e3ad

+152 -16
+58 -12
backend/src/backend/api/tracks/revisions.py
··· 227 227 audio_blob=audio_blob, 228 228 description=track.description, 229 229 ) 230 + # `pds_blob_lost` tracks whether we had to drop the blob ref because the 231 + # user's PDS no longer has it. when True, we downgrade the restored track 232 + # to audio_storage="r2" with null pds_blob_cid on commit — R2 playback 233 + # still works, but the PDS-hosted copy is genuinely gone. 234 + pds_blob_lost = False 230 235 try: 231 236 _, new_cid = await update_record( 232 237 auth_session=auth_session, ··· 234 239 record=new_record, 235 240 ) 236 241 except Exception as exc: 237 - logfire.exception( 238 - "restore: failed to update ATProto record", 239 - track_id=track_id, 240 - revision_id=revision_id, 241 - ) 242 - raise HTTPException( 243 - status_code=502, 244 - detail=f"failed to publish restored record: {exc}", 245 - ) from exc 242 + # PDSes GC unreferenced blobs after a short grace period. when a user 243 + # replaces audio and then later restores, the old blob may already be 244 + # gone. retry without the blob ref so the restore still succeeds — 245 + # the audio remains available via R2 (audio_url). 
246 + if audio_blob is not None and "BlobNotFound" in str(exc): 247 + logfire.info( 248 + "restore: PDS lost the old blob; publishing without audioBlob", 249 + track_id=track_id, 250 + revision_id=revision_id, 251 + blob_cid=revision.pds_blob_cid, 252 + ) 253 + # build a fresh dict for the retry — never mutate `new_record` 254 + # in place so callers / tests can inspect the original payload 255 + retry_record = {k: v for k, v in new_record.items() if k != "audioBlob"} 256 + pds_blob_lost = True 257 + try: 258 + _, new_cid = await update_record( 259 + auth_session=auth_session, 260 + record_uri=track.atproto_record_uri, 261 + record=retry_record, 262 + ) 263 + except Exception as exc2: 264 + logfire.exception( 265 + "restore: failed to update ATProto record (retry without blob)", 266 + track_id=track_id, 267 + revision_id=revision_id, 268 + ) 269 + raise HTTPException( 270 + status_code=502, 271 + detail=f"failed to publish restored record: {exc2}", 272 + ) from exc2 273 + else: 274 + logfire.exception( 275 + "restore: failed to update ATProto record", 276 + track_id=track_id, 277 + revision_id=revision_id, 278 + ) 279 + raise HTTPException( 280 + status_code=502, 281 + detail=f"failed to publish restored record: {exc}", 282 + ) from exc 246 283 247 284 # commit: snapshot current → update track → delete chosen revision. 
248 285 # all in one transaction so we never end up with a track pointing at a ··· 271 308 live_track.file_type = revision.file_type 272 309 live_track.original_file_id = revision.original_file_id 273 310 live_track.original_file_type = revision.original_file_type 274 - live_track.audio_storage = revision.audio_storage 275 311 live_track.r2_url = revision.audio_url 276 - live_track.pds_blob_cid = revision.pds_blob_cid 277 - live_track.pds_blob_size = revision.pds_blob_size 278 312 live_track.atproto_record_cid = new_cid 313 + 314 + # if the PDS lost the blob between replace and restore, downgrade the 315 + # track's recorded storage state so it matches the published record 316 + # (no audioBlob). otherwise copy through the revision's state. 317 + if pds_blob_lost: 318 + live_track.audio_storage = "r2" 319 + live_track.pds_blob_cid = None 320 + live_track.pds_blob_size = None 321 + else: 322 + live_track.audio_storage = revision.audio_storage 323 + live_track.pds_blob_cid = revision.pds_blob_cid 324 + live_track.pds_blob_size = revision.pds_blob_size 279 325 280 326 # update duration in extra; clear stale genre-prediction provenance so 281 327 # a future re-classification doesn't get short-circuited.
+75
backend/tests/api/track_audio_replace/test_revisions.py
··· 398 398 assert refreshed.pds_blob_cid == "bafkreiORIGINALBLOB" 399 399 assert refreshed.pds_blob_size == 4096 400 400 401 + async def test_restore_falls_back_when_pds_blob_gc( 402 + self, 403 + test_app_owner: FastAPI, 404 + db_session: AsyncSession, 405 + owner: Artist, 406 + ) -> None: 407 + """if the user's PDS has already GC'd the old blob, restore must not 408 + fail. it should retry publishing WITHOUT audioBlob and downgrade the 409 + track to audio_storage='r2' so DB + PDS stay consistent. R2 playback 410 + still works — only the PDS-hosted copy is lost, which is expected 411 + after GC.""" 412 + track = make_track(file_id="CURRENT-NEW", duration=200) 413 + track.audio_storage = "both" 414 + track.pds_blob_cid = "bafkreiNEWBLOB" 415 + track.pds_blob_size = 9999 416 + db_session.add(track) 417 + await db_session.commit() 418 + await db_session.refresh(track) 419 + 420 + # revision with a PDS blob ref that PDS no longer has 421 + revision = TrackRevision( 422 + track_id=track.id, 423 + file_id="ORIGINAL", 424 + file_type="wav", 425 + original_file_id=None, 426 + original_file_type=None, 427 + audio_storage="both", 428 + audio_url="https://audio.example/ORIGINAL.wav", 429 + pds_blob_cid="bafkreiGCdBLOB", 430 + pds_blob_size=4096, 431 + duration=120, 432 + was_gated=False, 433 + ) 434 + db_session.add(revision) 435 + await db_session.commit() 436 + await db_session.refresh(revision) 437 + revision_id = revision.id 438 + track_id = track.id 439 + 440 + # first call raises BlobNotFound (real PDS error shape), second succeeds 441 + update_record_mock = AsyncMock( 442 + side_effect=[ 443 + RuntimeError( 444 + 'PDS request failed: 400 {"error":"BlobNotFound","message":"Could not find blob: bafkreiGCdBLOB"}' 445 + ), 446 + (TRACK_URI, "bafyRESTOREDNOBBLOB"), 447 + ] 448 + ) 449 + with patch("backend.api.tracks.revisions.update_record", update_record_mock): 450 + async with AsyncClient( 451 + transport=ASGITransport(app=test_app_owner), base_url="http://test" 452 + 
) as client: 453 + resp = await client.post( 454 + f"/tracks/{track_id}/revisions/{revision_id}/restore" 455 + ) 456 + 457 + assert resp.status_code == 200 458 + # first attempt included audioBlob; retry dropped it 459 + assert update_record_mock.call_count == 2 460 + first_record = update_record_mock.call_args_list[0].kwargs["record"] 461 + second_record = update_record_mock.call_args_list[1].kwargs["record"] 462 + assert first_record.get("audioBlob") is not None 463 + assert second_record.get("audioBlob") is None 464 + 465 + # DB now reflects the downgraded state — track's published record has 466 + # no audioBlob, so audio_storage must be 'r2' and pds_blob_cid null 467 + db_session.expire_all() 468 + refreshed = await db_session.get(Track, track_id) 469 + assert refreshed is not None 470 + assert refreshed.file_id == "ORIGINAL" 471 + assert refreshed.audio_storage == "r2" 472 + assert refreshed.pds_blob_cid is None 473 + assert refreshed.pds_blob_size is None 474 + assert refreshed.atproto_record_cid == "bafyRESTOREDNOBBLOB" 475 + 401 476 async def test_409_when_gating_mismatches( 402 477 self, 403 478 test_app_owner: FastAPI,
+15 -4
backend/tests/integration/test_interactions.py
··· 59 59 track = await client1.get_track(track_id) 60 60 assert track.like_count == initial_likes 61 61 62 - # verify track no longer in user2's liked tracks 63 - liked = await client2.liked_tracks(limit=100) 64 - liked_ids = [t.id for t in liked] 65 - assert track_id not in liked_ids 62 + # verify track no longer in user2's liked tracks. the liked list can 63 + # lag a beat after unlike (cache / read-replica propagation), so 64 + # retry-poll before asserting — matches test_upload_searchable's 65 + # pattern for eventually-consistent reads. 66 + import asyncio 67 + 68 + for _ in range(5): 69 + liked = await client2.liked_tracks(limit=100) 70 + liked_ids = [t.id for t in liked] 71 + if track_id not in liked_ids: 72 + break 73 + await asyncio.sleep(1) 74 + assert track_id not in liked_ids, ( 75 + f"track {track_id} still in user2's liked after unlike + 5s poll" 76 + ) 66 77 67 78 finally: 68 79 await client1.delete(track_id)
+4
loq.toml
··· 265 265 [[rules]] 266 266 path = "backend/tests/api/track_audio_replace/test_pipeline.py" 267 267 max_lines = 512 268 + 269 + [[rules]] 270 + path = "backend/tests/api/track_audio_replace/test_revisions.py" 271 + max_lines = 561