audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: move image moderation to background task + eliminate R2 delete spray (#1266)

* fix: move image moderation to background task + eliminate R2 delete spray

Two perf fixes for track/album image edits, observed via Logfire traces
on a real user session (annamist.com editing track 862):

1. Image moderation scan moved to docket background task (~6s saved).
The moderation service (claude vision on Fly.io) was awaited inline
during PATCH /tracks/{id}, blocking the response for 3-6s + cold
start penalty. The scan only flags and notifies — it never gates the
response — so there's no reason to block on it. New docket task
scan_image_moderation fetches the image from its R2 URL and scans
asynchronously, matching how copyright audio scans already work.

2. R2 image delete now uses the known URL extension (~1.3s saved).
storage.delete() with no file_type falls through to trying every
AudioFormat extension (7-8 HEAD requests) then every ImageFormat
extension (5 HEAD requests) sequentially. New delete_image() method
parses the extension from the image URL and hits the correct key on
the first try. Falls back to the spray if no extension can be parsed
from the URL, or if the direct delete_object call raises a ClientError.

Combined: a track image edit that was taking ~8.9s should now take
~1s (R2 upload + thumbnail + DB update + ATProto sync).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: mock delete_image alongside delete in track deletion tests

CI caught that test_track_deletion_deletes_unshared_image only mocked
storage.delete but the code now calls storage.delete_image for images
that have a URL. Add the delete_image mock to the affected deletion test
cases so they capture both audio and image delete calls in the same
delete_calls list.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6 (1M context)
and committed by
GitHub
05fb4d93 43ca2fcc

+188 -23
+17 -13
backend/src/backend/_internal/image_uploads.py
··· 8 8 from fastapi import HTTPException 9 9 from starlette.datastructures import UploadFile 10 10 11 - from backend._internal.clients.moderation import get_moderation_client 12 11 from backend._internal.image import ImageFormat 13 - from backend._internal.notifications import notification_service 14 12 from backend._internal.thumbnails import generate_and_save 15 13 from backend.config import settings 16 14 from backend.storage import storage ··· 98 96 image_url = storage.build_image_url(image_id, ext) 99 97 thumbnail_url = await generate_and_save(bytes(image_data), image_id, entity_type) 100 98 101 - # moderation scanning (non-blocking — flags but doesn't reject) 99 + # moderation scanning — dispatched as a docket background task so the 100 + # HTTP response isn't blocked on the moderation service round trip 101 + # (~3-6s for claude vision + possible cold-start penalty). the scan 102 + # only flags and notifies; it never gates the response. 102 103 if scan_moderation and settings.moderation.image_moderation_enabled: 103 104 try: 104 - client = get_moderation_client() 105 + from backend._internal.tasks.moderation import ( 106 + schedule_image_moderation_scan, 107 + ) 108 + 105 109 content_type = image_format.media_type if image_format else "image/png" 106 - result = await client.scan_image(bytes(image_data), image_id, content_type) 107 - if not result.is_safe: 108 - await notification_service.send_image_flag_notification( 109 - image_id=image_id, 110 - severity=result.severity, 111 - categories=result.violated_categories, 112 - context=f"{entity_type} cover", 113 - ) 110 + await schedule_image_moderation_scan( 111 + image_id=image_id, 112 + image_url=image_url, 113 + content_type=content_type, 114 + entity_type=entity_type, 115 + ) 114 116 except Exception as e: 115 - logger.warning("image moderation failed for %s: %s", image_id, e) 117 + logger.warning( 118 + "failed to schedule image moderation for %s: %s", image_id, e 119 + ) 116 120 117 121 return 
ImageUploadResult( 118 122 image_id=image_id,
+7
backend/src/backend/_internal/tasks/__init__.py
··· 24 24 invalidate_tracks_discovery_cache, 25 25 run_post_track_create_hooks, 26 26 ) 27 + from backend._internal.tasks.moderation import ( 28 + scan_image_moderation, 29 + schedule_image_moderation_scan, 30 + ) 27 31 from backend._internal.tasks.ingest import ( 28 32 SubjectNotFoundError, 29 33 ingest_comment_create, ··· 104 108 ingest_list_update, 105 109 ingest_list_delete, 106 110 ingest_profile_update, 111 + scan_image_moderation, 107 112 ] 108 113 109 114 ··· 151 156 "pds_update_comment", 152 157 "run_post_track_create_hooks", 153 158 "scan_copyright", 159 + "scan_image_moderation", 154 160 "schedule_album_list_sync", 155 161 "schedule_atproto_sync", 156 162 "schedule_copyright_resolution_sync", ··· 158 164 "schedule_embedding_generation", 159 165 "schedule_follow_graph_warm", 160 166 "schedule_genre_classification", 167 + "schedule_image_moderation_scan", 161 168 "schedule_move_track_audio", 162 169 "schedule_pds_create_comment", 163 170 "schedule_pds_create_like",
+75
backend/src/backend/_internal/tasks/moderation.py
··· 1 + """background tasks for image moderation. 2 + 3 + image moderation was previously awaited inline during track/album PATCH 4 + requests, blocking the HTTP response for ~3-6s while the moderation 5 + service ran claude vision. moving to a docket task makes the response 6 + instantaneous while still flagging unsafe images. 7 + """ 8 + 9 + import logging 10 + 11 + import httpx 12 + import logfire 13 + 14 + from backend._internal.background import get_docket 15 + from backend._internal.clients.moderation import get_moderation_client 16 + from backend._internal.notifications import notification_service 17 + 18 + logger = logging.getLogger(__name__) 19 + 20 + 21 + async def scan_image_moderation( 22 + image_id: str, 23 + image_url: str, 24 + content_type: str, 25 + entity_type: str, 26 + ) -> None: 27 + """download the image from R2 and scan it for policy violations. 28 + 29 + called as a docket background task after the image has been stored. 30 + fetches the image bytes from its public URL (already on R2 CDN) and 31 + sends them to the moderation service. if the image is flagged, fires 32 + a notification — same behavior as the old inline path, just not 33 + blocking the user's request. 
34 + """ 35 + try: 36 + # fetch the image bytes from R2 (public URL, no auth needed) 37 + async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as http: 38 + response = await http.get(image_url) 39 + response.raise_for_status() 40 + image_bytes = response.content 41 + 42 + client = get_moderation_client() 43 + result = await client.scan_image(image_bytes, image_id, content_type) 44 + 45 + if not result.is_safe: 46 + await notification_service.send_image_flag_notification( 47 + image_id=image_id, 48 + severity=result.severity, 49 + categories=result.violated_categories, 50 + context=f"{entity_type} cover", 51 + ) 52 + 53 + logfire.info( 54 + "background image moderation complete", 55 + image_id=image_id, 56 + is_safe=result.is_safe, 57 + severity=result.severity, 58 + ) 59 + except Exception as e: 60 + logger.warning("background image moderation failed for %s: %s", image_id, e) 61 + 62 + 63 + async def schedule_image_moderation_scan( 64 + *, 65 + image_id: str, 66 + image_url: str, 67 + content_type: str, 68 + entity_type: str, 69 + ) -> None: 70 + """schedule an image moderation scan via docket.""" 71 + docket = get_docket() 72 + await docket.add(scan_image_moderation)( 73 + image_id, image_url, content_type, entity_type 74 + ) 75 + logfire.info("scheduled image moderation scan", image_id=image_id)
+8 -2
backend/src/backend/api/albums.py
··· 551 551 # delete old image if exists (prevent R2 object leaks) 552 552 if album.image_id and album.image_id != uploaded.image_id: 553 553 with contextlib.suppress(Exception): 554 - await storage.delete(album.image_id) 554 + if album.image_url: 555 + await storage.delete_image(album.image_id, album.image_url) 556 + else: 557 + await storage.delete(album.image_id) 555 558 556 559 # update album with new image 557 560 album.image_id = uploaded.image_id ··· 974 977 # delete cover image from storage if exists 975 978 if album.image_id: 976 979 with contextlib.suppress(Exception): 977 - await storage.delete(album.image_id) 980 + if album.image_url: 981 + await storage.delete_image(album.image_id, album.image_url) 982 + else: 983 + await storage.delete(album.image_id) 978 984 979 985 # capture slug before deletion 980 986 album_slug = album.slug
+4 -1
backend/src/backend/api/lists.py
··· 885 885 # delete old image if exists (prevent R2 object leaks) 886 886 if playlist.image_id and playlist.image_id != uploaded.image_id: 887 887 with contextlib.suppress(Exception): 888 - await storage.delete(playlist.image_id) 888 + if playlist.image_url: 889 + await storage.delete_image(playlist.image_id, playlist.image_url) 890 + else: 891 + await storage.delete(playlist.image_id) 889 892 890 893 # update playlist with new image 891 894 playlist.image_id = uploaded.image_id
+12 -3
backend/src/backend/api/tracks/mutations.py
··· 114 114 ) 115 115 if not album_shares_image: 116 116 try: 117 - await storage.delete(track.image_id) 117 + if track.image_url: 118 + await storage.delete_image(track.image_id, track.image_url) 119 + else: 120 + await storage.delete(track.image_id) 118 121 except Exception as e: 119 122 logger.warning( 120 123 f"failed to delete image {track.image_id}: {e}", exc_info=True ··· 255 258 ) 256 259 if not album_shares_image: 257 260 with contextlib.suppress(Exception): 258 - await storage.delete(track.image_id) 261 + if track.image_url: 262 + await storage.delete_image(track.image_id, track.image_url) 263 + else: 264 + await storage.delete(track.image_id) 259 265 track.image_id = None 260 266 track.image_url = None 261 267 track.thumbnail_url = None ··· 272 278 ) 273 279 if not album_shares_image: 274 280 with contextlib.suppress(Exception): 275 - await storage.delete(track.image_id) 281 + if track.image_url: 282 + await storage.delete_image(track.image_id, track.image_url) 283 + else: 284 + await storage.delete(track.image_id) 276 285 277 286 track.image_id = image_id 278 287 track.image_url = image_url
+2
backend/src/backend/storage/protocol.py
··· 37 37 38 38 async def delete(self, file_id: str, file_type: str | None = None) -> bool: ... 39 39 40 + async def delete_image(self, file_id: str, image_url: str) -> bool: ... 41 + 40 42 async def save_gated( 41 43 self, 42 44 file: BinaryIO | BytesIO,
+45
backend/src/backend/storage/r2.py
··· 490 490 ) 491 491 return False 492 492 493 + async def delete_image(self, file_id: str, image_url: str) -> bool: 494 + """delete an image from R2 using its known URL to derive the exact key. 495 + 496 + avoids the format-spray fallback in delete() which tries every audio 497 + format then every image format via sequential HEAD requests (~1.3s of 498 + wasted round trips). the image URL already encodes the correct 499 + extension, so we can build the key directly. 500 + """ 501 + from pathlib import PurePosixPath 502 + 503 + # extract extension from URL: ".../images/abc123.png?..." → ".png" 504 + ext = PurePosixPath(image_url.split("?")[0]).suffix.lower() 505 + if not ext: 506 + logfire.warning( 507 + "delete_image: could not extract extension from URL, falling back", 508 + file_id=file_id, 509 + image_url=image_url, 510 + ) 511 + return await self.delete(file_id) 512 + 513 + key = f"images/{file_id}{ext}" 514 + async with self.async_session.client( 515 + "s3", 516 + endpoint_url=self.endpoint_url, 517 + aws_access_key_id=self.aws_access_key_id, 518 + aws_secret_access_key=self.aws_secret_access_key, 519 + ) as client: 520 + try: 521 + await client.delete_object(Bucket=self.image_bucket_name, Key=key) 522 + logfire.info( 523 + "R2 image deleted (direct)", 524 + file_id=file_id, 525 + key=key, 526 + bucket=self.image_bucket_name, 527 + ) 528 + return True 529 + except client.exceptions.ClientError as e: 530 + logfire.warning( 531 + "R2 delete_image failed, falling back to format scan", 532 + file_id=file_id, 533 + key=key, 534 + error=str(e), 535 + ) 536 + return await self.delete(file_id) 537 + 493 538 async def save_gated( 494 539 self, 495 540 file: BinaryIO | BytesIO,
+14
backend/tests/api/test_track_deletion.py
··· 312 312 async def mock_delete(file_id: str, file_type: str | None = None): 313 313 delete_calls.append(file_id) 314 314 315 + async def mock_delete_image(file_id: str, image_url: str): 316 + delete_calls.append(file_id) 317 + 315 318 with ( 316 319 patch( 317 320 "backend.api.tracks.mutations.storage.delete", 318 321 side_effect=mock_delete, 322 + ), 323 + patch( 324 + "backend.api.tracks.mutations.storage.delete_image", 325 + side_effect=mock_delete_image, 319 326 ), 320 327 patch( 321 328 "backend.api.tracks.mutations.schedule_album_list_sync", ··· 370 377 async def mock_delete(file_id: str, file_type: str | None = None): 371 378 delete_calls.append(file_id) 372 379 380 + async def mock_delete_image(file_id: str, image_url: str): 381 + delete_calls.append(file_id) 382 + 373 383 with ( 374 384 patch( 375 385 "backend.api.tracks.mutations.storage.delete", 376 386 side_effect=mock_delete, 387 + ), 388 + patch( 389 + "backend.api.tracks.mutations.storage.delete_image", 390 + side_effect=mock_delete_image, 377 391 ), 378 392 patch( 379 393 "backend.api.tracks.mutations.schedule_album_list_sync",
+4 -4
loq.toml
··· 28 28 29 29 [[rules]] 30 30 path = "backend/src/backend/api/albums.py" 31 - max_lines = 989 31 + max_lines = 995 32 32 33 33 [[rules]] 34 34 path = "backend/src/backend/api/auth.py" ··· 36 36 37 37 [[rules]] 38 38 path = "backend/src/backend/api/lists.py" 39 - max_lines = 1146 39 + max_lines = 1149 40 40 41 41 [[rules]] 42 42 path = "backend/src/backend/api/tracks/listing.py" ··· 56 56 57 57 [[rules]] 58 58 path = "backend/src/backend/storage/r2.py" 59 - max_lines = 725 59 + max_lines = 766 60 60 61 61 [[rules]] 62 62 path = "backend/tests/api/test_audio.py" ··· 232 232 233 233 [[rules]] 234 234 path = "backend/tests/api/test_track_deletion.py" 235 - max_lines = 505 235 + max_lines = 519 236 236 237 237 [[rules]] 238 238 path = "backend/tests/api/test_top_tracks.py"