GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 303 lines 10 kB view raw
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = [] 5# /// 6""" 7bulk enrichment: walk actors table and populate labels, handle, display_name, 8avatar_url via bsky's public getProfiles API (25 DIDs per call). 9 10targets actors with labels='[]' (default). also recomputes hidden from full 11label data, fixing actors incorrectly hidden by stale !no-unauthenticated logic. 12 13usage: 14 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py 15 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --dry-run --limit 100 16 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --limit 1000 --offset 50000 17""" 18 19import argparse 20import json 21import os 22import re 23import sys 24import time 25import urllib.request 26 27GET_PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 28BSKY_MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac" 29MOD_HIDE_VALS = {"!hide", "!takedown", "spam"} 30 31BATCH_SIZE = 25 # getProfiles limit 32PAGE_SIZE = 500 # actors per Turso query 33TURSO_BATCH_SIZE = 200 # statements per pipeline call 34DELAY = 0.3 # seconds between getProfiles calls 35 36DIM = "\033[2m" 37RESET = "\033[0m" 38 39 40# --- turso helpers (from migrate-to-turso.py) --- 41 42def get_turso_url() -> str: 43 url = os.environ.get("TURSO_URL", "") 44 if not url: 45 print("error: TURSO_URL not set", file=sys.stderr) 46 sys.exit(1) 47 return url.replace("libsql://", "https://") 48 49 50def get_turso_token() -> str: 51 token = os.environ.get("TURSO_AUTH_TOKEN", "") 52 if not token: 53 print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr) 54 sys.exit(1) 55 return token 56 57 58def turso_query(sql: str, args: list, turso_url: str, turso_token: str) -> list[dict]: 59 """single query via pipeline API, returns rows as dicts.""" 60 body = json.dumps({ 61 "requests": [ 62 {"type": "execute", "stmt": {"sql": sql, "args": args}}, 63 {"type": "close"}, 64 ] 65 }).encode() 66 req = urllib.request.Request( 67 f"{turso_url}/v3/pipeline", 68 data=body, 69 headers={ 70 "Authorization": f"Bearer {turso_token}", 71 "Content-Type": "application/json", 72 }, 73 ) 74 try: 75 with urllib.request.urlopen(req, timeout=30) as resp: 76 result = json.loads(resp.read()) 77 res = result["results"][0] 78 if res.get("type") == "error": 79 print(f" turso error: {res['error']['message']}", file=sys.stderr) 80 return [] 81 cols = [c["name"] for c in res["response"]["result"]["cols"]] 82 rows = [] 83 for row in res["response"]["result"]["rows"]: 84 rows.append({c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}) 85 return rows 86 except Exception as e: 87 print(f" turso query failed: {e}", file=sys.stderr) 88 return [] 89 90 91def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool: 92 """execute a batch of statements against Turso via HTTP pipeline API.""" 93 requests = [{"type": "execute", "stmt": s} for s in stmts] 94 requests.append({"type": "close"}) 95 96 body = json.dumps({"requests": requests}).encode() 97 req = urllib.request.Request( 98 f"{turso_url}/v3/pipeline", 99 data=body, 100 headers={ 101 "Authorization": f"Bearer {turso_token}", 102 "Content-Type": "application/json", 103 }, 104 ) 105 try: 106 with urllib.request.urlopen(req, timeout=60) as resp: 107 result = json.loads(resp.read()) 108 for r in result.get("results", []): 109 if r.get("type") == "error": 110 print(f" turso error: {r.get('error', {}).get('message', 'unknown')}") 111 return False 112 return True 113 except urllib.error.HTTPError as e: 114 err_body = e.read().decode()[:300] 115 print(f" turso HTTP {e.code}: {err_body}", file=sys.stderr) 116 return False 117 except Exception as e: 118 print(f" turso request failed: {e}", file=sys.stderr) 119 return False 120 121 122# --- bsky helpers --- 123 124def fetch_profiles(dids: list[str]) -> list[dict]: 125 params = "&".join(f"actors={d}" for d in dids) 126 url = f"{GET_PROFILES_URL}?{params}" 127 req = urllib.request.Request(url, headers={"User-Agent": "typeahead-profile-backfill/1.0"}) 128 try: 129 with urllib.request.urlopen(req, timeout=15) as resp: 130 data = json.loads(resp.read()) 131 return data.get("profiles", []) 132 except urllib.error.HTTPError as e: 133 if e.code == 429: 134 print("\n rate limited — pausing 60s") 135 time.sleep(60) 136 return fetch_profiles(dids) # retry once 137 print(f"\n HTTP {e.code}") 138 return [] 139 except Exception as e: 140 print(f"\n error: {e}") 141 return [] 142 143 144_AVATAR_CID_RE = re.compile(r"/([^/]+)@jpeg$") 145 146 147def extract_avatar_cid(url: str) -> str: 148 """extract CID from bsky CDN avatar URL — matches worker's extractAvatarCid.""" 149 m = _AVATAR_CID_RE.search(url) 150 return m.group(1) if m else "" 151 152 153def _parse_ts(s: str) -> float: 154 from datetime import datetime 155 try: 156 return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() * 1000 157 except Exception: 158 return 0 159 160 161def should_hide(labels: list | None) -> bool: 162 """matches worker's shouldHide (src/index.ts:133) — only bsky mod service labels.""" 163 if not labels: 164 return False 165 now = time.time() * 1000 166 for l in labels: 167 if l.get("neg"): 168 continue 169 if l.get("exp") and _parse_ts(l["exp"]) <= now: 170 continue 171 if l.get("src") == BSKY_MOD_DID and l.get("val") in MOD_HIDE_VALS: 172 return True 173 return False 174 175 176# --- main --- 177 178def main(): 179 parser = argparse.ArgumentParser(description="bulk profile backfill via getProfiles") 180 parser.add_argument("--dry-run", action="store_true", help="query + fetch but don't write") 181 parser.add_argument("--limit", type=int, default=0, help="stop after N actors (0 = all)") 182 parser.add_argument("--offset", type=int, default=0, help="start from rowid offset") 183 args = parser.parse_args() 184 185 turso_url = get_turso_url() 186 turso_token = get_turso_token() 187 188 # stats 189 total_queried = 0 190 total_enriched = 0 191 total_hidden = 0 192 total_missing = 0 # DIDs not returned by getProfiles 193 offset = args.offset 194 195 print(f"backfill-profiles: {'DRY RUN' if args.dry_run else 'LIVE'}") 196 if args.limit: 197 print(f" limit: {args.limit} actors") 198 if args.offset: 199 print(f" starting at offset: {args.offset}") 200 201 while True: 202 # check limit 203 if args.limit and total_queried >= args.limit: 204 break 205 206 page_limit = PAGE_SIZE 207 if args.limit: 208 page_limit = min(PAGE_SIZE, args.limit - total_queried) 209 210 rows = turso_query( 211 "SELECT did FROM actors WHERE labels = '[]' ORDER BY rowid ASC LIMIT ? OFFSET ?", 212 [{"type": "integer", "value": str(page_limit)}, {"type": "integer", "value": str(offset)}], 213 turso_url, turso_token, 214 ) 215 if not rows: 216 break 217 218 page_enriched = 0 219 page_hidden = 0 220 page_missing = 0 221 stmts = [] 222 223 for i in range(0, len(rows), BATCH_SIZE): 224 batch = rows[i : i + BATCH_SIZE] 225 dids = [r["did"] for r in batch] 226 227 if i > 0 or total_queried > 0: 228 time.sleep(DELAY) 229 230 profiles = fetch_profiles(dids) 231 returned_dids = {p["did"] for p in profiles} 232 233 for p in profiles: 234 handle = p.get("handle", "") 235 display_name = p.get("displayName", "") 236 avatar_cid = extract_avatar_cid(p.get("avatar", "")) 237 labels = p.get("labels", []) 238 hide = 1 if should_hide(labels) else 0 239 240 if hide: 241 page_hidden += 1 242 243 stmts.append({ 244 "sql": ( 245 "UPDATE actors SET " 246 "handle = COALESCE(NULLIF(?2, ''), handle), " 247 "display_name = COALESCE(NULLIF(?3, ''), display_name), " 248 "avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), " 249 "labels = ?5, " 250 "hidden = ?6, " 251 "updated_at = unixepoch() " 252 "WHERE did = ?1" 253 ), 254 "args": [ 255 {"type": "text", "value": p["did"]}, 256 {"type": "text", "value": handle}, 257 {"type": "text", "value": display_name}, 258 {"type": "text", "value": avatar_cid}, 259 {"type": "text", "value": json.dumps(labels)}, 260 {"type": "integer", "value": str(hide)}, 261 ], 262 }) 263 page_enriched += 1 264 265 # mark missing DIDs so cron skips them 266 for did in dids: 267 if did not in returned_dids: 268 page_missing += 1 269 stmts.append({ 270 "sql": "UPDATE actors SET identity_checked_at = unixepoch() WHERE did = ?1", 271 "args": [{"type": "text", "value": did}], 272 }) 273 274 # flush writes 275 if stmts and not args.dry_run: 276 for i in range(0, len(stmts), TURSO_BATCH_SIZE): 277 batch = stmts[i : i + TURSO_BATCH_SIZE] 278 if not turso_batch(batch, turso_url, turso_token): 279 print(f"\n batch write failed at offset={offset}") 280 return 281 282 total_queried += len(rows) 283 total_enriched += page_enriched 284 total_hidden += page_hidden 285 total_missing += page_missing 286 offset += len(rows) 287 288 tag = "dry" if args.dry_run else "live" 289 sys.stdout.write( 290 f"\r [{tag}] queried={total_queried} enriched={total_enriched} " 291 f"hidden={total_hidden} missing={total_missing} " 292 f"{DIM}offset={offset}{RESET} " 293 ) 294 sys.stdout.flush() 295 296 if len(rows) < page_limit: 297 break 298 299 print(f"\n\ndone. queried={total_queried}, enriched={total_enriched}, hidden={total_hidden}, missing={total_missing}") 300 301 302if __name__ == "__main__": 303 main()