GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 232 lines 8.5 kB view raw
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = [] 5# /// 6""" 7bulk enrich actors via bsky getProfiles API. 8 9pages through all actors by rowid, calls getProfiles in concurrent batches, 10writes back handles, avatars, labels, createdAt, associated. 11 12writes in small batches (50 stmts) with 200ms pauses between to avoid 13saturating Turso and blocking production reads. 14 15usage: 16 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py 17 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --start-rowid 2529 18 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --dry-run 19""" 20 21import argparse 22import json 23import os 24import re 25import sys 26import time 27import urllib.request 28import urllib.error 29from concurrent.futures import ThreadPoolExecutor, as_completed 30 31BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 32PAGE_SIZE = 500 # DIDs per Turso read page 33BSKY_CONCURRENCY = 5 # concurrent getProfiles calls 34WRITE_BATCH = 50 # stmts per Turso write — keep write lock short 35WRITE_PAUSE = 0.2 # seconds between write batches — let reads through 36 37DIM = "\033[2m" 38RESET = "\033[0m" 39 40 41def get_turso_url() -> str: 42 url = os.environ.get("TURSO_URL", "") 43 if not url: 44 print("error: TURSO_URL not set", file=sys.stderr); sys.exit(1) 45 return url.replace("libsql://", "https://") 46 47 48def get_turso_token() -> str: 49 token = os.environ.get("TURSO_AUTH_TOKEN", "") 50 if not token: 51 print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr); sys.exit(1) 52 return token 53 54 55def turso_query(sql, args, turso_url, turso_token): 56 body = json.dumps({"requests": [ 57 {"type": "execute", "stmt": {"sql": sql, "args": args}}, 58 {"type": "close"}, 59 ]}).encode() 60 req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={ 61 "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json", 62 }) 63 with urllib.request.urlopen(req, timeout=30) as resp: 64 result = json.loads(resp.read()) 65 res = result["results"][0] 66 if res.get("type") == "error": 67 print(f" turso error: {res['error']['message']}", file=sys.stderr) 68 return [] 69 cols = [c["name"] for c in res["response"]["result"]["cols"]] 70 return [{c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)} 71 for row in res["response"]["result"]["rows"]] 72 73 74def turso_batch_write(stmts, turso_url, turso_token): 75 reqs = [{"type": "execute", "stmt": s} for s in stmts] 76 reqs.append({"type": "close"}) 77 body = json.dumps({"requests": reqs}).encode() 78 req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={ 79 "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json", 80 }) 81 try: 82 with urllib.request.urlopen(req, timeout=60) as resp: 83 json.loads(resp.read()) 84 return True 85 except Exception as e: 86 print(f"\n turso write failed: {e}", file=sys.stderr) 87 return False 88 89 90def extract_avatar_cid(url): 91 if not url: return "" 92 m = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url) 93 return m.group(1) if m else "" 94 95 96def clean_associated(assoc): 97 if not assoc or not isinstance(assoc, dict): return "{}" 98 clean = {k: v for k, v in assoc.items() if v not in (0, False, None)} 99 return json.dumps(clean) if clean else "{}" 100 101 102def fetch_profiles(dids): 103 params = "&".join(f"actors={urllib.request.quote(d)}" for d in dids) 104 req = urllib.request.Request( 105 f"{BSKY_GET_PROFILES}?{params}", 106 headers={"User-Agent": "typeahead-enrich/1.0"}, 107 ) 108 try: 109 with urllib.request.urlopen(req, timeout=15) as resp: 110 return json.loads(resp.read()).get("profiles", []) 111 except urllib.error.HTTPError as e: 112 if e.code == 429: return "rate_limited" 113 return [] 114 except Exception: 115 return [] 116 117 118def profile_to_stmt(p): 119 hide_vals = {"!hide", "!takedown", "!suspend", "spam"} 120 mod_did = "did:plc:ar7c4by46qjdydhdevvrndac" 121 hidden = 0 122 for lbl in (p.get("labels") or []): 123 if lbl.get("val", "") in hide_vals or (lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == mod_did): 124 hidden = 1; break 125 126 return { 127 "sql": """UPDATE actors SET 128 handle = COALESCE(NULLIF(?2, ''), handle), 129 display_name = COALESCE(NULLIF(?3, ''), display_name), 130 avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), 131 labels = ?5, hidden = ?6, 132 created_at = COALESCE(NULLIF(?7, ''), created_at), 133 associated = COALESCE(NULLIF(?8, '{}'), associated), 134 profile_checked_at = unixepoch() 135 WHERE did = ?1""", 136 "args": [ 137 {"type": "text", "value": p["did"]}, 138 {"type": "text", "value": p.get("handle", "")}, 139 {"type": "text", "value": p.get("displayName", "")}, 140 {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))}, 141 {"type": "text", "value": json.dumps(p.get("labels", []))}, 142 {"type": "integer", "value": str(hidden)}, 143 {"type": "text", "value": p.get("createdAt", "")}, 144 {"type": "text", "value": clean_associated(p.get("associated"))}, 145 ], 146 } 147 148 149def main(): 150 parser = argparse.ArgumentParser() 151 parser.add_argument("--dry-run", action="store_true") 152 parser.add_argument("--start-rowid", type=int, default=0) 153 args = parser.parse_args() 154 155 turso_url = get_turso_url() 156 turso_token = get_turso_token() 157 158 enriched = 0 159 checked = 0 160 not_found = 0 161 t0 = time.time() 162 last_rowid = args.start_rowid 163 164 print(f"bulk enriching (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH}, write_pause={WRITE_PAUSE}s)...") 165 if args.dry_run: print(" DRY RUN — no writes") 166 if args.start_rowid: print(f" resuming from rowid {args.start_rowid}") 167 168 while True: 169 rows = turso_query( 170 "SELECT rowid, did FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2", 171 [{"type": "integer", "value": str(last_rowid)}, {"type": "integer", "value": str(PAGE_SIZE)}], 172 turso_url, turso_token, 173 ) 174 if not rows: 175 break 176 177 dids = [r["did"] for r in rows] 178 last_rowid = int(rows[-1]["rowid"]) 179 180 # getProfiles in concurrent batches of 25 181 batches = [dids[i:i+25] for i in range(0, len(dids), 25)] 182 pending = [] 183 184 with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool: 185 futures = {pool.submit(fetch_profiles, b): b for b in batches} 186 for future in as_completed(futures): 187 batch = futures[future] 188 result = future.result() 189 190 if result == "rate_limited": 191 print(f"\n rate limited — pausing 30s...") 192 time.sleep(30) 193 result = fetch_profiles(batch) 194 if result == "rate_limited": 195 print(" still limited, skipping batch") 196 checked += len(batch) 197 continue 198 199 checked += len(batch) 200 returned = {p["did"] for p in result} 201 not_found += len(batch) - len(returned) 202 203 for p in result: 204 pending.append(profile_to_stmt(p)) 205 enriched += 1 206 207 # write in small batches with pauses between each 208 if pending and not args.dry_run: 209 write_t0 = time.time() 210 write_chunks = 0 211 for i in range(0, len(pending), WRITE_BATCH): 212 turso_batch_write(pending[i:i+WRITE_BATCH], turso_url, turso_token) 213 write_chunks += 1 214 time.sleep(WRITE_PAUSE) 215 write_ms = int((time.time() - write_t0) * 1000) 216 print(f" wrote {len(pending)} stmts in {write_chunks} chunks ({write_ms}ms)") 217 218 elapsed = time.time() - t0 219 rate = checked / elapsed if elapsed > 0 else 0 220 tag = "dry" if args.dry_run else "live" 221 print( 222 f" [{tag}] checked={checked:,} enriched={enriched:,} " 223 f"not_found={not_found:,} " 224 f"{DIM}{rate:.0f} dids/s rowid={last_rowid}{RESET}" 225 ) 226 227 elapsed = time.time() - t0 228 print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}, checked={checked:,}") 229 230 231if __name__ == "__main__": 232 main()