GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
bulk enrich actors missing handles or avatars via the bsky getProfiles API.

walks the actors table by rowid and only fetches+writes actors that actually
need data (empty or NULL handle or avatar_url); already-enriched rows are read
but skipped. writes go out in small batches with short pauses between them.

usage:
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --start-rowid 2529
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --dry-run
"""
17
import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
27
BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"
PAGE_SIZE = 500  # actor rows read per Turso page (filtered client-side afterwards)
BSKY_CONCURRENCY = 5  # concurrent getProfiles calls
WRITE_BATCH = 25  # stmts per Turso write — tiny to minimize lock time
WRITE_PAUSE = 0.05  # seconds between write batches

# ANSI escape codes used to dim the rowid part of progress lines.
DIM = "\033[2m"
RESET = "\033[0m"
36
37
def get_turso_url() -> str:
    """Return the Turso HTTP base URL taken from the TURSO_URL environment variable.

    A leading ``libsql://`` scheme is rewritten to ``https://`` because the
    database is reached through its HTTP ``/v3/pipeline`` endpoint. Surrounding
    whitespace and trailing slashes are removed so ``f"{url}/v3/pipeline"``
    builds a clean path.

    Prints an error to stderr and exits with status 1 when the variable is
    unset, empty, or blank.
    """
    url = os.environ.get("TURSO_URL", "").strip()
    if not url:
        print("error: TURSO_URL not set", file=sys.stderr)
        sys.exit(1)
    scheme = "libsql://"
    # Only rewrite the scheme prefix, never an occurrence later in the string.
    if url.startswith(scheme):
        url = "https://" + url.removeprefix(scheme)
    return url.rstrip("/")
43
44
def get_turso_token() -> str:
    """Read the Turso bearer token from the TURSO_AUTH_TOKEN environment variable.

    When the variable is unset or empty, an error is printed to stderr and the
    process exits with status 1.
    """
    token = os.environ.get("TURSO_AUTH_TOKEN")
    if token:
        return token
    print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr)
    sys.exit(1)
50
51
def turso_query(sql, args, turso_url, turso_token):
    """Run a single statement through Turso's ``/v3/pipeline`` endpoint and return its rows.

    Args:
        sql: statement text with positional parameters (this script uses ``?1``, ``?2``).
        args: typed argument dicts, e.g. ``{"type": "integer", "value": "5"}``.
        turso_url: HTTPS base URL of the database.
        turso_token: bearer token for the Authorization header.

    Returns:
        One dict per row mapping column name to the cell's raw ``value``
        (``None`` for SQL NULL). If the server reports an error for the
        statement, its message is printed to stderr and ``[]`` is returned.
        Exceptions from ``urlopen`` (network/HTTP failures) are not caught.
    """
    payload = {
        "requests": [
            {"type": "execute", "stmt": {"sql": sql, "args": args}},
            {"type": "close"},
        ]
    }
    headers = {
        "Authorization": f"Bearer {turso_token}",
        "Content-Type": "application/json",
    }
    request = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=json.dumps(payload).encode(),
        headers=headers,
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        decoded = json.loads(response.read())

    # Only the execute entry matters; the trailing close entry is ignored.
    first = decoded["results"][0]
    if first.get("type") == "error":
        print(f" turso error: {first['error']['message']}", file=sys.stderr)
        return []

    result_set = first["response"]["result"]
    names = [col["name"] for col in result_set["cols"]]
    records = []
    for raw_row in result_set["rows"]:
        record = {}
        for name, cell in zip(names, raw_row):
            record[name] = None if cell["type"] == "null" else cell["value"]
        records.append(record)
    return records
69
70
def turso_batch_write(stmts, turso_url, turso_token):
    """Execute ``stmts`` in one Turso ``/v3/pipeline`` request.

    Args:
        stmts: statement dicts of the form ``{"sql": ..., "args": [...]}``
            (e.g. as built by profile_to_stmt).
        turso_url: HTTPS base URL of the database.
        turso_token: bearer token for the Authorization header.

    Returns:
        True if the request succeeded and no pipeline entry reported an error,
        False otherwise. Transport, HTTP and JSON errors are caught and
        reported on stderr instead of being raised.
    """
    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()
    req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={
        "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json",
    })
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read())
    except Exception as e:
        # Best-effort: a failed batch is reported and counted by the caller,
        # it must not abort the whole run.
        print(f"\n turso write failed: {e}", file=sys.stderr)
        return False

    # A successful HTTP response can still carry per-statement failures: each
    # failed entry in "results" has type == "error" (the same shape
    # turso_query checks for). Treat any such entry as a failed batch.
    errors = [r for r in result.get("results", []) if r.get("type") == "error"]
    if errors:
        message = errors[0].get("error", {}).get("message", "unknown error")
        print(
            f"\n turso write failed: {len(errors)} pipeline request(s) errored; first: {message}",
            file=sys.stderr,
        )
        return False
    return True
85
86
def extract_avatar_cid(url):
    """Return the final path segment of an avatar URL without any ``@format`` suffix.

    For example ``.../did:plc:xyz/bafkreiabc@jpeg`` yields ``bafkreiabc``
    (presumably the avatar blob CID). Returns ``""`` for an empty/None URL or
    when no non-empty final ``/segment`` can be found (e.g. a trailing slash).
    """
    if url:
        found = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url)
        if found is not None:
            return found.group(1)
    return ""
91
92
def clean_associated(assoc):
    """Serialize a profile's ``associated`` mapping, dropping "empty" entries.

    Entries whose value equals 0 / False or is None are removed. Returns the
    JSON text of what remains, or ``"{}"`` when ``assoc`` is falsy, is not a
    dict, or has nothing left after filtering.
    """
    if not (assoc and isinstance(assoc, dict)):
        return "{}"
    kept = {}
    for key, value in assoc.items():
        if value in (0, False, None):
            continue
        kept[key] = value
    if not kept:
        return "{}"
    return json.dumps(kept)
97
98
def fetch_profiles(dids):
    """Fetch profile views for a batch of DIDs from the public bsky getProfiles endpoint.

    Args:
        dids: DID strings, sent as repeated ``actors`` query parameters
            (main sends batches of 25).

    Returns:
        The response's ``profiles`` list (DIDs missing from it are counted as
        not found by the caller), the string ``"rate_limited"`` on HTTP 429, or
        ``[]`` on any other failure. Non-429 failures are reported on stderr so
        they are distinguishable from genuinely missing accounts.
    """
    # urlencode percent-encodes each DID (":" -> "%3A") via the documented
    # urllib.parse API instead of the undocumented urllib.request.quote re-export.
    query = urllib.parse.urlencode([("actors", d) for d in dids])
    req = urllib.request.Request(
        f"{BSKY_GET_PROFILES}?{query}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return "rate_limited"
        print(f" getProfiles failed: HTTP {e.code} ({len(dids)} actors)", file=sys.stderr)
        return []
    except Exception as e:
        # Best-effort: one failed batch must not abort the page, but say why.
        print(f" getProfiles failed: {e!r} ({len(dids)} actors)", file=sys.stderr)
        return []
113
114
def profile_to_stmt(p):
    """Build the UPDATE statement that writes one getProfiles result into ``actors``.

    Args:
        p: a profile dict from getProfiles; ``did`` is required, every other
            field is optional and may be missing or null.

    Returns:
        A ``{"sql": ..., "args": [...]}`` statement for turso_batch_write.
        Empty handle / display name / avatar / createdAt values and an empty
        ``associated`` object never overwrite stored columns
        (``COALESCE(NULLIF(...))``). ``labels`` and ``hidden`` are always
        overwritten and ``profile_checked_at`` is set to ``unixepoch()``.
    """
    hide_vals = {"!hide", "!takedown", "!suspend", "spam"}
    # "!no-unauthenticated" only hides the actor when applied by this labeler
    # DID (presumably the Bluesky moderation service — confirm).
    mod_did = "did:plc:ar7c4by46qjdydhdevvrndac"

    # Normalize once: a missing or null labels field is scanned as empty and
    # stored as "[]" rather than JSON "null".
    labels = p.get("labels") or []
    hidden = 0
    for lbl in labels:
        val = lbl.get("val", "")
        if val in hide_vals or (val == "!no-unauthenticated" and lbl.get("src") == mod_did):
            hidden = 1
            break

    return {
        "sql": """UPDATE actors SET
            handle = COALESCE(NULLIF(?2, ''), handle),
            display_name = COALESCE(NULLIF(?3, ''), display_name),
            avatar_url = COALESCE(NULLIF(?4, ''), avatar_url),
            labels = ?5, hidden = ?6,
            created_at = COALESCE(NULLIF(?7, ''), created_at),
            associated = COALESCE(NULLIF(?8, '{}'), associated),
            profile_checked_at = unixepoch()
            WHERE did = ?1""",
        "args": [
            {"type": "text", "value": p["did"]},
            # `or ""` also maps explicit nulls to "", which NULLIF turns into
            # "keep the stored value" instead of sending a null text value.
            {"type": "text", "value": p.get("handle") or ""},
            {"type": "text", "value": p.get("displayName") or ""},
            {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))},
            {"type": "text", "value": json.dumps(labels)},
            {"type": "integer", "value": str(hidden)},
            {"type": "text", "value": p.get("createdAt") or ""},
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }
144
145
def _fetch_stmts(dids):
    """Fetch profiles for ``dids`` concurrently and build one UPDATE per returned profile.

    Returns ``(stmts, not_found, rate_limited)``:
    - ``stmts``: statements for every profile the API returned.
    - ``not_found``: requested DIDs the API did not return.
    - ``rate_limited``: DIDs never looked up because their batch was still
      rate limited after a single 30s pause and retry.
    """
    per_call = 25  # actors per getProfiles request (the endpoint's per-call limit)
    batches = [dids[i:i + per_call] for i in range(0, len(dids), per_call)]
    stmts = []
    not_found = 0
    rate_limited = 0

    with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool:
        futures = {pool.submit(fetch_profiles, b): b for b in batches}
        for future in as_completed(futures):
            batch = futures[future]
            result = future.result()

            if result == "rate_limited":
                print("\n rate limited — pausing 30s...")
                time.sleep(30)
                result = fetch_profiles(batch)
                if result == "rate_limited":
                    print(" still limited, skipping batch")
                    # Not "not found": these DIDs were never looked up.
                    rate_limited += len(batch)
                    continue

            returned = {p["did"] for p in result}
            not_found += len(batch) - len(returned)
            stmts.extend(profile_to_stmt(p) for p in result)

    return stmts, not_found, rate_limited


def _write_stmts(stmts, turso_url, turso_token):
    """Write ``stmts`` in WRITE_BATCH-sized pipelines and return how many were written.

    Sleeps WRITE_PAUSE seconds after every batch; a failed batch contributes
    zero to the returned count.
    """
    ok = 0
    for i in range(0, len(stmts), WRITE_BATCH):
        chunk = stmts[i:i + WRITE_BATCH]
        if turso_batch_write(chunk, turso_url, turso_token):
            ok += len(chunk)
        time.sleep(WRITE_PAUSE)
    return ok


def main():
    """Walk ``actors`` by rowid and enrich rows that lack a handle or avatar.

    Reads PAGE_SIZE rows per page, fetches profiles only for rows whose handle
    or avatar_url is empty/NULL, and (unless --dry-run) writes the results back.
    --start-rowid resumes after the given rowid; each status line prints the
    last rowid seen so an interrupted run can be resumed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--start-rowid", type=int, default=0)
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    enriched = 0      # profiles fetched and turned into UPDATE statements
    skipped = 0       # rows that already had both handle and avatar_url
    not_found = 0     # requested DIDs the API did not return
    rate_limited = 0  # DIDs skipped because getProfiles stayed rate limited
    write_failed = 0  # statements whose write batch failed (live mode only)
    t0 = time.time()
    last_rowid = args.start_rowid
    tag = "dry" if args.dry_run else "live"

    print(f"bulk enriching unenriched actors only (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH})...")
    if args.dry_run:
        print(" DRY RUN — no writes")
    if args.start_rowid:
        print(f" resuming from rowid {args.start_rowid}")

    while True:
        # cheap rowid-paginated read — no filter scan, just walk forward.
        # NOTE: turso_query also returns [] on a server-reported error, so an
        # error here ends the run the same way reaching the end of the table does.
        rows = turso_query(
            "SELECT rowid, did, handle, avatar_url FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2",
            [{"type": "integer", "value": str(last_rowid)}, {"type": "integer", "value": str(PAGE_SIZE)}],
            turso_url, turso_token,
        )
        if not rows:
            break

        last_rowid = int(rows[-1]["rowid"])

        # filter client-side: only fetch profiles for rows missing data
        need = [r["did"] for r in rows if not r["handle"] or not r["avatar_url"]]
        skipped += len(rows) - len(need)

        if not need:
            print(f" skipped {len(rows)} already-enriched {DIM}rowid={last_rowid}{RESET}")
            continue

        pending, page_not_found, page_rate_limited = _fetch_stmts(need)
        enriched += len(pending)
        not_found += page_not_found
        rate_limited += page_rate_limited

        # write in small batches
        if pending and not args.dry_run:
            write_t0 = time.time()
            ok = _write_stmts(pending, turso_url, turso_token)
            write_failed += len(pending) - ok
            write_ms = int((time.time() - write_t0) * 1000)
            print(f" wrote {ok}/{len(pending)} stmts ({write_ms}ms)")

        elapsed = time.time() - t0
        rate = enriched / elapsed if elapsed > 0 else 0
        print(
            f" [{tag}] enriched={enriched:,} skipped={skipped:,} not_found={not_found:,} "
            f"rate_limited={rate_limited:,} write_failed={write_failed:,} "
            f"{DIM}{rate:.0f}/s rowid={last_rowid}{RESET}"
        )

    elapsed = time.time() - t0
    print(
        f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}, "
        f"rate_limited={rate_limited:,}, write_failed={write_failed:,}"
    )
    if rate_limited or write_failed:
        # Rows that were rate limited or failed to write still lack data, so the
        # client-side filter will pick them up again on a re-run.
        print(
            " some actors were not enriched (rate limited or failed writes); "
            "re-run with the same --start-rowid to retry them."
        )
235
236
if __name__ == "__main__":
    # Run the enrichment only when executed as a script (e.g. via the uv
    # shebang), not when this module is imported.
    main()