GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 319 lines 11 kB view raw
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = [] 5# /// 6""" 7discover and index all handles under a domain suffix. 8 9discovery methods: 10 bsky — search bluesky's public API (fast, but misses handles bluesky hasn't indexed) 11 plc — stream PLC directory export (comprehensive, slower — use --after to limit range) 12 pds — enumerate repos on a PDS, resolve handles via PLC (best for domains that run their own PDS) 13 14usage: 15 ./scripts/index-domain.py tngl.sh --source pds --pds https://tngl.sh 16 ./scripts/index-domain.py tngl.sh --source plc --after 2026-01-01 17 ./scripts/index-domain.py bsky.team --dry-run 18 ./scripts/index-domain.py mycustomdomain.com --concurrency 10 19""" 20 21import argparse 22import json 23import sys 24import urllib.parse 25import urllib.request 26import urllib.error 27from concurrent.futures import ThreadPoolExecutor, as_completed 28from typing import TypedDict 29 30 31class Actor(TypedDict): 32 did: str 33 handle: str 34 35 36class IndexResult(TypedDict, total=False): 37 handle: str 38 did: str 39 hidden: bool 40 error: str 41 42BSKY_SEARCH = "https://public.api.bsky.app/xrpc/app.bsky.actor.searchActors" 43PLC_EXPORT = "https://plc.directory/export" 44TYPEAHEAD_URL = "https://typeahead.waow.tech" 45SEARCH_LIMIT = 100 46PLC_PAGE_SIZE = 1000 47 48DIM = "\033[2m" 49GREEN = "\033[32m" 50YELLOW = "\033[33m" 51RED = "\033[31m" 52RESET = "\033[0m" 53 54 55def search_bsky(suffix: str) -> list[Actor]: 56 """paginate through bluesky searchActors for handles ending in .suffix""" 57 found = {} 58 cursor = None 59 page = 0 60 61 while True: 62 page += 1 63 url = f"{BSKY_SEARCH}?q={urllib.parse.quote(suffix)}&limit={SEARCH_LIMIT}" 64 if cursor: 65 url += f"&cursor={urllib.parse.quote(cursor)}" 66 67 try: 68 req = urllib.request.Request(url) 69 with urllib.request.urlopen(req, timeout=15) as resp: 70 data = json.loads(resp.read()) 71 except Exception as e: 72 print(f"{RED}search error page {page}: {e}{RESET}") 73 break 74 75 actors = data.get("actors", []) 76 if not actors: 77 break 78 79 new = 0 80 for actor in actors: 81 handle = actor.get("handle", "") 82 did = actor.get("did", "") 83 if handle.endswith(f".{suffix}") and did not in found: 84 found[did] = handle 85 new += 1 86 87 print(f"{DIM}page {page}: {len(actors)} results, {new} new matches ({len(found)} total){RESET}") 88 89 cursor = data.get("cursor") 90 if not cursor or len(actors) < SEARCH_LIMIT: 91 break 92 93 return [{"did": did, "handle": handle} for did, handle in found.items()] 94 95 96def search_plc(suffix: str, after: str | None = None) -> list[Actor]: 97 """stream PLC directory export, filtering for handles ending in .suffix""" 98 found = {} 99 cursor = after or "1970-01-01T00:00:00Z" 100 pages = 0 101 total_ops = 0 102 103 while True: 104 pages += 1 105 url = f"{PLC_EXPORT}?count={PLC_PAGE_SIZE}&after={urllib.parse.quote(cursor)}" 106 107 try: 108 req = urllib.request.Request(url) 109 with urllib.request.urlopen(req, timeout=30) as resp: 110 lines = resp.read().decode().strip().split("\n") 111 except Exception as e: 112 print(f"{RED}PLC export error at cursor {cursor}: {e}{RESET}") 113 break 114 115 if not lines or lines == [""]: 116 break 117 118 batch_new = 0 119 last_created = cursor 120 for line in lines: 121 if not line.strip(): 122 continue 123 total_ops += 1 124 try: 125 entry = json.loads(line) 126 except json.JSONDecodeError: 127 continue 128 129 last_created = entry.get("createdAt", last_created) 130 did = entry.get("did", "") 131 op = entry.get("operation", {}) 132 133 # newer format: alsoKnownAs 134 for aka in op.get("alsoKnownAs", []): 135 handle = aka.removeprefix("at://") 136 if handle.endswith(f".{suffix}") and did not in found: 137 found[did] = handle 138 batch_new += 1 139 140 # older format: handle field 141 handle = op.get("handle", "") 142 if handle.endswith(f".{suffix}") and did not in found: 143 found[did] = handle 144 batch_new += 1 145 146 if batch_new: 147 print(f"{DIM}page {pages}: scanned {total_ops} ops, +{batch_new} new ({len(found)} total){RESET}") 148 elif pages % 100 == 0: 149 print(f"{DIM}page {pages}: scanned {total_ops} ops, {len(found)} matches so far (at {last_created[:10]}){RESET}") 150 151 if len(lines) < PLC_PAGE_SIZE: 152 break 153 154 cursor = last_created 155 156 print(f"{DIM}scanned {total_ops} PLC operations across {pages} pages{RESET}") 157 return [{"did": did, "handle": handle} for did, handle in found.items()] 158 159 160def search_pds(suffix: str, pds_url: str) -> list[Actor]: 161 """enumerate all repos on a PDS via com.atproto.sync.listRepos, resolve handles via PLC""" 162 # step 1: collect all DIDs from the PDS 163 all_dids = [] 164 cursor = "" 165 page = 0 166 167 while True: 168 page += 1 169 url = f"{pds_url}/xrpc/com.atproto.sync.listRepos?limit=1000" 170 if cursor: 171 url += f"&cursor={urllib.parse.quote(cursor)}" 172 173 try: 174 req = urllib.request.Request(url) 175 with urllib.request.urlopen(req, timeout=30) as resp: 176 data = json.loads(resp.read()) 177 except Exception as e: 178 print(f"{RED}PDS listRepos error page {page}: {e}{RESET}") 179 break 180 181 repos = data.get("repos", []) 182 all_dids.extend(r["did"] for r in repos) 183 print(f"{DIM}page {page}: {len(repos)} repos ({len(all_dids)} total){RESET}") 184 185 cursor = data.get("cursor", "") 186 if not cursor or len(repos) == 0: 187 break 188 189 print(f"found {len(all_dids)} accounts on PDS, resolving handles via PLC...") 190 191 # step 2: resolve each DID via PLC directory (concurrent) 192 found = {} 193 errors = 0 194 195 def resolve_did(did): 196 req = urllib.request.Request(f"https://plc.directory/{did}") 197 with urllib.request.urlopen(req, timeout=10) as resp: 198 doc = json.loads(resp.read()) 199 for aka in doc.get("alsoKnownAs", []): 200 handle = aka.removeprefix("at://") 201 if handle.endswith(f".{suffix}"): 202 return did, handle 203 return None, None 204 205 with ThreadPoolExecutor(max_workers=20) as pool: 206 futures = {pool.submit(resolve_did, did): did for did in all_dids} 207 done = 0 208 for future in as_completed(futures): 209 done += 1 210 try: 211 did, handle = future.result() 212 if did: 213 found[did] = handle 214 except Exception: 215 errors += 1 216 if done % 200 == 0: 217 print(f"{DIM} resolved {done}/{len(all_dids)} ({len(found)} matches){RESET}") 218 219 print(f"{DIM}resolved {len(all_dids)} DIDs: {len(found)} matches, {errors} errors{RESET}") 220 return [{"did": did, "handle": handle} for did, handle in found.items()] 221 222 223def index_one(handle: str, token: str | None = None) -> IndexResult: 224 """call /request-indexing for a single handle""" 225 url = f"{TYPEAHEAD_URL}/request-indexing?handle={urllib.parse.quote(handle)}" 226 headers: dict[str, str] = {"User-Agent": "typeahead-index-domain/1.0"} 227 if token: 228 headers["Authorization"] = f"Bearer {token}" 229 req = urllib.request.Request(url, method="POST", headers=headers) 230 try: 231 with urllib.request.urlopen(req, timeout=15) as resp: 232 return json.loads(resp.read()) 233 except urllib.error.HTTPError as e: 234 body = e.read().decode() if e.fp else "" 235 return {"error": f"HTTP {e.code}: {body}"} 236 except Exception as e: 237 return {"error": str(e)} 238 239 240def main(): 241 parser = argparse.ArgumentParser(description="discover and index handles under a domain suffix") 242 parser.add_argument("suffix", help="domain suffix, e.g. tngl.io") 243 parser.add_argument("--source", choices=["bsky", "plc", "pds"], default="bsky", 244 help="discovery method (default: bsky)") 245 parser.add_argument("--pds", help="for pds source: the PDS URL (e.g. https://tngl.sh)") 246 parser.add_argument("--after", help="for plc source: only scan operations after this date (e.g. 2026-01-01)") 247 parser.add_argument("--dry-run", action="store_true", help="discover only, don't index") 248 parser.add_argument("--token", help="admin token to bypass rate limiting (reads ADMIN_SECRET env var if not set)") 249 parser.add_argument("--concurrency", type=int, default=5, help="concurrent indexing requests (default: 5)") 250 args = parser.parse_args() 251 252 suffix = args.suffix.lstrip("*.") 253 254 if args.source == "pds": 255 if not args.pds: 256 print(f"{RED}--pds URL required for pds source{RESET}") 257 sys.exit(1) 258 print(f"enumerating repos on {args.pds} for *.{suffix} handles...") 259 actors = search_pds(suffix, args.pds.rstrip("/")) 260 elif args.source == "plc": 261 after = args.after 262 if after and "T" not in after: 263 after += "T00:00:00Z" 264 print(f"scanning PLC directory for *.{suffix}" + (f" (after {after})" if after else "") + "...") 265 actors = search_plc(suffix, after) 266 else: 267 print(f"searching bluesky for *.{suffix} handles...") 268 actors = search_bsky(suffix) 269 270 if not actors: 271 print(f"no handles found matching *.{suffix}") 272 if args.source == "bsky": 273 print(f"{DIM}tip: try --source plc --after 2025-01-01 for a more comprehensive scan{RESET}") 274 return 275 276 print(f"\nfound {len(actors)} handles:") 277 for a in sorted(actors, key=lambda x: x["handle"]): 278 print(f" {a['handle']} ({a['did']})") 279 280 if args.dry_run: 281 print(f"\n{YELLOW}dry run — skipping indexing{RESET}") 282 return 283 284 import os 285 token = args.token or os.environ.get("ADMIN_SECRET") 286 287 print(f"\nindexing {len(actors)} handles (concurrency={args.concurrency}" + (", admin auth" if token else ", no auth — may hit rate limit") + ")...") 288 indexed = 0 289 hidden = 0 290 errors = 0 291 done = 0 292 verbose = len(actors) <= 50 293 294 with ThreadPoolExecutor(max_workers=args.concurrency) as pool: 295 futures = {pool.submit(index_one, a["handle"], token): a for a in actors} 296 for future in as_completed(futures): 297 actor = futures[future] 298 result = future.result() 299 done += 1 300 if "error" in result: 301 errors += 1 302 if verbose: 303 print(f" {RED}{actor['handle']}: {result['error']}{RESET}") 304 elif result.get("hidden"): 305 hidden += 1 306 if verbose: 307 print(f" {YELLOW}· {actor['handle']} (hidden){RESET}") 308 else: 309 indexed += 1 310 if verbose: 311 print(f" {GREEN}{actor['handle']}{RESET}") 312 if not verbose and done % 100 == 0: 313 print(f"{DIM} {done}/{len(actors)} ({indexed} ok, {hidden} hidden, {errors} errors){RESET}") 314 315 print(f"\ndone: {indexed} indexed, {hidden} hidden, {errors} errors") 316 317 318if __name__ == "__main__": 319 main()