audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: reserve-then-publish to prevent upload/Jetstream race (#1072)

* fix: reserve-then-publish to prevent upload/Jetstream race

Upload path now generates rkey upfront, reserves the DB row as "pending",
then publishes to PDS via putRecord. Jetstream ingest reconciles pending
rows instead of racing to create duplicates. Only the winner of the
pending→published transition runs post-creation hooks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: CAS-guard cleanup on ambiguous PDS failure, defer album creation

- Only delete pending row + media if row is still pending (CAS delete).
On ambiguous failures (timeout after PDS committed), Jetstream may
have already finalized the row — don't destroy a published track.
- Defer get_or_create_album to after PDS success to avoid orphan albums
when the publish step fails.
- Add db_session.expire_all() in tests before re-querying rows that
ingest_track_create committed in its own session.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6
and committed by
GitHub
df186f5b 26d90b47

+388 -88
+29
backend/alembic/versions/2026_03_10_214234_5007f35f03d9_add_track_publish_state_column.py
··· 1 + """add track publish_state column 2 + 3 + Revision ID: 5007f35f03d9 4 + Revises: e2d11e296633 5 + Create Date: 2026-03-10 21:42:34.157861 6 + 7 + """ 8 + 9 + from collections.abc import Sequence 10 + 11 + import sqlalchemy as sa 12 + 13 + from alembic import op 14 + 15 + # revision identifiers, used by Alembic. 16 + revision: str = "5007f35f03d9" 17 + down_revision: str | Sequence[str] | None = "e2d11e296633" 18 + branch_labels: str | Sequence[str] | None = None 19 + depends_on: str | Sequence[str] | None = None 20 + 21 + 22 + def upgrade() -> None: 23 + """Upgrade schema.""" 24 + op.add_column("tracks", sa.Column("publish_state", sa.String(), nullable=True)) 25 + 26 + 27 + def downgrade() -> None: 28 + """Downgrade schema.""" 29 + op.drop_column("tracks", "publish_state")
+21 -5
backend/src/backend/_internal/atproto/records/fm_plyr/track.py
··· 25 25 support_gate: dict[str, Any] | None = None, 26 26 audio_blob: BlobRef | None = None, 27 27 description: str | None = None, 28 + created_at: datetime | None = None, 28 29 ) -> dict[str, Any]: 29 30 """Build a track record dict for ATProto. 30 31 ··· 40 41 support_gate: optional gating config (e.g., {"type": "any"}) 41 42 audio_blob: optional blob reference from PDS upload (canonical source when present) 42 43 description: optional track description (liner notes, show notes) 44 + created_at: optional timestamp (uses now if not provided) 43 45 44 46 returns: 45 47 record dict ready for ATProto 46 48 """ 49 + ts = (created_at or datetime.now(UTC)).isoformat().replace("+00:00", "Z") 47 50 record: dict[str, Any] = { 48 51 "$type": settings.atproto.track_collection, 49 52 "title": title, 50 53 "artist": artist, 51 54 "audioUrl": audio_url, 52 55 "fileType": file_type, 53 - "createdAt": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 56 + "createdAt": ts, 54 57 } 55 58 56 59 # add optional fields ··· 95 98 support_gate: dict[str, Any] | None = None, 96 99 audio_blob: BlobRef | None = None, 97 100 description: str | None = None, 101 + rkey: str | None = None, 102 + created_at: datetime | None = None, 98 103 ) -> tuple[str, str]: 99 104 """Create a track record on the user's PDS using the configured collection. 100 105 106 + when rkey is provided, uses putRecord for idempotent creation (safe for retries 107 + and avoids races with Jetstream echo). otherwise falls back to createRecord 108 + which lets the PDS auto-generate the rkey. 109 + 101 110 args: 102 111 auth_session: authenticated user session 103 112 title: track title ··· 111 120 support_gate: optional gating config (e.g., {"type": "any"}) 112 121 audio_blob: optional blob reference from PDS upload (canonical source when present) 113 122 description: optional track description (liner notes, show notes) 123 + rkey: optional explicit record key (TID). uses putRecord when provided 124 + created_at: optional timestamp for the record (uses now if not provided) 114 125 115 126 returns: 116 127 tuple of (record_uri, record_cid) ··· 131 142 support_gate=support_gate, 132 143 audio_blob=audio_blob, 133 144 description=description, 145 + created_at=created_at, 134 146 ) 135 147 136 - payload = { 148 + payload: dict[str, Any] = { 137 149 "repo": auth_session.did, 138 150 "collection": settings.atproto.track_collection, 139 151 "record": record, 140 152 } 141 153 142 - result = await make_pds_request( 143 - auth_session, "POST", "com.atproto.repo.createRecord", payload 144 - ) 154 + if rkey: 155 + payload["rkey"] = rkey 156 + endpoint = "com.atproto.repo.putRecord" 157 + else: 158 + endpoint = "com.atproto.repo.createRecord" 159 + 160 + result = await make_pds_request(auth_session, "POST", endpoint, payload) 145 161 return result["uri"], result["cid"] 146 162 147 163
+36 -8
backend/src/backend/_internal/tasks/ingest.py
··· 78 78 logger.debug("ingest_track_create: unknown artist %s, skipping", did) 79 79 return 80 80 81 - # dedup by AT URI 81 + # check for existing row by URI (may be pending from upload path) 82 82 existing = await db.execute( 83 - select(Track.id).where(Track.atproto_record_uri == uri).limit(1) 83 + select(Track).where(Track.atproto_record_uri == uri).limit(1) 84 84 ) 85 - if existing.scalar_one_or_none() is not None: 86 - logger.debug("ingest_track_create: duplicate URI %s, skipping", uri) 85 + existing_track = existing.scalar_one_or_none() 86 + 87 + if existing_track is not None: 88 + if existing_track.publish_state == "pending": 89 + # upload path reserved this row — finalize it 90 + existing_track.atproto_record_cid = cid 91 + existing_track.publish_state = "published" 92 + await db.commit() 93 + await db.refresh(existing_track) 94 + 95 + resolved_audio_url = existing_track.r2_url 96 + if ( 97 + not resolved_audio_url 98 + and existing_track.pds_blob_cid 99 + and artist.pds_url 100 + ): 101 + resolved_audio_url = pds_blob_url( 102 + artist.pds_url, did, existing_track.pds_blob_cid 103 + ) 104 + 105 + logfire.info( 106 + "ingest: finalized pending track", 107 + uri=uri, 108 + artist_did=did, 109 + track_id=existing_track.id, 110 + ) 111 + 112 + await run_post_track_create_hooks( 113 + existing_track.id, audio_url=resolved_audio_url 114 + ) 115 + else: 116 + logger.debug("ingest_track_create: duplicate URI %s, skipping", uri) 87 117 return 88 118 89 - # determine audio storage type 119 + # no existing row — create from scratch (external ATProto client) 90 120 audio_blob = record.get("audioBlob") 91 121 audio_url = record.get("audioUrl") 92 122 pds_blob_cid = ( ··· 104 134 else: 105 135 audio_storage = "pds" 106 136 107 - # build extra dict 108 137 extra: dict = {} 109 138 if duration := record.get("duration"): 110 139 extra["duration"] = duration ··· 121 150 atproto_record_cid=cid, 122 151 audio_storage=audio_storage, 123 152 pds_blob_cid=pds_blob_cid, 153 + publish_state="published", 124 154 description=record.get("description"), 125 155 image_url=record.get("imageUrl"), 126 156 support_gate=record.get("supportGate"), ··· 135 165 logger.debug("ingest_track_create: duplicate URI %s (race), skipping", uri) 136 166 return 137 167 138 - # resolve audio URL for post-creation hooks 139 168 resolved_audio_url = track.r2_url 140 169 if not resolved_audio_url and pds_blob_cid and artist.pds_url: 141 170 resolved_audio_url = pds_blob_url(artist.pds_url, did, pds_blob_cid) ··· 147 176 audio_storage=audio_storage, 148 177 ) 149 178 150 - # shared post-creation hooks (copyright, embeddings, cache, etc.) 151 179 await run_post_track_create_hooks(track.id, audio_url=resolved_audio_url) 152 180 153 181
+156 -72
backend/src/backend/api/tracks/uploads.py
··· 23 23 ) 24 24 from fastapi.responses import StreamingResponse 25 25 from pydantic import BaseModel 26 - from sqlalchemy import select 26 + from sqlalchemy import delete, select, update 27 27 from sqlalchemy.exc import IntegrityError 28 28 from sqlalchemy.ext.asyncio import AsyncSession 29 29 ··· 591 591 image_id: str | None, 592 592 image_url: str | None, 593 593 thumbnail_url: str | None = None, 594 - ) -> Track: 595 - """phase 6: create ATProto record + DB track record.""" 594 + ) -> tuple[Track, bool]: 595 + """phase 6: reserve DB row, create ATProto record, finalize. 596 + 597 + uses reserve-then-publish to avoid races with Jetstream ingest: 598 + 1. generate rkey (TID) upfront and reserve the DB row as "pending" 599 + 2. publish to PDS with explicit rkey via putRecord 600 + 3. atomic CAS update pending → published (only winner runs hooks) 601 + 602 + returns: 603 + (track, published_by_us) — published_by_us is False if Jetstream 604 + ingest finalized the row before we could. 605 + """ 606 + from datetime import UTC, datetime 607 + 608 + from backend._internal.atproto.tid import datetime_to_tid 609 + 596 610 ext = Path(ctx.filename).suffix.lower() 597 611 playable_file_type = sr.playable_format.value if sr.playable_format else ext[1:] 598 612 613 + # compute audio URL for ATProto record 614 + if audio_info.is_gated: 615 + from urllib.parse import urljoin 616 + 617 + backend_url = settings.atproto.redirect_uri.rsplit("/", 2)[0] 618 + audio_url_for_record = urljoin(backend_url + "/", f"audio/{sr.file_id}") 619 + else: 620 + assert sr.r2_url is not None 621 + audio_url_for_record = sr.r2_url 622 + 623 + # generate deterministic rkey and AT URI 624 + created_at = datetime.now(UTC) 625 + rkey = datetime_to_tid(created_at) 626 + collection = settings.atproto.track_collection 627 + uri = f"at://{ctx.artist_did}/{collection}/{rkey}" 628 + 629 + # step 1: reserve DB row as pending 630 + await job_service.update_progress( 631 + ctx.upload_id, 632 + JobStatus.PROCESSING, 633 + "saving track metadata...", 634 + phase="database", 635 + ) 636 + 599 637 async with db_session() as db: 600 638 result = await db.execute(select(Artist).where(Artist.did == ctx.artist_did)) 601 639 artist = result.scalar_one_or_none() ··· 615 653 ctx.features_json, artist.handle 616 654 ) 617 655 618 - # create ATProto record 619 - await job_service.update_progress( 620 - ctx.upload_id, 621 - JobStatus.PROCESSING, 622 - "creating atproto record...", 623 - phase="atproto", 624 - ) 625 - try: 626 - if audio_info.is_gated: 627 - from urllib.parse import urljoin 628 - 629 - backend_url = settings.atproto.redirect_uri.rsplit("/", 2)[0] 630 - audio_url_for_record = urljoin(backend_url + "/", f"audio/{sr.file_id}") 631 - else: 632 - assert sr.r2_url is not None 633 - audio_url_for_record = sr.r2_url 634 - 635 - atproto_result = await create_track_record( 636 - auth_session=ctx.auth_session, 637 - title=ctx.title, 638 - artist=artist.display_name, 639 - audio_url=audio_url_for_record, 640 - file_type=playable_file_type, 641 - album=ctx.album, 642 - duration=audio_info.duration, 643 - features=featured_artists or None, 644 - image_url=image_url, 645 - support_gate=ctx.support_gate, 646 - audio_blob=pds_result.blob_ref if pds_result else None, 647 - description=ctx.description, 648 - ) 649 - if not atproto_result: 650 - raise ValueError("PDS returned no record data") 651 - atproto_uri, atproto_cid = atproto_result 652 - except Exception as e: 653 - logger.error("ATProto sync failed for upload %s: %s", ctx.upload_id, e) 654 - # cleanup orphaned media 655 - with contextlib.suppress(Exception): 656 - await storage.delete(sr.file_id, playable_file_type) 657 - if sr.original_file_id and sr.original_file_type: 658 - with contextlib.suppress(Exception): 659 - await storage.delete(sr.original_file_id, sr.original_file_type) 660 - if image_id: 661 - with contextlib.suppress(Exception): 662 - await storage.delete(image_id) 663 - raise UploadPhaseError(f"failed to sync track to ATProto: {e}") from e 664 - 665 - # create DB record 666 - await job_service.update_progress( 667 - ctx.upload_id, 668 - JobStatus.PROCESSING, 669 - "saving track metadata...", 670 - phase="database", 671 - ) 672 - 673 656 extra: dict = {} 674 657 if audio_info.duration: 675 658 extra["duration"] = audio_info.duration 676 659 if ctx.auto_tag: 677 660 extra["auto_tag"] = True 678 - 679 - album_record = None 680 661 if ctx.album: 681 662 extra["album"] = ctx.album 682 - album_record = await get_or_create_album( 683 - db, artist, ctx.album, image_id, image_url 684 - ) 685 663 686 664 has_pds_blob = pds_result and pds_result.cid is not None 687 665 audio_storage = "both" if has_pds_blob else "r2" 688 666 667 + artist_display_name = artist.display_name 668 + 669 + # album creation deferred to after PDS success to avoid orphan albums 689 670 track = Track( 690 671 title=ctx.title, 691 672 file_id=sr.file_id, ··· 695 676 artist_did=ctx.artist_did, 696 677 description=ctx.description, 697 678 extra=extra, 698 - album_id=album_record.id if album_record else None, 679 + album_id=None, 699 680 features=featured_artists, 700 681 r2_url=sr.r2_url, 701 - atproto_record_uri=atproto_uri, 702 - atproto_record_cid=atproto_cid, 682 + atproto_record_uri=uri, 683 + atproto_record_cid=None, 684 + created_at=created_at, 685 + publish_state="pending", 703 686 image_id=image_id, 704 687 image_url=image_url, 705 688 thumbnail_url=thumbnail_url, ··· 722 705 await storage.delete(sr.original_file_id, sr.original_file_type) 723 706 raise UploadPhaseError(f"database constraint violation: {e!s}") from e 724 707 725 - return track 708 + track_id = track.id 709 + 710 + # step 2: publish to PDS with explicit rkey (putRecord for idempotency) 711 + await job_service.update_progress( 712 + ctx.upload_id, 713 + JobStatus.PROCESSING, 714 + "creating atproto record...", 715 + phase="atproto", 716 + ) 717 + try: 718 + atproto_result = await create_track_record( 719 + auth_session=ctx.auth_session, 720 + title=ctx.title, 721 + artist=artist_display_name, 722 + audio_url=audio_url_for_record, 723 + file_type=playable_file_type, 724 + album=ctx.album, 725 + duration=audio_info.duration, 726 + features=featured_artists or None, 727 + image_url=image_url, 728 + support_gate=ctx.support_gate, 729 + audio_blob=pds_result.blob_ref if pds_result else None, 730 + description=ctx.description, 731 + rkey=rkey, 732 + created_at=created_at, 733 + ) 734 + if not atproto_result: 735 + raise ValueError("PDS returned no record data") 736 + _, atproto_cid = atproto_result 737 + except Exception as e: 738 + logger.error("ATProto sync failed for upload %s: %s", ctx.upload_id, e) 739 + # only delete the row if it's still pending — on ambiguous failures 740 + # (timeouts, connection drops) Jetstream may have already finalized it 741 + deleted_pending = False 742 + with contextlib.suppress(Exception): 743 + async with db_session() as db: 744 + result = await db.execute( 745 + delete(Track).where( 746 + Track.id == track_id, Track.publish_state == "pending" 747 + ) 748 + ) 749 + await db.commit() 750 + deleted_pending = result.rowcount == 1 # type: ignore[union-attr] 751 + 752 + if deleted_pending: 753 + # row was still pending — safe to clean up media 754 + with contextlib.suppress(Exception): 755 + await storage.delete(sr.file_id, playable_file_type) 756 + if sr.original_file_id and sr.original_file_type: 757 + with contextlib.suppress(Exception): 758 + await storage.delete(sr.original_file_id, sr.original_file_type) 759 + if image_id: 760 + with contextlib.suppress(Exception): 761 + await storage.delete(image_id) 762 + # else: Jetstream finalized the row — media belongs to the published track 763 + 764 + raise UploadPhaseError(f"failed to sync track to ATProto: {e}") from e 765 + 766 + # step 3: atomic CAS update pending → published + deferred album linkage 767 + async with db_session() as db: 768 + # create album now that PDS write succeeded (avoids orphan albums on failure) 769 + album_record = None 770 + if ctx.album: 771 + artist_row = await db.execute( 772 + select(Artist).where(Artist.did == ctx.artist_did) 773 + ) 774 + artist_obj = artist_row.scalar_one() 775 + album_record = await get_or_create_album( 776 + db, artist_obj, ctx.album, image_id, image_url 777 + ) 778 + 779 + values: dict = { 780 + "atproto_record_cid": atproto_cid, 781 + "publish_state": "published", 782 + } 783 + if album_record: 784 + values["album_id"] = album_record.id 785 + 786 + result = await db.execute( 787 + update(Track) 788 + .where(Track.id == track_id, Track.publish_state == "pending") 789 + .values(**values) 790 + ) 791 + await db.commit() 792 + published_by_us = result.rowcount == 1 # type: ignore[union-attr] 793 + 794 + # reload the finalized track 795 + row = await db.execute(select(Track).where(Track.id == track_id)) 796 + track = row.scalar_one() 797 + 798 + return track, published_by_us 726 799 727 800 728 801 async def _schedule_post_upload( 729 - ctx: UploadContext, sr: StorageResult, track: Track 802 + ctx: UploadContext, 803 + sr: StorageResult, 804 + track: Track, 805 + *, 806 + run_hooks: bool = True, 730 807 ) -> None: 731 - """phase 7: post-upload tasks (tags, album sync, shared hooks).""" 808 + """phase 7: post-upload tasks (tags, album sync, shared hooks). 809 + 810 + run_hooks is False when Jetstream ingest already finalized the pending 811 + row and ran hooks before us (race condition — hooks only run once). 812 + """ 732 813 async with db_session() as db: 733 814 await add_tags_to_track(db, track.id, ctx.tags, ctx.artist_did) 734 815 ··· 739 820 740 821 async with db_session() as db: 741 822 await invalidate_album_cache_by_id(db, track.album_id) 823 + 824 + if not run_hooks: 825 + return 742 826 743 827 # shared post-creation hooks 744 828 is_integration_test = ( ··· 785 869 # phase 5: store image (optional) 786 870 image_id, image_url, thumbnail_url = await _store_image(ctx) 787 871 788 - # phase 6: create records (ATProto + DB) 789 - track = await _create_records( 872 + # phase 6: reserve DB row, create ATProto record, finalize 873 + track, published_by_us = await _create_records( 790 874 ctx, audio_info, sr, pds_result, image_id, image_url, thumbnail_url 791 875 ) 792 876 793 877 # phase 7: post-upload tasks (tags, album sync, shared hooks) 794 - await _schedule_post_upload(ctx, sr, track) 878 + await _schedule_post_upload(ctx, sr, track, run_hooks=published_by_us) 795 879 796 880 await job_service.update_progress( 797 881 ctx.upload_id,
+4
backend/src/backend/models/track.py
··· 84 84 pds_blob_cid: Mapped[str | None] = mapped_column(String, nullable=True) 85 85 pds_blob_size: Mapped[int | None] = mapped_column(Integer, nullable=True) 86 86 87 + # publish state for reserve-then-publish flow 88 + # None = legacy (treated as published), "pending" = PDS write pending, "published" = confirmed 89 + publish_state: Mapped[str | None] = mapped_column(String, nullable=True) 90 + 87 91 # track description (liner notes, show notes, etc.) 88 92 description: Mapped[str | None] = mapped_column(String, nullable=True) 89 93
+139
backend/tests/test_jetstream.py
··· 242 242 assert track.r2_url == "https://r2.example.com/js_file_001.mp3" 243 243 assert track.audio_storage == "r2" 244 244 assert track.extra.get("duration") == 180 245 + assert track.publish_state == "published" 245 246 246 247 async def test_dedup_by_uri( 247 248 self, db_session: AsyncSession, artist: Artist, track: Track ··· 453 454 call_audio_url = mock_hooks.call_args[1]["audio_url"] 454 455 # R2 URL preferred over PDS blob when both are available 455 456 assert call_audio_url == "https://r2.example.com/both_hook_001.mp3" 457 + 458 + 459 + class TestIngestPendingReconciliation: 460 + """tests for the reserve-then-publish race condition handling.""" 461 + 462 + async def test_finalize_pending_track( 463 + self, db_session: AsyncSession, artist: Artist 464 + ) -> None: 465 + """ingest finalizes a pending row reserved by the upload path.""" 466 + uri = f"at://{artist.did}/fm.plyr.track/pending1" 467 + 468 + # simulate upload path reserving a pending row 469 + pending_track = Track( 470 + title="Pending Track", 471 + file_id="pend_001", 472 + file_type="mp3", 473 + artist_did=artist.did, 474 + r2_url="https://r2.example.com/pend_001.mp3", 475 + atproto_record_uri=uri, 476 + atproto_record_cid=None, 477 + publish_state="pending", 478 + audio_storage="r2", 479 + ) 480 + db_session.add(pending_track) 481 + await db_session.commit() 482 + original_id = pending_track.id 483 + 484 + # ingest arrives with the same URI 485 + record = { 486 + "title": "Pending Track", 487 + "artist": "Test Artist", 488 + "fileId": "pend_001", 489 + "fileType": "mp3", 490 + "audioUrl": "https://r2.example.com/pend_001.mp3", 491 + "createdAt": _recent_ts(), 492 + } 493 + await ingest_track_create( 494 + did=artist.did, rkey="pending1", record=record, uri=uri, cid="bafyfinalized" 495 + ) 496 + 497 + # ingest commits in its own db_session — expire cached state 498 + db_session.expire_all() 499 + 500 + # should finalize the existing row, not create a new one 501 + result = await db_session.execute( 502 + select(Track).where(Track.atproto_record_uri == uri) 503 + ) 504 + tracks = result.scalars().all() 505 + assert len(tracks) == 1 506 + track = tracks[0] 507 + assert track.id == original_id 508 + assert track.publish_state == "published" 509 + assert track.atproto_record_cid == "bafyfinalized" 510 + 511 + async def test_finalize_pending_runs_hooks( 512 + self, db_session: AsyncSession, artist: Artist 513 + ) -> None: 514 + """finalizing a pending row runs post-creation hooks.""" 515 + uri = f"at://{artist.did}/fm.plyr.track/pendhook1" 516 + 517 + pending_track = Track( 518 + title="Pending Hook Track", 519 + file_id="pendhook_001", 520 + file_type="mp3", 521 + artist_did=artist.did, 522 + r2_url="https://r2.example.com/pendhook_001.mp3", 523 + atproto_record_uri=uri, 524 + atproto_record_cid=None, 525 + publish_state="pending", 526 + audio_storage="r2", 527 + ) 528 + db_session.add(pending_track) 529 + await db_session.commit() 530 + 531 + record = { 532 + "title": "Pending Hook Track", 533 + "artist": "Test Artist", 534 + "fileId": "pendhook_001", 535 + "fileType": "mp3", 536 + "audioUrl": "https://r2.example.com/pendhook_001.mp3", 537 + "createdAt": _recent_ts(), 538 + } 539 + 540 + with patch( 541 + "backend._internal.tasks.ingest.run_post_track_create_hooks", 542 + new_callable=AsyncMock, 543 + ) as mock_hooks: 544 + await ingest_track_create( 545 + did=artist.did, 546 + rkey="pendhook1", 547 + record=record, 548 + uri=uri, 549 + cid="bafypendhook", 550 + ) 551 + 552 + mock_hooks.assert_called_once() 553 + assert mock_hooks.call_args[0][0] == pending_track.id 554 + 555 + async def test_published_track_skips_ingest( 556 + self, db_session: AsyncSession, artist: Artist 557 + ) -> None: 558 + """already-published track is skipped (not re-finalized).""" 559 + uri = f"at://{artist.did}/fm.plyr.track/published1" 560 + 561 + published_track = Track( 562 + title="Published Track", 563 + file_id="pub_001", 564 + file_type="mp3", 565 + artist_did=artist.did, 566 + r2_url="https://r2.example.com/pub_001.mp3", 567 + atproto_record_uri=uri, 568 + atproto_record_cid="bafyoriginal", 569 + publish_state="published", 570 + audio_storage="r2", 571 + ) 572 + db_session.add(published_track) 573 + await db_session.commit() 574 + 575 + record = { 576 + "title": "Published Track", 577 + "artist": "Test Artist", 578 + "fileId": "pub_001", 579 + "fileType": "mp3", 580 + "audioUrl": "https://r2.example.com/pub_001.mp3", 581 + "createdAt": _recent_ts(), 582 + } 583 + await ingest_track_create( 584 + did=artist.did, rkey="published1", record=record, uri=uri, cid="bafynew" 585 + ) 586 + 587 + db_session.expire_all() 588 + 589 + # CID should NOT be overwritten 590 + result = await db_session.execute( 591 + select(Track).where(Track.atproto_record_uri == uri) 592 + ) 593 + track = result.scalar_one() 594 + assert track.atproto_record_cid == "bafyoriginal" 456 595 457 596 458 597 class TestIngestTrackDelete:
+3 -3
loq.toml
··· 47 47 48 48 [[rules]] 49 49 path = "backend/src/backend/api/tracks/uploads.py" 50 - max_lines = 1110 50 + max_lines = 1150 51 51 52 52 [[rules]] 53 53 path = "backend/src/backend/config.py" ··· 219 219 220 220 [[rules]] 221 221 path = "backend/src/backend/_internal/tasks/ingest.py" 222 - max_lines = 540 222 + max_lines = 570 223 223 224 224 [[rules]] 225 225 path = "backend/tests/test_jetstream.py" 226 - max_lines = 1130 226 + max_lines = 1270 227 227 228 228 [[rules]] 229 229 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"