GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

dead-actor cleanup: cron prevention + bulk removal script

refreshModeration now deletes empty-handle actors missing from getProfiles.
new cleanup-dead-actors.py script for bulk removal with --dry-run/--limit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+247 -3
+227
scripts/cleanup-dead-actors.py
··· 1 + #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = [] 5 + # /// 6 + """ 7 + delete dead actors (empty handle, not returned by bsky getProfiles). 8 + 9 + pages through actors with handle='', verifies each batch via getProfiles: 10 + - returned WITH handle → UPDATE handle (was temporary resolution issue) 11 + - returned WITHOUT handle → leave alone (deactivated but bsky knows it) 12 + - NOT returned → DELETE (truly dead) 13 + 14 + usage: 15 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/cleanup-dead-actors.py --dry-run --limit 100 16 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/cleanup-dead-actors.py --limit 1000 17 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/cleanup-dead-actors.py --start-rowid 50000 18 + """ 19 + 20 + import argparse 21 + import json 22 + import os 23 + import sys 24 + import time 25 + import urllib.request 26 + import urllib.error 27 + from concurrent.futures import ThreadPoolExecutor, as_completed 28 + 29 + BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 30 + PAGE_SIZE = 500 31 + BSKY_CONCURRENCY = 5 32 + DELETE_BATCH = 100 33 + DELETE_PAUSE = 0.1 34 + 35 + DIM = "\033[2m" 36 + RESET = "\033[0m" 37 + RED = "\033[31m" 38 + GREEN = "\033[32m" 39 + 40 + 41 + def get_turso_url() -> str: 42 + url = os.environ.get("TURSO_URL", "") 43 + if not url: 44 + print("error: TURSO_URL not set", file=sys.stderr); sys.exit(1) 45 + return url.replace("libsql://", "https://") 46 + 47 + 48 + def get_turso_token() -> str: 49 + token = os.environ.get("TURSO_AUTH_TOKEN", "") 50 + if not token: 51 + print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr); sys.exit(1) 52 + return token 53 + 54 + 55 + def turso_query(sql, args, turso_url, turso_token): 56 + body = json.dumps({"requests": [ 57 + {"type": "execute", "stmt": {"sql": sql, "args": args}}, 58 + {"type": "close"}, 59 + ]}).encode() 60 + req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={ 61 + "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json", 62 + }) 63 + with urllib.request.urlopen(req, timeout=30) as resp: 64 + result = json.loads(resp.read()) 65 + res = result["results"][0] 66 + if res.get("type") == "error": 67 + print(f" turso error: {res['error']['message']}", file=sys.stderr) 68 + return [] 69 + cols = [c["name"] for c in res["response"]["result"]["cols"]] 70 + return [{c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)} 71 + for row in res["response"]["result"]["rows"]] 72 + 73 + 74 + def turso_batch_write(stmts, turso_url, turso_token): 75 + reqs = [{"type": "execute", "stmt": s} for s in stmts] 76 + reqs.append({"type": "close"}) 77 + body = json.dumps({"requests": reqs}).encode() 78 + req = urllib.request.Request(f"{turso_url}/v3/pipeline", data=body, headers={ 79 + "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json", 80 + }) 81 + try: 82 + with urllib.request.urlopen(req, timeout=60) as resp: 83 + json.loads(resp.read()) 84 + return True 85 + except Exception as e: 86 + print(f"\n turso write failed: {e}", file=sys.stderr) 87 + return False 88 + 89 + 90 + def fetch_profiles(dids): 91 + params = "&".join(f"actors={urllib.request.quote(d)}" for d in dids) 92 + req = urllib.request.Request( 93 + f"{BSKY_GET_PROFILES}?{params}", 94 + headers={"User-Agent": "typeahead-cleanup/1.0"}, 95 + ) 96 + try: 97 + with urllib.request.urlopen(req, timeout=15) as resp: 98 + return json.loads(resp.read()).get("profiles", []) 99 + except urllib.error.HTTPError as e: 100 + if e.code == 429: return "rate_limited" 101 + return [] 102 + except Exception: 103 + return [] 104 + 105 + 106 + def main(): 107 + parser = argparse.ArgumentParser(description="delete dead actors with empty handles") 108 + parser.add_argument("--dry-run", action="store_true", help="log what would happen, no writes") 109 + parser.add_argument("--limit", type=int, default=0, help="cap actors processed (0 = unlimited)") 110 + parser.add_argument("--start-rowid", type=int, default=0, help="resume from this rowid") 111 + args = parser.parse_args() 112 + 113 + turso_url = get_turso_url() 114 + turso_token = get_turso_token() 115 + 116 + checked = 0 117 + deleted = 0 118 + updated = 0 119 + alive = 0 120 + t0 = time.time() 121 + last_rowid = args.start_rowid 122 + 123 + tag = "dry" if args.dry_run else "live" 124 + print(f"cleanup dead actors (concurrency={BSKY_CONCURRENCY}, delete_batch={DELETE_BATCH})") 125 + if args.dry_run: print(" DRY RUN — no writes") 126 + if args.limit: print(f" limit: {args.limit:,}") 127 + if args.start_rowid: print(f" resuming from rowid {args.start_rowid}") 128 + 129 + while True: 130 + remaining = args.limit - checked if args.limit else PAGE_SIZE 131 + if args.limit and remaining <= 0: 132 + break 133 + page_size = min(PAGE_SIZE, remaining) if args.limit else PAGE_SIZE 134 + 135 + rows = turso_query( 136 + "SELECT rowid, did, display_name FROM actors WHERE handle = '' AND rowid > ?1 ORDER BY rowid ASC LIMIT ?2", 137 + [{"type": "integer", "value": str(last_rowid)}, {"type": "integer", "value": str(page_size)}], 138 + turso_url, turso_token, 139 + ) 140 + if not rows: 141 + break 142 + 143 + dids = [r["did"] for r in rows] 144 + did_to_row = {r["did"]: r for r in rows} 145 + last_rowid = int(rows[-1]["rowid"]) 146 + 147 + batches = [dids[i:i+25] for i in range(0, len(dids), 25)] 148 + to_delete = [] 149 + to_update = [] 150 + 151 + with ThreadPoolExecutor(max_workers=BSKY_CONCURRENCY) as pool: 152 + futures = {pool.submit(fetch_profiles, b): b for b in batches} 153 + for future in as_completed(futures): 154 + batch_dids = futures[future] 155 + result = future.result() 156 + 157 + if result == "rate_limited": 158 + print(f"\n rate limited — pausing 30s...") 159 + time.sleep(30) 160 + result = fetch_profiles(batch_dids) 161 + if result == "rate_limited": 162 + print(" still limited, skipping batch") 163 + checked += len(batch_dids) 164 + continue 165 + 166 + checked += len(batch_dids) 167 + returned = {p["did"]: p for p in result} 168 + 169 + for did in batch_dids: 170 + if did in returned: 171 + p = returned[did] 172 + handle = p.get("handle", "") 173 + if handle: 174 + to_update.append({"did": did, "handle": handle}) 175 + else: 176 + alive += 1 # deactivated but bsky knows it 177 + else: 178 + row = did_to_row[did] 179 + to_delete.append({"did": did, "display_name": row.get("display_name", "")}) 180 + 181 + # log deletions 182 + for d in to_delete: 183 + name = d["display_name"] or "(no name)" 184 + print(f" {RED}DELETE{RESET} {d['did']} {DIM}{name}{RESET}") 185 + 186 + for u in to_update: 187 + print(f" {GREEN}UPDATE{RESET} {u['did']} → {u['handle']}") 188 + 189 + if not args.dry_run: 190 + # batch deletes 191 + if to_delete: 192 + del_stmts = [ 193 + {"sql": "DELETE FROM actors WHERE did = ?1", 194 + "args": [{"type": "text", "value": d["did"]}]} 195 + for d in to_delete 196 + ] 197 + for i in range(0, len(del_stmts), DELETE_BATCH): 198 + turso_batch_write(del_stmts[i:i+DELETE_BATCH], turso_url, turso_token) 199 + time.sleep(DELETE_PAUSE) 200 + 201 + # batch updates (handle recovery) 202 + if to_update: 203 + upd_stmts = [ 204 + {"sql": "UPDATE actors SET handle = ?2 WHERE did = ?1", 205 + "args": [{"type": "text", "value": u["did"]}, 206 + {"type": "text", "value": u["handle"]}]} 207 + for u in to_update 208 + ] 209 + turso_batch_write(upd_stmts, turso_url, turso_token) 210 + 211 + deleted += len(to_delete) 212 + updated += len(to_update) 213 + 214 + elapsed = time.time() - t0 215 + rate = checked / elapsed if elapsed > 0 else 0 216 + print( 217 + f" [{tag}] checked={checked:,} deleted={deleted:,} " 218 + f"updated={updated:,} alive={alive:,} " 219 + f"{DIM}{rate:.0f} dids/s rowid={last_rowid}{RESET}" 220 + ) 221 + 222 + elapsed = time.time() - t0 223 + print(f"\ndone in {elapsed:.0f}s. checked={checked:,}, deleted={deleted:,}, updated={updated:,}, alive={alive:,}") 224 + 225 + 226 + if __name__ == "__main__": 227 + main()
+20 -3
src/cron.ts
··· 2 2 import type { Env } from "./types"; 3 3 import { BSKY_GET_PROFILES_URL } from "./types"; 4 4 import { extractProfileFields } from "./utils"; 5 + import { recordActorDelta } from "./metrics"; 5 6 6 7 /** refresh moderation labels, walking the full index over multiple cron runs */ 7 8 export async function refreshModeration(db: TursoDB, env: Env): Promise<void> { ··· 10 11 const cursor = cursorStr ? Number(cursorStr) : 0; 11 12 12 13 const { results } = await db.prepare( 13 - "SELECT rowid, did FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT 1000" 14 - ).bind(cursor).all<{ rowid: number; did: string }>(); 14 + "SELECT rowid, did, handle FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT 1000" 15 + ).bind(cursor).all<{ rowid: number; did: string; handle: string }>(); 15 16 16 17 if (!results || results.length === 0) { 17 18 // wrapped around — reset cursor for next run ··· 22 23 23 24 let checked = 0; 24 25 let changed = 0; 26 + let deleted = 0; 25 27 26 28 // batch into groups of 25 (getProfiles limit), ~200ms pause between calls 27 29 for (let i = 0; i < results.length; i += 25) { ··· 65 67 const batchResults = await db.batch(stmts); 66 68 changed += batchResults.filter((r) => r.meta.changes > 0).length; 67 69 } 70 + 71 + // dead-actor cleanup: DIDs not returned by getProfiles with empty handles 72 + const returnedDids = new Set(profiles.map((p: any) => p.did)); 73 + const deadDids = batch.filter((r) => !returnedDids.has(r.did) && r.handle === ""); 74 + if (deadDids.length > 0) { 75 + const delStmts: Stmt[] = deadDids.map((r) => 76 + db.prepare("DELETE FROM actors WHERE did = ?1").bind(r.did) 77 + ); 78 + await db.batch(delStmts); 79 + deleted += deadDids.length; 80 + await recordActorDelta(db, { actors: -deadDids.length }).catch(() => {}); 81 + } 68 82 } catch { 69 83 // best-effort — skip failures 70 84 } ··· 73 87 // save cursor at end of this page 74 88 const lastRowid = results[results.length - 1].rowid; 75 89 await env.KV.put("mod_cursor", String(lastRowid)); 76 - console.log(JSON.stringify({ event: "moderation_refresh", checked, changed, cursor: lastRowid })); 90 + if (deleted > 0) { 91 + console.log(JSON.stringify({ event: "mod_cleanup", deleted })); 92 + } 93 + console.log(JSON.stringify({ event: "moderation_refresh", checked, changed, deleted, cursor: lastRowid })); 77 94 }