GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 238 lines 8.8 kB view raw
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
bulk enrich actors missing handles or avatars via bsky getProfiles API.

only fetches+writes actors that actually need data (handle='' or avatar_url=''),
skipping already-enriched rows entirely. writes in small batches with pauses.

usage:
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --start-rowid 2529
  TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --dry-run
"""

import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"
PAGE_SIZE = 500  # unenriched DIDs per page
BSKY_CONCURRENCY = 5  # concurrent getProfiles calls
PROFILES_PER_CALL = 25  # DIDs sent in one getProfiles request
RATE_LIMIT_PAUSE = 30  # seconds to wait after an HTTP 429 before one retry
WRITE_BATCH = 25  # stmts per Turso write — tiny to minimize lock time
WRITE_PAUSE = 0.05  # seconds between write batches

# sentinel returned by fetch_profiles() when the API answers HTTP 429
RATE_LIMITED = "rate_limited"

DIM = "\033[2m"
RESET = "\033[0m"

# last path segment of an avatar URL, minus an optional "@<format>" suffix
AVATAR_CID_RE = re.compile(r'/([^/]+?)(?:@[a-z]+)?$')

# label values that always mark an actor as hidden
HIDE_LABEL_VALS = frozenset({"!hide", "!takedown", "!suspend", "spam"})
# "!no-unauthenticated" only hides an actor when this DID is the label source
MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac"


def get_turso_url() -> str:
    """return TURSO_URL with libsql:// rewritten to https://; exit(1) if unset."""
    url = os.environ.get("TURSO_URL", "")
    if not url:
        # sys.exit(str) prints the message to stderr and exits with status 1
        sys.exit("error: TURSO_URL not set")
    return url.replace("libsql://", "https://")


def get_turso_token() -> str:
    """return TURSO_AUTH_TOKEN; exit(1) if unset."""
    token = os.environ.get("TURSO_AUTH_TOKEN", "")
    if not token:
        sys.exit("error: TURSO_AUTH_TOKEN not set")
    return token


