GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 434 lines 17 kB view raw
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = [] 5# /// 6""" 7smoke tests for typeahead service. 8 9verifies response shape, search behavior, CORS, and optional 10comparison against public.api.bsky.app. 11 12usage: 13 ./scripts/smoke.py --url https://typeahead.waow.tech 14 ./scripts/smoke.py --url http://localhost:8787 15 ./scripts/smoke.py --url https://typeahead.waow.tech --compare 16""" 17 18import argparse 19import json 20import os 21import sys 22import urllib.request 23import urllib.error 24 25PASS = "\033[32mpass\033[0m" 26FAIL = "\033[31mFAIL\033[0m" 27SKIP = "\033[33mskip\033[0m" 28 29failures = 0 30 31BSKY_PUBLIC = "https://public.api.bsky.app" 32XRPC_PATH = "/xrpc/app.bsky.actor.searchActorsTypeahead" 33 34 35def check(name: str, ok: bool, detail: str = ""): 36 global failures 37 tag = PASS if ok else FAIL 38 msg = f" [{tag}] {name}" 39 if detail: 40 msg += f" ({detail})" 41 print(msg) 42 if not ok: 43 failures += 1 44 return ok 45 46 47def fetch(url: str, timeout: int = 15, method: str = "GET") -> tuple[dict | None, dict]: 48 """fetch JSON + response headers. returns (body, headers).""" 49 try: 50 req = urllib.request.Request(url, headers={"User-Agent": "typeahead-smoke/1.0"}, method=method) 51 with urllib.request.urlopen(req, timeout=timeout) as resp: 52 headers = {k.lower(): v for k, v in resp.headers.items()} 53 return json.loads(resp.read()), headers 54 except urllib.error.HTTPError as e: 55 return {"_http_error": e.code}, {} 56 except Exception as e: 57 return {"_error": str(e)}, {} 58 59 60def test_response_shape(base_url: str): 61 print("\n--- response shape ---") 62 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=3") 63 check("returns valid JSON", data is not None and "_error" not in data) 64 if not data or "_error" in data or "_http_error" in data: 65 return 66 67 check("has actors array", isinstance(data.get("actors"), list)) 68 actors = data.get("actors", []) 69 if actors: 70 a = actors[0] 71 check("actor has did", "did" in a) 72 check("actor has handle", "handle" in a) 73 check("did starts with did:", a.get("did", "").startswith("did:")) 74 75 76def test_known_handle(base_url: str): 77 print("\n--- known handle ---") 78 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzstoatzz&limit=10") 79 if not data or "_error" in data or "_http_error" in data: 80 check("fetch succeeded", False) 81 return 82 83 actors = data.get("actors", []) 84 handles = [a.get("handle", "") for a in actors] 85 check("zzstoatzz.io in results", "zzstoatzz.io" in handles, f"got {handles[:5]}") 86 87 88def test_prefix_match(base_url: str): 89 print("\n--- prefix match ---") 90 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzst&limit=10") 91 if not data or "_error" in data or "_http_error" in data: 92 check("fetch succeeded", False) 93 return 94 95 actors = data.get("actors", []) 96 handles = [a.get("handle", "") for a in actors] 97 check("prefix 'zzst' finds zzstoatzz.io", "zzstoatzz.io" in handles, f"got {handles[:5]}") 98 99 100def test_cors(base_url: str): 101 print("\n--- CORS headers ---") 102 _, headers = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=1") 103 origin = headers.get("access-control-allow-origin", "") 104 check("Access-Control-Allow-Origin: *", origin == "*", f"got '{origin}'") 105 106 107def test_deprecated_param(base_url: str): 108 print("\n--- deprecated param ---") 109 data_q, _ = fetch(f"{base_url}{XRPC_PATH}?q=nate&limit=5") 110 data_term, _ = fetch(f"{base_url}{XRPC_PATH}?term=nate&limit=5") 111 112 if not data_q or "_error" in data_q or not data_term or "_error" in data_term: 113 check("both params work", False) 114 return 115 116 actors_q = {a.get("did") for a in data_q.get("actors", [])} 117 actors_term = {a.get("did") for a in data_term.get("actors", [])} 118 check("?term= returns same as ?q=", actors_q == actors_term, f"q={len(actors_q)}, term={len(actors_term)}") 119 120 121def test_limit_bounds(base_url: str): 122 print("\n--- limit bounds ---") 123 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=a&limit=3") 124 if not data or "_error" in data or "_http_error" in data: 125 check("fetch succeeded", False) 126 return 127 128 actors = data.get("actors", []) 129 check("limit=3 returns ≤3", len(actors) <= 3, f"got {len(actors)}") 130 131 132def test_empty_query(base_url: str): 133 print("\n--- empty query ---") 134 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=") 135 check("empty query returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 136 137 138def test_limit_over_max(base_url: str): 139 print("\n--- limit > 100 ---") 140 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=200") 141 check("limit>100 returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 142 143 144def test_stats_page(base_url: str): 145 print("\n--- stats page ---") 146 try: 147 req = urllib.request.Request( 148 f"{base_url}/stats", 149 headers={"User-Agent": "typeahead-smoke/1.0"}, 150 ) 151 with urllib.request.urlopen(req, timeout=15) as resp: 152 ct = resp.headers.get("Content-Type", "") 153 body = resp.read().decode() 154 check("stats returns 200", resp.status == 200) 155 check("stats content-type is html", "text/html" in ct, f"got '{ct}'") 156 check("stats contains actors indexed", "actors indexed" in body) 157 check("stats contains sparkline heading", "searches / 5 min" in body) 158 check("stats has home link", 'href="/"' in body) 159 check("stats shows hidden count", "hidden by moderation" in body) 160 except urllib.error.HTTPError as e: 161 check("stats returns 200", False, f"got {e.code}") 162 except Exception as e: 163 check("stats fetch succeeded", False, str(e)) 164 165 166def test_request_indexing(base_url: str): 167 print("\n--- request indexing ---") 168 # GET should redirect to homepage (302) 169 try: 170 req = urllib.request.Request( 171 f"{base_url}/request-indexing?handle=test", 172 headers={"User-Agent": "typeahead-smoke/1.0"}, 173 ) 174 opener = urllib.request.build_opener(urllib.request.HTTPHandler) 175 # don't follow redirects 176 class NoRedirect(urllib.request.HTTPRedirectHandler): 177 def redirect_request(self, req, fp, code, msg, headers, newurl): 178 raise urllib.error.HTTPError(newurl, code, msg, headers, fp) 179 opener = urllib.request.build_opener(NoRedirect) 180 opener.open(req, timeout=15) 181 check("GET redirects (302)", False, "no redirect") 182 except urllib.error.HTTPError as e: 183 check("GET redirects (302)", e.code == 302, f"got {e.code}") 184 185 # POST with empty handle should return 400 186 data, _ = fetch(f"{base_url}/request-indexing", method="POST") 187 check("POST empty returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 188 189 # POST with valid handle should return JSON with handle/did/hidden 190 data, _ = fetch(f"{base_url}/request-indexing?handle=zzstoatzz.io", method="POST") 191 if not data or "_error" in data or "_http_error" in data: 192 check("POST valid handle", False, f"got {data}") 193 else: 194 check("POST returns handle", data.get("handle") == "zzstoatzz.io", f"got {data.get('handle')}") 195 check("POST returns did", data.get("did", "").startswith("did:"), f"got {data.get('did')}") 196 check("POST returns hidden field", "hidden" in data, f"keys: {list(data.keys())}") 197 198 199def test_moderation_filtering(base_url: str): 200 print("\n--- moderation filtering ---") 201 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=10") 202 if not data or "_error" in data or "_http_error" in data: 203 check("fetch succeeded", False) 204 return 205 206 actors = data.get("actors", []) 207 208 # labels, createdAt, associated are intentional fields returned for client rendering 209 allowed_keys = {"did", "handle", "displayName", "avatar", "labels", "createdAt", "associated"} 210 extra_keys: set[str] = set() 211 for a in actors: 212 extra_keys |= set(a.keys()) - allowed_keys 213 check("actor objects have expected shape", len(extra_keys) == 0, f"extra keys: {extra_keys}" if extra_keys else "") 214 215 # verify labels is always an array when present 216 for a in actors: 217 if "labels" in a: 218 check("labels is array", isinstance(a["labels"], list), f"got {type(a['labels']).__name__}") 219 break 220 221 # !no-unauthenticated actors should be VISIBLE (it's about content, not identity) 222 print("\n--- !no-unauthenticated inclusion ---") 223 found = find_noauth_actor(base_url) 224 if not found: 225 check("found a !no-unauthenticated actor to verify", False, "couldn't find one — skipping") 226 227 228def find_noauth_actor(base_url: str) -> bool: 229 """find an actor with !no-unauthenticated and verify they ARE included in our results.""" 230 for q in ["alex", "sam", "chris", "jordan"]: 231 bsky_data, _ = fetch(f"{BSKY_PUBLIC}/xrpc/app.bsky.actor.searchActors?q={q}&limit=25") 232 if not bsky_data or "_error" in bsky_data: 233 continue 234 235 for actor in bsky_data.get("actors", []): 236 labels = actor.get("labels", []) 237 has_noauth = any( 238 l.get("val") == "!no-unauthenticated" and not l.get("neg") 239 for l in labels 240 ) 241 if not has_noauth: 242 continue 243 244 handle = actor.get("handle", "") 245 if not handle: 246 continue 247 248 # this actor has !no-unauthenticated — check they ARE in our results 249 our_data, _ = fetch(f"{base_url}{XRPC_PATH}?q={handle}&limit=10") 250 if not our_data or "_error" in our_data: 251 continue 252 253 our_handles = {a.get("handle") for a in our_data.get("actors", [])} 254 if handle in our_handles: 255 check(f"!no-unauthenticated actor @{handle} visible in search", True) 256 return True 257 else: 258 check(f"!no-unauthenticated actor @{handle} visible in search", False, "not found in results") 259 return True 260 261 return False 262 263 264LINK_DID = "did:plc:63hvnyjvqi2nzzcsjgnry5we" 265LINK_HANDLE = "spacelawshitpost.me" 266 267 268def admin_fetch(url: str, secret: str, method: str = "GET", body: dict | None = None) -> tuple[dict | None, int]: 269 """fetch with admin auth. returns (body, status_code).""" 270 data = json.dumps(body).encode() if body else None 271 req = urllib.request.Request( 272 url, 273 data=data, 274 headers={ 275 "User-Agent": "typeahead-smoke/1.0", 276 "Authorization": f"Bearer {secret}", 277 **({"Content-Type": "application/json"} if data else {}), 278 }, 279 method=method, 280 ) 281 try: 282 with urllib.request.urlopen(req, timeout=15) as resp: 283 return json.loads(resp.read()), resp.status 284 except urllib.error.HTTPError as e: 285 try: 286 return json.loads(e.read()), e.code 287 except Exception: 288 return None, e.code 289 290 291def test_link_visible(base_url: str): 292 """verify Link (spacelawshitpost.me) is visible — requires a permanent show override.""" 293 print("\n--- Link visibility ---") 294 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=spacelawshitpost&limit=5") 295 if not data or "_error" in data or "_http_error" in data: 296 check("search for Link", False, f"got {data}") 297 return 298 handles = [a.get("handle", "") for a in data.get("actors", [])] 299 check(f"Link visible in search", LINK_HANDLE in handles, f"got {handles}") 300 301 302def test_mod_overrides(base_url: str, secret: str): 303 """exercise the override admin CRUD lifecycle.""" 304 print("\n--- mod_overrides admin API ---") 305 306 # 1. ensure Link is indexed 307 data, _ = fetch(f"{base_url}/request-indexing?handle={LINK_HANDLE}", method="POST") 308 if not data or "_error" in data: 309 check("index Link", False, f"got {data}") 310 return 311 check("index Link", data.get("did") == LINK_DID, f"got {data.get('did')}") 312 313 # 2. set show override 314 data, status = admin_fetch( 315 f"{base_url}/admin/mod-override", secret, "POST", 316 {"did": LINK_DID, "action": "show", "reason": "FREE LINK"}, 317 ) 318 check("set show override", status == 200 and data and data.get("action") == "show", 319 f"status={status}, got {data}") 320 if data: 321 check("override sets hidden=0", data.get("hidden") == 0, f"got hidden={data.get('hidden')}") 322 323 # 3. validate input — bad action should 400 324 data, status = admin_fetch( 325 f"{base_url}/admin/mod-override", secret, "POST", 326 {"did": LINK_DID, "action": "yolo"}, 327 ) 328 check("bad action returns 400", status == 400, f"status={status}") 329 330 # 4. list overrides — Link should be there 331 data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret) 332 if data: 333 override_dids = [o.get("did") for o in data.get("overrides", [])] 334 check("Link in override list", LINK_DID in override_dids) 335 else: 336 check("list overrides", False, f"status={status}") 337 338 # 5. delete override — hidden recomputed from labels 339 data, status = admin_fetch(f"{base_url}/admin/mod-override?did={LINK_DID}", secret, "DELETE") 340 check("delete override", status == 200, f"status={status}, got {data}") 341 342 # 6. verify override is gone 343 data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret) 344 if data: 345 override_dids = [o.get("did") for o in data.get("overrides", [])] 346 check("Link removed from overrides", LINK_DID not in override_dids) 347 else: 348 check("list overrides after delete", False, f"status={status}") 349 350 # 7. re-set the override — Link should stay visible permanently 351 data, status = admin_fetch( 352 f"{base_url}/admin/mod-override", secret, "POST", 353 {"did": LINK_DID, "action": "show", "reason": "FREE LINK"}, 354 ) 355 check("re-set show override", status == 200 and data and data.get("hidden") == 0, 356 f"status={status}, got {data}") 357 358 359def test_comparison(base_url: str, queries: list[str]): 360 print("\n--- comparison vs public.api.bsky.app ---") 361 362 for q in queries: 363 sys.stdout.write(f" comparing '{q}'...") 364 sys.stdout.flush() 365 366 ours, _ = fetch(f"{base_url}{XRPC_PATH}?q={q}&limit=10") 367 theirs, _ = fetch(f"{BSKY_PUBLIC}{XRPC_PATH}?q={q}&limit=10") 368 369 if not ours or "_error" in ours or not theirs or "_error" in theirs: 370 sys.stdout.write(f"\r [{FAIL}] '{q}': fetch failed\n") 371 continue 372 373 our_handles = {a.get("handle") for a in ours.get("actors", [])} 374 their_handles = {a.get("handle") for a in theirs.get("actors", [])} 375 376 overlap = our_handles & their_handles 377 pct = (len(overlap) / len(their_handles) * 100) if their_handles else 0 378 sys.stdout.write( 379 f"\r [{PASS}] '{q}': {len(overlap)}/{len(their_handles)} overlap ({pct:.0f}%)" 380 f" — ours={len(our_handles)}, theirs={len(their_handles)}\n" 381 ) 382 383 384def main(): 385 parser = argparse.ArgumentParser(description="typeahead smoke tests") 386 parser.add_argument("--url", required=True, help="typeahead service URL") 387 parser.add_argument("--secret", default=os.environ.get("TYPEAHEAD_SECRET", ""), 388 help="admin secret (or set TYPEAHEAD_SECRET env var)") 389 parser.add_argument("--compare", action="store_true", help="compare results vs public.api.bsky.app") 390 parser.add_argument( 391 "--queries", 392 nargs="+", 393 default=["nate", "zzstoatzz", "paul", "dan", "sky"], 394 help="queries for comparison test", 395 ) 396 args = parser.parse_args() 397 398 print(f"typeahead: {args.url}") 399 400 test_response_shape(args.url) 401 test_known_handle(args.url) 402 test_prefix_match(args.url) 403 test_cors(args.url) 404 test_deprecated_param(args.url) 405 test_limit_bounds(args.url) 406 test_empty_query(args.url) 407 test_limit_over_max(args.url) 408 test_stats_page(args.url) 409 test_request_indexing(args.url) 410 test_moderation_filtering(args.url) 411 test_link_visible(args.url) 412 413 if args.secret: 414 test_mod_overrides(args.url, args.secret) 415 else: 416 print(f"\n--- mod_overrides admin API ---") 417 print(f" [{SKIP}] skipped (use --secret or set TYPEAHEAD_SECRET)") 418 419 if args.compare: 420 test_comparison(args.url, args.queries) 421 else: 422 print(f"\n--- comparison ---") 423 print(f" [{SKIP}] skipped (use --compare)") 424 425 print() 426 if failures == 0: 427 print("all checks passed.") 428 else: 429 print(f"{failures} check(s) failed.") 430 return 1 if failures else 0 431 432 433if __name__ == "__main__": 434 sys.exit(main())