Target endpoint: GET /xrpc/app.bsky.actor.searchActorsTypeahead
Service host: typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7discover and index all handles under a domain suffix.
8
9discovery methods:
10 bsky — search bluesky's public API (fast, but misses handles bluesky hasn't indexed)
11 plc — stream PLC directory export (comprehensive, slower — use --after to limit range)
12 pds — enumerate repos on a PDS, resolve handles via PLC (best for domains that run their own PDS)
13
14usage:
15 ./scripts/index-domain.py tngl.sh --source pds --pds https://tngl.sh
16 ./scripts/index-domain.py tngl.sh --source plc --after 2026-01-01
17 ./scripts/index-domain.py bsky.team --dry-run
18 ./scripts/index-domain.py mycustomdomain.com --concurrency 10
19"""
20
21import argparse
22import json
23import sys
24import urllib.parse
25import urllib.request
26import urllib.error
27from concurrent.futures import ThreadPoolExecutor, as_completed
28from typing import TypedDict
29
30
class Actor(TypedDict):
    """A discovered account: its DID paired with its handle."""
    did: str  # decentralized identifier, e.g. "did:plc:..."
    handle: str  # full handle; discovery code only stores handles ending in ".<suffix>"
35
class IndexResult(TypedDict, total=False):
    """Result of one /request-indexing call (total=False: keys vary by outcome)."""
    handle: str
    did: str
    hidden: bool  # truthy when the service indexed the account but hides it from results
    error: str  # present only on failure: "HTTP <code>: <body>" or an exception message
41
# external service endpoints
BSKY_SEARCH = "https://public.api.bsky.app/xrpc/app.bsky.actor.searchActors"  # bluesky public search API
PLC_EXPORT = "https://plc.directory/export"  # PLC directory export (newline-delimited JSON ops)
TYPEAHEAD_URL = "https://typeahead.waow.tech"  # typeahead service this script submits handles to
SEARCH_LIMIT = 100  # results per searchActors page (sent as ?limit=)
PLC_PAGE_SIZE = 1000  # ops per PLC export page (sent as ?count=)

# ANSI escape sequences for colored terminal output
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
RESET = "\033[0m"
53
54
def search_bsky(suffix: str) -> list[Actor]:
    """paginate through bluesky searchActors for handles ending in .suffix"""
    matches: dict[str, str] = {}  # did -> handle, deduplicated across pages
    next_cursor: str | None = None
    page_no = 0
    wanted_tail = f".{suffix}"

    while True:
        page_no += 1
        query_url = f"{BSKY_SEARCH}?q={urllib.parse.quote(suffix)}&limit={SEARCH_LIMIT}"
        if next_cursor:
            query_url += f"&cursor={urllib.parse.quote(next_cursor)}"

        try:
            with urllib.request.urlopen(urllib.request.Request(query_url), timeout=15) as resp:
                payload = json.loads(resp.read())
        except Exception as e:
            # network/HTTP failure: report it and return whatever was collected
            print(f"{RED}search error page {page_no}: {e}{RESET}")
            break

        batch = payload.get("actors", [])
        if not batch:
            break

        fresh = 0
        for entry in batch:
            h = entry.get("handle", "")
            d = entry.get("did", "")
            if h.endswith(wanted_tail) and d not in matches:
                matches[d] = h
                fresh += 1

        print(f"{DIM}page {page_no}: {len(batch)} results, {fresh} new matches ({len(matches)} total){RESET}")

        next_cursor = payload.get("cursor")
        # a short page or a missing cursor both signal the final page
        if not next_cursor or len(batch) < SEARCH_LIMIT:
            break

    return [{"did": d, "handle": h} for d, h in matches.items()]
94
95
def search_plc(suffix: str, after: str | None = None) -> list[Actor]:
    """stream PLC directory export, filtering for handles ending in .suffix

    Pages through the export endpoint, advancing the `after` cursor to the
    newest createdAt seen on each page.

    Fix: if a full page failed to advance the cursor (every op sharing one
    createdAt, or ops missing createdAt), the previous code re-requested
    the identical page forever; we now detect the stall and stop early.
    """
    found: dict[str, str] = {}  # did -> handle
    cursor = after or "1970-01-01T00:00:00Z"
    pages = 0
    total_ops = 0
    tail = f".{suffix}"  # hoisted loop-invariant suffix match

    while True:
        pages += 1
        url = f"{PLC_EXPORT}?count={PLC_PAGE_SIZE}&after={urllib.parse.quote(cursor)}"

        try:
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=30) as resp:
                lines = resp.read().decode().strip().split("\n")
        except Exception as e:
            # network/HTTP failure: report and return what we have so far
            print(f"{RED}PLC export error at cursor {cursor}: {e}{RESET}")
            break

        if not lines or lines == [""]:
            break  # empty page: export exhausted

        batch_new = 0
        last_created = cursor
        for line in lines:
            if not line.strip():
                continue
            total_ops += 1
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed export lines

            last_created = entry.get("createdAt", last_created)
            did = entry.get("did", "")
            op = entry.get("operation", {})

            # newer format: alsoKnownAs list of at:// URIs
            for aka in op.get("alsoKnownAs", []):
                handle = aka.removeprefix("at://")
                if handle.endswith(tail) and did not in found:
                    found[did] = handle
                    batch_new += 1

            # older format: bare handle field
            handle = op.get("handle", "")
            if handle.endswith(tail) and did not in found:
                found[did] = handle
                batch_new += 1

        if batch_new:
            print(f"{DIM}page {pages}: scanned {total_ops} ops, +{batch_new} new ({len(found)} total){RESET}")
        elif pages % 100 == 0:
            # periodic heartbeat so long scans still show progress
            print(f"{DIM}page {pages}: scanned {total_ops} ops, {len(found)} matches so far (at {last_created[:10]}){RESET}")

        if len(lines) < PLC_PAGE_SIZE:
            break  # short page: reached the end of the export

        if last_created == cursor:
            # full page but the cursor didn't move — requesting again would
            # fetch the same page forever, so bail out instead of spinning
            print(f"{YELLOW}PLC cursor stalled at {cursor}; stopping early{RESET}")
            break

        cursor = last_created

    print(f"{DIM}scanned {total_ops} PLC operations across {pages} pages{RESET}")
    return [{"did": did, "handle": handle} for did, handle in found.items()]
158
159
def search_pds(suffix: str, pds_url: str) -> list[Actor]:
    """enumerate all repos on a PDS via com.atproto.sync.listRepos, resolve handles via PLC"""
    # phase 1: page through listRepos, accumulating every DID hosted on the PDS
    dids: list[str] = []
    cursor = ""
    page = 0

    while True:
        page += 1
        endpoint = f"{pds_url}/xrpc/com.atproto.sync.listRepos?limit=1000"
        if cursor:
            endpoint += f"&cursor={urllib.parse.quote(cursor)}"

        try:
            with urllib.request.urlopen(urllib.request.Request(endpoint), timeout=30) as resp:
                body = json.loads(resp.read())
        except Exception as e:
            print(f"{RED}PDS listRepos error page {page}: {e}{RESET}")
            break

        batch = body.get("repos", [])
        dids.extend(item["did"] for item in batch)
        print(f"{DIM}page {page}: {len(batch)} repos ({len(dids)} total){RESET}")

        cursor = body.get("cursor", "")
        if not cursor or not batch:
            break

    print(f"found {len(dids)} accounts on PDS, resolving handles via PLC...")

    # phase 2: resolve each DID's document from the PLC directory, in parallel
    matched: dict[str, str] = {}
    failures = 0
    tail = f".{suffix}"

    def lookup(did):
        # fetch the DID document and return (did, handle) on a suffix match
        req = urllib.request.Request(f"https://plc.directory/{did}")
        with urllib.request.urlopen(req, timeout=10) as resp:
            doc = json.loads(resp.read())
        for aka in doc.get("alsoKnownAs", []):
            candidate = aka.removeprefix("at://")
            if candidate.endswith(tail):
                return did, candidate
        return None, None

    with ThreadPoolExecutor(max_workers=20) as pool:
        pending = {pool.submit(lookup, d): d for d in dids}
        completed = 0
        for fut in as_completed(pending):
            completed += 1
            try:
                did, handle = fut.result()
            except Exception:
                failures += 1
            else:
                if did:
                    matched[did] = handle
            if completed % 200 == 0:
                print(f"{DIM} resolved {completed}/{len(dids)} ({len(matched)} matches){RESET}")

    print(f"{DIM}resolved {len(dids)} DIDs: {len(matched)} matches, {failures} errors{RESET}")
    return [{"did": did, "handle": handle} for did, handle in matched.items()]
221
222
def index_one(handle: str, token: str | None = None) -> IndexResult:
    """call /request-indexing for a single handle"""
    endpoint = f"{TYPEAHEAD_URL}/request-indexing?handle={urllib.parse.quote(handle)}"
    hdrs: dict[str, str] = {"User-Agent": "typeahead-index-domain/1.0"}
    if token:
        # admin bearer token lets the service skip rate limiting
        hdrs["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(endpoint, method="POST", headers=hdrs)
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # surface the HTTP status plus any body text the server returned
        detail = e.read().decode() if e.fp else ""
        return {"error": f"HTTP {e.code}: {detail}"}
    except Exception as e:
        # never raise: callers consume {"error": ...} dicts uniformly
        return {"error": str(e)}
238
239
def main():
    """Parse CLI args, discover handles via the chosen source, then index them.

    Sources: bsky (search API), plc (directory export stream), pds (repo
    enumeration on a specific PDS). --dry-run stops after discovery.
    """
    import os  # local import kept function-scoped, matching the original script

    parser = argparse.ArgumentParser(description="discover and index handles under a domain suffix")
    parser.add_argument("suffix", help="domain suffix, e.g. tngl.io")
    parser.add_argument("--source", choices=["bsky", "plc", "pds"], default="bsky",
                        help="discovery method (default: bsky)")
    parser.add_argument("--pds", help="for pds source: the PDS URL (e.g. https://tngl.sh)")
    parser.add_argument("--after", help="for plc source: only scan operations after this date (e.g. 2026-01-01)")
    parser.add_argument("--dry-run", action="store_true", help="discover only, don't index")
    parser.add_argument("--token", help="admin token to bypass rate limiting (reads ADMIN_SECRET env var if not set)")
    parser.add_argument("--concurrency", type=int, default=5, help="concurrent indexing requests (default: 5)")
    args = parser.parse_args()

    # fix: the original used lstrip("*."), which strips any leading run of the
    # characters '*' and '.', not the literal "*." prefix — removeprefix strips
    # exactly one "*." wildcard, then lstrip(".") normalizes a bare leading dot
    suffix = args.suffix.removeprefix("*.").lstrip(".")

    # --- discovery ---
    if args.source == "pds":
        if not args.pds:
            print(f"{RED}--pds URL required for pds source{RESET}")
            sys.exit(1)
        print(f"enumerating repos on {args.pds} for *.{suffix} handles...")
        actors = search_pds(suffix, args.pds.rstrip("/"))
    elif args.source == "plc":
        after = args.after
        if after and "T" not in after:
            after += "T00:00:00Z"  # date-only input: expand to a full timestamp
        print(f"scanning PLC directory for *.{suffix}" + (f" (after {after})" if after else "") + "...")
        actors = search_plc(suffix, after)
    else:
        print(f"searching bluesky for *.{suffix} handles...")
        actors = search_bsky(suffix)

    if not actors:
        print(f"no handles found matching *.{suffix}")
        if args.source == "bsky":
            print(f"{DIM}tip: try --source plc --after 2025-01-01 for a more comprehensive scan{RESET}")
        return

    print(f"\nfound {len(actors)} handles:")
    for a in sorted(actors, key=lambda x: x["handle"]):
        print(f" {a['handle']} ({a['did']})")

    if args.dry_run:
        print(f"\n{YELLOW}dry run — skipping indexing{RESET}")
        return

    token = args.token or os.environ.get("ADMIN_SECRET")

    # --- indexing ---
    print(f"\nindexing {len(actors)} handles (concurrency={args.concurrency}" + (", admin auth" if token else ", no auth — may hit rate limit") + ")...")
    indexed = 0
    hidden = 0
    errors = 0
    done = 0
    verbose = len(actors) <= 50  # per-handle output only for small batches

    with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
        futures = {pool.submit(index_one, a["handle"], token): a for a in actors}
        for future in as_completed(futures):
            actor = futures[future]
            # index_one never raises — failures come back as {"error": ...}
            result = future.result()
            done += 1
            if "error" in result:
                errors += 1
                if verbose:
                    print(f" {RED}✗ {actor['handle']}: {result['error']}{RESET}")
            elif result.get("hidden"):
                hidden += 1
                if verbose:
                    print(f" {YELLOW}· {actor['handle']} (hidden){RESET}")
            else:
                indexed += 1
                if verbose:
                    print(f" {GREEN}✓ {actor['handle']}{RESET}")
            if not verbose and done % 100 == 0:
                print(f"{DIM} {done}/{len(actors)} ({indexed} ok, {hidden} hidden, {errors} errors){RESET}")

    print(f"\ndone: {indexed} indexed, {hidden} hidden, {errors} errors")
316
317
# run only when executed as a script, not when imported
if __name__ == "__main__":
    main()