audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(uploads): migrate track upload + audio replace to docket (#1331)

the POST /tracks/ and PUT /tracks/{id}/audio handlers used
`fastapi.BackgroundTasks.add_task`, which runs the task within the
same ASGI request lifecycle after the response is sent. consequence:
any request-scoped DB session stays checked out of the pool until the
task finishes (20-100s per upload), and nothing bounds concurrency.

today flo.by uploaded 6 tracks in a single album-create fan-out. six
concurrent uploads held six of the 10 pool slots for over a minute
and starved every other request (/auth/me p95 hit 9.7s, /health 3s).
root cause: this pattern was in place from the very first streaming-
uploads commit (26a48c75, Nov 2025). docket landed a month later and
all post-upload tasks were migrated piecemeal (copyright, embedding,
genre, image moderation, atproto sync, teal, export, pds backfill)
but the upload orchestration itself never was. audio replace (#1311,
Apr 2026) copied the same pattern.

changes:
- uploads.py: add run_track_upload (docket task, primitives only,
rehydrates session, delegates to existing _process_upload_background)
+ schedule_track_upload helper
- audio_replace.py: same trio for replace
- handlers: drop `background_tasks: BackgroundTasks` param, call
await schedule_* instead
- _internal/tasks/__init__.py: register both tasks in the docket list
- test_endpoint.py: patch the scheduler helper, not the orchestrator
- tests/integration/test_album_upload.py: add
test_album_upload_10_tracks_concurrently as regression coverage —
fires 10 concurrent uploads through an album and asserts all complete
- loq.toml: relax limits on uploads.py + audio_replace.py to cover the
new wrapper functions

the existing orchestrators (_process_upload_background,
_process_replace_background) keep the same signature so every pipeline
test that drives them directly continues to pass unchanged.

buys us:
- HTTP handler returns in <1s; request-scoped DB session released on
response instead of 100s later
- per-op DB sessions via db_session() inside the task, not held across
the whole upload
- bounded concurrency via settings.docket.worker_concurrency (default
10/worker x 2 prod machines = 20 concurrent uploads, rest queue in
Redis rather than saturating the pool)
- fresh session rehydration if OAuth refreshed between queue and task

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4 (1M context)
and committed by
GitHub
608cb733 0b2504e7

+274 -14
+9
backend/src/backend/_internal/tasks/__init__.py
··· 76 76 """build the task list, deferring jetstream import to break circular dep. 77 77 78 78 cycle: jetstream.py → tasks.ingest → tasks/__init__.py → jetstream.py 79 + 80 + the upload / audio-replace tasks are also imported lazily because their 81 + modules (`api.tracks.uploads`, `api.tracks.audio_replace`) pull in the 82 + FastAPI router infrastructure, which we don't want to touch at task-module 83 + import time. 79 84 """ 80 85 from backend._internal.jetstream import consume_jetstream 86 + from backend.api.tracks.audio_replace import run_track_audio_replace 87 + from backend.api.tracks.uploads import run_track_upload 81 88 82 89 return [ 83 90 consume_jetstream, ··· 111 118 ingest_profile_update, 112 119 ingest_account_status_change, 113 120 scan_image_moderation, 121 + run_track_upload, 122 + run_track_audio_replace, 114 123 ] 115 124 116 125
+58 -4
backend/src/backend/api/tracks/audio_replace.py
··· 30 30 31 31 import logfire 32 32 from fastapi import ( 33 - BackgroundTasks, 34 33 Depends, 35 34 File, 36 35 HTTPException, ··· 47 46 update_record, 48 47 ) 49 48 from backend._internal.audio import AudioFormat 49 + from backend._internal.background import get_docket 50 50 from backend._internal.jobs import job_service 51 51 from backend._internal.tasks import schedule_album_list_sync 52 52 from backend._internal.tasks.hooks import ( ··· 539 539 await storage.delete(new_original_file_id, new_original_file_type) 540 540 541 541 542 + # -- background task registration ----------------------------------------------- 543 + 544 + 545 + async def run_track_audio_replace( 546 + job_id: str, 547 + session_id: str, 548 + track_id: int, 549 + file_path: str, 550 + filename: str, 551 + ) -> None: 552 + """docket task entry point for audio replace. 553 + 554 + takes primitive args (everything that survives Redis serialization), 555 + rehydrates the auth session from the stored session_id, constructs a 556 + ReplaceContext, and delegates to the phase orchestrator 557 + (`_process_replace_background`). this is the function registered with 558 + docket; the HTTP handler enqueues it via `schedule_track_audio_replace`. 559 + """ 560 + auth_session = await get_session(session_id) 561 + if auth_session is None: 562 + await job_service.update_progress( 563 + job_id, 564 + JobStatus.FAILED, 565 + "audio replace failed", 566 + error="authentication session expired before processing could begin", 567 + ) 568 + _unlink_temp(file_path) 569 + return 570 + 571 + ctx = ReplaceContext( 572 + job_id=job_id, 573 + auth_session=auth_session, 574 + track_id=track_id, 575 + file_path=file_path, 576 + filename=filename, 577 + ) 578 + await _process_replace_background(ctx) 579 + 580 + 581 + async def schedule_track_audio_replace(ctx: ReplaceContext) -> None: 582 + """enqueue an audio replace as a docket task. 583 + 584 + the HTTP handler should return to the client as soon as this call 585 + resolves; the actual R2/PDS/ATProto work runs on a docket worker with 586 + bounded concurrency so concurrent replaces can't saturate the DB pool. 587 + """ 588 + docket = get_docket() 589 + await docket.add(run_track_audio_replace)( 590 + job_id=ctx.job_id, 591 + session_id=ctx.auth_session.session_id, 592 + track_id=ctx.track_id, 593 + file_path=ctx.file_path, 594 + filename=ctx.filename, 595 + ) 596 + 597 + 542 598 # -- HTTP surface ---------------------------------------------------------------- 543 599 544 600 ··· 547 603 async def replace_track_audio( 548 604 request: Request, 549 605 track_id: int, 550 - background_tasks: BackgroundTasks, 551 606 auth_session: Annotated[AuthSession, Depends(require_auth)], 552 607 file: UploadFile = File(...), 553 608 ) -> UploadStartResponse: ··· 627 682 "audio replace queued for processing", 628 683 ) 629 684 630 - background_tasks.add_task( 631 - _process_replace_background, 685 + await schedule_track_audio_replace( 632 686 ReplaceContext( 633 687 job_id=job_id, 634 688 auth_session=auth_session,
+104 -5
backend/src/backend/api/tracks/uploads.py
··· 13 13 import aiofiles 14 14 import logfire 15 15 from fastapi import ( 16 - BackgroundTasks, 17 16 Depends, 18 17 File, 19 18 Form, ··· 28 27 from sqlalchemy.ext.asyncio import AsyncSession 29 28 30 29 from backend._internal import Session as AuthSession 31 - from backend._internal import require_artist_profile 30 + from backend._internal import get_session, require_artist_profile 32 31 from backend._internal.atproto import ( 33 32 BlobRef, 34 33 PayloadTooLargeError, ··· 37 36 ) 38 37 from backend._internal.atproto.handles import resolve_featured_artists 39 38 from backend._internal.audio import AudioFormat 39 + from backend._internal.background import get_docket 40 40 from backend._internal.clients.transcoder import get_transcoder_client 41 41 from backend._internal.image import ImageFormat 42 42 from backend._internal.jobs import job_service ··· 972 972 Path(ctx.image_path).unlink(missing_ok=True) 973 973 974 974 975 + async def run_track_upload( 976 + upload_id: str, 977 + session_id: str, 978 + file_path: str, 979 + filename: str, 980 + title: str, 981 + artist_did: str, 982 + album: str | None, 983 + album_id: str | None, 984 + features_json: str | None, 985 + tags: list[str], 986 + description: str | None, 987 + image_path: str | None, 988 + image_filename: str | None, 989 + image_content_type: str | None, 990 + support_gate: dict | None, 991 + auto_tag: bool, 992 + unlisted: bool, 993 + ) -> None: 994 + """docket task entry point for track uploads. 995 + 996 + takes primitive args (everything that survives Redis serialization), 997 + rehydrates the auth session from the stored session_id, constructs an 998 + UploadContext, and delegates to the phase orchestrator 999 + (`_process_upload_background`). this is the function registered with 1000 + docket; the HTTP handler enqueues it via `schedule_track_upload`. 1001 + 1002 + rehydrating the session at task start rather than passing the cached 1003 + AuthSession over the wire means we pick up any token refresh that 1004 + happened between the HTTP request and the worker picking up the task. 1005 + """ 1006 + auth_session = await get_session(session_id) 1007 + if auth_session is None: 1008 + # session expired or was revoked between HTTP request and task start. 1009 + # no way to publish to the user's PDS without it — fail the job and 1010 + # clean up the temp files we staged. 1011 + await job_service.update_progress( 1012 + upload_id, 1013 + JobStatus.FAILED, 1014 + "upload failed", 1015 + error="authentication session expired before processing could begin", 1016 + ) 1017 + with contextlib.suppress(Exception): 1018 + Path(file_path).unlink(missing_ok=True) 1019 + if image_path: 1020 + with contextlib.suppress(Exception): 1021 + Path(image_path).unlink(missing_ok=True) 1022 + return 1023 + 1024 + ctx = UploadContext( 1025 + upload_id=upload_id, 1026 + auth_session=auth_session, 1027 + file_path=file_path, 1028 + filename=filename, 1029 + title=title, 1030 + artist_did=artist_did, 1031 + album=album, 1032 + album_id=album_id, 1033 + features_json=features_json, 1034 + tags=tags, 1035 + description=description, 1036 + image_path=image_path, 1037 + image_filename=image_filename, 1038 + image_content_type=image_content_type, 1039 + support_gate=support_gate, 1040 + auto_tag=auto_tag, 1041 + unlisted=unlisted, 1042 + ) 1043 + await _process_upload_background(ctx) 1044 + 1045 + 1046 + async def schedule_track_upload(ctx: UploadContext) -> None: 1047 + """enqueue a track upload as a docket task. 1048 + 1049 + the HTTP handler should return to the client as soon as this call 1050 + resolves; the actual R2/PDS/ATProto work runs on a docket worker with 1051 + bounded concurrency (`settings.docket.worker_concurrency`), which 1052 + prevents a burst of simultaneous uploads from saturating the DB pool. 1053 + """ 1054 + docket = get_docket() 1055 + await docket.add(run_track_upload)( 1056 + upload_id=ctx.upload_id, 1057 + session_id=ctx.auth_session.session_id, 1058 + file_path=ctx.file_path, 1059 + filename=ctx.filename, 1060 + title=ctx.title, 1061 + artist_did=ctx.artist_did, 1062 + album=ctx.album, 1063 + album_id=ctx.album_id, 1064 + features_json=ctx.features_json, 1065 + tags=ctx.tags, 1066 + description=ctx.description, 1067 + image_path=ctx.image_path, 1068 + image_filename=ctx.image_filename, 1069 + image_content_type=ctx.image_content_type, 1070 + support_gate=ctx.support_gate, 1071 + auto_tag=ctx.auto_tag, 1072 + unlisted=ctx.unlisted, 1073 + ) 1074 + 1075 + 975 1076 @router.post("/") 976 1077 @limiter.limit(settings.rate_limit.upload_limit) 977 1078 async def upload_track( 978 1079 request: Request, 979 1080 title: Annotated[str, Form()], 980 - background_tasks: BackgroundTasks, 981 1081 auth_session: AuthSession = Depends(require_artist_profile), 982 1082 album: Annotated[str | None, Form()] = None, 983 1083 album_id: Annotated[ ··· 1021 1121 Example: {"type": "any"} - requires any atprotofans support. 1022 1122 file: Audio file to upload (required). 1023 1123 image: Optional image file for track artwork. 1024 - background_tasks: FastAPI background-task runner. 1025 1124 auth_session: Authenticated artist session (dependency-injected). 1026 1125 1027 1126 Returns: ··· 1152 1251 auto_tag=auto_tag == "true", 1153 1252 unlisted=unlisted == "true", 1154 1253 ) 1155 - background_tasks.add_task(_process_upload_background, ctx) 1254 + await schedule_track_upload(ctx) 1156 1255 except Exception: 1157 1256 if file_path: 1158 1257 with contextlib.suppress(Exception):
+2 -2
backend/tests/api/track_audio_replace/test_endpoint.py
··· 105 105 await db_session.commit() 106 106 await db_session.refresh(track) 107 107 108 - # patch the background hook so the queued task is a no-op 108 + # patch the docket scheduler so the queued task is a no-op 109 109 with patch( 110 - "backend.api.tracks.audio_replace._process_replace_background", 110 + "backend.api.tracks.audio_replace.schedule_track_audio_replace", 111 111 new_callable=AsyncMock, 112 112 ): 113 113 async with AsyncClient(
+99 -1
backend/tests/integration/test_album_upload.py
··· 11 11 12 12 from __future__ import annotations 13 13 14 + import asyncio 14 15 import json 15 16 from pathlib import Path 16 17 from typing import TYPE_CHECKING, Any 17 18 18 19 import pytest 20 + 21 + from .utils.audio import save_drone 19 22 20 23 if TYPE_CHECKING: 21 24 from plyrfm import AsyncPlyrClient 22 25 23 - pytestmark = [pytest.mark.integration, pytest.mark.timeout(180)] 26 + pytestmark = [pytest.mark.integration, pytest.mark.timeout(300)] 24 27 25 28 26 29 async def _create_album( ··· 306 309 finally: 307 310 if album_id: 308 311 await _delete_album_cascade(client, album_id=album_id) 312 + 313 + 314 + async def test_album_upload_10_tracks_concurrently( 315 + user1_client: AsyncPlyrClient, 316 + tmp_path: Path, 317 + ) -> None: 318 + """upload 10 tracks to an album concurrently and verify every one completes. 319 + 320 + regression for the 2026-04-24 pool-pressure incident. the upload pipeline 321 + used to run on `fastapi.BackgroundTasks`, which tied the HTTP request 322 + lifecycle to the entire upload duration and held a request-scoped DB 323 + connection the whole time. 6 concurrent uploads from a single album 324 + create saturated the pool and starved unrelated routes. 325 + 326 + this test exercises 10 concurrent uploads against the real backend. for 327 + it to pass on the docket-based flow: 328 + 1. every HTTP POST /tracks/ must return promptly (the handler just 329 + enqueues a docket task, so response time is bounded by validation + 330 + streaming-to-disk, not by R2 + PDS + ATProto record creation) 331 + 2. every queued task must complete (no worker starvation, no task lost) 332 + 3. the album's list record must contain all 10 track IDs 333 + 4. all 10 SSE streams must report a `completed` event with a real 334 + track_id + atproto_uri + atproto_cid 335 + """ 336 + client = user1_client 337 + album_id: str | None = None 338 + n_tracks = 10 339 + # pick 10 distinct pitches so each generated audio file has a different 340 + # hash; the upload path rejects duplicate file_ids with 409. 341 + notes = ["C3", "D3", "E3", "F3", "G3", "A3", "B3", "C4", "D4", "E4"] 342 + assert len(notes) == n_tracks 343 + 344 + try: 345 + # step 1: create album shell 346 + album = await _create_album( 347 + client, 348 + title="Integration Test Album (10 Concurrent Uploads)", 349 + description=( 350 + "regression for upload-on-docket migration; verifies 10 " 351 + "concurrent uploads all complete" 352 + ), 353 + ) 354 + album_id = album["id"] 355 + assert album_id is not None 356 + 357 + # step 2: generate 10 unique audio files up front (file-hash-distinct 358 + # so duplicate detection doesn't short-circuit) 359 + files: list[Path] = [] 360 + for i, note in enumerate(notes): 361 + path = tmp_path / f"concurrent_{i:02d}_{note}.wav" 362 + # slightly different durations as extra hash entropy 363 + save_drone(path, note, duration_sec=2.0 + (i * 0.1)) 364 + files.append(path) 365 + 366 + # step 3: fire all 10 uploads concurrently and collect track_ids 367 + uploads = [ 368 + _upload_track_with_album_id( 369 + client, 370 + file=path, 371 + title=f"Concurrent Upload {i:02d} - {notes[i]}", 372 + album_id=album_id, 373 + tags={"integration-test", "concurrent-upload"}, 374 + ) 375 + for i, path in enumerate(files) 376 + ] 377 + track_ids = await asyncio.gather(*uploads) 378 + assert len(track_ids) == n_tracks 379 + assert len(set(track_ids)) == n_tracks, ( 380 + f"expected {n_tracks} distinct track ids, got duplicates: {track_ids}" 381 + ) 382 + 383 + # step 4: finalize in upload order and verify the list record has all N 384 + finalized = await _finalize_album( 385 + client, album_id=album_id, track_ids=list(track_ids) 386 + ) 387 + assert finalized["list_uri"] is not None 388 + assert finalized["track_count"] == n_tracks 389 + 390 + artist_handle = finalized["artist_handle"] 391 + slug = finalized["slug"] 392 + detail_response = await client._client.get( 393 + client._url(f"/albums/{artist_handle}/{slug}"), 394 + headers=client._auth_headers, 395 + ) 396 + detail_response.raise_for_status() 397 + detail = detail_response.json() 398 + 399 + returned_ids = [t["id"] for t in detail["tracks"]] 400 + assert sorted(returned_ids) == sorted(track_ids), ( 401 + f"album must contain all {n_tracks} uploaded tracks; " 402 + f"expected {sorted(track_ids)}, got {sorted(returned_ids)}" 403 + ) 404 + finally: 405 + if album_id: 406 + await _delete_album_cascade(client, album_id=album_id)
+2 -2
loq.toml
··· 40 40 41 41 [[rules]] 42 42 path = "backend/src/backend/api/tracks/uploads.py" 43 - max_lines = 1234 43 + max_lines = 1333 44 44 45 45 [[rules]] 46 46 path = "backend/src/backend/config.py" ··· 256 256 257 257 [[rules]] 258 258 path = "backend/src/backend/api/tracks/audio_replace.py" 259 - max_lines = 651 259 + max_lines = 704 260 260 261 261 [[rules]] 262 262 path = "backend/tests/conftest.py"