audio streaming app plyr.fm

fix(upload): avoid session-state race in concurrent album creation (#1334)

12 concurrent uploads targeting the same album (artist_did, slug) raced
in `get_or_create_album`: the losers caught IntegrityError and called
`db.rollback()` on the caller's shared AsyncSession. under concurrent
load this left 2/12 uploads blowing up with MissingGreenlet on the very
next pool checkout, ~300ms after INSERT albums — observed on stg during
the 12-chromatic-drone smoke test (2026-04-24).

replace SELECT-then-INSERT-then-catch with a single
`INSERT ... ON CONFLICT DO NOTHING RETURNING`. the race resolves at the
DB level, no rollback on a shared session, no churn on pool state.

regression test fires 12 concurrent `get_or_create_album` calls on
separate sessions with the same title and asserts exactly 1 row, 1
`created=True`, and all callers agree on the resulting album id.

Co-authored-by: Claude Opus 4 (1M context) <noreply@anthropic.com>

authored by nate nowack and Claude Opus 4 (1M context), committed by GitHub
1a1d8bb9 9ed734c3

+120 -25
+25 -25
backend/src/backend/api/tracks/services.py
 """shared helpers for track routes."""

 from sqlalchemy import select
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.ext.asyncio import AsyncSession

 from backend.models import Album, Artist
···
     """Fetch or create an album for an artist.

     Returns (album, created) where created is True if a new album was made.
+
+    Uses INSERT ... ON CONFLICT DO NOTHING RETURNING so a race between
+    concurrent uploads to the same (artist_did, slug) does not have to
+    rollback a shared session. The old SELECT-then-INSERT-then-catch pattern
+    left the caller's AsyncSession in a fragile state under concurrent load
+    and caused pool-level MissingGreenlet errors mid-upload.
     """
     slug = slugify(title)
-    result = await db.execute(
-        select(Album).where(Album.artist_did == artist.did, Album.slug == slug)
+    stmt = (
+        pg_insert(Album)
+        .values(
+            artist_did=artist.did,
+            slug=slug,
+            title=title,
+            description=None,
+            image_id=image_id,
+            image_url=image_url,
+        )
+        .on_conflict_do_nothing(index_elements=["artist_did", "slug"])
+        .returning(Album)
     )
+    result = await db.execute(stmt)
     if album := result.scalar_one_or_none():
-        return album, False
+        return album, True

-    album = Album(
-        artist_did=artist.did,
-        slug=slug,
-        title=title,
-        description=None,
-        image_id=image_id,
-        image_url=image_url,
+    # conflict — a concurrent task won the race. fetch the existing row.
+    existing = await db.execute(
+        select(Album).where(Album.artist_did == artist.did, Album.slug == slug)
     )
-    db.add(album)
-    try:
-        await db.flush()
-        return album, True
-    except IntegrityError:
-        # another request created this album concurrently
-        await db.rollback()
-        result = await db.execute(
-            select(Album).where(Album.artist_did == artist.did, Album.slug == slug)
-        )
-        album = result.scalar_one_or_none()
-        if not album:
-            raise
-        return album, False
+    return existing.scalar_one(), False


 __all__ = ["get_or_create_album"]
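the insert-or-fetch shape in this diff can be tried without a Postgres instance. the block below is a minimal sketch, not the project's code: it swaps in the stdlib `sqlite3` module for Postgres and SQLAlchemy, and reads `cursor.rowcount` instead of using `RETURNING` (older SQLite builds lack `RETURNING`). the `albums` table layout and the name `get_or_create_album_sketch` are hypothetical, chosen only for illustration.

```python
import sqlite3


def get_or_create_album_sketch(conn, artist_did, slug, title):
    """Return (album_id, created) with no SELECT-then-INSERT-then-catch.

    Hypothetical stand-in for the real helper: SQLite replaces Postgres,
    and rowcount replaces RETURNING.
    """
    cur = conn.execute(
        "INSERT INTO albums (artist_did, slug, title) VALUES (?, ?, ?) "
        "ON CONFLICT (artist_did, slug) DO NOTHING",
        (artist_did, slug, title),
    )
    # rowcount is 1 when this call inserted the row, 0 when the unique
    # constraint already held one and the insert was skipped.
    created = cur.rowcount == 1
    row = conn.execute(
        "SELECT id FROM albums WHERE artist_did = ? AND slug = ?",
        (artist_did, slug),
    ).fetchone()
    return row[0], created


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE albums ("
    " id INTEGER PRIMARY KEY,"
    " artist_did TEXT NOT NULL,"
    " slug TEXT NOT NULL,"
    " title TEXT NOT NULL,"
    " UNIQUE (artist_did, slug))"
)

first = get_or_create_album_sketch(
    conn, "did:plc:example", "chromatic-drones", "chromatic drones"
)
second = get_or_create_album_sketch(
    conn, "did:plc:example", "chromatic-drones", "chromatic drones"
)
```

no exception is raised on the losing path, so nothing has to be rolled back; the second call simply sees a skipped insert and reads the existing row.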
+95
backend/tests/api/test_get_or_create_album.py
+"""regression test for concurrent album creation race.
+
+history: the old SELECT-then-INSERT-then-catch-IntegrityError pattern in
+get_or_create_album left the caller's AsyncSession in a fragile state when
+multiple concurrent uploads raced to create the same (artist_did, slug) album.
+under load this surfaced as pool-level MissingGreenlet errors mid-upload (2 of
+12 concurrent uploads to the same album title failed on stg, 2026-04-24).
+
+the replacement uses INSERT ... ON CONFLICT DO NOTHING RETURNING so the race
+is resolved at the DB level — no rollback, no session state churn.
+"""
+
+import asyncio
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
+
+from backend.api.tracks.services import get_or_create_album
+from backend.models import Album, Artist
+
+from ..conftest import session_context
+
+
+async def _make_artist(db: AsyncSession, did: str = "did:plc:drone-test") -> Artist:
+    artist = Artist(
+        did=did,
+        handle="drone.test",
+        display_name="drone test",
+    )
+    db.add(artist)
+    await db.commit()
+    await db.refresh(artist)
+    return artist
+
+
+async def test_get_or_create_album_returns_existing(db_session: AsyncSession) -> None:
+    """second call with same (artist, title) returns existing album, created=False."""
+    artist = await _make_artist(db_session)
+
+    album_a, created_a = await get_or_create_album(
+        db_session, artist, "chromatic drones", image_id=None, image_url=None
+    )
+    await db_session.commit()
+
+    album_b, created_b = await get_or_create_album(
+        db_session, artist, "chromatic drones", image_id=None, image_url=None
+    )
+
+    assert created_a is True
+    assert created_b is False
+    assert album_a.id == album_b.id
+
+
+async def test_concurrent_get_or_create_album_same_title_no_duplicates(
+    _engine: AsyncEngine, _clear_db: None
+) -> None:
+    """12 concurrent tasks on SEPARATE sessions all trying to create the same
+    album must produce exactly one row, and all callers must agree on its id.
+
+    this mirrors the 12-chromatic-drone upload scenario that caused
+    MissingGreenlet failures in stg. each task uses its own AsyncSession —
+    sharing a single session across coroutines is separately broken and not
+    what happens in the real upload path (each upload task opens its own).
+    """
+    # seed artist on a dedicated session so the concurrent tasks can see it
+    async with session_context(engine=_engine) as seed_db:
+        artist = await _make_artist(seed_db, did="did:plc:drone-concurrent")
+        artist_did = artist.did
+
+    async def attempt() -> tuple[str, bool]:
+        async with session_context(engine=_engine) as db:
+            row = await db.execute(select(Artist).where(Artist.did == artist_did))
+            a = row.scalar_one()
+            album, created = await get_or_create_album(
+                db, a, "chromatic drones", image_id=None, image_url=None
+            )
+            await db.commit()
+            return album.id, created
+
+    results = await asyncio.gather(*(attempt() for _ in range(12)))
+
+    ids = {album_id for album_id, _ in results}
+    assert len(ids) == 1, f"expected 1 unique album id, got {ids}"
+
+    created_count = sum(1 for _, created in results if created)
+    assert created_count == 1, (
+        f"expected exactly 1 caller to see created=True, got {created_count}"
+    )
+
+    async with session_context(engine=_engine) as verify_db:
+        rows = await verify_db.execute(
+            select(Album).where(Album.artist_did == artist_did)
+        )
+        albums = rows.scalars().all()
+        assert len(albums) == 1, f"expected 1 album row, found {len(albums)}"
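the three assertions in the concurrent test (one id, one `created=True`, one row) can be exercised in a reduced form outside the project's fixtures. the block below is a rough sketch under stated assumptions: threads with one `sqlite3` connection each stand in for asyncio tasks with one `AsyncSession` each, a temp-file SQLite database stands in for Postgres, and `rowcount` stands in for `RETURNING`. the names `attempt`, `run_race`, and the `albums` schema are hypothetical.

```python
import os
import sqlite3
import tempfile
import threading

N = 12  # same fan-out as the 12-upload scenario

INSERT_SQL = (
    "INSERT INTO albums (artist_did, slug, title) VALUES (?, ?, ?) "
    "ON CONFLICT (artist_did, slug) DO NOTHING"
)
SELECT_SQL = "SELECT id FROM albums WHERE artist_did = ? AND slug = ?"


def attempt(path, barrier, results, i):
    # each worker opens its own connection, mirroring "separate sessions";
    # isolation_level=None puts the connection in autocommit mode.
    conn = sqlite3.connect(path, timeout=30, isolation_level=None)
    try:
        barrier.wait(timeout=30)  # release all workers at once
        cur = conn.execute(
            INSERT_SQL, ("did:plc:example", "chromatic-drones", "chromatic drones")
        )
        created = cur.rowcount == 1  # 0 means another worker won the race
        album_id = conn.execute(
            SELECT_SQL, ("did:plc:example", "chromatic-drones")
        ).fetchone()[0]
        results[i] = (album_id, created)
    finally:
        conn.close()


def run_race():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "albums.db")
        seed = sqlite3.connect(path)
        seed.execute(
            "CREATE TABLE albums ("
            " id INTEGER PRIMARY KEY,"
            " artist_did TEXT NOT NULL,"
            " slug TEXT NOT NULL,"
            " title TEXT NOT NULL,"
            " UNIQUE (artist_did, slug))"
        )
        seed.commit()
        seed.close()

        barrier = threading.Barrier(N)
        results = [None] * N
        threads = [
            threading.Thread(target=attempt, args=(path, barrier, results, i))
            for i in range(N)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        verify = sqlite3.connect(path)
        row_count = verify.execute("SELECT COUNT(*) FROM albums").fetchone()[0]
        verify.close()
    return results, row_count
```

calling `run_race()` returns the per-worker `(album_id, created)` pairs and the final row count, which can be checked the same way the regression test checks `results` and `albums`. SQLite serializes writers with file locks, so this shows the assertion shape rather than reproducing the MissingGreenlet failure itself.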