audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: remove completed one-time migration scripts, add CDN URL backfill (#1280)

Remove 8 scripts for migrations that completed months ago:
- copy_r2_buckets.py (relay → audio-prod, Nov 2025)
- migrate_r2_bucket.py (same with DB updates)
- migrate_images_to_new_buckets.py (audio → images buckets, Nov 2025)
- migrate_sensitive_images.py (Jan 2026)
- backfill_image_urls.py (Nov 2025)
- backfill_atproto_records.py (Nov 2025)
- backfill_avatars.py (Dec 2025)
- backfill_duration.py (Dec 2025)

Add migrate_cdn_urls.py for the r2.dev → custom domain URL migration.
Dry-run by default, auto-detects environment from DATABASE_URL,
updates tracks/albums/playlists URL columns.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6 (1M context)
and committed by
GitHub
7ed276c5 61dd3639

+169 -1291
-202
scripts/backfill_atproto_records.py
#!/usr/bin/env -S uv run --script --quiet
"""backfill ATProto records for tracks missing atproto_record_uri.

Creates ATProto records on user's PDS for tracks that:
1. Exist in the database
2. Have no atproto_record_uri (orphaned/never synced)
3. Belong to the configured user (ATPROTO_MAIN_HANDLE)

The script uses the app's namespace configuration (settings.atproto.track_collection)
to create records in the correct namespace for the current environment.

## Prerequisites

Set credentials in .env:
```bash
ATPROTO_MAIN_HANDLE=your.handle
ATPROTO_MAIN_PASSWORD=your-app-password
DATABASE_URL=postgresql://...  # target database
```

## Usage

```bash
uv run scripts/backfill_atproto_records.py
```

The script will:
1. Resolve user's PDS URL from handle/DID
2. Query database for tracks without atproto_record_uri
3. Create ATProto records on PDS using configured namespace
4. Update database with new URIs and CIDs

## Verification

After running, verify success:
```bash
# check ATProto records on PDS (see docs/tools/pdsx.md)
uvx pdsx --pds <pds-url> -r <handle> ls <namespace>

# check database (see docs/tools/neon.md)
SELECT COUNT(*) FROM tracks WHERE atproto_record_uri IS NOT NULL;
```

## References

- Database queries: docs/tools/neon.md
- PDS inspection: docs/tools/pdsx.md
- ATProto records: src/backend/_internal/atproto/records.py
"""

import asyncio
from datetime import UTC, datetime

from atproto import AsyncClient
from atproto_identity.resolver import AsyncIdResolver
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import select

from backend.config import settings as app_settings
from backend.models import Artist, Track, db_session


class BackfillSettings(BaseSettings):
    """settings for backfill script.

    both fields are read from the environment / .env file through their
    validation aliases.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # handle of the account whose tracks are backfilled
    handle: str = Field(validation_alias="ATPROTO_MAIN_HANDLE")
    # password used to log in to the PDS
    password: str = Field(validation_alias="ATPROTO_MAIN_PASSWORD")


async def main():
    """backfill ATProto records for orphaned tracks.

    resolves the configured handle to a DID and PDS, logs in, creates one
    record per track that has no atproto_record_uri, and stores the returned
    URI/CID on the track row. a failure on one track is printed and the loop
    moves on to the next track.

    raises:
        ValueError: if the handle, its DID document, or a PDS endpoint
            cannot be resolved.
    """
    settings = BackfillSettings()

    # resolve PDS from handle
    print(f"resolving PDS for {settings.handle}...")
    resolver = AsyncIdResolver()

    # first resolve handle to DID
    user_did = await resolver.handle.resolve(settings.handle)
    if not user_did:
        raise ValueError(f"could not resolve handle {settings.handle}")
    print(f"resolved DID: {user_did}")

    # then get PDS URL from DID document
    did_doc = await resolver.did.resolve(user_did)
    if did_doc is None:
        raise ValueError(f"no DID document found for {user_did}")

    pds_url = None
    # `service` may be absent on a DID document; treat that like "no PDS"
    for service in did_doc.service or []:
        if service.type == "AtprotoPersonalDataServer":
            pds_url = service.service_endpoint
            break

    if not pds_url:
        raise ValueError(f"no PDS found for {settings.handle}")

    print(f"using PDS: {pds_url}")

    # create atproto client with correct PDS
    client = AsyncClient(base_url=pds_url)
    await client.login(settings.handle, settings.password)

    print(f"logged in as {settings.handle} (DID: {user_did})")

    # fetch tracks that need backfilling
    async with db_session() as db:
        stmt = (
            select(Track)
            .join(Artist)
            .where(Track.artist_did == user_did)
            .where(Track.atproto_record_uri.is_(None))
            .order_by(Track.id)
        )
        result = await db.execute(stmt)
        tracks = result.scalars().all()

        # eagerly load artist for each track so it is usable after the
        # session is closed
        for track in tracks:
            await db.refresh(track, ["artist"])

    print(f"found {len(tracks)} tracks to backfill")

    if not tracks:
        print("no tracks need backfilling!")
        return

    # backfill each track, counting the ones that actually succeed so the
    # final summary does not claim success for failed tracks
    succeeded = 0
    for track in tracks:
        print(f"\nbackfilling track {track.id}: {track.title}")

        # build record
        record = {
            "$type": app_settings.atproto.track_collection,
            "title": track.title,
            "artist": track.artist.display_name,
            "audioUrl": track.r2_url,
            "fileType": track.file_type,
            "createdAt": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
        }

        # add optional fields
        if track.album:
            record["album"] = track.album

        if track.features:
            # convert to ATProto format
            record["features"] = [
                {
                    "did": f["did"],
                    "handle": f["handle"],
                    "displayName": f.get("display_name", f["handle"]),
                }
                for f in track.features
            ]

        if track.image_id:
            # manually construct image URL since images table doesn't exist in prod
            # try common image formats - in practice these are likely jpg/png
            r2_public_url = "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"
            # assume jpg for now - can be updated later if needed
            record["imageUrl"] = f"{r2_public_url}/images/{track.image_id}.jpg"

        # create record
        # NOTE: if the record is created but the database update below fails,
        # the track keeps a NULL atproto_record_uri and a re-run will create
        # a second record for it.
        try:
            response = await client.com.atproto.repo.create_record(
                {
                    "repo": user_did,
                    "collection": app_settings.atproto.track_collection,
                    "record": record,
                }
            )

            record_uri = response.uri
            record_cid = response.cid

            print(f" ✓ created record: {record_uri}")

            # update database
            async with db_session() as db:
                stmt = select(Track).where(Track.id == track.id)
                result = await db.execute(stmt)
                db_track = result.scalar_one()

                db_track.atproto_record_uri = record_uri
                db_track.atproto_record_cid = record_cid

                await db.commit()

            print(" ✓ updated database")
            succeeded += 1

        except Exception as e:
            # best-effort: report and move on to the next track
            print(f" ✗ failed: {e}")

    failed = len(tracks) - succeeded
    print(f"\nbackfilled {succeeded}/{len(tracks)} tracks ({failed} failed)")


if __name__ == "__main__":
    asyncio.run(main())
-146
scripts/backfill_avatars.py
#!/usr/bin/env -S uv run --script --quiet
"""backfill avatar_url for all artists from bluesky.

## Context

Avatar URLs were only set at artist creation and never refreshed. This caused
stale/broken avatars throughout the app (likers tooltip, profiles, etc).

PR #685 added avatar sync on login, but users who don't log in will still have
stale avatars. This script does a one-time refresh of all avatars.

## What This Script Does

1. Fetches all artists from the database
2. For each artist, fetches current avatar from Bluesky public API
3. Updates avatar_url in database if changed
4. Reports summary of changes

## Usage

```bash
# dry run (show what would be updated)
uv run scripts/backfill_avatars.py --dry-run

# actually update the database
uv run scripts/backfill_avatars.py

# target specific environment
DATABASE_URL=postgresql://... uv run scripts/backfill_avatars.py
```
"""

import asyncio
import logging
import sys

# scripts are run from backend/ directory via: uv run python ../scripts/backfill_avatars.py

from sqlalchemy import select

from backend._internal.atproto.profile import fetch_user_avatar
from backend.models import Artist
from backend.utilities.database import db_session

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# rate limit to avoid hammering bluesky API
CONCURRENCY_LIMIT = 5
DELAY_BETWEEN_BATCHES = 0.5  # seconds (not referenced by the code in this file)


async def backfill_avatars(dry_run: bool = False) -> None:
    """backfill avatar_url for all artists from bluesky.

    args:
        dry_run: when true, only log what would change; nothing is written
            and nothing is committed.
    """

    async with db_session() as db:
        result = await db.execute(select(Artist))
        artists = result.scalars().all()

        if not artists:
            logger.info("no artists found")
            return

        logger.info(f"found {len(artists)} artists to check")

        if dry_run:
            logger.info("dry run mode - checking avatars without updating:")

        updated = 0
        unchanged = 0
        failed = 0
        cleared = 0

        # at most CONCURRENCY_LIMIT fetches are in flight at once
        semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)

        async def process_artist(
            artist: Artist,
        ) -> tuple[str, str | None, str | None, Exception | None]:
            """fetch avatar for artist, return (did, old_url, new_url, error)."""
            async with semaphore:
                try:
                    fresh_avatar = await fetch_user_avatar(artist.did)
                    return (artist.did, artist.avatar_url, fresh_avatar, None)
                except Exception as e:
                    return (artist.did, artist.avatar_url, None, e)

        # fetch all avatars concurrently (with semaphore limiting)
        logger.info("fetching avatars from bluesky...")
        results = await asyncio.gather(*[process_artist(a) for a in artists])

        # asyncio.gather returns results in the order the awaitables were
        # passed, so each result lines up with its artist; pairing them with
        # zip avoids rescanning the artist list for every result.
        for artist, (_did, old_url, new_url, error) in zip(artists, results):
            if error:
                failed += 1
                logger.warning(f"failed to fetch avatar for {artist.handle}: {error}")
                continue

            if old_url == new_url:
                unchanged += 1
                continue

            if new_url is None and old_url is not None:
                cleared += 1
                action = "would clear" if dry_run else "clearing"
                logger.info(
                    f"{action} avatar for {artist.handle} (was: {old_url[:50]}...)"
                )
            elif new_url is not None and old_url is None:
                updated += 1
                action = "would set" if dry_run else "setting"
                logger.info(f"{action} avatar for {artist.handle}")
            else:
                updated += 1
                action = "would update" if dry_run else "updating"
                logger.info(f"{action} avatar for {artist.handle}")

            if not dry_run:
                artist.avatar_url = new_url

        if not dry_run:
            await db.commit()

        logger.info(
            f"backfill complete: {updated} updated, {cleared} cleared, "
            f"{unchanged} unchanged, {failed} failed"
        )


async def main() -> None:
    """main entry point: reads --dry-run from argv and runs the backfill."""
    dry_run = "--dry-run" in sys.argv

    if dry_run:
        logger.info("running in DRY RUN mode - no changes will be made")

    await backfill_avatars(dry_run=dry_run)


if __name__ == "__main__":
    asyncio.run(main())
-243
scripts/backfill_duration.py
#!/usr/bin/env -S uv run --script --quiet
"""backfill duration for tracks missing it.

## Context

Tracks uploaded before duration extraction was implemented have NULL duration.
This affects teal.fm scrobbles which should include duration metadata.

## What This Script Does

1. Finds all tracks with NULL duration in extra
2. Downloads audio files from R2 concurrently (semaphore-limited)
3. Extracts duration using mutagen
4. Updates database with extracted durations

## Usage

```bash
# dry run (show what would be updated)
uv run scripts/backfill_duration.py --dry-run

# actually update the database
uv run scripts/backfill_duration.py

# limit concurrency (default: 10)
uv run scripts/backfill_duration.py --concurrency 5

# target specific environment
DATABASE_URL=postgresql://... uv run scripts/backfill_duration.py
```

Run in order: dev → staging → prod
"""

import asyncio
import io
import logging
import sys
from pathlib import Path

import httpx
from mutagen import File as MutagenFile

# add src to path so we can import backend modules
sys.path.insert(0, str(Path(__file__).parent.parent / "backend" / "src"))

from sqlalchemy import select, update

from backend.models import Track
from backend.utilities.database import db_session

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def extract_duration_from_bytes(audio_data: bytes) -> int | None:
    """extract duration from audio bytes.

    returns the length reported by mutagen truncated to whole seconds, or
    None when mutagen cannot parse the data, reports no length, or raises.
    a length that truncates to 0 is returned as 0, which callers treat the
    same as "no duration".
    """
    try:
        audio = MutagenFile(io.BytesIO(audio_data))
        if audio is None or audio.info is None:
            return None
        length = getattr(audio.info, "length", None)
        return int(length) if length else None
    except Exception as e:
        logger.warning(f"mutagen error: {e}")
        return None


async def fetch_and_extract(
    client: httpx.AsyncClient,
    track: Track,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """fetch audio from R2 and extract duration.

    always downloads the full file. errors are returned, never raised.

    returns: (track_id, duration, error)
    """
    async with semaphore:
        if not track.r2_url:
            return (track.id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track.id}: {track.title[:40]}...")
            response = await client.get(track.r2_url, follow_redirects=True)
            response.raise_for_status()

            duration = extract_duration_from_bytes(response.content)
            if duration:
                logger.info(f" → {duration}s")
                return (track.id, duration, None)
            else:
                return (track.id, None, "could not extract duration")

        except httpx.HTTPStatusError as e:
            return (track.id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track.id, None, str(e))


async def fetch_and_extract_simple(
    client: httpx.AsyncClient,
    track_id: int,
    title: str,
    r2_url: str | None,
    semaphore: asyncio.Semaphore,
) -> tuple[int, int | None, str | None]:
    """fetch audio header from R2 and extract duration.

    uses Range request to fetch only first 256KB (enough for metadata).
    falls back to full download if the partial content yields no duration.
    takes plain values instead of a Track so it can run without a database
    session. errors are returned, never raised.

    returns: (track_id, duration, error)
    """
    async with semaphore:
        if not r2_url:
            return (track_id, None, "no r2_url")

        try:
            logger.info(f"fetching track {track_id}: {title[:40]}...")

            # try range request first (256KB should be enough for most formats)
            headers = {"Range": "bytes=0-262143"}
            response = await client.get(r2_url, headers=headers, follow_redirects=True)

            # 206 = partial content (range worked), 200 = full file (range ignored)
            if response.status_code not in (200, 206):
                response.raise_for_status()

            duration = extract_duration_from_bytes(response.content)
            if duration:
                logger.info(f" → {duration}s")
                return (track_id, duration, None)

            # if range didn't give us duration, try full file
            if response.status_code == 206:
                logger.info(" range request didn't work, fetching full file...")
                response = await client.get(r2_url, follow_redirects=True)
                response.raise_for_status()
                duration = extract_duration_from_bytes(response.content)
                if duration:
                    logger.info(f" → {duration}s")
                    return (track_id, duration, None)

            return (track_id, None, "could not extract duration")

        except httpx.HTTPStatusError as e:
            return (track_id, None, f"HTTP {e.response.status_code}")
        except Exception as e:
            return (track_id, None, str(e))


async def backfill_duration(dry_run: bool = False, concurrency: int = 10) -> None:
    """backfill duration for tracks missing it.

    runs in three phases so no database connection is held open while audio
    is downloaded: query the tracks, download and extract, then commit.

    args:
        dry_run: when true, only list the tracks that would be processed.
        concurrency: maximum number of simultaneous downloads; must be >= 1.

    raises:
        ValueError: if concurrency is less than 1.
    """
    # Semaphore(0) never lets a task through, so every download would wait
    # forever; reject it up front instead of hanging.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    # phase 1: query tracks needing backfill, then close connection
    track_data: list[tuple[int, str, str | None, dict | None]] = []
    async with db_session() as db:
        stmt = select(Track).where(
            Track.extra["duration"].astext.is_(None) | ~Track.extra.has_key("duration")
        )
        result = await db.execute(stmt)
        tracks = list(result.scalars().all())

        if not tracks:
            logger.info("no tracks need duration backfill")
            return

        logger.info(f"found {len(tracks)} tracks needing duration backfill")

        if dry_run:
            logger.info("dry run mode - tracks that would be updated:")
            for track in tracks:
                logger.info(f" {track.id}: {track.title} ({track.r2_url})")
            return

        # extract plain data before closing session
        track_data = [(t.id, t.title, t.r2_url, t.extra) for t in tracks]

    # phase 2: download files and extract durations (no DB connection)
    semaphore = asyncio.Semaphore(concurrency)
    logger.info(
        f"processing {len(track_data)} tracks with concurrency={concurrency}..."
    )

    async with httpx.AsyncClient(timeout=120.0) as client:
        results = await asyncio.gather(
            *[
                fetch_and_extract_simple(client, tid, title, r2_url, semaphore)
                for tid, title, r2_url, _ in track_data
            ]
        )

    # build update map
    updates: list[tuple[int, dict]] = []
    failed = 0
    track_extras = {tid: extra or {} for tid, _, _, extra in track_data}
    track_titles = {tid: title for tid, title, _, _ in track_data}

    for track_id, duration, error in results:
        if duration:
            # NOTE: new_extra is built from the snapshot taken in phase 1, so
            # the UPDATE in phase 3 overwrites `extra` with that snapshot
            # plus the new duration.
            new_extra = {**track_extras[track_id], "duration": duration}
            updates.append((track_id, new_extra))
        else:
            failed += 1
            logger.warning(
                f"failed track {track_id} ({track_titles[track_id]}): {error}"
            )

    if not updates:
        logger.info("no updates to commit")
        return

    # phase 3: fresh connection to commit updates
    logger.info(f"committing {len(updates)} updates...")
    async with db_session() as db:
        for track_id, new_extra in updates:
            stmt = update(Track).where(Track.id == track_id).values(extra=new_extra)
            await db.execute(stmt)
        await db.commit()

    logger.info(f"backfill complete: {len(updates)} updated, {failed} failed")


async def main() -> None:
    """main entry point: parses --dry-run / --concurrency N from argv."""
    dry_run = "--dry-run" in sys.argv

    concurrency = 10
    for i, arg in enumerate(sys.argv):
        if arg == "--concurrency" and i + 1 < len(sys.argv):
            raw_value = sys.argv[i + 1]
            try:
                concurrency = int(raw_value)
            except ValueError:
                logger.error(f"--concurrency expects an integer, got {raw_value!r}")
                raise SystemExit(2) from None

    if concurrency < 1:
        logger.error(f"--concurrency must be >= 1, got {concurrency}")
        raise SystemExit(2)

    if dry_run:
        logger.info("DRY RUN mode - no changes will be made")

    await backfill_duration(dry_run=dry_run, concurrency=concurrency)


if __name__ == "__main__":
    asyncio.run(main())
-137
scripts/backfill_image_urls.py
#!/usr/bin/env -S uv run --script --quiet
"""backfill image_url for tracks with image_id but missing image_url.

## Context

PR #184 added image_url column to tracks table to eliminate N+1 R2 API calls.
New uploads automatically populate image_url at creation time, but 15 legacy
tracks uploaded before the PR still have image_url = NULL.

This causes slow /tracks/liked endpoint performance because we fall back to
calling track.get_image_url() which hits R2 API on every request.

## What This Script Does

1. Finds all tracks with image_id but NULL image_url
2. Computes image_url by calling storage.get_url(image_id)
3. Updates database with computed URLs
4. Runs concurrently for performance

## Usage

```bash
# dry run (show what would be updated)
uv run scripts/backfill_image_urls.py --dry-run

# actually update the database
uv run scripts/backfill_image_urls.py

# target specific environment
DATABASE_URL=postgresql://... uv run scripts/backfill_image_urls.py
```
"""

import asyncio
import logging
import sys
from pathlib import Path

# add src to path so we can import backend modules
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from sqlalchemy import select

from backend.config import settings
from backend.models import Track
from backend.storage import storage
from backend.storage.r2 import R2Storage
from backend.utilities.database import db_session

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


async def backfill_image_urls(dry_run: bool = False) -> None:
    """backfill image_url for tracks with image_id but missing image_url.

    args:
        dry_run: when true, only list the tracks that would be updated.
    """

    if not isinstance(storage, R2Storage):
        logger.error("storage backend is not R2, cannot compute image URLs")
        return

    logger.info(f"storage backend: {settings.storage.backend}")

    async with db_session() as db:
        # find tracks with image_id but no image_url
        stmt = select(Track).where(
            Track.image_id.isnot(None), Track.image_url.is_(None)
        )
        result = await db.execute(stmt)
        tracks = result.scalars().all()

        if not tracks:
            logger.info("no tracks need backfilling")
            return

        logger.info(f"found {len(tracks)} tracks needing image_url backfill")

        if dry_run:
            logger.info("dry run mode - showing tracks that would be updated:")
            for track in tracks:
                logger.info(
                    f" track {track.id}: {track.title} (image_id: {track.image_id})"
                )
            return

        # compute image URLs concurrently
        logger.info("computing image URLs from R2...")

        async def compute_and_update(
            track: Track,
        ) -> tuple[int, str | None, Exception | None]:
            """compute image_url for a track and return (track_id, url, error).

            only computes the URL; the track itself is updated by the caller.
            """
            try:
                url = await storage.get_url(track.image_id, file_type="image")
                return (track.id, url, None)
            except Exception as e:
                return (track.id, None, e)

        results = await asyncio.gather(*[compute_and_update(t) for t in tracks])

        # update database with computed URLs
        updated = 0
        failed = 0

        # asyncio.gather returns results in the order the awaitables were
        # passed, so each result lines up with its track; pairing them with
        # zip avoids rescanning the track list for every result.
        for track, (track_id, image_url, error) in zip(tracks, results):
            if image_url:
                track.image_url = image_url
                updated += 1
                logger.info(f"updated track {track_id}: {track.title}")
            else:
                failed += 1
                logger.error(
                    f"failed to compute URL for track {track_id} ({track.title}): {error}"
                )

        await db.commit()

        logger.info(f"backfill complete: {updated} updated, {failed} failed")


async def main() -> None:
    """main entry point: reads --dry-run from argv and runs the backfill."""
    dry_run = "--dry-run" in sys.argv

    if dry_run:
        logger.info("running in DRY RUN mode - no changes will be made")

    await backfill_image_urls(dry_run=dry_run)


if __name__ == "__main__":
    asyncio.run(main())
-101
scripts/copy_r2_buckets.py
#!/usr/bin/env -S uv run --script --quiet
"""Copy R2 bucket data from old buckets to new buckets."""

from pathlib import Path

import boto3
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

BASE_DIR = Path(__file__).resolve().parents[1]


class R2Settings(BaseSettings):
    """R2 credentials from environment."""

    model_config = SettingsConfigDict(
        env_file=str(BASE_DIR / ".env"),
        env_file_encoding="utf-8",
        extra="ignore",
        case_sensitive=False,
    )

    aws_access_key_id: str = Field(validation_alias="AWS_ACCESS_KEY_ID")
    aws_secret_access_key: str = Field(validation_alias="AWS_SECRET_ACCESS_KEY")
    r2_endpoint_url: str = Field(validation_alias="R2_ENDPOINT_URL")


def get_s3_client():
    """Create S3 client for R2 using credentials loaded by R2Settings."""
    settings = R2Settings()
    return boto3.client(
        "s3",
        endpoint_url=settings.r2_endpoint_url,
        aws_access_key_id=settings.aws_access_key_id,
        aws_secret_access_key=settings.aws_secret_access_key,
        region_name="auto",
    )


def copy_bucket(s3_client, source_bucket: str, dest_bucket: str):
    """Copy all objects from source bucket to destination bucket.

    Objects keep their keys. A failure to copy one object is printed and the
    copy continues with the next object.

    Args:
        s3_client: boto3 S3 client to use for listing and copying.
        source_bucket: name of the bucket to read from.
        dest_bucket: name of the bucket to copy into.
    """
    print(f"\n{'=' * 60}")
    print(f"Copying from '{source_bucket}' to '{dest_bucket}'")
    print(f"{'=' * 60}\n")

    # List all objects in source bucket
    paginator = s3_client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=source_bucket)

    total_objects = 0
    copied_objects = 0

    for page in pages:
        # A page without "Contents" holds no objects. Skip just that page:
        # returning here would abort the listing and drop the summary even
        # when earlier pages were already copied.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            total_objects += 1

            try:
                # Copy object
                copy_source = {"Bucket": source_bucket, "Key": key}
                s3_client.copy_object(
                    CopySource=copy_source, Bucket=dest_bucket, Key=key
                )
                copied_objects += 1
                print(f"✓ Copied: {key} ({obj['Size']} bytes)")
            except Exception as e:
                print(f"✗ Failed to copy {key}: {e}")

    if total_objects == 0:
        print(f"No objects found in {source_bucket}")
        return

    print(f"\n{'=' * 60}")
    print(f"Summary: {copied_objects}/{total_objects} objects copied successfully")
    print(f"{'=' * 60}\n")


def main():
    """Copy data from old buckets to new buckets."""
    s3_client = get_s3_client()

    # Define bucket mappings
    bucket_mappings = [
        ("relay", "audio-prod"),
        ("relay-stg", "audio-staging"),
    ]

    print("\n🚀 Starting R2 bucket copy operation\n")

    for source, dest in bucket_mappings:
        try:
            copy_bucket(s3_client, source, dest)
        except Exception as e:
            # keep going so one bad bucket does not block the other mapping
            print(f"✗ Error copying {source} -> {dest}: {e}")
            continue

    print("✅ All copy operations completed!\n")


if __name__ == "__main__":
    main()
+169
scripts/migrate_cdn_urls.py
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.11"
# dependencies = ["asyncpg", "rich", "typer"]
# ///
"""migrate R2 URLs from r2.dev to custom CDN domains.

## context

R2 buckets were originally exposed via r2.dev managed subdomains, which
bypass Cloudflare's CDN cache layer entirely. custom domains
(audio.plyr.fm, images.plyr.fm) were provisioned to enable edge caching.

this script updates the cached URLs in the database so the API serves
CDN-backed URLs instead of direct R2 URLs. the underlying R2 objects
don't move — both URLs resolve to the same bytes.

## what it updates

- tracks.r2_url (audio CDN URLs)
- tracks.image_url (image CDN URLs)
- tracks.thumbnail_url (thumbnail CDN URLs)
- albums.image_url
- albums.thumbnail_url
- playlists.image_url
- playlists.thumbnail_url

## usage

    # dry run — show what would change
    uv run scripts/migrate_cdn_urls.py

    # apply changes
    uv run scripts/migrate_cdn_urls.py --apply

    # target a specific database
    DATABASE_URL=postgresql://... uv run scripts/migrate_cdn_urls.py --apply
"""

import asyncio
import os

import asyncpg
import typer
from rich.console import Console
from rich.table import Table

console = Console()


def _get_database_url() -> str:
    """return DATABASE_URL from the environment, exiting with code 1 if unset."""
    url = os.environ.get("DATABASE_URL", "")
    if not url:
        typer.echo("DATABASE_URL not set", err=True)
        raise typer.Exit(1)
    return url


async def _run(
    old_audio: str,
    new_audio: str,
    old_images: str,
    new_images: str,
    apply: bool,
) -> None:
    """count (and with apply=True, rewrite) URLs that contain the old domains.

    all UPDATEs run inside a single transaction, so a failure part-way
    through leaves the database untouched rather than half-migrated. the
    connection is closed even if a query fails.

    args:
        old_audio: substring to replace in tracks.r2_url.
        new_audio: replacement for old_audio.
        old_images: substring to replace in the image/thumbnail columns.
        new_images: replacement for old_images.
        apply: when false (dry run), only count matching rows.
    """
    conn = await asyncpg.connect(_get_database_url())
    try:
        table = Table(title="CDN URL migration" + (" (dry run)" if not apply else ""))
        table.add_column("table.column")
        table.add_column("rows")
        table.add_column("old domain")
        table.add_column("new domain")

        updates = [
            ("tracks", "r2_url", old_audio, new_audio),
            ("tracks", "image_url", old_images, new_images),
            ("tracks", "thumbnail_url", old_images, new_images),
            ("albums", "image_url", old_images, new_images),
            ("albums", "thumbnail_url", old_images, new_images),
            ("playlists", "image_url", old_images, new_images),
            ("playlists", "thumbnail_url", old_images, new_images),
        ]

        total = 0
        async with conn.transaction():
            for tbl, col, old, new in updates:
                # tbl/col come from the hard-coded list above, never from user
                # input, so interpolating them into the SQL text is safe.
                # strpos() is a literal substring match; LIKE would treat any
                # "_" or "%" in the domain as a wildcard.
                count = await conn.fetchval(
                    f"SELECT COUNT(*) FROM {tbl} WHERE strpos({col}, $1) > 0",
                    old,
                )
                if count > 0:
                    table.add_row(f"{tbl}.{col}", str(count), old, new)
                    total += count

                    if apply:
                        await conn.execute(
                            f"UPDATE {tbl} SET {col} = replace({col}, $1, $2) "
                            f"WHERE strpos({col}, $1) > 0",
                            old,
                            new,
                        )

        # printed only after the transaction has committed
        console.print(table)
        console.print(
            f"\ntotal: {total} rows" + (" updated" if apply else " would be updated")
        )

        if not apply and total > 0:
            console.print("\nrun with --apply to execute", style="dim")
    finally:
        await conn.close()


def main(
    apply: bool = typer.Option(False, help="apply changes (default is dry run)"),
    old_audio: str = typer.Option(
        "",
        help="r2.dev audio domain to replace (auto-detected from DATABASE_URL if empty)",
    ),
    new_audio: str = typer.Option(
        "",
        help="custom audio domain (auto-detected from DATABASE_URL if empty)",
    ),
    old_images: str = typer.Option(
        "",
        help="r2.dev images domain to replace (auto-detected from DATABASE_URL if empty)",
    ),
    new_images: str = typer.Option(
        "",
        help="custom images domain (auto-detected from DATABASE_URL if empty)",
    ),
) -> None:
    """migrate R2 URLs from r2.dev to custom CDN domains."""
    db_url = _get_database_url()

    # auto-detect environment from DATABASE_URL; explicitly passed options
    # always win over the detected defaults
    if not all([old_audio, new_audio, old_images, new_images]):
        if "plyr-prd" in db_url or "cold-butterfly" in db_url:
            old_audio = (
                old_audio or "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"
            )
            new_audio = new_audio or "https://audio.plyr.fm"
            old_images = (
                old_images or "https://pub-7ea7ea9a6f224f4f8c0321a2bb008c5a.r2.dev"
            )
            new_images = new_images or "https://images.plyr.fm"
            console.print("detected: [bold]production[/bold]")
        elif "plyr-stg" in db_url or "frosty-math" in db_url:
            old_audio = (
                old_audio or "https://pub-0a0a2e70496c461581c9fafb442b269d.r2.dev"
            )
            new_audio = new_audio or "https://audio-stg.plyr.fm"
            old_images = (
                old_images or "https://pub-6991ec380502409380d5b3c3aa28230c.r2.dev"
            )
            new_images = new_images or "https://images-stg.plyr.fm"
            console.print("detected: [bold]staging[/bold]")
        else:
            console.print(
                "could not detect environment from DATABASE_URL. "
                "pass --old-audio, --new-audio, --old-images, --new-images explicitly.",
                style="red",
            )
            raise typer.Exit(1)

    asyncio.run(_run(old_audio, new_audio, old_images, new_images, apply))


if __name__ == "__main__":
    typer.run(main)
-89
scripts/migrate_images_to_new_buckets.py
#!/usr/bin/env -S uv run --script --quiet
"""migrate images from audio-* buckets to images-* buckets."""

from pathlib import Path

import boto3
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

BASE_DIR = Path(__file__).resolve().parents[1]


class R2Settings(BaseSettings):
    """R2 credentials from environment."""

    # values are read from the process environment and from .env at BASE_DIR
    model_config = SettingsConfigDict(
        env_file=str(BASE_DIR / ".env"),
        env_file_encoding="utf-8",
        extra="ignore",
        case_sensitive=False,
    )

    aws_access_key_id: str = Field(validation_alias="AWS_ACCESS_KEY_ID")
    aws_secret_access_key: str = Field(validation_alias="AWS_SECRET_ACCESS_KEY")
    r2_endpoint_url: str = Field(validation_alias="R2_ENDPOINT_URL")


def migrate_images(env: str):
    """copy every object under images/ in audio-{env} into images-{env}.

    the images/ prefix is dropped from the destination key. source objects
    are left in place.

    args:
        env: environment name (dev, staging, prod)
    """
    creds = R2Settings()
    client = boto3.client(
        "s3",
        endpoint_url=creds.r2_endpoint_url,
        aws_access_key_id=creds.aws_access_key_id,
        aws_secret_access_key=creds.aws_secret_access_key,
        region_name="auto",
    )

    prefix = "images/"
    source_bucket, dest_bucket = f"audio-{env}", f"images-{env}"

    print(f"\nmigrating {env}:")
    print(f" source: {source_bucket}/{prefix}")
    print(f" dest: {dest_bucket}/")

    # walk the paginated listing of everything under the images/ prefix
    listing = client.get_paginator("list_objects_v2").paginate(
        Bucket=source_bucket, Prefix=prefix
    )

    copied_count = 0
    for page in listing:
        # a page with no matches has no "Contents" key at all
        for entry in page.get("Contents", []):
            source_key = entry["Key"]
            # strip only the leading images/ segment
            dest_key = source_key.replace("images/", "", 1)

            client.copy_object(
                CopySource={"Bucket": source_bucket, "Key": source_key},
                Bucket=dest_bucket,
                Key=dest_key,
            )

            copied_count += 1
            print(f" ✓ copied {source_key} -> {dest_key}")

    print(f" total: {copied_count} files")


def main():
    """migrate images for all environments."""
    for env in ("dev", "staging", "prod"):
        migrate_images(env)

    print("\n✅ migration complete!")


if __name__ == "__main__":
    main()
-144
scripts/migrate_r2_bucket.py
#!/usr/bin/env -S uv run --script --quiet
"""One-time migration script to copy audio files from old 'relay' bucket to new 'audio-prod' bucket.

This script:
1. Fetches all tracks from the production database
2. Identifies tracks with R2 URLs pointing to the old bucket
3. Copies files from old bucket to new bucket
4. Updates the r2_url column in the database

Usage:
    uv run python scripts/migrate_r2_bucket.py
"""

import asyncio
import logging

import boto3
from botocore.config import Config
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from backend.config import settings
from backend.models.track import Track

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Old and new bucket configuration
OLD_BUCKET = "relay"
OLD_PUBLIC_URL = "https://pub-841ec0f5a7854eaab01292d44aca4820.r2.dev"
NEW_BUCKET = "audio-prod"
NEW_PUBLIC_URL = "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"
R2_ENDPOINT_URL = "https://8feb33b5fb57ce2bc093bc6f4141f40a.r2.cloudflarestorage.com"


def _object_exists(r2_client, bucket: str, key: str) -> bool:
    """Return True if ``key`` exists in ``bucket``, False on a 404.

    Any other client error is re-raised so the caller records the track as
    failed instead of silently treating the object as missing.
    """
    try:
        r2_client.head_object(Bucket=bucket, Key=key)
    except r2_client.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise
    return True


async def main():
    """Run the R2 bucket migration.

    Copies each track's file from OLD_BUCKET to NEW_BUCKET (skipping files
    already present in the new bucket) and rewrites its r2_url. Per-track
    failures are logged and counted; successful updates are committed once
    at the end. The database engine is always disposed, including on the
    early return when no tracks match.
    """
    logger.info("Starting R2 bucket migration")

    # Create R2 client
    if not all(
        [settings.storage.aws_access_key_id, settings.storage.aws_secret_access_key]
    ):
        logger.error("AWS credentials not found in environment")
        logger.error(
            "Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables"
        )
        return

    r2_client = boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT_URL,
        aws_access_key_id=settings.storage.aws_access_key_id,
        aws_secret_access_key=settings.storage.aws_secret_access_key,
        config=Config(
            request_checksum_calculation="WHEN_REQUIRED",
            response_checksum_validation="WHEN_REQUIRED",
        ),
    )

    # Create database session
    engine = create_async_engine(settings.database.url)
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    try:
        async with async_session() as session:
            # Fetch all tracks with old bucket URLs
            result = await session.execute(
                select(Track).where(Track.r2_url.like(f"{OLD_PUBLIC_URL}%"))
            )
            tracks = result.scalars().all()

            if not tracks:
                logger.info("No tracks found with old bucket URLs")
                return

            logger.info(f"Found {len(tracks)} tracks to migrate")

            migrated_count = 0
            failed_count = 0

            for track in tracks:
                try:
                    # Extract the S3 key from the old URL
                    # Format: https://pub-841ec0f5a7854eaab01292d44aca4820.r2.dev/audio/FILE_ID.EXT
                    old_key = track.r2_url.replace(f"{OLD_PUBLIC_URL}/", "")
                    new_key = old_key  # Same key structure in new bucket

                    logger.info(f"Migrating track {track.id}: {track.title}")
                    logger.info(f" Old: {OLD_BUCKET}/{old_key}")
                    logger.info(f" New: {NEW_BUCKET}/{new_key}")

                    # Check if file exists in old bucket
                    if not _object_exists(r2_client, OLD_BUCKET, old_key):
                        logger.error(f" File not found in old bucket: {old_key}")
                        failed_count += 1
                        continue

                    # Copy only when the new bucket does not have the file yet
                    if _object_exists(r2_client, NEW_BUCKET, new_key):
                        logger.info(" File already exists in new bucket, skipping copy")
                    else:
                        logger.info(" Copying file to new bucket...")
                        r2_client.copy_object(
                            Bucket=NEW_BUCKET,
                            Key=new_key,
                            CopySource={"Bucket": OLD_BUCKET, "Key": old_key},
                        )
                        logger.info(" File copied successfully")

                    # Update database with new URL
                    new_url = f"{NEW_PUBLIC_URL}/{new_key}"
                    await session.execute(
                        update(Track).where(Track.id == track.id).values(r2_url=new_url)
                    )

                    logger.info(f" Updated database: {new_url}")
                    migrated_count += 1

                except Exception as e:
                    # Best-effort per track: record the failure and keep going
                    logger.error(f" Failed to migrate track {track.id}: {e}")
                    failed_count += 1

            # Commit all database changes
            await session.commit()
    finally:
        # Release pooled connections on every exit path
        await engine.dispose()

    logger.info("")
    logger.info("=" * 60)
    logger.info("Migration complete!")
    logger.info(f" Migrated: {migrated_count}")
    logger.info(f" Failed: {failed_count}")
    logger.info("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())
-229
scripts/migrate_sensitive_images.py
#!/usr/bin/env -S uv run --script --quiet --with-editable=backend
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "httpx",
#     "pydantic-settings",
#     "asyncpg",
#     "sqlalchemy[asyncio]",
# ]
# ///
"""migrate sensitive images from backend database to moderation service.

