audio streaming app plyr.fm

fix(uploads): stage audio + image to shared storage before enqueueing docket (#1336)

* fix(uploads): stage audio + image to shared storage before enqueueing docket

PR #1331 moved POST /tracks/ + PUT /tracks/{id}/audio onto docket
to fix a connection-pool problem, but mechanically forwarded the same
request-handler `/tmp/...` paths over Redis. on production fly.io,
`relay-api` runs multiple machines per process group; the docket worker
frequently lands on a different machine than the request handler. that
machine has its own /tmp, so the upload silently fails:
`FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpXXXX.wav'`.

evidence (prod, 2026-04-25 darkhart.bsky.social, 7 jobs):
4 failed at varied phases (`upload`, `pds_upload`, `atproto`) — all with
the same FileNotFoundError. the 3 that succeeded all hit the same
`atproto` phase. pure luck of which worker grabbed the job. the
successful tracks also had `image_id IS NULL` in `tracks` because
`_save_image_to_storage` reads `image_path` and silently swallows the
exception (returns `(None, None, None)` on failure). that's the
"cover art shows in the player bar but not on the track page" symptom.

shape of the fix:

HTTP handler:
1. stream client upload to a request-local temp file (max size enforced)
2. extract duration once, while bytes are still local
3. `storage.save(file, filename)` -> audio_file_id
4. stream image to memory, `storage.save` -> image_id, image_url, thumb_url
5. delete request-local temp file
6. enqueue docket task with file_id / image_id / URLs ONLY
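
condensed sketch of that handler flow (helper names match the diff below; `handle_upload` and its `meta` kwargs are illustrative stand-ins for the real route handler and its form fields, and the abort guard from the follow-up commit is elided):

```python
import tempfile
from pathlib import Path

from backend.api.tracks.uploads import (
    UploadContext,
    schedule_track_upload,
    stage_audio_to_storage,
    stage_image_to_storage,
)
from backend.utilities.audio import extract_duration
from backend.utilities.hashing import CHUNK_SIZE


async def handle_upload(upload_id, file, image, auth_session, *, is_gated, **meta):
    # 1. stream the client upload to a request-local temp file
    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp:
        while chunk := await file.read(CHUNK_SIZE):
            tmp.write(chunk)

    try:
        # 2. extract duration while the bytes are still local
        with open(tmp.name, "rb") as f:
            duration = extract_duration(f)

        # 3. stage audio to shared storage -> stable file_id (bucket chosen here)
        with open(tmp.name, "rb") as f:
            audio_file_id = await stage_audio_to_storage(
                upload_id, f, file.filename, gated=is_gated
            )

        # 4. stage image (best-effort) -> id + URLs resolved up front
        image_id = image_url = thumb_url = None
        if image is not None:
            image_id, image_url, thumb_url = await stage_image_to_storage(
                await image.read(), image.filename, image.content_type
            )
    finally:
        # 5. the request-local temp file never leaves this handler
        Path(tmp.name).unlink(missing_ok=True)

    # 6. only identifiers and small primitives cross the queue
    await schedule_track_upload(UploadContext(
        upload_id=upload_id, auth_session=auth_session,
        audio_file_id=audio_file_id, filename=file.filename, duration=duration,
        image_id=image_id, image_url=image_url, thumbnail_url=thumb_url,
        **meta,
    ))
```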

worker (`run_track_upload`, `run_track_audio_replace`):
- signatures take `audio_file_id`, never a `*_path`
- `_validate_audio` reads duration from the context (no I/O)
- `_store_audio` reuses the staged id directly for web-playable
formats; for lossless, downloads from storage, transcodes via a
worker-local /tmp (single-task, never crosses machine boundary),
saves transcoded result back to storage
- `_upload_to_pds` downloads bytes from storage when not transcoded
- `_store_image` is a no-op forward (URLs already resolved in handler)
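
the worker-side decision, reduced to a skeleton (illustrative only: the tuple return and the `TranscodeInfo` field names here are abbreviations; see `_store_audio` in the diff below for the real shape):

```python
from backend.api.tracks.uploads import UploadContext, UploadPhaseError, _transcode_audio


async def resolve_playable_file(ctx: UploadContext, audio_info) -> tuple[str, str | None]:
    """returns (playable file_id, original file_id or None) -- identifiers only."""
    if audio_info.format.is_web_playable:
        # the staged object IS the playable file; reuse its id directly.
        # gating was decided at the handler, so it already lives in the
        # right bucket (private vs public).
        return ctx.audio_file_id, None

    # lossless: the staged id becomes the original. _transcode_audio
    # downloads it from storage, spools to a worker-local /tmp (single
    # task, never crosses a machine boundary), and saves the playable
    # sibling back to shared storage.
    info = await _transcode_audio(
        ctx.upload_id, ctx.audio_file_id, ctx.filename, ctx.audio_extension
    )
    if info is None:
        raise UploadPhaseError("transcoding failed")
    # field names abbreviated for illustration
    return info.transcoded_file_id, info.original_file_id
```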

this preserves PR #1331's connection-pool win (handler returns once
storage is durable + docket task is enqueued) and removes the
multi-machine fragility entirely.

- drops aiofiles use on this path; uses `storage.get_file_data`
- removes the temp-file cleanup in `_process_upload_background` —
there's nothing local to clean
- audio_replace handler also captures `support_gate` up front so the
staged bytes land in the right bucket (private vs public) before
the worker sees them

regression coverage:
the structural change (`UploadContext` no longer has `file_path`,
docket task signatures no longer have `*_path` args) is the contract.
existing tests (`test_upload_session_reload`, `test_upload_phases`,
`track_audio_replace/test_pipeline.py`) exercise the orchestrator
end-to-end through the new context shape and pass green (46 tests).

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

* fix(uploads): clean up staged storage on handler-side + pre-DB worker aborts

addresses three orphan-cleanup gaps the reviewer flagged on the staging refactor:

1. **handler-side**: any abort between `stage_audio_to_storage` and a
successful schedule call left staged storage objects orphaned and
the job stuck in PROCESSING. wrap staging + enqueue in try/except;
on failure, delete the staged audio (private if gated, public otherwise)
and the image, and mark the job FAILED (sketched after this list).

2. **replace orchestrator**: `new_file_id_for_rollback` was None until
`_store_audio` returned. the gated-FLAC path (handler stages new
bytes to private bucket → `_store_audio` raises "supporter-gated
tracks cannot use lossless formats yet") left those bytes stranded.
initialize from `ctx.audio_file_id` upfront, thread the playable-
file extension through `_rollback_new_files`. add `is_gated: bool`
to ReplaceContext (handler-time decision) so rollback selects the
bucket the bytes ACTUALLY live in even under a concurrent PATCH
that flips support_gate between request and worker.

3. **upload orchestrator**: phases 1-5 raise UploadPhaseError without
releasing staged bytes. add `_cleanup_staged_media_pre_db` and a
`db_row_owns_media` boundary flag — orchestrator cleans up only
before `_create_records`, deferring to its existing reserve-then-
publish cleanup past that. covers the transcoded-sibling case.
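
shape of the gap-1 guard, condensed into an illustrative wrapper (the `job_service` import path is assumed; the real handlers inline this around staging + enqueue):

```python
import contextlib
from pathlib import Path

from backend.api.tracks.audio_replace import schedule_track_audio_replace
from backend.api.tracks.uploads import _delete_staged_audio, stage_audio_to_storage
from backend.models.job import JobStatus
from backend.services.jobs import job_service  # assumed import path


async def stage_then_enqueue(job_id, file_path, filename, ctx, *, is_gated, audio_extension):
    """illustrative wrapper around the staging -> enqueue window."""
    audio_file_id = None
    enqueued = False
    try:
        with open(file_path, "rb") as f:
            audio_file_id = await stage_audio_to_storage(
                job_id, f, filename, gated=is_gated
            )
        ctx.audio_file_id = audio_file_id
        await schedule_track_audio_replace(ctx)
        enqueued = True
    except Exception:
        if not enqueued:
            # staged bytes have no owner yet: roll back the right bucket,
            # then fail the job so it doesn't sit in PROCESSING forever.
            if audio_file_id:
                await _delete_staged_audio(audio_file_id, audio_extension, gated=is_gated)
            with contextlib.suppress(Exception):
                await job_service.update_progress(
                    job_id, JobStatus.FAILED, "audio replace failed",
                    error="aborted before queueing",
                )
        raise
    finally:
        # the request-local temp file dies here whether or not enqueue succeeded
        with contextlib.suppress(Exception):
            Path(file_path).unlink(missing_ok=True)
```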

session-expired path on both workers also deletes the staged bytes
(no recovery without a fresh sign-in; orphans serve nothing).

regression tests:
- `tests/api/test_upload_storage_cleanup.py` (4 tests)
- `track_audio_replace/test_pipeline.py` (1 test):
early-abort rolls back staged file from the right bucket per
`ctx.is_gated`

370/370 tests pass locally; ruff + ty clean.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

* chore: drop stray backend/loq.toml

* chore(uploads): consolidate cleanup helper, drop redundant deferred import

once-over after CI green:

- removed the redundant `from backend._internal import get_session` deferred
re-import inside `_process_upload_background` — the symbol is already
imported at module scope. updated `test_upload_session_reload` to
patch where the symbol is used (`backend.api.tracks.uploads.get_session`)
rather than where it's defined, which is the right pattern anyway (see
the example below).
- audio_replace's handler + session-expired path were inlining the
same `delete_gated if gated else delete` pattern that uploads exposes
as `_delete_staged_audio`. import + reuse instead of duplicating.
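
the patch-where-used rule from the first bullet, as it reads in the updated test (`ctx` and `refreshed_session` come from the test's fixtures, inside an async test):

```python
from unittest.mock import patch

# uploads.py does `from backend._internal import get_session` at module
# scope, so the code under test reads `backend.api.tracks.uploads.get_session`.
# patch that attribute; patching "backend._internal.get_session" would rebind
# the defining module and leave uploads' already-bound reference untouched.
with patch("backend.api.tracks.uploads.get_session", return_value=refreshed_session):
    await _process_upload_background(ctx)
```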

no behavior change; 370/370 tests pass.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by nate nowack and Claude Opus 4 (1M context), committed by GitHub
275e2dce 1a1d8bb9

+838 -290
+137 -52  backend/src/backend/api/tracks/audio_replace.py
```diff
···
     UploadContext,
     UploadPhaseError,
     UploadStartResponse,
+    _delete_staged_audio,
     _store_audio,
     _upload_to_pds,
     _validate_audio,
+    stage_audio_to_storage,
 )
 from backend.config import settings
 from backend.models import Track, TrackRevision
 from backend.models.job import JobStatus, JobType
 from backend.storage import storage
+from backend.utilities.audio import extract_duration
 from backend.utilities.database import db_session
 from backend.utilities.hashing import CHUNK_SIZE
 from backend.utilities.rate_limit import limiter
···
 
 @dataclass
 class ReplaceContext:
-    """all data needed to process an audio replace in the background."""
+    """all data needed to process an audio replace in the background.
+
+    new audio bytes are staged to shared object storage by the HTTP
+    handler BEFORE this context is enqueued — the worker only ever
+    sees stable storage identifiers. see `UploadContext` for the same
+    invariant on the upload side.
+
+    `is_gated` records the bucket the handler chose at staging time
+    (private if the existing track is gated, public otherwise). the
+    worker uses this for rollback so we delete from the bucket the
+    bytes actually live in, not whichever the track row happens to
+    say at orchestrator time (a concurrent PATCH could have flipped
+    it between the request and the rollback).
+    """
 
     job_id: str
     auth_session: AuthSession
     track_id: int
-    file_path: str
+    audio_file_id: str
     filename: str
+    duration: int | None
+    is_gated: bool
 
 
 async def _load_and_authorize(
···
     return UploadContext(
         upload_id=ctx.job_id,
         auth_session=ctx.auth_session,
-        file_path=ctx.file_path,
+        audio_file_id=ctx.audio_file_id,
         filename=ctx.filename,
+        duration=ctx.duration,
         title=state.title,
         artist_did=state.artist_did,
         album=state.album,
···
     """compute the audioUrl field for the new ATProto record.
 
     gated tracks point at the auth-protected backend endpoint; public tracks
-    point at the R2 custom domain URL.
+    point at the public storage URL.
     """
     if state.support_gate is not None:
         backend_url = settings.atproto.redirect_uri.rsplit("/", 2)[0]
···
 
     the pipeline is split into two halves around the DB swap:
 
-    pre-commit (rollback applies — failure deletes the new R2 file):
+    pre-commit (rollback applies — failure deletes the new playable file):
       1. load + authorize
       2. validate new bytes
-      3. store new bytes (R2 + transcode if needed)
+      3. store new bytes (transcode if needed; the staged file_id is the
+         lossless original when transcoding, the playable file otherwise)
       4. upload to PDS (best-effort)
       5. publish updated ATProto record (PUT)
       6. atomically swap the DB row
 
     post-commit (NO rollback — the track is replaced; failures are logged):
-      7. delete old R2 file
-      8. fire post-replace hooks (rescan, re-embed, re-classify)
-      9. resync album list record
-      10. invalidate discovery cache
+      7. fire post-replace hooks (rescan, re-embed, re-classify)
+      8. resync album list record
+      9. invalidate discovery cache
 
     the split is critical: once `_commit_db_swap` returns, the track row points
     at the new file and the ATProto record is published. tearing down the new
-    R2 object at that point would leave production looking at a broken track.
+    storage object at that point would leave production looking at a broken
+    track.
     """
-    new_file_id_for_rollback: str | None = None
+    # rollback target starts at the handler-staged file. without this
+    # initializer, any abort before `_store_audio` returns (track gone,
+    # gated-lossless rejection, transcode failure) would leak the bytes
+    # the handler just wrote to storage. once `_store_audio` succeeds
+    # the rollback target is updated to the playable file (and the
+    # staged source becomes `original_file_id` for transcoded paths).
+    new_file_id_for_rollback: str | None = ctx.audio_file_id
+    new_file_type_for_rollback: str | None = (
+        Path(ctx.filename).suffix.lower().lstrip(".") or None
+    )
     new_original_file_id_for_rollback: str | None = None
     new_original_file_type_for_rollback: str | None = None
-    rollback_gated: bool = False
+    # bucket chosen by the handler at staging time — must match for rollback.
+    rollback_gated: bool = ctx.is_gated
 
     with logfire.span(
         "process audio replace background",
···
         )
 
         state = await _load_and_authorize(ctx.track_id, ctx.auth_session)
-        rollback_gated = state.support_gate is not None
         phase_ctx = _build_upload_context_for_phases(ctx, state)
 
         audio_info = await _validate_audio(phase_ctx)
         sr = await _store_audio(phase_ctx, audio_info)
         new_file_id_for_rollback = sr.file_id
+        new_file_type_for_rollback = (
+            sr.playable_format.value if sr.playable_format else None
+        )
         new_original_file_id_for_rollback = sr.original_file_id
         new_original_file_type_for_rollback = sr.original_file_type
 
···
     except UploadPhaseError as e:
         await _rollback_new_files(
             new_file_id_for_rollback,
+            new_file_type_for_rollback,
             new_original_file_id_for_rollback,
             new_original_file_type_for_rollback,
             gated=rollback_gated,
···
         await job_service.update_progress(
             ctx.job_id, JobStatus.FAILED, "audio replace failed", error=e.error
         )
-        _unlink_temp(ctx.file_path)
         return
     except Exception as e:
         await _rollback_new_files(
             new_file_id_for_rollback,
+            new_file_type_for_rollback,
             new_original_file_id_for_rollback,
             new_original_file_type_for_rollback,
             gated=rollback_gated,
···
             "audio replace failed",
             error=f"unexpected error: {e!s}",
         )
-        _unlink_temp(ctx.file_path)
         return
 
     # ---- post-commit (NO rollback — the swap is committed) ----
···
             "atproto_cid": track.atproto_record_cid,
         },
     )
-    _unlink_temp(ctx.file_path)
-
-
-def _unlink_temp(file_path: str) -> None:
-    """remove the temp upload file (suppress all errors)."""
-    with contextlib.suppress(Exception):
-        Path(file_path).unlink(missing_ok=True)
 
 
 async def _refresh_metadata_state(state: TrackAudioState) -> TrackAudioState:
···
 
 async def _rollback_new_files(
     new_file_id: str | None,
+    new_file_type: str | None,
     new_original_file_id: str | None,
     new_original_file_type: str | None,
     *,
     gated: bool,
 ) -> None:
-    """delete any new R2 object we wrote before discovering the operation must abort.
+    """delete any new storage object we wrote before discovering the operation must abort.
 
     `gated=True` routes the playable-file delete to the private bucket. transcode
     originals always live in the public bucket (gated tracks can't be lossless),
     so the original delete uses the public path unconditionally.
+
+    `new_file_type` is the playable extension used to derive the storage
+    key. it MUST be threaded through so the early-abort case (where the
+    handler has only staged the source bytes and `_store_audio` hasn't
+    yet established a playable format) deletes the right key.
     """
     if new_file_id:
         delete_fn = storage.delete_gated if gated else storage.delete
         with contextlib.suppress(Exception):
-            await delete_fn(new_file_id)
+            await delete_fn(new_file_id, new_file_type)
     if new_original_file_id:
         with contextlib.suppress(Exception):
             await storage.delete(new_original_file_id, new_original_file_type)
···
     session_id: str,
     user_did: str,
     track_id: int,
-    file_path: str,
+    audio_file_id: str,
     filename: str,
+    duration: int | None,
+    is_gated: bool,
     concurrency: ConcurrencyLimit = ConcurrencyLimit("user_did", max_concurrent=3),
 ) -> None:
     """docket task entry point for audio replace.
 
-    takes primitive args (everything that survives Redis serialization),
-    rehydrates the auth session from the stored session_id, constructs a
-    ReplaceContext, and delegates to the phase orchestrator
-    (`_process_replace_background`). this is the function registered with
-    docket; the HTTP handler enqueues it via `schedule_track_audio_replace`.
+    takes primitive args + a shared-storage `audio_file_id` (the new
+    bytes are staged to storage by the HTTP handler before this task is
+    enqueued). rehydrates the auth session from the stored session_id,
+    constructs a ReplaceContext, and delegates to the phase orchestrator
+    (`_process_replace_background`).
+
+    **never accept filesystem paths here** — see the matching note on
+    `run_track_upload`. workers may run on a different fly machine than
+    the request handler.
 
     `ConcurrencyLimit("user_did", max_concurrent=3)` caps concurrent
     replaces per user's DID at 3 — same as the upload task. prevents a
···
     """
     auth_session = await get_session(session_id)
     if auth_session is None:
+        # session is gone and the replace can't proceed; clean up the
+        # staged bytes rather than leaving a durable orphan in storage.
+        await _delete_staged_audio(
+            audio_file_id,
+            Path(filename).suffix.lower().lstrip(".") or None,
+            gated=is_gated,
+        )
         await job_service.update_progress(
             job_id,
             JobStatus.FAILED,
             "audio replace failed",
             error="authentication session expired before processing could begin",
         )
-        _unlink_temp(file_path)
         return
 
     ctx = ReplaceContext(
         job_id=job_id,
         auth_session=auth_session,
         track_id=track_id,
-        file_path=file_path,
+        audio_file_id=audio_file_id,
         filename=filename,
+        duration=duration,
+        is_gated=is_gated,
     )
     await _process_replace_background(ctx)
 
···
 async def schedule_track_audio_replace(ctx: ReplaceContext) -> None:
     """enqueue an audio replace as a docket task.
 
-    the HTTP handler should return to the client as soon as this call
-    resolves; the actual R2/PDS/ATProto work runs on a docket worker with
-    bounded concurrency so concurrent replaces can't saturate the DB pool.
+    by contract this function only forwards small primitives + storage
+    identifiers — never local filesystem paths. the HTTP handler should
+    return to the client as soon as this call resolves; transcode / PDS /
+    ATProto work runs on a docket worker with bounded concurrency so
+    concurrent replaces can't saturate the DB pool.
     """
     docket = get_docket()
     await docket.add(run_track_audio_replace)(
···
         session_id=ctx.auth_session.session_id,
         user_did=ctx.auth_session.did,
         track_id=ctx.track_id,
-        file_path=ctx.file_path,
+        audio_file_id=ctx.audio_file_id,
         filename=ctx.filename,
+        duration=ctx.duration,
+        is_gated=ctx.is_gated,
     )
 
 
···
     )
     del audio_format  # validated; the background pipeline re-derives it
 
-    # cheap pre-check so we 404/403 before streaming a multi-MB body to disk
+    # cheap pre-check so we 404/403 before streaming a multi-MB body to disk.
+    # also captures the track's gating state — needed up here so the staged
+    # bytes land in the right bucket (private for gated, public otherwise).
     async with db_session() as db:
         result = await db.execute(
-            select(Track.artist_did, Track.atproto_record_uri).where(
-                Track.id == track_id
-            )
+            select(
+                Track.artist_did, Track.atproto_record_uri, Track.support_gate
+            ).where(Track.id == track_id)
         )
         row = result.first()
         if not row:
             raise HTTPException(status_code=404, detail="track not found")
-        artist_did, atproto_uri = row
+        artist_did, atproto_uri, support_gate = row
         if artist_did != auth_session.did:
             raise HTTPException(
                 status_code=403,
···
                     "replacing audio"
                 ),
             )
+    is_gated = support_gate is not None
 
-    # stream the body to a temp file (constant memory, enforce max size)
+    # stage the new audio bytes to shared object storage BEFORE enqueueing
+    # the docket task. workers may pick up the task on a different fly
+    # machine than this handler — only stable storage identifiers should
+    # cross that boundary.
+    #
+    # the handler creates a durable storage object before the worker has
+    # taken ownership, so any abort between staging and a successful
+    # enqueue must roll the object back AND mark the job FAILED — otherwise
+    # we'd leak bytes and leave the job stuck in PROCESSING forever.
+    job_id = await job_service.create_job(
+        JobType.UPLOAD,
+        auth_session.did,
+        "audio replace queued for processing",
+    )
+    audio_extension = ext.lstrip(".") or None
+
     file_path: str | None = None
+    audio_file_id: str | None = None
+    enqueued = False
     try:
         max_size = settings.storage.max_upload_size_mb * 1024 * 1024
         bytes_read = 0
···
             while chunk := await file.read(CHUNK_SIZE):
                 bytes_read += len(chunk)
                 if bytes_read > max_size:
-                    tmp.close()
-                    Path(file_path).unlink(missing_ok=True)
                     raise HTTPException(
                         status_code=413,
                         detail=(
···
                     )
                 tmp.write(chunk)
 
-        job_id = await job_service.create_job(
-            JobType.UPLOAD,
-            auth_session.did,
-            "audio replace queued for processing",
-        )
+        with open(file_path, "rb") as f:
+            duration = extract_duration(f)
+
+        with open(file_path, "rb") as f:
+            audio_file_id = await stage_audio_to_storage(
+                job_id, f, file.filename, gated=is_gated
+            )
 
         await schedule_track_audio_replace(
             ReplaceContext(
                 job_id=job_id,
                 auth_session=auth_session,
                 track_id=track_id,
-                file_path=file_path,
+                audio_file_id=audio_file_id,
                 filename=file.filename,
+                duration=duration,
+                is_gated=is_gated,
             ),
         )
+        enqueued = True
     except Exception:
+        if not enqueued:
+            if audio_file_id:
+                await _delete_staged_audio(
+                    audio_file_id, audio_extension, gated=is_gated
+                )
+            with contextlib.suppress(Exception):
+                await job_service.update_progress(
+                    job_id,
+                    JobStatus.FAILED,
+                    "audio replace failed",
+                    error="audio replace aborted before queueing",
+                )
+        raise
+    finally:
         if file_path:
             with contextlib.suppress(Exception):
                 Path(file_path).unlink(missing_ok=True)
-        raise
 
     return UploadStartResponse(
         upload_id=job_id,
```
+372 -222  backend/src/backend/api/tracks/uploads.py
```diff
···
 from dataclasses import dataclass
 from io import BytesIO
 from pathlib import Path
-from typing import Annotated, Any
+from typing import Annotated, Any, BinaryIO
 
-import aiofiles
 import logfire
 from docket import ConcurrencyLimit
 from fastapi import (
···
 
 @dataclass
 class UploadContext:
-    """all data needed to process an upload in the background."""
+    """all data needed to process an upload in the background.
+
+    audio + image bytes are staged to shared object storage by the HTTP
+    handler BEFORE this context is enqueued. only stable shared-storage
+    identifiers (file_id / image_id / URLs) and small primitives travel
+    through the docket queue. no local filesystem paths cross the
+    request → worker boundary, because a docket worker may pick up the
+    task on a different fly machine than the one that handled the
+    request — that machine has its own /tmp.
+    """
 
     upload_id: str
     auth_session: AuthSession
 
-    # audio file
-    file_path: str
+    # audio (already in shared storage as `audio_file_id`; extension lives in `filename`)
+    audio_file_id: str
     filename: str
+    duration: int | None
 
     # track metadata
     title: str
···
     features_json: str | None
     tags: list[str]
 
-    # optional image
-    image_path: str | None = None
-    image_filename: str | None = None
-    image_content_type: str | None = None
+    # optional image (already in shared storage as `image_id`, plus computed URLs)
+    image_id: str | None = None
+    image_url: str | None = None
+    thumbnail_url: str | None = None
 
     # track description (liner notes, show notes, etc.)
     description: str | None = None
···
 
     # visibility: unlisted tracks don't appear in discovery feeds
     unlisted: bool = False
+
+    @property
+    def audio_extension(self) -> str:
+        """source-format extension, normalized (lowercase, no leading dot)."""
+        return Path(self.filename).suffix.lower().lstrip(".")
 
 
 @dataclass
···
         super().__init__(error)
 
 
-async def _save_audio_to_storage(
+async def stage_audio_to_storage(
     upload_id: str,
-    file_path: str,
+    file: BinaryIO | BytesIO,
     filename: str,
     *,
     gated: bool = False,
-) -> str | None:
-    """save audio file to storage, returning file_id or None on failure.
+) -> str:
+    """save staged audio bytes to object storage and return the resulting file_id.
+
+    called from the HTTP handler BEFORE the docket task is enqueued. the
+    returned file_id is what travels over Redis to the worker — the local
+    temp file is discarded as soon as this function returns.
+
+    raises on failure; the handler catches and surfaces a 5xx to the
+    client, since the upload didn't durably land in storage.
 
     args:
         upload_id: job tracking ID
-        file_path: path to temp file
-        filename: original filename
+        file: binary stream positioned at the start of the audio bytes
+        filename: original filename (extension determines bucket + media_type)
         gated: if True, save to private bucket (no public URL)
     """
     message = "uploading to private storage..." if gated else "uploading to storage..."
···
         phase="upload",
         progress_pct=0.0,
     )
-    try:
-        async with R2ProgressTracker(
-            job_id=upload_id,
-            message=message,
-            phase="upload",
-        ) as tracker:
-            with open(file_path, "rb") as file_obj:
-                if gated:
-                    file_id = await storage.save_gated(
-                        file_obj, filename, progress_callback=tracker.on_progress
-                    )
-                else:
-                    file_id = await storage.save(
-                        file_obj, filename, progress_callback=tracker.on_progress
-                    )
+    async with R2ProgressTracker(
+        job_id=upload_id,
+        message=message,
+        phase="upload",
+    ) as tracker:
+        if gated:
+            file_id = await storage.save_gated(
+                file, filename, progress_callback=tracker.on_progress
+            )
+        else:
+            file_id = await storage.save(
+                file, filename, progress_callback=tracker.on_progress
+            )
 
-        await job_service.update_progress(
-            upload_id,
-            JobStatus.PROCESSING,
-            message,
-            phase="upload",
-            progress_pct=100.0,
-        )
-        logfire.info("storage.save completed", file_id=file_id, gated=gated)
-        return file_id
+    await job_service.update_progress(
+        upload_id,
+        JobStatus.PROCESSING,
+        message,
+        phase="upload",
+        progress_pct=100.0,
+    )
+    logfire.info("audio staged to storage", file_id=file_id, gated=gated)
+    return file_id
 
-    except Exception as e:
-        logfire.error("storage.save failed", error=str(e), exc_info=True)
-        await job_service.update_progress(
-            upload_id, JobStatus.FAILED, "upload failed", error=str(e)
-        )
-        return None
 
-
-async def _save_image_to_storage(
-    upload_id: str,
-    image_path: str,
+async def stage_image_to_storage(
+    image_data: bytes,
     image_filename: str,
     image_content_type: str | None,
 ) -> tuple[str | None, str | None, str | None]:
-    """save image to storage, returning (image_id, image_url, thumbnail_url) or (None, None, None)."""
-    await job_service.update_progress(
-        upload_id,
-        JobStatus.PROCESSING,
-        "saving image...",
-        phase="image",
-    )
+    """save image bytes + thumbnail to object storage and return (image_id, image_url, thumbnail_url).
+
+    called from the HTTP handler. returns (None, None, None) if the image
+    format is unsupported or the save fails — the upload itself still
+    proceeds without an image rather than failing the whole track.
+    """
     image_format, is_valid = ImageFormat.validate_and_extract(
         image_filename, image_content_type
     )
···
         return None, None, None
 
     try:
-        with open(image_path, "rb") as image_obj:
-            image_data = image_obj.read()
-
         image_id = await storage.save(BytesIO(image_data), f"images/{image_filename}")
         image_url = await storage.get_url(image_id, file_type="image")
         thumbnail_url = await generate_and_save(image_data, image_id, "track")
-
         return image_id, image_url, thumbnail_url
     except Exception as e:
         logger.warning(f"failed to save image: {e}", exc_info=True)
···
 
 async def _transcode_audio(
     upload_id: str,
-    file_path: str,
+    original_file_id: str,
     filename: str,
     source_format: str,
 ) -> TranscodeInfo | None:
-    """transcode audio file to web-playable format.
+    """transcode an already-staged audio file to a web-playable format.
 
-    saves original to storage first, then transcodes. returns None on failure
-    (job status already updated with error).
+    `original_file_id` points at the lossless source bytes already in
+    object storage (the HTTP handler put them there before enqueueing
+    the docket task). we download those bytes, hand them to the
+    transcoder service via a worker-local temp file, and save the
+    transcoded result back. returns None on failure (job status already
+    updated with error).
 
     args:
         upload_id: job tracking ID
-        file_path: path to temp file
-        filename: original filename
+        original_file_id: storage file_id for the lossless source bytes
+        filename: original filename (used to derive transcoded filename)
         source_format: source format (e.g., "aiff", "flac")
 
     returns:
···
         )
         return None
 
-    # save original file first
-    await job_service.update_progress(
-        upload_id,
-        JobStatus.PROCESSING,
-        "saving original file...",
-        phase="upload_original",
-        progress_pct=0.0,
-    )
-
-    try:
-        async with R2ProgressTracker(
-            job_id=upload_id,
-            message="saving original file...",
-            phase="upload_original",
-        ) as tracker:
-            with open(file_path, "rb") as file_obj:
-                original_file_id = await storage.save(
-                    file_obj, filename, progress_callback=tracker.on_progress
-                )
-    except Exception as e:
-        logfire.error("failed to save original file", error=str(e), exc_info=True)
+    # fetch the lossless source bytes from storage
+    source_data = await storage.get_file_data(original_file_id, source_format)
+    if not source_data:
+        logfire.error(
+            "transcode aborted: source file missing from storage",
+            file_id=original_file_id,
+            format=source_format,
+        )
         await job_service.update_progress(
-            upload_id, JobStatus.FAILED, "upload failed", error=str(e)
+            upload_id,
+            JobStatus.FAILED,
+            "upload failed",
+            error="staged audio file missing from storage",
         )
         return None
 
-    logfire.info("original file saved", file_id=original_file_id, format=source_format)
+    logfire.info(
+        "loaded source bytes for transcode",
+        file_id=original_file_id,
+        format=source_format,
+        size_bytes=len(source_data),
+    )
 
-    # transcode to web-playable format (streams file to service, no memory load)
+    # transcode to web-playable format. the transcoder client streams from a
+    # file path; spool the bytes to a worker-local temp file. this temp file
+    # is created and deleted entirely on this worker — it never crosses the
+    # request → worker boundary, so the multi-machine fly setup is fine.
     await job_service.update_progress(
         upload_id,
         JobStatus.PROCESSING,
···
         progress_pct=0.0,
     )
 
+    spool_path: str | None = None
     try:
+        with tempfile.NamedTemporaryFile(
+            delete=False, suffix=f".{source_format}"
+        ) as spool:
+            spool_path = spool.name
+            spool.write(source_data)
+
         client = get_transcoder_client()
-        result = await client.transcode_file(file_path, source_format)
+        result = await client.transcode_file(spool_path, source_format)
 
         if not result.success or not result.data:
             await job_service.update_progress(
···
                 "upload failed",
                 error=f"transcoding failed: {result.error}",
             )
-            # cleanup original
-            with contextlib.suppress(Exception):
-                await storage.delete(original_file_id, source_format)
             return None
 
     except Exception as e:
···
             "upload failed",
             error=f"transcoding error: {e}",
         )
-        # cleanup original
-        with contextlib.suppress(Exception):
-            await storage.delete(original_file_id, source_format)
         return None
+    finally:
+        if spool_path:
+            with contextlib.suppress(Exception):
+                Path(spool_path).unlink(missing_ok=True)
 
     # save transcoded file
     target_format = settings.transcoder.target_format
     transcoded_filename = Path(filename).stem + f".{target_format}"
 
     try:
-        import io
-
         async with R2ProgressTracker(
             job_id=upload_id,
             message="saving transcoded file...",
             phase="upload_transcoded",
         ) as tracker:
             transcoded_file_id = await storage.save(
-                io.BytesIO(result.data),
+                BytesIO(result.data),
                 transcoded_filename,
                 progress_callback=tracker.on_progress,
             )
···
         await job_service.update_progress(
             upload_id, JobStatus.FAILED, "upload failed", error=str(e)
         )
-        # cleanup original
-        with contextlib.suppress(Exception):
-            await storage.delete(original_file_id, source_format)
         return None
 
     logfire.info(
···
 
 async def _validate_audio(ctx: UploadContext) -> AudioInfo:
-    """phase 1: validate file type, extract duration, check gating requirements."""
-    ext = Path(ctx.filename).suffix.lower()
-    audio_format = AudioFormat.from_extension(ext)
-    if not audio_format:
-        raise UploadPhaseError(f"unsupported file type: {ext}")
+    """phase 1: validate file type, propagate handler-extracted duration, check gating.
 
-    with open(ctx.file_path, "rb") as f:
-        duration = extract_duration(f)
+    duration is extracted in the HTTP handler (where the bytes are already
+    in memory or on local /tmp), then carried through `ctx.duration`. the
+    worker doesn't need to re-fetch the audio bytes just to read length
+    metadata. format validation here is the cheap extension check; the
+    handler already rejected unknown extensions before staging.
+    """
+    audio_format = AudioFormat.from_extension(f".{ctx.audio_extension}")
+    if not audio_format:
+        raise UploadPhaseError(f"unsupported file type: .{ctx.audio_extension}")
 
     is_gated = ctx.support_gate is not None
     if is_gated:
···
             "supporter gating requires atprotofans to be enabled in settings"
         )
 
-    return AudioInfo(format=audio_format, duration=duration, is_gated=is_gated)
+    return AudioInfo(format=audio_format, duration=ctx.duration, is_gated=is_gated)
 
 
 async def _store_audio(ctx: UploadContext, audio_info: AudioInfo) -> StorageResult:
-    """phase 2: store audio (transcode if lossless)."""
+    """phase 2: settle on the playable file_id, transcoding if the staged audio is lossless.
+
+    the staged file_id (already in storage from the HTTP handler) is the
+    starting point. for web-playable formats we use it directly. for
+    lossless formats we keep the staged file as the `original_file_id`
+    and produce a transcoded sibling.
+    """
     transcode_info: TranscodeInfo | None = None
 
     if not audio_info.format.is_web_playable:
···
                 "supporter-gated tracks cannot use lossless formats yet"
             )
 
-        original_ext = Path(ctx.filename).suffix.lower().lstrip(".")
+        # the handler-staged file_id IS the lossless original. transcoding
+        # downloads it from storage, produces the playable sibling, and
+        # registers the staged id as `original_file_id`.
         transcode_info = await _transcode_audio(
-            ctx.upload_id, ctx.file_path, ctx.filename, original_ext
+            ctx.upload_id, ctx.audio_file_id, ctx.filename, ctx.audio_extension
         )
         if not transcode_info:
             raise UploadPhaseError("transcoding failed")
···
         if not playable_format:
             raise UploadPhaseError("unknown transcoded format")
     else:
-        file_id = await _save_audio_to_storage(
-            ctx.upload_id, ctx.file_path, ctx.filename, gated=audio_info.is_gated
-        )
-        if not file_id:
-            raise UploadPhaseError("failed to save audio to storage")
+        # web-playable: staged file_id is already the playable file. for
+        # gated tracks we still need it in the private bucket — gating
+        # is decided at the handler boundary, so the staged file already
+        # lives in the right place.
+        file_id = ctx.audio_file_id
         playable_format = audio_info.format
         transcode_info = None
 
-    # get R2 URL (only for public tracks)
+    # public-bucket URL (gated tracks proxy through the auth-protected backend)
     r2_url: str | None = None
     if not audio_info.is_gated:
-        ext = Path(ctx.filename).suffix.lower()
-        playable_ext = playable_format.value if playable_format else ext[1:]
+        playable_ext = playable_format.value if playable_format else ctx.audio_extension
        r2_url = await storage.get_url(
            file_id, file_type="audio", extension=playable_ext
        )
···
 async def _upload_to_pds(
     ctx: UploadContext, audio_info: AudioInfo, sr: StorageResult
 ) -> PdsBlobResult | None:
-    """phase 4: upload to PDS (best-effort). returns None if skipped."""
+    """phase 4: upload to PDS (best-effort). returns None if skipped.
+
+    when the source was transcoded, the playable bytes are already in
+    memory (`sr.transcode_info.transcoded_data`). otherwise we download
+    the playable bytes from storage — there's no machine-local temp file
+    to read from in the worker.
+    """
     if audio_info.is_gated:
         return None
 
···
     if sr.transcode_info:
         pds_file_data = sr.transcode_info.transcoded_data
     else:
-        async with aiofiles.open(ctx.file_path, "rb") as f:
-            pds_file_data = await f.read()
+        playable_ext = (
+            sr.playable_format.value if sr.playable_format else ctx.audio_extension
+        )
+        fetched = await storage.get_file_data(sr.file_id, playable_ext)
+        if fetched is None:
+            logfire.warning(
+                "pds blob upload skipped: file not found in storage",
+                file_id=sr.file_id,
+                file_type=playable_ext,
+            )
+            return None
+        pds_file_data = fetched
 
     return await _try_upload_to_pds(
         ctx.upload_id, ctx.auth_session, pds_file_data, content_type
···
 async def _store_image(
     ctx: UploadContext,
 ) -> tuple[str | None, str | None, str | None]:
-    """phase 5: store image (optional). returns (image_id, image_url, thumbnail_url)."""
-    if not ctx.image_path or not ctx.image_filename:
-        return None, None, None
-    return await _save_image_to_storage(
-        ctx.upload_id, ctx.image_path, ctx.image_filename, ctx.image_content_type
-    )
+    """phase 5: surface the staged image URLs (no I/O — bytes already in storage).
+
+    the HTTP handler stages the image to storage and computes both the
+    image URL and the thumbnail URL before enqueueing the docket task.
+    by the time the worker reaches this phase, there is nothing to do but
+    forward those identifiers into the ATProto record.
+    """
+    return ctx.image_id, ctx.image_url, ctx.thumbnail_url
 
 
 async def _create_records(
···
     )
 
 
+async def _delete_staged_audio(
+    file_id: str, file_type: str | None, *, gated: bool
+) -> None:
+    """suppressed delete for audio bytes the user uploaded to a bucket
+    chosen at handler-staging time. `gated` selects the bucket so we
+    don't no-op-delete from public when the file actually lives in
+    private (or vice versa).
+    """
+    delete_fn = storage.delete_gated if gated else storage.delete
+    with contextlib.suppress(Exception):
+        await delete_fn(file_id, file_type)
+
+
+async def _cleanup_staged_media_pre_db(
+    ctx: UploadContext, sr: StorageResult | None
+) -> None:
+    """delete storage objects staged for an upload that aborts BEFORE
+    `_create_records` reserves a DB row. once a row exists the row owns
+    the media — `_create_records` has its own pending/finalized
+    cleanup logic; the orchestrator must not run this past that
+    boundary or it'll yank media out from under a committed row.
+    """
+    is_gated = ctx.support_gate is not None
+
+    if sr is not None and sr.transcode_info is not None:
+        # transcode produced a new sibling. the playable sibling lives
+        # in the public bucket (gated lossless is rejected upstream),
+        # and the staged source is now `original_file_id` (also public).
+        playable_ext = sr.playable_format.value if sr.playable_format else None
+        with contextlib.suppress(Exception):
+            await storage.delete(sr.file_id, playable_ext)
+        if sr.original_file_id:
+            with contextlib.suppress(Exception):
+                await storage.delete(sr.original_file_id, sr.original_file_type)
+    else:
+        # `_store_audio` either didn't run, or returned the staged file
+        # as-is (web-playable). either way, only ctx.audio_file_id is
+        # in storage, in the bucket the handler chose.
+        await _delete_staged_audio(
+            ctx.audio_file_id, ctx.audio_extension, gated=is_gated
+        )
+
+    if ctx.image_id:
+        with contextlib.suppress(Exception):
+            await storage.delete(ctx.image_id)
+
+
 async def _process_upload_background(ctx: UploadContext) -> None:
-    """orchestrate the upload pipeline through named phases."""
+    """orchestrate the upload pipeline through named phases.
+
+    cleanup discipline: the HTTP handler stages audio + image to shared
+    storage before enqueueing this task, so by the time we get here
+    those objects are durable and orphan-able. failures in phases 1-5
+    delete the staged objects (no DB row exists yet); failures from
+    phase 6 onward defer to `_create_records`'s reserve-then-publish
+    cleanup, since once the row is committed the row owns the media.
+    """
+    sr: StorageResult | None = None
+    db_row_owns_media = False
+
     with logfire.span(
         "process upload background", upload_id=ctx.upload_id, filename=ctx.filename
     ):
···
             pds_result = await _upload_to_pds(ctx, audio_info, sr)
 
             # reload session in case PDS upload refreshed the token
-            if pds_result:
-                from backend._internal import get_session
-
-                if refreshed := await get_session(ctx.auth_session.session_id):
-                    ctx.auth_session = refreshed
+            if pds_result and (
+                refreshed := await get_session(ctx.auth_session.session_id)
+            ):
+                ctx.auth_session = refreshed
 
             # phase 5: store image (optional)
             image_id, image_url, thumbnail_url = await _store_image(ctx)
 
-            # phase 6: reserve DB row, create ATProto record, finalize
+            # phase 6: reserve DB row, create ATProto record, finalize.
+            # past this boundary, _create_records owns the media via the
+            # reserve-then-publish flow (it deletes staged objects on
+            # ATProto failure when the pending row was still ours, and
+            # leaves them when Jetstream finalized). the orchestrator
+            # must not re-delete from here on.
+            db_row_owns_media = True
             track, published_by_us = await _create_records(
                 ctx, audio_info, sr, pds_result, image_id, image_url, thumbnail_url
             )
···
             )
 
         except UploadPhaseError as e:
+            if not db_row_owns_media:
+                await _cleanup_staged_media_pre_db(ctx, sr)
             await job_service.update_progress(
                 ctx.upload_id, JobStatus.FAILED, "upload failed", error=e.error
             )
         except Exception as e:
             logger.exception(f"upload {ctx.upload_id} failed with unexpected error")
+            if not db_row_owns_media:
+                await _cleanup_staged_media_pre_db(ctx, sr)
             await job_service.update_progress(
                 ctx.upload_id,
                 JobStatus.FAILED,
                 "upload failed",
                 error=f"unexpected error: {e!s}",
             )
-        finally:
-            # cleanup temp files
-            with contextlib.suppress(Exception):
-                Path(ctx.file_path).unlink(missing_ok=True)
-            if ctx.image_path:
-                with contextlib.suppress(Exception):
-                    Path(ctx.image_path).unlink(missing_ok=True)
+
+        # NB: no temp-file cleanup here. the worker never receives a
+        # filesystem path — audio + image are staged to shared object
+        # storage by the HTTP handler, and identifiers are what travel
+        # over the docket queue. cleanup of the handler's request-local
+        # temp file lives in the handler's `try/finally` instead.
 
 
 async def run_track_upload(
     upload_id: str,
     session_id: str,
-    file_path: str,
+    audio_file_id: str,
     filename: str,
+    duration: int | None,
     title: str,
     artist_did: str,
     album: str | None,
···
     features_json: str | None,
     tags: list[str],
     description: str | None,
-    image_path: str | None,
-    image_filename: str | None,
-    image_content_type: str | None,
+    image_id: str | None,
+    image_url: str | None,
+    thumbnail_url: str | None,
     support_gate: dict | None,
     auto_tag: bool,
     unlisted: bool,
···
 ) -> None:
     """docket task entry point for track uploads.
 
-    takes primitive args (everything that survives Redis serialization),
+    takes primitive args + shared-storage identifiers (file_id, image_id —
+    everything that survives Redis serialization and is reachable from
+    any worker, regardless of which fly machine the request landed on).
     rehydrates the auth session from the stored session_id, constructs an
     UploadContext, and delegates to the phase orchestrator
-    (`_process_upload_background`). this is the function registered with
-    docket; the HTTP handler enqueues it via `schedule_track_upload`.
+    (`_process_upload_background`).
+
+    **never accept filesystem paths here.** prior to 2026-04 we passed
+    `/tmp/...` paths through this signature; uploads silently failed
+    when the docket worker landed on a different machine than the request
+    handler (different /tmp). the handler now stages audio + image to
+    shared storage before enqueueing, and we work from those identifiers.
 
     rehydrating the session at task start rather than passing the cached
     AuthSession over the wire means we pick up any token refresh that
···
     auth_session = await get_session(session_id)
     if auth_session is None:
         # session expired or was revoked between HTTP request and task start.
-        # no way to publish to the user's PDS without it — fail the job and
-        # clean up the temp files we staged.
+        # the upload can't proceed (no PDS to publish to) and won't recover
+        # without a fresh sign-in, so clean up the staged storage objects
+        # rather than leaving them as durable orphans.
+        is_gated = support_gate is not None
+        await _delete_staged_audio(
+            audio_file_id,
+            Path(filename).suffix.lower().lstrip(".") or None,
+            gated=is_gated,
+        )
+        if image_id:
+            with contextlib.suppress(Exception):
+                await storage.delete(image_id)
         await job_service.update_progress(
             upload_id,
             JobStatus.FAILED,
             "upload failed",
             error="authentication session expired before processing could begin",
         )
-        with contextlib.suppress(Exception):
-            Path(file_path).unlink(missing_ok=True)
-        if image_path:
-            with contextlib.suppress(Exception):
-                Path(image_path).unlink(missing_ok=True)
         return
 
     ctx = UploadContext(
         upload_id=upload_id,
         auth_session=auth_session,
-        file_path=file_path,
+        audio_file_id=audio_file_id,
         filename=filename,
+        duration=duration,
         title=title,
         artist_did=artist_did,
         album=album,
···
         features_json=features_json,
         tags=tags,
         description=description,
-        image_path=image_path,
-        image_filename=image_filename,
-        image_content_type=image_content_type,
+        image_id=image_id,
+        image_url=image_url,
+        thumbnail_url=thumbnail_url,
         support_gate=support_gate,
         auto_tag=auto_tag,
         unlisted=unlisted,
···
 async def schedule_track_upload(ctx: UploadContext) -> None:
     """enqueue a track upload as a docket task.
 
+    by contract this function only forwards small primitives + storage
+    identifiers — never local filesystem paths. a worker may pick up
+    the task on a different fly machine than the one that handled the
+    request, and that machine has its own /tmp.
+
     the HTTP handler should return to the client as soon as this call
-    resolves; the actual R2/PDS/ATProto work runs on a docket worker with
-    bounded concurrency (`settings.docket.worker_concurrency`), which
-    prevents a burst of simultaneous uploads from saturating the DB pool.
+    resolves; the actual transcode/PDS/ATProto work runs on a docket
+    worker with bounded concurrency (`settings.docket.worker_concurrency`),
+    which prevents a burst of simultaneous uploads from saturating the
+    DB pool.
     """
     docket = get_docket()
     await docket.add(run_track_upload)(
         upload_id=ctx.upload_id,
         session_id=ctx.auth_session.session_id,
-        file_path=ctx.file_path,
+        audio_file_id=ctx.audio_file_id,
         filename=ctx.filename,
+        duration=ctx.duration,
         title=ctx.title,
         artist_did=ctx.artist_did,
         album=ctx.album,
···
         features_json=ctx.features_json,
         tags=ctx.tags,
         description=ctx.description,
-        image_path=ctx.image_path,
-        image_filename=ctx.image_filename,
-        image_content_type=ctx.image_content_type,
+        image_id=ctx.image_id,
+        image_url=ctx.image_url,
+        thumbnail_url=ctx.thumbnail_url,
         support_gate=ctx.support_gate,
         auto_tag=ctx.auto_tag,
         unlisted=ctx.unlisted,
···
         f"supported: {AudioFormat.supported_extensions_str()}",
     )
 
-    # stream file to temp file (constant memory)
-    file_path = None
-    image_path = None
+    # stage audio + image to shared object storage BEFORE enqueueing the
+    # docket task. only stable file_ids travel over Redis to the worker —
+    # docket workers are not co-located with the request handler in
+    # production, and a /tmp path on machine A is meaningless on machine B.
+    #
+    # the handler now creates durable storage objects before any worker
+    # has taken ownership, so any abort between staging and a successful
+    # enqueue must roll those objects back AND mark the job FAILED —
+    # otherwise we'd leak audio/image bytes and leave the user's job stuck
+    # in PROCESSING forever.
+    upload_id = await job_service.create_job(
+        JobType.UPLOAD, auth_session.did, "upload queued for processing"
+    )
+    is_gated = parsed_support_gate is not None
+    audio_extension = ext.lstrip(".") or None
+
+    file_path: str | None = None
+    audio_file_id: str | None = None
+    image_id: str | None = None
+    image_url: str | None = None
+    thumbnail_url: str | None = None
+    enqueued = False
     try:
-        # enforce max upload size
         max_size = settings.storage.max_upload_size_mb * 1024 * 1024
         bytes_read = 0
-
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=Path(file.filename).suffix
-        ) as tmp_file:
+        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp_file:
             file_path = tmp_file.name
-            # stream upload file to temp file in chunks
             while chunk := await file.read(CHUNK_SIZE):
                 bytes_read += len(chunk)
                 if bytes_read > max_size:
-                    # cleanup temp file before raising
-                    tmp_file.close()
-                    Path(file_path).unlink(missing_ok=True)
                     raise HTTPException(
                         status_code=413,
-                        detail=f"file too large (max {settings.storage.max_upload_size_mb}MB)",
+                        detail=(
+                            f"file too large (max "
+                            f"{settings.storage.max_upload_size_mb}MB)"
+                        ),
                     )
                 tmp_file.write(chunk)
 
-        # stream image to temp file if provided
-        image_filename = None
-        image_content_type = None
+        # extract duration once, while the bytes are still local — saves
+        # the worker an extra storage round-trip just to read length metadata.
+        with open(file_path, "rb") as f:
+            duration = extract_duration(f)
+
+        # stage audio bytes to shared storage
+        with open(file_path, "rb") as f:
+            audio_file_id = await stage_audio_to_storage(
+                upload_id, f, file.filename, gated=is_gated
+            )
+
+        # stage image bytes to shared storage (best-effort; missing or
+        # invalid images don't fail the whole upload — the orchestrator
+        # treats `image_id is None` as "no track artwork").
         if image and image.filename:
-            image_filename = image.filename
-            image_content_type = image.content_type
-            # images have much smaller limit (20MB is generous for cover art)
             max_image_size = 20 * 1024 * 1024
+            image_buffer = BytesIO()
             image_bytes_read = 0
-
-            with tempfile.NamedTemporaryFile(
-                delete=False, suffix=Path(image.filename).suffix
-            ) as tmp_image:
-                image_path = tmp_image.name
-                # stream image file to temp file in chunks
-                while chunk := await image.read(CHUNK_SIZE):
-                    image_bytes_read += len(chunk)
-                    if image_bytes_read > max_image_size:
-                        # cleanup temp files before raising
-                        tmp_image.close()
-                        Path(image_path).unlink(missing_ok=True)
-                        if file_path:
-                            Path(file_path).unlink(missing_ok=True)
-                        raise HTTPException(
-                            status_code=413,
-                            detail="image too large (max 20MB)",
-                        )
-                    tmp_image.write(chunk)
-
-        # create upload tracking via JobService
-        upload_id = await job_service.create_job(
-            JobType.UPLOAD, auth_session.did, "upload queued for processing"
-        )
+            while chunk := await image.read(CHUNK_SIZE):
+                image_bytes_read += len(chunk)
+                if image_bytes_read > max_image_size:
+                    raise HTTPException(
+                        status_code=413,
+                        detail="image too large (max 20MB)",
+                    )
+                image_buffer.write(chunk)
+            image_id, image_url, thumbnail_url = await stage_image_to_storage(
+                image_buffer.getvalue(), image.filename, image.content_type
+            )
 
-        # schedule background processing once response is sent
         ctx = UploadContext(
             upload_id=upload_id,
             auth_session=auth_session,
-            file_path=file_path,
+            audio_file_id=audio_file_id,
             filename=file.filename,
+            duration=duration,
             title=title,
             artist_did=auth_session.did,
             album=album,
···
             features_json=features,
             tags=validated_tags,
             description=description,
-            image_path=image_path,
-            image_filename=image_filename,
-            image_content_type=image_content_type,
+            image_id=image_id,
+            image_url=image_url,
+            thumbnail_url=thumbnail_url,
             support_gate=parsed_support_gate,
             auto_tag=auto_tag == "true",
             unlisted=unlisted == "true",
         )
         await schedule_track_upload(ctx)
+        enqueued = True
     except Exception:
+        if not enqueued:
+            if audio_file_id:
+                await _delete_staged_audio(
+                    audio_file_id, audio_extension, gated=is_gated
+                )
+            if image_id:
+                with contextlib.suppress(Exception):
+                    await storage.delete(image_id)
+            with contextlib.suppress(Exception):
+                await job_service.update_progress(
+                    upload_id,
+                    JobStatus.FAILED,
+                    "upload failed",
+                    error="upload aborted before queueing",
+                )
+        raise
+    finally:
+        # the request-local temp file lives only inside this handler
+        # invocation. cleaning it up here means there's never a path for
+        # the worker to pick up — and never a /tmp leak if enqueue fails.
         if file_path:
             with contextlib.suppress(Exception):
                 Path(file_path).unlink(missing_ok=True)
-        if image_path:
-            with contextlib.suppress(Exception):
-                Path(image_path).unlink(missing_ok=True)
-        raise
 
     return UploadStartResponse(
         upload_id=upload_id,
```
+6 -4
backend/tests/api/test_upload_session_reload.py
···
         ctx = UploadContext(
             upload_id="upload-1",
             auth_session=old_session,
-            file_path="/tmp/fake.mp3",
+            audio_file_id="audio-file-1",
             filename="fake.mp3",
+            duration=60,
             title="test track",
             artist_did="did:plc:test",
             album=None,
···
                 return_value=None,
             ),
             patch(
-                "backend._internal.get_session",
+                "backend.api.tracks.uploads.get_session",
                 return_value=refreshed_session,
             ),
         ):
···
         ctx = UploadContext(
             upload_id="upload-2",
             auth_session=old_session,
-            file_path="/tmp/fake.mp3",
+            audio_file_id="audio-file-2",
             filename="fake.mp3",
+            duration=60,
             title="test track",
             artist_did="did:plc:test",
             album=None,
···
                 "backend.api.tracks.uploads._schedule_post_upload",
                 return_value=None,
             ),
-            patch("backend._internal.get_session", get_session_mock),
+            patch("backend.api.tracks.uploads.get_session", get_session_mock),
         ):
             await _process_upload_background(ctx)

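the patch-target change above (`backend._internal.get_session` → `backend.api.tracks.uploads.get_session`) follows the standard `unittest.mock` rule: patch the name where it is looked up, not where it is defined. a minimal illustration, using hypothetical modules `app/auth.py` and `app/worker.py`:

    # app/worker.py
    from app.auth import get_session  # copies the reference into worker's namespace

    def run() -> str:
        return get_session()  # resolved as app.worker.get_session at call time

    # test_worker.py
    from unittest.mock import patch
    import app.worker

    def test_run_sees_the_mock() -> None:
        # patching "app.auth.get_session" would NOT affect run(), because
        # worker already holds its own reference; patch the use site.
        with patch("app.worker.get_session", return_value="mocked"):
            assert app.worker.run() == "mocked"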
+241
backend/tests/api/test_upload_storage_cleanup.py
··· 1 + """regression: orphan-cleanup contract for the upload pipeline. 2 + 3 + context: the HTTP handler stages audio + image to shared object storage 4 + BEFORE enqueueing the docket task (a docket worker may run on a 5 + different fly machine than the request handler, so /tmp paths can't 6 + cross that boundary). that means by the time `_process_upload_background` 7 + runs, durable storage objects already exist that the worker hasn't yet 8 + taken DB ownership of. 9 + 10 + the contract this file verifies: 11 + 12 + * any abort during phases 1-5 (before `_create_records` reserves a 13 + DB row) MUST delete the staged audio_file_id and image_id; 14 + * once `_create_records` is entered, the orchestrator-level cleanup 15 + is suppressed because `_create_records` itself runs the 16 + reserve-then-publish cleanup (which knows whether the row was 17 + finalized by Jetstream or is still ours to delete); 18 + * for transcoded uploads, the rollback covers BOTH the lossless 19 + source (now `original_file_id`) and the transcoded sibling. 20 + """ 21 + 22 + from __future__ import annotations 23 + 24 + from unittest.mock import AsyncMock, patch 25 + 26 + from backend._internal import Session as AuthSession 27 + from backend._internal.audio import AudioFormat 28 + from backend.api.tracks.uploads import ( 29 + AudioInfo, 30 + StorageResult, 31 + TranscodeInfo, 32 + UploadContext, 33 + UploadPhaseError, 34 + _process_upload_background, 35 + ) 36 + 37 + 38 + def _session() -> AuthSession: 39 + return AuthSession( 40 + session_id="sess-1", 41 + did="did:plc:test", 42 + handle="test.bsky.social", 43 + oauth_session={"access_token": "tok"}, 44 + ) 45 + 46 + 47 + def _ctx(*, gated: bool = False, with_image: bool = True) -> UploadContext: 48 + return UploadContext( 49 + upload_id="upload-1", 50 + auth_session=_session(), 51 + audio_file_id="staged-audio-id", 52 + filename="song.mp3", 53 + duration=90, 54 + title="t", 55 + artist_did="did:plc:test", 56 + album=None, 57 + album_id=None, 58 + features_json=None, 59 + tags=[], 60 + image_id="staged-image-id" if with_image else None, 61 + image_url=( 62 + "https://images.example/staged-image-id.jpg" if with_image else None 63 + ), 64 + thumbnail_url=( 65 + "https://images.example/staged-image-id_thumb.jpg" if with_image else None 66 + ), 67 + support_gate={"type": "any"} if gated else None, 68 + ) 69 + 70 + 71 + class TestPhase1To5FailureDeletesStagedMedia: 72 + """phases 1-5 raising must delete the handler-staged storage objects.""" 73 + 74 + async def test_validate_audio_failure_deletes_audio_and_image(self) -> None: 75 + with ( 76 + patch( 77 + "backend.api.tracks.uploads._validate_audio", 78 + AsyncMock(side_effect=UploadPhaseError("bad audio")), 79 + ), 80 + patch( 81 + "backend.api.tracks.uploads.storage.delete", 82 + AsyncMock(return_value=True), 83 + ) as mock_delete, 84 + patch( 85 + "backend.api.tracks.uploads.storage.delete_gated", 86 + AsyncMock(return_value=True), 87 + ) as mock_delete_gated, 88 + patch("backend.api.tracks.uploads.job_service", AsyncMock()), 89 + ): 90 + await _process_upload_background(_ctx(gated=False)) 91 + 92 + # non-gated → audio in public bucket 93 + deleted_ids = [c.args[0] for c in mock_delete.call_args_list] 94 + assert "staged-audio-id" in deleted_ids 95 + assert "staged-image-id" in deleted_ids 96 + mock_delete_gated.assert_not_called() 97 + 98 + async def test_gated_audio_deleted_from_private_bucket(self) -> None: 99 + with ( 100 + patch( 101 + "backend.api.tracks.uploads._validate_audio", 102 + 
AsyncMock(side_effect=UploadPhaseError("bad audio")), 103 + ), 104 + patch( 105 + "backend.api.tracks.uploads.storage.delete", 106 + AsyncMock(return_value=True), 107 + ) as mock_delete, 108 + patch( 109 + "backend.api.tracks.uploads.storage.delete_gated", 110 + AsyncMock(return_value=True), 111 + ) as mock_delete_gated, 112 + patch("backend.api.tracks.uploads.job_service", AsyncMock()), 113 + ): 114 + await _process_upload_background(_ctx(gated=True)) 115 + 116 + # gated → staged audio lives in the private bucket; rollback 117 + # must route the audio delete to delete_gated. images are 118 + # never gated. 119 + deleted_gated_ids = [c.args[0] for c in mock_delete_gated.call_args_list] 120 + assert deleted_gated_ids == ["staged-audio-id"] 121 + deleted_public_ids = [c.args[0] for c in mock_delete.call_args_list] 122 + assert "staged-audio-id" not in deleted_public_ids 123 + assert "staged-image-id" in deleted_public_ids 124 + 125 + async def test_transcoded_failure_deletes_both_sibling_and_original( 126 + self, 127 + ) -> None: 128 + """if `_check_duplicate` fails AFTER `_store_audio` produced a 129 + transcoded sibling, BOTH the transcoded file (`sr.file_id`) and 130 + the lossless source (`sr.original_file_id`, = ctx.audio_file_id) 131 + must be cleaned up. 132 + """ 133 + sr = StorageResult( 134 + file_id="transcoded-mp3-id", 135 + original_file_id="staged-audio-id", # the lossless source 136 + original_file_type="flac", 137 + playable_format=AudioFormat.MP3, 138 + r2_url="https://audio.example/transcoded-mp3-id.mp3", 139 + transcode_info=TranscodeInfo( 140 + original_file_id="staged-audio-id", 141 + original_file_type="flac", 142 + transcoded_file_id="transcoded-mp3-id", 143 + transcoded_file_type="mp3", 144 + transcoded_data=b"", 145 + ), 146 + ) 147 + with ( 148 + patch( 149 + "backend.api.tracks.uploads._validate_audio", 150 + AsyncMock( 151 + return_value=AudioInfo( 152 + format=AudioFormat.FLAC, duration=90, is_gated=False 153 + ) 154 + ), 155 + ), 156 + patch( 157 + "backend.api.tracks.uploads._store_audio", 158 + AsyncMock(return_value=sr), 159 + ), 160 + patch( 161 + "backend.api.tracks.uploads._check_duplicate", 162 + AsyncMock(side_effect=UploadPhaseError("dup")), 163 + ), 164 + patch( 165 + "backend.api.tracks.uploads.storage.delete", 166 + AsyncMock(return_value=True), 167 + ) as mock_delete, 168 + patch("backend.api.tracks.uploads.job_service", AsyncMock()), 169 + ): 170 + await _process_upload_background(_ctx(gated=False)) 171 + 172 + deleted_ids = [c.args[0] for c in mock_delete.call_args_list] 173 + assert "transcoded-mp3-id" in deleted_ids # the playable sibling 174 + assert "staged-audio-id" in deleted_ids # the lossless source 175 + assert "staged-image-id" in deleted_ids # the cover art 176 + 177 + 178 + class TestPhase6FailureDefersToCreateRecords: 179 + """once `_create_records` is entered, the orchestrator MUST NOT run 180 + its own storage cleanup — `_create_records` already implements the 181 + reserve-then-publish cleanup that knows whether Jetstream took 182 + ownership of the row. 
183 + """ 184 + 185 + async def test_create_records_failure_does_not_double_delete(self) -> None: 186 + sr = StorageResult( 187 + file_id="staged-audio-id", 188 + original_file_id=None, 189 + original_file_type=None, 190 + playable_format=AudioFormat.MP3, 191 + r2_url="https://audio.example/staged-audio-id.mp3", 192 + transcode_info=None, 193 + ) 194 + with ( 195 + patch( 196 + "backend.api.tracks.uploads._validate_audio", 197 + AsyncMock( 198 + return_value=AudioInfo( 199 + format=AudioFormat.MP3, duration=90, is_gated=False 200 + ) 201 + ), 202 + ), 203 + patch( 204 + "backend.api.tracks.uploads._store_audio", 205 + AsyncMock(return_value=sr), 206 + ), 207 + patch( 208 + "backend.api.tracks.uploads._check_duplicate", 209 + AsyncMock(return_value=None), 210 + ), 211 + patch( 212 + "backend.api.tracks.uploads._upload_to_pds", 213 + AsyncMock(return_value=None), 214 + ), 215 + patch( 216 + "backend.api.tracks.uploads._store_image", 217 + AsyncMock(return_value=(None, None, None)), 218 + ), 219 + patch( 220 + "backend.api.tracks.uploads._create_records", 221 + AsyncMock(side_effect=UploadPhaseError("PDS exploded")), 222 + ), 223 + patch( 224 + "backend.api.tracks.uploads.storage.delete", 225 + AsyncMock(return_value=True), 226 + ) as mock_delete, 227 + patch( 228 + "backend.api.tracks.uploads.storage.delete_gated", 229 + AsyncMock(return_value=True), 230 + ) as mock_delete_gated, 231 + patch("backend.api.tracks.uploads.job_service", AsyncMock()), 232 + ): 233 + await _process_upload_background(_ctx(gated=False)) 234 + 235 + # the orchestrator must not delete — _create_records owns the 236 + # cleanup logic past this boundary. (this test patches 237 + # _create_records itself so we don't observe any of its 238 + # internal cleanup; we're asserting the orchestrator-level 239 + # cleanup does NOT fire, regardless of what _create_records does.) 240 + mock_delete.assert_not_called() 241 + mock_delete_gated.assert_not_called()
+4 -2
backend/tests/api/track_audio_replace/_helpers.py
···
 )


-def replace_ctx(track_id: int = 1) -> ReplaceContext:
+def replace_ctx(track_id: int = 1, *, is_gated: bool = False) -> ReplaceContext:
     return ReplaceContext(
         job_id="job-1",
         auth_session=MockSession(OWNER_DID),
         track_id=track_id,
-        file_path="/tmp/fake-replacement.mp3",
+        audio_file_id="staged-replacement-file-id",
         filename="replacement.mp3",
+        duration=200,
+        is_gated=is_gated,
     )

+18 -5
backend/tests/api/track_audio_replace/test_endpoint.py
···
     async def test_returns_upload_id_and_schedules_background(
         self, test_app_owner: FastAPI, db_session: AsyncSession, owner: Artist
     ) -> None:
-        """successful enqueue returns the SSE-pollable upload_id."""
+        """successful enqueue returns the SSE-pollable upload_id.
+
+        the handler now stages the new audio bytes to shared object storage
+        BEFORE enqueueing the docket task (so workers on other fly machines
+        can pick it up without needing the request handler's /tmp). this
+        test patches both stage_audio_to_storage and the docket scheduler
+        so neither runs for real.
+        """
         track = make_track()
         db_session.add(track)
         await db_session.commit()
         await db_session.refresh(track)

-        # patch the docket scheduler so the queued task is a no-op
-        with patch(
-            "backend.api.tracks.audio_replace.schedule_track_audio_replace",
-            new_callable=AsyncMock,
+        with (
+            patch(
+                "backend.api.tracks.audio_replace.stage_audio_to_storage",
+                new_callable=AsyncMock,
+                return_value="staged-file-id",
+            ),
+            patch(
+                "backend.api.tracks.audio_replace.schedule_track_audio_replace",
+                new_callable=AsyncMock,
+            ),
         ):
             async with AsyncClient(
                 transport=ASGITransport(app=test_app_owner), base_url="http://test"
+57 -2
backend/tests/api/track_audio_replace/test_pipeline.py
···
             store=storage_result(file_id="NEW", r2_url=None),
             pds=None,  # gated tracks skip PDS upload
         ) as mocks:
-            await _process_replace_background(replace_ctx(track_id=track_id))
+            await _process_replace_background(
+                replace_ctx(track_id=track_id, is_gated=True)
+            )

         # the OLD gated file was preserved as a revision with was_gated=True
         revision = (
···
             pds=None,
             update_record_side_effect=RuntimeError("PDS exploded"),
         ) as mocks:
-            await _process_replace_background(replace_ctx(track_id=track_id))
+            await _process_replace_background(
+                replace_ctx(track_id=track_id, is_gated=True)
+            )

         # rollback deletes the NEW file from the PRIVATE bucket
         mocks["storage_delete_gated"].assert_called_once()
···
         # track row is unchanged
         await db_session.refresh(track)
         assert track.file_id == "OLD"
+
+    async def test_early_abort_rolls_back_handler_staged_file(
+        self, db_session: AsyncSession, owner: Artist
+    ) -> None:
+        """regression: handler stages new audio to storage BEFORE the
+        worker validates anything. an `UploadPhaseError` from
+        `_validate_audio` (or any other phase that raises before
+        `_store_audio` returns) used to leave the staged bytes orphaned
+        in storage. now rollback initializes from `ctx.audio_file_id`,
+        and `ctx.is_gated` selects the bucket.
+        """
+        track = make_track(file_id="OLD", support_gate={"type": "any"})
+        db_session.add(track)
+        await db_session.commit()
+        await db_session.refresh(track)
+        track_id = track.id
+
+        # _validate_audio raises before _store_audio is ever called
+        from backend.api.tracks.uploads import UploadPhaseError
+
+        with (
+            patch(
+                "backend.api.tracks.audio_replace._validate_audio",
+                AsyncMock(side_effect=UploadPhaseError("nope")),
+            ),
+            patch(
+                "backend.api.tracks.audio_replace.storage.delete_gated",
+                AsyncMock(return_value=True),
+            ) as mock_delete_gated,
+            patch(
+                "backend.api.tracks.audio_replace.storage.delete",
+                AsyncMock(return_value=True),
+            ) as mock_delete,
+            patch("backend.api.tracks.audio_replace.job_service", AsyncMock()),
+            patch(
+                "backend.api.tracks.audio_replace.invalidate_tracks_discovery_cache",
+                new_callable=AsyncMock,
+            ),
+        ):
+            await _process_replace_background(
+                replace_ctx(track_id=track_id, is_gated=True)
+            )
+
+        # the handler-staged file (audio_file_id="staged-replacement-file-id"
+        # from the helper) MUST be deleted from the private bucket — that's
+        # where the handler put it for a gated track.
+        mock_delete_gated.assert_called_once()
+        assert mock_delete_gated.call_args.args[0] == "staged-replacement-file-id"
+        # and not from the public bucket
+        for call in mock_delete.call_args_list:
+            assert call.args[0] != "staged-replacement-file-id"


 class TestConcurrentMetadataPatch:
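the early-abort path this test pins down is narrower than the upload-side one: `_validate_audio` raises before `_store_audio` runs, so there is no transcoded sibling yet and the rollback seed is just the handler-staged id. a sketch of the presumed shape, with `_rollback_handler_staged_file` as a hypothetical name for logic that lives inline in `_process_replace_background`:

    # hypothetical sketch; `storage` is assumed to be audio_replace's
    # module-level client, and ReplaceContext now carries is_gated as
    # captured by the HTTP handler at staging time.
    import contextlib

    async def _rollback_handler_staged_file(ctx: ReplaceContext) -> None:
        # best-effort delete of the handler-staged bytes; ctx.is_gated,
        # not a recomputed value, selects the bucket.
        with contextlib.suppress(Exception):
            if ctx.is_gated:
                await storage.delete_gated(ctx.audio_file_id)  # private bucket
            else:
                await storage.delete(ctx.audio_file_id)  # public bucket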
+3 -3
loq.toml
···
 [[rules]]
 path = "backend/src/backend/api/tracks/uploads.py"
-max_lines = 1349
+max_lines = 1500

 [[rules]]
 path = "backend/src/backend/config.py"
···
 [[rules]]
 path = "backend/src/backend/api/tracks/audio_replace.py"
-max_lines = 713
+max_lines = 798

 [[rules]]
 path = "backend/tests/conftest.py"
···
 [[rules]]
 path = "backend/tests/api/track_audio_replace/test_pipeline.py"
-max_lines = 512
+max_lines = 567

 [[rules]]
 path = "backend/tests/api/track_audio_replace/test_revisions.py"