audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: catch deadlock on concurrent ingest deletes (#1073)

when the API delete and Jetstream's ingest delete race on the same
record, the FK cascade to track_tags can deadlock. since the API
transaction handles the actual delete, the ingest side can safely
swallow the OperationalError. observed during Jetstream staging
smoketest (integration test run #22979847609).

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6
and committed by
GitHub
3a642624 df186f5b

+75 -37
+50 -35
backend/src/backend/_internal/tasks/ingest.py
··· 13 13 import logfire 14 14 from docket import ConcurrencyLimit, ExponentialRetry 15 15 from sqlalchemy import delete, select, update 16 - from sqlalchemy.exc import IntegrityError 16 + from sqlalchemy.exc import IntegrityError, OperationalError 17 17 18 18 from backend._internal.atproto.client import pds_blob_url 19 19 from backend._internal.tasks.hooks import run_post_track_create_hooks ··· 259 259 retry: ExponentialRetry = _INGEST_RETRY, 260 260 ) -> None: 261 261 """delete a track by its AT URI.""" 262 - async with db_session() as db: 263 - result = await db.execute(delete(Track).where(Track.atproto_record_uri == uri)) 264 - if result.rowcount: # type: ignore[union-attr] 265 - await db.commit() 266 - logfire.info("ingest: track deleted", uri=uri, artist_did=did) 267 - else: 268 - logger.debug("ingest_track_delete: track %s not found", uri) 262 + try: 263 + async with db_session() as db: 264 + result = await db.execute( 265 + delete(Track).where(Track.atproto_record_uri == uri) 266 + ) 267 + if result.rowcount: # type: ignore[union-attr] 268 + await db.commit() 269 + logfire.info("ingest: track deleted", uri=uri, artist_did=did) 270 + else: 271 + logger.debug("ingest_track_delete: track %s not found", uri) 272 + except OperationalError: 273 + # deadlock with the API delete — the other transaction will handle it 274 + logger.debug("ingest_track_delete: deadlock on %s, skipping", uri) 269 275 270 276 271 277 # --- like tasks --- ··· 328 334 retry: ExponentialRetry = _INGEST_RETRY, 329 335 ) -> None: 330 336 """delete a like by its AT URI.""" 331 - async with db_session() as db: 332 - result = await db.execute( 333 - delete(TrackLike).where(TrackLike.atproto_like_uri == uri) 334 - ) 335 - if result.rowcount: # type: ignore[union-attr] 336 - await db.commit() 337 - logfire.info("ingest: like deleted", uri=uri) 338 - else: 339 - logger.debug("ingest_like_delete: like %s not found", uri) 337 + try: 338 + async with db_session() as db: 339 + result = await db.execute( 340 + delete(TrackLike).where(TrackLike.atproto_like_uri == uri) 341 + ) 342 + if result.rowcount: # type: ignore[union-attr] 343 + await db.commit() 344 + logfire.info("ingest: like deleted", uri=uri) 345 + else: 346 + logger.debug("ingest_like_delete: like %s not found", uri) 347 + except OperationalError: 348 + logger.debug("ingest_like_delete: deadlock on %s, skipping", uri) 340 349 341 350 342 351 # --- comment tasks --- ··· 428 437 retry: ExponentialRetry = _INGEST_RETRY, 429 438 ) -> None: 430 439 """delete a comment by its AT URI.""" 431 - async with db_session() as db: 432 - result = await db.execute( 433 - delete(TrackComment).where(TrackComment.atproto_comment_uri == uri) 434 - ) 435 - if result.rowcount: # type: ignore[union-attr] 436 - await db.commit() 437 - logfire.info("ingest: comment deleted", uri=uri) 438 - else: 439 - logger.debug("ingest_comment_delete: comment %s not found", uri) 440 + try: 441 + async with db_session() as db: 442 + result = await db.execute( 443 + delete(TrackComment).where(TrackComment.atproto_comment_uri == uri) 444 + ) 445 + if result.rowcount: # type: ignore[union-attr] 446 + await db.commit() 447 + logfire.info("ingest: comment deleted", uri=uri) 448 + else: 449 + logger.debug("ingest_comment_delete: comment %s not found", uri) 450 + except OperationalError: 451 + logger.debug("ingest_comment_delete: deadlock on %s, skipping", uri) 440 452 441 453 442 454 # --- list (playlist) tasks --- ··· 531 543 retry: ExponentialRetry = _INGEST_RETRY, 532 544 ) -> None: 533 545 """delete a playlist by its AT URI.""" 534 - async with db_session() as db: 535 - result = await db.execute( 536 - delete(Playlist).where(Playlist.atproto_record_uri == uri) 537 - ) 538 - if result.rowcount: # type: ignore[union-attr] 539 - await db.commit() 540 - logfire.info("ingest: playlist deleted", uri=uri) 541 - else: 542 - logger.debug("ingest_list_delete: playlist %s not found", uri) 546 + try: 547 + async with db_session() as db: 548 + result = await db.execute( 549 + delete(Playlist).where(Playlist.atproto_record_uri == uri) 550 + ) 551 + if result.rowcount: # type: ignore[union-attr] 552 + await db.commit() 553 + logfire.info("ingest: playlist deleted", uri=uri) 554 + else: 555 + logger.debug("ingest_list_delete: playlist %s not found", uri) 556 + except OperationalError: 557 + logger.debug("ingest_list_delete: deadlock on %s, skipping", uri) 543 558 544 559 545 560 # --- profile task ---
+23
backend/tests/test_jetstream.py
··· 611 611 ) 612 612 assert result.scalar_one_or_none() is None 613 613 614 + async def test_deadlock_does_not_raise(self, artist: Artist) -> None: 615 + """deadlock from concurrent API + Jetstream delete is swallowed.""" 616 + from sqlalchemy.exc import OperationalError 617 + 618 + with patch( 619 + "backend._internal.tasks.ingest.db_session", 620 + ) as mock_ctx: 621 + mock_db = AsyncMock() 622 + mock_db.execute = AsyncMock( 623 + side_effect=OperationalError( 624 + "deadlock", {}, Exception("deadlock detected") 625 + ) 626 + ) 627 + mock_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db) 628 + mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False) 629 + 630 + # should not raise 631 + await ingest_track_delete( 632 + did=artist.did, 633 + rkey="rkey", 634 + uri="at://did:plc:test/fm.plyr.track/deadlocked", 635 + ) 636 + 614 637 615 638 class TestIngestTrackUpdate: 616 639 async def test_updates_mutable_fields(
+2 -2
loq.toml
··· 219 219 220 220 [[rules]] 221 221 path = "backend/src/backend/_internal/tasks/ingest.py" 222 - max_lines = 570 222 + max_lines = 582 223 223 224 224 [[rules]] 225 225 path = "backend/tests/test_jetstream.py" 226 - max_lines = 1270 226 + max_lines = 1290 227 227 228 228 [[rules]] 229 229 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"