this script reads sensitive images from the backend database and creates
them in the moderation service. after migration, the backend will proxy
sensitive image requests to the moderation service.

usage:
    uv run scripts/migrate_sensitive_images.py --env prod --dry-run
    uv run scripts/migrate_sensitive_images.py --env prod

environment variables (set in .env or export):
    PROD_DATABASE_URL - production database connection string
    STAGING_DATABASE_URL - staging database connection string
    DEV_DATABASE_URL - development database connection string
    MODERATION_SERVICE_URL - URL of moderation service
    MODERATION_AUTH_TOKEN - auth token for moderation service
"""

import argparse
import asyncio
import os
from typing import Literal

import httpx
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

Environment = Literal["dev", "staging", "prod"]


class MigrationSettings(BaseSettings):
    """settings for migration script."""

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    dev_database_url: str = Field(default="", validation_alias="DEV_DATABASE_URL")
    staging_database_url: str = Field(
        default="", validation_alias="STAGING_DATABASE_URL"
    )
    prod_database_url: str = Field(default="", validation_alias="PROD_DATABASE_URL")

    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm",
        validation_alias="MODERATION_SERVICE_URL",
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )

    def get_database_url(self, env: Environment) -> str:
        """get database URL for environment.

        raises:
            ValueError: when no URL is configured for ``env``.
        """
        urls = {
            "dev": self.dev_database_url,
            "staging": self.staging_database_url,
            "prod": self.prod_database_url,
        }
        url = urls.get(env, "")
        if not url:
            raise ValueError(f"no database URL configured for {env}")
        # ensure asyncpg driver is used
        if url.startswith("postgresql://"):
            url = url.replace("postgresql://", "postgresql+asyncpg://", 1)
        return url

    def get_moderation_url(self, env: Environment) -> str:
        """get moderation service URL for environment.

        dev and staging read their own override variables directly from the
        process environment; any other value uses ``moderation_service_url``.
        """
        if env == "dev":
            return os.getenv("DEV_MODERATION_URL", "http://localhost:8002")
        elif env == "staging":
            return os.getenv("STAGING_MODERATION_URL", "https://moderation-stg.plyr.fm")
        else:
            return self.moderation_service_url


async def fetch_sensitive_images(db_url: str) -> list[dict]:
    """fetch all sensitive images from backend database.

    returns:
        one dict per row, ordered by id, with ``flagged_at`` as an ISO-8601
        string (or None when the column is null).
    """
    engine = create_async_engine(db_url)

    try:
        async with engine.begin() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT id, image_id, url, reason, flagged_at, flagged_by
                    FROM sensitive_images
                    ORDER BY id
                    """
                )
            )
            rows = result.fetchall()
    finally:
        # dispose even when the query raises so the pool does not leak
        await engine.dispose()

    return [
        {
            "id": row[0],
            "image_id": row[1],
            "url": row[2],
            "reason": row[3],
            "flagged_at": row[4].isoformat() if row[4] else None,
            "flagged_by": row[5],
        }
        for row in rows
    ]


