GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech

clean up: remove one-time migration scripts, update gitignore

delete 9 completed migration/backfill scripts (SQL schema migrations,
migrate-to-turso, backfill-created-at, backfill-moderation, backfill-profiles).
add .env, zig-pkg/ to gitignore.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+2 -1070
+2
.gitignore
···
 3  3   .zig-cache/
 4  4   zig-out/
 5  5   zig-cache/
    6 + zig-pkg/
 6  7   .dev.vars
    8 + .env
 7  9   scripts/bench-results.json
-6
scripts/add-actor-deltas.sql
···
 1 - CREATE TABLE IF NOT EXISTS actor_deltas (
 2 -   bucket INTEGER PRIMARY KEY,              -- 5-min bucket (Date.now() / 300_000)
 3 -   actors_delta INTEGER NOT NULL DEFAULT 0,
 4 -   handles_delta INTEGER NOT NULL DEFAULT 0,
 5 -   avatars_delta INTEGER NOT NULL DEFAULT 0
 6 - );
-5
scripts/add-created-associated.sql
···
 1 - -- one-shot migration: add created_at and associated columns to actors table
 2 - -- run against Turso before deploying the updated worker
 3 -
 4 - ALTER TABLE actors ADD COLUMN created_at TEXT DEFAULT '';
 5 - ALTER TABLE actors ADD COLUMN associated TEXT DEFAULT '{}';
-6
scripts/add-enrichment-columns.sql
···
 1 - -- one-shot migration: add enrichment pipeline columns to actors table
 2 - -- run once against Turso before deploying the updated worker
 3 -
 4 - ALTER TABLE actors ADD COLUMN pds TEXT DEFAULT '';
 5 - ALTER TABLE actors ADD COLUMN identity_checked_at INTEGER DEFAULT 0;
 6 - ALTER TABLE actors ADD COLUMN profile_checked_at INTEGER DEFAULT 0;
-1
scripts/add-labels-column.sql
···
 1 - ALTER TABLE actors ADD COLUMN labels TEXT NOT NULL DEFAULT '[]';
