GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 176 lines 6.2 kB view raw
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
targeted backfill: fetch follows/followers from specific accounts,
enrich via getProfiles, upsert into turso.

usage:
    TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/targeted-backfill.py
"""

import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

BSKY = "https://public.api.bsky.app/xrpc"
WRITE_BATCH = 50   # statements per turso pipeline request
WRITE_PAUSE = 0.2  # seconds between write batches


def env(k):
    """Return required environment variable *k*, or print an error and exit(1)."""
    v = os.environ.get(k, "")
    if not v:
        print(f"error: {k} not set", file=sys.stderr)
        sys.exit(1)
    return v


def turso_batch_write(stmts, url, token):
    """POST *stmts* to turso's /v3/pipeline endpoint as one batch.

    Appends a "close" request so the server releases the connection.
    Best-effort by design: returns True on success, logs and returns
    False on any failure so a bad batch doesn't abort the whole run.
    """
    reqs = [{"type": "execute", "stmt": s} for s in stmts]
    reqs.append({"type": "close"})
    body = json.dumps({"requests": reqs}).encode()
    req = urllib.request.Request(f"{url}/v3/pipeline", data=body, headers={
        "Authorization": f"Bearer {token}", "Content-Type": "application/json",
    })
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            json.loads(resp.read())
        return True
    except Exception as e:
        print(f" turso write failed: {e}", file=sys.stderr)
        return False


def fetch_all(endpoint, actor, key):
    """Page through a bsky list endpoint, return all items.

    *key* names the JSON field holding each page's items
    ("follows" / "followers"). Stops when the server returns no
    cursor or an empty page.
    """
    items = []
    cursor = None
    while True:
        # urlencode percent-escapes actor and cursor — cursors may
        # contain characters that are not URL-safe.
        params = {"actor": actor, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        url = f"{BSKY}/{endpoint}?{urllib.parse.urlencode(params)}"
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())
        batch = data.get(key, [])
        items.extend(batch)
        cursor = data.get("cursor")
        print(f" fetched {len(items)} {key}...", end="\r")
        if not cursor or not batch:
            break
        time.sleep(0.1)
    print()
    return items


def fetch_profiles(dids):
    """Fetch full profiles for up to 25 DIDs via app.bsky.actor.getProfiles.

    Returns the list of profile dicts, the sentinel string
    "rate_limited" on HTTP 429 (caller backs off and retries), or []
    on any other failure (best-effort).
    """
    # quote() is urllib.parse's; reaching it via urllib.request is an
    # undocumented re-export.
    params = "&".join(f"actors={urllib.parse.quote(d)}" for d in dids)
    req = urllib.request.Request(
        f"{BSKY}/app.bsky.actor.getProfiles?{params}",
        headers={"User-Agent": "typeahead-enrich/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read()).get("profiles", [])
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return "rate_limited"
        return []
    except Exception:
        return []


def extract_avatar_cid(url):
    """Pull the CID out of a CDN avatar URL (last path segment, minus any
    "@jpeg"-style format suffix). Returns "" when absent or unparseable."""
    if not url:
        return ""
    m = re.search(r'/([^/]+?)(?:@[a-z]+)?$', url)
    return m.group(1) if m else ""


def clean_associated(assoc):
    """Serialize a profile's "associated" dict, dropping zero/false/null
    entries. Returns the JSON string "{}" when nothing remains."""
    if not assoc or not isinstance(assoc, dict):
        return "{}"
    clean = {k: v for k, v in assoc.items() if v not in (0, False, None)}
    return json.dumps(clean) if clean else "{}"


def profile_to_stmt(p):
    """Build a turso upsert statement (sql + positional args) for profile *p*.

    Marks the actor hidden when any moderation label is in the hide set,
    or when the official bsky moderation service applied
    "!no-unauthenticated". The upsert keeps existing non-empty column
    values when the incoming field is blank (COALESCE/NULLIF).
    """
    hide_vals = {"!hide", "!takedown", "!suspend", "spam"}
    mod_did = "did:plc:ar7c4by46qjdydhdevvrndac"  # bsky moderation service
    hidden = 0
    for lbl in (p.get("labels") or []):
        if lbl.get("val", "") in hide_vals or (lbl.get("val") == "!no-unauthenticated" and lbl.get("src") == mod_did):
            hidden = 1
            break
    return {
        "sql": """INSERT INTO actors (did, handle, display_name, avatar_url, labels, hidden, created_at, associated, updated_at)
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch())
            ON CONFLICT(did) DO UPDATE SET
              handle = COALESCE(NULLIF(?2, ''), actors.handle),
              display_name = COALESCE(NULLIF(?3, ''), actors.display_name),
              avatar_url = COALESCE(NULLIF(?4, ''), actors.avatar_url),
              labels = ?5, hidden = ?6,
              created_at = COALESCE(NULLIF(?7, ''), actors.created_at),
              associated = COALESCE(NULLIF(?8, '{}'), actors.associated),
              profile_checked_at = unixepoch(),
              updated_at = unixepoch()""",
        "args": [
            {"type": "text", "value": p["did"]},
            {"type": "text", "value": p.get("handle", "")},
            {"type": "text", "value": p.get("displayName", "")},
            {"type": "text", "value": extract_avatar_cid(p.get("avatar", ""))},
            {"type": "text", "value": json.dumps(p.get("labels", []))},
            # hrana integer values are transported as strings
            {"type": "integer", "value": str(hidden)},
            {"type": "text", "value": p.get("createdAt", "")},
            {"type": "text", "value": clean_associated(p.get("associated"))},
        ],
    }


def main():
    """Collect DIDs from two hard-coded graphs, enrich, write to turso."""
    turso_url = env("TURSO_URL").replace("libsql://", "https://")
    turso_token = env("TURSO_AUTH_TOKEN")

    # collect DIDs
    dids = set()

    print("atprotocol.dev → following:")
    follows = fetch_all("app.bsky.graph.getFollows", "atprotocol.dev", "follows")
    for f in follows:
        dids.add(f["did"])

    print("atmosphereconf.org → followers:")
    followers = fetch_all("app.bsky.graph.getFollowers", "atmosphereconf.org", "followers")
    for f in followers:
        dids.add(f["did"])

    dids = list(dids)
    print(f"\n{len(dids)} unique DIDs to enrich")

    # enrich via getProfiles in batches of 25 (the endpoint's maximum)
    enriched = 0
    t0 = time.time()
    all_stmts = []

    for i in range(0, len(dids), 25):
        batch = dids[i:i+25]
        result = fetch_profiles(batch)
        if result == "rate_limited":
            print(" rate limited — pausing 30s...")
            time.sleep(30)
            result = fetch_profiles(batch)
            if result == "rate_limited":
                print(" still limited, skipping")
                continue
        for p in result:
            all_stmts.append(profile_to_stmt(p))
            enriched += 1
        print(f" enriched {enriched}/{len(dids)}...", end="\r")
        time.sleep(0.05)

    print(f"\n\nenriched {enriched} profiles, writing to turso...")

    # write in small batches so a single failure loses little work
    for i in range(0, len(all_stmts), WRITE_BATCH):
        chunk = all_stmts[i:i+WRITE_BATCH]
        turso_batch_write(chunk, turso_url, turso_token)
        print(f" wrote {min(i+WRITE_BATCH, len(all_stmts))}/{len(all_stmts)}", end="\r")
        time.sleep(WRITE_PAUSE)

    elapsed = time.time() - t0
    print(f"\n\ndone in {elapsed:.0f}s. enriched {enriched} actors.")


if __name__ == "__main__":
    main()