GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7smoke tests for typeahead service.
8
9verifies response shape, search behavior, CORS, and optional
10comparison against public.api.bsky.app.
11
12usage:
13 ./scripts/smoke.py --url https://typeahead.waow.tech
14 ./scripts/smoke.py --url http://localhost:8787
15 ./scripts/smoke.py --url https://typeahead.waow.tech --compare
16"""
17
18import argparse
19import json
20import os
21import sys
22import urllib.request
23import urllib.error
24
25PASS = "\033[32mpass\033[0m"
26FAIL = "\033[31mFAIL\033[0m"
27SKIP = "\033[33mskip\033[0m"
28
29failures = 0
30
31BSKY_PUBLIC = "https://public.api.bsky.app"
32XRPC_PATH = "/xrpc/app.bsky.actor.searchActorsTypeahead"
33
34
35def check(name: str, ok: bool, detail: str = ""):
36 global failures
37 tag = PASS if ok else FAIL
38 msg = f" [{tag}] {name}"
39 if detail:
40 msg += f" ({detail})"
41 print(msg)
42 if not ok:
43 failures += 1
44 return ok
45
46
47def fetch(url: str, timeout: int = 15, method: str = "GET") -> tuple[dict | None, dict]:
48 """fetch JSON + response headers. returns (body, headers)."""
49 try:
50 req = urllib.request.Request(url, headers={"User-Agent": "typeahead-smoke/1.0"}, method=method)
51 with urllib.request.urlopen(req, timeout=timeout) as resp:
52 headers = {k.lower(): v for k, v in resp.headers.items()}
53 return json.loads(resp.read()), headers
54 except urllib.error.HTTPError as e:
55 return {"_http_error": e.code}, {}
56 except Exception as e:
57 return {"_error": str(e)}, {}
58
59
60def test_response_shape(base_url: str):
61 print("\n--- response shape ---")
62 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=3")
63 check("returns valid JSON", data is not None and "_error" not in data)
64 if not data or "_error" in data or "_http_error" in data:
65 return
66
67 check("has actors array", isinstance(data.get("actors"), list))
68 actors = data.get("actors", [])
69 if actors:
70 a = actors[0]
71 check("actor has did", "did" in a)
72 check("actor has handle", "handle" in a)
73 check("did starts with did:", a.get("did", "").startswith("did:"))
74
75
76def test_known_handle(base_url: str):
77 print("\n--- known handle ---")
78 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzstoatzz&limit=10")
79 if not data or "_error" in data or "_http_error" in data:
80 check("fetch succeeded", False)
81 return
82
83 actors = data.get("actors", [])
84 handles = [a.get("handle", "") for a in actors]
85 check("zzstoatzz.io in results", "zzstoatzz.io" in handles, f"got {handles[:5]}")
86
87
88def test_prefix_match(base_url: str):
89 print("\n--- prefix match ---")
90 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzst&limit=10")
91 if not data or "_error" in data or "_http_error" in data:
92 check("fetch succeeded", False)
93 return
94
95 actors = data.get("actors", [])
96 handles = [a.get("handle", "") for a in actors]
97 check("prefix 'zzst' finds zzstoatzz.io", "zzstoatzz.io" in handles, f"got {handles[:5]}")
98
99
100def test_cors(base_url: str):
101 print("\n--- CORS headers ---")
102 _, headers = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=1")
103 origin = headers.get("access-control-allow-origin", "")
104 check("Access-Control-Allow-Origin: *", origin == "*", f"got '{origin}'")
105
106
107def test_deprecated_param(base_url: str):
108 print("\n--- deprecated param ---")
109 data_q, _ = fetch(f"{base_url}{XRPC_PATH}?q=nate&limit=5")
110 data_term, _ = fetch(f"{base_url}{XRPC_PATH}?term=nate&limit=5")
111
112 if not data_q or "_error" in data_q or not data_term or "_error" in data_term:
113 check("both params work", False)
114 return
115
116 actors_q = {a.get("did") for a in data_q.get("actors", [])}
117 actors_term = {a.get("did") for a in data_term.get("actors", [])}
118 check("?term= returns same as ?q=", actors_q == actors_term, f"q={len(actors_q)}, term={len(actors_term)}")
119
120
121def test_limit_bounds(base_url: str):
122 print("\n--- limit bounds ---")
123 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=a&limit=3")
124 if not data or "_error" in data or "_http_error" in data:
125 check("fetch succeeded", False)
126 return
127
128 actors = data.get("actors", [])
129 check("limit=3 returns ≤3", len(actors) <= 3, f"got {len(actors)}")
130
131
132def test_empty_query(base_url: str):
133 print("\n--- empty query ---")
134 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=")
135 check("empty query returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
136
137
138def test_limit_over_max(base_url: str):
139 print("\n--- limit > 100 ---")
140 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=200")
141 check("limit>100 returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
142
143
144def test_stats_page(base_url: str):
145 print("\n--- stats page ---")
146 try:
147 req = urllib.request.Request(
148 f"{base_url}/stats",
149 headers={"User-Agent": "typeahead-smoke/1.0"},
150 )
151 with urllib.request.urlopen(req, timeout=15) as resp:
152 ct = resp.headers.get("Content-Type", "")
153 body = resp.read().decode()
154 check("stats returns 200", resp.status == 200)
155 check("stats content-type is html", "text/html" in ct, f"got '{ct}'")
156 check("stats contains actors indexed", "actors indexed" in body)
157 check("stats contains sparkline heading", "searches / 5 min" in body)
158 check("stats has home link", 'href="/"' in body)
159 check("stats shows hidden count", "hidden by moderation" in body)
160 except urllib.error.HTTPError as e:
161 check("stats returns 200", False, f"got {e.code}")
162 except Exception as e:
163 check("stats fetch succeeded", False, str(e))
164
165
166def test_request_indexing(base_url: str):
167 print("\n--- request indexing ---")
168 # GET should redirect to homepage (302)
169 try:
170 req = urllib.request.Request(
171 f"{base_url}/request-indexing?handle=test",
172 headers={"User-Agent": "typeahead-smoke/1.0"},
173 )
174 opener = urllib.request.build_opener(urllib.request.HTTPHandler)
175 # don't follow redirects
176 class NoRedirect(urllib.request.HTTPRedirectHandler):
177 def redirect_request(self, req, fp, code, msg, headers, newurl):
178 raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
179 opener = urllib.request.build_opener(NoRedirect)
180 opener.open(req, timeout=15)
181 check("GET redirects (302)", False, "no redirect")
182 except urllib.error.HTTPError as e:
183 check("GET redirects (302)", e.code == 302, f"got {e.code}")
184
185 # POST with empty handle should return 400
186 data, _ = fetch(f"{base_url}/request-indexing", method="POST")
187 check("POST empty returns 400", data is not None and data.get("_http_error") == 400, f"got {data}")
188
189 # POST with valid handle should return JSON with handle/did/hidden
190 data, _ = fetch(f"{base_url}/request-indexing?handle=zzstoatzz.io", method="POST")
191 if not data or "_error" in data or "_http_error" in data:
192 check("POST valid handle", False, f"got {data}")
193 else:
194 check("POST returns handle", data.get("handle") == "zzstoatzz.io", f"got {data.get('handle')}")
195 check("POST returns did", data.get("did", "").startswith("did:"), f"got {data.get('did')}")
196 check("POST returns hidden field", "hidden" in data, f"keys: {list(data.keys())}")
197
198
199def test_moderation_filtering(base_url: str):
200 print("\n--- moderation filtering ---")
201 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=10")
202 if not data or "_error" in data or "_http_error" in data:
203 check("fetch succeeded", False)
204 return
205
206 actors = data.get("actors", [])
207
208 # labels, createdAt, associated are intentional fields returned for client rendering
209 allowed_keys = {"did", "handle", "displayName", "avatar", "labels", "createdAt", "associated"}
210 extra_keys: set[str] = set()
211 for a in actors:
212 extra_keys |= set(a.keys()) - allowed_keys
213 check("actor objects have expected shape", len(extra_keys) == 0, f"extra keys: {extra_keys}" if extra_keys else "")
214
215 # verify labels is always an array when present
216 for a in actors:
217 if "labels" in a:
218 check("labels is array", isinstance(a["labels"], list), f"got {type(a['labels']).__name__}")
219 break
220
221 # !no-unauthenticated actors should be VISIBLE (it's about content, not identity)
222 print("\n--- !no-unauthenticated inclusion ---")
223 found = find_noauth_actor(base_url)
224 if not found:
225 check("found a !no-unauthenticated actor to verify", False, "couldn't find one — skipping")
226
227
228def find_noauth_actor(base_url: str) -> bool:
229 """find an actor with !no-unauthenticated and verify they ARE included in our results."""
230 for q in ["alex", "sam", "chris", "jordan"]:
231 bsky_data, _ = fetch(f"{BSKY_PUBLIC}/xrpc/app.bsky.actor.searchActors?q={q}&limit=25")
232 if not bsky_data or "_error" in bsky_data:
233 continue
234
235 for actor in bsky_data.get("actors", []):
236 labels = actor.get("labels", [])
237 has_noauth = any(
238 l.get("val") == "!no-unauthenticated" and not l.get("neg")
239 for l in labels
240 )
241 if not has_noauth:
242 continue
243
244 handle = actor.get("handle", "")
245 if not handle:
246 continue
247
248 # this actor has !no-unauthenticated — check they ARE in our results
249 our_data, _ = fetch(f"{base_url}{XRPC_PATH}?q={handle}&limit=10")
250 if not our_data or "_error" in our_data:
251 continue
252
253 our_handles = {a.get("handle") for a in our_data.get("actors", [])}
254 if handle in our_handles:
255 check(f"!no-unauthenticated actor @{handle} visible in search", True)
256 return True
257 else:
258 check(f"!no-unauthenticated actor @{handle} visible in search", False, "not found in results")
259 return True
260
261 return False
262
263
264LINK_DID = "did:plc:63hvnyjvqi2nzzcsjgnry5we"
265LINK_HANDLE = "spacelawshitpost.me"
266
267
268def admin_fetch(url: str, secret: str, method: str = "GET", body: dict | None = None) -> tuple[dict | None, int]:
269 """fetch with admin auth. returns (body, status_code)."""
270 data = json.dumps(body).encode() if body else None
271 req = urllib.request.Request(
272 url,
273 data=data,
274 headers={
275 "User-Agent": "typeahead-smoke/1.0",
276 "Authorization": f"Bearer {secret}",
277 **({"Content-Type": "application/json"} if data else {}),
278 },
279 method=method,
280 )
281 try:
282 with urllib.request.urlopen(req, timeout=15) as resp:
283 return json.loads(resp.read()), resp.status
284 except urllib.error.HTTPError as e:
285 try:
286 return json.loads(e.read()), e.code
287 except Exception:
288 return None, e.code
289
290
291def test_link_visible(base_url: str):
292 """verify Link (spacelawshitpost.me) is visible — requires a permanent show override."""
293 print("\n--- Link visibility ---")
294 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=spacelawshitpost&limit=5")
295 if not data or "_error" in data or "_http_error" in data:
296 check("search for Link", False, f"got {data}")
297 return
298 handles = [a.get("handle", "") for a in data.get("actors", [])]
299 check(f"Link visible in search", LINK_HANDLE in handles, f"got {handles}")
300
301
302def test_mod_overrides(base_url: str, secret: str):
303 """exercise the override admin CRUD lifecycle."""
304 print("\n--- mod_overrides admin API ---")
305
306 # 1. ensure Link is indexed
307 data, _ = fetch(f"{base_url}/request-indexing?handle={LINK_HANDLE}", method="POST")
308 if not data or "_error" in data:
309 check("index Link", False, f"got {data}")
310 return
311 check("index Link", data.get("did") == LINK_DID, f"got {data.get('did')}")
312
313 # 2. set show override
314 data, status = admin_fetch(
315 f"{base_url}/admin/mod-override", secret, "POST",
316 {"did": LINK_DID, "action": "show", "reason": "FREE LINK"},
317 )
318 check("set show override", status == 200 and data and data.get("action") == "show",
319 f"status={status}, got {data}")
320 if data:
321 check("override sets hidden=0", data.get("hidden") == 0, f"got hidden={data.get('hidden')}")
322
323 # 3. validate input — bad action should 400
324 data, status = admin_fetch(
325 f"{base_url}/admin/mod-override", secret, "POST",
326 {"did": LINK_DID, "action": "yolo"},
327 )
328 check("bad action returns 400", status == 400, f"status={status}")
329
330 # 4. list overrides — Link should be there
331 data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret)
332 if data:
333 override_dids = [o.get("did") for o in data.get("overrides", [])]
334 check("Link in override list", LINK_DID in override_dids)
335 else:
336 check("list overrides", False, f"status={status}")
337
338 # 5. delete override — hidden recomputed from labels
339 data, status = admin_fetch(f"{base_url}/admin/mod-override?did={LINK_DID}", secret, "DELETE")
340 check("delete override", status == 200, f"status={status}, got {data}")
341
342 # 6. verify override is gone
343 data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret)
344 if data:
345 override_dids = [o.get("did") for o in data.get("overrides", [])]
346 check("Link removed from overrides", LINK_DID not in override_dids)
347 else:
348 check("list overrides after delete", False, f"status={status}")
349
350 # 7. re-set the override — Link should stay visible permanently
351 data, status = admin_fetch(
352 f"{base_url}/admin/mod-override", secret, "POST",
353 {"did": LINK_DID, "action": "show", "reason": "FREE LINK"},
354 )
355 check("re-set show override", status == 200 and data and data.get("hidden") == 0,
356 f"status={status}, got {data}")
357
358
359def test_comparison(base_url: str, queries: list[str]):
360 print("\n--- comparison vs public.api.bsky.app ---")
361
362 for q in queries:
363 sys.stdout.write(f" comparing '{q}'...")
364 sys.stdout.flush()
365
366 ours, _ = fetch(f"{base_url}{XRPC_PATH}?q={q}&limit=10")
367 theirs, _ = fetch(f"{BSKY_PUBLIC}{XRPC_PATH}?q={q}&limit=10")
368
369 if not ours or "_error" in ours or not theirs or "_error" in theirs:
370 sys.stdout.write(f"\r [{FAIL}] '{q}': fetch failed\n")
371 continue
372
373 our_handles = {a.get("handle") for a in ours.get("actors", [])}
374 their_handles = {a.get("handle") for a in theirs.get("actors", [])}
375
376 overlap = our_handles & their_handles
377 pct = (len(overlap) / len(their_handles) * 100) if their_handles else 0
378 sys.stdout.write(
379 f"\r [{PASS}] '{q}': {len(overlap)}/{len(their_handles)} overlap ({pct:.0f}%)"
380 f" — ours={len(our_handles)}, theirs={len(their_handles)}\n"
381 )
382
383
384def main():
385 parser = argparse.ArgumentParser(description="typeahead smoke tests")
386 parser.add_argument("--url", required=True, help="typeahead service URL")
387 parser.add_argument("--secret", default=os.environ.get("TYPEAHEAD_SECRET", ""),
388 help="admin secret (or set TYPEAHEAD_SECRET env var)")
389 parser.add_argument("--compare", action="store_true", help="compare results vs public.api.bsky.app")
390 parser.add_argument(
391 "--queries",
392 nargs="+",
393 default=["nate", "zzstoatzz", "paul", "dan", "sky"],
394 help="queries for comparison test",
395 )
396 args = parser.parse_args()
397
398 print(f"typeahead: {args.url}")
399
400 test_response_shape(args.url)
401 test_known_handle(args.url)
402 test_prefix_match(args.url)
403 test_cors(args.url)
404 test_deprecated_param(args.url)
405 test_limit_bounds(args.url)
406 test_empty_query(args.url)
407 test_limit_over_max(args.url)
408 test_stats_page(args.url)
409 test_request_indexing(args.url)
410 test_moderation_filtering(args.url)
411 test_link_visible(args.url)
412
413 if args.secret:
414 test_mod_overrides(args.url, args.secret)
415 else:
416 print(f"\n--- mod_overrides admin API ---")
417 print(f" [{SKIP}] skipped (use --secret or set TYPEAHEAD_SECRET)")
418
419 if args.compare:
420 test_comparison(args.url, args.queries)
421 else:
422 print(f"\n--- comparison ---")
423 print(f" [{SKIP}] skipped (use --compare)")
424
425 print()
426 if failures == 0:
427 print("all checks passed.")
428 else:
429 print(f"{failures} check(s) failed.")
430 return 1 if failures else 0
431
432
433if __name__ == "__main__":
434 sys.exit(main())