GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

exclude bench traffic from stats, update arch docs

add X-Client: bench header to bench.py, filter from pie chart query.
update architecture.md with two-phase search strategy docs.
add justfile, targeted-backfill script, gitignore cleanup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+929 -13
+1
.gitignore
··· 4 4 zig-out/ 5 5 zig-cache/ 6 6 .dev.vars 7 + scripts/bench-results.json
+76 -8
docs/architecture.md
··· 14 14 | two-tier writes: 15 15 | - bare DID: INSERT OR IGNORE (0 Turso writes for known actors) 16 16 | - profile/identity event: full UPSERT with COALESCE 17 + | dual-write to local SQLite (profile/identity events) 18 + | 19 + +---> worker (cloudflare) ---> Turso (source of truth) 20 + | ---> KV (cursor, mod_cursor, enrich_lock) 21 + | 22 + +---> local SQLite replica (/data/local.db on fly volume) 23 + synced from Turso on startup + every 5 min 24 + serves search queries directly (FTS5) 25 + ``` 26 + 27 + ### search read path 28 + 29 + ``` 30 + search request 31 + | 17 32 v 18 - worker (cloudflare) 33 + CF worker ---> cache API (60s edge cache, hit?) 34 + | | 35 + | miss | hit → return cached 36 + v 37 + fly ingester /search (local SQLite, 50-150ms cold) 19 38 | 20 - +---> Turso (actors table + FTS5 index) 21 - +---> KV (cursor, mod_cursor, enrich_lock) 22 - +---> cache API (60s edge cache for search) 39 + | timeout/error (3s) 40 + v 41 + Turso fallback (CF worker queries Turso directly) 23 42 ``` 24 43 44 + the ingester's `/search` uses a 3-tier ranking strategy (matching the 45 + worker's Turso fallback): 46 + 47 + 1. **exact handle** — `WHERE handle = ? COLLATE NOCASE`, instant via index 48 + 2. **handle prefix** (two-phase) — phase 1: `WHERE handle LIKE ? LIMIT N` 49 + with no `ORDER BY` (index-friendly, early termination), overfetch 5x 50 + into a stack buffer. phase 2: insertion-sort candidates by handle length 51 + in zig, then point-lookup by DID for full rows. restores the worker's 52 + shortest-handle-first ranking without the pathological full-scan sort 53 + that `ORDER BY length(handle)` causes on broad prefixes. 54 + 3. **FTS5 prefix** (two-phase, skipped if tiers 1+2 fill the limit or 55 + `term.len < 2`) — phase 1: `SELECT did FROM actors_fts WHERE MATCH ? 56 + ORDER BY rank LIMIT N` (pure FTS5, no JOIN — enables rank optimization 57 + with priority queue + early termination). phase 2: point-lookup each 58 + candidate DID on the actors table, filter `hidden = 0` and 59 + `handle != ''`. 60 + 61 + the two-phase approach avoids two pathological query shapes: 62 + - tier 2: `ORDER BY length(handle)` forces SQLite to scan all LIKE matches 63 + before sorting — catastrophic for short prefixes like `a%` (100k+ rows) 64 + - tier 3: `JOIN actors ON actors.did = actors_fts.did` prevents FTS5's 65 + native rank optimization — SQLite must scan all FTS matches, JOIN each, 66 + filter, sort, then take top N 67 + 68 + **caveat**: the ingester has a single `read_conn` shared across the HTTP 69 + thread pool. one slow query blocks all concurrent reads. the two-phase 70 + strategy bounds individual query time, but read concurrency isolation 71 + (connection pool or per-request connections) is a longer-term fix. 72 + 25 73 ### write paths 26 74 27 75 1. **ingester**: streams jetstream (profiles, posts, likes, follows), buffers ··· 70 118 reconciliation anchors. the trend chart stitches snapshots + deltas for 71 119 sub-hourly resolution between anchor points. 72 120 - **traffic sources**: cumulative hit counters per client identity (X-Client > 73 - Origin > Referer). loopback/localhost normalized to "unknown". 121 + Origin > Referer). loopback/localhost normalized to "unknown". internal 122 + traffic (bench script) identified via `X-Client: bench` and excluded from 123 + the stats page pie chart. 74 124 75 125 ### enrichment convergence 76 126 ··· 89 139 avatar, displayName, labels, createdAt, associated, and hidden in one 90 140 batch call. 91 141 92 - ### read path 142 + ### local replica sync 93 143 94 - search query -> cache API (hit?) -> FTS5 prefix match -> reconstruct avatar 95 - URLs from DID + CID -> return `{did, handle, displayName?, avatar?, associated?, labels, createdAt?}` 144 + the ingester maintains a local SQLite replica on a fly.io persistent volume, 145 + synced from Turso. search queries hit this replica directly — no round-trip 146 + to Turso for reads. 147 + 148 + - **bootstrap** (first run or wipe): stage-and-swap — loads all actors into 149 + staging tables with no indexes, builds indexes in bulk (`CREATE INDEX` on 150 + unindexed data), populates FTS5, then atomically swaps staging tables into 151 + place via `ALTER TABLE RENAME`. temp files pinned to `/data/tmp` on the 152 + volume. read connection closed during bootstrap to avoid WAL interference. 153 + - **re-sync** (restart with existing data): keeps serving while re-syncing 154 + from Turso. updates the live indexed table directly — index maintenance 155 + cost is negligible for incremental changes. 156 + - **incremental** (every 5 min): keyset pagination on `(updated_at, did)`, 157 + tombstone + unsearchable cleanup. only advances watermark when all 158 + queries succeed. 159 + - **dual-write**: profile and identity events from jetstream are written 160 + directly to the local replica for immediate search availability (when 161 + `ready=true`). 96 162 97 163 ## storage 98 164 ··· 148 214 149 215 ## scripts 150 216 217 + - `scripts/bench.py` — latency, coverage, and stress benchmarks against prod 218 + (sends `X-Client: bench` to avoid polluting traffic stats) 151 219 - `scripts/smoke.py` — end-to-end smoke tests against a live deployment 152 220 - `scripts/backfill-moderation.py` — one-shot sweep to set hidden flags on 153 221 existing actors (run once after adding moderation support)
+42
justfile
··· 1 + set dotenv-filename := ".dev.vars" 2 + 3 + # deploy worker to cloudflare 4 + deploy: 5 + npx wrangler deploy 6 + 7 + # cleanup dead actors in batches with pauses between 8 + cleanup batch_size="500" batches="10" pause="30": 9 + #!/usr/bin/env bash 10 + set -euo pipefail 11 + total_deleted=0 12 + total_updated=0 13 + total_checked=0 14 + last_rowid=0 15 + for i in $(seq 1 {{batches}}); do 16 + echo "=== batch $i/{{batches}} (size={{batch_size}}, starting at rowid $last_rowid) ===" 17 + output=$(./scripts/cleanup-dead-actors.py --limit {{batch_size}} --start-rowid "$last_rowid" 2>&1) 18 + echo "$output" 19 + # extract last rowid from output 20 + new_rowid=$(echo "$output" | grep -o 'rowid=[0-9]*' | tail -1 | cut -d= -f2) 21 + if [ -n "$new_rowid" ]; then 22 + last_rowid=$new_rowid 23 + fi 24 + # extract stats from final line 25 + last=$(echo "$output" | tail -1) 26 + d=$(echo "$last" | sed -n 's/.*deleted=\([0-9,]*\).*/\1/p' | tr -d ,) 27 + u=$(echo "$last" | sed -n 's/.*updated=\([0-9,]*\).*/\1/p' | tr -d ,) 28 + c=$(echo "$last" | sed -n 's/.*checked=\([0-9,]*\).*/\1/p' | tr -d ,) 29 + total_deleted=$((total_deleted + ${d:-0})) 30 + total_updated=$((total_updated + ${u:-0})) 31 + total_checked=$((total_checked + ${c:-0})) 32 + if [ "$i" -lt "{{batches}}" ]; then 33 + echo "--- pausing {{pause}}s before next batch ---" 34 + sleep {{pause}} 35 + fi 36 + done 37 + echo "" 38 + echo "=== done: checked=$total_checked deleted=$total_deleted updated=$total_updated ===" 39 + 40 + # dry-run cleanup (no writes) 41 + cleanup-dry batch_size="500": 42 + ./scripts/cleanup-dead-actors.py --dry-run --limit {{batch_size}}
+599
scripts/bench.py
··· 1 + #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx"] 5 + # /// 6 + """ 7 + performance benchmark: typeahead vs bluesky searchActorsTypeahead. 8 + 9 + measures latency (cold + warm), coverage/overlap, field completeness, 10 + display-name search, and stress-tests our API under concurrent load. 11 + 12 + usage: 13 + ./scripts/bench.py # full benchmark against prod 14 + ./scripts/bench.py --url http://localhost:8787 # test local dev 15 + ./scripts/bench.py --quick # 10 queries, 1 run 16 + ./scripts/bench.py --no-stress # skip stress test 17 + ./scripts/bench.py --queries nate boorkie # specific queries only 18 + ./scripts/bench.py --runs 5 # more runs for confidence 19 + """ 20 + 21 + import argparse 22 + import asyncio 23 + import json 24 + import statistics 25 + import sys 26 + import time 27 + from dataclasses import dataclass, field, asdict 28 + from datetime import datetime, timezone 29 + from pathlib import Path 30 + from urllib.parse import quote 31 + 32 + import httpx 33 + 34 + OURS_DEFAULT = "https://typeahead.waow.tech" 35 + BSKY = "https://public.api.bsky.app" 36 + XRPC = "/xrpc/app.bsky.actor.searchActorsTypeahead" 37 + 38 + # colors 39 + BOLD = "\033[1m" 40 + GREEN = "\033[32m" 41 + RED = "\033[31m" 42 + YELLOW = "\033[33m" 43 + CYAN = "\033[36m" 44 + RESET = "\033[0m" 45 + 46 + FULL_CORPUS = [ 47 + # short prefixes 48 + "a", "na", "sky", "bl", "j", 49 + # common names 50 + "nate", "sarah", "alex", "dan", "paul", "sam", "chris", "jordan", "mike", "anna", 51 + # display-name-only terms (no matching handle expected) 52 + "boorkie", "kohari", 53 + # specific handles 54 + "zzstoatzz", "pfrazee", "jay.bsky.team", "alice", "bob", 55 + # unicode 56 + "André", "naïve", "café", 57 + # multi-word display names 58 + "nate kohari", "paul frazee", 59 + # edge cases 60 + "nate.io", "the", "test", "bot", "news", "art", "dev", "music", 61 + # longer/rarer 62 + "photographer", "designer", "engineer", "journalist", 63 + # more diversity 64 + "tokyo", "berlin", "podcast", "crypto", "gaming", 65 + ] 66 + 67 + QUICK_CORPUS = [ 68 + "nate", "zzstoatzz", "paul", "boorkie", "sky", 69 + "sarah", "André", "nate kohari", "a", "dev", 70 + ] 71 + 72 + DISPLAY_NAME_QUERIES = [ 73 + "boorkie", "kohari", "nate kohari", "paul frazee", 74 + ] 75 + 76 + FIELDS_TO_CHECK = ["displayName", "avatar", "createdAt", "associated"] 77 + 78 + 79 + @dataclass 80 + class LatencyStats: 81 + query: str 82 + ours_cold_ms: list[float] = field(default_factory=list) 83 + ours_warm_ms: list[float] = field(default_factory=list) 84 + bsky_ms: list[float] = field(default_factory=list) 85 + 86 + def _summarize(self, ms: list[float]) -> dict: 87 + if not ms: 88 + return {} 89 + s = sorted(ms) 90 + return { 91 + "min": round(min(s), 1), 92 + "max": round(max(s), 1), 93 + "mean": round(statistics.mean(s), 1), 94 + "p50": round(s[len(s) // 2], 1), 95 + "p95": round(s[int(len(s) * 0.95)], 1) if len(s) >= 2 else round(max(s), 1), 96 + } 97 + 98 + def summarize(self, side: str) -> dict: 99 + if side == "ours_cold": 100 + return self._summarize(self.ours_cold_ms) 101 + if side == "ours_warm": 102 + return self._summarize(self.ours_warm_ms) 103 + return self._summarize(self.bsky_ms) 104 + 105 + 106 + @dataclass 107 + class CoverageResult: 108 + query: str 109 + ours_actors: list[dict] 110 + bsky_actors: list[dict] 111 + ours_dids: list[str] 112 + bsky_dids: list[str] 113 + overlap: list[str] 114 + ours_extras: list[str] 115 + bsky_extras: list[str] 116 + rank_deltas: list[int] 117 + 118 + 119 + @dataclass 120 + class StressResult: 121 + concurrency: int 122 + total: int 123 + ok: int 124 + rate_limited: int 125 + errors: int 126 + latencies_ms: list[float] = field(default_factory=list) 127 + 128 + 129 + def progress(label: str, done: int, total: int): 130 + sys.stdout.write(f"\r {label}: {done}/{total}") 131 + sys.stdout.flush() 132 + 133 + 134 + def clear_line(): 135 + sys.stdout.write("\r" + " " * 70 + "\r") 136 + 137 + 138 + async def timed_fetch( 139 + client: httpx.AsyncClient, url: str, timeout: float = 30.0 140 + ) -> tuple[dict | None, float, int]: 141 + """fetch JSON, return (body, latency_ms, status_code).""" 142 + t0 = time.monotonic() 143 + try: 144 + r = await client.get(url, timeout=timeout) 145 + ms = (time.monotonic() - t0) * 1000 146 + if r.status_code == 200: 147 + return r.json(), ms, r.status_code 148 + return None, ms, r.status_code 149 + except Exception: 150 + ms = (time.monotonic() - t0) * 1000 151 + return None, ms, 0 152 + 153 + 154 + def search_url(base: str, q: str, limit: int = 10) -> str: 155 + return f"{base}{XRPC}?q={quote(q)}&limit={limit}" 156 + 157 + 158 + async def run_latency( 159 + client: httpx.AsyncClient, 160 + ours_url: str, 161 + corpus: list[str], 162 + runs: int, 163 + ) -> list[LatencyStats]: 164 + """sequential latency: cold (cache-busted) + warm (cached) + bsky baseline.""" 165 + results = [] 166 + # cold: runs * 2 reqs/query (ours+bsky), warm: 2 reqs/query (ours+bsky) 167 + total = len(corpus) * (runs * 2 + 2) 168 + done = 0 169 + 170 + for q in corpus: 171 + stats = LatencyStats(query=q) 172 + 173 + # cold runs: vary limit to bust CF cache 174 + for run_i in range(runs): 175 + limit = 8 + (run_i % 3) 176 + 177 + _, ms, status = await timed_fetch(client, search_url(ours_url, q, limit)) 178 + if status == 200: 179 + stats.ours_cold_ms.append(ms) 180 + done += 1 181 + progress("latency", done, total) 182 + await asyncio.sleep(1.05) 183 + 184 + _, ms, status = await timed_fetch(client, search_url(BSKY, q, limit)) 185 + if status == 200: 186 + stats.bsky_ms.append(ms) 187 + done += 1 188 + progress("latency", done, total) 189 + await asyncio.sleep(0.2) 190 + 191 + # warm run: repeat exact same request (limit=10, should hit CF cache) 192 + _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10)) 193 + done += 1 194 + progress("latency", done, total) 195 + await asyncio.sleep(1.05) 196 + # second hit — this one should be cached 197 + _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10)) 198 + if status == 200: 199 + stats.ours_warm_ms.append(ms) 200 + done += 1 201 + progress("latency", done, total) 202 + await asyncio.sleep(1.05) 203 + 204 + results.append(stats) 205 + 206 + clear_line() 207 + return results 208 + 209 + 210 + async def run_coverage_and_fields( 211 + client: httpx.AsyncClient, 212 + ours_url: str, 213 + corpus: list[str], 214 + ) -> tuple[list[CoverageResult], dict]: 215 + """compare result sets at limit=10; also compute field completeness.""" 216 + coverage_results = [] 217 + 218 + # aggregate field counts 219 + ours_field_counts = {f: 0 for f in FIELDS_TO_CHECK} 220 + bsky_field_counts = {f: 0 for f in FIELDS_TO_CHECK} 221 + ours_actor_total = 0 222 + bsky_actor_total = 0 223 + 224 + for i, q in enumerate(corpus): 225 + progress("coverage+fields", i + 1, len(corpus)) 226 + 227 + ours_data, _, _ = await timed_fetch(client, search_url(ours_url, q)) 228 + await asyncio.sleep(1.05) 229 + bsky_data, _, _ = await timed_fetch(client, search_url(BSKY, q)) 230 + await asyncio.sleep(0.2) 231 + 232 + ours_actors = (ours_data or {}).get("actors", []) 233 + bsky_actors = (bsky_data or {}).get("actors", []) 234 + 235 + ours_dids = [a["did"] for a in ours_actors] 236 + bsky_dids = [a["did"] for a in bsky_actors] 237 + 238 + ours_set = set(ours_dids) 239 + bsky_set = set(bsky_dids) 240 + overlap = list(ours_set & bsky_set) 241 + 242 + ours_pos = {d: i for i, d in enumerate(ours_dids)} 243 + bsky_pos = {d: i for i, d in enumerate(bsky_dids)} 244 + rank_deltas = [abs(ours_pos[d] - bsky_pos[d]) for d in overlap if d in ours_pos and d in bsky_pos] 245 + 246 + coverage_results.append(CoverageResult( 247 + query=q, 248 + ours_actors=ours_actors, 249 + bsky_actors=bsky_actors, 250 + ours_dids=ours_dids, 251 + bsky_dids=bsky_dids, 252 + overlap=overlap, 253 + ours_extras=list(ours_set - bsky_set), 254 + bsky_extras=list(bsky_set - ours_set), 255 + rank_deltas=rank_deltas, 256 + )) 257 + 258 + # field completeness 259 + ours_actor_total += len(ours_actors) 260 + bsky_actor_total += len(bsky_actors) 261 + for f in FIELDS_TO_CHECK: 262 + ours_field_counts[f] += sum(1 for a in ours_actors if a.get(f)) 263 + bsky_field_counts[f] += sum(1 for a in bsky_actors if a.get(f)) 264 + 265 + clear_line() 266 + 267 + field_summary = { 268 + "ours_total": ours_actor_total, 269 + "bsky_total": bsky_actor_total, 270 + "ours": ours_field_counts, 271 + "bsky": bsky_field_counts, 272 + } 273 + return coverage_results, field_summary 274 + 275 + 276 + async def run_display_name_check( 277 + client: httpx.AsyncClient, 278 + ours_url: str, 279 + queries: list[str], 280 + ) -> list[dict]: 281 + """verify display-name-only queries return results.""" 282 + results = [] 283 + for q in queries: 284 + ours_data, _, _ = await timed_fetch(client, search_url(ours_url, q)) 285 + await asyncio.sleep(1.05) 286 + bsky_data, _, _ = await timed_fetch(client, search_url(BSKY, q)) 287 + await asyncio.sleep(0.2) 288 + 289 + ours_actors = (ours_data or {}).get("actors", []) 290 + bsky_actors = (bsky_data or {}).get("actors", []) 291 + results.append({ 292 + "query": q, 293 + "ours_count": len(ours_actors), 294 + "bsky_count": len(bsky_actors), 295 + "found": len(ours_actors) > 0, 296 + "ours_sample": [a.get("handle", a.get("did", "?")) for a in ours_actors[:3]], 297 + "bsky_sample": [a.get("handle", a.get("did", "?")) for a in bsky_actors[:3]], 298 + }) 299 + 300 + return results 301 + 302 + 303 + async def run_stress( 304 + client: httpx.AsyncClient, 305 + ours_url: str, 306 + corpus: list[str], 307 + levels: list[int], 308 + ) -> list[StressResult]: 309 + """concurrent request stress test (our API only).""" 310 + results = [] 311 + 312 + for n in levels: 313 + sys.stdout.write(f"\r stress: concurrency={n}...") 314 + sys.stdout.flush() 315 + 316 + queries = (corpus * ((n // len(corpus)) + 1))[:n] 317 + tasks = [ 318 + timed_fetch(client, search_url(ours_url, q, limit=7 + (i % 4))) 319 + for i, q in enumerate(queries) 320 + ] 321 + 322 + responses = await asyncio.gather(*tasks) 323 + 324 + sr = StressResult(concurrency=n, total=n, ok=0, rate_limited=0, errors=0) 325 + for body, ms, status in responses: 326 + sr.latencies_ms.append(ms) 327 + if status == 200: 328 + sr.ok += 1 329 + elif status == 429: 330 + sr.rate_limited += 1 331 + else: 332 + sr.errors += 1 333 + 334 + results.append(sr) 335 + await asyncio.sleep(5) 336 + 337 + clear_line() 338 + return results 339 + 340 + 341 + # ── printing ──────────────────────────────────────────────────────── 342 + 343 + def pct(n: int, total: int) -> str: 344 + return f"{n * 100 / total:.0f}%" if total else "n/a" 345 + 346 + 347 + def fmt_ms(ms: float) -> str: 348 + if ms >= 1000: 349 + return f"{ms / 1000:.1f}s" 350 + return f"{ms:.0f}ms" 351 + 352 + 353 + def print_latency_table(stats_list: list[LatencyStats]): 354 + print(f"\n{BOLD}--- latency (cold, cache-busted) ---{RESET}") 355 + header = f" {'query':<20} {'ours':>10} {'bsky':>10} {'delta':>10} {'winner':>8}" 356 + print(header) 357 + print(f" {'─' * 20} {'─' * 10} {'─' * 10} {'─' * 10} {'─' * 8}") 358 + 359 + all_ours_cold = [] 360 + all_bsky = [] 361 + ours_wins = 0 362 + total_compared = 0 363 + 364 + for s in stats_list: 365 + oc = s.summarize("ours_cold") 366 + b = s.summarize("bsky") 367 + if not oc or not b: 368 + continue 369 + 370 + all_ours_cold.extend(s.ours_cold_ms) 371 + all_bsky.extend(s.bsky_ms) 372 + 373 + op50, bp50 = oc["p50"], b["p50"] 374 + delta = op50 - bp50 375 + total_compared += 1 376 + if delta < 0: 377 + ours_wins += 1 378 + w_str = f"{GREEN}ours{RESET}" 379 + elif delta > 0: 380 + w_str = f"{RED}bsky{RESET}" 381 + else: 382 + w_str = "tie" 383 + 384 + d_str = f"{'+' if delta > 0 else ''}{fmt_ms(abs(delta)) if delta >= 0 else '-' + fmt_ms(abs(delta))}" 385 + print(f" {s.query:<20} {fmt_ms(op50):>10} {fmt_ms(bp50):>10} {d_str:>10} {w_str:>17}") 386 + 387 + if all_ours_cold and all_bsky: 388 + oc50 = sorted(all_ours_cold)[len(all_ours_cold) // 2] 389 + oc95 = sorted(all_ours_cold)[int(len(all_ours_cold) * 0.95)] 390 + b50 = sorted(all_bsky)[len(all_bsky) // 2] 391 + b95 = sorted(all_bsky)[int(len(all_bsky) * 0.95)] 392 + print() 393 + print(f" {BOLD}cold:{RESET} ours p50={fmt_ms(oc50)} p95={fmt_ms(oc95)} | bsky p50={fmt_ms(b50)} p95={fmt_ms(b95)}") 394 + print(f" ours faster on {ours_wins}/{total_compared} queries (cold)") 395 + 396 + # warm summary 397 + all_warm = [] 398 + for s in stats_list: 399 + all_warm.extend(s.ours_warm_ms) 400 + if all_warm: 401 + w50 = sorted(all_warm)[len(all_warm) // 2] 402 + w95 = sorted(all_warm)[int(len(all_warm) * 0.95)] if len(all_warm) >= 2 else max(all_warm) 403 + print(f" {BOLD}warm:{RESET} ours p50={fmt_ms(w50)} p95={fmt_ms(w95)} (CF cache hit)") 404 + 405 + 406 + def print_coverage_table(results: list[CoverageResult]): 407 + print(f"\n{BOLD}--- coverage ---{RESET}") 408 + print(f" {'query':<20} {'overlap':>10} {'ours':>6} {'bsky':>6} {'pct':>6} {'rank Δ':>8}") 409 + print(f" {'─' * 20} {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8}") 410 + 411 + total_overlap = 0 412 + total_bsky = 0 413 + complete_misses = 0 414 + we_have_more = 0 415 + 416 + for r in results: 417 + n_overlap = len(r.overlap) 418 + n_bsky = len(r.bsky_dids) 419 + n_ours = len(r.ours_dids) 420 + total_overlap += n_overlap 421 + total_bsky += n_bsky 422 + 423 + p = f"{n_overlap * 100 // n_bsky}%" if n_bsky else "n/a" 424 + avg_delta = f"{statistics.mean(r.rank_deltas):.1f}" if r.rank_deltas else "—" 425 + 426 + if n_ours == 0 and n_bsky > 0: 427 + complete_misses += 1 428 + if n_ours > n_bsky: 429 + we_have_more += 1 430 + 431 + print(f" {r.query:<20} {n_overlap:>3}/{n_bsky:<6} {n_ours:>6} {n_bsky:>6} {p:>6} {avg_delta:>8}") 432 + 433 + print() 434 + mean_pct = total_overlap * 100 / total_bsky if total_bsky else 0 435 + print(f" {BOLD}mean overlap:{RESET} {mean_pct:.0f}%") 436 + print(f" complete misses (ours=0, bsky>0): {complete_misses}/{len(results)}") 437 + print(f" queries where we have more results: {we_have_more}/{len(results)}") 438 + 439 + 440 + def print_field_table(field_summary: dict): 441 + print(f"\n{BOLD}--- field completeness ---{RESET}") 442 + ours_total = field_summary["ours_total"] 443 + bsky_total = field_summary["bsky_total"] 444 + 445 + print(f" {'field':<16} {'ours':>8} {'bsky':>8}") 446 + print(f" {'─' * 16} {'─' * 8} {'─' * 8}") 447 + for f in FIELDS_TO_CHECK: 448 + o = pct(field_summary["ours"][f], ours_total) 449 + b = pct(field_summary["bsky"][f], bsky_total) 450 + print(f" {f:<16} {o:>8} {b:>8}") 451 + print(f" {'─' * 16} {'─' * 8} {'─' * 8}") 452 + print(f" {'total actors':<16} {ours_total:>8} {bsky_total:>8}") 453 + 454 + 455 + def print_display_name_table(results: list[dict]): 456 + print(f"\n{BOLD}--- display name search ---{RESET}") 457 + print(f" {'query':<20} {'found?':>8} {'ours':>6} {'bsky':>6} samples") 458 + print(f" {'─' * 20} {'─' * 8} {'─' * 6} {'─' * 6} {'─' * 30}") 459 + for r in results: 460 + found = f"{GREEN}yes{RESET}" if r["found"] else f"{RED}no{RESET}" 461 + samples = ", ".join(r["ours_sample"][:3]) if r["ours_sample"] else "—" 462 + print(f" {r['query']:<20} {found:>17} {r['ours_count']:>6} {r['bsky_count']:>6} {samples}") 463 + 464 + 465 + def print_stress_table(results: list[StressResult]): 466 + print(f"\n{BOLD}--- stress test (ours only) ---{RESET}") 467 + print(f" {'concurrency':>12} {'ok':>6} {'429s':>6} {'5xx':>6} {'p50':>8} {'p95':>8}") 468 + print(f" {'─' * 12} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8} {'─' * 8}") 469 + for r in results: 470 + lats = sorted(r.latencies_ms) 471 + p50 = fmt_ms(lats[len(lats) // 2]) if lats else "—" 472 + p95 = fmt_ms(lats[int(len(lats) * 0.95)]) if len(lats) >= 2 else p50 473 + print(f" {r.concurrency:>12} {r.ok:>6} {r.rate_limited:>6} {r.errors:>6} {p50:>8} {p95:>8}") 474 + 475 + 476 + # ── JSON report ───────────────────────────────────────────────────── 477 + 478 + def build_report( 479 + ours_url: str, 480 + corpus: list[str], 481 + runs: int, 482 + latency: list[LatencyStats], 483 + coverage: list[CoverageResult], 484 + field_summary: dict, 485 + display_name: list[dict], 486 + stress: list[StressResult], 487 + ) -> dict: 488 + def latency_entry(s: LatencyStats) -> dict: 489 + return { 490 + "query": s.query, 491 + "ours_cold": s.summarize("ours_cold"), 492 + "ours_warm": s.summarize("ours_warm"), 493 + "bsky": s.summarize("bsky"), 494 + } 495 + 496 + def coverage_entry(r: CoverageResult) -> dict: 497 + return { 498 + "query": r.query, 499 + "ours_count": len(r.ours_dids), 500 + "bsky_count": len(r.bsky_dids), 501 + "overlap_count": len(r.overlap), 502 + "overlap_pct": round(len(r.overlap) * 100 / len(r.bsky_dids), 1) if r.bsky_dids else None, 503 + "ours_extras": len(r.ours_extras), 504 + "bsky_extras": len(r.bsky_extras), 505 + "avg_rank_delta": round(statistics.mean(r.rank_deltas), 2) if r.rank_deltas else None, 506 + } 507 + 508 + def stress_entry(r: StressResult) -> dict: 509 + lats = sorted(r.latencies_ms) 510 + return { 511 + "concurrency": r.concurrency, 512 + "ok": r.ok, 513 + "rate_limited": r.rate_limited, 514 + "errors": r.errors, 515 + "p50_ms": round(lats[len(lats) // 2], 1) if lats else None, 516 + "p95_ms": round(lats[int(len(lats) * 0.95)], 1) if len(lats) >= 2 else None, 517 + } 518 + 519 + return { 520 + "meta": { 521 + "target": ours_url, 522 + "baseline": BSKY, 523 + "corpus_size": len(corpus), 524 + "runs": runs, 525 + "date": datetime.now(timezone.utc).isoformat(), 526 + }, 527 + "latency": [latency_entry(s) for s in latency], 528 + "coverage": [coverage_entry(r) for r in coverage], 529 + "field_completeness": field_summary, 530 + "display_name_search": display_name, 531 + "stress": [stress_entry(r) for r in stress], 532 + } 533 + 534 + 535 + # ── main ──────────────────────────────────────────────────────────── 536 + 537 + async def main(): 538 + parser = argparse.ArgumentParser(description="typeahead performance benchmark") 539 + parser.add_argument("--url", default=OURS_DEFAULT, help=f"our API URL (default: {OURS_DEFAULT})") 540 + parser.add_argument("--quick", action="store_true", help="10 queries, 1 run") 541 + parser.add_argument("--no-stress", action="store_true", help="skip stress test") 542 + parser.add_argument("--queries", nargs="+", help="specific queries only") 543 + parser.add_argument("--runs", type=int, default=3, help="runs per query for latency (default: 3)") 544 + parser.add_argument("--output", default="scripts/bench-results.json", help="JSON report path") 545 + args = parser.parse_args() 546 + 547 + if args.quick: 548 + args.runs = 1 549 + 550 + corpus = args.queries or (QUICK_CORPUS if args.quick else FULL_CORPUS) 551 + dn_queries = [q for q in DISPLAY_NAME_QUERIES if q in corpus] or DISPLAY_NAME_QUERIES[:2] 552 + 553 + print(f"\n{BOLD}=== typeahead benchmark ==={RESET}") 554 + print(f" target: {args.url}") 555 + print(f" baseline: {BSKY}") 556 + print(f" corpus: {len(corpus)} queries, {args.runs} run(s) each") 557 + print(f" date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}") 558 + 559 + async with httpx.AsyncClient( 560 + headers={"User-Agent": "typeahead-bench/1.0", "X-Client": "bench"}, 561 + follow_redirects=True, 562 + ) as client: 563 + # 1. latency (cold + warm) 564 + print(f"\n{CYAN}[1/4] latency comparison (cold + warm){RESET}") 565 + latency = await run_latency(client, args.url, corpus, args.runs) 566 + print_latency_table(latency) 567 + 568 + # 2. coverage + field completeness (single pass) 569 + print(f"\n{CYAN}[2/4] coverage + field completeness{RESET}") 570 + coverage, field_summary = await run_coverage_and_fields(client, args.url, corpus) 571 + print_coverage_table(coverage) 572 + print_field_table(field_summary) 573 + 574 + # 3. display name search 575 + print(f"\n{CYAN}[3/4] display name search{RESET}") 576 + display_name = await run_display_name_check(client, args.url, dn_queries) 577 + print_display_name_table(display_name) 578 + 579 + # 4. stress test 580 + stress = [] 581 + if args.no_stress: 582 + print(f"\n{CYAN}[4/4] stress test{RESET}") 583 + print(f" {YELLOW}skipped{RESET}") 584 + else: 585 + print(f"\n{CYAN}[4/4] stress test (ours only){RESET}") 586 + stress = await run_stress(client, args.url, corpus, [5, 10, 20]) 587 + print_stress_table(stress) 588 + 589 + # write report 590 + report = build_report(args.url, corpus, args.runs, latency, coverage, field_summary, display_name, stress) 591 + out_path = Path(args.output) 592 + out_path.parent.mkdir(parents=True, exist_ok=True) 593 + out_path.write_text(json.dumps(report, indent=2) + "\n") 594 + print(f"\n full report: {out_path}") 595 + print() 596 + 597 + 598 + if __name__ == "__main__": 599 + asyncio.run(main())
+176
scripts/targeted-backfill.py
··· 1 + #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = [] 5 + # /// 6 + """ 7 + targeted backfill: fetch follows/followers from specific accounts, 8 + enrich via getProfiles, upsert into turso. 9 + 10 + usage: 11 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/targeted-backfill.py 12 + """ 13 + 14 + import json 15 + import os 16 + import re 17 + import sys 18 + import time 19 + import urllib.request 20 + import urllib.error 21 + 22 + BSKY = "https://public.api.bsky.app/xrpc" 23 + WRITE_BATCH = 50 24 + WRITE_PAUSE = 0.2 25 + 26 + def env(k): 27 + v = os.environ.get(k, "") 28 + if not v: print(f"error: {k} not set", file=sys.stderr); sys.exit(1) 29 + return v 30 + 31 + def turso_batch_write(stmts, url, token): 32 + reqs = [{"type": "execute", "stmt": s} for s in stmts] 33 + reqs.append({"type": "close"}) 34 + body = json.dumps({"requests": reqs}).encode() 35 + req = urllib.request.Request(f"{url}/v3/pipeline", data=body, headers={ 36 + "Authorization": f"Bearer {token}", "Content-Type": "application/json", 37 + }) 38 + try: 39 + with urllib.request.urlopen(req, timeout=60) as resp: 40 + json.loads(resp.read()) 41 + return True 42 + except Exception as e: 43 + print(f" turso write failed: {e}", file=sys.stderr) 44 + return False 45 + 46 + def fetch_all(endpoint, actor, key): 47 + """page through a list endpoint, return all items""" 48 + items = [] 49 + cursor = None 50 + while True: 51 + url = f"{BSKY}/{endpoint}?actor={actor}&limit=100" 52 + if cursor: url += f"&cursor={cursor}" 53 + req = urllib.request.Request(url) 54 + with urllib.request.urlopen(req, timeout=15) as resp: 55 + data = json.loads(resp.read()) 56 + batch = data.get(key, []) 57 + items.extend(batch) 58 + cursor = data.get("cursor") 59 + print(f" fetched {len(items)} {key}...", end="\r") 60 + if not cursor or not batch: 61 + break 62 + time.sleep(0.1) 63 + print() 64 + return items 65 + 66 + def fetch_profiles(dids): 67 + params = "&".join(f"actors={urllib.request.quote(d)}" for d in dids) 68 + req = urllib.request.Request( 69 + f"{BSKY}/app.bsky.actor.getProfiles?{params}", 70 + headers={"User-Agent": "typeahead-enrich/1.0"}, 71 + ) 72 + try: 73 + with urllib.request.urlopen(req, timeout=15) as resp: 74 + return json.loads(resp.read()).get("profiles", []) 75 + except urllib.error.HTTPError as e: 76 + if e.code == 429: return "rate_limited" 77 + return [] 78 + except Exception: 79 + return [] 80 + 81 + def extract_avatar_cid(url): 82 + if not url: return "" 83 + m = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url) 84 + return m.group(1) if m else "" 85 + 86 + def clean_associated(assoc): 87 + if not assoc or not isinstance(assoc, dict): return "{}" 88 + clean = {k: v for k, v in assoc.items() if v not in (0, False, None)} 89 + return json.dumps(clean) if clean else "{}" 90 + 91 + def profile_to_stmt(p): 92 + hide_vals = {"!hide", "!takedown", "!suspend", "spam"} 93 + mod_did = "did:plc:ar7c4by46qjdydhdevvrndac" 94 + hidden = 0 95 + for lbl in (p.get("labels") or []): 96 + if lbl.get("val", "") in hide_vals or (lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == mod_did): 97 + hidden = 1; break 98 + return { 99 + "sql": """INSERT INTO actors (did, handle, display_name, avatar_url, labels, hidden, created_at, associated, updated_at) 100 + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch()) 101 + ON CONFLICT(did) DO UPDATE SET 102 + handle = COALESCE(NULLIF(?2, ''), actors.handle), 103 + display_name = COALESCE(NULLIF(?3, ''), actors.display_name), 104 + avatar_url = COALESCE(NULLIF(?4, ''), actors.avatar_url), 105 + labels = ?5, hidden = ?6, 106 + created_at = COALESCE(NULLIF(?7, ''), actors.created_at), 107 + associated = COALESCE(NULLIF(?8, '{}'), actors.associated), 108 + profile_checked_at = unixepoch(), 109 + updated_at = unixepoch()""", 110 + "args": [ 111 + {"type": "text", "value": p["did"]}, 112 + {"type": "text", "value": p.get("handle", "")}, 113 + {"type": "text", "value": p.get("displayName", "")}, 114 + {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))}, 115 + {"type": "text", "value": json.dumps(p.get("labels", []))}, 116 + {"type": "integer", "value": str(hidden)}, 117 + {"type": "text", "value": p.get("createdAt", "")}, 118 + {"type": "text", "value": clean_associated(p.get("associated"))}, 119 + ], 120 + } 121 + 122 + def main(): 123 + turso_url = env("TURSO_URL").replace("libsql://", "https://") 124 + turso_token = env("TURSO_AUTH_TOKEN") 125 + 126 + # collect DIDs 127 + dids = set() 128 + 129 + print("atprotocol.dev → following:") 130 + follows = fetch_all("app.bsky.graph.getFollows", "atprotocol.dev", "follows") 131 + for f in follows: 132 + dids.add(f["did"]) 133 + 134 + print("atmosphereconf.org → followers:") 135 + followers = fetch_all("app.bsky.graph.getFollowers", "atmosphereconf.org", "followers") 136 + for f in followers: 137 + dids.add(f["did"]) 138 + 139 + dids = list(dids) 140 + print(f"\n{len(dids)} unique DIDs to enrich") 141 + 142 + # enrich via getProfiles in batches of 25 143 + enriched = 0 144 + t0 = time.time() 145 + all_stmts = [] 146 + 147 + for i in range(0, len(dids), 25): 148 + batch = dids[i:i+25] 149 + result = fetch_profiles(batch) 150 + if result == "rate_limited": 151 + print(" rate limited — pausing 30s...") 152 + time.sleep(30) 153 + result = fetch_profiles(batch) 154 + if result == "rate_limited": 155 + print(" still limited, skipping") 156 + continue 157 + for p in result: 158 + all_stmts.append(profile_to_stmt(p)) 159 + enriched += 1 160 + print(f" enriched {enriched}/{len(dids)}...", end="\r") 161 + time.sleep(0.05) 162 + 163 + print(f"\n\nenriched {enriched} profiles, writing to turso...") 164 + 165 + # write in small batches 166 + for i in range(0, len(all_stmts), WRITE_BATCH): 167 + chunk = all_stmts[i:i+WRITE_BATCH] 168 + turso_batch_write(chunk, turso_url, turso_token) 169 + print(f" wrote {min(i+WRITE_BATCH, len(all_stmts))}/{len(all_stmts)}", end="\r") 170 + time.sleep(WRITE_PAUSE) 171 + 172 + elapsed = time.time() - t0 173 + print(f"\n\ndone in {elapsed:.0f}s. enriched {enriched} actors.") 174 + 175 + if __name__ == "__main__": 176 + main()
+30 -1
src/handlers/search.ts
··· 1 1 import type { TursoDB } from "../db"; 2 2 import type { ActorRow, Env } from "../types"; 3 + import { CORS_HEADERS } from "../types"; 3 4 import { json, sanitize, avatarUrl } from "../utils"; 4 5 import { recordMetric, recordCacheHit, recordTrafficSource } from "../metrics"; 5 6 import { throttledBackfill } from "../backfill"; ··· 42 43 return cached; 43 44 } 44 45 45 - // 3-tier ranking: exact handle → handle prefix → FTS prefix 46 + // try fly backend (local SQLite replica) if configured 47 + if (env.SEARCH_BACKEND_URL) { 48 + try { 49 + const backendUrl = `${env.SEARCH_BACKEND_URL}/search?q=${encodeURIComponent(term)}&limit=${limit}`; 50 + const backendRes = await fetch(backendUrl, { signal: AbortSignal.timeout(3000) }); 51 + if (backendRes.ok) { 52 + const body = await backendRes.text(); 53 + 54 + ctx.waitUntil(Promise.all([ 55 + recordMetric(db, Date.now() - t0), 56 + recordTrafficSource(db, request), 57 + ])); 58 + 59 + const response = new Response(body, { 60 + headers: { "Content-Type": "application/json", ...CORS_HEADERS }, 61 + }); 62 + 63 + const cacheable = new Response(response.clone().body, response); 64 + cacheable.headers.set("Cache-Control", "public, max-age=60"); 65 + ctx.waitUntil(cache.put(cacheKey, cacheable)); 66 + 67 + return response; 68 + } 69 + } catch { 70 + // fall through to turso 71 + } 72 + } 73 + 74 + // fallback: 3-tier ranking via turso 46 75 const ftsQuery = `"${term}"*`; 47 76 const [exactRes, prefixRes, ftsRes] = await db.batch([ 48 77 db.prepare(
+2 -2
src/handlers/stats.ts
··· 25 25 materialized_at: number; 26 26 } | null; 27 27 28 - // traffic sources: live from turso (7 rows, real-time) 28 + // traffic sources: live from turso (exclude internal/testing traffic) 29 29 const trafficRes = await db.prepare( 30 - "SELECT domain, hits FROM traffic_sources ORDER BY hits DESC LIMIT 10" 30 + "SELECT domain, hits FROM traffic_sources WHERE domain != 'bench' ORDER BY hits DESC LIMIT 10" 31 31 ).all<{ domain: string; hits: number }>(); 32 32 33 33 const tQuery = performance.now();
+2 -2
src/pages/stats.ts
··· 188 188 <div class="value">${d.totalSearches.toLocaleString()}</div> 189 189 </div> 190 190 <div class="metric"> 191 - <div class="label" data-tip="last 24h — cached (edge) vs uncached (turso)">avg latency</div> 192 - <div class="value" style="font-size:1.3rem">${d.avgCacheLatency.toFixed(0)} ms <span style="opacity:.4;font-size:.85rem">cached</span> · ${d.avgLatency.toFixed(0)} ms <span style="opacity:.4;font-size:.85rem">cold</span></div> 191 + <div class="label" data-tip="last 24h — cached (edge) vs uncached (fly replica)">avg latency</div> 192 + <div class="value" style="font-size:1.3rem">${d.avgCacheLatency > 0 ? `${d.avgCacheLatency.toFixed(1)} ms <span style="opacity:.4;font-size:.85rem">cached</span> · ` : ''}${d.avgLatency.toFixed(0)} ms${d.avgCacheLatency > 0 ? ' <span style="opacity:.4;font-size:.85rem">cold</span>' : ''}</div> 193 193 </div> 194 194 </div> 195 195
+1
src/types.ts
··· 5 5 RATE_LIMITER_STRICT: RateLimit; 6 6 TURSO_URL: string; 7 7 TURSO_AUTH_TOKEN: string; 8 + SEARCH_BACKEND_URL?: string; 8 9 } 9 10 10 11 export interface ActorRow {