audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: decompose lists.py and albums.py into subpackages (#1276)

* refactor: decompose lists.py and albums.py into subpackages, fix PDS URL healing

Split two monolithic API files into subpackages following the existing
api/tracks/ pattern:

- lists.py (1149 lines) → lists/{router,schemas,reorder,resolver,playlists}.py
- albums.py (995 lines) → albums/{router,schemas,cache,listing,mutations}.py

Also moves PDS URL healing from lazy per-request side effects (copy-pasted
in 5 API endpoints) to the jetstream identity event handler, where it
belongs. Identity events fire on both handle changes and PDS migrations,
so resolving the DID there keeps the cached pds_url warm proactively
instead of discovering staleness at request time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: correct mock targets for decomposed module paths

- AsyncDidResolver: patch at source (atproto_identity.did.resolver)
since ingest.py uses a deferred import
- get_async_redis_client: update to backend.api.albums.cache

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: hoist deferred imports to top-level in decomposed modules

Move ~15 deferred imports to module-level where they don't risk circular
dependencies. The only remaining deferred import in the new packages is
backend.api.tracks.mutations.delete_track (cross-package API call).

Also keeps the AsyncDidResolver import in ingest.py deferred — it's a
heavy external dependency in a background task module.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: update mock targets for hoisted imports in album tests

With imports at top-level, mocks must target the importing module's
namespace, not the source module.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6 (1M context)
and committed by
GitHub
ec0d740b 920af3a9

+1448 -1279
+3 -3
backend/src/backend/_internal/jetstream.py
··· 28 28 ingest_comment_create, 29 29 ingest_comment_delete, 30 30 ingest_comment_update, 31 - ingest_handle_update, 31 + ingest_identity_update, 32 32 ingest_like_create, 33 33 ingest_like_delete, 34 34 ingest_list_create, ··· 140 140 handle = (event.get("identity") or {}).get("handle") 141 141 if did and handle and did in self._known_dids: 142 142 docket = get_docket() 143 - await docket.add(ingest_handle_update)(did=did, handle=handle) 143 + await docket.add(ingest_identity_update)(did=did, handle=handle) 144 144 logfire.info( 145 - "jetstream dispatched handle update", 145 + "jetstream dispatched identity update", 146 146 did=did, 147 147 handle=handle, 148 148 )
+36 -14
backend/src/backend/_internal/tasks/ingest.py
··· 712 712 # --- handle update task --- 713 713 714 714 715 - async def ingest_handle_update( 715 + async def ingest_identity_update( 716 716 did: str, 717 717 handle: str, 718 718 ) -> None: 719 - """update artist and session handles when an identity event arrives.""" 719 + """update artist handle and PDS URL when an identity event arrives. 720 + 721 + identity events fire on both handle changes and PDS migrations. 722 + resolving the DID gives us the current PDS URL, so we update both 723 + in one pass rather than lazily healing stale pds_url in every API 724 + endpoint that fetches ATProto records. 725 + """ 726 + from atproto_identity.did.resolver import AsyncDidResolver 727 + 720 728 async with db_session() as db: 721 729 artist = await db.get(Artist, did) 722 730 if not artist: 723 - logger.debug("ingest_handle_update: unknown artist %s", did) 731 + logger.debug("ingest_identity_update: unknown artist %s", did) 724 732 return 725 733 726 - if artist.handle == handle: 727 - return 734 + changes: dict[str, tuple[str | None, str | None]] = {} 728 735 729 - old_handle = artist.handle 730 - artist.handle = handle 736 + if artist.handle != handle: 737 + changes["handle"] = (artist.handle, handle) 738 + artist.handle = handle 731 739 732 - # update active sessions so session handles stay current 733 - await db.execute( 734 - update(UserSession).where(UserSession.did == did).values(handle=handle) 735 - ) 740 + # update active sessions so session handles stay current 741 + await db.execute( 742 + update(UserSession).where(UserSession.did == did).values(handle=handle) 743 + ) 744 + 745 + # resolve DID to get current PDS URL 746 + try: 747 + atproto_data = await AsyncDidResolver().resolve_atproto_data(did) 748 + resolved_pds = atproto_data.pds 749 + if resolved_pds and resolved_pds != artist.pds_url: 750 + changes["pds_url"] = (artist.pds_url, resolved_pds) 751 + artist.pds_url = resolved_pds 752 + except Exception as e: 753 + logger.warning( 754 + "ingest_identity_update: DID resolution failed for %s: %s", did, e 755 + ) 756 + 757 + if not changes: 758 + return 736 759 737 760 await db.commit() 738 761 logfire.info( 739 - "ingest: handle updated", 762 + "ingest: identity updated", 740 763 did=did, 741 - old_handle=old_handle, 742 - new_handle=handle, 764 + changes={k: {"old": v[0], "new": v[1]} for k, v in changes.items()}, 743 765 )
-995
backend/src/backend/api/albums.py
··· 1 - """albums api endpoints.""" 2 - 3 - import asyncio 4 - import contextlib 5 - import logging 6 - from datetime import datetime 7 - from typing import Annotated 8 - 9 - from fastapi import ( 10 - APIRouter, 11 - Depends, 12 - File, 13 - HTTPException, 14 - Query, 15 - UploadFile, 16 - ) 17 - from pydantic import BaseModel 18 - from sqlalchemy import func, select 19 - from sqlalchemy.ext.asyncio import AsyncSession 20 - from sqlalchemy.orm import selectinload 21 - 22 - from backend._internal import Session as AuthSession 23 - from backend._internal import get_optional_session, require_artist_profile 24 - from backend._internal.image_uploads import COVER_EXTENSIONS, process_image_upload 25 - from backend.models import Album, Artist, Track, TrackLike, get_db 26 - from backend.schemas import TrackResponse 27 - from backend.storage import storage 28 - from backend.utilities.aggregations import ( 29 - get_comment_counts, 30 - get_like_counts, 31 - get_track_tags, 32 - ) 33 - from backend.utilities.redis import get_async_redis_client 34 - from backend.utilities.slugs import slugify 35 - 36 - logger = logging.getLogger(__name__) 37 - 38 - router = APIRouter(prefix="/albums", tags=["albums"]) 39 - 40 - ALBUM_CACHE_PREFIX = "plyr:album:" 41 - ALBUM_CACHE_TTL_SECONDS = 300 # 5 minutes 42 - 43 - 44 - def _album_cache_key(handle: str, slug: str) -> str: 45 - return f"{ALBUM_CACHE_PREFIX}{handle}/{slug}" 46 - 47 - 48 - async def invalidate_album_cache(handle: str, slug: str) -> None: 49 - """delete cached album response. fails silently.""" 50 - try: 51 - redis = get_async_redis_client() 52 - await redis.delete(_album_cache_key(handle, slug)) 53 - except Exception: 54 - logger.debug("failed to invalidate album cache for %s/%s", handle, slug) 55 - 56 - 57 - async def invalidate_album_cache_by_id(db: AsyncSession, album_id: str) -> None: 58 - """look up album handle+slug and invalidate cache. fails silently.""" 59 - try: 60 - result = await db.execute( 61 - select(Album.slug, Artist.handle) 62 - .join(Artist, Album.artist_did == Artist.did) 63 - .where(Album.id == album_id) 64 - ) 65 - if row := result.first(): 66 - slug, handle = row 67 - await invalidate_album_cache(handle, slug) 68 - except Exception: 69 - logger.debug("failed to invalidate album cache by id %s", album_id) 70 - 71 - 72 - # Pydantic models defined first to avoid forward reference issues 73 - class AlbumMetadata(BaseModel): 74 - """album metadata response.""" 75 - 76 - id: str 77 - title: str 78 - slug: str 79 - description: str | None = None 80 - artist: str 81 - artist_handle: str 82 - artist_did: str 83 - track_count: int 84 - total_plays: int 85 - image_url: str | None 86 - list_uri: str | None = None # ATProto list record URI for reordering 87 - 88 - 89 - class AlbumResponse(BaseModel): 90 - """album detail response with tracks.""" 91 - 92 - metadata: AlbumMetadata 93 - tracks: list[dict] 94 - 95 - 96 - class AlbumListItem(BaseModel): 97 - """minimal album info for listing.""" 98 - 99 - id: str 100 - title: str 101 - slug: str 102 - artist: str 103 - artist_handle: str 104 - track_count: int 105 - 106 - 107 - class RemoveTrackFromAlbumResponse(BaseModel): 108 - """response for removing a track from an album.""" 109 - 110 - removed: bool = True 111 - track_id: int 112 - 113 - 114 - class DeleteAlbumResponse(BaseModel): 115 - """response for deleting an album.""" 116 - 117 - deleted: bool = True 118 - cascade: bool 119 - 120 - 121 - class ArtistAlbumListItem(BaseModel): 122 - """album info for a specific artist (used on artist pages).""" 123 - 124 - id: str 125 - title: str 126 - slug: str 127 - track_count: int 128 - total_plays: int 129 - image_url: str | None 130 - 131 - 132 - class AlbumCreatePayload(BaseModel): 133 - title: str 134 - slug: str | None = None 135 - description: str | None = None 136 - 137 - 138 - class AlbumUpdatePayload(BaseModel): 139 - title: str | None = None 140 - slug: str | None = None 141 - description: str | None = None 142 - 143 - 144 - class AlbumFinalizePayload(BaseModel): 145 - """request body for POST /albums/{id}/finalize. 146 - 147 - track_ids is the authoritative user-intended order for the album's 148 - ATProto list record. every id must belong to this album and have a 149 - completed PDS write (atproto_record_uri + cid set). 150 - """ 151 - 152 - track_ids: list[int] 153 - 154 - 155 - # Helper functions 156 - async def _album_stats(db: AsyncSession, album_id: str) -> tuple[int, int]: 157 - result = await db.execute( 158 - select( 159 - func.count(Track.id), 160 - func.coalesce(func.sum(Track.play_count), 0), 161 - ).where(Track.album_id == album_id) 162 - ) 163 - track_count, total_plays = result.one() 164 - return int(track_count or 0), int(total_plays or 0) 165 - 166 - 167 - async def _album_image_url(album: Album, artist: Artist | None = None) -> str | None: 168 - if album.image_url: 169 - return album.image_url 170 - if album.image_id: 171 - return await album.get_image_url() 172 - if artist and artist.avatar_url: 173 - return artist.avatar_url 174 - return None 175 - 176 - 177 - async def _album_list_item( 178 - album: Album, 179 - artist: Artist, 180 - track_count: int, 181 - total_plays: int, 182 - ) -> AlbumListItem: 183 - image_url = await _album_image_url(album, artist) 184 - return AlbumListItem( 185 - id=album.id, 186 - title=album.title, 187 - slug=album.slug, 188 - artist=artist.display_name, 189 - artist_handle=artist.handle, 190 - track_count=track_count, 191 - total_plays=total_plays, 192 - image_url=image_url, 193 - ) 194 - 195 - 196 - async def _artist_album_summary( 197 - album: Album, 198 - artist: Artist, 199 - track_count: int, 200 - total_plays: int, 201 - ) -> ArtistAlbumListItem: 202 - image_url = await _album_image_url(album, artist) 203 - return ArtistAlbumListItem( 204 - id=album.id, 205 - title=album.title, 206 - slug=album.slug, 207 - track_count=track_count, 208 - total_plays=total_plays, 209 - image_url=image_url, 210 - ) 211 - 212 - 213 - async def _album_metadata( 214 - album: Album, 215 - artist: Artist, 216 - track_count: int, 217 - total_plays: int, 218 - ) -> AlbumMetadata: 219 - image_url = await _album_image_url(album, artist) 220 - return AlbumMetadata( 221 - id=album.id, 222 - title=album.title, 223 - slug=album.slug, 224 - description=album.description, 225 - artist=artist.display_name, 226 - artist_handle=artist.handle, 227 - artist_did=artist.did, 228 - track_count=track_count, 229 - total_plays=total_plays, 230 - image_url=image_url, 231 - list_uri=album.atproto_record_uri, 232 - ) 233 - 234 - 235 - @router.get("/") 236 - async def list_albums( 237 - db: Annotated[AsyncSession, Depends(get_db)], 238 - ) -> dict[str, list[AlbumListItem]]: 239 - """list all albums with basic metadata. 240 - 241 - albums with zero tracks are hidden — they're either unfinalized drafts 242 - from the multi-track upload flow or legacy albums awaiting sync. only 243 - albums that have at least one track appear in public listings. 244 - """ 245 - stmt = ( 246 - select( 247 - Album, 248 - Artist, 249 - func.count(Track.id).label("track_count"), 250 - func.coalesce(func.sum(Track.play_count), 0).label("total_plays"), 251 - ) 252 - .join(Artist, Album.artist_did == Artist.did) 253 - .outerjoin(Track, Track.album_id == Album.id) 254 - .group_by(Album.id, Artist.did) 255 - .having(func.count(Track.id) > 0) 256 - .order_by(func.lower(Album.title)) 257 - ) 258 - 259 - result = await db.execute(stmt) 260 - albums: list[AlbumListItem] = [] 261 - for album, artist, track_count, total_plays in result: 262 - albums.append( 263 - await _album_list_item( 264 - album, 265 - artist, 266 - int(track_count or 0), 267 - int(total_plays or 0), 268 - ) 269 - ) 270 - 271 - return {"albums": albums} 272 - 273 - 274 - @router.get("/{handle}") 275 - async def list_artist_albums( 276 - handle: str, db: Annotated[AsyncSession, Depends(get_db)] 277 - ) -> dict[str, list[ArtistAlbumListItem]]: 278 - """list albums for a specific artist.""" 279 - artist_result = await db.execute(select(Artist).where(Artist.handle == handle)) 280 - artist = artist_result.scalar_one_or_none() 281 - if not artist: 282 - raise HTTPException(status_code=404, detail="artist not found") 283 - 284 - stmt = ( 285 - select( 286 - Album, 287 - func.count(Track.id).label("track_count"), 288 - func.coalesce(func.sum(Track.play_count), 0).label("total_plays"), 289 - ) 290 - .outerjoin(Track, Track.album_id == Album.id) 291 - .where(Album.artist_did == artist.did) 292 - .group_by(Album.id) 293 - .having(func.count(Track.id) > 0) 294 - .order_by(func.lower(Album.title)) 295 - ) 296 - result = await db.execute(stmt) 297 - 298 - album_items: list[ArtistAlbumListItem] = [] 299 - for album, track_count, total_plays in result: 300 - album_items.append( 301 - await _artist_album_summary( 302 - album, 303 - artist, 304 - int(track_count or 0), 305 - int(total_plays or 0), 306 - ) 307 - ) 308 - 309 - return {"albums": album_items} 310 - 311 - 312 - @router.get("/{handle}/{slug}") 313 - async def get_album( 314 - handle: str, 315 - slug: str, 316 - db: Annotated[AsyncSession, Depends(get_db)], 317 - session: AuthSession | None = Depends(get_optional_session), 318 - ) -> AlbumResponse: 319 - """get album details with tracks (ordered by ATProto list record or created_at).""" 320 - # check Redis cache first 321 - cache_key = _album_cache_key(handle, slug) 322 - try: 323 - redis = get_async_redis_client() 324 - if cached := await redis.get(cache_key): 325 - return AlbumResponse.model_validate_json(cached) 326 - except Exception: 327 - logger.debug("album cache read failed for %s/%s", handle, slug) 328 - 329 - from backend._internal.atproto.records import get_record_public_resilient 330 - 331 - # look up artist + album 332 - album_result = await db.execute( 333 - select(Album, Artist) 334 - .join(Artist, Album.artist_did == Artist.did) 335 - .where(Artist.handle == handle, Album.slug == slug) 336 - ) 337 - row = album_result.first() 338 - if not row: 339 - raise HTTPException(status_code=404, detail="album not found") 340 - 341 - album, artist = row 342 - 343 - pds_cache: dict[str, str | None] = {artist.did: artist.pds_url} 344 - 345 - # fetch all tracks for this album 346 - track_stmt = ( 347 - select(Track) 348 - .options(selectinload(Track.artist), selectinload(Track.album_rel)) 349 - .where(Track.album_id == album.id) 350 - ) 351 - track_result = await db.execute(track_stmt) 352 - all_tracks = list(track_result.scalars().all()) 353 - 354 - # determine track order: use ATProto list record if available 355 - ordered_tracks: list[Track] = [] 356 - if album.atproto_record_uri: 357 - try: 358 - record_data, resolved_pds_url = await get_record_public_resilient( 359 - record_uri=album.atproto_record_uri, 360 - pds_url=artist.pds_url, 361 - ) 362 - if resolved_pds_url: 363 - artist.pds_url = resolved_pds_url 364 - pds_cache[artist.did] = resolved_pds_url 365 - db.add(artist) 366 - await db.commit() 367 - 368 - items = record_data.get("value", {}).get("items", []) 369 - track_uris = [item.get("subject", {}).get("uri") for item in items] 370 - track_uris = [uri for uri in track_uris if uri] 371 - 372 - # build uri -> track map 373 - track_by_uri = {t.atproto_record_uri: t for t in all_tracks} 374 - 375 - # order tracks by ATProto list, append any not in list at end 376 - seen_ids = set() 377 - for uri in track_uris: 378 - if uri in track_by_uri: 379 - track = track_by_uri[uri] 380 - ordered_tracks.append(track) 381 - seen_ids.add(track.id) 382 - 383 - # append any tracks not in the ATProto list (fallback) 384 - for track in sorted(all_tracks, key=lambda t: t.created_at): 385 - if track.id not in seen_ids: 386 - ordered_tracks.append(track) 387 - 388 - except Exception as e: 389 - logger.warning(f"failed to fetch ATProto list for album ordering: {e}") 390 - # fallback to created_at order 391 - ordered_tracks = sorted(all_tracks, key=lambda t: t.created_at) 392 - else: 393 - # no ATProto record - order by created_at 394 - ordered_tracks = sorted(all_tracks, key=lambda t: t.created_at) 395 - 396 - tracks = ordered_tracks 397 - track_ids = [track.id for track in tracks] 398 - 399 - # batch fetch aggregations 400 - if track_ids: 401 - like_counts, comment_counts, track_tags = await asyncio.gather( 402 - get_like_counts(db, track_ids), 403 - get_comment_counts(db, track_ids), 404 - get_track_tags(db, track_ids), 405 - ) 406 - else: 407 - like_counts, comment_counts, track_tags = {}, {}, {} 408 - 409 - # get authenticated user's likes for this album's tracks only 410 - liked_track_ids: set[int] | None = None 411 - if session: 412 - if track_ids: 413 - liked_result = await db.execute( 414 - select(TrackLike.track_id).where( 415 - TrackLike.user_did == session.did, 416 - TrackLike.track_id.in_(track_ids), 417 - ) 418 - ) 419 - liked_track_ids = set(liked_result.scalars().all()) 420 - 421 - # build track responses (maintaining order) 422 - track_responses = await asyncio.gather( 423 - *[ 424 - TrackResponse.from_track( 425 - track, 426 - pds_cache.get(track.artist_did), 427 - liked_track_ids, 428 - like_counts, 429 - comment_counts, 430 - track_tags=track_tags, 431 - ) 432 - for track in tracks 433 - ] 434 - ) 435 - 436 - total_plays = sum(t.play_count for t in tracks) 437 - metadata = await _album_metadata(album, artist, len(tracks), total_plays) 438 - 439 - response = AlbumResponse( 440 - metadata=metadata, 441 - tracks=[t.model_dump(mode="json") for t in track_responses], 442 - ) 443 - 444 - # cache a depersonalized copy (is_liked zeroed out) 445 - try: 446 - redis = get_async_redis_client() 447 - cache_tracks = [{**t, "is_liked": False} for t in response.tracks] 448 - cacheable = AlbumResponse(metadata=response.metadata, tracks=cache_tracks) 449 - await redis.set( 450 - cache_key, cacheable.model_dump_json(), ex=ALBUM_CACHE_TTL_SECONDS 451 - ) 452 - except Exception: 453 - logger.debug("album cache write failed for %s/%s", handle, slug) 454 - 455 - return response 456 - 457 - 458 - @router.post("/") 459 - async def create_album( 460 - body: AlbumCreatePayload, 461 - db: Annotated[AsyncSession, Depends(get_db)], 462 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 463 - ) -> AlbumMetadata: 464 - """create an empty album shell for the multi-track upload flow. 465 - 466 - the ATProto list record is NOT written here — it is deferred to 467 - `POST /albums/{id}/finalize`, which runs after tracks have actually 468 - been published so a total upload failure doesn't leave a fake release 469 - behind. for the same reason, the `album_release` CollectionEvent is 470 - also deferred to finalize (first successful call only, deduped). 471 - 472 - idempotent on (artist_did, slug): if an album with the same slug 473 - already exists, the existing row is returned instead of failing. 474 - this preserves the "type an existing album name to add tracks to it" 475 - UX — see finalize_album for the append semantics. 476 - """ 477 - from sqlalchemy.exc import IntegrityError 478 - 479 - title = body.title.strip() 480 - if not title: 481 - raise HTTPException(status_code=400, detail="title is required") 482 - 483 - slug = body.slug.strip() if body.slug else slugify(title) 484 - if not slug: 485 - raise HTTPException(status_code=400, detail="invalid slug") 486 - 487 - description = body.description.strip() if body.description else None 488 - 489 - # lookup artist for the response payload 490 - artist_result = await db.execute( 491 - select(Artist).where(Artist.did == auth_session.did) 492 - ) 493 - artist = artist_result.scalar_one() 494 - 495 - # idempotent on (artist_did, slug) — matches get_or_create_album semantics 496 - existing_result = await db.execute( 497 - select(Album).where(Album.artist_did == artist.did, Album.slug == slug) 498 - ) 499 - if existing := existing_result.scalar_one_or_none(): 500 - track_count, total_plays = await _album_stats(db, existing.id) 501 - return await _album_metadata(existing, artist, track_count, total_plays) 502 - 503 - album = Album( 504 - artist_did=artist.did, 505 - slug=slug, 506 - title=title, 507 - description=description, 508 - ) 509 - db.add(album) 510 - try: 511 - await db.flush() 512 - except IntegrityError: 513 - # concurrent create raced us — return the winning row 514 - await db.rollback() 515 - retry_result = await db.execute( 516 - select(Album).where(Album.artist_did == artist.did, Album.slug == slug) 517 - ) 518 - album = retry_result.scalar_one() 519 - track_count, total_plays = await _album_stats(db, album.id) 520 - return await _album_metadata(album, artist, track_count, total_plays) 521 - 522 - await db.commit() 523 - await db.refresh(album) 524 - 525 - return await _album_metadata(album, artist, track_count=0, total_plays=0) 526 - 527 - 528 - @router.post("/{album_id}/cover") 529 - async def upload_album_cover( 530 - album_id: str, 531 - db: Annotated[AsyncSession, Depends(get_db)], 532 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 533 - image: UploadFile = File(...), 534 - ) -> dict[str, str | None]: 535 - """upload cover art for an album (requires authentication).""" 536 - # verify album exists and belongs to the authenticated artist 537 - result = await db.execute(select(Album).where(Album.id == album_id)) 538 - album = result.scalar_one_or_none() 539 - if not album: 540 - raise HTTPException(status_code=404, detail="album not found") 541 - if album.artist_did != auth_session.did: 542 - raise HTTPException( 543 - status_code=403, detail="you can only upload cover art for your own albums" 544 - ) 545 - 546 - try: 547 - uploaded = await process_image_upload( 548 - image, "album", allowed_extensions=COVER_EXTENSIONS 549 - ) 550 - 551 - # delete old image if exists (prevent R2 object leaks) 552 - if album.image_id and album.image_id != uploaded.image_id: 553 - with contextlib.suppress(Exception): 554 - if album.image_url: 555 - await storage.delete_image(album.image_id, album.image_url) 556 - else: 557 - await storage.delete(album.image_id) 558 - 559 - # update album with new image 560 - album.image_id = uploaded.image_id 561 - album.image_url = uploaded.image_url 562 - album.thumbnail_url = uploaded.thumbnail_url 563 - await db.commit() 564 - 565 - await invalidate_album_cache(auth_session.handle, album.slug) 566 - 567 - return { 568 - "image_url": uploaded.image_url, 569 - "image_id": uploaded.image_id, 570 - "thumbnail_url": uploaded.thumbnail_url, 571 - } 572 - 573 - except HTTPException: 574 - raise 575 - except Exception as e: 576 - raise HTTPException( 577 - status_code=500, detail=f"failed to upload image: {e!s}" 578 - ) from e 579 - 580 - 581 - @router.post("/{album_id}/finalize") 582 - async def finalize_album( 583 - album_id: str, 584 - body: AlbumFinalizePayload, 585 - db: Annotated[AsyncSession, Depends(get_db)], 586 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 587 - ) -> AlbumMetadata: 588 - """write the album's ATProto list record using an explicit track order. 589 - 590 - called by the frontend after per-track uploads have settled. this is 591 - the single place the list record is created/updated for albums built 592 - via `POST /albums/` + `POST /tracks/?album_id=...`. 593 - 594 - append semantics: `track_ids` carries only the tracks from the current 595 - upload session. any tracks already on the album that are NOT in 596 - `track_ids` are preserved in the list record at their current positions 597 - (fetched from the existing list record if present, falling back to 598 - created_at order). new tracks are appended at the end in the order 599 - requested. this matches the "type an existing album name to add tracks 600 - to it" UX without truncating prior track history. 601 - 602 - also emits an `album_release` CollectionEvent on the first successful 603 - finalize for the album — so total upload failures don't leave a fake 604 - release event in the activity feed. 605 - """ 606 - from backend._internal.atproto.records import get_record_public_resilient 607 - from backend._internal.atproto.records.fm_plyr.list import ( 608 - upsert_album_list_record, 609 - ) 610 - from backend.models import CollectionEvent 611 - 612 - if not body.track_ids: 613 - raise HTTPException(status_code=400, detail="track_ids must not be empty") 614 - 615 - # verify album ownership 616 - album_result = await db.execute(select(Album).where(Album.id == album_id)) 617 - album = album_result.scalar_one_or_none() 618 - if not album: 619 - raise HTTPException(status_code=404, detail="album not found") 620 - if album.artist_did != auth_session.did: 621 - raise HTTPException( 622 - status_code=403, detail="you can only finalize your own albums" 623 - ) 624 - 625 - # fetch the requested tracks for validation 626 - requested_result = await db.execute( 627 - select(Track).where(Track.id.in_(body.track_ids)) 628 - ) 629 - requested_by_id = {t.id: t for t in requested_result.scalars().all()} 630 - 631 - # validate: every requested id exists, belongs to this album, and has a 632 - # completed PDS write. surface specific errors so the frontend can retry 633 - # or message the user precisely. 634 - missing = [tid for tid in body.track_ids if tid not in requested_by_id] 635 - if missing: 636 - raise HTTPException(status_code=400, detail=f"track(s) not found: {missing}") 637 - 638 - wrong_album = [ 639 - tid for tid in body.track_ids if requested_by_id[tid].album_id != album_id 640 - ] 641 - if wrong_album: 642 - raise HTTPException( 643 - status_code=400, 644 - detail=f"track(s) do not belong to this album: {wrong_album}", 645 - ) 646 - 647 - missing_pds = [ 648 - tid 649 - for tid in body.track_ids 650 - if not requested_by_id[tid].atproto_record_uri 651 - or not requested_by_id[tid].atproto_record_cid 652 - ] 653 - if missing_pds: 654 - raise HTTPException( 655 - status_code=400, 656 - detail=( 657 - f"track(s) missing PDS record (upload may still be in flight): " 658 - f"{missing_pds}" 659 - ), 660 - ) 661 - 662 - # fetch ALL PDS-ref'd tracks already on this album — these may include 663 - # tracks from prior upload sessions that the current request doesn't 664 - # mention and must be preserved in the list record. 665 - existing_result = await db.execute( 666 - select(Track).where( 667 - Track.album_id == album_id, 668 - Track.atproto_record_uri.isnot(None), 669 - Track.atproto_record_cid.isnot(None), 670 - ) 671 - ) 672 - all_album_tracks = {t.id: t for t in existing_result.scalars().all()} 673 - 674 - # partition: preserved (existing, not in this request) vs new (in this request). 675 - # a track id that appears in both sets is treated as "new" so a repeat finalize 676 - # with the same ids rewrites the order deterministically. 677 - requested_set = set(body.track_ids) 678 - preserved_tracks = [ 679 - t for tid, t in all_album_tracks.items() if tid not in requested_set 680 - ] 681 - 682 - # determine preserved order: if the album already has a list record, honor 683 - # its current item order (which captures any manual reorderings the owner 684 - # made from the album edit page). fall back to created_at for tracks not 685 - # in the existing list, or if the PDS fetch fails entirely. 686 - preserved_position_by_uri: dict[str, int] = {} 687 - if album.atproto_record_uri and preserved_tracks: 688 - try: 689 - artist_lookup = await db.execute( 690 - select(Artist).where(Artist.did == album.artist_did) 691 - ) 692 - artist_for_pds = artist_lookup.scalar_one() 693 - record_data, _ = await get_record_public_resilient( 694 - record_uri=album.atproto_record_uri, 695 - pds_url=artist_for_pds.pds_url, 696 - ) 697 - items = record_data.get("value", {}).get("items", []) 698 - for i, item in enumerate(items): 699 - uri = item.get("subject", {}).get("uri") 700 - if uri: 701 - preserved_position_by_uri[uri] = i 702 - except Exception as e: 703 - logger.debug( 704 - f"finalize_album: failed to fetch existing list for preserved " 705 - f"track order on {album_id}: {e}" 706 - ) 707 - 708 - def _preserved_sort_key(t: Track) -> tuple[int, datetime]: 709 - # tracks already in the existing list: keep their position 710 - # tracks not in the existing list (or if fetch failed): sort by created_at 711 - # after all positioned items 712 - pos = preserved_position_by_uri.get(t.atproto_record_uri or "", 10_000_000) 713 - return (pos, t.created_at) 714 - 715 - preserved_tracks.sort(key=_preserved_sort_key) 716 - 717 - # build the final list: preserved (existing, at front) + new (in requested order) 718 - final_order: list[Track] = list(preserved_tracks) + [ 719 - requested_by_id[tid] for tid in body.track_ids 720 - ] 721 - 722 - # strongRefs in final order (the validation above guarantees these are 723 - # non-None for the requested tracks; preserved tracks were filtered at 724 - # fetch time, but narrow for the type checker) 725 - track_refs: list[dict[str, str]] = [] 726 - for t in final_order: 727 - assert t.atproto_record_uri is not None 728 - assert t.atproto_record_cid is not None 729 - track_refs.append({"uri": t.atproto_record_uri, "cid": t.atproto_record_cid}) 730 - 731 - try: 732 - result = await upsert_album_list_record( 733 - auth_session, 734 - album_id=album_id, 735 - album_title=album.title, 736 - track_refs=track_refs, 737 - existing_uri=album.atproto_record_uri, 738 - existing_created_at=album.created_at, 739 - ) 740 - except Exception as e: 741 - logger.warning(f"failed to write album list record for {album_id}: {e}") 742 - raise HTTPException( 743 - status_code=500, detail=f"failed to write album list record: {e}" 744 - ) from e 745 - 746 - if result: 747 - album.atproto_record_uri = result[0] 748 - album.atproto_record_cid = result[1] 749 - 750 - # emit album_release CollectionEvent on the first successful finalize only. 751 - # deferred from create_album so a total upload failure doesn't publish a 752 - # fake release event. deduped by checking for any existing event. 753 - existing_event = await db.execute( 754 - select(CollectionEvent).where( 755 - CollectionEvent.album_id == album_id, 756 - CollectionEvent.event_type == "album_release", 757 - ) 758 - ) 759 - if not existing_event.scalar_one_or_none(): 760 - db.add( 761 - CollectionEvent( 762 - event_type="album_release", 763 - actor_did=auth_session.did, 764 - album_id=album_id, 765 - ) 766 - ) 767 - 768 - await db.commit() 769 - 770 - await invalidate_album_cache(auth_session.handle, album.slug) 771 - 772 - artist_result = await db.execute( 773 - select(Artist).where(Artist.did == album.artist_did) 774 - ) 775 - artist = artist_result.scalar_one() 776 - track_count, total_plays = await _album_stats(db, album_id) 777 - return await _album_metadata(album, artist, track_count, total_plays) 778 - 779 - 780 - @router.patch("/{album_id}") 781 - async def update_album( 782 - album_id: str, 783 - db: Annotated[AsyncSession, Depends(get_db)], 784 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 785 - title: Annotated[str | None, Query(description="new album title")] = None, 786 - description: Annotated[ 787 - str | None, Query(description="new album description") 788 - ] = None, 789 - ) -> AlbumMetadata: 790 - """update album metadata (title, description). syncs ATProto records on title change.""" 791 - from backend._internal.atproto.records.fm_plyr.list import update_list_record 792 - from backend._internal.atproto.records.fm_plyr.track import ( 793 - build_track_record, 794 - update_record, 795 - ) 796 - 797 - result = await db.execute( 798 - select(Album) 799 - .where(Album.id == album_id) 800 - .options(selectinload(Album.tracks).selectinload(Track.artist)) 801 - ) 802 - album = result.scalar_one_or_none() 803 - if not album: 804 - raise HTTPException(status_code=404, detail="album not found") 805 - if album.artist_did != auth_session.did: 806 - raise HTTPException( 807 - status_code=403, detail="you can only update your own albums" 808 - ) 809 - 810 - old_title = album.title 811 - old_slug = album.slug 812 - title_changed = title is not None and title.strip() != old_title 813 - 814 - if title is not None: 815 - album.title = title.strip() 816 - # sync slug when title changes so get_or_create_album lookups work 817 - if title_changed: 818 - album.slug = slugify(title.strip()) 819 - if description is not None: 820 - album.description = description.strip() if description.strip() else None 821 - 822 - # if title changed, update all tracks' extra["album"] and ATProto records 823 - if title_changed and title is not None: 824 - new_title = title.strip() 825 - 826 - for track in album.tracks: 827 - # update the track's extra["album"] field 828 - if track.extra is None: 829 - track.extra = {} 830 - track.extra = {**track.extra, "album": new_title} 831 - 832 - # update ATProto record if track has one 833 - if track.atproto_record_uri and track.r2_url and track.file_type: 834 - updated_record = build_track_record( 835 - title=track.title, 836 - artist=track.artist.display_name, 837 - audio_url=track.r2_url, 838 - file_type=track.file_type, 839 - album=new_title, 840 - duration=track.duration, 841 - features=track.features if track.features else None, 842 - image_url=await track.get_image_url(), 843 - ) 844 - 845 - _, new_cid = await update_record( 846 - auth_session=auth_session, 847 - record_uri=track.atproto_record_uri, 848 - record=updated_record, 849 - ) 850 - track.atproto_record_cid = new_cid 851 - 852 - # update the album's ATProto list record name 853 - if album.atproto_record_uri: 854 - track_refs = [ 855 - {"uri": t.atproto_record_uri, "cid": t.atproto_record_cid} 856 - for t in album.tracks 857 - if t.atproto_record_uri and t.atproto_record_cid 858 - ] 859 - _, new_list_cid = await update_list_record( 860 - auth_session=auth_session, 861 - list_uri=album.atproto_record_uri, 862 - items=track_refs, 863 - name=new_title, 864 - list_type="album", 865 - created_at=album.created_at, 866 - ) 867 - album.atproto_record_cid = new_list_cid 868 - 869 - await db.commit() 870 - 871 - # invalidate cache for old slug (new slug will be a cache miss) 872 - await invalidate_album_cache(auth_session.handle, old_slug) 873 - if album.slug != old_slug: 874 - await invalidate_album_cache(auth_session.handle, album.slug) 875 - 876 - # fetch artist for response 877 - artist_result = await db.execute( 878 - select(Artist).where(Artist.did == album.artist_did) 879 - ) 880 - artist = artist_result.scalar_one() 881 - track_count, total_plays = await _album_stats(db, album_id) 882 - 883 - return await _album_metadata(album, artist, track_count, total_plays) 884 - 885 - 886 - @router.delete("/{album_id}/tracks/{track_id}") 887 - async def remove_track_from_album( 888 - album_id: str, 889 - track_id: int, 890 - db: Annotated[AsyncSession, Depends(get_db)], 891 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 892 - ) -> RemoveTrackFromAlbumResponse: 893 - """remove a track from an album (orphan it, don't delete). 894 - 895 - the track remains available as a standalone track. 896 - """ 897 - # verify album exists and belongs to the authenticated artist 898 - album_result = await db.execute(select(Album).where(Album.id == album_id)) 899 - album = album_result.scalar_one_or_none() 900 - if not album: 901 - raise HTTPException(status_code=404, detail="album not found") 902 - if album.artist_did != auth_session.did: 903 - raise HTTPException( 904 - status_code=403, detail="you can only modify your own albums" 905 - ) 906 - 907 - # verify track exists and is in this album 908 - track_result = await db.execute(select(Track).where(Track.id == track_id)) 909 - track = track_result.scalar_one_or_none() 910 - if not track: 911 - raise HTTPException(status_code=404, detail="track not found") 912 - if track.album_id != album_id: 913 - raise HTTPException(status_code=400, detail="track is not in this album") 914 - 915 - # orphan the track 916 - track.album_id = None 917 - await db.commit() 918 - 919 - await invalidate_album_cache(auth_session.handle, album.slug) 920 - 921 - return RemoveTrackFromAlbumResponse(track_id=track_id) 922 - 923 - 924 - @router.delete("/{album_id}") 925 - async def delete_album( 926 - album_id: str, 927 - db: Annotated[AsyncSession, Depends(get_db)], 928 - auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 929 - cascade: Annotated[ 930 - bool, 931 - Query(description="if true, also delete all tracks in the album"), 932 - ] = False, 933 - ) -> DeleteAlbumResponse: 934 - """delete album. tracks are orphaned unless cascade=true. removes ATProto list record.""" 935 - from backend._internal.atproto.records import delete_record_by_uri 936 - 937 - # verify album exists and belongs to the authenticated artist 938 - result = await db.execute(select(Album).where(Album.id == album_id)) 939 - album = result.scalar_one_or_none() 940 - if not album: 941 - raise HTTPException(status_code=404, detail="album not found") 942 - if album.artist_did != auth_session.did: 943 - raise HTTPException( 944 - status_code=403, detail="you can only delete your own albums" 945 - ) 946 - 947 - # handle tracks 948 - if cascade: 949 - # delete all tracks in album 950 - from backend.api.tracks.mutations import delete_track 951 - 952 - tracks_result = await db.execute( 953 - select(Track).where(Track.album_id == album_id) 954 - ) 955 - tracks = tracks_result.scalars().all() 956 - for track in tracks: 957 - try: 958 - await delete_track(track.id, db, auth_session) 959 - except Exception as e: 960 - logger.warning(f"failed to delete track {track.id}: {e}") 961 - else: 962 - # orphan tracks - set album_id to null 963 - from sqlalchemy import update 964 - 965 - await db.execute( 966 - update(Track).where(Track.album_id == album_id).values(album_id=None) 967 - ) 968 - 969 - # delete ATProto record if exists 970 - if album.atproto_record_uri: 971 - try: 972 - await delete_record_by_uri(auth_session, album.atproto_record_uri) 973 - except Exception as e: 974 - logger.warning(f"failed to delete ATProto record: {e}") 975 - # continue with database cleanup even if ATProto delete fails 976 - 977 - # delete cover image from storage if exists 978 - if album.image_id: 979 - with contextlib.suppress(Exception): 980 - if album.image_url: 981 - await storage.delete_image(album.image_id, album.image_url) 982 - else: 983 - await storage.delete(album.image_id) 984 - 985 - # capture slug before deletion 986 - album_slug = album.slug 987 - 988 - # delete album from database 989 - await db.delete(album) 990 - await db.commit() 991 - 992 - # invalidate cache after commit so concurrent reads can't re-populate from pre-delete state 993 - await invalidate_album_cache(auth_session.handle, album_slug) 994 - 995 - return DeleteAlbumResponse(cascade=cascade)
+35
backend/src/backend/api/albums/__init__.py
··· 1 + """Albums API package that exposes the FastAPI router.""" 2 + 3 + from .router import router 4 + 5 + # Re-export cache utilities used by other modules 6 + from .cache import ( 7 + ALBUM_CACHE_PREFIX, 8 + ALBUM_CACHE_TTL_SECONDS, 9 + _album_cache_key, 10 + invalidate_album_cache, 11 + invalidate_album_cache_by_id, 12 + ) 13 + 14 + # Re-export schemas and endpoint functions used by tests 15 + from .listing import list_artist_albums 16 + from .schemas import AlbumMetadata, AlbumResponse 17 + 18 + # Import route modules to register handlers on the shared router. 19 + # Static path (GET /) imported first. 20 + from . import listing as _listing # /, /{handle}, /{handle}/{slug} 21 + from . import ( 22 + mutations as _mutations, 23 + ) # POST /, /{id}/cover, /{id}/finalize, PATCH, DELETE 24 + 25 + __all__ = [ 26 + "ALBUM_CACHE_PREFIX", 27 + "ALBUM_CACHE_TTL_SECONDS", 28 + "AlbumMetadata", 29 + "AlbumResponse", 30 + "_album_cache_key", 31 + "invalidate_album_cache", 32 + "invalidate_album_cache_by_id", 33 + "list_artist_albums", 34 + "router", 35 + ]
+123
backend/src/backend/api/albums/cache.py
··· 1 + """album cache utilities and response-builder helpers.""" 2 + 3 + import logging 4 + 5 + from sqlalchemy import func, select 6 + from sqlalchemy.ext.asyncio import AsyncSession 7 + 8 + from backend.models import Album, Artist, Track 9 + from backend.utilities.redis import get_async_redis_client 10 + 11 + from .schemas import AlbumListItem, AlbumMetadata, ArtistAlbumListItem 12 + 13 + logger = logging.getLogger(__name__) 14 + 15 + ALBUM_CACHE_PREFIX = "plyr:album:" 16 + ALBUM_CACHE_TTL_SECONDS = 300 # 5 minutes 17 + 18 + 19 + def _album_cache_key(handle: str, slug: str) -> str: 20 + return f"{ALBUM_CACHE_PREFIX}{handle}/{slug}" 21 + 22 + 23 + async def invalidate_album_cache(handle: str, slug: str) -> None: 24 + """delete cached album response. fails silently.""" 25 + try: 26 + redis = get_async_redis_client() 27 + await redis.delete(_album_cache_key(handle, slug)) 28 + except Exception: 29 + logger.debug("failed to invalidate album cache for %s/%s", handle, slug) 30 + 31 + 32 + async def invalidate_album_cache_by_id(db: AsyncSession, album_id: str) -> None: 33 + """look up album handle+slug and invalidate cache. fails silently.""" 34 + try: 35 + result = await db.execute( 36 + select(Album.slug, Artist.handle) 37 + .join(Artist, Album.artist_did == Artist.did) 38 + .where(Album.id == album_id) 39 + ) 40 + if row := result.first(): 41 + slug, handle = row 42 + await invalidate_album_cache(handle, slug) 43 + except Exception: 44 + logger.debug("failed to invalidate album cache by id %s", album_id) 45 + 46 + 47 + async def _album_stats(db: AsyncSession, album_id: str) -> tuple[int, int]: 48 + result = await db.execute( 49 + select( 50 + func.count(Track.id), 51 + func.coalesce(func.sum(Track.play_count), 0), 52 + ).where(Track.album_id == album_id) 53 + ) 54 + track_count, total_plays = result.one() 55 + return int(track_count or 0), int(total_plays or 0) 56 + 57 + 58 + async def _album_image_url(album: Album, artist: Artist | None = None) -> str | None: 59 + if album.image_url: 60 + return album.image_url 61 + if album.image_id: 62 + return await album.get_image_url() 63 + if artist and artist.avatar_url: 64 + return artist.avatar_url 65 + return None 66 + 67 + 68 + async def _album_list_item( 69 + album: Album, 70 + artist: Artist, 71 + track_count: int, 72 + total_plays: int, 73 + ) -> AlbumListItem: 74 + image_url = await _album_image_url(album, artist) 75 + return AlbumListItem( 76 + id=album.id, 77 + title=album.title, 78 + slug=album.slug, 79 + artist=artist.display_name, 80 + artist_handle=artist.handle, 81 + track_count=track_count, 82 + total_plays=total_plays, 83 + image_url=image_url, 84 + ) 85 + 86 + 87 + async def _artist_album_summary( 88 + album: Album, 89 + artist: Artist, 90 + track_count: int, 91 + total_plays: int, 92 + ) -> ArtistAlbumListItem: 93 + image_url = await _album_image_url(album, artist) 94 + return ArtistAlbumListItem( 95 + id=album.id, 96 + title=album.title, 97 + slug=album.slug, 98 + track_count=track_count, 99 + total_plays=total_plays, 100 + image_url=image_url, 101 + ) 102 + 103 + 104 + async def _album_metadata( 105 + album: Album, 106 + artist: Artist, 107 + track_count: int, 108 + total_plays: int, 109 + ) -> AlbumMetadata: 110 + image_url = await _album_image_url(album, artist) 111 + return AlbumMetadata( 112 + id=album.id, 113 + title=album.title, 114 + slug=album.slug, 115 + description=album.description, 116 + artist=artist.display_name, 117 + artist_handle=artist.handle, 118 + artist_did=artist.did, 119 + track_count=track_count, 120 + total_plays=total_plays, 121 + image_url=image_url, 122 + list_uri=album.atproto_record_uri, 123 + )
+250
backend/src/backend/api/albums/listing.py
··· 1 + """read-only album endpoints.""" 2 + 3 + import asyncio 4 + import logging 5 + from typing import Annotated 6 + 7 + from fastapi import Depends, HTTPException 8 + from sqlalchemy import func, select 9 + from sqlalchemy.ext.asyncio import AsyncSession 10 + from sqlalchemy.orm import selectinload 11 + 12 + from backend._internal import Session as AuthSession 13 + from backend._internal import get_optional_session 14 + from backend._internal.atproto.records import get_record_public_resilient 15 + from backend.models import Album, Artist, Track, TrackLike, get_db 16 + from backend.schemas import TrackResponse 17 + from backend.utilities.aggregations import ( 18 + get_comment_counts, 19 + get_like_counts, 20 + get_track_tags, 21 + ) 22 + from backend.utilities.redis import get_async_redis_client 23 + 24 + from .cache import ( 25 + ALBUM_CACHE_TTL_SECONDS, 26 + _album_cache_key, 27 + _album_list_item, 28 + _album_metadata, 29 + _artist_album_summary, 30 + ) 31 + from .router import router 32 + from .schemas import AlbumListItem, AlbumResponse, ArtistAlbumListItem 33 + 34 + logger = logging.getLogger(__name__) 35 + 36 + 37 + @router.get("/") 38 + async def list_albums( 39 + db: Annotated[AsyncSession, Depends(get_db)], 40 + ) -> dict[str, list[AlbumListItem]]: 41 + """list all albums with basic metadata. 42 + 43 + albums with zero tracks are hidden — they're either unfinalized drafts 44 + from the multi-track upload flow or legacy albums awaiting sync. only 45 + albums that have at least one track appear in public listings. 46 + """ 47 + stmt = ( 48 + select( 49 + Album, 50 + Artist, 51 + func.count(Track.id).label("track_count"), 52 + func.coalesce(func.sum(Track.play_count), 0).label("total_plays"), 53 + ) 54 + .join(Artist, Album.artist_did == Artist.did) 55 + .outerjoin(Track, Track.album_id == Album.id) 56 + .group_by(Album.id, Artist.did) 57 + .having(func.count(Track.id) > 0) 58 + .order_by(func.lower(Album.title)) 59 + ) 60 + 61 + result = await db.execute(stmt) 62 + albums: list[AlbumListItem] = [] 63 + for album, artist, track_count, total_plays in result: 64 + albums.append( 65 + await _album_list_item( 66 + album, 67 + artist, 68 + int(track_count or 0), 69 + int(total_plays or 0), 70 + ) 71 + ) 72 + 73 + return {"albums": albums} 74 + 75 + 76 + @router.get("/{handle}") 77 + async def list_artist_albums( 78 + handle: str, db: Annotated[AsyncSession, Depends(get_db)] 79 + ) -> dict[str, list[ArtistAlbumListItem]]: 80 + """list albums for a specific artist.""" 81 + artist_result = await db.execute(select(Artist).where(Artist.handle == handle)) 82 + artist = artist_result.scalar_one_or_none() 83 + if not artist: 84 + raise HTTPException(status_code=404, detail="artist not found") 85 + 86 + stmt = ( 87 + select( 88 + Album, 89 + func.count(Track.id).label("track_count"), 90 + func.coalesce(func.sum(Track.play_count), 0).label("total_plays"), 91 + ) 92 + .outerjoin(Track, Track.album_id == Album.id) 93 + .where(Album.artist_did == artist.did) 94 + .group_by(Album.id) 95 + .having(func.count(Track.id) > 0) 96 + .order_by(func.lower(Album.title)) 97 + ) 98 + result = await db.execute(stmt) 99 + 100 + album_items: list[ArtistAlbumListItem] = [] 101 + for album, track_count, total_plays in result: 102 + album_items.append( 103 + await _artist_album_summary( 104 + album, 105 + artist, 106 + int(track_count or 0), 107 + int(total_plays or 0), 108 + ) 109 + ) 110 + 111 + return {"albums": album_items} 112 + 113 + 114 + @router.get("/{handle}/{slug}") 115 + async def get_album( 116 + handle: str, 117 + slug: str, 118 + db: Annotated[AsyncSession, Depends(get_db)], 119 + session: AuthSession | None = Depends(get_optional_session), 120 + ) -> AlbumResponse: 121 + """get album details with tracks (ordered by ATProto list record or created_at).""" 122 + # check Redis cache first 123 + cache_key = _album_cache_key(handle, slug) 124 + try: 125 + redis = get_async_redis_client() 126 + if cached := await redis.get(cache_key): 127 + return AlbumResponse.model_validate_json(cached) 128 + except Exception: 129 + logger.debug("album cache read failed for %s/%s", handle, slug) 130 + 131 + # look up artist + album 132 + album_result = await db.execute( 133 + select(Album, Artist) 134 + .join(Artist, Album.artist_did == Artist.did) 135 + .where(Artist.handle == handle, Album.slug == slug) 136 + ) 137 + row = album_result.first() 138 + if not row: 139 + raise HTTPException(status_code=404, detail="album not found") 140 + 141 + album, artist = row 142 + 143 + pds_cache: dict[str, str | None] = {artist.did: artist.pds_url} 144 + 145 + # fetch all tracks for this album 146 + track_stmt = ( 147 + select(Track) 148 + .options(selectinload(Track.artist), selectinload(Track.album_rel)) 149 + .where(Track.album_id == album.id) 150 + ) 151 + track_result = await db.execute(track_stmt) 152 + all_tracks = list(track_result.scalars().all()) 153 + 154 + # determine track order: use ATProto list record if available 155 + ordered_tracks: list[Track] = [] 156 + if album.atproto_record_uri: 157 + try: 158 + record_data, _ = await get_record_public_resilient( 159 + record_uri=album.atproto_record_uri, 160 + pds_url=artist.pds_url, 161 + ) 162 + 163 + items = record_data.get("value", {}).get("items", []) 164 + track_uris = [item.get("subject", {}).get("uri") for item in items] 165 + track_uris = [uri for uri in track_uris if uri] 166 + 167 + # build uri -> track map 168 + track_by_uri = {t.atproto_record_uri: t for t in all_tracks} 169 + 170 + # order tracks by ATProto list, append any not in list at end 171 + seen_ids = set() 172 + for uri in track_uris: 173 + if uri in track_by_uri: 174 + track = track_by_uri[uri] 175 + ordered_tracks.append(track) 176 + seen_ids.add(track.id) 177 + 178 + # append any tracks not in the ATProto list (fallback) 179 + for track in sorted(all_tracks, key=lambda t: t.created_at): 180 + if track.id not in seen_ids: 181 + ordered_tracks.append(track) 182 + 183 + except Exception as e: 184 + logger.warning(f"failed to fetch ATProto list for album ordering: {e}") 185 + # fallback to created_at order 186 + ordered_tracks = sorted(all_tracks, key=lambda t: t.created_at) 187 + else: 188 + # no ATProto record - order by created_at 189 + ordered_tracks = sorted(all_tracks, key=lambda t: t.created_at) 190 + 191 + tracks = ordered_tracks 192 + track_ids = [track.id for track in tracks] 193 + 194 + # batch fetch aggregations 195 + if track_ids: 196 + like_counts, comment_counts, track_tags = await asyncio.gather( 197 + get_like_counts(db, track_ids), 198 + get_comment_counts(db, track_ids), 199 + get_track_tags(db, track_ids), 200 + ) 201 + else: 202 + like_counts, comment_counts, track_tags = {}, {}, {} 203 + 204 + # get authenticated user's likes for this album's tracks only 205 + liked_track_ids: set[int] | None = None 206 + if session: 207 + if track_ids: 208 + liked_result = await db.execute( 209 + select(TrackLike.track_id).where( 210 + TrackLike.user_did == session.did, 211 + TrackLike.track_id.in_(track_ids), 212 + ) 213 + ) 214 + liked_track_ids = set(liked_result.scalars().all()) 215 + 216 + # build track responses (maintaining order) 217 + track_responses = await asyncio.gather( 218 + *[ 219 + TrackResponse.from_track( 220 + track, 221 + pds_cache.get(track.artist_did), 222 + liked_track_ids, 223 + like_counts, 224 + comment_counts, 225 + track_tags=track_tags, 226 + ) 227 + for track in tracks 228 + ] 229 + ) 230 + 231 + total_plays = sum(t.play_count for t in tracks) 232 + metadata = await _album_metadata(album, artist, len(tracks), total_plays) 233 + 234 + response = AlbumResponse( 235 + metadata=metadata, 236 + tracks=[t.model_dump(mode="json") for t in track_responses], 237 + ) 238 + 239 + # cache a depersonalized copy (is_liked zeroed out) 240 + try: 241 + redis = get_async_redis_client() 242 + cache_tracks = [{**t, "is_liked": False} for t in response.tracks] 243 + cacheable = AlbumResponse(metadata=response.metadata, tracks=cache_tracks) 244 + await redis.set( 245 + cache_key, cacheable.model_dump_json(), ex=ALBUM_CACHE_TTL_SECONDS 246 + ) 247 + except Exception: 248 + logger.debug("album cache write failed for %s/%s", handle, slug) 249 + 250 + return response
+569
backend/src/backend/api/albums/mutations.py
··· 1 + """write endpoints for albums (create, update, delete, cover upload, finalize).""" 2 + 3 + import contextlib 4 + import logging 5 + from datetime import datetime 6 + from typing import Annotated 7 + 8 + from fastapi import Depends, File, HTTPException, Query, UploadFile 9 + from sqlalchemy import select, update 10 + from sqlalchemy.exc import IntegrityError 11 + from sqlalchemy.ext.asyncio import AsyncSession 12 + from sqlalchemy.orm import selectinload 13 + 14 + from backend._internal import Session as AuthSession 15 + from backend._internal import require_artist_profile 16 + from backend._internal.atproto.records import ( 17 + delete_record_by_uri, 18 + get_record_public_resilient, 19 + ) 20 + from backend._internal.atproto.records.fm_plyr.list import ( 21 + update_list_record, 22 + upsert_album_list_record, 23 + ) 24 + from backend._internal.atproto.records.fm_plyr.track import ( 25 + build_track_record, 26 + update_record, 27 + ) 28 + from backend._internal.image_uploads import COVER_EXTENSIONS, process_image_upload 29 + from backend.models import Album, Artist, CollectionEvent, Track, get_db 30 + from backend.storage import storage 31 + from backend.utilities.slugs import slugify 32 + 33 + from .cache import ( 34 + _album_metadata, 35 + _album_stats, 36 + invalidate_album_cache, 37 + ) 38 + from .router import router 39 + from .schemas import ( 40 + AlbumCreatePayload, 41 + AlbumFinalizePayload, 42 + AlbumMetadata, 43 + DeleteAlbumResponse, 44 + RemoveTrackFromAlbumResponse, 45 + ) 46 + 47 + logger = logging.getLogger(__name__) 48 + 49 + 50 + @router.post("/") 51 + async def create_album( 52 + body: AlbumCreatePayload, 53 + db: Annotated[AsyncSession, Depends(get_db)], 54 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 55 + ) -> AlbumMetadata: 56 + """create an empty album shell for the multi-track upload flow. 57 + 58 + the ATProto list record is NOT written here — it is deferred to 59 + `POST /albums/{id}/finalize`, which runs after tracks have actually 60 + been published so a total upload failure doesn't leave a fake release 61 + behind. for the same reason, the `album_release` CollectionEvent is 62 + also deferred to finalize (first successful call only, deduped). 63 + 64 + idempotent on (artist_did, slug): if an album with the same slug 65 + already exists, the existing row is returned instead of failing. 66 + this preserves the "type an existing album name to add tracks to it" 67 + UX — see finalize_album for the append semantics. 68 + """ 69 + title = body.title.strip() 70 + if not title: 71 + raise HTTPException(status_code=400, detail="title is required") 72 + 73 + slug = body.slug.strip() if body.slug else slugify(title) 74 + if not slug: 75 + raise HTTPException(status_code=400, detail="invalid slug") 76 + 77 + description = body.description.strip() if body.description else None 78 + 79 + # lookup artist for the response payload 80 + artist_result = await db.execute( 81 + select(Artist).where(Artist.did == auth_session.did) 82 + ) 83 + artist = artist_result.scalar_one() 84 + 85 + # idempotent on (artist_did, slug) — matches get_or_create_album semantics 86 + existing_result = await db.execute( 87 + select(Album).where(Album.artist_did == artist.did, Album.slug == slug) 88 + ) 89 + if existing := existing_result.scalar_one_or_none(): 90 + track_count, total_plays = await _album_stats(db, existing.id) 91 + return await _album_metadata(existing, artist, track_count, total_plays) 92 + 93 + album = Album( 94 + artist_did=artist.did, 95 + slug=slug, 96 + title=title, 97 + description=description, 98 + ) 99 + db.add(album) 100 + try: 101 + await db.flush() 102 + except IntegrityError: 103 + # concurrent create raced us — return the winning row 104 + await db.rollback() 105 + retry_result = await db.execute( 106 + select(Album).where(Album.artist_did == artist.did, Album.slug == slug) 107 + ) 108 + album = retry_result.scalar_one() 109 + track_count, total_plays = await _album_stats(db, album.id) 110 + return await _album_metadata(album, artist, track_count, total_plays) 111 + 112 + await db.commit() 113 + await db.refresh(album) 114 + 115 + return await _album_metadata(album, artist, track_count=0, total_plays=0) 116 + 117 + 118 + @router.post("/{album_id}/cover") 119 + async def upload_album_cover( 120 + album_id: str, 121 + db: Annotated[AsyncSession, Depends(get_db)], 122 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 123 + image: UploadFile = File(...), 124 + ) -> dict[str, str | None]: 125 + """upload cover art for an album (requires authentication).""" 126 + # verify album exists and belongs to the authenticated artist 127 + result = await db.execute(select(Album).where(Album.id == album_id)) 128 + album = result.scalar_one_or_none() 129 + if not album: 130 + raise HTTPException(status_code=404, detail="album not found") 131 + if album.artist_did != auth_session.did: 132 + raise HTTPException( 133 + status_code=403, detail="you can only upload cover art for your own albums" 134 + ) 135 + 136 + try: 137 + uploaded = await process_image_upload( 138 + image, "album", allowed_extensions=COVER_EXTENSIONS 139 + ) 140 + 141 + # delete old image if exists (prevent R2 object leaks) 142 + if album.image_id and album.image_id != uploaded.image_id: 143 + with contextlib.suppress(Exception): 144 + if album.image_url: 145 + await storage.delete_image(album.image_id, album.image_url) 146 + else: 147 + await storage.delete(album.image_id) 148 + 149 + # update album with new image 150 + album.image_id = uploaded.image_id 151 + album.image_url = uploaded.image_url 152 + album.thumbnail_url = uploaded.thumbnail_url 153 + await db.commit() 154 + 155 + await invalidate_album_cache(auth_session.handle, album.slug) 156 + 157 + return { 158 + "image_url": uploaded.image_url, 159 + "image_id": uploaded.image_id, 160 + "thumbnail_url": uploaded.thumbnail_url, 161 + } 162 + 163 + except HTTPException: 164 + raise 165 + except Exception as e: 166 + raise HTTPException( 167 + status_code=500, detail=f"failed to upload image: {e!s}" 168 + ) from e 169 + 170 + 171 + @router.post("/{album_id}/finalize") 172 + async def finalize_album( 173 + album_id: str, 174 + body: AlbumFinalizePayload, 175 + db: Annotated[AsyncSession, Depends(get_db)], 176 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 177 + ) -> AlbumMetadata: 178 + """write the album's ATProto list record using an explicit track order. 179 + 180 + called by the frontend after per-track uploads have settled. this is 181 + the single place the list record is created/updated for albums built 182 + via `POST /albums/` + `POST /tracks/?album_id=...`. 183 + 184 + append semantics: `track_ids` carries only the tracks from the current 185 + upload session. any tracks already on the album that are NOT in 186 + `track_ids` are preserved in the list record at their current positions 187 + (fetched from the existing list record if present, falling back to 188 + created_at order). new tracks are appended at the end in the order 189 + requested. this matches the "type an existing album name to add tracks 190 + to it" UX without truncating prior track history. 191 + 192 + also emits an `album_release` CollectionEvent on the first successful 193 + finalize for the album — so total upload failures don't leave a fake 194 + release event in the activity feed. 195 + """ 196 + if not body.track_ids: 197 + raise HTTPException(status_code=400, detail="track_ids must not be empty") 198 + 199 + # verify album ownership 200 + album_result = await db.execute(select(Album).where(Album.id == album_id)) 201 + album = album_result.scalar_one_or_none() 202 + if not album: 203 + raise HTTPException(status_code=404, detail="album not found") 204 + if album.artist_did != auth_session.did: 205 + raise HTTPException( 206 + status_code=403, detail="you can only finalize your own albums" 207 + ) 208 + 209 + # fetch the requested tracks for validation 210 + requested_result = await db.execute( 211 + select(Track).where(Track.id.in_(body.track_ids)) 212 + ) 213 + requested_by_id = {t.id: t for t in requested_result.scalars().all()} 214 + 215 + # validate: every requested id exists, belongs to this album, and has a 216 + # completed PDS write. surface specific errors so the frontend can retry 217 + # or message the user precisely. 218 + missing = [tid for tid in body.track_ids if tid not in requested_by_id] 219 + if missing: 220 + raise HTTPException(status_code=400, detail=f"track(s) not found: {missing}") 221 + 222 + wrong_album = [ 223 + tid for tid in body.track_ids if requested_by_id[tid].album_id != album_id 224 + ] 225 + if wrong_album: 226 + raise HTTPException( 227 + status_code=400, 228 + detail=f"track(s) do not belong to this album: {wrong_album}", 229 + ) 230 + 231 + missing_pds = [ 232 + tid 233 + for tid in body.track_ids 234 + if not requested_by_id[tid].atproto_record_uri 235 + or not requested_by_id[tid].atproto_record_cid 236 + ] 237 + if missing_pds: 238 + raise HTTPException( 239 + status_code=400, 240 + detail=( 241 + f"track(s) missing PDS record (upload may still be in flight): " 242 + f"{missing_pds}" 243 + ), 244 + ) 245 + 246 + # fetch ALL PDS-ref'd tracks already on this album — these may include 247 + # tracks from prior upload sessions that the current request doesn't 248 + # mention and must be preserved in the list record. 249 + existing_result = await db.execute( 250 + select(Track).where( 251 + Track.album_id == album_id, 252 + Track.atproto_record_uri.isnot(None), 253 + Track.atproto_record_cid.isnot(None), 254 + ) 255 + ) 256 + all_album_tracks = {t.id: t for t in existing_result.scalars().all()} 257 + 258 + # partition: preserved (existing, not in this request) vs new (in this request). 259 + # a track id that appears in both sets is treated as "new" so a repeat finalize 260 + # with the same ids rewrites the order deterministically. 261 + requested_set = set(body.track_ids) 262 + preserved_tracks = [ 263 + t for tid, t in all_album_tracks.items() if tid not in requested_set 264 + ] 265 + 266 + # determine preserved order: if the album already has a list record, honor 267 + # its current item order (which captures any manual reorderings the owner 268 + # made from the album edit page). fall back to created_at for tracks not 269 + # in the existing list, or if the PDS fetch fails entirely. 270 + preserved_position_by_uri: dict[str, int] = {} 271 + if album.atproto_record_uri and preserved_tracks: 272 + try: 273 + artist_lookup = await db.execute( 274 + select(Artist).where(Artist.did == album.artist_did) 275 + ) 276 + artist_for_pds = artist_lookup.scalar_one() 277 + record_data, _ = await get_record_public_resilient( 278 + record_uri=album.atproto_record_uri, 279 + pds_url=artist_for_pds.pds_url, 280 + ) 281 + items = record_data.get("value", {}).get("items", []) 282 + for i, item in enumerate(items): 283 + uri = item.get("subject", {}).get("uri") 284 + if uri: 285 + preserved_position_by_uri[uri] = i 286 + except Exception as e: 287 + logger.debug( 288 + f"finalize_album: failed to fetch existing list for preserved " 289 + f"track order on {album_id}: {e}" 290 + ) 291 + 292 + def _preserved_sort_key(t: Track) -> tuple[int, datetime]: 293 + # tracks already in the existing list: keep their position 294 + # tracks not in the existing list (or if fetch failed): sort by created_at 295 + # after all positioned items 296 + pos = preserved_position_by_uri.get(t.atproto_record_uri or "", 10_000_000) 297 + return (pos, t.created_at) 298 + 299 + preserved_tracks.sort(key=_preserved_sort_key) 300 + 301 + # build the final list: preserved (existing, at front) + new (in requested order) 302 + final_order: list[Track] = list(preserved_tracks) + [ 303 + requested_by_id[tid] for tid in body.track_ids 304 + ] 305 + 306 + # strongRefs in final order (the validation above guarantees these are 307 + # non-None for the requested tracks; preserved tracks were filtered at 308 + # fetch time, but narrow for the type checker) 309 + track_refs: list[dict[str, str]] = [] 310 + for t in final_order: 311 + assert t.atproto_record_uri is not None 312 + assert t.atproto_record_cid is not None 313 + track_refs.append({"uri": t.atproto_record_uri, "cid": t.atproto_record_cid}) 314 + 315 + try: 316 + result = await upsert_album_list_record( 317 + auth_session, 318 + album_id=album_id, 319 + album_title=album.title, 320 + track_refs=track_refs, 321 + existing_uri=album.atproto_record_uri, 322 + existing_created_at=album.created_at, 323 + ) 324 + except Exception as e: 325 + logger.warning(f"failed to write album list record for {album_id}: {e}") 326 + raise HTTPException( 327 + status_code=500, detail=f"failed to write album list record: {e}" 328 + ) from e 329 + 330 + if result: 331 + album.atproto_record_uri = result[0] 332 + album.atproto_record_cid = result[1] 333 + 334 + # emit album_release CollectionEvent on the first successful finalize only. 335 + # deferred from create_album so a total upload failure doesn't publish a 336 + # fake release event. deduped by checking for any existing event. 337 + existing_event = await db.execute( 338 + select(CollectionEvent).where( 339 + CollectionEvent.album_id == album_id, 340 + CollectionEvent.event_type == "album_release", 341 + ) 342 + ) 343 + if not existing_event.scalar_one_or_none(): 344 + db.add( 345 + CollectionEvent( 346 + event_type="album_release", 347 + actor_did=auth_session.did, 348 + album_id=album_id, 349 + ) 350 + ) 351 + 352 + await db.commit() 353 + 354 + await invalidate_album_cache(auth_session.handle, album.slug) 355 + 356 + artist_result = await db.execute( 357 + select(Artist).where(Artist.did == album.artist_did) 358 + ) 359 + artist = artist_result.scalar_one() 360 + track_count, total_plays = await _album_stats(db, album_id) 361 + return await _album_metadata(album, artist, track_count, total_plays) 362 + 363 + 364 + @router.patch("/{album_id}") 365 + async def update_album( 366 + album_id: str, 367 + db: Annotated[AsyncSession, Depends(get_db)], 368 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 369 + title: Annotated[str | None, Query(description="new album title")] = None, 370 + description: Annotated[ 371 + str | None, Query(description="new album description") 372 + ] = None, 373 + ) -> AlbumMetadata: 374 + """update album metadata (title, description). syncs ATProto records on title change.""" 375 + result = await db.execute( 376 + select(Album) 377 + .where(Album.id == album_id) 378 + .options(selectinload(Album.tracks).selectinload(Track.artist)) 379 + ) 380 + album = result.scalar_one_or_none() 381 + if not album: 382 + raise HTTPException(status_code=404, detail="album not found") 383 + if album.artist_did != auth_session.did: 384 + raise HTTPException( 385 + status_code=403, detail="you can only update your own albums" 386 + ) 387 + 388 + old_title = album.title 389 + old_slug = album.slug 390 + title_changed = title is not None and title.strip() != old_title 391 + 392 + if title is not None: 393 + album.title = title.strip() 394 + # sync slug when title changes so get_or_create_album lookups work 395 + if title_changed: 396 + album.slug = slugify(title.strip()) 397 + if description is not None: 398 + album.description = description.strip() if description.strip() else None 399 + 400 + # if title changed, update all tracks' extra["album"] and ATProto records 401 + if title_changed and title is not None: 402 + new_title = title.strip() 403 + 404 + for track in album.tracks: 405 + # update the track's extra["album"] field 406 + if track.extra is None: 407 + track.extra = {} 408 + track.extra = {**track.extra, "album": new_title} 409 + 410 + # update ATProto record if track has one 411 + if track.atproto_record_uri and track.r2_url and track.file_type: 412 + updated_record = build_track_record( 413 + title=track.title, 414 + artist=track.artist.display_name, 415 + audio_url=track.r2_url, 416 + file_type=track.file_type, 417 + album=new_title, 418 + duration=track.duration, 419 + features=track.features if track.features else None, 420 + image_url=await track.get_image_url(), 421 + ) 422 + 423 + _, new_cid = await update_record( 424 + auth_session=auth_session, 425 + record_uri=track.atproto_record_uri, 426 + record=updated_record, 427 + ) 428 + track.atproto_record_cid = new_cid 429 + 430 + # update the album's ATProto list record name 431 + if album.atproto_record_uri: 432 + track_refs = [ 433 + {"uri": t.atproto_record_uri, "cid": t.atproto_record_cid} 434 + for t in album.tracks 435 + if t.atproto_record_uri and t.atproto_record_cid 436 + ] 437 + _, new_list_cid = await update_list_record( 438 + auth_session=auth_session, 439 + list_uri=album.atproto_record_uri, 440 + items=track_refs, 441 + name=new_title, 442 + list_type="album", 443 + created_at=album.created_at, 444 + ) 445 + album.atproto_record_cid = new_list_cid 446 + 447 + await db.commit() 448 + 449 + # invalidate cache for old slug (new slug will be a cache miss) 450 + await invalidate_album_cache(auth_session.handle, old_slug) 451 + if album.slug != old_slug: 452 + await invalidate_album_cache(auth_session.handle, album.slug) 453 + 454 + # fetch artist for response 455 + artist_result = await db.execute( 456 + select(Artist).where(Artist.did == album.artist_did) 457 + ) 458 + artist = artist_result.scalar_one() 459 + track_count, total_plays = await _album_stats(db, album_id) 460 + 461 + return await _album_metadata(album, artist, track_count, total_plays) 462 + 463 + 464 + @router.delete("/{album_id}/tracks/{track_id}") 465 + async def remove_track_from_album( 466 + album_id: str, 467 + track_id: int, 468 + db: Annotated[AsyncSession, Depends(get_db)], 469 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 470 + ) -> RemoveTrackFromAlbumResponse: 471 + """remove a track from an album (orphan it, don't delete). 472 + 473 + the track remains available as a standalone track. 474 + """ 475 + # verify album exists and belongs to the authenticated artist 476 + album_result = await db.execute(select(Album).where(Album.id == album_id)) 477 + album = album_result.scalar_one_or_none() 478 + if not album: 479 + raise HTTPException(status_code=404, detail="album not found") 480 + if album.artist_did != auth_session.did: 481 + raise HTTPException( 482 + status_code=403, detail="you can only modify your own albums" 483 + ) 484 + 485 + # verify track exists and is in this album 486 + track_result = await db.execute(select(Track).where(Track.id == track_id)) 487 + track = track_result.scalar_one_or_none() 488 + if not track: 489 + raise HTTPException(status_code=404, detail="track not found") 490 + if track.album_id != album_id: 491 + raise HTTPException(status_code=400, detail="track is not in this album") 492 + 493 + # orphan the track 494 + track.album_id = None 495 + await db.commit() 496 + 497 + await invalidate_album_cache(auth_session.handle, album.slug) 498 + 499 + return RemoveTrackFromAlbumResponse(track_id=track_id) 500 + 501 + 502 + @router.delete("/{album_id}") 503 + async def delete_album( 504 + album_id: str, 505 + db: Annotated[AsyncSession, Depends(get_db)], 506 + auth_session: Annotated[AuthSession, Depends(require_artist_profile)], 507 + cascade: Annotated[ 508 + bool, 509 + Query(description="if true, also delete all tracks in the album"), 510 + ] = False, 511 + ) -> DeleteAlbumResponse: 512 + """delete album. tracks are orphaned unless cascade=true. removes ATProto list record.""" 513 + # verify album exists and belongs to the authenticated artist 514 + result = await db.execute(select(Album).where(Album.id == album_id)) 515 + album = result.scalar_one_or_none() 516 + if not album: 517 + raise HTTPException(status_code=404, detail="album not found") 518 + if album.artist_did != auth_session.did: 519 + raise HTTPException( 520 + status_code=403, detail="you can only delete your own albums" 521 + ) 522 + 523 + # handle tracks 524 + if cascade: 525 + # delete all tracks in album 526 + from backend.api.tracks.mutations import delete_track 527 + 528 + tracks_result = await db.execute( 529 + select(Track).where(Track.album_id == album_id) 530 + ) 531 + tracks = tracks_result.scalars().all() 532 + for track in tracks: 533 + try: 534 + await delete_track(track.id, db, auth_session) 535 + except Exception as e: 536 + logger.warning(f"failed to delete track {track.id}: {e}") 537 + else: 538 + # orphan tracks - set album_id to null 539 + await db.execute( 540 + update(Track).where(Track.album_id == album_id).values(album_id=None) 541 + ) 542 + 543 + # delete ATProto record if exists 544 + if album.atproto_record_uri: 545 + try: 546 + await delete_record_by_uri(auth_session, album.atproto_record_uri) 547 + except Exception as e: 548 + logger.warning(f"failed to delete ATProto record: {e}") 549 + # continue with database cleanup even if ATProto delete fails 550 + 551 + # delete cover image from storage if exists 552 + if album.image_id: 553 + with contextlib.suppress(Exception): 554 + if album.image_url: 555 + await storage.delete_image(album.image_id, album.image_url) 556 + else: 557 + await storage.delete(album.image_id) 558 + 559 + # capture slug before deletion 560 + album_slug = album.slug 561 + 562 + # delete album from database 563 + await db.delete(album) 564 + await db.commit() 565 + 566 + # invalidate cache after commit so concurrent reads can't re-populate from pre-delete state 567 + await invalidate_album_cache(auth_session.handle, album_slug) 568 + 569 + return DeleteAlbumResponse(cascade=cascade)
+7
backend/src/backend/api/albums/router.py
··· 1 + """shared FastAPI router for album endpoints.""" 2 + 3 + from fastapi import APIRouter 4 + 5 + router = APIRouter(prefix="/albums", tags=["albums"]) 6 + 7 + __all__ = ["router"]
+85
backend/src/backend/api/albums/schemas.py
··· 1 + """pydantic models for album endpoints.""" 2 + 3 + from pydantic import BaseModel 4 + 5 + 6 + class AlbumMetadata(BaseModel): 7 + """album metadata response.""" 8 + 9 + id: str 10 + title: str 11 + slug: str 12 + description: str | None = None 13 + artist: str 14 + artist_handle: str 15 + artist_did: str 16 + track_count: int 17 + total_plays: int 18 + image_url: str | None 19 + list_uri: str | None = None # ATProto list record URI for reordering 20 + 21 + 22 + class AlbumResponse(BaseModel): 23 + """album detail response with tracks.""" 24 + 25 + metadata: AlbumMetadata 26 + tracks: list[dict] 27 + 28 + 29 + class AlbumListItem(BaseModel): 30 + """minimal album info for listing.""" 31 + 32 + id: str 33 + title: str 34 + slug: str 35 + artist: str 36 + artist_handle: str 37 + track_count: int 38 + 39 + 40 + class RemoveTrackFromAlbumResponse(BaseModel): 41 + """response for removing a track from an album.""" 42 + 43 + removed: bool = True 44 + track_id: int 45 + 46 + 47 + class DeleteAlbumResponse(BaseModel): 48 + """response for deleting an album.""" 49 + 50 + deleted: bool = True 51 + cascade: bool 52 + 53 + 54 + class ArtistAlbumListItem(BaseModel): 55 + """album info for a specific artist (used on artist pages).""" 56 + 57 + id: str 58 + title: str 59 + slug: str 60 + track_count: int 61 + total_plays: int 62 + image_url: str | None 63 + 64 + 65 + class AlbumCreatePayload(BaseModel): 66 + title: str 67 + slug: str | None = None 68 + description: str | None = None 69 + 70 + 71 + class AlbumUpdatePayload(BaseModel): 72 + title: str | None = None 73 + slug: str | None = None 74 + description: str | None = None 75 + 76 + 77 + class AlbumFinalizePayload(BaseModel): 78 + """request body for POST /albums/{id}/finalize. 79 + 80 + track_ids is the authoritative user-intended order for the album's 81 + ATProto list record. every id must belong to this album and have a 82 + completed PDS write (atproto_record_uri + cid set). 83 + """ 84 + 85 + track_ids: list[int]
+20 -234
backend/src/backend/api/lists.py backend/src/backend/api/lists/playlists.py
··· 1 - """lists api endpoints for ATProto list records.""" 1 + """playlist CRUD endpoints.""" 2 2 3 3 import contextlib 4 4 import json 5 5 import logging 6 - from typing import Annotated, Literal 6 + from typing import Annotated 7 7 8 - from fastapi import ( 9 - APIRouter, 10 - Depends, 11 - File, 12 - Form, 13 - HTTPException, 14 - Query, 15 - UploadFile, 16 - ) 17 - from pydantic import BaseModel 8 + from fastapi import Depends, File, Form, HTTPException, Query, UploadFile 18 9 from sqlalchemy import select 19 10 from sqlalchemy.ext.asyncio import AsyncSession 11 + from sqlalchemy.orm import selectinload 20 12 21 13 from backend._internal import Session as AuthSession 22 - from backend._internal import get_optional_session, require_auth 14 + from backend._internal import get_oauth_client, get_optional_session, require_auth 23 15 from backend._internal.atproto.client import parse_at_uri 24 16 from backend._internal.atproto.records import ( 25 17 RecordNotFound, 26 18 _reconstruct_oauth_session, 27 19 create_list_record, 20 + delete_record_by_uri, 28 21 get_record_public_resilient, 29 22 update_list_record, 30 23 ) 31 24 from backend._internal.image_uploads import COVER_EXTENSIONS, process_image_upload 32 25 from backend._internal.recommendations import get_playlist_recommendations 33 - from backend.api.albums import invalidate_album_cache 34 26 from backend.config import settings 35 27 from backend.models import ( 36 - Album, 37 28 Artist, 29 + CollectionEvent, 38 30 Playlist, 39 31 Track, 40 32 TrackLike, 41 - UserPreferences, 42 33 get_db, 43 34 ) 44 35 from backend.schemas import DeletedResponse, TrackResponse ··· 46 37 from backend.utilities.aggregations import get_comment_counts, get_like_counts 47 38 from backend.utilities.redis import get_async_redis_client 48 39 49 - logger = logging.getLogger(__name__) 50 - 51 - 52 - # --- playlist schemas --- 53 - 54 - 55 - class CreatePlaylistRequest(BaseModel): 56 - """request body for creating a playlist.""" 57 - 58 - name: str 59 - """display name for the playlist.""" 60 - 61 - 62 - class PlaylistResponse(BaseModel): 63 - """playlist metadata response.""" 64 - 65 - id: str 66 - name: str 67 - owner_did: str 68 - owner_handle: str 69 - track_count: int 70 - image_url: str | None 71 - show_on_profile: bool 72 - atproto_record_uri: str 73 - created_at: str 74 - 75 - 76 - class PlaylistWithTracksResponse(PlaylistResponse): 77 - """playlist with full track details.""" 78 - 79 - tracks: list[TrackResponse] 80 - """ordered list of track details.""" 81 - 82 - 83 - class AddTrackRequest(BaseModel): 84 - """request body for adding a track to a playlist.""" 85 - 86 - track_uri: str 87 - """ATProto URI of the track to add.""" 88 - track_cid: str 89 - """CID of the track to add.""" 90 - 91 - 92 - router = APIRouter(prefix="/lists", tags=["lists"]) 93 - 94 - 95 - class ReorderRequest(BaseModel): 96 - """request body for reordering list items.""" 97 - 98 - items: list[dict[str, str]] 99 - """ordered array of strongRefs (uri + cid). array order = display order.""" 100 - 101 - 102 - class ReorderResponse(BaseModel): 103 - """response from reorder operation.""" 104 - 105 - uri: str 106 - cid: str 107 - 108 - 109 - class RecommendedTrack(BaseModel): 110 - """a recommended track for a playlist.""" 111 - 112 - id: int 113 - title: str 114 - artist_handle: str 115 - artist_display_name: str 116 - image_url: str | None 117 - 118 - 119 - class PlaylistRecommendationsResponse(BaseModel): 120 - """response for playlist recommendations.""" 121 - 122 - tracks: list[RecommendedTrack] 123 - available: bool 124 - 125 - 126 - @router.put("/liked/reorder") 127 - async def reorder_liked_list( 128 - body: ReorderRequest, 129 - session: AuthSession = Depends(require_auth), 130 - db: AsyncSession = Depends(get_db), 131 - ) -> ReorderResponse: 132 - """reorder items in the user's liked tracks list. 133 - 134 - the items array order becomes the new display order. 135 - only the list owner can reorder their own list. 136 - """ 137 - # get the user's liked list URI from preferences 138 - prefs_result = await db.execute( 139 - select(UserPreferences).where(UserPreferences.did == session.did) 140 - ) 141 - prefs = prefs_result.scalar_one_or_none() 142 - 143 - if not prefs or not prefs.liked_list_uri: 144 - raise HTTPException( 145 - status_code=404, 146 - detail="liked list not found - try liking a track first", 147 - ) 148 - 149 - # update the list record with new item order 150 - # (update_list_record → make_pds_request handles token refresh internally) 151 - try: 152 - uri, cid = await update_list_record( 153 - auth_session=session, 154 - list_uri=prefs.liked_list_uri, 155 - items=body.items, 156 - list_type="liked", 157 - ) 158 - return ReorderResponse(uri=uri, cid=cid) 159 - except Exception as e: 160 - raise HTTPException( 161 - status_code=500, detail=f"failed to reorder list: {e}" 162 - ) from e 163 - 164 - 165 - @router.put("/{rkey}/reorder") 166 - async def reorder_list( 167 - rkey: str, 168 - body: ReorderRequest, 169 - session: AuthSession = Depends(require_auth), 170 - db: AsyncSession = Depends(get_db), 171 - ) -> ReorderResponse: 172 - """reorder items in a list by rkey. items array order = new display order.""" 173 - from backend.config import settings 174 - 175 - # construct the full AT URI 176 - list_uri = f"at://{session.did}/{settings.atproto.list_collection}/{rkey}" 177 - 178 - # update the list record with new item order 179 - # (update_list_record → make_pds_request handles token refresh internally) 180 - try: 181 - uri, cid = await update_list_record( 182 - auth_session=session, 183 - list_uri=list_uri, 184 - items=body.items, 185 - ) 186 - 187 - # invalidate album cache if this list belongs to an album 188 - result = await db.execute( 189 - select(Album).where(Album.atproto_record_uri == list_uri) 190 - ) 191 - if album := result.scalar_one_or_none(): 192 - await invalidate_album_cache(session.handle, album.slug) 193 - 194 - return ReorderResponse(uri=uri, cid=cid) 195 - except Exception as e: 196 - raise HTTPException( 197 - status_code=500, detail=f"failed to reorder list: {e}" 198 - ) from e 199 - 200 - 201 - # --- generic list resolver --- 40 + from .router import router 41 + from .schemas import ( 42 + AddTrackRequest, 43 + CreatePlaylistRequest, 44 + PlaylistRecommendationsResponse, 45 + PlaylistResponse, 46 + PlaylistWithTracksResponse, 47 + RecommendedTrack, 48 + ) 202 49 203 - 204 - class ListByUriResponse(BaseModel): 205 - """resolved list type and routing info for an AT-URI.""" 206 - 207 - type: Literal["album", "playlist"] 208 - id: str 209 - handle: str | None = None 210 - slug: str | None = None 211 - 212 - 213 - @router.get("/by-uri", response_model=ListByUriResponse) 214 - async def resolve_list_by_uri( 215 - uri: Annotated[str, Query(description="AT-URI of a list record")], 216 - db: AsyncSession = Depends(get_db), 217 - ) -> ListByUriResponse: 218 - """resolve a list AT-URI to its type (album or playlist) with routing info.""" 219 - # check albums first 220 - result = await db.execute( 221 - select(Album, Artist) 222 - .join(Artist, Album.artist_did == Artist.did) 223 - .where(Album.atproto_record_uri == uri) 224 - ) 225 - if row := result.first(): 226 - album, artist = row 227 - return ListByUriResponse( 228 - type="album", id=album.id, handle=artist.handle, slug=album.slug 229 - ) 230 - 231 - # check playlists 232 - result = await db.execute( 233 - select(Playlist).where(Playlist.atproto_record_uri == uri) 234 - ) 235 - if playlist := result.scalar_one_or_none(): 236 - return ListByUriResponse(type="playlist", id=playlist.id) 237 - 238 - raise HTTPException(status_code=404, detail="list not found") 50 + logger = logging.getLogger(__name__) 239 51 240 52 241 53 # --- playlist CRUD endpoints --- ··· 282 94 await db.flush() 283 95 284 96 # emit collection event 285 - from backend.models import CollectionEvent 286 - 287 97 db.add( 288 98 CollectionEvent( 289 99 event_type="playlist_create", ··· 390 200 391 201 # fetch ATProto record (public - no auth needed) 392 202 try: 393 - record_data, resolved_pds_url = await get_record_public_resilient( 203 + record_data, _ = await get_record_public_resilient( 394 204 record_uri=playlist.atproto_record_uri, 395 205 pds_url=artist.pds_url, 396 206 ) 397 - if resolved_pds_url: 398 - artist.pds_url = resolved_pds_url 399 - db.add(artist) 400 - await db.commit() 401 207 except RecordNotFound: 402 208 raise HTTPException( 403 209 status_code=404, detail="playlist record not found on PDS" ··· 416 222 # hydrate track metadata from database 417 223 tracks: list[TrackResponse] = [] 418 224 if track_uris: 419 - from sqlalchemy.orm import selectinload 420 - 421 225 track_result = await db.execute( 422 226 select(Track) 423 227 .options(selectinload(Track.artist), selectinload(Track.album_rel)) ··· 522 326 523 327 # fetch ATProto record (public - no auth needed) 524 328 try: 525 - record_data, resolved_pds_url = await get_record_public_resilient( 329 + record_data, _ = await get_record_public_resilient( 526 330 record_uri=playlist.atproto_record_uri, 527 331 pds_url=artist.pds_url, 528 332 ) 529 - if resolved_pds_url: 530 - artist.pds_url = resolved_pds_url 531 - db.add(artist) 532 - await db.commit() 533 333 except RecordNotFound: 534 334 raise HTTPException( 535 335 status_code=404, detail="playlist record not found on PDS" ··· 548 348 # hydrate track metadata from database 549 349 tracks: list[TrackResponse] = [] 550 350 if track_uris: 551 - from sqlalchemy.orm import selectinload 552 - 553 351 track_result = await db.execute( 554 352 select(Track) 555 353 .options(selectinload(Track.artist), selectinload(Track.album_rel)) ··· 636 434 raise HTTPException(status_code=401, detail="invalid session") 637 435 638 436 oauth_session = _reconstruct_oauth_session(oauth_data) 639 - from backend._internal import get_oauth_client 640 437 641 438 repo, collection, rkey = parse_at_uri(playlist.atproto_record_uri) 642 439 ··· 687 484 playlist.track_count = len(new_items) 688 485 689 486 # emit collection event — resolve track_id from URI 690 - from backend.models import CollectionEvent 691 - 692 487 track_result = await db.execute( 693 488 select(Track.id).where(Track.atproto_record_uri == body.track_uri) 694 489 ) ··· 748 543 raise HTTPException(status_code=401, detail="invalid session") 749 544 750 545 oauth_session = _reconstruct_oauth_session(oauth_data) 751 - from backend._internal import get_oauth_client 752 546 753 547 repo, collection, rkey = parse_at_uri(playlist.atproto_record_uri) 754 548 ··· 821 615 822 616 deletes both the ATProto list record and the database cache. 823 617 """ 824 - from backend._internal.atproto.records import delete_record_by_uri 825 - 826 618 # get playlist and verify ownership 827 619 result = await db.execute(select(Playlist).where(Playlist.id == playlist_id)) 828 620 playlist = result.scalar_one_or_none() ··· 952 744 # fetch current list record to preserve items 953 745 oauth_data = session.oauth_session 954 746 if oauth_data and "access_token" in oauth_data: 955 - from backend._internal import get_oauth_client 956 - 957 747 oauth_session = _reconstruct_oauth_session(oauth_data) 958 748 959 749 repo, collection, rkey = parse_at_uri(playlist.atproto_record_uri) ··· 1060 850 1061 851 # get track IDs from the playlist's ATProto record 1062 852 try: 1063 - record_data, resolved_pds_url = await get_record_public_resilient( 853 + record_data, _ = await get_record_public_resilient( 1064 854 record_uri=playlist.atproto_record_uri, 1065 855 pds_url=artist.pds_url, 1066 856 ) 1067 - if resolved_pds_url: 1068 - artist.pds_url = resolved_pds_url 1069 - db.add(artist) 1070 - await db.commit() 1071 857 except Exception as e: 1072 858 logger.warning("failed to fetch playlist record for recommendations: %s", e) 1073 859 return unavailable
+11
backend/src/backend/api/lists/__init__.py
··· 1 + """Lists API package that exposes the FastAPI router.""" 2 + 3 + from .router import router 4 + 5 + # Import route modules to register handlers on the shared router. 6 + # Static paths first, then wildcard paths. 7 + from . import reorder as _reorder # /liked/reorder, /{rkey}/reorder 8 + from . import resolver as _resolver # /by-uri 9 + from . import playlists as _playlists # /playlists, /playlists/{id}, ... 10 + 11 + __all__ = ["router"]
+88
backend/src/backend/api/lists/reorder.py
··· 1 + """reorder endpoints for list items.""" 2 + 3 + from fastapi import Depends, HTTPException 4 + from sqlalchemy import select 5 + from sqlalchemy.ext.asyncio import AsyncSession 6 + 7 + from backend._internal import Session as AuthSession 8 + from backend._internal import require_auth 9 + from backend._internal.atproto.records import update_list_record 10 + from backend.api.albums import invalidate_album_cache 11 + from backend.config import settings 12 + from backend.models import Album, UserPreferences, get_db 13 + 14 + from .router import router 15 + from .schemas import ReorderRequest, ReorderResponse 16 + 17 + 18 + @router.put("/liked/reorder") 19 + async def reorder_liked_list( 20 + body: ReorderRequest, 21 + session: AuthSession = Depends(require_auth), 22 + db: AsyncSession = Depends(get_db), 23 + ) -> ReorderResponse: 24 + """reorder items in the user's liked tracks list. 25 + 26 + the items array order becomes the new display order. 27 + only the list owner can reorder their own list. 28 + """ 29 + # get the user's liked list URI from preferences 30 + prefs_result = await db.execute( 31 + select(UserPreferences).where(UserPreferences.did == session.did) 32 + ) 33 + prefs = prefs_result.scalar_one_or_none() 34 + 35 + if not prefs or not prefs.liked_list_uri: 36 + raise HTTPException( 37 + status_code=404, 38 + detail="liked list not found - try liking a track first", 39 + ) 40 + 41 + # update the list record with new item order 42 + # (update_list_record → make_pds_request handles token refresh internally) 43 + try: 44 + uri, cid = await update_list_record( 45 + auth_session=session, 46 + list_uri=prefs.liked_list_uri, 47 + items=body.items, 48 + list_type="liked", 49 + ) 50 + return ReorderResponse(uri=uri, cid=cid) 51 + except Exception as e: 52 + raise HTTPException( 53 + status_code=500, detail=f"failed to reorder list: {e}" 54 + ) from e 55 + 56 + 57 + @router.put("/{rkey}/reorder") 58 + async def reorder_list( 59 + rkey: str, 60 + body: ReorderRequest, 61 + session: AuthSession = Depends(require_auth), 62 + db: AsyncSession = Depends(get_db), 63 + ) -> ReorderResponse: 64 + """reorder items in a list by rkey. items array order = new display order.""" 65 + # construct the full AT URI 66 + list_uri = f"at://{session.did}/{settings.atproto.list_collection}/{rkey}" 67 + 68 + # update the list record with new item order 69 + # (update_list_record → make_pds_request handles token refresh internally) 70 + try: 71 + uri, cid = await update_list_record( 72 + auth_session=session, 73 + list_uri=list_uri, 74 + items=body.items, 75 + ) 76 + 77 + # invalidate album cache if this list belongs to an album 78 + result = await db.execute( 79 + select(Album).where(Album.atproto_record_uri == list_uri) 80 + ) 81 + if album := result.scalar_one_or_none(): 82 + await invalidate_album_cache(session.handle, album.slug) 83 + 84 + return ReorderResponse(uri=uri, cid=cid) 85 + except Exception as e: 86 + raise HTTPException( 87 + status_code=500, detail=f"failed to reorder list: {e}" 88 + ) from e
+51
backend/src/backend/api/lists/resolver.py
··· 1 + """generic list resolver endpoint.""" 2 + 3 + from typing import Annotated, Literal 4 + 5 + from fastapi import Depends, HTTPException, Query 6 + from pydantic import BaseModel 7 + from sqlalchemy import select 8 + from sqlalchemy.ext.asyncio import AsyncSession 9 + 10 + from backend.models import Album, Artist, Playlist, get_db 11 + 12 + from .router import router 13 + 14 + # --- generic list resolver --- 15 + 16 + 17 + class ListByUriResponse(BaseModel): 18 + """resolved list type and routing info for an AT-URI.""" 19 + 20 + type: Literal["album", "playlist"] 21 + id: str 22 + handle: str | None = None 23 + slug: str | None = None 24 + 25 + 26 + @router.get("/by-uri", response_model=ListByUriResponse) 27 + async def resolve_list_by_uri( 28 + uri: Annotated[str, Query(description="AT-URI of a list record")], 29 + db: AsyncSession = Depends(get_db), 30 + ) -> ListByUriResponse: 31 + """resolve a list AT-URI to its type (album or playlist) with routing info.""" 32 + # check albums first 33 + result = await db.execute( 34 + select(Album, Artist) 35 + .join(Artist, Album.artist_did == Artist.did) 36 + .where(Album.atproto_record_uri == uri) 37 + ) 38 + if row := result.first(): 39 + album, artist = row 40 + return ListByUriResponse( 41 + type="album", id=album.id, handle=artist.handle, slug=album.slug 42 + ) 43 + 44 + # check playlists 45 + result = await db.execute( 46 + select(Playlist).where(Playlist.atproto_record_uri == uri) 47 + ) 48 + if playlist := result.scalar_one_or_none(): 49 + return ListByUriResponse(type="playlist", id=playlist.id) 50 + 51 + raise HTTPException(status_code=404, detail="list not found")
+7
backend/src/backend/api/lists/router.py
··· 1 + """shared FastAPI router for list endpoints.""" 2 + 3 + from fastapi import APIRouter 4 + 5 + router = APIRouter(prefix="/lists", tags=["lists"]) 6 + 7 + __all__ = ["router"]
+75
backend/src/backend/api/lists/schemas.py
··· 1 + """pydantic schemas for list endpoints.""" 2 + 3 + from pydantic import BaseModel 4 + 5 + from backend.schemas import TrackResponse 6 + 7 + # --- playlist schemas --- 8 + 9 + 10 + class CreatePlaylistRequest(BaseModel): 11 + """request body for creating a playlist.""" 12 + 13 + name: str 14 + """display name for the playlist.""" 15 + 16 + 17 + class PlaylistResponse(BaseModel): 18 + """playlist metadata response.""" 19 + 20 + id: str 21 + name: str 22 + owner_did: str 23 + owner_handle: str 24 + track_count: int 25 + image_url: str | None 26 + show_on_profile: bool 27 + atproto_record_uri: str 28 + created_at: str 29 + 30 + 31 + class PlaylistWithTracksResponse(PlaylistResponse): 32 + """playlist with full track details.""" 33 + 34 + tracks: list[TrackResponse] 35 + """ordered list of track details.""" 36 + 37 + 38 + class AddTrackRequest(BaseModel): 39 + """request body for adding a track to a playlist.""" 40 + 41 + track_uri: str 42 + """ATProto URI of the track to add.""" 43 + track_cid: str 44 + """CID of the track to add.""" 45 + 46 + 47 + class ReorderRequest(BaseModel): 48 + """request body for reordering list items.""" 49 + 50 + items: list[dict[str, str]] 51 + """ordered array of strongRefs (uri + cid). array order = display order.""" 52 + 53 + 54 + class ReorderResponse(BaseModel): 55 + """response from reorder operation.""" 56 + 57 + uri: str 58 + cid: str 59 + 60 + 61 + class RecommendedTrack(BaseModel): 62 + """a recommended track for a playlist.""" 63 + 64 + id: int 65 + title: str 66 + artist_handle: str 67 + artist_display_name: str 68 + image_url: str | None 69 + 70 + 71 + class PlaylistRecommendationsResponse(BaseModel): 72 + """response for playlist recommendations.""" 73 + 74 + tracks: list[RecommendedTrack] 75 + available: bool
+1 -1
backend/tests/_internal/test_album_cache.py
··· 130 130 broken_redis.delete = AsyncMock(side_effect=ConnectionError("redis down")) 131 131 132 132 with patch( 133 - "backend.api.albums.get_async_redis_client", 133 + "backend.api.albums.cache.get_async_redis_client", 134 134 return_value=broken_redis, 135 135 ): 136 136 # should not raise
+7 -7
backend/tests/api/test_albums.py
··· 535 535 } 536 536 537 537 with patch( 538 - "backend._internal.atproto.records.get_record_public_resilient", 538 + "backend.api.albums.listing.get_record_public_resilient", 539 539 new_callable=AsyncMock, 540 540 return_value=(mock_record, None), 541 541 ): ··· 674 674 # mock ATProto update_record for tracks and list 675 675 with ( 676 676 patch( 677 - "backend._internal.atproto.records.fm_plyr.track.update_record", 677 + "backend.api.albums.mutations.update_record", 678 678 new_callable=AsyncMock, 679 679 return_value=("at://did:test:user123/fm.plyr.track/track123", "new_cid"), 680 680 ) as mock_track_update, 681 681 patch( 682 - "backend._internal.atproto.records.fm_plyr.list.update_list_record", 682 + "backend.api.albums.mutations.update_list_record", 683 683 new_callable=AsyncMock, 684 684 return_value=( 685 685 "at://did:test:user123/fm.plyr.dev.list/album123", ··· 1074 1074 ) 1075 1075 1076 1076 with patch( 1077 - "backend._internal.atproto.records.fm_plyr.list.upsert_album_list_record", 1077 + "backend.api.albums.mutations.upsert_album_list_record", 1078 1078 side_effect=fake_upsert, 1079 1079 ): 1080 1080 async with AsyncClient( ··· 1273 1273 return (f"at://did:test:user123/fm.plyr.list/{album_id}", "cid-finalize") 1274 1274 1275 1275 with patch( 1276 - "backend._internal.atproto.records.fm_plyr.list.upsert_album_list_record", 1276 + "backend.api.albums.mutations.upsert_album_list_record", 1277 1277 side_effect=fake_upsert, 1278 1278 ): 1279 1279 async with AsyncClient( ··· 1472 1472 1473 1473 with ( 1474 1474 patch( 1475 - "backend._internal.atproto.records.get_record_public_resilient", 1475 + "backend.api.albums.mutations.get_record_public_resilient", 1476 1476 new_callable=AsyncMock, 1477 1477 return_value=(existing_list_record, None), 1478 1478 ), 1479 1479 patch( 1480 - "backend._internal.atproto.records.fm_plyr.list.upsert_album_list_record", 1480 + "backend.api.albums.mutations.upsert_album_list_record", 1481 1481 side_effect=fake_upsert, 1482 1482 ), 1483 1483 ):
+3 -3
backend/tests/api/test_playlist_by_uri.py
··· 87 87 """lookup existing playlist by AT-URI returns 200.""" 88 88 mock_record = {"value": {"items": []}} 89 89 with patch( 90 - "backend.api.lists.get_record_public_resilient", 90 + "backend.api.lists.playlists.get_record_public_resilient", 91 91 new_callable=AsyncMock, 92 92 return_value=(mock_record, None), 93 93 ): ··· 128 128 ) -> None: 129 129 """playlist exists in DB but PDS record is gone — returns 404 not 500.""" 130 130 with patch( 131 - "backend.api.lists.get_record_public_resilient", 131 + "backend.api.lists.playlists.get_record_public_resilient", 132 132 new_callable=AsyncMock, 133 133 side_effect=RecordNotFound("record not found"), 134 134 ): ··· 149 149 ) -> None: 150 150 """playlist exists in DB but PDS record is gone — returns 404 not 500.""" 151 151 with patch( 152 - "backend.api.lists.get_record_public_resilient", 152 + "backend.api.lists.playlists.get_record_public_resilient", 153 153 new_callable=AsyncMock, 154 154 side_effect=RecordNotFound("record not found"), 155 155 ):
+2 -2
backend/tests/api/test_playlist_liked_state.py
··· 168 168 test_app.dependency_overrides[get_optional_session] = _override_session 169 169 170 170 with patch( 171 - "backend.api.lists.get_record_public_resilient", 171 + "backend.api.lists.playlists.get_record_public_resilient", 172 172 new_callable=AsyncMock, 173 173 return_value=(mock_record_data, None), 174 174 ): ··· 231 231 test_app.dependency_overrides[get_optional_session] = _override_no_session 232 232 233 233 with patch( 234 - "backend.api.lists.get_record_public_resilient", 234 + "backend.api.lists.playlists.get_record_public_resilient", 235 235 new_callable=AsyncMock, 236 236 return_value=(mock_record_data, None), 237 237 ):
+65 -10
backend/tests/test_jetstream.py
··· 15 15 _write_tombstone, 16 16 ingest_comment_create, 17 17 ingest_comment_delete, 18 - ingest_handle_update, 18 + ingest_identity_update, 19 19 ingest_like_create, 20 20 ingest_like_delete, 21 21 ingest_list_create, ··· 1562 1562 assert result.scalar_one() is not None 1563 1563 1564 1564 1565 - # --- handle update tests --- 1565 + # --- identity update tests --- 1566 + 1567 + 1568 + def _mock_did_resolver(pds_url: str = "https://pds.example.com") -> AsyncMock: 1569 + """create a mock AsyncDidResolver that returns the given PDS URL.""" 1570 + mock_resolver = AsyncMock() 1571 + mock_data = MagicMock() 1572 + mock_data.pds = pds_url 1573 + mock_resolver.resolve_atproto_data = AsyncMock(return_value=mock_data) 1574 + return mock_resolver 1566 1575 1567 1576 1568 - class TestIngestHandleUpdate: 1577 + class TestIngestIdentityUpdate: 1569 1578 async def test_updates_artist_handle( 1570 1579 self, db_session: AsyncSession, artist: Artist 1571 1580 ) -> None: 1572 1581 new_handle = "updated.handle.example" 1573 - await ingest_handle_update(did=artist.did, handle=new_handle) 1582 + with patch( 1583 + "atproto_identity.did.resolver.AsyncDidResolver", 1584 + return_value=_mock_did_resolver(), 1585 + ): 1586 + await ingest_identity_update(did=artist.did, handle=new_handle) 1574 1587 1575 1588 await db_session.refresh(artist) 1576 1589 assert artist.handle == new_handle ··· 1588 1601 await db_session.commit() 1589 1602 1590 1603 new_handle = "updated.handle.example" 1591 - await ingest_handle_update(did=artist.did, handle=new_handle) 1604 + with patch( 1605 + "atproto_identity.did.resolver.AsyncDidResolver", 1606 + return_value=_mock_did_resolver(), 1607 + ): 1608 + await ingest_identity_update(did=artist.did, handle=new_handle) 1592 1609 1593 1610 await db_session.refresh(session) 1594 1611 assert session.handle == new_handle 1595 1612 1596 - async def test_noop_when_handle_unchanged( 1613 + async def test_updates_pds_url( 1614 + self, db_session: AsyncSession, artist: Artist 1615 + ) -> None: 1616 + """PDS migration updates the cached pds_url via DID resolution.""" 1617 + new_pds = "https://new-pds.example.com" 1618 + with patch( 1619 + "atproto_identity.did.resolver.AsyncDidResolver", 1620 + return_value=_mock_did_resolver(pds_url=new_pds), 1621 + ): 1622 + await ingest_identity_update(did=artist.did, handle=artist.handle) 1623 + 1624 + await db_session.refresh(artist) 1625 + assert artist.pds_url == new_pds 1626 + 1627 + async def test_noop_when_handle_and_pds_unchanged( 1597 1628 self, db_session: AsyncSession, artist: Artist 1598 1629 ) -> None: 1599 - """no commit when handle already matches — idempotent.""" 1630 + """no commit when nothing changed — idempotent.""" 1600 1631 original_handle = artist.handle 1601 - await ingest_handle_update(did=artist.did, handle=original_handle) 1632 + original_pds = artist.pds_url 1633 + with patch( 1634 + "atproto_identity.did.resolver.AsyncDidResolver", 1635 + return_value=_mock_did_resolver(pds_url=original_pds or ""), 1636 + ): 1637 + await ingest_identity_update(did=artist.did, handle=original_handle) 1602 1638 1603 1639 await db_session.refresh(artist) 1604 1640 assert artist.handle == original_handle 1641 + assert artist.pds_url == original_pds 1605 1642 1606 1643 async def test_noop_for_unknown_did(self, db_session: AsyncSession) -> None: 1607 - """unknown DID is silently skipped.""" 1608 - await ingest_handle_update(did="did:plc:nonexistent", handle="ghost.handle") 1644 + """unknown DID is silently skipped (no DID resolution attempted).""" 1645 + await ingest_identity_update(did="did:plc:nonexistent", handle="ghost.handle") 1646 + 1647 + async def test_did_resolution_failure_still_updates_handle( 1648 + self, db_session: AsyncSession, artist: Artist 1649 + ) -> None: 1650 + """if DID resolution fails, handle is still updated.""" 1651 + new_handle = "updated.handle.example" 1652 + mock_resolver = AsyncMock() 1653 + mock_resolver.resolve_atproto_data = AsyncMock( 1654 + side_effect=Exception("resolution failed") 1655 + ) 1656 + with patch( 1657 + "atproto_identity.did.resolver.AsyncDidResolver", 1658 + return_value=mock_resolver, 1659 + ): 1660 + await ingest_identity_update(did=artist.did, handle=new_handle) 1661 + 1662 + await db_session.refresh(artist) 1663 + assert artist.handle == new_handle
+10 -10
loq.toml
··· 27 27 max_lines = 896 28 28 29 29 [[rules]] 30 - path = "backend/src/backend/api/albums.py" 31 - max_lines = 995 32 - 33 - [[rules]] 34 30 path = "backend/src/backend/api/auth.py" 35 31 max_lines = 807 36 - 37 - [[rules]] 38 - path = "backend/src/backend/api/lists.py" 39 - max_lines = 1149 40 32 41 33 [[rules]] 42 34 path = "backend/src/backend/api/tracks/listing.py" ··· 220 212 221 213 [[rules]] 222 214 path = "backend/src/backend/_internal/tasks/ingest.py" 223 - max_lines = 743 215 + max_lines = 765 224 216 225 217 [[rules]] 226 218 path = "backend/tests/test_jetstream.py" 227 - max_lines = 1608 219 + max_lines = 1663 228 220 229 221 [[rules]] 230 222 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte" ··· 253 245 [[rules]] 254 246 path = "frontend/src/routes/record/+page.svelte" 255 247 max_lines = 701 248 + 249 + [[rules]] 250 + path = "backend/src/backend/api/albums/mutations.py" 251 + max_lines = 574 252 + 253 + [[rules]] 254 + path = "backend/src/backend/api/lists/playlists.py" 255 + max_lines = 952