audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add lexicon validation and ingest task layer (#1069)

add record-processing functions for all plyr.fm ATProto record types
(track, like, comment, list, profile) with:

- lexicon-based record validation before DB writes
- SubjectNotFoundError for retry on ordering gaps (likes/comments
arriving before their subject track)
- unique partial index on atproto_record_uri with IntegrityError catch
for replay safety
- full field reconciliation: supportGate, features, audio storage,
album/duration extras, playlist track_count
- dual-source audio_storage='both' for records with both PDS blob
and R2 URL

these tasks are registered with docket but dormant until the Jetstream
consumer (next PR) dispatches events to them.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6
and committed by
GitHub
02cec63d c67b5f13

+1962
+38
backend/alembic/versions/2026_03_10_151042_e2d11e296633_add_partial_index_on_tracks_atproto_.py
··· 1 + """add partial index on tracks atproto_record_uri 2 + 3 + Revision ID: e2d11e296633 4 + Revises: bcf223076d43 5 + Create Date: 2026-03-10 15:10:42.233072 6 + 7 + """ 8 + 9 + from collections.abc import Sequence 10 + 11 + import sqlalchemy as sa 12 + 13 + from alembic import op 14 + 15 + # revision identifiers, used by Alembic. 16 + revision: str = "e2d11e296633" 17 + down_revision: str | Sequence[str] | None = "bcf223076d43" 18 + branch_labels: str | Sequence[str] | None = None 19 + depends_on: str | Sequence[str] | None = None 20 + 21 + 22 + def upgrade() -> None: 23 + """Upgrade schema.""" 24 + op.create_index( 25 + "ix_tracks_atproto_record_uri_partial", 26 + "tracks", 27 + ["atproto_record_uri"], 28 + unique=True, 29 + postgresql_where=sa.text("atproto_record_uri IS NOT NULL"), 30 + ) 31 + 32 + 33 + def downgrade() -> None: 34 + """Downgrade schema.""" 35 + op.drop_index( 36 + "ix_tracks_atproto_record_uri_partial", 37 + table_name="tracks", 38 + )
+40
backend/src/backend/_internal/tasks/__init__.py
··· 24 24 invalidate_tracks_discovery_cache, 25 25 run_post_track_create_hooks, 26 26 ) 27 + from backend._internal.tasks.ingest import ( 28 + SubjectNotFoundError, 29 + ingest_comment_create, 30 + ingest_comment_delete, 31 + ingest_comment_update, 32 + ingest_like_create, 33 + ingest_like_delete, 34 + ingest_list_create, 35 + ingest_list_delete, 36 + ingest_list_update, 37 + ingest_profile_update, 38 + ingest_track_create, 39 + ingest_track_delete, 40 + ingest_track_update, 41 + ) 27 42 from backend._internal.tasks.pds import ( 28 43 pds_create_comment, 29 44 pds_create_like, ··· 69 84 generate_embedding, 70 85 classify_genres, 71 86 warm_follow_graph, 87 + ingest_track_create, 88 + ingest_track_update, 89 + ingest_track_delete, 90 + ingest_like_create, 91 + ingest_like_delete, 92 + ingest_comment_create, 93 + ingest_comment_update, 94 + ingest_comment_delete, 95 + ingest_list_create, 96 + ingest_list_update, 97 + ingest_list_delete, 98 + ingest_profile_update, 72 99 ] 73 100 74 101 __all__ = [ 102 + "SubjectNotFoundError", 75 103 "background_tasks", 76 104 "classify_genres", 77 105 "generate_embedding", 106 + "ingest_comment_create", 107 + "ingest_comment_delete", 108 + "ingest_comment_update", 109 + "ingest_like_create", 110 + "ingest_like_delete", 111 + "ingest_list_create", 112 + "ingest_list_delete", 113 + "ingest_list_update", 114 + "ingest_profile_update", 115 + "ingest_track_create", 116 + "ingest_track_delete", 117 + "ingest_track_update", 78 118 "invalidate_tracks_discovery_cache", 79 119 "move_track_audio", 80 120 "pds_create_comment",
+539
backend/src/backend/_internal/tasks/ingest.py
··· 1 + """Jetstream event ingestion tasks. 2 + 3 + each task resolves an ATProto record event into the database. they are 4 + dispatched by the JetstreamConsumer via docket and run asynchronously. 5 + 6 + all tasks are idempotent — duplicate events are safely skipped via 7 + unique constraint checks or existence queries. 8 + """ 9 + 10 + import logging 11 + from datetime import UTC, datetime, timedelta 12 + 13 + import logfire 14 + from docket import ConcurrencyLimit, ExponentialRetry 15 + from sqlalchemy import delete, select, update 16 + from sqlalchemy.exc import IntegrityError 17 + 18 + from backend._internal.atproto.client import pds_blob_url 19 + from backend._internal.tasks.hooks import run_post_track_create_hooks 20 + from backend.models import Artist, Playlist, Track, TrackComment, TrackLike 21 + from backend.utilities.database import db_session 22 + from backend.utilities.lexicon import validate_record 23 + 24 + logger = logging.getLogger(__name__) 25 + 26 + 27 + class SubjectNotFoundError(Exception): 28 + """referenced subject (track) not yet indexed — triggers retry.""" 29 + 30 + 31 + _INGEST_RETRY = ExponentialRetry( 32 + attempts=4, 33 + minimum_delay=timedelta(seconds=1), 34 + maximum_delay=timedelta(seconds=30), 35 + ) 36 + 37 + 38 + def _parse_datetime(value: str | None) -> datetime: 39 + """parse an ISO 8601 datetime string, falling back to now.""" 40 + if not value: 41 + return datetime.now(UTC) 42 + try: 43 + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) 44 + return dt if dt.tzinfo else dt.replace(tzinfo=UTC) 45 + except (ValueError, AttributeError): 46 + return datetime.now(UTC) 47 + 48 + 49 + # --- track tasks --- 50 + 51 + 52 + async def ingest_track_create( 53 + did: str, 54 + rkey: str, 55 + record: dict, 56 + uri: str, 57 + cid: str | None, 58 + retry: ExponentialRetry = _INGEST_RETRY, 59 + concurrency: ConcurrencyLimit = ConcurrencyLimit("did", max_concurrent=3), # noqa: B008 60 + ) -> None: 61 + """create a track from a Jetstream event. 62 + 63 + args: 64 + did: the DID of the artist 65 + rkey: record key 66 + record: the ATProto record data 67 + uri: the AT URI (at://did/collection/rkey) 68 + cid: content identifier 69 + """ 70 + if errors := validate_record("fm.plyr.track", record): 71 + logfire.warn("ingest: invalid track record, skipping", uri=uri, errors=errors) 72 + return 73 + 74 + async with db_session() as db: 75 + # verify artist exists 76 + artist = await db.get(Artist, did) 77 + if not artist: 78 + logger.debug("ingest_track_create: unknown artist %s, skipping", did) 79 + return 80 + 81 + # dedup by AT URI 82 + existing = await db.execute( 83 + select(Track.id).where(Track.atproto_record_uri == uri).limit(1) 84 + ) 85 + if existing.scalar_one_or_none() is not None: 86 + logger.debug("ingest_track_create: duplicate URI %s, skipping", uri) 87 + return 88 + 89 + # determine audio storage type 90 + audio_blob = record.get("audioBlob") 91 + audio_url = record.get("audioUrl") 92 + pds_blob_cid = ( 93 + audio_blob.get("ref", {}).get("$link") 94 + if isinstance(audio_blob, dict) 95 + else None 96 + ) 97 + if audio_blob and audio_url: 98 + audio_storage = "both" 99 + elif audio_blob: 100 + audio_storage = "pds" 101 + elif audio_url: 102 + audio_storage = "r2" 103 + pds_blob_cid = None 104 + else: 105 + audio_storage = "pds" 106 + 107 + # build extra dict 108 + extra: dict = {} 109 + if duration := record.get("duration"): 110 + extra["duration"] = duration 111 + if album := record.get("album"): 112 + extra["album"] = album 113 + 114 + track = Track( 115 + title=record.get("title", "untitled"), 116 + file_id=record.get("fileId", rkey), 117 + file_type=record.get("fileType", "mp3"), 118 + artist_did=did, 119 + r2_url=audio_url if audio_storage in ("r2", "both") else None, 120 + atproto_record_uri=uri, 121 + atproto_record_cid=cid, 122 + audio_storage=audio_storage, 123 + pds_blob_cid=pds_blob_cid, 124 + description=record.get("description"), 125 + image_url=record.get("imageUrl"), 126 + support_gate=record.get("supportGate"), 127 + features=record.get("features"), 128 + created_at=_parse_datetime(record.get("createdAt")), 129 + extra=extra, 130 + ) 131 + db.add(track) 132 + try: 133 + await db.commit() 134 + except IntegrityError: 135 + logger.debug("ingest_track_create: duplicate URI %s (race), skipping", uri) 136 + return 137 + 138 + # resolve audio URL for post-creation hooks 139 + resolved_audio_url = track.r2_url 140 + if not resolved_audio_url and pds_blob_cid and artist.pds_url: 141 + resolved_audio_url = pds_blob_url(artist.pds_url, did, pds_blob_cid) 142 + 143 + logfire.info( 144 + "ingest: track created", 145 + uri=uri, 146 + artist_did=did, 147 + audio_storage=audio_storage, 148 + ) 149 + 150 + # shared post-creation hooks (copyright, embeddings, cache, etc.) 151 + await run_post_track_create_hooks(track.id, audio_url=resolved_audio_url) 152 + 153 + 154 + async def ingest_track_update( 155 + did: str, 156 + rkey: str, 157 + record: dict, 158 + uri: str, 159 + cid: str | None, 160 + retry: ExponentialRetry = _INGEST_RETRY, 161 + ) -> None: 162 + """update mutable fields on an existing track.""" 163 + if errors := validate_record("fm.plyr.track", record, partial=True): 164 + logfire.warn("ingest: invalid track record, skipping", uri=uri, errors=errors) 165 + return 166 + 167 + async with db_session() as db: 168 + result = await db.execute( 169 + select(Track).where(Track.atproto_record_uri == uri).limit(1) 170 + ) 171 + track = result.scalar_one_or_none() 172 + if not track: 173 + logger.debug("ingest_track_update: track %s not found, skipping", uri) 174 + return 175 + 176 + if title := record.get("title"): 177 + track.title = title 178 + if description := record.get("description"): 179 + track.description = description 180 + if image_url := record.get("imageUrl"): 181 + track.image_url = image_url 182 + if cid: 183 + track.atproto_record_cid = cid 184 + 185 + # audio storage fields 186 + audio_blob = record.get("audioBlob") 187 + audio_url = record.get("audioUrl") 188 + if audio_blob and isinstance(audio_blob, dict) and audio_url: 189 + track.audio_storage = "both" 190 + track.pds_blob_cid = audio_blob.get("ref", {}).get("$link") 191 + track.r2_url = audio_url 192 + elif audio_blob and isinstance(audio_blob, dict): 193 + track.audio_storage = "pds" 194 + track.pds_blob_cid = audio_blob.get("ref", {}).get("$link") 195 + track.r2_url = None 196 + elif audio_url: 197 + track.audio_storage = "r2" 198 + track.r2_url = audio_url 199 + track.pds_blob_cid = None 200 + if file_type := record.get("fileType"): 201 + track.file_type = file_type 202 + 203 + # gating 204 + if "supportGate" in record: 205 + track.support_gate = record["supportGate"] 206 + 207 + # features 208 + if (features := record.get("features")) is not None: 209 + track.features = features 210 + 211 + # extra fields (album, duration) — reassign to trigger change detection 212 + extra = dict(track.extra or {}) 213 + extra_changed = False 214 + if (duration := record.get("duration")) is not None: 215 + extra["duration"] = duration 216 + extra_changed = True 217 + if (album := record.get("album")) is not None: 218 + extra["album"] = album 219 + extra_changed = True 220 + if extra_changed: 221 + track.extra = extra 222 + 223 + await db.commit() 224 + logfire.info("ingest: track updated", uri=uri, artist_did=did) 225 + 226 + 227 + async def ingest_track_delete( 228 + did: str, 229 + rkey: str, 230 + uri: str, 231 + retry: ExponentialRetry = _INGEST_RETRY, 232 + ) -> None: 233 + """delete a track by its AT URI.""" 234 + async with db_session() as db: 235 + result = await db.execute(delete(Track).where(Track.atproto_record_uri == uri)) 236 + if result.rowcount: # type: ignore[union-attr] 237 + await db.commit() 238 + logfire.info("ingest: track deleted", uri=uri, artist_did=did) 239 + else: 240 + logger.debug("ingest_track_delete: track %s not found", uri) 241 + 242 + 243 + # --- like tasks --- 244 + 245 + 246 + async def ingest_like_create( 247 + did: str, 248 + rkey: str, 249 + record: dict, 250 + uri: str, 251 + cid: str | None = None, 252 + retry: ExponentialRetry = _INGEST_RETRY, 253 + ) -> None: 254 + """create a like from a Jetstream event.""" 255 + if errors := validate_record("fm.plyr.like", record): 256 + logfire.warn("ingest: invalid like record, skipping", uri=uri, errors=errors) 257 + return 258 + 259 + subject = record.get("subject", {}) 260 + subject_uri = subject.get("uri", "") 261 + 262 + async with db_session() as db: 263 + # resolve subject track 264 + result = await db.execute( 265 + select(Track.id).where(Track.atproto_record_uri == subject_uri).limit(1) 266 + ) 267 + track_id = result.scalar_one_or_none() 268 + if track_id is None: 269 + raise SubjectNotFoundError( 270 + f"ingest_like_create: subject track {subject_uri} not found" 271 + ) 272 + 273 + # dedup: unique constraint on (track_id, user_did) 274 + existing = await db.execute( 275 + select(TrackLike.id) 276 + .where(TrackLike.track_id == track_id, TrackLike.user_did == did) 277 + .limit(1) 278 + ) 279 + if existing.scalar_one_or_none() is not None: 280 + logger.debug( 281 + "ingest_like_create: duplicate like for track %d by %s", track_id, did 282 + ) 283 + return 284 + 285 + like = TrackLike( 286 + track_id=track_id, 287 + user_did=did, 288 + atproto_like_uri=uri, 289 + created_at=_parse_datetime(record.get("createdAt")), 290 + ) 291 + db.add(like) 292 + await db.commit() 293 + logfire.info("ingest: like created", uri=uri, user_did=did, track_id=track_id) 294 + 295 + 296 + async def ingest_like_delete( 297 + did: str, 298 + rkey: str, 299 + uri: str, 300 + retry: ExponentialRetry = _INGEST_RETRY, 301 + ) -> None: 302 + """delete a like by its AT URI.""" 303 + async with db_session() as db: 304 + result = await db.execute( 305 + delete(TrackLike).where(TrackLike.atproto_like_uri == uri) 306 + ) 307 + if result.rowcount: # type: ignore[union-attr] 308 + await db.commit() 309 + logfire.info("ingest: like deleted", uri=uri) 310 + else: 311 + logger.debug("ingest_like_delete: like %s not found", uri) 312 + 313 + 314 + # --- comment tasks --- 315 + 316 + 317 + async def ingest_comment_create( 318 + did: str, 319 + rkey: str, 320 + record: dict, 321 + uri: str, 322 + cid: str | None = None, 323 + retry: ExponentialRetry = _INGEST_RETRY, 324 + ) -> None: 325 + """create a comment from a Jetstream event.""" 326 + if errors := validate_record("fm.plyr.comment", record): 327 + logfire.warn("ingest: invalid comment record, skipping", uri=uri, errors=errors) 328 + return 329 + 330 + subject = record.get("subject", {}) 331 + subject_uri = subject.get("uri", "") 332 + 333 + async with db_session() as db: 334 + # resolve subject track 335 + result = await db.execute( 336 + select(Track.id).where(Track.atproto_record_uri == subject_uri).limit(1) 337 + ) 338 + track_id = result.scalar_one_or_none() 339 + if track_id is None: 340 + raise SubjectNotFoundError( 341 + f"ingest_comment_create: subject track {subject_uri} not found" 342 + ) 343 + 344 + comment = TrackComment( 345 + track_id=track_id, 346 + user_did=did, 347 + text=record.get("text", ""), 348 + timestamp_ms=record.get("timestampMs", 0), 349 + atproto_comment_uri=uri, 350 + created_at=_parse_datetime(record.get("createdAt")), 351 + ) 352 + db.add(comment) 353 + await db.commit() 354 + logfire.info( 355 + "ingest: comment created", uri=uri, user_did=did, track_id=track_id 356 + ) 357 + 358 + 359 + async def ingest_comment_update( 360 + did: str, 361 + rkey: str, 362 + record: dict, 363 + uri: str, 364 + cid: str | None = None, 365 + retry: ExponentialRetry = _INGEST_RETRY, 366 + ) -> None: 367 + """update comment text and timestamp.""" 368 + if errors := validate_record("fm.plyr.comment", record, partial=True): 369 + logfire.warn("ingest: invalid comment record, skipping", uri=uri, errors=errors) 370 + return 371 + 372 + values: dict = {} 373 + if text := record.get("text"): 374 + values["text"] = text 375 + if (ts := record.get("timestampMs")) is not None: 376 + values["timestamp_ms"] = ts 377 + 378 + if not values: 379 + return 380 + 381 + values["updated_at"] = datetime.now(UTC) 382 + 383 + async with db_session() as db: 384 + result = await db.execute( 385 + update(TrackComment) 386 + .where(TrackComment.atproto_comment_uri == uri) 387 + .values(**values) 388 + ) 389 + if result.rowcount: # type: ignore[union-attr] 390 + await db.commit() 391 + logfire.info("ingest: comment updated", uri=uri) 392 + else: 393 + logger.debug("ingest_comment_update: comment %s not found", uri) 394 + 395 + 396 + async def ingest_comment_delete( 397 + did: str, 398 + rkey: str, 399 + uri: str, 400 + retry: ExponentialRetry = _INGEST_RETRY, 401 + ) -> None: 402 + """delete a comment by its AT URI.""" 403 + async with db_session() as db: 404 + result = await db.execute( 405 + delete(TrackComment).where(TrackComment.atproto_comment_uri == uri) 406 + ) 407 + if result.rowcount: # type: ignore[union-attr] 408 + await db.commit() 409 + logfire.info("ingest: comment deleted", uri=uri) 410 + else: 411 + logger.debug("ingest_comment_delete: comment %s not found", uri) 412 + 413 + 414 + # --- list (playlist) tasks --- 415 + 416 + 417 + async def ingest_list_create( 418 + did: str, 419 + rkey: str, 420 + record: dict, 421 + uri: str, 422 + cid: str | None = None, 423 + retry: ExponentialRetry = _INGEST_RETRY, 424 + ) -> None: 425 + """create a playlist from a Jetstream list event. 426 + 427 + only processes listType="playlist" — skips albums and liked lists. 428 + """ 429 + if errors := validate_record("fm.plyr.list", record): 430 + logfire.warn("ingest: invalid list record, skipping", uri=uri, errors=errors) 431 + return 432 + 433 + list_type = record.get("listType", "") 434 + if list_type != "playlist": 435 + logger.debug("ingest_list_create: skipping listType=%s", list_type) 436 + return 437 + 438 + async with db_session() as db: 439 + # verify artist exists 440 + artist = await db.get(Artist, did) 441 + if not artist: 442 + logger.debug("ingest_list_create: unknown artist %s", did) 443 + return 444 + 445 + # dedup by AT URI 446 + existing = await db.execute( 447 + select(Playlist.id).where(Playlist.atproto_record_uri == uri).limit(1) 448 + ) 449 + if existing.scalar_one_or_none() is not None: 450 + logger.debug("ingest_list_create: duplicate URI %s", uri) 451 + return 452 + 453 + playlist = Playlist( 454 + owner_did=did, 455 + name=record.get("name", "untitled"), 456 + track_count=len(record.get("items", [])), 457 + atproto_record_uri=uri, 458 + atproto_record_cid=cid or "", 459 + created_at=_parse_datetime(record.get("createdAt")), 460 + ) 461 + db.add(playlist) 462 + await db.commit() 463 + logfire.info("ingest: playlist created", uri=uri, owner_did=did) 464 + 465 + 466 + async def ingest_list_update( 467 + did: str, 468 + rkey: str, 469 + record: dict, 470 + uri: str, 471 + cid: str | None = None, 472 + retry: ExponentialRetry = _INGEST_RETRY, 473 + ) -> None: 474 + """update playlist metadata.""" 475 + if errors := validate_record("fm.plyr.list", record, partial=True): 476 + logfire.warn("ingest: invalid list record, skipping", uri=uri, errors=errors) 477 + return 478 + 479 + async with db_session() as db: 480 + result = await db.execute( 481 + select(Playlist).where(Playlist.atproto_record_uri == uri).limit(1) 482 + ) 483 + playlist = result.scalar_one_or_none() 484 + if not playlist: 485 + logger.debug("ingest_list_update: playlist %s not found", uri) 486 + return 487 + 488 + if name := record.get("name"): 489 + playlist.name = name 490 + if cid: 491 + playlist.atproto_record_cid = cid 492 + if (items := record.get("items")) is not None: 493 + playlist.track_count = len(items) 494 + 495 + await db.commit() 496 + logfire.info("ingest: playlist updated", uri=uri) 497 + 498 + 499 + async def ingest_list_delete( 500 + did: str, 501 + rkey: str, 502 + uri: str, 503 + retry: ExponentialRetry = _INGEST_RETRY, 504 + ) -> None: 505 + """delete a playlist by its AT URI.""" 506 + async with db_session() as db: 507 + result = await db.execute( 508 + delete(Playlist).where(Playlist.atproto_record_uri == uri) 509 + ) 510 + if result.rowcount: # type: ignore[union-attr] 511 + await db.commit() 512 + logfire.info("ingest: playlist deleted", uri=uri) 513 + else: 514 + logger.debug("ingest_list_delete: playlist %s not found", uri) 515 + 516 + 517 + # --- profile task --- 518 + 519 + 520 + async def ingest_profile_update( 521 + did: str, 522 + record: dict, 523 + retry: ExponentialRetry = _INGEST_RETRY, 524 + ) -> None: 525 + """update artist bio from a profile record.""" 526 + if errors := validate_record("fm.plyr.actor.profile", record, partial=True): 527 + logfire.warn("ingest: invalid profile record, skipping", did=did, errors=errors) 528 + return 529 + 530 + async with db_session() as db: 531 + artist = await db.get(Artist, did) 532 + if not artist: 533 + logger.debug("ingest_profile_update: unknown artist %s", did) 534 + return 535 + 536 + if (bio := record.get("bio")) is not None: 537 + artist.bio = bio 538 + await db.commit() 539 + logfire.info("ingest: profile updated", did=did)
+126
backend/src/backend/utilities/lexicon.py
··· 1 + """ATProto lexicon record validation. 2 + 3 + validates records against the lexicon JSON schemas in the repo's 4 + ``lexicons/`` directory. intended for ingest-time screening — lenient 5 + on unknown fields and unknown lexicon IDs (pass-through). 6 + """ 7 + 8 + import json 9 + import logging 10 + from functools import lru_cache 11 + from pathlib import Path 12 + from typing import Any 13 + 14 + logger = logging.getLogger(__name__) 15 + 16 + _LEXICONS_DIR = Path(__file__).resolve().parents[4] / "lexicons" 17 + 18 + 19 + @lru_cache(maxsize=32) 20 + def _load_lexicon(lexicon_id: str) -> dict[str, Any] | None: 21 + """load a lexicon JSON by its NSID, using the last segment as filename.""" 22 + filename = lexicon_id.rsplit(".", 1)[-1] + ".json" 23 + path = _LEXICONS_DIR / filename 24 + if not path.exists(): 25 + logger.debug("lexicon file not found: %s", path) 26 + return None 27 + with open(path) as f: 28 + return json.load(f) 29 + 30 + 31 + def _get_record_schema(lexicon: dict[str, Any]) -> dict[str, Any] | None: 32 + """extract the record schema from defs.main.record.""" 33 + return lexicon.get("defs", {}).get("main", {}).get("record") 34 + 35 + 36 + def _validate_property(key: str, value: Any, prop_schema: dict[str, Any]) -> list[str]: 37 + """validate a single property value against its schema definition.""" 38 + errors: list[str] = [] 39 + prop_type = prop_schema.get("type", "") 40 + 41 + # type checks 42 + if prop_type == "string": 43 + if not isinstance(value, str): 44 + errors.append(f"{key}: expected string, got {type(value).__name__}") 45 + return errors 46 + if (min_len := prop_schema.get("minLength")) is not None and len( 47 + value 48 + ) < min_len: 49 + errors.append(f"{key}: length {len(value)} < minLength {min_len}") 50 + if (max_len := prop_schema.get("maxLength")) is not None and len( 51 + value 52 + ) > max_len: 53 + errors.append(f"{key}: length {len(value)} > maxLength {max_len}") 54 + 55 + elif prop_type == "integer": 56 + if not isinstance(value, int) or isinstance(value, bool): 57 + errors.append(f"{key}: expected integer, got {type(value).__name__}") 58 + return errors 59 + if (minimum := prop_schema.get("minimum")) is not None and value < minimum: 60 + errors.append(f"{key}: value {value} < minimum {minimum}") 61 + 62 + elif prop_type == "array": 63 + if not isinstance(value, list): 64 + errors.append(f"{key}: expected array, got {type(value).__name__}") 65 + return errors 66 + if (max_len := prop_schema.get("maxLength")) is not None and len( 67 + value 68 + ) > max_len: 69 + errors.append(f"{key}: length {len(value)} > maxLength {max_len}") 70 + 71 + elif prop_type in ("object", "blob"): 72 + if not isinstance(value, dict): 73 + errors.append(f"{key}: expected {prop_type}, got {type(value).__name__}") 74 + 75 + elif prop_type == "ref": 76 + ref = prop_schema.get("ref", "") 77 + if ref == "com.atproto.repo.strongRef": 78 + if not isinstance(value, dict): 79 + errors.append( 80 + f"{key}: expected strongRef object, got {type(value).__name__}" 81 + ) 82 + elif not isinstance(value.get("uri"), str): 83 + errors.append(f"{key}: strongRef missing 'uri' string") 84 + 85 + return errors 86 + 87 + 88 + def validate_record( 89 + lexicon_id: str, 90 + record: dict[str, Any], 91 + *, 92 + partial: bool = False, 93 + ) -> list[str]: 94 + """validate a record dict against its lexicon schema. 95 + 96 + returns a list of error strings. empty list means the record is valid. 97 + never raises — unknown lexicons pass through with no errors. 98 + 99 + args: 100 + lexicon_id: fully-qualified NSID (e.g. "fm.plyr.track") 101 + record: the ATProto record data 102 + partial: if True, skip required-field checks (for update operations) 103 + """ 104 + lexicon = _load_lexicon(lexicon_id) 105 + if lexicon is None: 106 + return [] 107 + 108 + schema = _get_record_schema(lexicon) 109 + if schema is None: 110 + return [] 111 + 112 + errors: list[str] = [] 113 + 114 + # required field checks (skip for partial/update) 115 + if not partial: 116 + for field in schema.get("required", []): 117 + if field not in record: 118 + errors.append(f"missing required field: {field}") 119 + 120 + # property validation 121 + properties = schema.get("properties", {}) 122 + for key, value in record.items(): 123 + if key in properties: 124 + errors.extend(_validate_property(key, value, properties[key])) 125 + 126 + return errors
+998
backend/tests/test_jetstream.py
··· 1 + """tests for Jetstream consumer and ingest tasks.""" 2 + 3 + import uuid 4 + from datetime import UTC, datetime 5 + from unittest.mock import AsyncMock, patch 6 + 7 + import pytest 8 + from sqlalchemy import select 9 + from sqlalchemy.ext.asyncio import AsyncSession 10 + 11 + from backend._internal.tasks.ingest import ( 12 + SubjectNotFoundError, 13 + ingest_comment_create, 14 + ingest_comment_delete, 15 + ingest_like_create, 16 + ingest_like_delete, 17 + ingest_list_create, 18 + ingest_list_update, 19 + ingest_track_create, 20 + ingest_track_delete, 21 + ingest_track_update, 22 + ) 23 + from backend.models import Artist, Playlist, Track, TrackComment, TrackLike 24 + 25 + 26 + def _recent_ts() -> str: 27 + """return a recent ISO timestamp that clear_database will clean up. 28 + 29 + ingest functions commit via their own db_session() — the test teardown's 30 + clear_database only deletes rows with created_at > test_start_time, so 31 + records with hardcoded past timestamps (e.g. 2025-01-01) would persist 32 + and cause FK constraint errors. 33 + """ 34 + return datetime.now(UTC).isoformat() 35 + 36 + 37 + # --- fixtures --- 38 + 39 + 40 + @pytest.fixture(autouse=True) 41 + def _mock_post_create_hooks(): 42 + """prevent ingest_track_create from reaching docket/redis during tests.""" 43 + with patch( 44 + "backend._internal.tasks.ingest.run_post_track_create_hooks", 45 + new_callable=AsyncMock, 46 + ): 47 + yield 48 + 49 + 50 + @pytest.fixture 51 + async def artist(db_session: AsyncSession) -> Artist: 52 + """create a test artist with a unique DID (xdist-safe).""" 53 + did = f"did:plc:jetstream_{uuid.uuid4().hex[:12]}" 54 + a = Artist( 55 + did=did, 56 + handle="testartist.bsky.social", 57 + display_name="Test Artist", 58 + pds_url="https://bsky.social", 59 + ) 60 + db_session.add(a) 61 + await db_session.commit() 62 + return a 63 + 64 + 65 + @pytest.fixture 66 + async def track(db_session: AsyncSession, artist: Artist) -> Track: 67 + """create a test track.""" 68 + t = Track( 69 + title="Test Track", 70 + file_id="abc123", 71 + file_type="mp3", 72 + artist_did=artist.did, 73 + r2_url="https://r2.example.com/abc123.mp3", 74 + atproto_record_uri=f"at://{artist.did}/fm.plyr.track/existing", 75 + atproto_record_cid="bafyexisting", 76 + audio_storage="r2", 77 + ) 78 + db_session.add(t) 79 + await db_session.commit() 80 + return t 81 + 82 + 83 + # --- track ingestion tests --- 84 + 85 + 86 + class TestIngestTrackCreate: 87 + async def test_creates_track( 88 + self, db_session: AsyncSession, artist: Artist 89 + ) -> None: 90 + """valid record creates a Track row.""" 91 + record = { 92 + "title": "Jetstream Track", 93 + "artist": "Test Artist", 94 + "fileId": "js_file_001", 95 + "fileType": "mp3", 96 + "audioUrl": "https://r2.example.com/js_file_001.mp3", 97 + "duration": 180, 98 + "createdAt": _recent_ts(), 99 + } 100 + uri = "at://did:plc:jetstream_test/fm.plyr.track/newtrack1" 101 + 102 + await ingest_track_create( 103 + did=artist.did, rkey="newtrack1", record=record, uri=uri, cid="bafynew" 104 + ) 105 + 106 + result = await db_session.execute( 107 + select(Track).where(Track.atproto_record_uri == uri) 108 + ) 109 + track = result.scalar_one() 110 + assert track.title == "Jetstream Track" 111 + assert track.file_id == "js_file_001" 112 + assert track.r2_url == "https://r2.example.com/js_file_001.mp3" 113 + assert track.audio_storage == "r2" 114 + assert track.extra.get("duration") == 180 115 + 116 + async def test_dedup_by_uri( 117 + self, db_session: AsyncSession, artist: Artist, track: Track 118 + ) -> None: 119 + """duplicate AT URI is silently skipped.""" 120 + assert track.atproto_record_uri is not None 121 + record = { 122 + "title": "Duplicate", 123 + "artist": "Test Artist", 124 + "audioUrl": "https://r2.example.com/dup.mp3", 125 + "fileType": "mp3", 126 + "createdAt": _recent_ts(), 127 + } 128 + await ingest_track_create( 129 + did=artist.did, 130 + rkey="existing", 131 + record=record, 132 + uri=track.atproto_record_uri, 133 + cid="bafydup", 134 + ) 135 + 136 + # should still be only 1 track with this URI 137 + result = await db_session.execute( 138 + select(Track).where(Track.atproto_record_uri == track.atproto_record_uri) 139 + ) 140 + assert len(result.scalars().all()) == 1 141 + 142 + async def test_unknown_artist_skipped(self, db_session: AsyncSession) -> None: 143 + """event for non-existent artist is silently skipped.""" 144 + await ingest_track_create( 145 + did="did:plc:nonexistent", 146 + rkey="rk1", 147 + record={ 148 + "title": "Ghost", 149 + "artist": "Nobody", 150 + "audioUrl": "https://r2.example.com/ghost.mp3", 151 + "fileType": "mp3", 152 + "createdAt": _recent_ts(), 153 + }, 154 + uri="at://did:plc:nonexistent/fm.plyr.track/rk1", 155 + cid="bafy", 156 + ) 157 + 158 + result = await db_session.execute( 159 + select(Track).where(Track.artist_did == "did:plc:nonexistent") 160 + ) 161 + assert result.scalar_one_or_none() is None 162 + 163 + async def test_both_audio_storage( 164 + self, db_session: AsyncSession, artist: Artist 165 + ) -> None: 166 + """track with audioBlob + audioUrl gets audio_storage='both'.""" 167 + record = { 168 + "title": "Both Track", 169 + "artist": "Test Artist", 170 + "fileId": "both_001", 171 + "fileType": "mp3", 172 + "audioBlob": {"ref": {"$link": "bafyaudioblob"}, "mimeType": "audio/mpeg"}, 173 + "audioUrl": "https://r2.example.com/both_001.mp3", 174 + "createdAt": _recent_ts(), 175 + } 176 + uri = "at://did:plc:jetstream_test/fm.plyr.track/both1" 177 + 178 + await ingest_track_create( 179 + did=artist.did, rkey="both1", record=record, uri=uri, cid="bafynew" 180 + ) 181 + 182 + result = await db_session.execute( 183 + select(Track).where(Track.atproto_record_uri == uri) 184 + ) 185 + track = result.scalar_one() 186 + assert track.audio_storage == "both" 187 + assert track.pds_blob_cid == "bafyaudioblob" 188 + assert track.r2_url == "https://r2.example.com/both_001.mp3" 189 + 190 + async def test_pds_only_audio_storage( 191 + self, db_session: AsyncSession, artist: Artist 192 + ) -> None: 193 + """track with audioBlob only (no audioUrl) gets audio_storage='pds'.""" 194 + record = { 195 + "title": "PDS Only Track", 196 + "artist": "Test Artist", 197 + "fileId": "pds_only_001", 198 + "fileType": "mp3", 199 + "audioBlob": {"ref": {"$link": "bafypdsonly"}, "mimeType": "audio/mpeg"}, 200 + "audioUrl": "https://placeholder.example.com/pds_only_001.mp3", 201 + "createdAt": _recent_ts(), 202 + } 203 + uri = "at://did:plc:jetstream_test/fm.plyr.track/pdsonly1" 204 + 205 + await ingest_track_create( 206 + did=artist.did, rkey="pdsonly1", record=record, uri=uri, cid="bafynew" 207 + ) 208 + 209 + result = await db_session.execute( 210 + select(Track).where(Track.atproto_record_uri == uri) 211 + ) 212 + track = result.scalar_one() 213 + # audioUrl is required by lexicon but audioBlob is canonical — both present = "both" 214 + assert track.audio_storage == "both" 215 + assert track.pds_blob_cid == "bafypdsonly" 216 + 217 + async def test_track_create_sets_support_gate( 218 + self, db_session: AsyncSession, artist: Artist 219 + ) -> None: 220 + """gated track record materializes support_gate on the DB row.""" 221 + record = { 222 + "title": "Gated Track", 223 + "artist": "Test Artist", 224 + "fileId": "gated_001", 225 + "fileType": "mp3", 226 + "audioUrl": "https://r2.example.com/gated_001.mp3", 227 + "supportGate": {"type": "any"}, 228 + "createdAt": _recent_ts(), 229 + } 230 + uri = "at://did:plc:jetstream_test/fm.plyr.track/gated1" 231 + 232 + await ingest_track_create( 233 + did=artist.did, rkey="gated1", record=record, uri=uri, cid="bafygated" 234 + ) 235 + 236 + result = await db_session.execute( 237 + select(Track).where(Track.atproto_record_uri == uri) 238 + ) 239 + track = result.scalar_one() 240 + assert track.support_gate == {"type": "any"} 241 + assert track.is_gated is True 242 + 243 + async def test_track_create_sets_features( 244 + self, db_session: AsyncSession, artist: Artist 245 + ) -> None: 246 + """featured artists from record are stored on the track.""" 247 + features = [{"did": "did:plc:feat1", "handle": "feat.bsky.social"}] 248 + record = { 249 + "title": "Featured Track", 250 + "artist": "Test Artist", 251 + "fileId": "feat_001", 252 + "fileType": "mp3", 253 + "audioUrl": "https://r2.example.com/feat_001.mp3", 254 + "features": features, 255 + "createdAt": _recent_ts(), 256 + } 257 + uri = "at://did:plc:jetstream_test/fm.plyr.track/feat1" 258 + 259 + await ingest_track_create( 260 + did=artist.did, rkey="feat1", record=record, uri=uri, cid="bafyfeat" 261 + ) 262 + 263 + result = await db_session.execute( 264 + select(Track).where(Track.atproto_record_uri == uri) 265 + ) 266 + track = result.scalar_one() 267 + assert track.features == features 268 + 269 + async def test_track_create_runs_hooks( 270 + self, db_session: AsyncSession, artist: Artist 271 + ) -> None: 272 + """ingest_track_create calls run_post_track_create_hooks with R2 URL.""" 273 + record = { 274 + "title": "Hooked Track", 275 + "artist": "Test Artist", 276 + "fileId": "hook_001", 277 + "fileType": "mp3", 278 + "audioUrl": "https://r2.example.com/hook_001.mp3", 279 + "createdAt": _recent_ts(), 280 + } 281 + uri = "at://did:plc:jetstream_test/fm.plyr.track/hook1" 282 + 283 + with patch( 284 + "backend._internal.tasks.ingest.run_post_track_create_hooks", 285 + new_callable=AsyncMock, 286 + ) as mock_hooks: 287 + await ingest_track_create( 288 + did=artist.did, rkey="hook1", record=record, uri=uri, cid="bafyhook" 289 + ) 290 + 291 + result = await db_session.execute( 292 + select(Track).where(Track.atproto_record_uri == uri) 293 + ) 294 + track = result.scalar_one() 295 + mock_hooks.assert_called_once_with( 296 + track.id, audio_url="https://r2.example.com/hook_001.mp3" 297 + ) 298 + 299 + async def test_track_create_runs_hooks_both( 300 + self, db_session: AsyncSession, artist: Artist 301 + ) -> None: 302 + """with both audioBlob + audioUrl, hooks get R2 URL (CDN fallback).""" 303 + record = { 304 + "title": "Both Hooked", 305 + "artist": "Test Artist", 306 + "fileId": "both_hook_001", 307 + "fileType": "mp3", 308 + "audioBlob": {"ref": {"$link": "bafypdsblob"}, "mimeType": "audio/mpeg"}, 309 + "audioUrl": "https://r2.example.com/both_hook_001.mp3", 310 + "createdAt": _recent_ts(), 311 + } 312 + uri = "at://did:plc:jetstream_test/fm.plyr.track/bothhook1" 313 + 314 + with patch( 315 + "backend._internal.tasks.ingest.run_post_track_create_hooks", 316 + new_callable=AsyncMock, 317 + ) as mock_hooks: 318 + await ingest_track_create( 319 + did=artist.did, rkey="bothhook1", record=record, uri=uri, cid="bafyh" 320 + ) 321 + 322 + mock_hooks.assert_called_once() 323 + call_audio_url = mock_hooks.call_args[1]["audio_url"] 324 + # R2 URL preferred over PDS blob when both are available 325 + assert call_audio_url == "https://r2.example.com/both_hook_001.mp3" 326 + 327 + 328 + class TestIngestTrackDelete: 329 + async def test_deletes_by_uri( 330 + self, db_session: AsyncSession, artist: Artist, track: Track 331 + ) -> None: 332 + """deletes track by AT URI.""" 333 + assert track.atproto_record_uri is not None 334 + await ingest_track_delete( 335 + did=artist.did, 336 + rkey="existing", 337 + uri=track.atproto_record_uri, 338 + ) 339 + 340 + result = await db_session.execute( 341 + select(Track).where(Track.atproto_record_uri == track.atproto_record_uri) 342 + ) 343 + assert result.scalar_one_or_none() is None 344 + 345 + 346 + class TestIngestTrackUpdate: 347 + async def test_updates_mutable_fields( 348 + self, db_session: AsyncSession, artist: Artist, track: Track 349 + ) -> None: 350 + """updates title and description.""" 351 + assert track.atproto_record_uri is not None 352 + uri = track.atproto_record_uri 353 + await ingest_track_update( 354 + did=artist.did, 355 + rkey="existing", 356 + record={"title": "Updated Title", "description": "New desc"}, 357 + uri=uri, 358 + cid="bafyupdated", 359 + ) 360 + 361 + # expire cached objects so the re-query hits the DB 362 + db_session.expire_all() 363 + result = await db_session.execute( 364 + select(Track).where(Track.atproto_record_uri == uri) 365 + ) 366 + updated = result.scalar_one() 367 + assert updated.title == "Updated Title" 368 + assert updated.description == "New desc" 369 + assert updated.atproto_record_cid == "bafyupdated" 370 + 371 + async def test_updates_support_gate( 372 + self, db_session: AsyncSession, artist: Artist, track: Track 373 + ) -> None: 374 + """external supportGate change propagates to DB.""" 375 + assert track.atproto_record_uri is not None 376 + uri = track.atproto_record_uri 377 + await ingest_track_update( 378 + did=artist.did, 379 + rkey="existing", 380 + record={"supportGate": {"type": "any"}}, 381 + uri=uri, 382 + cid="bafygated", 383 + ) 384 + 385 + db_session.expire_all() 386 + result = await db_session.execute( 387 + select(Track).where(Track.atproto_record_uri == uri) 388 + ) 389 + updated = result.scalar_one() 390 + assert updated.support_gate == {"type": "any"} 391 + assert updated.is_gated is True 392 + 393 + async def test_removes_support_gate( 394 + self, db_session: AsyncSession, artist: Artist, track: Track 395 + ) -> None: 396 + """supportGate present as None in record clears gating.""" 397 + assert track.atproto_record_uri is not None 398 + uri = track.atproto_record_uri 399 + 400 + # first set support_gate 401 + track.support_gate = {"type": "any"} 402 + await db_session.commit() 403 + 404 + await ingest_track_update( 405 + did=artist.did, 406 + rkey="existing", 407 + record={"supportGate": None}, 408 + uri=uri, 409 + cid="bafyungated", 410 + ) 411 + 412 + db_session.expire_all() 413 + result = await db_session.execute( 414 + select(Track).where(Track.atproto_record_uri == uri) 415 + ) 416 + updated = result.scalar_one() 417 + assert updated.support_gate is None 418 + assert updated.is_gated is False 419 + 420 + async def test_updates_features( 421 + self, db_session: AsyncSession, artist: Artist, track: Track 422 + ) -> None: 423 + """features array propagates to DB.""" 424 + assert track.atproto_record_uri is not None 425 + uri = track.atproto_record_uri 426 + features = [{"did": "did:plc:feat1", "handle": "feat.bsky.social"}] 427 + await ingest_track_update( 428 + did=artist.did, 429 + rkey="existing", 430 + record={"features": features}, 431 + uri=uri, 432 + cid="bafyfeat", 433 + ) 434 + 435 + db_session.expire_all() 436 + result = await db_session.execute( 437 + select(Track).where(Track.atproto_record_uri == uri) 438 + ) 439 + updated = result.scalar_one() 440 + assert updated.features == features 441 + 442 + async def test_updates_extra_fields( 443 + self, db_session: AsyncSession, artist: Artist, track: Track 444 + ) -> None: 445 + """album and duration propagate to track.extra.""" 446 + assert track.atproto_record_uri is not None 447 + uri = track.atproto_record_uri 448 + await ingest_track_update( 449 + did=artist.did, 450 + rkey="existing", 451 + record={"album": "New Album", "duration": 240}, 452 + uri=uri, 453 + cid="bafyextra", 454 + ) 455 + 456 + db_session.expire_all() 457 + result = await db_session.execute( 458 + select(Track).where(Track.atproto_record_uri == uri) 459 + ) 460 + updated = result.scalar_one() 461 + assert updated.extra is not None 462 + assert updated.extra.get("album") == "New Album" 463 + assert updated.extra.get("duration") == 240 464 + 465 + async def test_updates_audio_storage_to_pds( 466 + self, db_session: AsyncSession, artist: Artist, track: Track 467 + ) -> None: 468 + """external audioBlob change updates storage fields.""" 469 + assert track.atproto_record_uri is not None 470 + assert track.audio_storage == "r2" 471 + uri = track.atproto_record_uri 472 + await ingest_track_update( 473 + did=artist.did, 474 + rkey="existing", 475 + record={ 476 + "audioBlob": { 477 + "ref": {"$link": "bafynewblob"}, 478 + "mimeType": "audio/mpeg", 479 + }, 480 + }, 481 + uri=uri, 482 + cid="bafyaudio", 483 + ) 484 + 485 + db_session.expire_all() 486 + result = await db_session.execute( 487 + select(Track).where(Track.atproto_record_uri == uri) 488 + ) 489 + updated = result.scalar_one() 490 + assert updated.audio_storage == "pds" 491 + assert updated.pds_blob_cid == "bafynewblob" 492 + assert updated.r2_url is None 493 + 494 + async def test_updates_audio_storage_to_both( 495 + self, db_session: AsyncSession, artist: Artist, track: Track 496 + ) -> None: 497 + """audioBlob + audioUrl together set audio_storage='both'.""" 498 + assert track.atproto_record_uri is not None 499 + uri = track.atproto_record_uri 500 + await ingest_track_update( 501 + did=artist.did, 502 + rkey="existing", 503 + record={ 504 + "audioBlob": { 505 + "ref": {"$link": "bafybothblob"}, 506 + "mimeType": "audio/mpeg", 507 + }, 508 + "audioUrl": "https://r2.example.com/both.mp3", 509 + }, 510 + uri=uri, 511 + cid="bafyboth", 512 + ) 513 + 514 + db_session.expire_all() 515 + result = await db_session.execute( 516 + select(Track).where(Track.atproto_record_uri == uri) 517 + ) 518 + updated = result.scalar_one() 519 + assert updated.audio_storage == "both" 520 + assert updated.pds_blob_cid == "bafybothblob" 521 + assert updated.r2_url == "https://r2.example.com/both.mp3" 522 + 523 + async def test_updates_audio_url( 524 + self, db_session: AsyncSession, artist: Artist, track: Track 525 + ) -> None: 526 + """external audioUrl change updates r2_url.""" 527 + assert track.atproto_record_uri is not None 528 + uri = track.atproto_record_uri 529 + await ingest_track_update( 530 + did=artist.did, 531 + rkey="existing", 532 + record={"audioUrl": "https://r2.example.com/new_url.mp3"}, 533 + uri=uri, 534 + cid="bafyurl", 535 + ) 536 + 537 + db_session.expire_all() 538 + result = await db_session.execute( 539 + select(Track).where(Track.atproto_record_uri == uri) 540 + ) 541 + updated = result.scalar_one() 542 + assert updated.r2_url == "https://r2.example.com/new_url.mp3" 543 + assert updated.audio_storage == "r2" 544 + 545 + async def test_updates_file_type( 546 + self, db_session: AsyncSession, artist: Artist, track: Track 547 + ) -> None: 548 + """external fileType change propagates.""" 549 + assert track.atproto_record_uri is not None 550 + uri = track.atproto_record_uri 551 + await ingest_track_update( 552 + did=artist.did, 553 + rkey="existing", 554 + record={"fileType": "flac"}, 555 + uri=uri, 556 + cid="bafytype", 557 + ) 558 + 559 + db_session.expire_all() 560 + result = await db_session.execute( 561 + select(Track).where(Track.atproto_record_uri == uri) 562 + ) 563 + updated = result.scalar_one() 564 + assert updated.file_type == "flac" 565 + 566 + 567 + # --- like ingestion tests --- 568 + 569 + 570 + class TestIngestLikeCreate: 571 + async def test_creates_like( 572 + self, db_session: AsyncSession, artist: Artist, track: Track 573 + ) -> None: 574 + """valid like record creates TrackLike.""" 575 + record = { 576 + "subject": { 577 + "uri": track.atproto_record_uri, 578 + "cid": track.atproto_record_cid, 579 + }, 580 + "createdAt": _recent_ts(), 581 + } 582 + uri = "at://did:plc:jetstream_test/fm.plyr.like/like1" 583 + 584 + await ingest_like_create(did=artist.did, rkey="like1", record=record, uri=uri) 585 + 586 + result = await db_session.execute( 587 + select(TrackLike).where(TrackLike.atproto_like_uri == uri) 588 + ) 589 + like = result.scalar_one() 590 + assert like.track_id == track.id 591 + assert like.user_did == artist.did 592 + 593 + async def test_raises_on_unknown_track( 594 + self, db_session: AsyncSession, artist: Artist 595 + ) -> None: 596 + """like for unknown subject track raises SubjectNotFoundError for retry.""" 597 + record = { 598 + "subject": {"uri": "at://did:plc:jetstream_test/fm.plyr.track/nonexistent"}, 599 + "createdAt": _recent_ts(), 600 + } 601 + with pytest.raises(SubjectNotFoundError): 602 + await ingest_like_create( 603 + did=artist.did, 604 + rkey="like2", 605 + record=record, 606 + uri="at://did:plc:jetstream_test/fm.plyr.like/like2", 607 + ) 608 + 609 + 610 + class TestIngestLikeDelete: 611 + async def test_deletes_by_uri( 612 + self, db_session: AsyncSession, artist: Artist, track: Track 613 + ) -> None: 614 + """deletes like by AT URI.""" 615 + like = TrackLike( 616 + track_id=track.id, 617 + user_did=artist.did, 618 + atproto_like_uri="at://did:plc:jetstream_test/fm.plyr.like/todelete", 619 + ) 620 + db_session.add(like) 621 + await db_session.commit() 622 + 623 + await ingest_like_delete( 624 + did=artist.did, 625 + rkey="todelete", 626 + uri="at://did:plc:jetstream_test/fm.plyr.like/todelete", 627 + ) 628 + 629 + result = await db_session.execute( 630 + select(TrackLike).where( 631 + TrackLike.atproto_like_uri 632 + == "at://did:plc:jetstream_test/fm.plyr.like/todelete" 633 + ) 634 + ) 635 + assert result.scalar_one_or_none() is None 636 + 637 + 638 + # --- comment ingestion tests --- 639 + 640 + 641 + class TestIngestCommentCreate: 642 + async def test_creates_comment( 643 + self, db_session: AsyncSession, artist: Artist, track: Track 644 + ) -> None: 645 + """valid comment record creates TrackComment.""" 646 + record = { 647 + "subject": {"uri": track.atproto_record_uri}, 648 + "text": "great track!", 649 + "timestampMs": 5000, 650 + "createdAt": _recent_ts(), 651 + } 652 + uri = "at://did:plc:jetstream_test/fm.plyr.comment/c1" 653 + 654 + await ingest_comment_create(did=artist.did, rkey="c1", record=record, uri=uri) 655 + 656 + result = await db_session.execute( 657 + select(TrackComment).where(TrackComment.atproto_comment_uri == uri) 658 + ) 659 + comment = result.scalar_one() 660 + assert comment.text == "great track!" 661 + assert comment.timestamp_ms == 5000 662 + 663 + async def test_raises_on_unknown_track( 664 + self, db_session: AsyncSession, artist: Artist 665 + ) -> None: 666 + """comment for unknown track raises SubjectNotFoundError for retry.""" 667 + record = { 668 + "subject": {"uri": "at://did:plc:jetstream_test/fm.plyr.track/nope"}, 669 + "text": "nope", 670 + "timestampMs": 0, 671 + "createdAt": _recent_ts(), 672 + } 673 + with pytest.raises(SubjectNotFoundError): 674 + await ingest_comment_create( 675 + did=artist.did, 676 + rkey="c2", 677 + record=record, 678 + uri="at://did:plc:jetstream_test/fm.plyr.comment/c2", 679 + ) 680 + 681 + 682 + class TestIngestCommentDelete: 683 + async def test_deletes_by_uri( 684 + self, db_session: AsyncSession, artist: Artist, track: Track 685 + ) -> None: 686 + """deletes comment by AT URI.""" 687 + comment = TrackComment( 688 + track_id=track.id, 689 + user_did=artist.did, 690 + text="to delete", 691 + timestamp_ms=0, 692 + atproto_comment_uri="at://did:plc:jetstream_test/fm.plyr.comment/del1", 693 + ) 694 + db_session.add(comment) 695 + await db_session.commit() 696 + 697 + await ingest_comment_delete( 698 + did=artist.did, 699 + rkey="del1", 700 + uri="at://did:plc:jetstream_test/fm.plyr.comment/del1", 701 + ) 702 + 703 + result = await db_session.execute( 704 + select(TrackComment).where( 705 + TrackComment.atproto_comment_uri 706 + == "at://did:plc:jetstream_test/fm.plyr.comment/del1" 707 + ) 708 + ) 709 + assert result.scalar_one_or_none() is None 710 + 711 + 712 + # --- playlist ingestion tests --- 713 + 714 + 715 + class TestIngestListCreate: 716 + async def test_creates_playlist( 717 + self, db_session: AsyncSession, artist: Artist 718 + ) -> None: 719 + """listType=playlist creates a Playlist row.""" 720 + record = { 721 + "listType": "playlist", 722 + "name": "My Playlist", 723 + "items": [], 724 + "createdAt": _recent_ts(), 725 + } 726 + uri = "at://did:plc:jetstream_test/fm.plyr.list/pl1" 727 + 728 + await ingest_list_create( 729 + did=artist.did, rkey="pl1", record=record, uri=uri, cid="bafypl" 730 + ) 731 + 732 + result = await db_session.execute( 733 + select(Playlist).where(Playlist.atproto_record_uri == uri) 734 + ) 735 + playlist = result.scalar_one() 736 + assert playlist.name == "My Playlist" 737 + assert playlist.owner_did == artist.did 738 + assert playlist.track_count == 0 739 + 740 + async def test_creates_playlist_with_items( 741 + self, db_session: AsyncSession, artist: Artist 742 + ) -> None: 743 + """track_count is set from items array length.""" 744 + items = [ 745 + {"subject": {"uri": f"at://x/fm.plyr.track/t{i}", "cid": f"bafy{i}"}} 746 + for i in range(3) 747 + ] 748 + record = { 749 + "listType": "playlist", 750 + "name": "Populated Playlist", 751 + "items": items, 752 + "createdAt": _recent_ts(), 753 + } 754 + uri = "at://did:plc:jetstream_test/fm.plyr.list/pl_items" 755 + 756 + await ingest_list_create( 757 + did=artist.did, rkey="pl_items", record=record, uri=uri, cid="bafyitems" 758 + ) 759 + 760 + result = await db_session.execute( 761 + select(Playlist).where(Playlist.atproto_record_uri == uri) 762 + ) 763 + playlist = result.scalar_one() 764 + assert playlist.track_count == 3 765 + 766 + async def test_skips_album_type( 767 + self, db_session: AsyncSession, artist: Artist 768 + ) -> None: 769 + """listType=album is not created as a Playlist.""" 770 + record = { 771 + "listType": "album", 772 + "name": "My Album", 773 + "items": [], 774 + "createdAt": _recent_ts(), 775 + } 776 + await ingest_list_create( 777 + did=artist.did, 778 + rkey="al1", 779 + record=record, 780 + uri="at://did:plc:jetstream_test/fm.plyr.list/al1", 781 + ) 782 + 783 + result = await db_session.execute( 784 + select(Playlist).where(Playlist.owner_did == artist.did) 785 + ) 786 + assert result.scalar_one_or_none() is None 787 + 788 + 789 + class TestIngestListUpdate: 790 + async def test_updates_name(self, db_session: AsyncSession, artist: Artist) -> None: 791 + """playlist name update propagates.""" 792 + # create playlist first 793 + record = { 794 + "listType": "playlist", 795 + "name": "Original", 796 + "items": [], 797 + "createdAt": _recent_ts(), 798 + } 799 + uri = "at://did:plc:jetstream_test/fm.plyr.list/pl_upd" 800 + await ingest_list_create( 801 + did=artist.did, rkey="pl_upd", record=record, uri=uri, cid="bafy1" 802 + ) 803 + 804 + await ingest_list_update( 805 + did=artist.did, 806 + rkey="pl_upd", 807 + record={"name": "Renamed"}, 808 + uri=uri, 809 + cid="bafy2", 810 + ) 811 + 812 + db_session.expire_all() 813 + result = await db_session.execute( 814 + select(Playlist).where(Playlist.atproto_record_uri == uri) 815 + ) 816 + playlist = result.scalar_one() 817 + assert playlist.name == "Renamed" 818 + 819 + async def test_updates_track_count( 820 + self, db_session: AsyncSession, artist: Artist 821 + ) -> None: 822 + """track_count updates when items change.""" 823 + record = { 824 + "listType": "playlist", 825 + "name": "Counting", 826 + "items": [], 827 + "createdAt": _recent_ts(), 828 + } 829 + uri = "at://did:plc:jetstream_test/fm.plyr.list/pl_count" 830 + await ingest_list_create( 831 + did=artist.did, rkey="pl_count", record=record, uri=uri, cid="bafy1" 832 + ) 833 + 834 + items = [ 835 + {"subject": {"uri": f"at://x/fm.plyr.track/t{i}", "cid": f"bafy{i}"}} 836 + for i in range(5) 837 + ] 838 + await ingest_list_update( 839 + did=artist.did, 840 + rkey="pl_count", 841 + record={"items": items}, 842 + uri=uri, 843 + cid="bafy2", 844 + ) 845 + 846 + db_session.expire_all() 847 + result = await db_session.execute( 848 + select(Playlist).where(Playlist.atproto_record_uri == uri) 849 + ) 850 + playlist = result.scalar_one() 851 + assert playlist.track_count == 5 852 + 853 + 854 + # --- ingest validation tests --- 855 + 856 + 857 + class TestIngestValidation: 858 + """integration tests confirming invalid records are rejected before DB work.""" 859 + 860 + async def test_track_empty_title_skipped( 861 + self, db_session: AsyncSession, artist: Artist 862 + ) -> None: 863 + """track with empty title (minLength violation) is skipped.""" 864 + record = { 865 + "title": "", 866 + "artist": "Test Artist", 867 + "audioUrl": "https://r2.example.com/x.mp3", 868 + "fileType": "mp3", 869 + "createdAt": _recent_ts(), 870 + } 871 + await ingest_track_create( 872 + did=artist.did, 873 + rkey="bad1", 874 + record=record, 875 + uri="at://did:plc:jetstream_test/fm.plyr.track/bad1", 876 + cid="bafy", 877 + ) 878 + result = await db_session.execute( 879 + select(Track).where( 880 + Track.atproto_record_uri 881 + == "at://did:plc:jetstream_test/fm.plyr.track/bad1" 882 + ) 883 + ) 884 + assert result.scalar_one_or_none() is None 885 + 886 + async def test_track_missing_required_fields_skipped( 887 + self, db_session: AsyncSession, artist: Artist 888 + ) -> None: 889 + """track missing required fields is skipped.""" 890 + await ingest_track_create( 891 + did=artist.did, 892 + rkey="bad2", 893 + record={"title": "ok"}, 894 + uri="at://did:plc:jetstream_test/fm.plyr.track/bad2", 895 + cid="bafy", 896 + ) 897 + result = await db_session.execute( 898 + select(Track).where( 899 + Track.atproto_record_uri 900 + == "at://did:plc:jetstream_test/fm.plyr.track/bad2" 901 + ) 902 + ) 903 + assert result.scalar_one_or_none() is None 904 + 905 + async def test_like_missing_subject_skipped( 906 + self, db_session: AsyncSession, artist: Artist 907 + ) -> None: 908 + """like without subject is skipped.""" 909 + await ingest_like_create( 910 + did=artist.did, 911 + rkey="bad3", 912 + record={"createdAt": "2025-01-01T00:00:00Z"}, 913 + uri="at://did:plc:jetstream_test/fm.plyr.like/bad3", 914 + ) 915 + result = await db_session.execute( 916 + select(TrackLike).where( 917 + TrackLike.atproto_like_uri 918 + == "at://did:plc:jetstream_test/fm.plyr.like/bad3" 919 + ) 920 + ) 921 + assert result.scalar_one_or_none() is None 922 + 923 + async def test_comment_text_too_long_skipped( 924 + self, db_session: AsyncSession, artist: Artist, track: Track 925 + ) -> None: 926 + """comment with text exceeding maxLength is skipped.""" 927 + await ingest_comment_create( 928 + did=artist.did, 929 + rkey="bad4", 930 + record={ 931 + "subject": {"uri": track.atproto_record_uri}, 932 + "text": "x" * 1001, 933 + "timestampMs": 0, 934 + "createdAt": _recent_ts(), 935 + }, 936 + uri="at://did:plc:jetstream_test/fm.plyr.comment/bad4", 937 + ) 938 + result = await db_session.execute( 939 + select(TrackComment).where( 940 + TrackComment.atproto_comment_uri 941 + == "at://did:plc:jetstream_test/fm.plyr.comment/bad4" 942 + ) 943 + ) 944 + assert result.scalar_one_or_none() is None 945 + 946 + async def test_valid_track_still_ingested( 947 + self, db_session: AsyncSession, artist: Artist 948 + ) -> None: 949 + """sanity check: valid record is still ingested normally.""" 950 + record = { 951 + "title": "Valid Track", 952 + "artist": "Test Artist", 953 + "audioUrl": "https://r2.example.com/valid.mp3", 954 + "fileType": "mp3", 955 + "createdAt": _recent_ts(), 956 + } 957 + uri = "at://did:plc:jetstream_test/fm.plyr.track/valid1" 958 + await ingest_track_create( 959 + did=artist.did, rkey="valid1", record=record, uri=uri, cid="bafy" 960 + ) 961 + result = await db_session.execute( 962 + select(Track).where(Track.atproto_record_uri == uri) 963 + ) 964 + assert result.scalar_one().title == "Valid Track" 965 + 966 + async def test_list_missing_items_skipped( 967 + self, db_session: AsyncSession, artist: Artist 968 + ) -> None: 969 + """list missing required items field is skipped.""" 970 + await ingest_list_create( 971 + did=artist.did, 972 + rkey="bad5", 973 + record={ 974 + "listType": "playlist", 975 + "name": "Bad List", 976 + "createdAt": _recent_ts(), 977 + }, 978 + uri="at://did:plc:jetstream_test/fm.plyr.list/bad5", 979 + ) 980 + result = await db_session.execute( 981 + select(Playlist).where( 982 + Playlist.atproto_record_uri 983 + == "at://did:plc:jetstream_test/fm.plyr.list/bad5" 984 + ) 985 + ) 986 + assert result.scalar_one_or_none() is None 987 + 988 + async def test_list_update_invalid_name_skipped( 989 + self, db_session: AsyncSession, artist: Artist 990 + ) -> None: 991 + """list update with name exceeding maxLength is skipped.""" 992 + await ingest_list_update( 993 + did=artist.did, 994 + rkey="bad6", 995 + record={"name": "x" * 300}, 996 + uri="at://did:plc:jetstream_test/fm.plyr.list/bad6", 997 + ) 998 + # nothing to assert on DB — just confirm no exception raised
+213
backend/tests/test_lexicon_validation.py
··· 1 + """pure unit tests for lexicon record validation.""" 2 + 3 + from backend.utilities.lexicon import validate_record 4 + 5 + 6 + class TestValidateTrack: 7 + def test_valid_record(self) -> None: 8 + record = { 9 + "title": "my song", 10 + "artist": "test artist", 11 + "audioUrl": "https://cdn.example.com/song.mp3", 12 + "fileType": "mp3", 13 + "createdAt": "2025-01-01T00:00:00Z", 14 + } 15 + assert validate_record("fm.plyr.track", record) == [] 16 + 17 + def test_missing_required_fields(self) -> None: 18 + errors = validate_record("fm.plyr.track", {}) 19 + assert any("title" in e for e in errors) 20 + assert any("artist" in e for e in errors) 21 + assert any("audioUrl" in e for e in errors) 22 + assert any("fileType" in e for e in errors) 23 + assert any("createdAt" in e for e in errors) 24 + 25 + def test_title_too_long(self) -> None: 26 + record = { 27 + "title": "x" * 300, 28 + "artist": "a", 29 + "audioUrl": "https://x.com/a.mp3", 30 + "fileType": "mp3", 31 + "createdAt": "2025-01-01T00:00:00Z", 32 + } 33 + errors = validate_record("fm.plyr.track", record) 34 + assert any("maxLength" in e and "title" in e for e in errors) 35 + 36 + def test_empty_title(self) -> None: 37 + record = { 38 + "title": "", 39 + "artist": "a", 40 + "audioUrl": "https://x.com/a.mp3", 41 + "fileType": "mp3", 42 + "createdAt": "2025-01-01T00:00:00Z", 43 + } 44 + errors = validate_record("fm.plyr.track", record) 45 + assert any("minLength" in e and "title" in e for e in errors) 46 + 47 + def test_wrong_type(self) -> None: 48 + record = { 49 + "title": 123, 50 + "artist": "a", 51 + "audioUrl": "https://x.com/a.mp3", 52 + "fileType": "mp3", 53 + "createdAt": "2025-01-01T00:00:00Z", 54 + } 55 + errors = validate_record("fm.plyr.track", record) 56 + assert any("expected string" in e and "title" in e for e in errors) 57 + 58 + def test_negative_duration(self) -> None: 59 + record = { 60 + "title": "ok", 61 + "artist": "a", 62 + "audioUrl": "https://x.com/a.mp3", 63 + "fileType": "mp3", 64 + "createdAt": "2025-01-01T00:00:00Z", 65 + "duration": -5, 66 + } 67 + errors = validate_record("fm.plyr.track", record) 68 + assert any("minimum" in e and "duration" in e for e in errors) 69 + 70 + def test_optional_fields_absent(self) -> None: 71 + """omitting optional fields is fine.""" 72 + record = { 73 + "title": "ok", 74 + "artist": "a", 75 + "audioUrl": "https://x.com/a.mp3", 76 + "fileType": "mp3", 77 + "createdAt": "2025-01-01T00:00:00Z", 78 + } 79 + assert validate_record("fm.plyr.track", record) == [] 80 + 81 + def test_description_max_length(self) -> None: 82 + record = { 83 + "title": "ok", 84 + "artist": "a", 85 + "audioUrl": "https://x.com/a.mp3", 86 + "fileType": "mp3", 87 + "createdAt": "2025-01-01T00:00:00Z", 88 + "description": "x" * 5001, 89 + } 90 + errors = validate_record("fm.plyr.track", record) 91 + assert any("maxLength" in e and "description" in e for e in errors) 92 + 93 + def test_features_max_length(self) -> None: 94 + record = { 95 + "title": "ok", 96 + "artist": "a", 97 + "audioUrl": "https://x.com/a.mp3", 98 + "fileType": "mp3", 99 + "createdAt": "2025-01-01T00:00:00Z", 100 + "features": [{"did": f"did:plc:{i}", "handle": f"u{i}"} for i in range(11)], 101 + } 102 + errors = validate_record("fm.plyr.track", record) 103 + assert any("maxLength" in e and "features" in e for e in errors) 104 + 105 + 106 + class TestValidateLike: 107 + def test_valid_like(self) -> None: 108 + record = { 109 + "subject": {"uri": "at://did:plc:x/fm.plyr.track/abc", "cid": "bafy"}, 110 + "createdAt": "2025-01-01T00:00:00Z", 111 + } 112 + assert validate_record("fm.plyr.like", record) == [] 113 + 114 + def test_missing_subject(self) -> None: 115 + record = {"createdAt": "2025-01-01T00:00:00Z"} 116 + errors = validate_record("fm.plyr.like", record) 117 + assert any("subject" in e for e in errors) 118 + 119 + def test_strong_ref_without_uri(self) -> None: 120 + record = { 121 + "subject": {"cid": "bafy"}, 122 + "createdAt": "2025-01-01T00:00:00Z", 123 + } 124 + errors = validate_record("fm.plyr.like", record) 125 + assert any("strongRef" in e and "uri" in e for e in errors) 126 + 127 + 128 + class TestValidateComment: 129 + def test_valid_comment(self) -> None: 130 + record = { 131 + "subject": {"uri": "at://did:plc:x/fm.plyr.track/abc", "cid": "bafy"}, 132 + "text": "great track!", 133 + "timestampMs": 5000, 134 + "createdAt": "2025-01-01T00:00:00Z", 135 + } 136 + assert validate_record("fm.plyr.comment", record) == [] 137 + 138 + def test_text_too_long(self) -> None: 139 + record = { 140 + "subject": {"uri": "at://did:plc:x/fm.plyr.track/abc", "cid": "bafy"}, 141 + "text": "x" * 1001, 142 + "timestampMs": 0, 143 + "createdAt": "2025-01-01T00:00:00Z", 144 + } 145 + errors = validate_record("fm.plyr.comment", record) 146 + assert any("maxLength" in e and "text" in e for e in errors) 147 + 148 + def test_negative_timestamp(self) -> None: 149 + record = { 150 + "subject": {"uri": "at://did:plc:x/fm.plyr.track/abc", "cid": "bafy"}, 151 + "text": "ok", 152 + "timestampMs": -1, 153 + "createdAt": "2025-01-01T00:00:00Z", 154 + } 155 + errors = validate_record("fm.plyr.comment", record) 156 + assert any("minimum" in e and "timestampMs" in e for e in errors) 157 + 158 + 159 + class TestValidateList: 160 + def test_valid_list(self) -> None: 161 + record = { 162 + "items": [], 163 + "createdAt": "2025-01-01T00:00:00Z", 164 + } 165 + assert validate_record("fm.plyr.list", record) == [] 166 + 167 + def test_items_exceeding_max(self) -> None: 168 + items = [ 169 + {"subject": {"uri": f"at://did:plc:x/fm.plyr.track/{i}", "cid": "bafy"}} 170 + for i in range(501) 171 + ] 172 + record = { 173 + "items": items, 174 + "createdAt": "2025-01-01T00:00:00Z", 175 + } 176 + errors = validate_record("fm.plyr.list", record) 177 + assert any("maxLength" in e and "items" in e for e in errors) 178 + 179 + 180 + class TestValidateProfile: 181 + def test_valid_profile(self) -> None: 182 + record = { 183 + "bio": "i make music", 184 + "createdAt": "2025-01-01T00:00:00Z", 185 + } 186 + assert validate_record("fm.plyr.actor.profile", record) == [] 187 + 188 + def test_bio_too_long(self) -> None: 189 + record = { 190 + "bio": "x" * 2561, 191 + "createdAt": "2025-01-01T00:00:00Z", 192 + } 193 + errors = validate_record("fm.plyr.actor.profile", record) 194 + assert any("maxLength" in e and "bio" in e for e in errors) 195 + 196 + 197 + class TestPartialValidation: 198 + def test_skips_required_fields(self) -> None: 199 + """partial mode doesn't flag missing required fields.""" 200 + assert validate_record("fm.plyr.track", {}, partial=True) == [] 201 + 202 + def test_still_checks_types(self) -> None: 203 + errors = validate_record("fm.plyr.track", {"title": 123}, partial=True) 204 + assert any("expected string" in e for e in errors) 205 + 206 + def test_still_checks_lengths(self) -> None: 207 + errors = validate_record("fm.plyr.track", {"title": ""}, partial=True) 208 + assert any("minLength" in e for e in errors) 209 + 210 + 211 + class TestUnknownLexicon: 212 + def test_unknown_id_returns_empty(self) -> None: 213 + assert validate_record("com.example.unknown", {"anything": "goes"}) == []
+8
loq.toml
··· 218 218 max_lines = 535 219 219 220 220 [[rules]] 221 + path = "backend/src/backend/_internal/tasks/ingest.py" 222 + max_lines = 540 223 + 224 + [[rules]] 225 + path = "backend/tests/test_jetstream.py" 226 + max_lines = 1000 227 + 228 + [[rules]] 221 229 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte" 222 230 max_lines = 580