audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: surface atproto strongref fields in upload SSE stream (#1262)

Follow-up to #1260. The staging integration tests added in that PR
failed on first run because the upload-progress SSE endpoint whitelists
which fields from job.result get emitted, and the whitelist was track_id
and warnings only — atproto_uri and atproto_cid were written to the job
record by _process_upload_background but silently dropped on the way
out. The docs and public API reference both promised these fields, and
the frontend UploadResult type reads them, but nothing on the wire
actually delivered them.

Add the two strongref fields to the SSE whitelist in upload_progress.
Same mechanism as track_id and warnings — conditionally copy if the job
result carries the key.

Regression coverage:
- tests/api/test_upload_progress_sse.py (new) — pure unit tests that
mock job_service.get_job to return a completed Job with various
result-dict shapes and assert the SSE frame contains or omits the
strongref fields correctly. Runs in CI without staging tokens, so a
future regression is caught on the PR instead of post-deploy.
- tests/integration/test_album_upload.py already asserts on the same
fields and will pass post-deploy once this lands.

Functional note: the frontend currently collects these into UploadResult
but doesn't read them (finalize sends track_ids and the backend
resolves strongrefs from DB). Exposing the fields still matches the
stated design and keeps future callers unblocked if they want to build
items[] arrays client-side the way the album edit page's reorder does.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6 (1M context)
and committed by
GitHub
d323f651 d285a857

