GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7bulk enrichment: walk actors table and populate labels, handle, display_name,
8avatar_url via bsky's public getProfiles API (25 DIDs per call).
9
10targets actors with labels='[]' (default). also recomputes hidden from full
11label data, fixing actors incorrectly hidden by stale !no-unauthenticated logic.
12
13usage:
14 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py
15 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --dry-run --limit 100
16 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --limit 1000 --offset 50000
17"""
18
19import argparse
20import json
21import os
22import re
23import sys
24import time
25import urllib.request
26
# public (unauthenticated) bsky endpoint used by fetch_profiles
GET_PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"
# label source DID that should_hide honours; labels from any other src are ignored there
BSKY_MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac"
# label values from BSKY_MOD_DID that mark an actor as hidden
MOD_HIDE_VALS = {"!hide", "!takedown", "spam"}

BATCH_SIZE = 25  # getProfiles limit
PAGE_SIZE = 500  # actors per Turso query
TURSO_BATCH_SIZE = 200  # statements per pipeline call
DELAY = 0.3  # seconds between getProfiles calls

# ANSI escapes wrapped around the offset in the progress line
DIM = "\033[2m"
RESET = "\033[0m"
38
39
40# --- turso helpers (from migrate-to-turso.py) ---
41
def get_turso_url() -> str:
    """return the Turso base URL from $TURSO_URL, exiting with status 1 if unset.

    a ``libsql://`` scheme is rewritten to ``https://`` so the value can be
    used directly against the HTTP pipeline endpoint.
    """
    raw = os.environ.get("TURSO_URL", "")
    if raw:
        return raw.replace("libsql://", "https://")
    print("error: TURSO_URL not set", file=sys.stderr)
    sys.exit(1)
48
49
def get_turso_token() -> str:
    """return the Turso auth token from $TURSO_AUTH_TOKEN, exiting with status 1 if unset."""
    value = os.environ.get("TURSO_AUTH_TOKEN", "")
    if value:
        return value
    print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr)
    sys.exit(1)
56
57
def turso_query(sql: str, args: list, turso_url: str, turso_token: str) -> list[dict]:
    """run one statement through the pipeline API and return its rows as dicts.

    each row maps column name -> cell value, with null cells as None. any
    failure (transport error, unexpected response shape, or an error result
    from Turso) is reported on stderr and produces an empty list.
    """
    payload = {
        "requests": [
            {"type": "execute", "stmt": {"sql": sql, "args": args}},
            {"type": "close"},
        ]
    }
    request = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Bearer {turso_token}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            first = json.loads(resp.read())["results"][0]
        if first.get("type") == "error":
            print(f" turso error: {first['error']['message']}", file=sys.stderr)
            return []
        result = first["response"]["result"]
        names = [col["name"] for col in result["cols"]]
        return [
            {
                name: (None if cell["type"] == "null" else cell["value"])
                for name, cell in zip(names, row)
            }
            for row in result["rows"]
        ]
    except Exception as e:
        print(f" turso query failed: {e}", file=sys.stderr)
        return []
89
90
def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool:
    """execute a batch of statements against Turso via HTTP pipeline API.

    args:
        stmts: statement dicts (``{"sql": ..., "args": [...]}``); each is sent
            in order as an "execute" request, followed by one "close".
        turso_url: base URL of the database; ``/v3/pipeline`` is appended.
        turso_token: bearer token for the Authorization header.

    returns:
        True when the call succeeds and no result has type "error"; False
        otherwise. failures are reported on stderr and never raised.
    """
    requests = [{"type": "execute", "stmt": s} for s in stmts]
    requests.append({"type": "close"})

    body = json.dumps({"requests": requests}).encode()
    req = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=body,
        headers={
            "Authorization": f"Bearer {turso_token}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            result = json.loads(resp.read())
        for r in result.get("results", []):
            if r.get("type") == "error":
                # stderr, like every other failure path, so the message does
                # not interleave with the \r progress line written to stdout
                message = (r.get("error") or {}).get("message", "unknown")
                print(f" turso error: {message}", file=sys.stderr)
                return False
        return True
    except urllib.error.HTTPError as e:
        err_body = e.read().decode()[:300]
        print(f" turso HTTP {e.code}: {err_body}", file=sys.stderr)
        return False
    except Exception as e:
        print(f" turso request failed: {e}", file=sys.stderr)
        return False
120
121
122# --- bsky helpers ---
123
def fetch_profiles(dids: list[str]) -> list[dict]:
    """fetch profile views for a batch of DIDs from the public getProfiles endpoint.

    args:
        dids: actor DIDs; each is sent as its own ``actors`` query parameter.

    returns:
        the ``profiles`` list from the response, or [] when the request fails
        for any reason other than rate limiting. an empty list therefore does
        not distinguish "request failed" from "no profile exists".

    on HTTP 429 the call sleeps 60s and tries again, for as long as the
    server keeps rate limiting; every other error is printed and ends the call.
    """
    import urllib.parse  # function-scope: the file itself only imports urllib.request

    # percent-encode each DID so reserved characters (a did:web port written
    # as %3A, or a stray & / # / space) cannot corrupt the query string
    query = urllib.parse.urlencode([("actors", did) for did in dids])
    req = urllib.request.Request(
        f"{GET_PROFILES_URL}?{query}",
        headers={"User-Agent": "typeahead-profile-backfill/1.0"},
    )
    # loop rather than recurse: a long rate-limit stretch must not grow the stack
    while True:
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                data = json.loads(resp.read())
            return data.get("profiles", [])
        except urllib.error.HTTPError as e:
            if e.code != 429:
                print(f"\n HTTP {e.code}")
                return []
            print("\n rate limited — pausing 60s")
            time.sleep(60)
        except Exception as e:
            print(f"\n error: {e}")
            return []
142
143
# final path segment of a URL that ends in "@jpeg"
_AVATAR_CID_RE = re.compile(r"/([^/]+)@jpeg$")


def extract_avatar_cid(url: str) -> str:
    """extract CID from bsky CDN avatar URL — matches worker's extractAvatarCid.

    returns the path segment preceding a trailing ``@jpeg``, or "" when the
    URL does not have that shape.
    """
    match = _AVATAR_CID_RE.search(url)
    if match is None:
        return ""
    return match[1]
151
152
def _parse_ts(s: str) -> float:
    """convert an ISO-8601 timestamp to epoch milliseconds; 0 when it cannot be parsed.

    a trailing ``Z`` is rewritten to ``+00:00`` before parsing.
    """
    from datetime import datetime

    try:
        parsed = datetime.fromisoformat(s.replace("Z", "+00:00"))
        return parsed.timestamp() * 1000
    except Exception:
        return 0
159
160
161def should_hide(labels: list | None) -> bool:
162 """matches worker's shouldHide (src/index.ts:133) — only bsky mod service labels."""
163 if not labels:
164 return False
165 now = time.time() * 1000
166 for l in labels:
167 if l.get("neg"):
168 continue
169 if l.get("exp") and _parse_ts(l["exp"]) <= now:
170 continue
171 if l.get("src") == BSKY_MOD_DID and l.get("val") in MOD_HIDE_VALS:
172 return True
173 return False
174
175
176# --- main ---
177
def _profile_update_stmt(profile: dict) -> tuple[dict, int, str]:
    """build the UPDATE for one getProfiles result.

    returns (statement, hide flag as 0/1, labels JSON exactly as written to the row).
    """
    # `or` also covers keys that are present but null in the response
    labels = profile.get("labels") or []
    labels_json = json.dumps(labels)
    hide = 1 if should_hide(labels) else 0
    stmt = {
        "sql": (
            "UPDATE actors SET "
            "handle = COALESCE(NULLIF(?2, ''), handle), "
            "display_name = COALESCE(NULLIF(?3, ''), display_name), "
            "avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), "
            "labels = ?5, "
            "hidden = ?6, "
            "updated_at = unixepoch() "
            "WHERE did = ?1"
        ),
        "args": [
            {"type": "text", "value": profile["did"]},
            {"type": "text", "value": profile.get("handle") or ""},
            {"type": "text", "value": profile.get("displayName") or ""},
            {"type": "text", "value": extract_avatar_cid(profile.get("avatar") or "")},
            {"type": "text", "value": labels_json},
            {"type": "integer", "value": str(hide)},
        ],
    }
    return stmt, hide, labels_json


