GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

add backfill-pds script for one-time PDS population

reads DIDs from a file, resolves PDS via slingshot in parallel,
writes back to Turso. used to bootstrap PDS data for the 44K actors
that enrichment tried but bsky refused to serve.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+145
+145
scripts/backfill-pds.py
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
backfill PDS endpoints via slingshot for actors missing them.

reads DIDs from a file (one per line), resolves PDS via slingshot,
writes back to Turso.

generate the input file with:
    turso db shell typeahead << 'SQL' | tail -n +2 | sed 's/ *$//' > /tmp/pds-dids.txt
    SELECT did FROM actors
    WHERE handle != '' AND pds = '' AND profile_checked_at > 0 AND avatar_url = ''
    ORDER BY rowid ASC;
    SQL

usage:
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-pds.py /tmp/pds-dids.txt
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-pds.py --dry-run /tmp/pds-dids.txt

exit status is 1 when a required env var is missing or when at least one
Turso write batch failed, 0 otherwise.
"""

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

SLINGSHOT_URL = "https://slingshot.microcosm.blue/xrpc/blue.microcosm.identity.resolveMiniDoc"
BATCH_SIZE = 500  # DIDs resolved per progress line
CONCURRENCY = 20  # parallel slingshot lookups
WRITE_BATCH = 50  # statements per Turso pipeline request
WRITE_PAUSE = 0.2  # seconds slept after each pipeline request

DIM = "\033[2m"
RESET = "\033[0m"


def _require_env(name):
    """Return the value of env var *name*, or exit(1) with a message if unset/empty."""
    value = os.environ.get(name, "")
    if not value:
        print(f"error: {name} not set", file=sys.stderr)
        sys.exit(1)
    return value


def get_turso_url() -> str:
    """Return TURSO_URL with the libsql:// scheme rewritten to https://.

    Exits the process with status 1 when TURSO_URL is unset or empty.
    """
    return _require_env("TURSO_URL").replace("libsql://", "https://")


def get_turso_token() -> str:
    """Return TURSO_AUTH_TOKEN.

    Exits the process with status 1 when TURSO_AUTH_TOKEN is unset or empty.
    """
    return _require_env("TURSO_AUTH_TOKEN")


def _pipeline_errors(payload):
    """Return the messages of every ``{"type": "error"}`` entry in a pipeline response.

    The pipeline endpoint can answer HTTP 200 while individual statements
    failed; those failures are only visible as error entries in ``results``.
    Unknown response shapes yield an empty list (no error is invented).
    """
    if not isinstance(payload, dict):
        return []
    messages = []
    for result in payload.get("results") or []:
        if not isinstance(result, dict) or result.get("type") != "error":
            continue
        error = result.get("error")
        if isinstance(error, dict):
            messages.append(str(error.get("message", "unknown error")))
        else:
            messages.append(str(error))
    return messages


def turso_batch_write(stmts, turso_url, turso_token):
    """Send *stmts* to Turso as one pipeline request.

    Args:
        stmts: iterable of statement dicts (``{"sql": ..., "args": [...]}``).
        turso_url: https base URL of the database.
        turso_token: bearer token for the Authorization header.

    Returns:
        True when the request succeeded and no statement reported an error;
        False otherwise. Failures are printed to stderr, never raised.
    """
    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()
    req = urllib.request.Request(
        f"{turso_url}/v3/pipeline",
        data=body,
        headers={
            "Authorization": f"Bearer {turso_token}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            payload = json.loads(resp.read())
    except Exception as e:
        # deliberate best-effort: one bad batch must not abort the whole backfill
        print(f"\n turso write failed: {e}", file=sys.stderr)
        return False

    errors = _pipeline_errors(payload)
    if errors:
        print(
            f"\n turso write failed: {len(errors)} statement(s) rejected, first: {errors[0]}",
            file=sys.stderr,
        )
        return False
    return True


def resolve_pds(did):
    """Resolve *did* to its PDS endpoint via slingshot.

    Returns:
        ``(did, pds)`` where ``pds`` is the ``"pds"`` string from the response,
        or ``""`` when the lookup failed or the response carried no string
        ``"pds"`` field. Never raises, so it is safe to run in a worker thread.
    """
    # safe="" so every reserved character in the identifier is escaped
    url = f"{SLINGSHOT_URL}?identifier={urllib.parse.quote(did, safe='')}"
    req = urllib.request.Request(url, headers={"User-Agent": "typeahead-backfill/1.0"})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
    except Exception:
        # deliberate best-effort: any lookup failure counts as "no PDS found"
        return did, ""
    pds = data.get("pds") if isinstance(data, dict) else None
    # only a string can be bound as a "text" SQL argument later on
    return did, (pds if isinstance(pds, str) else "")


def _load_dids(path):
    """Return the stripped lines of *path* that start with ``did:``."""
    with open(path, encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return [did for did in stripped if did.startswith("did:")]


def _update_stmt(did, pds):
    """Build the UPDATE statement for one actor.

    With a PDS, both ``pds`` and ``identity_checked_at`` are set; without one,
    only ``identity_checked_at`` is stamped.
    """
    did_arg = {"type": "text", "value": did}
    if pds:
        return {
            "sql": "UPDATE actors SET pds = ?1, identity_checked_at = unixepoch() WHERE did = ?2",
            "args": [{"type": "text", "value": pds}, did_arg],
        }
    return {
        "sql": "UPDATE actors SET identity_checked_at = unixepoch() WHERE did = ?1",
        "args": [did_arg],
    }


def main():
    """Resolve every DID in the input file and write the results to Turso.

    Returns:
        0 when every write batch succeeded (or on a dry run), 1 when at least
        one write batch failed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="file with one DID per line")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    turso_url = get_turso_url()
    turso_token = get_turso_token()

    all_dids = _load_dids(args.file)
    total = len(all_dids)

    filled = 0
    checked = 0
    failed_writes = 0
    t0 = time.monotonic()  # monotonic: immune to wall-clock adjustments

    print(f"backfilling PDS for {total:,} DIDs (concurrency={CONCURRENCY})...")
    if args.dry_run:
        print(" DRY RUN — no writes")

    # one pool for the whole run instead of a new one per batch
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        for batch_start in range(0, total, BATCH_SIZE):
            batch = all_dids[batch_start:batch_start + BATCH_SIZE]

            pending = []
            futures = [pool.submit(resolve_pds, did) for did in batch]
            for future in as_completed(futures):
                checked += 1
                did, pds = future.result()
                if pds:
                    filled += 1
                pending.append(_update_stmt(did, pds))

            if not args.dry_run:
                for i in range(0, len(pending), WRITE_BATCH):
                    chunk = pending[i:i + WRITE_BATCH]
                    if not turso_batch_write(chunk, turso_url, turso_token):
                        failed_writes += 1
                    time.sleep(WRITE_PAUSE)

            elapsed = time.monotonic() - t0
            rate = checked / elapsed if elapsed > 0 else 0
            tag = "dry" if args.dry_run else "live"
            print(
                f" [{tag}] checked={checked:,} filled={filled:,} "
                f"{DIM}{rate:.0f} dids/s ({checked}/{total}){RESET}"
            )

    elapsed = time.monotonic() - t0
    print(f"\ndone in {elapsed:.0f}s. filled={filled:,}, checked={checked:,}")
    if failed_writes:
        print(
            f"warning: {failed_writes:,} write batch(es) failed; affected rows were not updated",
            file=sys.stderr,
        )
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())