async def migrate_to_moderation_service(
    images: list[dict],
    moderation_url: str,
    auth_token: str,
    dry_run: bool = False,
) -> tuple[int, int]:
    """migrate images to moderation service.

    each image is POSTed to ``{moderation_url}/admin/sensitive-images``; a
    failure on one image is printed and counted, and the loop continues.
    with ``dry_run`` nothing is sent and every image counts as a success.

    returns:
        tuple of (success_count, error_count)
    """
    success_count = 0
    error_count = 0

    headers = {"X-Moderation-Key": auth_token}

    async with httpx.AsyncClient(timeout=30.0) as client:
        for image in images:
            payload = {
                "image_id": image["image_id"],
                "url": image["url"],
                "reason": image["reason"],
                "flagged_by": image["flagged_by"],
            }

            if dry_run:
                print(f" [dry-run] would migrate: {payload}")
                success_count += 1
                continue

            try:
                response = await client.post(
                    f"{moderation_url}/admin/sensitive-images",
                    json=payload,
                    headers=headers,
                )
                response.raise_for_status()
                result = response.json()
                print(f" migrated id={image['id']} -> moderation id={result['id']}")
                success_count += 1
            except Exception as e:
                # best-effort per image: report and keep going
                print(f" ERROR migrating id={image['id']}: {e}")
                error_count += 1

    return success_count, error_count


