#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
bulk enrich actors via bsky getProfiles API.

pages through all actors by rowid, calls getProfiles in concurrent batches,
writes back handles, avatars, labels, createdAt, associated.

writes in small batches (50 stmts) with 200ms pauses between to avoid
saturating Turso and blocking production reads.

usage:
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --start-rowid 2529
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --dry-run
"""
import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"
PAGE_SIZE = 500        # DIDs per Turso read page
BSKY_CONCURRENCY = 5   # concurrent getProfiles calls
WRITE_BATCH = 50       # stmts per Turso write — keep write lock short
WRITE_PAUSE = 0.2      # seconds between write batches — let reads through
DIM = "\033[2m"
RESET = "\033[0m"


def get_turso_url() -> str:
    """Return the Turso HTTP base URL from $TURSO_URL, or exit(1) if unset.

    libsql:// URLs are rewritten to https:// so plain urllib can reach the
    HTTP pipeline endpoint.
    """
    url = os.environ.get("TURSO_URL", "")
    if not url:
        print("error: TURSO_URL not set", file=sys.stderr)
        sys.exit(1)
    return url.replace("libsql://", "https://")


def get_turso_token() -> str:
    """Return the Turso auth token from $TURSO_AUTH_TOKEN, or exit(1) if unset."""
    token = os.environ.get("TURSO_AUTH_TOKEN", "")
    if not token:
        print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr)
        sys.exit(1)
    return token


def turso_query(sql, args, turso_url, turso_token):
    """Run a single parameterized SELECT via the Turso v3 pipeline API.

    Returns a list of row dicts keyed by column name, with SQL NULLs mapped
    to None. On a server-side statement error, logs to stderr and returns [].
    """
    body = json.dumps({"requests": [
        {"type": "execute", "stmt": {"sql": sql, "args": args}},
        {"type": "close"},
    ]}).encode()
    req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={
        "Authorization": f"Bearer {turso_token}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        result = json.loads(resp.read())
    res = result["results"][0]
    if res.get("type") == "error":
        print(f" turso error: {res['error']['message']}", file=sys.stderr)
        return []
    cols = [c["name"] for c in res["response"]["result"]["cols"]]
    # each cell arrives as {"type": ..., "value": ...}; unwrap, NULL -> None
    return [
        {c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}
        for row in res["response"]["result"]["rows"]
    ]


def turso_batch_write(stmts, turso_url, turso_token):
    """Execute a batch of write statements in one pipeline request.

    stmts is a list of {"sql": ..., "args": [...]} dicts. Best-effort: any
    failure is logged and False is returned so the caller can keep going.
    """
    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()
    req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={
        "Authorization": f"Bearer {turso_token}",
        "Content-Type": "application/json",
    })
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            json.loads(resp.read())
        return True
    except Exception as e:
        print(f"\n turso write failed: {e}", file=sys.stderr)
        return False


def extract_avatar_cid(url):
    """Pull the CID out of a bsky CDN avatar URL.

    e.g. ".../did:plc:xxx/bafkrei...@jpeg" -> "bafkrei..." (format suffix
    stripped). Returns "" for empty/None input or no match.
    """
    if not url:
        return ""
    m = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url)
    return m.group(1) if m else ""


def clean_associated(assoc):
    """Serialize a profile's `associated` object, dropping zero/false/null keys.

    Returns "{}" when nothing meaningful remains, so the SQL NULLIF(?, '{}')
    in the UPDATE leaves any existing stored value untouched.
    """
    if not assoc or not isinstance(assoc, dict):
        return "{}"
    clean = {k: v for k, v in assoc.items() if v not in (0, False, None)}
    return json.dumps(clean) if clean else "{}"


def fetch_profiles(dids):
    """Fetch up to 25 profiles from the public bsky appview.

    Returns the list of profile dicts, the sentinel string "rate_limited"
    on HTTP 429, or [] on any other failure (best-effort — missing DIDs are
    counted as not_found by the caller).
    """
    # quote lives in urllib.parse; urllib.request.quote is an undocumented re-export
    params = "&".join(f"actors={urllib.parse.quote(d)}" for d in dids)
    req = urllib.request.Request(
        f"{BSKY_GET_PROFILES}?{params}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return "rate_limited"
        return []
    except Exception:
        return []


def profile_to_stmt(p):
    """Build the actors UPDATE statement for one getProfiles result.

    Computes `hidden` from moderation labels: any hard-moderation label, or
    !no-unauthenticated when applied by the bsky moderation service, hides
    the actor from typeahead. COALESCE/NULLIF in the SQL keeps existing
    column values when the API returned an empty field.
    """
    hide_vals = {"!hide", "!takedown", "!suspend", "spam"}
    mod_did = "did:plc:ar7c4by46qjdydhdevvrndac"  # bsky moderation service
    hidden = 0
    for lbl in (p.get("labels") or []):
        if lbl.get("val", "") in hide_vals or (
            lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == mod_did
        ):
            hidden = 1
            break
    return {
        "sql": """UPDATE actors SET handle = COALESCE(NULLIF(?2, ''), handle), display_name = COALESCE(NULLIF(?3, ''), display_name), avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), labels = ?5, hidden = ?6, created_at = COALESCE(NULLIF(?7, ''), created_at), associated = COALESCE(NULLIF(?8, '{}'), associated), profile_checked_at = unixepoch() WHERE did = ?1""",
        "args": [
            {"type": "text", "value": p["did"]},
            {"type": "text", "value": p.get("handle", "")},
            {"type": "text", "value": p.get("displayName", "")},
            {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))},
            {"type": "text", "value": json.dumps(p.get("labels", []))},
            {"type": "integer", "value": str(hidden)},
            {"type": "text", "value": p.get("createdAt", "")},
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--start-rowid", type=int, default=0)
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    enriched = 0
    checked = 0
    not_found = 0
    t0 = time.time()
    last_rowid = args.start_rowid

    print(f"bulk enriching (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH}, write_pause={WRITE_PAUSE}s)...")
    if args.dry_run:
        print(" DRY RUN — no writes")
    if args.start_rowid:
        print(f" resuming from rowid {args.start_rowid}")

    while True:
        # page through actors by rowid so a crash can resume via --start-rowid
        rows = turso_query(
            "SELECT rowid, did FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2",
            [{"type": "integer", "value": str(last_rowid)},
             {"type": "integer", "value": str(PAGE_SIZE)}],
            turso_url, turso_token,
        )
        if not rows:
            break
        dids = [r["did"] for r in rows]
        last_rowid = int(rows[-1]["rowid"])

        # getProfiles in concurrent batches of 25 (the API's max actors per call)
        batches = [dids[i:i+25] for i in range(0, len(dids), 25)]
        pending = []
        with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool:
            futures = {pool.submit(fetch_profiles, b): b for b in batches}
            for future in as_completed(futures):
                batch = futures[future]
                result = future.result()
                if result == "rate_limited":
                    # back off once, then retry the batch; skip it if still limited
                    print("\n rate limited — pausing 30s...")
                    time.sleep(30)
                    result = fetch_profiles(batch)
                    if result == "rate_limited":
                        print(" still limited, skipping batch")
                        checked += len(batch)
                        continue
                checked += len(batch)
                returned = {p["did"] for p in result}
                not_found += len(batch) - len(returned)
                for p in result:
                    pending.append(profile_to_stmt(p))
                    enriched += 1

        # write in small batches with pauses between each
        if pending and not args.dry_run:
            write_t0 = time.time()
            write_chunks = 0
            for i in range(0, len(pending), WRITE_BATCH):
                turso_batch_write(pending[i:i+WRITE_BATCH], turso_url, turso_token)
                write_chunks += 1
                time.sleep(WRITE_PAUSE)
            write_ms = int((time.time() - write_t0) * 1000)
            print(f" wrote {len(pending)} stmts in {write_chunks} chunks ({write_ms}ms)")

        elapsed = time.time() - t0
        rate = checked / elapsed if elapsed > 0 else 0
        tag = "dry" if args.dry_run else "live"
        print(
            f" [{tag}] checked={checked:,} enriched={enriched:,} "
            f"not_found={not_found:,} "
            f"{DIM}{rate:.0f} dids/s rowid={last_rowid}{RESET}"
        )

    elapsed = time.time() - t0
    print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}, checked={checked:,}")


if __name__ == "__main__":
    main()