audio streaming app plyr.fm
37
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: make audioUrl optional in track lexicon, reject future timestamps (#1078)

audioUrl was required in the lexicon schema, forcing third-party ATProto
clients to provide a meaningless placeholder URL when audio is stored as
a blob on the PDS. now audioUrl is optional — records must have at least
one of audioUrl or audioBlob (enforced at ingest, not schema level, since
lexicons can't express oneOf).

also rejects records with createdAt more than 5 minutes in the future,
which caught a real bug where pdsx sent naive local time interpreted as UTC.

fixes circular import in tasks/__init__.py (jetstream → tasks.ingest →
tasks/__init__ → jetstream) by deferring consume_jetstream import.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6
and committed by
GitHub
c6ce6ec9 92667e4b

+159 -48
+6 -5
backend/src/backend/_internal/atproto/records/fm_plyr/track.py
··· 16 16 def build_track_record( 17 17 title: str, 18 18 artist: str, 19 - audio_url: str, 20 19 file_type: str, 20 + audio_url: str | None = None, 21 21 album: str | None = None, 22 22 duration: int | None = None, 23 23 features: list[dict[str, Any]] | None = None, ··· 32 32 args: 33 33 title: track title 34 34 artist: artist name 35 - audio_url: R2 URL for audio file (placeholder for gated tracks) 36 35 file_type: file extension (mp3, wav, etc) 36 + audio_url: optional R2 URL for audio file (omit when audioBlob is present) 37 37 album: optional album name 38 38 duration: optional duration in seconds 39 39 features: optional list of featured artists [{did, handle, display_name, avatar_url}] ··· 51 51 "$type": settings.atproto.track_collection, 52 52 "title": title, 53 53 "artist": artist, 54 - "audioUrl": audio_url, 55 54 "fileType": file_type, 56 55 "createdAt": ts, 57 56 } 57 + if audio_url is not None: 58 + record["audioUrl"] = audio_url 58 59 59 60 # add optional fields 60 61 if album: ··· 89 90 auth_session: AuthSession, 90 91 title: str, 91 92 artist: str, 92 - audio_url: str, 93 93 file_type: str, 94 + audio_url: str | None = None, 94 95 album: str | None = None, 95 96 duration: int | None = None, 96 97 features: list[dict[str, Any]] | None = None, ··· 111 112 auth_session: authenticated user session 112 113 title: track title 113 114 artist: artist name 114 - audio_url: R2 URL for audio file (placeholder URL for gated tracks) 115 115 file_type: file extension (mp3, wav, etc) 116 + audio_url: optional R2 URL for audio file (omit when audioBlob is present) 116 117 album: optional album name 117 118 duration: optional duration in seconds 118 119 features: optional list of featured artists [{did, handle, display_name, avatar_url}]
+57 -33
backend/src/backend/_internal/tasks/__init__.py
··· 7 7 """ 8 8 9 9 from backend._internal.export_tasks import process_export 10 - from backend._internal.jetstream import consume_jetstream 11 10 from backend._internal.pds_backfill_tasks import backfill_tracks_to_pds 12 11 from backend._internal.tasks.copyright import ( 13 12 scan_copyright, ··· 67 66 warm_follow_graph, 68 67 ) 69 68 70 - # collection of all background task functions for docket registration 71 - background_tasks = [ 72 - consume_jetstream, 73 - scan_copyright, 74 - sync_copyright_resolutions, 75 - process_export, 76 - sync_atproto, 77 - scrobble_to_teal, 78 - sync_album_list, 79 - pds_create_like, 80 - pds_delete_like, 81 - pds_create_comment, 82 - pds_delete_comment, 83 - pds_update_comment, 84 - backfill_tracks_to_pds, 85 - move_track_audio, 86 - generate_embedding, 87 - classify_genres, 88 - warm_follow_graph, 89 - ingest_track_create, 90 - ingest_track_update, 91 - ingest_track_delete, 92 - ingest_like_create, 93 - ingest_like_delete, 94 - ingest_comment_create, 95 - ingest_comment_update, 96 - ingest_comment_delete, 97 - ingest_list_create, 98 - ingest_list_update, 99 - ingest_list_delete, 100 - ingest_profile_update, 101 - ] 69 + 70 + def _build_background_tasks() -> list: 71 + """build the task list, deferring jetstream import to break circular dep. 72 + 73 + cycle: jetstream.py → tasks.ingest → tasks/__init__.py → jetstream.py 74 + """ 75 + from backend._internal.jetstream import consume_jetstream 76 + 77 + return [ 78 + consume_jetstream, 79 + scan_copyright, 80 + sync_copyright_resolutions, 81 + process_export, 82 + sync_atproto, 83 + scrobble_to_teal, 84 + sync_album_list, 85 + pds_create_like, 86 + pds_delete_like, 87 + pds_create_comment, 88 + pds_delete_comment, 89 + pds_update_comment, 90 + backfill_tracks_to_pds, 91 + move_track_audio, 92 + generate_embedding, 93 + classify_genres, 94 + warm_follow_graph, 95 + ingest_track_create, 96 + ingest_track_update, 97 + ingest_track_delete, 98 + ingest_like_create, 99 + ingest_like_delete, 100 + ingest_comment_create, 101 + ingest_comment_update, 102 + ingest_comment_delete, 103 + ingest_list_create, 104 + ingest_list_update, 105 + ingest_list_delete, 106 + ingest_profile_update, 107 + ] 108 + 109 + 110 + # lazily constructed on first access (docket worker startup) 111 + _background_tasks: list | None = None 112 + 113 + 114 + def __getattr__(name: str): 115 + if name == "background_tasks": 116 + global _background_tasks 117 + if _background_tasks is None: 118 + _background_tasks = _build_background_tasks() 119 + return _background_tasks 120 + if name == "consume_jetstream": 121 + from backend._internal.jetstream import consume_jetstream 122 + 123 + return consume_jetstream 124 + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") 125 + 102 126 103 127 __all__ = [ 104 128 "SubjectNotFoundError",
+21
backend/src/backend/_internal/tasks/ingest.py
··· 35 35 ) 36 36 37 37 38 + _MAX_CLOCK_SKEW = timedelta(minutes=5) 39 + 40 + 38 41 def _parse_datetime(value: str | None) -> datetime: 39 42 """parse an ISO 8601 datetime string, falling back to now.""" 40 43 if not value: ··· 44 47 return dt if dt.tzinfo else dt.replace(tzinfo=UTC) 45 48 except (ValueError, AttributeError): 46 49 return datetime.now(UTC) 50 + 51 + 52 + def _is_future_timestamp(value: str | None) -> bool: 53 + """return True if the timestamp is beyond acceptable clock skew.""" 54 + if not value: 55 + return False 56 + dt = _parse_datetime(value) 57 + return dt > datetime.now(UTC) + _MAX_CLOCK_SKEW 47 58 48 59 49 60 # --- track tasks --- ··· 69 80 """ 70 81 if errors := validate_record("fm.plyr.track", record): 71 82 logfire.warn("ingest: invalid track record, skipping", uri=uri, errors=errors) 83 + return 84 + 85 + if not record.get("audioUrl") and not record.get("audioBlob"): 86 + logfire.warn( 87 + "ingest: track has neither audioUrl nor audioBlob, skipping", uri=uri 88 + ) 89 + return 90 + 91 + if _is_future_timestamp(record.get("createdAt")): 92 + logfire.warn("ingest: track createdAt is in the future, skipping", uri=uri) 72 93 return 73 94 74 95 async with db_session() as db:
+47 -3
backend/tests/test_jetstream.py
··· 328 328 "fileId": "pds_only_001", 329 329 "fileType": "mp3", 330 330 "audioBlob": {"ref": {"$link": "bafypdsonly"}, "mimeType": "audio/mpeg"}, 331 - "audioUrl": "https://placeholder.example.com/pds_only_001.mp3", 332 331 "createdAt": _recent_ts(), 333 332 } 334 333 uri = "at://did:plc:jetstream_test/fm.plyr.track/pdsonly1" ··· 341 340 select(Track).where(Track.atproto_record_uri == uri) 342 341 ) 343 342 track = result.scalar_one() 344 - # audioUrl is required by lexicon but audioBlob is canonical — both present = "both" 345 - assert track.audio_storage == "both" 343 + assert track.audio_storage == "pds" 346 344 assert track.pds_blob_cid == "bafypdsonly" 345 + assert track.r2_url is None 346 + 347 + async def test_neither_audio_field_skipped( 348 + self, db_session: AsyncSession, artist: Artist 349 + ) -> None: 350 + """record with neither audioUrl nor audioBlob is rejected.""" 351 + record = { 352 + "title": "No Audio", 353 + "artist": "Test Artist", 354 + "fileId": "noaudio_001", 355 + "fileType": "mp3", 356 + "createdAt": _recent_ts(), 357 + } 358 + uri = "at://did:plc:jetstream_test/fm.plyr.track/noaudio1" 359 + 360 + await ingest_track_create( 361 + did=artist.did, rkey="noaudio1", record=record, uri=uri, cid="bafyno" 362 + ) 363 + 364 + result = await db_session.execute( 365 + select(Track).where(Track.atproto_record_uri == uri) 366 + ) 367 + assert result.scalar_one_or_none() is None 368 + 369 + async def test_future_timestamp_skipped( 370 + self, db_session: AsyncSession, artist: Artist 371 + ) -> None: 372 + """record with createdAt in the future is rejected.""" 373 + record = { 374 + "title": "Future Track", 375 + "artist": "Test Artist", 376 + "fileId": "future_001", 377 + "fileType": "mp3", 378 + "audioUrl": "https://r2.example.com/future_001.mp3", 379 + "createdAt": "2099-01-01T00:00:00Z", 380 + } 381 + uri = "at://did:plc:jetstream_test/fm.plyr.track/future1" 382 + 383 + await ingest_track_create( 384 + did=artist.did, rkey="future1", record=record, uri=uri, cid="bafyfuture" 385 + ) 386 + 387 + result = await db_session.execute( 388 + select(Track).where(Track.atproto_record_uri == uri) 389 + ) 390 + assert result.scalar_one_or_none() is None 347 391 348 392 async def test_track_create_sets_support_gate( 349 393 self, db_session: AsyncSession, artist: Artist
+21 -1
backend/tests/test_lexicon_validation.py
··· 28 28 errors = validate_record("fm.plyr.track", {}) 29 29 assert any("title" in e for e in errors) 30 30 assert any("artist" in e for e in errors) 31 - assert any("audioUrl" in e for e in errors) 32 31 assert any("fileType" in e for e in errors) 33 32 assert any("createdAt" in e for e in errors) 33 + 34 + def test_audio_blob_only_valid(self) -> None: 35 + """audioBlob alone passes schema validation (audioUrl is optional).""" 36 + record = { 37 + "title": "blob track", 38 + "artist": "test", 39 + "fileType": "mp3", 40 + "createdAt": "2025-01-01T00:00:00Z", 41 + "audioBlob": {"ref": {"$link": "bafytest"}, "mimeType": "audio/mpeg"}, 42 + } 43 + assert validate_record("fm.plyr.track", record) == [] 44 + 45 + def test_neither_audio_field_passes_schema(self) -> None: 46 + """no audio fields passes schema — business rule enforced at ingest.""" 47 + record = { 48 + "title": "no audio", 49 + "artist": "test", 50 + "fileType": "mp3", 51 + "createdAt": "2025-01-01T00:00:00Z", 52 + } 53 + assert validate_record("fm.plyr.track", record) == [] 34 54 35 55 def test_title_too_long(self) -> None: 36 56 record = {
+3 -2
docs/lexicons/overview.md
··· 33 33 34 34 ``` 35 35 key: tid (timestamp-based ID) 36 - required: title, artist, audioUrl, fileType, createdAt 37 - optional: album, duration, features, imageUrl 36 + required: title, artist, fileType, createdAt 37 + optional: audioUrl, audioBlob, album, duration, features, imageUrl 38 + note: at least one of audioUrl or audioBlob must be present 38 39 ``` 39 40 40 41 this was the first lexicon, established when the project began. tracks are stored in the user's PDS (Personal Data Server) and indexed by plyr.fm for discovery.
+2 -2
lexicons/track.json
··· 8 8 "key": "tid", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["title", "artist", "audioUrl", "fileType", "createdAt"], 11 + "required": ["title", "artist", "fileType", "createdAt"], 12 12 "properties": { 13 13 "title": { 14 14 "type": "string", ··· 25 25 "audioUrl": { 26 26 "type": "string", 27 27 "format": "uri", 28 - "description": "URL to the audio file (currently R2 CDN URL)." 28 + "description": "URL to the audio file. Optional when audioBlob is present." 29 29 }, 30 30 "fileType": { 31 31 "type": "string",
+2 -2
loq.toml
··· 219 219 220 220 [[rules]] 221 221 path = "backend/src/backend/_internal/tasks/ingest.py" 222 - max_lines = 582 222 + max_lines = 603 223 223 224 224 [[rules]] 225 225 path = "backend/tests/test_jetstream.py" 226 - max_lines = 1290 226 + max_lines = 1334 227 227 228 228 [[rules]] 229 229 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"