-237
scripts/backfill-created-at.py
··· 1 - #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 - # /// script 3 - # requires-python = ">=3.12" 4 - # dependencies = [] 5 - # /// 6 - """ 7 - bulk backfill created_at by streaming the PLC directory export. 8 - 9 - loads all DIDs missing created_at from Turso into a set, then streams 10 - the PLC export (JSONL, chronological) matching creation operations 11 - against our set. batches updates to Turso as matches accumulate. 12 - 13 - usage: 14 - TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-created-at.py 15 - TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-created-at.py --dry-run 16 - """ 17 - 18 - import argparse 19 - import json 20 - import os 21 - import sys 22 - import time 23 - import urllib.request 24 - 25 - PLC_EXPORT_URL = "https://plc.directory/export" 26 - PLC_PAGE_SIZE = 1000 27 - TURSO_BATCH_SIZE = 200 28 - FLUSH_THRESHOLD = 500 # flush to Turso every N matches 29 - 30 - DIM = "\033[2m" 31 - RESET = "\033[0m" 32 - 33 - 34 - # --- turso helpers --- 35 - 36 - def get_turso_url() -> str: 37 - url = os.environ.get("TURSO_URL", "") 38 - if not url: 39 - print("error: TURSO_URL not set", file=sys.stderr) 40 - sys.exit(1) 41 - return url.replace("libsql://", "https://") 42 - 43 - 44 - def get_turso_token() -> str: 45 - token = os.environ.get("TURSO_AUTH_TOKEN", "") 46 - if not token: 47 - print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr) 48 - sys.exit(1) 49 - return token 50 - 51 - 52 - def turso_query_all(sql: str, turso_url: str, turso_token: str) -> list[dict]: 53 - """paginated query to fetch all rows.""" 54 - all_rows = [] 55 - offset = 0 56 - page = 10000 57 - while True: 58 - body = json.dumps({ 59 - "requests": [ 60 - {"type": "execute", "stmt": { 61 - "sql": f"{sql} LIMIT {page} OFFSET {offset}", 62 - "args": [], 63 - }}, 64 - {"type": "close"}, 65 - ] 66 - }).encode() 67 - req = urllib.request.Request( 68 - f"{turso_url}/v3/pipeline", 69 - data=body, 70 - headers={ 71 - "Authorization": f"Bearer {turso_token}", 72 - "Content-Type": "application/json", 73 - }, 74 - ) 75 - with urllib.request.urlopen(req, timeout=60) as resp: 76 - result = json.loads(resp.read()) 77 - res = result["results"][0] 78 - if res.get("type") == "error": 79 - print(f" turso error: {res['error']['message']}", file=sys.stderr) 80 - break 81 - cols = [c["name"] for c in res["response"]["result"]["cols"]] 82 - rows = [] 83 - for row in res["response"]["result"]["rows"]: 84 - rows.append({c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}) 85 - all_rows.extend(rows) 86 - if len(rows) < page: 87 - break 88 - offset += page 89 - sys.stdout.write(f"\r loading DIDs... 
{len(all_rows):,}") 90 - sys.stdout.flush() 91 - return all_rows 92 - 93 - 94 - def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool: 95 - requests = [{"type": "execute", "stmt": s} for s in stmts] 96 - requests.append({"type": "close"}) 97 - body = json.dumps({"requests": requests}).encode() 98 - req = urllib.request.Request( 99 - f"{turso_url}/v3/pipeline", 100 - data=body, 101 - headers={ 102 - "Authorization": f"Bearer {turso_token}", 103 - "Content-Type": "application/json", 104 - }, 105 - ) 106 - try: 107 - with urllib.request.urlopen(req, timeout=60) as resp: 108 - result = json.loads(resp.read()) 109 - for r in result.get("results", []): 110 - if r.get("type") == "error": 111 - print(f" turso error: {r.get('error', {}).get('message', 'unknown')}") 112 - return False 113 - return True 114 - except urllib.error.HTTPError as e: 115 - err_body = e.read().decode()[:300] 116 - print(f" turso HTTP {e.code}: {err_body}", file=sys.stderr) 117 - return False 118 - except Exception as e: 119 - print(f" turso request failed: {e}", file=sys.stderr) 120 - return False 121 - 122 - 123 - # --- PLC export streaming --- 124 - 125 - def stream_plc_export(after: str = "") -> list[dict]: 126 - """fetch one page from PLC export. returns list of ops.""" 127 - url = f"{PLC_EXPORT_URL}?count={PLC_PAGE_SIZE}" 128 - if after: 129 - url += f"&after={after}" 130 - req = urllib.request.Request(url, headers={"User-Agent": "typeahead-backfill/1.0"}) 131 - try: 132 - with urllib.request.urlopen(req, timeout=30) as resp: 133 - lines = resp.read().decode().strip().split("\n") 134 - return [json.loads(line) for line in lines if line] 135 - except urllib.error.HTTPError as e: 136 - if e.code == 429: 137 - print("\n PLC rate limited — pausing 10s") 138 - time.sleep(10) 139 - return stream_plc_export(after) 140 - raise 141 - except Exception as e: 142 - print(f"\n PLC fetch error: {e} — retrying in 5s") 143 - time.sleep(5) 144 - return stream_plc_export(after) 145 - 146 - 147 - # --- main --- 148 - 149 - def main(): 150 - parser = argparse.ArgumentParser(description="bulk backfill created_at from PLC directory export") 151 - parser.add_argument("--dry-run", action="store_true", help="stream + match but don't write") 152 - args = parser.parse_args() 153 - 154 - turso_url = get_turso_url() 155 - turso_token = get_turso_token() 156 - 157 - # step 1: load all DIDs missing created_at 158 - print("loading DIDs missing created_at from Turso...") 159 - rows = turso_query_all( 160 - "SELECT did FROM actors WHERE length(created_at) = 0 ORDER BY rowid ASC", 161 - turso_url, turso_token, 162 - ) 163 - wanted = {r["did"] for r in rows} 164 - print(f"\n {len(wanted):,} DIDs to backfill") 165 - 166 - if not wanted: 167 - print("nothing to do.") 168 - return 169 - 170 - # step 2: stream PLC export, match creations 171 - matched = 0 172 - scanned = 0 173 - pending: list[dict] = [] # buffered Turso statements 174 - after = "" 175 - t0 = time.time() 176 - 177 - print(f"streaming PLC export... 
{'(DRY RUN)' if args.dry_run else ''}") 178 - 179 - while wanted: 180 - ops = stream_plc_export(after) 181 - if not ops: 182 - break 183 - 184 - for op in ops: 185 - scanned += 1 186 - did = op.get("did", "") 187 - created_at = op.get("createdAt", "") 188 - 189 - # only care about creation ops for DIDs we're looking for 190 - if did not in wanted: 191 - continue 192 - 193 - # first op for a DID is its creation (prev=null) 194 - prev = op.get("operation", {}).get("prev") 195 - if prev is not None: 196 - continue 197 - 198 - wanted.discard(did) 199 - matched += 1 200 - 201 - pending.append({ 202 - "sql": "UPDATE actors SET created_at = ?1 WHERE did = ?2", 203 - "args": [ 204 - {"type": "text", "value": created_at}, 205 - {"type": "text", "value": did}, 206 - ], 207 - }) 208 - 209 - # flush when buffer is full 210 - if len(pending) >= FLUSH_THRESHOLD and not args.dry_run: 211 - for i in range(0, len(pending), TURSO_BATCH_SIZE): 212 - turso_batch(pending[i : i + TURSO_BATCH_SIZE], turso_url, turso_token) 213 - pending.clear() 214 - 215 - after = ops[-1]["createdAt"] 216 - elapsed = time.time() - t0 217 - rate = scanned / elapsed if elapsed > 0 else 0 218 - remaining = len(wanted) 219 - tag = "dry" if args.dry_run else "live" 220 - sys.stdout.write( 221 - f"\r [{tag}] scanned={scanned:,} matched={matched:,} " 222 - f"remaining={remaining:,} " 223 - f"{DIM}{rate:.0f} ops/s cursor={after}{RESET} " 224 - ) 225 - sys.stdout.flush() 226 - 227 - # flush remaining 228 - if pending and not args.dry_run: 229 - for i in range(0, len(pending), TURSO_BATCH_SIZE): 230 - turso_batch(pending[i : i + TURSO_BATCH_SIZE], turso_url, turso_token) 231 - 232 - elapsed = time.time() - t0 233 - print(f"\n\ndone in {elapsed:.0f}s. matched={matched:,}, missed={len(wanted):,}, scanned={scanned:,} ops") 234 - 235 - 236 - if __name__ == "__main__": 237 - main()
-153
scripts/backfill-moderation.py
··· 1 - #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 - # /// script 3 - # requires-python = ">=3.12" 4 - # dependencies = [] 5 - # /// 6 - """ 7 - one-shot: sweep entire actors table and set hidden flags from bsky moderation labels. 8 - 9 - usage: 10 - ./scripts/backfill-moderation.py 11 - """ 12 - 13 - import json 14 - import subprocess 15 - import sys 16 - import time 17 - import urllib.request 18 - 19 - BSKY_MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac" 20 - MOD_HIDE_VALS = {"!hide", "!takedown", "spam"} 21 - ANY_SRC_HIDE_VALS = {"!no-unauthenticated"} 22 - GET_PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 23 - BATCH_SIZE = 25 # getProfiles limit 24 - PAGE_SIZE = 500 # actors per D1 query 25 - DELAY = 0.3 # seconds between API calls 26 - 27 - 28 - def should_hide(labels: list | None) -> bool: 29 - if not labels: 30 - return False 31 - now = time.time() * 1000 32 - for l in labels: 33 - if l.get("neg"): 34 - continue 35 - if l.get("exp") and _parse_ts(l["exp"]) <= now: 36 - continue 37 - if l.get("src") == BSKY_MOD_DID and l.get("val") in MOD_HIDE_VALS: 38 - return True 39 - if l.get("val") in ANY_SRC_HIDE_VALS: 40 - return True 41 - return False 42 - 43 - 44 - def _parse_ts(s: str) -> float: 45 - """rough ISO8601 parse — good enough for expiry comparison.""" 46 - from datetime import datetime, timezone 47 - try: 48 - return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() * 1000 49 - except Exception: 50 - return 0 51 - 52 - 53 - def d1_query(sql: str) -> list[dict]: 54 - result = subprocess.run( 55 - ["npx", "wrangler", "d1", "execute", "typeahead-db", "--remote", "--command", sql, "--json"], 56 - capture_output=True, text=True, cwd="." 57 - ) 58 - # --json still emits warnings to stdout; extract the JSON array 59 - stdout = result.stdout 60 - bracket = stdout.find("[") 61 - if bracket == -1: 62 - if result.returncode != 0: 63 - print(f" D1 error: {result.stderr[:200]}", file=sys.stderr) 64 - return [] 65 - try: 66 - data = json.loads(stdout[bracket:]) 67 - return data[0]["results"] if data else [] 68 - except (json.JSONDecodeError, IndexError, KeyError) as e: 69 - print(f" D1 parse error: {e}", file=sys.stderr) 70 - return [] 71 - 72 - 73 - def fetch_profiles(dids: list[str]) -> list[dict]: 74 - params = "&".join(f"actors={d}" for d in dids) 75 - url = f"{GET_PROFILES_URL}?{params}" 76 - req = urllib.request.Request(url, headers={"User-Agent": "typeahead-mod-backfill/1.0"}) 77 - try: 78 - with urllib.request.urlopen(req, timeout=15) as resp: 79 - data = json.loads(resp.read()) 80 - return data.get("profiles", []) 81 - except urllib.error.HTTPError as e: 82 - if e.code == 429: 83 - print(" rate limited — pausing 60s") 84 - time.sleep(60) 85 - return fetch_profiles(dids) # retry once 86 - print(f" HTTP {e.code}") 87 - return [] 88 - except Exception as e: 89 - print(f" error: {e}") 90 - return [] 91 - 92 - 93 - def main(): 94 - cursor = 0 95 - total_checked = 0 96 - total_hidden = 0 97 - total_unhidden = 0 98 - 99 - while True: 100 - rows = d1_query( 101 - f"SELECT rowid, did FROM actors WHERE rowid > {cursor} ORDER BY rowid ASC LIMIT {PAGE_SIZE}" 102 - ) 103 - if not rows: 104 - break 105 - 106 - page_hidden = 0 107 - page_unhidden = 0 108 - 109 - for i in range(0, len(rows), BATCH_SIZE): 110 - batch = rows[i : i + BATCH_SIZE] 111 - dids = [r["did"] for r in batch] 112 - 113 - if i > 0: 114 - time.sleep(DELAY) 115 - 116 - profiles = fetch_profiles(dids) 117 - total_checked += len(profiles) 118 - 119 - hide_dids = [] 120 - 
unhide_dids = [] 121 - for p in profiles: 122 - if should_hide(p.get("labels")): 123 - hide_dids.append(p["did"]) 124 - else: 125 - unhide_dids.append(p["did"]) 126 - 127 - if hide_dids: 128 - did_list = ", ".join(f"'{d}'" for d in hide_dids) 129 - changed = d1_query( 130 - f"UPDATE actors SET hidden = 1 WHERE did IN ({did_list}) AND hidden = 0" 131 - ) 132 - page_hidden += len(hide_dids) 133 - 134 - if unhide_dids: 135 - did_list = ", ".join(f"'{d}'" for d in unhide_dids) 136 - d1_query( 137 - f"UPDATE actors SET hidden = 0 WHERE did IN ({did_list}) AND hidden = 1" 138 - ) 139 - 140 - cursor = rows[-1]["rowid"] 141 - total_hidden += page_hidden 142 - total_unhidden += page_unhidden 143 - 144 - print( 145 - f" cursor={cursor} checked={total_checked} hidden={total_hidden} " 146 - f"(page: {len(rows)} actors, {page_hidden} newly hidden)" 147 - ) 148 - 149 - print(f"\ndone. checked={total_checked}, hidden={total_hidden}, unhidden={total_unhidden}") 150 - 151 - 152 - if __name__ == "__main__": 153 - main()
-303
scripts/backfill-profiles.py
··· 1 - #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 - # /// script 3 - # requires-python = ">=3.12" 4 - # dependencies = [] 5 - # /// 6 - """ 7 - bulk enrichment: walk actors table and populate labels, handle, display_name, 8 - avatar_url via bsky's public getProfiles API (25 DIDs per call). 9 - 10 - targets actors with labels='[]' (default). also recomputes hidden from full 11 - label data, fixing actors incorrectly hidden by stale !no-unauthenticated logic. 12 - 13 - usage: 14 - TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py 15 - TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --dry-run --limit 100 16 - TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --limit 1000 --offset 50000 17 - """ 18 - 19 - import argparse 20 - import json 21 - import os 22 - import re 23 - import sys 24 - import time 25 - import urllib.request 26 - 27 - GET_PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 28 - BSKY_MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac" 29 - MOD_HIDE_VALS = {"!hide", "!takedown", "spam"} 30 - 31 - BATCH_SIZE = 25 # getProfiles limit 32 - PAGE_SIZE = 500 # actors per Turso query 33 - TURSO_BATCH_SIZE = 200 # statements per pipeline call 34 - DELAY = 0.3 # seconds between getProfiles calls 35 - 36 - DIM = "\033[2m" 37 - RESET = "\033[0m" 38 - 39 - 40 - # --- turso helpers (from migrate-to-turso.py) --- 41 - 42 - def get_turso_url() -> str: 43 - url = os.environ.get("TURSO_URL", "") 44 - if not url: 45 - print("error: TURSO_URL not set", file=sys.stderr) 46 - sys.exit(1) 47 - return url.replace("libsql://", "https://") 48 - 49 - 50 - def get_turso_token() -> str: 51 - token = os.environ.get("TURSO_AUTH_TOKEN", "") 52 - if not token: 53 - print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr) 54 - sys.exit(1) 55 - return token 56 - 57 - 58 - def turso_query(sql: str, args: list, turso_url: str, turso_token: str) -> list[dict]: 59 - """single query via pipeline API, returns rows as dicts.""" 60 - body = json.dumps({ 61 - "requests": [ 62 - {"type": "execute", "stmt": {"sql": sql, "args": args}}, 63 - {"type": "close"}, 64 - ] 65 - }).encode() 66 - req = urllib.request.Request( 67 - f"{turso_url}/v3/pipeline", 68 - data=body, 69 - headers={ 70 - "Authorization": f"Bearer {turso_token}", 71 - "Content-Type": "application/json", 72 - }, 73 - ) 74 - try: 75 - with urllib.request.urlopen(req, timeout=30) as resp: 76 - result = json.loads(resp.read()) 77 - res = result["results"][0] 78 - if res.get("type") == "error": 79 - print(f" turso error: {res['error']['message']}", file=sys.stderr) 80 - return [] 81 - cols = [c["name"] for c in res["response"]["result"]["cols"]] 82 - rows = [] 83 - for row in res["response"]["result"]["rows"]: 84 - rows.append({c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}) 85 - return rows 86 - except Exception as e: 87 - print(f" turso query failed: {e}", file=sys.stderr) 88 - return [] 89 - 90 - 91 - def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool: 92 - """execute a batch of statements against Turso via HTTP pipeline API.""" 93 - requests = [{"type": "execute", "stmt": s} for s in stmts] 94 - requests.append({"type": "close"}) 95 - 96 - body = json.dumps({"requests": requests}).encode() 97 - req = urllib.request.Request( 98 - f"{turso_url}/v3/pipeline", 99 - data=body, 100 - headers={ 101 - "Authorization": f"Bearer {turso_token}", 102 - "Content-Type": "application/json", 103 - }, 104 - ) 105 - try: 106 - with 
urllib.request.urlopen(req, timeout=60) as resp: 107 - result = json.loads(resp.read()) 108 - for r in result.get("results", []): 109 - if r.get("type") == "error": 110 - print(f" turso error: {r.get('error', {}).get('message', 'unknown')}") 111 - return False 112 - return True 113 - except urllib.error.HTTPError as e: 114 - err_body = e.read().decode()[:300] 115 - print(f" turso HTTP {e.code}: {err_body}", file=sys.stderr) 116 - return False 117 - except Exception as e: 118 - print(f" turso request failed: {e}", file=sys.stderr) 119 - return False 120 - 121 - 122 - # --- bsky helpers --- 123 - 124 - def fetch_profiles(dids: list[str]) -> list[dict]: 125 - params = "&".join(f"actors={d}" for d in dids) 126 - url = f"{GET_PROFILES_URL}?{params}" 127 - req = urllib.request.Request(url, headers={"User-Agent": "typeahead-profile-backfill/1.0"}) 128 - try: 129 - with urllib.request.urlopen(req, timeout=15) as resp: 130 - data = json.loads(resp.read()) 131 - return data.get("profiles", []) 132 - except urllib.error.HTTPError as e: 133 - if e.code == 429: 134 - print("\n rate limited — pausing 60s") 135 - time.sleep(60) 136 - return fetch_profiles(dids) # retry once 137 - print(f"\n HTTP {e.code}") 138 - return [] 139 - except Exception as e: 140 - print(f"\n error: {e}") 141 - return [] 142 - 143 - 144 - _AVATAR_CID_RE = re.compile(r"/([^/]+)@jpeg$") 145 - 146 - 147 - def extract_avatar_cid(url: str) -> str: 148 - """extract CID from bsky CDN avatar URL — matches worker's extractAvatarCid.""" 149 - m = _AVATAR_CID_RE.search(url) 150 - return m.group(1) if m else "" 151 - 152 - 153 - def _parse_ts(s: str) -> float: 154 - from datetime import datetime 155 - try: 156 - return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() * 1000 157 - except Exception: 158 - return 0 159 - 160 - 161 - def should_hide(labels: list | None) -> bool: 162 - """matches worker's shouldHide (src/index.ts:133) — only bsky mod service labels.""" 163 - if not labels: 164 - return False 165 - now = time.time() * 1000 166 - for l in labels: 167 - if l.get("neg"): 168 - continue 169 - if l.get("exp") and _parse_ts(l["exp"]) <= now: 170 - continue 171 - if l.get("src") == BSKY_MOD_DID and l.get("val") in MOD_HIDE_VALS: 172 - return True 173 - return False 174 - 175 - 176 - # --- main --- 177 - 178 - def main(): 179 - parser = argparse.ArgumentParser(description="bulk profile backfill via getProfiles") 180 - parser.add_argument("--dry-run", action="store_true", help="query + fetch but don't write") 181 - parser.add_argument("--limit", type=int, default=0, help="stop after N actors (0 = all)") 182 - parser.add_argument("--offset", type=int, default=0, help="start from rowid offset") 183 - args = parser.parse_args() 184 - 185 - turso_url = get_turso_url() 186 - turso_token = get_turso_token() 187 - 188 - # stats 189 - total_queried = 0 190 - total_enriched = 0 191 - total_hidden = 0 192 - total_missing = 0 # DIDs not returned by getProfiles 193 - offset = args.offset 194 - 195 - print(f"backfill-profiles: {'DRY RUN' if args.dry_run else 'LIVE'}") 196 - if args.limit: 197 - print(f" limit: {args.limit} actors") 198 - if args.offset: 199 - print(f" starting at offset: {args.offset}") 200 - 201 - while True: 202 - # check limit 203 - if args.limit and total_queried >= args.limit: 204 - break 205 - 206 - page_limit = PAGE_SIZE 207 - if args.limit: 208 - page_limit = min(PAGE_SIZE, args.limit - total_queried) 209 - 210 - rows = turso_query( 211 - "SELECT did FROM actors WHERE labels = '[]' ORDER BY rowid ASC LIMIT ? 
OFFSET ?", 212 - [{"type": "integer", "value": str(page_limit)}, {"type": "integer", "value": str(offset)}], 213 - turso_url, turso_token, 214 - ) 215 - if not rows: 216 - break 217 - 218 - page_enriched = 0 219 - page_hidden = 0 220 - page_missing = 0 221 - stmts = [] 222 - 223 - for i in range(0, len(rows), BATCH_SIZE): 224 - batch = rows[i : i + BATCH_SIZE] 225 - dids = [r["did"] for r in batch] 226 - 227 - if i > 0 or total_queried > 0: 228 - time.sleep(DELAY) 229 - 230 - profiles = fetch_profiles(dids) 231 - returned_dids = {p["did"] for p in profiles} 232 - 233 - for p in profiles: 234 - handle = p.get("handle", "") 235 - display_name = p.get("displayName", "") 236 - avatar_cid = extract_avatar_cid(p.get("avatar", "")) 237 - labels = p.get("labels", []) 238 - hide = 1 if should_hide(labels) else 0 239 - 240 - if hide: 241 - page_hidden += 1 242 - 243 - stmts.append({ 244 - "sql": ( 245 - "UPDATE actors SET " 246 - "handle = COALESCE(NULLIF(?2, ''), handle), " 247 - "display_name = COALESCE(NULLIF(?3, ''), display_name), " 248 - "avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), " 249 - "labels = ?5, " 250 - "hidden = ?6, " 251 - "updated_at = unixepoch() " 252 - "WHERE did = ?1" 253 - ), 254 - "args": [ 255 - {"type": "text", "value": p["did"]}, 256 - {"type": "text", "value": handle}, 257 - {"type": "text", "value": display_name}, 258 - {"type": "text", "value": avatar_cid}, 259 - {"type": "text", "value": json.dumps(labels)}, 260 - {"type": "integer", "value": str(hide)}, 261 - ], 262 - }) 263 - page_enriched += 1 264 - 265 - # mark missing DIDs so cron skips them 266 - for did in dids: 267 - if did not in returned_dids: 268 - page_missing += 1 269 - stmts.append({ 270 - "sql": "UPDATE actors SET identity_checked_at = unixepoch() WHERE did = ?1", 271 - "args": [{"type": "text", "value": did}], 272 - }) 273 - 274 - # flush writes 275 - if stmts and not args.dry_run: 276 - for i in range(0, len(stmts), TURSO_BATCH_SIZE): 277 - batch = stmts[i : i + TURSO_BATCH_SIZE] 278 - if not turso_batch(batch, turso_url, turso_token): 279 - print(f"\n batch write failed at offset={offset}") 280 - return 281 - 282 - total_queried += len(rows) 283 - total_enriched += page_enriched 284 - total_hidden += page_hidden 285 - total_missing += page_missing 286 - offset += len(rows) 287 - 288 - tag = "dry" if args.dry_run else "live" 289 - sys.stdout.write( 290 - f"\r [{tag}] queried={total_queried} enriched={total_enriched} " 291 - f"hidden={total_hidden} missing={total_missing} " 292 - f"{DIM}offset={offset}{RESET} " 293 - ) 294 - sys.stdout.flush() 295 - 296 - if len(rows) < page_limit: 297 - break 298 - 299 - print(f"\n\ndone. queried={total_queried}, enriched={total_enriched}, hidden={total_hidden}, missing={total_missing}") 300 - 301 - 302 - if __name__ == "__main__": 303 - main()
-12
scripts/migrate-avatar-cid.sql
···
  1 - -- one-shot migration: convert full avatar URLs to CIDs
  2 - -- strips the URL prefix (including DID) and @jpeg suffix, leaving just the CID
  3 - --
  4 - -- run with: npx wrangler d1 execute typeahead --remote --file scripts/migrate-avatar-cid.sql
  5 -
  6 - UPDATE actors
  7 - SET avatar_url = REPLACE(
  8 -   REPLACE(avatar_url, 'https://cdn.bsky.app/img/avatar/plain/' || did || '/', ''),
  9 -   '@jpeg',
 10 -   ''
 11 - )
 12 - WHERE avatar_url LIKE 'https://cdn.bsky.app/%';
