GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
targeted backfill: fetch the follows/followers of specific accounts,
enrich them via getProfiles, and upsert the results into turso.

usage:
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/targeted-backfill.py
12"""
13
14import json
15import os
16import re
17import sys
18import time
19import urllib.request
20import urllib.error
21
# base URL of the public Bluesky XRPC API; endpoint names are appended as "/<endpoint>"
BSKY = "https://public.api.bsky.app/xrpc"
# number of statements sent to turso in one pipeline request
WRITE_BATCH = 50
# seconds to sleep after each turso write batch
WRITE_PAUSE = 0.2
25
def env(k):
    """return the value of environment variable `k`.

    an unset or empty variable is treated as a fatal configuration error:
    a message is written to stderr and the process exits with status 1.
    """
    value = os.environ.get(k, "")
    if value:
        return value
    print(f"error: {k} not set", file=sys.stderr)
    sys.exit(1)
30
def _pipeline_errors(payload):
    """return the error messages found in a decoded turso pipeline response."""
    if not isinstance(payload, dict):
        return [f"unexpected response body: {payload!r}"]
    messages = []
    for result in payload.get("results") or []:
        if isinstance(result, dict) and result.get("type") == "error":
            err = result.get("error")
            if isinstance(err, dict):
                messages.append(str(err.get("message", "unknown error")))
            else:
                messages.append(str(err))
    return messages


def turso_batch_write(stmts, url, token):
    """send `stmts` to turso in one /v3/pipeline request.

    args:
        stmts: list of statement dicts (``{"sql": ..., "args": [...]}``); each
            is wrapped in an "execute" request, followed by a final "close".
        url: https base URL of the database (``/v3/pipeline`` is appended).
        token: bearer token for the Authorization header.

    returns:
        True when the request succeeded AND no per-statement result has
        type "error"; False otherwise. Failures are reported on stderr and
        never raised, so the caller decides how to proceed. A False result
        does not mean nothing was written: statements in the same batch that
        did not error may still have been applied.
    """
    if not stmts:
        # nothing to write; skip a round-trip that would only send "close"
        return True

    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()

    try:
        # Request() validates the URL, so it lives inside the try: a malformed
        # TURSO_URL is reported like any other write failure
        req = urllib.request.Request(
            f"{url}/v3/pipeline",
            data=body,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            payload = json.loads(resp.read())
    except Exception as e:
        # deliberate best-effort boundary: log and signal failure to the caller
        print(f" turso write failed: {e}", file=sys.stderr)
        return False

    # the pipeline can answer HTTP 200 while individual statements failed;
    # those failures only show up as per-request results of type "error"
    errors = _pipeline_errors(payload)
    if errors:
        print(
            f" turso write failed: {len(errors)} request(s) rejected, first error: {errors[0]}",
            file=sys.stderr,
        )
        return False
    return True
45
def fetch_all(endpoint, actor, key):
    """page through a list endpoint, return all items.

    args:
        endpoint: XRPC method name appended to BSKY
            (e.g. "app.bsky.graph.getFollows").
        actor: handle or DID passed as the `actor` query parameter.
        key: name of the list field in each response page
            (e.g. "follows").

    returns:
        list with the items of every page, in the order received.

    raises:
        whatever urllib / json raise on a failed request or invalid body;
        errors are not caught here, so a failure aborts the whole fetch.
    """
    from urllib.parse import urlencode

    items = []
    cursor = None
    while True:
        params = {"actor": actor, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        # urlencode escapes actor and cursor; the cursor is an opaque
        # server-provided string and may contain reserved characters
        req = urllib.request.Request(
            f"{BSKY}/{endpoint}?{urlencode(params)}",
            # same User-Agent as fetch_profiles so both calls identify alike
            headers={"User-Agent": "typeahead-enrich/1.0"},
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())
        batch = data.get(key, [])
        items.extend(batch)
        cursor = data.get("cursor")
        print(f" fetched {len(items)} {key}...", end="\r")
        # stop when there is no next page, or when a page came back empty
        # (guards against looping on a cursor that never advances)
        if not cursor or not batch:
            break
        time.sleep(0.1)
    print()
    return items
65
def fetch_profiles(dids):
    """fetch profile records for `dids` via app.bsky.actor.getProfiles.

    args:
        dids: sequence of actor identifiers, sent as repeated `actors`
            query parameters in a single request.

    returns:
        the list under "profiles" in the response; the string
        "rate_limited" when the server answers HTTP 429; an empty list for
        an empty input or any other failure (the failure is reported on
        stderr rather than raised, so one bad batch does not stop the run).
    """
    from urllib.parse import urlencode

    if not dids:
        # no actors to look up; avoid a request the server would reject
        return []

    query = urlencode([("actors", d) for d in dids])
    req = urllib.request.Request(
        f"{BSKY}/app.bsky.actor.getProfiles?{query}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return "rate_limited"
        print(f" getProfiles failed: HTTP {e.code}", file=sys.stderr)
        return []
    except Exception as e:
        # best-effort: report and let the caller continue with other batches
        print(f" getProfiles failed: {e}", file=sys.stderr)
        return []
# last "/"-separated segment of a URL, minus an optional lowercase "@ext" suffix
_AVATAR_TAIL = re.compile(r'/([^/]+?)(?:@[a-z]+)?$')


def extract_avatar_cid(url):
    """return the final path segment of `url` without a trailing "@ext".

    returns "" when `url` is empty/None or contains no "/" followed by at
    least one character.
    """
    if not url:
        return ""
    found = _AVATAR_TAIL.search(url)
    if found is None:
        return ""
    return found.group(1)
85
def clean_associated(assoc):
    """serialize `assoc` to JSON after dropping entries equal to 0, False or None.

    returns the string "{}" when `assoc` is falsy, is not a dict, or has no
    entries left after filtering.
    """
    if not (assoc and isinstance(assoc, dict)):
        return "{}"
    kept = {}
    for name, val in assoc.items():
        if val in (0, False, None):
            continue
        kept[name] = val
    if not kept:
        return "{}"
    return json.dumps(kept)
90
# label values that mark an actor as hidden regardless of who applied them
_HIDE_LABEL_VALS = frozenset({"!hide", "!takedown", "!suspend", "spam"})
# "!no-unauthenticated" hides an actor only when applied by this DID
# (presumably the Bluesky moderation labeler — confirm)
_MOD_LABELER_DID = "did:plc:ar7c4by46qjdydhdevvrndac"

# upsert keyed on did: empty incoming values never overwrite stored ones,
# while labels/hidden are always replaced with the fresh values
_UPSERT_ACTOR_SQL = """INSERT INTO actors (did, handle, display_name, avatar_url, labels, hidden, created_at, associated, updated_at)
    VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch())
    ON CONFLICT(did) DO UPDATE SET
        handle = COALESCE(NULLIF(?2, ''), actors.handle),
        display_name = COALESCE(NULLIF(?3, ''), actors.display_name),
        avatar_url = COALESCE(NULLIF(?4, ''), actors.avatar_url),
        labels = ?5, hidden = ?6,
        created_at = COALESCE(NULLIF(?7, ''), actors.created_at),
        associated = COALESCE(NULLIF(?8, '{}'), actors.associated),
        profile_checked_at = unixepoch(),
        updated_at = unixepoch()"""


def _label_hides(lbl):
    """return True when a single label dict marks the actor as hidden."""
    val = lbl.get("val", "")
    if val in _HIDE_LABEL_VALS:
        return True
    return val == "!no-unauthenticated" and lbl.get("src") == _MOD_LABELER_DID


def profile_to_stmt(p):
    """build the turso upsert statement for one profile dict.

    args:
        p: profile dict; "did" is required, while "handle", "displayName",
            "avatar", "labels", "createdAt" and "associated" are optional.

    returns:
        ``{"sql": ..., "args": [...]}`` with eight positional arguments
        matching ?1..?8 of the upsert.

    raises:
        KeyError: when `p` has no "did".
    """
    # a missing key and an explicit null are treated alike, so the hidden
    # check and the stored JSON always see the same list, and no text
    # argument is ever sent with a null value
    labels = p.get("labels") or []
    hidden = 1 if any(_label_hides(lbl) for lbl in labels) else 0
    return {
        "sql": _UPSERT_ACTOR_SQL,
        "args": [
            {"type": "text", "value": p["did"]},
            {"type": "text", "value": p.get("handle") or ""},
            {"type": "text", "value": p.get("displayName") or ""},
            # the avatar_url column receives the extracted id, not the full URL
            {"type": "text", "value": extract_avatar_cid(p.get("avatar") or "")},
            {"type": "text", "value": json.dumps(labels)},
            # the integer is passed as a decimal string
            {"type": "integer", "value": str(hidden)},
            {"type": "text", "value": p.get("createdAt") or ""},
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }
121
def main():
    """collect DIDs from two hard-coded accounts, enrich them, upsert into turso.

    steps:
      1. read TURSO_URL / TURSO_AUTH_TOKEN (exits if either is missing).
      2. gather the DIDs followed by atprotocol.dev and the followers of
         atmosphereconf.org, de-duplicated.
      3. fetch profiles in batches of 25, retrying once after a 30s pause
         when rate limited.
      4. write the resulting statements in batches of WRITE_BATCH.

    exits with status 1 when any write batch failed, so a partially
    applied backfill is not reported as a success.
    """
    turso_url = env("TURSO_URL").replace("libsql://", "https://")
    turso_token = env("TURSO_AUTH_TOKEN")

    # collect DIDs (a set, so an account present in both lists is enriched once)
    dids = set()

    print("atprotocol.dev → following:")
    follows = fetch_all("app.bsky.graph.getFollows", "atprotocol.dev", "follows")
    dids.update(f["did"] for f in follows)

    print("atmosphereconf.org → followers:")
    followers = fetch_all("app.bsky.graph.getFollowers", "atmosphereconf.org", "followers")
    dids.update(f["did"] for f in followers)

    dids = list(dids)
    print(f"\n{len(dids)} unique DIDs to enrich")

    # enrich via getProfiles in batches of 25
    enriched = 0
    skipped = 0
    t0 = time.time()
    all_stmts = []

    for i in range(0, len(dids), 25):
        batch = dids[i:i + 25]
        result = fetch_profiles(batch)
        if result == "rate_limited":
            print(" rate limited — pausing 30s...")
            time.sleep(30)
            result = fetch_profiles(batch)
            if result == "rate_limited":
                print(" still limited, skipping")
                skipped += len(batch)
                continue
        all_stmts.extend(profile_to_stmt(p) for p in result)
        enriched += len(result)
        print(f" enriched {enriched}/{len(dids)}...", end="\r")
        time.sleep(0.05)

    print(f"\n\nenriched {enriched} profiles, writing to turso...")
    if skipped:
        print(f" skipped {skipped} DIDs due to rate limiting", file=sys.stderr)

    # write in small batches, tracking which ones turso actually accepted
    written = 0
    failed = 0
    for i in range(0, len(all_stmts), WRITE_BATCH):
        chunk = all_stmts[i:i + WRITE_BATCH]
        if turso_batch_write(chunk, turso_url, turso_token):
            written += len(chunk)
        else:
            failed += len(chunk)
        print(f" wrote {written}/{len(all_stmts)}", end="\r")
        time.sleep(WRITE_PAUSE)

    elapsed = time.time() - t0
    print(f"\n\ndone in {elapsed:.0f}s. enriched {enriched} actors, wrote {written}.")
    if failed:
        print(f"error: {failed} statements were in failed write batches", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()