def turso_pipeline(requests, turso_url, turso_token):
    """POST a list of pipeline requests to {turso_url}/v3/pipeline.

    appends the trailing {"type": "close"} request itself. returns the decoded
    JSON response. network / HTTP / JSON errors propagate to the caller.
    """
    body = json.dumps({"requests": [*requests, {"type": "close"}]}).encode()
    req = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=body,
        headers={
            "Authorization": f"Bearer {turso_token}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def pipeline_errors(result) -> list:
    """return the error messages of every failed entry in a pipeline response.

    an entry counts as failed when its "type" is "error" (the same shape
    turso_query() checks for). returns [] when nothing failed.
    """
    if not isinstance(result, dict):
        return []
    return [
        (entry.get("error") or {}).get("message", "unknown error")
        for entry in (result.get("results") or [])
        if isinstance(entry, dict) and entry.get("type") == "error"
    ]


def turso_query(sql, args, turso_url, turso_token):
    """run one statement and return its rows as a list of {column: value} dicts.

    null cells become None. if the statement itself fails, the error is printed
    to stderr and [] is returned.

    NOTE: callers cannot tell "statement failed" from "no rows"; main() treats
    both as end-of-table.
    """
    result = turso_pipeline(
        [{"type": "execute", "stmt": {"sql": sql, "args": args}}],
        turso_url,
        turso_token,
    )
    res = result["results"][0]
    if res.get("type") == "error":
        print(f" turso error: {res['error']['message']}", file=sys.stderr)
        return []
    table = res["response"]["result"]
    cols = [c["name"] for c in table["cols"]]
    return [
        {c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}
        for row in table["rows"]
    ]


def turso_batch_write(stmts, turso_url, turso_token):
    """execute stmts in one pipeline call; return True only if all succeeded.

    returns False (after printing to stderr) when the request itself fails or
    when any statement in the response is reported as an error.
    """
    try:
        result = turso_pipeline(
            [{"type": "execute", "stmt": s} for s in stmts],
            turso_url,
            turso_token,
        )
    except Exception as e:  # best-effort: one failed batch must not kill the run
        print(f"\n turso write failed: {e}", file=sys.stderr)
        return False

    # an HTTP 200 can still carry per-statement errors; do not count those as ok
    errors = pipeline_errors(result)
    if errors:
        print(
            f"\n turso write failed: {len(errors)} stmt error(s), first: {errors[0]}",
            file=sys.stderr,
        )
        return False
    return True


def extract_avatar_cid(url):
    """return the last path segment of url without its "@<format>" suffix, or ""."""
    if not url:
        return ""
    m = AVATAR_CID_RE.search(url)
    return m.group(1) if m else ""


def clean_associated(assoc):
    """JSON-encode assoc with entries equal to 0 / False / None dropped.

    returns "{}" when assoc is not a non-empty dict or nothing survives.
    """
    if not assoc or not isinstance(assoc, dict):
        return "{}"
    clean = {k: v for k, v in assoc.items() if v not in (0, False, None)}
    return json.dumps(clean) if clean else "{}"


def fetch_profiles(dids):
    """fetch profiles for dids in a single getProfiles call.

    returns the list under "profiles" in the response, the RATE_LIMITED
    sentinel on HTTP 429, or [] on any other failure (logged to stderr).
    """
    params = "&".join(f"actors={urllib.parse.quote(d)}" for d in dids)
    req = urllib.request.Request(
        f"{BSKY_GET_PROFILES}?{params}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return RATE_LIMITED
        print(f"\n getProfiles failed: HTTP {e.code}", file=sys.stderr)
        return []
    except Exception as e:  # best-effort: the batch is counted as not found
        print(f"\n getProfiles failed: {e}", file=sys.stderr)
        return []


def profile_to_stmt(p):
    """build the UPDATE statement (sql + positional args) for one profile dict.

    empty incoming values never overwrite handle / display_name / avatar_url /
    created_at / associated (COALESCE + NULLIF); labels and hidden are always
    overwritten. hidden is 1 when any label value is in HIDE_LABEL_VALS, or is
    "!no-unauthenticated" with MOD_DID as its source.
    """
    hidden = int(any(
        lbl.get("val", "") in HIDE_LABEL_VALS
        or (lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == MOD_DID)
        for lbl in (p.get("labels") or [])
    ))

    return {
        "sql": """UPDATE actors SET
            handle = COALESCE(NULLIF(?2, ''), handle),
            display_name = COALESCE(NULLIF(?3, ''), display_name),
            avatar_url = COALESCE(NULLIF(?4, ''), avatar_url),
            labels = ?5, hidden = ?6,
            created_at = COALESCE(NULLIF(?7, ''), created_at),
            associated = COALESCE(NULLIF(?8, '{}'), associated),
            profile_checked_at = unixepoch()
            WHERE did = ?1""",
        "args": [
            {"type": "text", "value": p["did"]},
            {"type": "text", "value": p.get("handle", "")},
            {"type": "text", "value": p.get("displayName", "")},
            {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))},
            {"type": "text", "value": json.dumps(p.get("labels", []))},
            {"type": "integer", "value": str(hidden)},
            {"type": "text", "value": p.get("createdAt", "")},
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }


def main():
    """walk the actors table by rowid and enrich rows missing handle or avatar.

    flags: --dry-run (fetch but never write), --start-rowid N (resume after N).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--start-rowid", type=int, default=0)
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    enriched = 0
    skipped = 0
    not_found = 0
    t0 = time.time()
    last_rowid = args.start_rowid

    print(f"bulk enriching unenriched actors only (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH})...")
    if args.dry_run:
        print(" DRY RUN — no writes")
    if args.start_rowid:
        print(f" resuming from rowid {args.start_rowid}")

    while True:
        # cheap rowid-paginated read — no filter scan, just walk forward
        rows = turso_query(
            "SELECT rowid, did, handle, avatar_url FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2",
            [
                {"type": "integer", "value": str(last_rowid)},
                {"type": "integer", "value": str(PAGE_SIZE)},
            ],
            turso_url, turso_token,
        )
        if not rows:
            break

        last_rowid = int(rows[-1]["rowid"])

        # filter client-side: only fetch profiles for rows missing data
        need = [r for r in rows if not r["handle"] or not r["avatar_url"]]
        skipped += len(rows) - len(need)

        if not need:
            print(f" skipped {len(rows)} already-enriched {DIM}rowid={last_rowid}{RESET}")
            continue

        dids = [r["did"] for r in need]

        # getProfiles in concurrent batches of PROFILES_PER_CALL
        batches = [
            dids[i:i + PROFILES_PER_CALL]
            for i in range(0, len(dids), PROFILES_PER_CALL)
        ]
        pending = []

        with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool:
            futures = {pool.submit(fetch_profiles, b): b for b in batches}
            for future in as_completed(futures):
                batch = futures[future]
                result = future.result()

                if result == RATE_LIMITED:
                    # back off once, then retry this batch on the main thread
                    print(f"\n rate limited — pausing {RATE_LIMIT_PAUSE}s...")
                    time.sleep(RATE_LIMIT_PAUSE)
                    result = fetch_profiles(batch)
                    if result == RATE_LIMITED:
                        print(" still limited, skipping batch")
                        not_found += len(batch)
                        continue

                returned = {p["did"] for p in result}
                not_found += len(batch) - len(returned)

                pending.extend(profile_to_stmt(p) for p in result)
                enriched += len(result)

        # write in small batches
        if pending and not args.dry_run:
            write_t0 = time.time()
            ok = 0
            for i in range(0, len(pending), WRITE_BATCH):
                chunk = pending[i:i + WRITE_BATCH]
                if turso_batch_write(chunk, turso_url, turso_token):
                    ok += len(chunk)
                time.sleep(WRITE_PAUSE)
            write_ms = int((time.time() - write_t0) * 1000)
            print(f" wrote {ok}/{len(pending)} stmts ({write_ms}ms)")

        elapsed = time.time() - t0
        rate = enriched / elapsed if elapsed > 0 else 0
        tag = "dry" if args.dry_run else "live"
        print(
            f" [{tag}] enriched={enriched:,} skipped={skipped:,} not_found={not_found:,} "
            f"{DIM}{rate:.0f}/s rowid={last_rowid}{RESET}"
        )

    elapsed = time.time() - t0
    print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}")


if __name__ == "__main__":
    main()