-347
scripts/migrate-to-turso.py
··· 1 - #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 - # /// script 3 - # requires-python = ">=3.12" 4 - # dependencies = [] 5 - # /// 6 - """ 7 - one-shot: migrate D1 data to Turso. 8 - 9 - reads D1 via Cloudflare REST API (fast), writes to Turso via pipeline API. 10 - uses ON CONFLICT upserts so re-running is safe (idempotent). 11 - 12 - prerequisites: 13 - turso db create typeahead 14 - turso db shell typeahead < schema.sql 15 - 16 - usage: 17 - TURSO_URL=libsql://... TURSO_AUTH_TOKEN=... ./scripts/migrate-to-turso.py 18 - TURSO_URL=libsql://... TURSO_AUTH_TOKEN=... ./scripts/migrate-to-turso.py --verify-only 19 - """ 20 - 21 - import argparse 22 - import json 23 - import os 24 - import re 25 - import subprocess 26 - import sys 27 - import urllib.request 28 - 29 - PAGE_SIZE = 1000 30 - TURSO_BATCH_SIZE = 200 # rows per Turso pipeline request 31 - 32 - PASS = "\033[32m✓\033[0m" 33 - FAIL = "\033[31m✗\033[0m" 34 - DIM = "\033[2m" 35 - RESET = "\033[0m" 36 - 37 - # D1 config (from wrangler.jsonc) 38 - CF_ACCOUNT_ID = "8feb33b5fb57ce2bc093bc6f4141f40a" 39 - CF_D1_DB_ID = "7e289d5d-dc50-46d1-8084-49aeec2679e5" 40 - D1_API = f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/d1/database/{CF_D1_DB_ID}/query" 41 - 42 - _ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") 43 - 44 - 45 - def get_cf_token() -> str: 46 - """read wrangler's OAuth token from its config file.""" 47 - config_path = os.path.expanduser( 48 - "~/Library/Preferences/.wrangler/config/default.toml" 49 - ) 50 - try: 51 - with open(config_path) as f: 52 - for line in f: 53 - if line.startswith("oauth_token"): 54 - return line.split("=", 1)[1].strip().strip('"') 55 - except FileNotFoundError: 56 - pass 57 - # fallback: try CLOUDFLARE_API_TOKEN env var 58 - token = os.environ.get("CLOUDFLARE_API_TOKEN", "") 59 - if token: 60 - return token 61 - print("error: no Cloudflare API token found", file=sys.stderr) 62 - print(" run `wrangler login` or set CLOUDFLARE_API_TOKEN", file=sys.stderr) 63 - sys.exit(1) 64 - 65 - 66 - def get_turso_url() -> str: 67 - url = os.environ.get("TURSO_URL", "") 68 - if not url: 69 - print("error: TURSO_URL not set", file=sys.stderr) 70 - sys.exit(1) 71 - return url.replace("libsql://", "https://") 72 - 73 - 74 - def get_turso_token() -> str: 75 - token = os.environ.get("TURSO_AUTH_TOKEN", "") 76 - if not token: 77 - print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr) 78 - sys.exit(1) 79 - return token 80 - 81 - 82 - def d1_query(sql: str, cf_token: str, params: list | None = None) -> list[dict]: 83 - """query D1 via Cloudflare REST API.""" 84 - payload: dict = {"sql": sql} 85 - if params: 86 - payload["params"] = params 87 - body = json.dumps(payload).encode() 88 - req = urllib.request.Request( 89 - D1_API, 90 - data=body, 91 - headers={ 92 - "Authorization": f"Bearer {cf_token}", 93 - "Content-Type": "application/json", 94 - }, 95 - ) 96 - try: 97 - with urllib.request.urlopen(req, timeout=30) as resp: 98 - data = json.loads(resp.read()) 99 - if data.get("success"): 100 - return data["result"][0]["results"] 101 - print(f" D1 API error: {data.get('errors')}", file=sys.stderr) 102 - return [] 103 - except urllib.error.HTTPError as e: 104 - body_text = e.read().decode()[:300] 105 - print(f" D1 HTTP {e.code}: {body_text}", file=sys.stderr) 106 - return [] 107 - except Exception as e: 108 - print(f" D1 request failed: {e}", file=sys.stderr) 109 - return [] 110 - 111 - 112 - def d1_query_wrangler(sql: str) -> list[dict]: 113 - """fallback: query D1 via wrangler CLI.""" 114 - result = 
subprocess.run( 115 - [ 116 - "npx", "wrangler", "d1", "execute", "typeahead-db", 117 - "--remote", "--command", sql, "--json", 118 - ], 119 - capture_output=True, text=True, cwd=".", 120 - ) 121 - stdout = _ANSI_RE.sub("", result.stdout) 122 - bracket = stdout.find("[") 123 - if bracket == -1: 124 - return [] 125 - try: 126 - data = json.loads(stdout[bracket:]) 127 - return data[0]["results"] if data else [] 128 - except (json.JSONDecodeError, IndexError, KeyError): 129 - return [] 130 - 131 - 132 - def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool: 133 - """execute a batch of statements against Turso via HTTP pipeline API.""" 134 - requests = [{"type": "execute", "stmt": s} for s in stmts] 135 - requests.append({"type": "close"}) 136 - 137 - body = json.dumps({"requests": requests}).encode() 138 - req = urllib.request.Request( 139 - f"{turso_url}/v3/pipeline", 140 - data=body, 141 - headers={ 142 - "Authorization": f"Bearer {turso_token}", 143 - "Content-Type": "application/json", 144 - }, 145 - ) 146 - try: 147 - with urllib.request.urlopen(req, timeout=60) as resp: 148 - result = json.loads(resp.read()) 149 - for r in result.get("results", []): 150 - if r.get("type") == "error": 151 - print(f" Turso error: {r.get('error', {}).get('message', 'unknown')}") 152 - return False 153 - return True 154 - except urllib.error.HTTPError as e: 155 - err_body = e.read().decode()[:300] 156 - print(f" Turso HTTP {e.code}: {err_body}", file=sys.stderr) 157 - return False 158 - except Exception as e: 159 - print(f" Turso request failed: {e}", file=sys.stderr) 160 - return False 161 - 162 - 163 - def turso_count(table: str, turso_url: str, turso_token: str) -> int | str: 164 - """get row count from Turso.""" 165 - body = json.dumps({ 166 - "requests": [ 167 - {"type": "execute", "stmt": {"sql": f"SELECT COUNT(*) AS cnt FROM {table}", "args": []}}, 168 - {"type": "close"}, 169 - ] 170 - }).encode() 171 - req = urllib.request.Request( 172 - f"{turso_url}/v3/pipeline", 173 - data=body, 174 - headers={ 175 - "Authorization": f"Bearer {turso_token}", 176 - "Content-Type": "application/json", 177 - }, 178 - ) 179 - try: 180 - with urllib.request.urlopen(req, timeout=15) as resp: 181 - result = json.loads(resp.read()) 182 - return int(result["results"][0]["response"]["result"]["rows"][0][0]["value"]) 183 - except Exception as e: 184 - return f"error: {e}" 185 - 186 - 187 - def progress(msg: str): 188 - """overwrite current line with progress.""" 189 - sys.stdout.write(f"\r {msg}") 190 - sys.stdout.flush() 191 - 192 - 193 - def migrate_actors(turso_url: str, turso_token: str, cf_token: str) -> int: 194 - print("\n--- actors ---") 195 - 196 - # get total for progress reporting 197 - count_rows = d1_query("SELECT COUNT(*) AS cnt FROM actors", cf_token) 198 - d1_total = count_rows[0]["cnt"] if count_rows else "?" 199 - print(f" D1 has {d1_total} actors") 200 - 201 - cursor = 0 202 - total = 0 203 - 204 - while True: 205 - rows = d1_query( 206 - "SELECT rowid, did, handle, display_name, avatar_url, updated_at, hidden " 207 - f"FROM actors WHERE rowid > {cursor} ORDER BY rowid ASC LIMIT {PAGE_SIZE}", 208 - cf_token, 209 - ) 210 - if not rows: 211 - break 212 - 213 - for i in range(0, len(rows), TURSO_BATCH_SIZE): 214 - batch = rows[i : i + TURSO_BATCH_SIZE] 215 - stmts = [] 216 - for r in batch: 217 - stmts.append({ 218 - "sql": ( 219 - "INSERT INTO actors (did, handle, display_name, avatar_url, updated_at, hidden) " 220 - "VALUES (?, ?, ?, ?, ?, ?) 
" 221 - "ON CONFLICT(did) DO UPDATE SET " 222 - "handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle), " 223 - "display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name), " 224 - "avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url), " 225 - "updated_at = excluded.updated_at, hidden = excluded.hidden" 226 - ), 227 - "args": [ 228 - {"type": "text", "value": r["did"]}, 229 - {"type": "text", "value": r.get("handle") or ""}, 230 - {"type": "text", "value": r.get("display_name") or ""}, 231 - {"type": "text", "value": r.get("avatar_url") or ""}, 232 - {"type": "integer", "value": str(r.get("updated_at") or 0)}, 233 - {"type": "integer", "value": str(r.get("hidden") or 0)}, 234 - ], 235 - }) 236 - 237 - if not turso_batch(stmts, turso_url, turso_token): 238 - print(f"\n batch failed at cursor={cursor}") 239 - return total 240 - total += len(batch) 241 - 242 - cursor = rows[-1]["rowid"] 243 - pct = f" ({total * 100 // d1_total}%)" if isinstance(d1_total, int) else "" 244 - progress(f"{total}/{d1_total} actors{pct} {DIM}cursor={cursor}{RESET}") 245 - 246 - print(f"\r {PASS} actors: {total} rows" + " " * 30) 247 - return total 248 - 249 - 250 - def migrate_table( 251 - table: str, 252 - columns: list[str], 253 - col_types: list[str], 254 - turso_url: str, 255 - turso_token: str, 256 - cf_token: str, 257 - ) -> int: 258 - print(f"\n--- {table} ---") 259 - col_list = ", ".join(columns) 260 - rows = d1_query(f"SELECT {col_list} FROM {table}", cf_token) 261 - 262 - if not rows: 263 - print(f" {PASS} {table}: 0 rows (empty)") 264 - return 0 265 - 266 - total = 0 267 - for i in range(0, len(rows), TURSO_BATCH_SIZE): 268 - batch = rows[i : i + TURSO_BATCH_SIZE] 269 - stmts = [] 270 - placeholders = ", ".join("?" for _ in columns) 271 - sql = f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})" 272 - 273 - for r in batch: 274 - args = [] 275 - for col, ctype in zip(columns, col_types): 276 - val = r.get(col) or 0 277 - if ctype == "text": 278 - args.append({"type": "text", "value": str(val)}) 279 - elif ctype == "float": 280 - args.append({"type": "float", "value": float(val)}) 281 - else: 282 - args.append({"type": "integer", "value": str(int(val))}) 283 - stmts.append({"sql": sql, "args": args}) 284 - 285 - if not turso_batch(stmts, turso_url, turso_token): 286 - print(f" batch failed") 287 - return total 288 - total += len(batch) 289 - progress(f"{total}/{len(rows)} {table}") 290 - 291 - print(f"\r {PASS} {table}: {total} rows" + " " * 20) 292 - return total 293 - 294 - 295 - def verify_counts(turso_url: str, turso_token: str, cf_token: str): 296 - print("\n--- verification ---") 297 - tables = ["actors", "metrics", "snapshots"] 298 - for table in tables: 299 - d1_rows = d1_query(f"SELECT COUNT(*) AS cnt FROM {table}", cf_token) 300 - d1_count = d1_rows[0]["cnt"] if d1_rows else "?" 
301 - turso_cnt = turso_count(table, turso_url, turso_token) 302 - match = str(d1_count) == str(turso_cnt) 303 - tag = PASS if match else FAIL 304 - print(f" [{tag}] {table}: D1={d1_count}, Turso={turso_cnt}") 305 - 306 - 307 - def main(): 308 - parser = argparse.ArgumentParser(description="migrate D1 → Turso") 309 - parser.add_argument("--verify-only", action="store_true", help="only compare row counts") 310 - args = parser.parse_args() 311 - 312 - cf_token = get_cf_token() 313 - turso_url = get_turso_url() 314 - turso_token = get_turso_token() 315 - 316 - # quick API check 317 - test = d1_query("SELECT 1 AS ok", cf_token) 318 - if not test: 319 - print("error: D1 API connection failed", file=sys.stderr) 320 - sys.exit(1) 321 - 322 - if args.verify_only: 323 - verify_counts(turso_url, turso_token, cf_token) 324 - return 325 - 326 - print("migrating D1 → Turso") 327 - 328 - migrate_actors(turso_url, turso_token, cf_token) 329 - migrate_table( 330 - "metrics", 331 - ["hour", "searches", "total_ms"], 332 - ["integer", "integer", "float"], 333 - turso_url, turso_token, cf_token, 334 - ) 335 - migrate_table( 336 - "snapshots", 337 - ["hour", "total", "with_handles", "with_avatars"], 338 - ["integer", "integer", "integer", "integer"], 339 - turso_url, turso_token, cf_token, 340 - ) 341 - 342 - verify_counts(turso_url, turso_token, cf_token) 343 - print("\ndone.") 344 - 345 - 346 - if __name__ == "__main__": 347 - main()