+144 -1
+8
backend/src/backend/api/tracks/uploads.py
··· 1192 1192 payload["track_id"] = job.result["track_id"] 1193 1193 if job.result and "warnings" in job.result: 1194 1194 payload["warnings"] = job.result["warnings"] 1195 + # surface the PDS strongRef so album upload callers can build 1196 + # items[] arrays without a follow-up DB query (see #1260). 1197 + # background writes these to job.result in _process_upload_background; 1198 + # without this whitelist entry they never reach the SSE stream. 1199 + if job.result and "atproto_uri" in job.result: 1200 + payload["atproto_uri"] = job.result["atproto_uri"] 1201 + if job.result and "atproto_cid" in job.result: 1202 + payload["atproto_cid"] = job.result["atproto_cid"] 1195 1203 1196 1204 yield f"data: {json.dumps(payload)}\n\n" 1197 1205
+135
backend/tests/api/test_upload_progress_sse.py
··· 1 + """unit tests for the SSE upload-progress endpoint field whitelist. 2 + 3 + regression coverage for the issue where atproto_uri and atproto_cid 4 + were written to job.result by the upload pipeline but never surfaced 5 + to the SSE stream because the endpoint whitelisted only track_id and 6 + warnings. caught by staging integration tests after #1260 merged. 7 + """ 8 + 9 + import json 10 + from datetime import UTC, datetime 11 + from unittest.mock import AsyncMock, patch 12 + 13 + import pytest 14 + from fastapi import FastAPI 15 + from httpx import ASGITransport, AsyncClient 16 + 17 + from backend.main import app 18 + from backend.models.job import Job, JobStatus, JobType 19 + 20 + 21 + def _make_completed_job( 22 + *, 23 + job_id: str = "test-upload-abc", 24 + track_id: int = 42, 25 + atproto_uri: str | None = "at://did:plc:test/fm.plyr.track/abc", 26 + atproto_cid: str | None = "bafyTestCid", 27 + warnings: list[str] | None = None, 28 + ) -> Job: 29 + """build a Job row with a completed upload and an arbitrary result dict.""" 30 + now = datetime.now(UTC) 31 + result: dict = {"track_id": track_id} 32 + if atproto_uri is not None: 33 + result["atproto_uri"] = atproto_uri 34 + if atproto_cid is not None: 35 + result["atproto_cid"] = atproto_cid 36 + if warnings: 37 + result["warnings"] = warnings 38 + 39 + job = Job( 40 + id=job_id, 41 + type=JobType.UPLOAD.value, 42 + owner_did="did:plc:test", 43 + status=JobStatus.COMPLETED.value, 44 + message="upload completed successfully", 45 + progress_pct=100.0, 46 + phase="atproto", 47 + result=result, 48 + error=None, 49 + created_at=now, 50 + completed_at=now, 51 + ) 52 + return job 53 + 54 + 55 + async def _read_first_completed_event(test_app: FastAPI, upload_id: str) -> dict: 56 + """hit the SSE endpoint once, parse the first `data:` frame, return its JSON.""" 57 + async with ( 58 + AsyncClient( 59 + transport=ASGITransport(app=test_app), base_url="http://test" 60 + ) as client, 61 + client.stream("GET", f"/tracks/uploads/{upload_id}/progress") as response, 62 + ): 63 + assert response.status_code == 200 64 + async for line in response.aiter_lines(): 65 + if line.startswith("data: "): 66 + return json.loads(line[6:]) 67 + raise AssertionError("SSE stream closed without emitting a data frame") 68 + 69 + 70 + @pytest.fixture 71 + def progress_app() -> FastAPI: 72 + """the app under test doesn't require auth on the progress endpoint.""" 73 + return app 74 + 75 + 76 + async def test_sse_payload_surfaces_atproto_strongref(progress_app: FastAPI): 77 + """atproto_uri and atproto_cid written to job.result MUST appear in the 78 + SSE completion payload so album upload callers can build finalize 79 + requests or any future consumer can grab the PDS strongRef without a 80 + follow-up DB query. 81 + 82 + regression: the endpoint previously whitelisted only track_id and 83 + warnings, silently dropping the strongRef fields the upload pipeline 84 + writes to job.result. the frontend UploadResult type and the docs 85 + both promise these fields — the SSE handler has to deliver them. 86 + """ 87 + job = _make_completed_job() 88 + 89 + with patch( 90 + "backend.api.tracks.uploads.job_service.get_job", 91 + new=AsyncMock(return_value=job), 92 + ): 93 + payload = await _read_first_completed_event(progress_app, job.id) 94 + 95 + assert payload["status"] == "completed" 96 + assert payload["track_id"] == 42 97 + assert payload["atproto_uri"] == "at://did:plc:test/fm.plyr.track/abc" 98 + assert payload["atproto_cid"] == "bafyTestCid" 99 + 100 + 101 + async def test_sse_payload_omits_absent_strongref(progress_app: FastAPI): 102 + """if the job.result doesn't have atproto_uri/atproto_cid (e.g. legacy 103 + jobs or failures), the SSE payload must not invent them — absent keys 104 + stay absent, no None pollution.""" 105 + job = _make_completed_job(atproto_uri=None, atproto_cid=None) 106 + 107 + with patch( 108 + "backend.api.tracks.uploads.job_service.get_job", 109 + new=AsyncMock(return_value=job), 110 + ): 111 + payload = await _read_first_completed_event(progress_app, job.id) 112 + 113 + assert payload["status"] == "completed" 114 + assert payload["track_id"] == 42 115 + assert "atproto_uri" not in payload 116 + assert "atproto_cid" not in payload 117 + 118 + 119 + async def test_sse_payload_preserves_warnings_alongside_strongref( 120 + progress_app: FastAPI, 121 + ): 122 + """both the warnings field and the strongRef fields must pass through 123 + the whitelist together without interfering with each other.""" 124 + job = _make_completed_job(warnings=["pds blob upload timed out, used r2"]) 125 + 126 + with patch( 127 + "backend.api.tracks.uploads.job_service.get_job", 128 + new=AsyncMock(return_value=job), 129 + ): 130 + payload = await _read_first_completed_event(progress_app, job.id) 131 + 132 + assert payload["track_id"] == 42 133 + assert payload["atproto_uri"] == "at://did:plc:test/fm.plyr.track/abc" 134 + assert payload["atproto_cid"] == "bafyTestCid" 135 + assert payload["warnings"] == ["pds blob upload timed out, used r2"]
+1 -1
loq.toml
··· 48 48 49 49 [[rules]] 50 50 path = "backend/src/backend/api/tracks/uploads.py" 51 - max_lines = 1215 51 + max_lines = 1223 52 52 53 53 [[rules]] 54 54 path = "backend/src/backend/config.py"