GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7bulk enrich actors via bsky getProfiles API.
8
9pages through all actors by rowid, calls getProfiles in concurrent batches,
10writes back handles, avatars, labels, createdAt, associated.
11
12writes in small batches (50 stmts) with 200ms pauses between to avoid
13saturating Turso and blocking production reads.
14
15usage:
16 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py
17 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --start-rowid 2529
18 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py --dry-run
19"""
20
21import argparse
22import json
23import os
24import re
25import sys
26import time
27import urllib.request
28import urllib.error
29from concurrent.futures import ThreadPoolExecutor, as_completed
30
# getProfiles XRPC endpoint on public.api.bsky.app, queried by fetch_profiles.
BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"

# DIDs per Turso read page.
PAGE_SIZE = 500
# Concurrent getProfiles calls.
BSKY_CONCURRENCY = 5
# Statements per Turso write — keep the write lock short.
WRITE_BATCH = 50
# Seconds between write batches — let reads through.
WRITE_PAUSE = 0.2

# ANSI escape sequences wrapped around the dimmed part of the progress line.
DIM = "\033[2m"
RESET = "\033[0m"
39
40
def get_turso_url() -> str:
    """Return the Turso endpoint from the ``TURSO_URL`` environment variable.

    Any ``libsql://`` occurrence is rewritten to ``https://`` so the value can
    be used with the HTTP pipeline API. If the variable is unset or empty, an
    error is printed to stderr and the process exits with status 1.
    """
    raw = os.environ.get("TURSO_URL", "")
    if raw:
        return raw.replace("libsql://", "https://")
    print("error: TURSO_URL not set", file=sys.stderr)
    sys.exit(1)
46
47
def get_turso_token() -> str:
    """Return the bearer token from the ``TURSO_AUTH_TOKEN`` environment variable.

    If the variable is unset or empty, an error is printed to stderr and the
    process exits with status 1.
    """
    value = os.environ.get("TURSO_AUTH_TOKEN", "")
    if value:
        return value
    print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr)
    sys.exit(1)
53
54
def turso_query(sql, args, turso_url, turso_token):
    """Run one SQL statement through the Turso pipeline endpoint and return rows.

    Posts an ``execute`` + ``close`` request pair to ``{turso_url}/v3/pipeline``
    with a bearer token. Each returned row is a dict keyed by column name whose
    values are the cells' ``value`` fields (``None`` for cells typed ``null``).

    If the first result is of type ``error``, its message is printed to stderr
    and ``[]`` is returned. Exceptions raised by ``urlopen`` or JSON decoding
    are not caught here.
    """
    payload = {
        "requests": [
            {"type": "execute", "stmt": {"sql": sql, "args": args}},
            {"type": "close"},
        ]
    }
    headers = {
        "Authorization": f"Bearer {turso_token}",
        "Content-Type": "application/json",
    }
    request = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=json.dumps(payload).encode(),
        headers=headers,
    )
    with urllib.request.urlopen(request, timeout=30) as resp:
        reply = json.loads(resp.read())

    # Only the execute result matters; the second entry answers the close.
    first = reply["results"][0]
    if first.get("type") == "error":
        print(f" turso error: {first['error']['message']}", file=sys.stderr)
        return []

    table = first["response"]["result"]
    names = [col["name"] for col in table["cols"]]
    records = []
    for row in table["rows"]:
        record = {}
        for name, cell in zip(names, row):
            record[name] = None if cell["type"] == "null" else cell["value"]
        records.append(record)
    return records
72
73
def turso_batch_write(stmts, turso_url, turso_token):
    """Execute a batch of statements in a single Turso pipeline request.

    Args:
        stmts: list of statement dicts (``{"sql": ..., "args": [...]}``); each
            is sent as its own ``execute`` request, followed by one ``close``.
        turso_url: base URL of the database; ``/v3/pipeline`` is appended.
        turso_token: bearer token for the Authorization header.

    Returns:
        True only when the request succeeded and no entry in the response's
        ``results`` is of type ``error``; False otherwise. Transport, HTTP and
        decoding failures are reported on stderr instead of being raised.
    """
    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()
    req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={
        "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json",
    })
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            reply = json.loads(resp.read())
    except Exception as e:
        # Best-effort boundary: the caller decides what a failed chunk means.
        print(f"\n turso write failed: {e}", file=sys.stderr)
        return False

    # A successful HTTP exchange can still carry per-statement failures, the
    # same {"type": "error"} shape turso_query checks for, so inspect them.
    results = reply.get("results", []) if isinstance(reply, dict) else []
    errors = [r for r in results if isinstance(r, dict) and r.get("type") == "error"]
    if errors:
        first = (errors[0].get("error") or {}).get("message", "unknown error")
        print(
            f"\n turso write failed: {len(errors)} of {len(stmts)} stmts errored "
            f"(first: {first})",
            file=sys.stderr,
        )
        return False
    return True
88
89
def extract_avatar_cid(url):
    """Return the final path segment of ``url`` without a trailing ``@ext``.

    The segment is whatever follows the last ``/``; a suffix of ``@`` plus
    lowercase letters at the very end is dropped. Returns ``""`` for a falsy
    ``url`` or when the pattern does not match (e.g. no ``/`` in the string).
    """
    if url:
        found = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url)
        if found is not None:
            return found.group(1)
    return ""
94
95
def clean_associated(assoc):
    """Serialize ``assoc`` to JSON after dropping entries equal to 0/False/None.

    Returns the literal ``"{}"`` when ``assoc`` is falsy, is not a dict, or has
    no entries left after filtering.
    """
    if not (assoc and isinstance(assoc, dict)):
        return "{}"
    kept = {}
    for key, value in assoc.items():
        if value in (0, False, None):
            continue
        kept[key] = value
    if not kept:
        return "{}"
    return json.dumps(kept)
100
101
def fetch_profiles(dids):
    """Fetch profiles for a batch of DIDs from the getProfiles endpoint.

    Args:
        dids: actor identifiers, sent as repeated ``actors`` query parameters.

    Returns:
        The response's ``profiles`` list on success; the string
        ``"rate_limited"`` on HTTP 429; ``[]`` for an empty batch or any other
        failure. Non-429 failures are reported on stderr so they can be told
        apart from profiles that simply were not returned.
    """
    # Local import: use the documented home of quote() rather than the
    # incidental re-export on urllib.request.
    from urllib.parse import quote

    if not dids:
        # Nothing to ask for; skip the request entirely.
        return []

    params = "&".join(f"actors={quote(d)}" for d in dids)
    req = urllib.request.Request(
        f"{BSKY_GET_PROFILES}?{params}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return "rate_limited"
        print(f"\n getProfiles failed: HTTP {e.code}", file=sys.stderr)
        return []
    except Exception as e:
        # Still best-effort: the caller treats [] as "no profiles returned".
        print(f"\n getProfiles failed: {e}", file=sys.stderr)
        return []
116
117
def profile_to_stmt(p):
    """Build the UPDATE statement that writes one profile back to ``actors``.

    Args:
        p: a profile dict from getProfiles; ``p["did"]`` is required, every
            other field is optional and may be missing or ``None``.

    Returns:
        A statement dict with ``sql`` and positional ``args`` in the typed
        ``{"type": ..., "value": ...}`` form. Empty-string arguments leave the
        existing handle, display_name, avatar_url and created_at untouched
        (``COALESCE(NULLIF(...))``); ``'{}'`` leaves ``associated`` untouched.
        ``labels``, ``hidden`` and ``profile_checked_at`` are always
        overwritten.

    Raises:
        KeyError: if ``p`` has no ``did``.
    """
    hide_vals = {"!hide", "!takedown", "!suspend", "spam"}
    mod_did = "did:plc:ar7c4by46qjdydhdevvrndac"

    # Normalise once so the hidden check and the stored JSON agree: an explicit
    # "labels": null must be stored as "[]", not the JSON string "null".
    labels = p.get("labels") or []
    hidden = int(any(
        lbl.get("val", "") in hide_vals
        or (lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == mod_did)
        for lbl in labels
    ))

    def text(value):
        # Text arguments must be strings; map a missing or null field to ""
        # so the NULLIF guards in the SQL keep the stored value.
        return {"type": "text", "value": value or ""}

    return {
        "sql": """UPDATE actors SET
            handle = COALESCE(NULLIF(?2, ''), handle),
            display_name = COALESCE(NULLIF(?3, ''), display_name),
            avatar_url = COALESCE(NULLIF(?4, ''), avatar_url),
            labels = ?5, hidden = ?6,
            created_at = COALESCE(NULLIF(?7, ''), created_at),
            associated = COALESCE(NULLIF(?8, '{}'), associated),
            profile_checked_at = unixepoch()
            WHERE did = ?1""",
        "args": [
            {"type": "text", "value": p["did"]},
            text(p.get("handle")),
            text(p.get("displayName")),
            text(extract_avatar_cid(p.get("avatar") or "")),
            {"type": "text", "value": json.dumps(labels)},
            {"type": "integer", "value": str(hidden)},
            text(p.get("createdAt")),
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }
147
148
def main():
    """Page through ``actors`` by rowid and refresh each row from getProfiles.

    Flags:
        --dry-run: fetch and count, but skip all writes.
        --start-rowid N: only process rows with rowid greater than N.

    For every page of ``PAGE_SIZE`` rows the DIDs are split into batches of 25
    and fetched with ``BSKY_CONCURRENCY`` worker threads. The resulting UPDATE
    statements are written in chunks of ``WRITE_BATCH`` with a ``WRITE_PAUSE``
    sleep after each chunk. Chunks that ``turso_batch_write`` reports as failed
    are counted and surfaced in the output instead of being reported as
    written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--start-rowid", type=int, default=0)
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    enriched = 0
    checked = 0
    not_found = 0
    failed_chunks_total = 0
    t0 = time.time()
    last_rowid = args.start_rowid

    print(f"bulk enriching (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH}, write_pause={WRITE_PAUSE}s)...")
    if args.dry_run:
        print(" DRY RUN — no writes")
    if args.start_rowid:
        print(f" resuming from rowid {args.start_rowid}")

    while True:
        rows = turso_query(
            "SELECT rowid, did FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2",
            [{"type": "integer", "value": str(last_rowid)}, {"type": "integer", "value": str(PAGE_SIZE)}],
            turso_url, turso_token,
        )
        # NOTE: turso_query also returns [] after printing a query error, so an
        # error here ends the run the same way as reaching the last row.
        if not rows:
            break

        dids = [r["did"] for r in rows]
        last_rowid = int(rows[-1]["rowid"])

        # getProfiles in concurrent batches of 25
        batches = [dids[i:i+25] for i in range(0, len(dids), 25)]
        pending = []

        with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool:
            futures = {pool.submit(fetch_profiles, b): b for b in batches}
            for future in as_completed(futures):
                batch = futures[future]
                result = future.result()

                if result == "rate_limited":
                    print("\n rate limited — pausing 30s...")
                    time.sleep(30)
                    # One retry in the main thread; give up on the batch after that.
                    result = fetch_profiles(batch)
                    if result == "rate_limited":
                        print(" still limited, skipping batch")
                        checked += len(batch)
                        continue

                checked += len(batch)
                returned = {p["did"] for p in result}
                not_found += len(batch) - len(returned)

                for p in result:
                    pending.append(profile_to_stmt(p))
                    enriched += 1

        # write in small batches with pauses between each
        if pending and not args.dry_run:
            write_t0 = time.time()
            write_chunks = 0
            failed_chunks = 0
            for i in range(0, len(pending), WRITE_BATCH):
                ok = turso_batch_write(pending[i:i+WRITE_BATCH], turso_url, turso_token)
                if not ok:
                    failed_chunks += 1
                write_chunks += 1
                time.sleep(WRITE_PAUSE)
            write_ms = int((time.time() - write_t0) * 1000)
            summary = f" wrote {len(pending)} stmts in {write_chunks} chunks ({write_ms}ms)"
            if failed_chunks:
                # Don't present failed chunks as written.
                summary += f" — {failed_chunks} chunk(s) FAILED"
            print(summary)
            failed_chunks_total += failed_chunks

        elapsed = time.time() - t0
        rate = checked / elapsed if elapsed > 0 else 0
        tag = "dry" if args.dry_run else "live"
        print(
            f" [{tag}] checked={checked:,} enriched={enriched:,} "
            f"not_found={not_found:,} "
            f"{DIM}{rate:.0f} dids/s rowid={last_rowid}{RESET}"
        )

    elapsed = time.time() - t0
    print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}, checked={checked:,}")
    if failed_chunks_total:
        print(
            f"warning: {failed_chunks_total} write chunk(s) failed; "
            "rows in those chunks were not updated",
            file=sys.stderr,
        )


if __name__ == "__main__":
    main()