GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 329 lines 12 kB view raw
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = [] 5# /// 6""" 7smoke tests for typeahead service. 8 9verifies response shape, search behavior, CORS, and optional 10comparison against public.api.bsky.app. 11 12usage: 13 ./scripts/smoke.py --url https://typeahead.waow.tech 14 ./scripts/smoke.py --url http://localhost:8787 15 ./scripts/smoke.py --url https://typeahead.waow.tech --compare 16""" 17 18import argparse 19import json 20import sys 21import urllib.request 22import urllib.error 23 24PASS = "\033[32mpass\033[0m" 25FAIL = "\033[31mFAIL\033[0m" 26SKIP = "\033[33mskip\033[0m" 27 28failures = 0 29 30BSKY_PUBLIC = "https://public.api.bsky.app" 31XRPC_PATH = "/xrpc/app.bsky.actor.searchActorsTypeahead" 32 33 34def check(name: str, ok: bool, detail: str = ""): 35 global failures 36 tag = PASS if ok else FAIL 37 msg = f" [{tag}] {name}" 38 if detail: 39 msg += f" ({detail})" 40 print(msg) 41 if not ok: 42 failures += 1 43 return ok 44 45 46def fetch(url: str, timeout: int = 15, method: str = "GET") -> tuple[dict | None, dict]: 47 """fetch JSON + response headers. returns (body, headers).""" 48 try: 49 req = urllib.request.Request(url, headers={"User-Agent": "typeahead-smoke/1.0"}, method=method) 50 with urllib.request.urlopen(req, timeout=timeout) as resp: 51 headers = {k.lower(): v for k, v in resp.headers.items()} 52 return json.loads(resp.read()), headers 53 except urllib.error.HTTPError as e: 54 return {"_http_error": e.code}, {} 55 except Exception as e: 56 return {"_error": str(e)}, {} 57 58 59def test_response_shape(base_url: str): 60 print("\n--- response shape ---") 61 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=3") 62 check("returns valid JSON", data is not None and "_error" not in data) 63 if not data or "_error" in data or "_http_error" in data: 64 return 65 66 check("has actors array", isinstance(data.get("actors"), list)) 67 actors = data.get("actors", []) 68 if actors: 69 a = actors[0] 70 check("actor has did", "did" in a) 71 check("actor has handle", "handle" in a) 72 check("did starts with did:", a.get("did", "").startswith("did:")) 73 74 75def test_known_handle(base_url: str): 76 print("\n--- known handle ---") 77 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzstoatzz&limit=10") 78 if not data or "_error" in data or "_http_error" in data: 79 check("fetch succeeded", False) 80 return 81 82 actors = data.get("actors", []) 83 handles = [a.get("handle", "") for a in actors] 84 check("zzstoatzz.io in results", "zzstoatzz.io" in handles, f"got {handles[:5]}") 85 86 87def test_prefix_match(base_url: str): 88 print("\n--- prefix match ---") 89 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=zzst&limit=10") 90 if not data or "_error" in data or "_http_error" in data: 91 check("fetch succeeded", False) 92 return 93 94 actors = data.get("actors", []) 95 handles = [a.get("handle", "") for a in actors] 96 check("prefix 'zzst' finds zzstoatzz.io", "zzstoatzz.io" in handles, f"got {handles[:5]}") 97 98 99def test_cors(base_url: str): 100 print("\n--- CORS headers ---") 101 _, headers = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=1") 102 origin = headers.get("access-control-allow-origin", "") 103 check("Access-Control-Allow-Origin: *", origin == "*", f"got '{origin}'") 104 105 106def test_deprecated_param(base_url: str): 107 print("\n--- deprecated param ---") 108 data_q, _ = fetch(f"{base_url}{XRPC_PATH}?q=nate&limit=5") 109 data_term, _ = fetch(f"{base_url}{XRPC_PATH}?term=nate&limit=5") 110 111 if not data_q or "_error" in data_q or not data_term or "_error" in data_term: 112 check("both params work", False) 113 return 114 115 actors_q = {a.get("did") for a in data_q.get("actors", [])} 116 actors_term = {a.get("did") for a in data_term.get("actors", [])} 117 check("?term= returns same as ?q=", actors_q == actors_term, f"q={len(actors_q)}, term={len(actors_term)}") 118 119 120def test_limit_bounds(base_url: str): 121 print("\n--- limit bounds ---") 122 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=a&limit=3") 123 if not data or "_error" in data or "_http_error" in data: 124 check("fetch succeeded", False) 125 return 126 127 actors = data.get("actors", []) 128 check("limit=3 returns ≤3", len(actors) <= 3, f"got {len(actors)}") 129 130 131def test_empty_query(base_url: str): 132 print("\n--- empty query ---") 133 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=") 134 check("empty query returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 135 136 137def test_limit_over_max(base_url: str): 138 print("\n--- limit > 100 ---") 139 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=200") 140 check("limit>100 returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 141 142 143def test_stats_page(base_url: str): 144 print("\n--- stats page ---") 145 try: 146 req = urllib.request.Request( 147 f"{base_url}/stats", 148 headers={"User-Agent": "typeahead-smoke/1.0"}, 149 ) 150 with urllib.request.urlopen(req, timeout=15) as resp: 151 ct = resp.headers.get("Content-Type", "") 152 body = resp.read().decode() 153 check("stats returns 200", resp.status == 200) 154 check("stats content-type is html", "text/html" in ct, f"got '{ct}'") 155 check("stats contains actors indexed", "actors indexed" in body) 156 check("stats contains sparkline heading", "searches / 5 min" in body) 157 check("stats has home link", 'href="/"' in body) 158 check("stats shows hidden count", "hidden by moderation" in body) 159 except urllib.error.HTTPError as e: 160 check("stats returns 200", False, f"got {e.code}") 161 except Exception as e: 162 check("stats fetch succeeded", False, str(e)) 163 164 165def test_request_indexing(base_url: str): 166 print("\n--- request indexing ---") 167 # GET should redirect to homepage (302) 168 try: 169 req = urllib.request.Request( 170 f"{base_url}/request-indexing?handle=test", 171 headers={"User-Agent": "typeahead-smoke/1.0"}, 172 ) 173 opener = urllib.request.build_opener(urllib.request.HTTPHandler) 174 # don't follow redirects 175 class NoRedirect(urllib.request.HTTPRedirectHandler): 176 def redirect_request(self, req, fp, code, msg, headers, newurl): 177 raise urllib.error.HTTPError(newurl, code, msg, headers, fp) 178 opener = urllib.request.build_opener(NoRedirect) 179 opener.open(req, timeout=15) 180 check("GET redirects (302)", False, "no redirect") 181 except urllib.error.HTTPError as e: 182 check("GET redirects (302)", e.code == 302, f"got {e.code}") 183 184 # POST with empty handle should return 400 185 data, _ = fetch(f"{base_url}/request-indexing", method="POST") 186 check("POST empty returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 187 188 # POST with valid handle should return JSON with handle/did/hidden 189 data, _ = fetch(f"{base_url}/request-indexing?handle=zzstoatzz.io", method="POST") 190 if not data or "_error" in data or "_http_error" in data: 191 check("POST valid handle", False, f"got {data}") 192 else: 193 check("POST returns handle", data.get("handle") == "zzstoatzz.io", f"got {data.get('handle')}") 194 check("POST returns did", data.get("did", "").startswith("did:"), f"got {data.get('did')}") 195 check("POST returns hidden field", "hidden" in data, f"keys: {list(data.keys())}") 196 197 198def test_moderation_filtering(base_url: str): 199 print("\n--- moderation filtering ---") 200 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=10") 201 if not data or "_error" in data or "_http_error" in data: 202 check("fetch succeeded", False) 203 return 204 205 actors = data.get("actors", []) 206 207 # labels, createdAt, associated are intentional fields returned for client rendering 208 allowed_keys = {"did", "handle", "displayName", "avatar", "labels", "createdAt", "associated"} 209 extra_keys: set[str] = set() 210 for a in actors: 211 extra_keys |= set(a.keys()) - allowed_keys 212 check("actor objects have expected shape", len(extra_keys) == 0, f"extra keys: {extra_keys}" if extra_keys else "") 213 214 # verify labels is always an array when present 215 for a in actors: 216 if "labels" in a: 217 check("labels is array", isinstance(a["labels"], list), f"got {type(a['labels']).__name__}") 218 break 219 220 # !no-unauthenticated actors should be VISIBLE (it's about content, not identity) 221 print("\n--- !no-unauthenticated inclusion ---") 222 found = find_noauth_actor(base_url) 223 if not found: 224 check("found a !no-unauthenticated actor to verify", False, "couldn't find one — skipping") 225 226 227def find_noauth_actor(base_url: str) -> bool: 228 """find an actor with !no-unauthenticated and verify they ARE included in our results.""" 229 for q in ["alex", "sam", "chris", "jordan"]: 230 bsky_data, _ = fetch(f"{BSKY_PUBLIC}/xrpc/app.bsky.actor.searchActors?q={q}&limit=25") 231 if not bsky_data or "_error" in bsky_data: 232 continue 233 234 for actor in bsky_data.get("actors", []): 235 labels = actor.get("labels", []) 236 has_noauth = any( 237 l.get("val") == "!no-unauthenticated" and not l.get("neg") 238 for l in labels 239 ) 240 if not has_noauth: 241 continue 242 243 handle = actor.get("handle", "") 244 if not handle: 245 continue 246 247 # this actor has !no-unauthenticated — check they ARE in our results 248 our_data, _ = fetch(f"{base_url}{XRPC_PATH}?q={handle}&limit=10") 249 if not our_data or "_error" in our_data: 250 continue 251 252 our_handles = {a.get("handle") for a in our_data.get("actors", [])} 253 if handle in our_handles: 254 check(f"!no-unauthenticated actor @{handle} visible in search", True) 255 return True 256 else: 257 check(f"!no-unauthenticated actor @{handle} visible in search", False, "not found in results") 258 return True 259 260 return False 261 262 263def test_comparison(base_url: str, queries: list[str]): 264 print("\n--- comparison vs public.api.bsky.app ---") 265 266 for q in queries: 267 sys.stdout.write(f" comparing '{q}'...") 268 sys.stdout.flush() 269 270 ours, _ = fetch(f"{base_url}{XRPC_PATH}?q={q}&limit=10") 271 theirs, _ = fetch(f"{BSKY_PUBLIC}{XRPC_PATH}?q={q}&limit=10") 272 273 if not ours or "_error" in ours or not theirs or "_error" in theirs: 274 sys.stdout.write(f"\r [{FAIL}] '{q}': fetch failed\n") 275 continue 276 277 our_handles = {a.get("handle") for a in ours.get("actors", [])} 278 their_handles = {a.get("handle") for a in theirs.get("actors", [])} 279 280 overlap = our_handles & their_handles 281 pct = (len(overlap) / len(their_handles) * 100) if their_handles else 0 282 sys.stdout.write( 283 f"\r [{PASS}] '{q}': {len(overlap)}/{len(their_handles)} overlap ({pct:.0f}%)" 284 f" — ours={len(our_handles)}, theirs={len(their_handles)}\n" 285 ) 286 287 288def main(): 289 parser = argparse.ArgumentParser(description="typeahead smoke tests") 290 parser.add_argument("--url", required=True, help="typeahead service URL") 291 parser.add_argument("--compare", action="store_true", help="compare results vs public.api.bsky.app") 292 parser.add_argument( 293 "--queries", 294 nargs="+", 295 default=["nate", "zzstoatzz", "paul", "dan", "sky"], 296 help="queries for comparison test", 297 ) 298 args = parser.parse_args() 299 300 print(f"typeahead: {args.url}") 301 302 test_response_shape(args.url) 303 test_known_handle(args.url) 304 test_prefix_match(args.url) 305 test_cors(args.url) 306 test_deprecated_param(args.url) 307 test_limit_bounds(args.url) 308 test_empty_query(args.url) 309 test_limit_over_max(args.url) 310 test_stats_page(args.url) 311 test_request_indexing(args.url) 312 test_moderation_filtering(args.url) 313 314 if args.compare: 315 test_comparison(args.url, args.queries) 316 else: 317 print(f"\n--- comparison ---") 318 print(f" [{SKIP}] skipped (use --compare)") 319 320 print() 321 if failures == 0: 322 print("all checks passed.") 323 else: 324 print(f"{failures} check(s) failed.") 325 return 1 if failures else 0 326 327 328if __name__ == "__main__": 329 sys.exit(main())