async def main() -> None:
    """parse CLI args, read sensitive images from the backend db, and push them."""
    parser = argparse.ArgumentParser(
        description="migrate sensitive images to moderation service"
    )
    parser.add_argument(
        "--env",
        choices=["dev", "staging", "prod"],
        required=True,
        help="environment to migrate",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be migrated without making changes",
    )
    args = parser.parse_args()

    settings = MigrationSettings()

    print(f"migrating sensitive images for {args.env}")
    if args.dry_run:
        print("(dry run - no changes will be made)")

    # fetch from backend database
    db_url = settings.get_database_url(args.env)
    print("\nfetching from backend database...")
    images = await fetch_sensitive_images(db_url)
    print(f"found {len(images)} sensitive images")

    if not images:
        print("nothing to migrate")
        return

    # migrate to moderation service
    moderation_url = settings.get_moderation_url(args.env)
    print(f"\nmigrating to moderation service at {moderation_url}...")

    # a token is only needed when requests are actually sent
    if not settings.moderation_auth_token and not args.dry_run:
        print("ERROR: MODERATION_AUTH_TOKEN not set")
        return

    success, errors = await migrate_to_moderation_service(
        images,
        moderation_url,
        settings.moderation_auth_token,
        dry_run=args.dry_run,
    )

    print(f"\ndone: {success} migrated, {errors} errors")

    if not args.dry_run and errors == 0:
        print(
            "\nnext steps:\n"
            " 1. verify data in moderation service: GET /sensitive-images\n"
            " 2. update backend to proxy to moderation service\n"
            " 3. optionally drop sensitive_images table from backend db"
        )


if __name__ == "__main__":
    asyncio.run(main())