audio streaming app plyr.fm

fix(restore): re-upload blob to PDS when it's been GC'd (#1325)

the restore path used to strip audioBlob from the republished record
whenever the PDS had already GC'd the revision's original CID, silently
downgrading the track to audio_storage="r2". plyr.fm's core promise is
that users own their audio on their PDS — dropping the blob ref would
break that promise.

new behavior when PDS returns BlobNotFound on the first publish:
1. fetch the R2 bytes via storage.get_file_data(file_id, file_type)
2. upload them to the user's PDS to mint a fresh blob CID
3. republish the record with the fresh audioBlob ref
4. commit the track with audio_storage="both" + the new CID
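
the four steps above can be sketched roughly as follows. this is a minimal, self-contained illustration and not the code in revisions.py: `BlobNotFound`, `restore_with_reupload`, and the three callables are hypothetical stand-ins for the PDS error, `update_record`, `storage.get_file_data`, and `upload_blob`, and the fake PDS in `_demo` is invented for the example.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple


class BlobNotFound(Exception):
    """hypothetical stand-in for the PDS's BlobNotFound error."""


async def restore_with_reupload(
    record: Dict,
    publish: Callable[[Dict], Awaitable[str]],
    fetch_r2_bytes: Callable[[], Awaitable[Optional[bytes]]],
    upload_to_pds: Callable[[bytes], Awaitable[Dict]],
) -> Tuple[str, Dict, str]:
    """publish `record` (assumed to carry an audioBlob); on BlobNotFound,
    re-upload the bytes and publish again.

    returns (record_cid, published_record, audio_storage).
    """
    try:
        # first publish: reuse the revision's original blob CID
        return await publish(record), record, "both"
    except BlobNotFound:
        pass

    # 1. fetch the R2 bytes
    data = await fetch_r2_bytes()
    if not data:
        raise LookupError("no R2 bytes; the fallback chain would take over")
    # 2. upload them to the PDS to mint a fresh blob ref
    fresh_ref = await upload_to_pds(data)
    # 3. republish with the fresh ref, without mutating the original record
    retry_record = dict(record)
    retry_record["audioBlob"] = fresh_ref
    record_cid = await publish(retry_record)
    # 4. the caller would commit audio_storage="both" plus the new CID
    return record_cid, retry_record, "both"


async def _demo() -> Tuple[str, Dict, str]:
    gone = {"bafkreiGCdBLOB"}  # CIDs the fake PDS has already GC'd

    async def publish(rec: Dict) -> str:
        cid = rec["audioBlob"]["ref"]["$link"]
        if cid in gone:
            raise BlobNotFound(cid)
        return "bafyRESTORED"

    async def fetch_r2_bytes() -> Optional[bytes]:
        return b"fake-audio-bytes"

    async def upload_to_pds(data: bytes) -> Dict:
        return {
            "$type": "blob",
            "ref": {"$link": "bafkreiFRESH"},
            "mimeType": "audio/wav",
            "size": len(data),
        }

    original = {"title": "t", "audioBlob": {"ref": {"$link": "bafkreiGCdBLOB"}}}
    return await restore_with_reupload(
        original, publish, fetch_r2_bytes, upload_to_pds
    )


record_cid, published, storage_state = asyncio.run(_demo())
```

the sketch raises when R2 has no bytes; the real handler instead falls through to the fallback chain described next.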

fallback chain (rare): if R2 is also missing the bytes, or the PDS
rejects the re-upload (oversize payload or transient error), we keep the
old behavior: republish without audioBlob and downgrade to r2-only.
restore still completes; playback keeps working via audio_url.
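
a small sketch of how the retry payload could be chosen once the re-upload attempt has finished. `build_retry_record` is a hypothetical helper written for this illustration (the handler does this inline); it assumes the re-upload result is either a blob ref dict or None.

```python
from typing import Dict, Optional, Tuple


def build_retry_record(
    new_record: Dict, reupload_ref: Optional[Dict]
) -> Tuple[Dict, Optional[str], bool]:
    """return (retry_record, effective_blob_cid, pds_blob_lost).

    never mutates `new_record`, so the first payload stays inspectable.
    """
    if reupload_ref is not None:
        # re-upload worked: swap in the fresh blob ref
        retry = dict(new_record)
        retry["audioBlob"] = reupload_ref
        return retry, reupload_ref.get("ref", {}).get("$link"), False
    # R2 miss / oversize / transient failure: drop the ref, go r2-only
    retry = {k: v for k, v in new_record.items() if k != "audioBlob"}
    return retry, None, True


original = {"title": "t", "audioBlob": {"ref": {"$link": "bafkreiGCdBLOB"}}}
fallback = build_retry_record(original, None)
fresh = build_retry_record(original, {"ref": {"$link": "bafkreiFRESH"}, "size": 4096})
```

returning the `pds_blob_lost` flag alongside the record keeps the later DB commit in step with whatever was actually published.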

verified via smoke test on stg.plyr.fm (track 2202). before the fix, the
post-restore PDS record had no audioBlob and the DB had audio_storage="r2",
pds_blob_cid=null. with this patch, the restored record carries a
first-class PDS blob ref again.

tests:
- rewrote test_restore_falls_back_when_pds_blob_gc →
test_restore_reuploads_blob_when_pds_gc: asserts the retry record
carries the re-uploaded ref and DB ends with audio_storage="both"
- added test_restore_falls_back_to_r2_when_reupload_also_fails: covers
the R2-miss path (retained fallback behavior)
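
both tests lean on the same mock shape: an `AsyncMock` whose `side_effect` list raises on the first await and returns on the second. a stripped-down sketch of that pattern, using only `unittest.mock` from the standard library; the URI and CIDs are placeholder values, and `publish_twice` is a hypothetical stand-in for the handler's try/retry.

```python
import asyncio
from unittest.mock import AsyncMock

# first await raises (the GC'd blob), second await returns (uri, cid)
update_record_mock = AsyncMock(
    side_effect=[
        RuntimeError('PDS request failed: 400 {"error":"BlobNotFound"}'),
        ("at://example/track/1", "bafyRESTORED"),
    ]
)


async def publish_twice() -> tuple:
    """mimic the handler: try the old ref, then retry with a fresh one."""
    try:
        await update_record_mock(record={"audioBlob": {"ref": {"$link": "old"}}})
    except RuntimeError as exc:
        if "BlobNotFound" not in str(exc):
            raise
    return await update_record_mock(
        record={"audioBlob": {"ref": {"$link": "fresh"}}}
    )


result = asyncio.run(publish_twice())
second_record = update_record_mock.call_args_list[1].kwargs["record"]
```

inspecting `call_args_list` afterwards is what lets the tests assert on the exact record sent in each attempt.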

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by nate nowack and Claude Opus 4 (1M context); committed by GitHub
d38aa796 5c6c6b77

+196 -40
+75 -26
backend/src/backend/api/tracks/revisions.py
···

 from backend._internal import Session as AuthSession
 from backend._internal import require_auth
+from backend._internal.atproto import PayloadTooLargeError, upload_blob
 from backend._internal.atproto.records import build_track_record, update_record
 from backend._internal.audio import AudioFormat
 from backend._internal.track_revisions import prune_revisions
 from backend.api.albums import invalidate_album_cache_by_id
 from backend.config import settings
 from backend.models import Track, TrackRevision
+from backend.storage import storage
 from backend.utilities.database import db_session

 from .router import router
···
     # if this fails, we abort before touching the DB.
     #
     # if the revision carried a PDS blob ref, include it in the new record so
-    # the user's PDS keeps its canonical copy of the audio. the blob itself is
-    # NOT re-uploaded — we trust that PDS still has it (blobs are only GC'd
-    # after a grace period post-dereference). if the blob has already been
-    # GC'd by the user's PDS, this record is still valid; playback falls back
-    # to audio_url (R2).
+    # the user's PDS keeps its canonical copy of the audio. we first try the
+    # revision's original CID — if PDS still has it, zero extra cost. if PDS
+    # has GC'd the blob (normal after audio was replaced), we re-upload the
+    # bytes from R2 to mint a fresh CID. the core promise of plyr.fm is that
+    # users own their audio on their PDS; losing the blob ref on restore
+    # would silently break that promise.
+    audio_format = AudioFormat.from_extension(f".{revision.file_type}")
+    content_type = audio_format.media_type if audio_format else "audio/mpeg"
     audio_blob: dict | None = None
     if revision.pds_blob_cid:
-        audio_format = AudioFormat.from_extension(f".{revision.file_type}")
         audio_blob = {
             "$type": "blob",
             "ref": {"$link": revision.pds_blob_cid},
-            "mimeType": audio_format.media_type if audio_format else "audio/mpeg",
+            "mimeType": content_type,
             "size": revision.pds_blob_size or 0,
         }

···
         audio_blob=audio_blob,
         description=track.description,
     )
-    # `pds_blob_lost` tracks whether we had to drop the blob ref because the
-    # user's PDS no longer has it. when True, we downgrade the restored track
-    # to audio_storage="r2" with null pds_blob_cid on commit — R2 playback
-    # still works, but the PDS-hosted copy is genuinely gone.
+    # track the effective PDS blob state that will land on the DB row:
+    # - if the initial update_record succeeds: revision's own ref (no change)
+    # - if we had to re-upload: the newly-minted ref from the retry
+    # - if re-upload also fails: None → downgrade track to audio_storage="r2"
+    effective_blob_cid: str | None = revision.pds_blob_cid
+    effective_blob_size: int | None = revision.pds_blob_size
     pds_blob_lost = False
     try:
         _, new_cid = await update_record(
···
     except Exception as exc:
         # PDSes GC unreferenced blobs after a short grace period. when a user
         # replaces audio and then later restores, the old blob may already be
-        # gone. retry without the blob ref so the restore still succeeds —
-        # the audio remains available via R2 (audio_url).
+        # gone. re-upload the R2 bytes to PDS to mint a fresh CID so the
+        # restored record still references a first-class PDS blob.
         if audio_blob is not None and "BlobNotFound" in str(exc):
             logfire.info(
-                "restore: PDS lost the old blob; publishing without audioBlob",
+                "restore: PDS lost the old blob; re-uploading bytes from R2",
                 track_id=track_id,
                 revision_id=revision_id,
-                blob_cid=revision.pds_blob_cid,
+                old_blob_cid=revision.pds_blob_cid,
             )
-            # build a fresh dict for the retry — never mutate `new_record`
-            # in place so callers / tests can inspect the original payload
-            retry_record = {k: v for k, v in new_record.items() if k != "audioBlob"}
-            pds_blob_lost = True
+            reupload_blob_ref: dict | None = None
+            try:
+                audio_data = await storage.get_file_data(
+                    revision.file_id, revision.file_type
+                )
+                if audio_data:
+                    reupload_blob_ref = await upload_blob(
+                        auth_session, audio_data, content_type
+                    )
+            except PayloadTooLargeError:
+                logfire.info(
+                    "restore: re-upload skipped (PDS payload too large)",
+                    track_id=track_id,
+                    revision_id=revision_id,
+                )
+            except Exception:
+                logfire.exception(
+                    "restore: re-upload failed; falling back to r2-only",
+                    track_id=track_id,
+                    revision_id=revision_id,
+                )
+
+            if reupload_blob_ref is not None:
+                # success path: replace the audioBlob in the record with the
+                # fresh ref and retry publishing. build a fresh dict rather
+                # than mutating the original new_record.
+                retry_record = dict(new_record)
+                retry_record["audioBlob"] = reupload_blob_ref
+                effective_blob_cid = reupload_blob_ref.get("ref", {}).get("$link")
+                effective_blob_size = reupload_blob_ref.get("size")
+            else:
+                # could not re-upload (R2 miss / oversize / transient failure)
+                # — fall back to the old drop-the-ref behavior so the restore
+                # still completes, and mark the track r2-only on commit.
+                retry_record = {k: v for k, v in new_record.items() if k != "audioBlob"}
+                effective_blob_cid = None
+                effective_blob_size = None
+                pds_blob_lost = True
+
             try:
                 _, new_cid = await update_record(
                     auth_session=auth_session,
···
                 )
             except Exception as exc2:
                 logfire.exception(
-                    "restore: failed to update ATProto record (retry without blob)",
+                    "restore: failed to update ATProto record (retry)",
                     track_id=track_id,
                     revision_id=revision_id,
                 )
···
         live_track.r2_url = revision.audio_url
         live_track.atproto_record_cid = new_cid

-        # if the PDS lost the blob between replace and restore, downgrade the
-        # track's recorded storage state so it matches the published record
-        # (no audioBlob). otherwise copy through the revision's state.
+        # if the PDS lost the blob AND we couldn't re-upload it, downgrade the
+        # track's recorded storage state to match the published record (no
+        # audioBlob). otherwise record the effective blob ref on the track —
+        # this is the revision's original CID if PDS still had it, or a fresh
+        # CID minted by the re-upload path.
         if pds_blob_lost:
             live_track.audio_storage = "r2"
             live_track.pds_blob_cid = None
             live_track.pds_blob_size = None
         else:
-            live_track.audio_storage = revision.audio_storage
-            live_track.pds_blob_cid = revision.pds_blob_cid
-            live_track.pds_blob_size = revision.pds_blob_size
+            # if the original had no PDS blob (audio_storage="r2"), preserve
+            # that; otherwise we successfully have a blob on PDS (either the
+            # original one or a re-uploaded one) so ensure storage reflects it.
+            if effective_blob_cid is not None:
+                live_track.audio_storage = "both"
+            else:
+                live_track.audio_storage = revision.audio_storage
+            live_track.pds_blob_cid = effective_blob_cid
+            live_track.pds_blob_size = effective_blob_size

         # update duration in extra; clear stale genre-prediction provenance so
         # a future re-classification doesn't get short-circuited.
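
the commit-time branch at the end of this diff reduces to a three-way decision. a minimal sketch of that decision as a pure function; `resolve_blob_state` is a hypothetical name used only here, since the handler assigns these fields directly on `live_track`.

```python
from typing import Optional, Tuple


def resolve_blob_state(
    pds_blob_lost: bool,
    effective_blob_cid: Optional[str],
    effective_blob_size: Optional[int],
    revision_audio_storage: str,
) -> Tuple[str, Optional[str], Optional[int]]:
    """return (audio_storage, pds_blob_cid, pds_blob_size) for the DB row."""
    if pds_blob_lost:
        # blob gone and re-upload failed: the published record has no audioBlob
        return "r2", None, None
    if effective_blob_cid is not None:
        # original CID survived, or a fresh one was minted by the re-upload
        return "both", effective_blob_cid, effective_blob_size
    # revision never had a PDS blob: carry its storage state through
    return revision_audio_storage, effective_blob_cid, effective_blob_size


reuploaded = resolve_blob_state(False, "bafkreiFRESH", 4096, "both")
downgraded = resolve_blob_state(True, None, None, "both")
r2_only_revision = resolve_blob_state(False, None, None, "r2")
```

keeping the decision in one place like this would make each of the three outcomes straightforward to unit test in isolation.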
+120 -13
backend/tests/api/track_audio_replace/test_revisions.py
···
         assert refreshed.pds_blob_cid == "bafkreiORIGINALBLOB"
         assert refreshed.pds_blob_size == 4096

-    async def test_restore_falls_back_when_pds_blob_gc(
+    async def test_restore_reuploads_blob_when_pds_gc(
         self,
         test_app_owner: FastAPI,
         db_session: AsyncSession,
         owner: Artist,
     ) -> None:
-        """if the user's PDS has already GC'd the old blob, restore must not
-        fail. it should retry publishing WITHOUT audioBlob and downgrade the
-        track to audio_storage='r2' so DB + PDS stay consistent. R2 playback
-        still works — only the PDS-hosted copy is lost, which is expected
-        after GC."""
+        """if the user's PDS has GC'd the old blob, restore re-uploads the
+        R2 bytes to PDS to mint a fresh CID and republishes the record with
+        the new ref. the core plyr.fm promise is that users own their audio
+        on their PDS — dropping the ref silently would break that.
+        """
         track = make_track(file_id="CURRENT-NEW", duration=200)
         track.audio_storage = "both"
         track.pds_blob_cid = "bafkreiNEWBLOB"
···
                 RuntimeError(
                     'PDS request failed: 400 {"error":"BlobNotFound","message":"Could not find blob: bafkreiGCdBLOB"}'
                 ),
-                (TRACK_URI, "bafyRESTOREDNOBBLOB"),
+                (TRACK_URI, "bafyRESTOREDWITHFRESHBLOB"),
             ]
         )
-        with patch("backend.api.tracks.revisions.update_record", update_record_mock):
+        # storage returns the R2 bytes, upload_blob mints a fresh CID
+        reupload_ref = {
+            "$type": "blob",
+            "ref": {"$link": "bafkreiFRESH"},
+            "mimeType": "audio/wav",
+            "size": 4096,
+        }
+        get_file_data_mock = AsyncMock(return_value=b"fake-audio-bytes")
+        upload_blob_mock = AsyncMock(return_value=reupload_ref)
+
+        with (
+            patch("backend.api.tracks.revisions.update_record", update_record_mock),
+            patch(
+                "backend.api.tracks.revisions.storage.get_file_data",
+                get_file_data_mock,
+            ),
+            patch("backend.api.tracks.revisions.upload_blob", upload_blob_mock),
+        ):
             async with AsyncClient(
                 transport=ASGITransport(app=test_app_owner), base_url="http://test"
             ) as client:
···
                 )

         assert resp.status_code == 200
-        # first attempt included audioBlob; retry dropped it
+        # first attempt used the revision's original (GC'd) CID; retry used
+        # the freshly re-uploaded blob ref — NOT a record with audioBlob dropped
         assert update_record_mock.call_count == 2
         first_record = update_record_mock.call_args_list[0].kwargs["record"]
         second_record = update_record_mock.call_args_list[1].kwargs["record"]
-        assert first_record.get("audioBlob") is not None
-        assert second_record.get("audioBlob") is None
+        assert first_record["audioBlob"]["ref"]["$link"] == "bafkreiGCdBLOB"
+        assert second_record["audioBlob"]["ref"]["$link"] == "bafkreiFRESH"
+
+        # re-upload happened with the R2 bytes
+        get_file_data_mock.assert_awaited_once_with("ORIGINAL", "wav")
+        upload_blob_mock.assert_awaited_once()
+        assert upload_blob_mock.call_args.args[1] == b"fake-audio-bytes"
+        assert upload_blob_mock.call_args.args[2] == "audio/wav"
+
+        # DB reflects the fresh blob — audio_storage stays "both", pds_blob_cid
+        # is the re-uploaded CID (not the original GC'd one, not null)
+        db_session.expire_all()
+        refreshed = await db_session.get(Track, track_id)
+        assert refreshed is not None
+        assert refreshed.file_id == "ORIGINAL"
+        assert refreshed.audio_storage == "both"
+        assert refreshed.pds_blob_cid == "bafkreiFRESH"
+        assert refreshed.pds_blob_size == 4096
+        assert refreshed.atproto_record_cid == "bafyRESTOREDWITHFRESHBLOB"
+
+    async def test_restore_falls_back_to_r2_when_reupload_also_fails(
+        self,
+        test_app_owner: FastAPI,
+        db_session: AsyncSession,
+        owner: Artist,
+    ) -> None:
+        """if the PDS GC'd the blob AND re-upload also fails (R2 miss,
+        PDS oversize, transient error), restore still completes by
+        dropping the blob ref and downgrading the track to r2-only.
+        playback keeps working via audio_url; only the PDS-hosted copy
+        is genuinely gone."""
+        track = make_track(file_id="CURRENT-NEW", duration=200)
+        track.audio_storage = "both"
+        track.pds_blob_cid = "bafkreiNEWBLOB"
+        track.pds_blob_size = 9999
+        db_session.add(track)
+        await db_session.commit()
+        await db_session.refresh(track)
+
+        revision = TrackRevision(
+            track_id=track.id,
+            file_id="ORIGINAL",
+            file_type="wav",
+            original_file_id=None,
+            original_file_type=None,
+            audio_storage="both",
+            audio_url="https://audio.example/ORIGINAL.wav",
+            pds_blob_cid="bafkreiGCdBLOB",
+            pds_blob_size=4096,
+            duration=120,
+            was_gated=False,
+        )
+        db_session.add(revision)
+        await db_session.commit()
+        await db_session.refresh(revision)
+        revision_id = revision.id
+        track_id = track.id

-        # DB now reflects the downgraded state — track's published record has
-        # no audioBlob, so audio_storage must be 'r2' and pds_blob_cid null
+        update_record_mock = AsyncMock(
+            side_effect=[
+                RuntimeError(
+                    'PDS request failed: 400 {"error":"BlobNotFound","message":"Could not find blob: bafkreiGCdBLOB"}'
+                ),
+                (TRACK_URI, "bafyRESTOREDNOBBLOB"),
+            ]
+        )
+        # R2 returns None (bytes genuinely unrecoverable), so re-upload
+        # is impossible and we fall back to publishing without audioBlob
+        get_file_data_mock = AsyncMock(return_value=None)
+        upload_blob_mock = AsyncMock()
+
+        with (
+            patch("backend.api.tracks.revisions.update_record", update_record_mock),
+            patch(
+                "backend.api.tracks.revisions.storage.get_file_data",
+                get_file_data_mock,
+            ),
+            patch("backend.api.tracks.revisions.upload_blob", upload_blob_mock),
+        ):
+            async with AsyncClient(
+                transport=ASGITransport(app=test_app_owner), base_url="http://test"
+            ) as client:
+                resp = await client.post(
+                    f"/tracks/{track_id}/revisions/{revision_id}/restore"
+                )
+
+        assert resp.status_code == 200
+        # upload_blob was not called because there were no bytes
+        upload_blob_mock.assert_not_awaited()
+        # retry record has audioBlob dropped (fallback path)
+        second_record = update_record_mock.call_args_list[1].kwargs["record"]
+        assert "audioBlob" not in second_record
+
+        # DB downgraded to r2-only
         db_session.expire_all()
         refreshed = await db_session.get(Track, track_id)
         assert refreshed is not None
+1 -1
loq.toml
···

 [[rules]]
 path = "backend/tests/api/track_audio_replace/test_revisions.py"
-max_lines = 561
+max_lines = 668