def _missing_stmt(did: str) -> dict:
    """build the UPDATE that stamps identity_checked_at for a DID getProfiles did not return."""
    return {
        "sql": "UPDATE actors SET identity_checked_at = unixepoch() WHERE did = ?1",
        "args": [{"type": "text", "value": did}],
    }


def main():
    """page through actors with labels='[]', enrich them via getProfiles, write back.

    flags: --dry-run (query + fetch, no writes), --limit N (stop after N
    actors, 0 = all), --offset N (rows of the filtered set to skip first).

    each page of up to PAGE_SIZE rows is looked up in BATCH_SIZE chunks. every
    returned profile becomes one UPDATE (handle, display_name, avatar_url,
    labels, hidden); every DID that was not returned gets identity_checked_at
    stamped. statements are flushed in TURSO_BATCH_SIZE chunks, and the run
    stops at the first failed write.
    """
    parser = argparse.ArgumentParser(description="bulk profile backfill via getProfiles")
    parser.add_argument("--dry-run", action="store_true", help="query + fetch but don't write")
    parser.add_argument("--limit", type=int, default=0, help="stop after N actors (0 = all)")
    parser.add_argument("--offset", type=int, default=0, help="start from rowid offset")
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    # stats
    total_queried = 0
    total_enriched = 0
    total_hidden = 0
    total_missing = 0  # DIDs not returned by getProfiles
    offset = args.offset

    print(f"backfill-profiles: {'DRY RUN' if args.dry_run else 'LIVE'}")
    if args.limit:
        print(f" limit: {args.limit} actors")
    if args.offset:
        print(f" starting at offset: {args.offset}")

    while True:
        # check limit
        if args.limit and total_queried >= args.limit:
            break

        page_limit = PAGE_SIZE
        if args.limit:
            page_limit = min(PAGE_SIZE, args.limit - total_queried)

        rows = turso_query(
            "SELECT did FROM actors WHERE labels = '[]' ORDER BY rowid ASC LIMIT ? OFFSET ?",
            [{"type": "integer", "value": str(page_limit)}, {"type": "integer", "value": str(offset)}],
            turso_url, turso_token,
        )
        if not rows:
            break

        page_enriched = 0
        page_hidden = 0
        page_missing = 0
        # DIDs from this page whose new labels are no longer '[]'; once written
        # they drop out of the WHERE filter and shift every later row forward
        left_filter: set[str] = set()
        stmts = []

        for start in range(0, len(rows), BATCH_SIZE):
            dids = [r["did"] for r in rows[start : start + BATCH_SIZE]]
            requested = set(dids)

            # pace every getProfiles call except the very first of the run
            if start > 0 or total_queried > 0:
                time.sleep(DELAY)

            profiles = fetch_profiles(dids)
            returned_dids = {p["did"] for p in profiles}

            for p in profiles:
                stmt, hide, labels_json = _profile_update_stmt(p)
                stmts.append(stmt)
                page_enriched += 1
                page_hidden += hide
                if labels_json != "[]" and p["did"] in requested:
                    left_filter.add(p["did"])

            # mark missing DIDs so cron skips them
            # NOTE(review): fetch_profiles returns [] on any request failure,
            # so a transient error stamps the whole batch as checked — confirm
            # that is acceptable for the cron that reads identity_checked_at.
            for did in dids:
                if did not in returned_dids:
                    page_missing += 1
                    stmts.append(_missing_stmt(did))

        # flush writes
        if stmts and not args.dry_run:
            for chunk_start in range(0, len(stmts), TURSO_BATCH_SIZE):
                chunk = stmts[chunk_start : chunk_start + TURSO_BATCH_SIZE]
                if not turso_batch(chunk, turso_url, turso_token):
                    print(f"\n batch write failed at offset={offset}")
                    return

        total_queried += len(rows)
        total_enriched += page_enriched
        total_hidden += page_hidden
        total_missing += page_missing
        if args.dry_run:
            # nothing was written, so the filtered set is unchanged
            offset += len(rows)
        else:
            # advance only past rows that still match labels='[]'; advancing by
            # the full page would skip one unprocessed actor per relabelled row
            offset += len(rows) - len(left_filter)

        tag = "dry" if args.dry_run else "live"
        sys.stdout.write(
            f"\r [{tag}] queried={total_queried} enriched={total_enriched} "
            f"hidden={total_hidden} missing={total_missing} "
            f"{DIM}offset={offset}{RESET} "
        )
        sys.stdout.flush()

        # a short page means the filtered set is exhausted
        if len(rows) < page_limit:
            break

    print(f"\n\ndone. queried={total_queried}, enriched={total_enriched}, hidden={total_hidden}, missing={total_missing}")
300
301
# run the backfill only when executed as a script, not when imported
if __name__ == "__main__":
    main()