GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7smoke tests for typeahead service.
8
9verifies response shape, search behavior, CORS, and optional
10comparison against public.api.bsky.app.
11
12usage:
13 ./scripts/smoke.py --url https://typeahead.waow.tech
14 ./scripts/smoke.py --url http://localhost:8787
15 ./scripts/smoke.py --url https://typeahead.waow.tech --compare
16"""
17
18import argparse
19import json
20import sys
21import urllib.request
22import urllib.error
23
24PASS = "\033[32mpass\033[0m"
25FAIL = "\033[31mFAIL\033[0m"
26SKIP = "\033[33mskip\033[0m"
27
28failures = 0
29
30BSKY_PUBLIC = "https://public.api.bsky.app"
31XRPC_PATH = "/xrpc/app.bsky.actor.searchActorsTypeahead"
32
33
34def check(name: str, ok: bool, detail: str = ""):
35 global failures
36 tag = PASS if ok else FAIL
37 msg = f" [{tag}] {name}"
38 if detail:
39 msg += f" ({detail})"
40 print(msg)
41 if not ok:
42 failures += 1
43 return ok
44
45
46def fetch(url: str, timeout: int = 15, method: str = "GET") -> tuple[dict | None, dict]:
47 """fetch JSON + response headers. returns (body, headers)."""
48 try:
49 req = urllib.request.Request(url, headers={"User-Agent": "typeahead-smoke/1.0"}, method=method)
50 with urllib.request.urlopen(req, timeout=timeout) as resp:
51 headers = {k.lower(): v for k, v in resp.headers.items()}
52 return json.loads(resp.read()), headers
53 except urllib.error.HTTPError as e:
54 return {"_http_error": e.code}, {}
55 except Exception as e:
56 return {"_error": str(e)}, {}
57
58
59def test_response_shape(base_url: str):
60 print("\n--- response shape ---")
61 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=3")
62 check("returns valid JSON", data is not None and "_error" not in data)
63 if not data or "_error" in data or "_http_error" in data:
64 return
65
66 check("has actors array", isinstance(data.get("actors"), list))
67 actors = data.get("actors", [])
68 if actors:
69 a = actors[0]
70 check("actor has did", "did" in a)
71 check("actor has handle", "handle" in a)
72 check("did starts with did:", a.get("did", "").startswith("did:"))
73
74
75def test_known_handle(base_url: str):
76 print("\n--- known handle ---")
77 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzstoatzz&limit=10")
78 if not data or "_error" in data or "_http_error" in data:
79 check("fetch succeeded", False)
80 return
81
82 actors = data.get("actors", [])
83 handles = [a.get("handle", "") for a in actors]
84 check("zzstoatzz.io in results", "zzstoatzz.io" in handles, f"got {handles[:5]}")
85
86
87def test_prefix_match(base_url: str):
88 print("\n--- prefix match ---")
89 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzst&limit=10")
90 if not data or "_error" in data or "_http_error" in data:
91 check("fetch succeeded", False)
92 return
93
94 actors = data.get("actors", [])
95 handles = [a.get("handle", "") for a in actors]
96 check("prefix 'zzst' finds zzstoatzz.io", "zzstoatzz.io" in handles, f"got {handles[:5]}")
97
98
99def test_cors(base_url: str):
100 print("\n--- CORS headers ---")
101 _, headers = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=1")
102 origin = headers.get("access-control-allow-origin", "")
103 check("Access-Control-Allow-Origin: *", origin == "*", f"got '{origin}'")
104
105
106def test_deprecated_param(base_url: str):
107 print("\n--- deprecated param ---")
108 data_q, _ = fetch(f"{base_url}{XRPC_PATH}?q=nate&limit=5")
109 data_term, _ = fetch(f"{base_url}{XRPC_PATH}?term=nate&limit=5")
110
111 if not data_q or "_error" in data_q or not data_term or "_error" in data_term:
112 check("both params work", False)
113 return
114
115 actors_q = {a.get("did") for a in data_q.get("actors", [])}
116 actors_term = {a.get("did") for a in data_term.get("actors", [])}
117 check("?term= returns same as ?q=", actors_q == actors_term, f"q={len(actors_q)}, term={len(actors_term)}")
118
119
120def test_limit_bounds(base_url: str):
121 print("\n--- limit bounds ---")
122 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=a&limit=3")
123 if not data or "_error" in data or "_http_error" in data:
124 check("fetch succeeded", False)
125 return
126
127 actors = data.get("actors", [])
128 check("limit=3 returns ≤3", len(actors) <= 3, f"got {len(actors)}")
129
130
131def test_empty_query(base_url: str):
132 print("\n--- empty query ---")
133 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=")
134 check("empty query returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
135
136
137def test_limit_over_max(base_url: str):
138 print("\n--- limit > 100 ---")
139 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=200")
140 check("limit>100 returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
141
142
143def test_stats_page(base_url: str):
144 print("\n--- stats page ---")
145 try:
146 req = urllib.request.Request(
147 f"{base_url}/stats",
148 headers={"User-Agent": "typeahead-smoke/1.0"},
149 )
150 with urllib.request.urlopen(req, timeout=15) as resp:
151 ct = resp.headers.get("Content-Type", "")
152 body = resp.read().decode()
153 check("stats returns 200", resp.status == 200)
154 check("stats content-type is html", "text/html" in ct, f"got '{ct}'")
155 check("stats contains actors indexed", "actors indexed" in body)
156 check("stats contains sparkline heading", "searches / 5 min" in body)
157 check("stats has home link", 'href="/"' in body)
158 check("stats shows hidden count", "hidden by moderation" in body)
159 except urllib.error.HTTPError as e:
160 check("stats returns 200", False, f"got {e.code}")
161 except Exception as e:
162 check("stats fetch succeeded", False, str(e))
163
164
165def test_request_indexing(base_url: str):
166 print("\n--- request indexing ---")
167 # GET should redirect to homepage (302)
168 try:
169 req = urllib.request.Request(
170 f"{base_url}/request-indexing?handle=test",
171 headers={"User-Agent": "typeahead-smoke/1.0"},
172 )
173 opener = urllib.request.build_opener(urllib.request.HTTPHandler)
174 # don't follow redirects
175 class NoRedirect(urllib.request.HTTPRedirectHandler):
176 def redirect_request(self, req, fp, code, msg, headers, newurl):
177 raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
178 opener = urllib.request.build_opener(NoRedirect)
179 opener.open(req, timeout=15)
180 check("GET redirects (302)", False, "no redirect")
181 except urllib.error.HTTPError as e:
182 check("GET redirects (302)", e.code == 302, f"got {e.code}")
183
184 # POST with empty handle should return 400
185 data, _ = fetch(f"{base_url}/request-indexing", method="POST")
186 check("POST empty returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
187
188 # POST with valid handle should return JSON with handle/did/hidden
189 data, _ = fetch(f"{base_url}/request-indexing?handle=zzstoatzz.io", method="POST")
190 if not data or "_error" in data or "_http_error" in data:
191 check("POST valid handle", False, f"got {data}")
192 else:
193 check("POST returns handle", data.get("handle") == "zzstoatzz.io", f"got {data.get('handle')}")
194 check("POST returns did", data.get("did", "").startswith("did:"), f"got {data.get('did')}")
195 check("POST returns hidden field", "hidden" in data, f"keys: {list(data.keys())}")
196
197
198def test_moderation_filtering(base_url: str):
199 print("\n--- moderation filtering ---")
200 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=10")
201 if not data or "_error" in data or "_http_error" in data:
202 check("fetch succeeded", False)
203 return
204
205 actors = data.get("actors", [])
206
207 # labels, createdAt, associated are intentional fields returned for client rendering
208 allowed_keys = {"did", "handle", "displayName", "avatar", "labels", "createdAt", "associated"}
209 extra_keys: set[str] = set()
210 for a in actors:
211 extra_keys |= set(a.keys()) - allowed_keys
212 check("actor objects have expected shape", len(extra_keys) == 0, f"extra keys: {extra_keys}" if extra_keys else "")
213
214 # verify labels is always an array when present
215 for a in actors:
216 if "labels" in a:
217 check("labels is array", isinstance(a["labels"], list), f"got {type(a['labels']).__name__}")
218 break
219
220 # !no-unauthenticated actors should be VISIBLE (it's about content, not identity)
221 print("\n--- !no-unauthenticated inclusion ---")
222 found = find_noauth_actor(base_url)
223 if not found:
224 check("found a !no-unauthenticated actor to verify", False, "couldn't find one — skipping")
225
226
227def find_noauth_actor(base_url: str) -> bool:
228 """find an actor with !no-unauthenticated and verify they ARE included in our results."""
229 for q in ["alex", "sam", "chris", "jordan"]:
230 bsky_data, _ = fetch(f"{BSKY_PUBLIC}/xrpc/app.bsky.actor.searchActors?q={q}&limit=25")
231 if not bsky_data or "_error" in bsky_data:
232 continue
233
234 for actor in bsky_data.get("actors", []):
235 labels = actor.get("labels", [])
236 has_noauth = any(
237 l.get("val") == "!no-unauthenticated" and not l.get("neg")
238 for l in labels
239 )
240 if not has_noauth:
241 continue
242
243 handle = actor.get("handle", "")
244 if not handle:
245 continue
246
247 # this actor has !no-unauthenticated — check they ARE in our results
248 our_data, _ = fetch(f"{base_url}{XRPC_PATH}?q={handle}&limit=10")
249 if not our_data or "_error" in our_data:
250 continue
251
252 our_handles = {a.get("handle") for a in our_data.get("actors", [])}
253 if handle in our_handles:
254 check(f"!no-unauthenticated actor @{handle} visible in search", True)
255 return True
256 else:
257 check(f"!no-unauthenticated actor @{handle} visible in search", False, "not found in results")
258 return True
259
260 return False
261
262
263def test_comparison(base_url: str, queries: list[str]):
264 print("\n--- comparison vs public.api.bsky.app ---")
265
266 for q in queries:
267 sys.stdout.write(f" comparing '{q}'...")
268 sys.stdout.flush()
269
270 ours, _ = fetch(f"{base_url}{XRPC_PATH}?q={q}&limit=10")
271 theirs, _ = fetch(f"{BSKY_PUBLIC}{XRPC_PATH}?q={q}&limit=10")
272
273 if not ours or "_error" in ours or not theirs or "_error" in theirs:
274 sys.stdout.write(f"\r [{FAIL}] '{q}': fetch failed\n")
275 continue
276
277 our_handles = {a.get("handle") for a in ours.get("actors", [])}
278 their_handles = {a.get("handle") for a in theirs.get("actors", [])}
279
280 overlap = our_handles & their_handles
281 pct = (len(overlap) / len(their_handles) * 100) if their_handles else 0
282 sys.stdout.write(
283 f"\r [{PASS}] '{q}': {len(overlap)}/{len(their_handles)} overlap ({pct:.0f}%)"
284 f" — ours={len(our_handles)}, theirs={len(their_handles)}\n"
285 )
286
287
288def main():
289 parser = argparse.ArgumentParser(description="typeahead smoke tests")
290 parser.add_argument("--url", required=True, help="typeahead service URL")
291 parser.add_argument("--compare", action="store_true", help="compare results vs public.api.bsky.app")
292 parser.add_argument(
293 "--queries",
294 nargs="+",
295 default=["nate", "zzstoatzz", "paul", "dan", "sky"],
296 help="queries for comparison test",
297 )
298 args = parser.parse_args()
299
300 print(f"typeahead: {args.url}")
301
302 test_response_shape(args.url)
303 test_known_handle(args.url)
304 test_prefix_match(args.url)
305 test_cors(args.url)
306 test_deprecated_param(args.url)
307 test_limit_bounds(args.url)
308 test_empty_query(args.url)
309 test_limit_over_max(args.url)
310 test_stats_page(args.url)
311 test_request_indexing(args.url)
312 test_moderation_filtering(args.url)
313
314 if args.compare:
315 test_comparison(args.url, args.queries)
316 else:
317 print(f"\n--- comparison ---")
318 print(f" [{SKIP}] skipped (use --compare)")
319
320 print()
321 if failures == 0:
322 print("all checks passed.")
323 else:
324 print(f"{failures} check(s) failed.")
325 return 1 if failures else 0
326
327
328if __name__ == "__main__":
329 sys.exit(main())