GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

reduce turso read usage: indexes, cron-only enrichment, batch profiles endpoint

- remove enrichActors from ingest handler (was running every ~5s, scanning
3.4M rows 3x per invocation). enrichment now runs only on hourly cron.
- add indexes for enrichment queries (partial indexes on identity_checked_at,
profile_checked_at) and sync queries (updated_at, did) to eliminate full
table scans.
- add tombstones.deleted_at index for sync tombstone queries.
- add /xrpc/app.bsky.actor.getProfiles endpoint (bsky-compatible, up to 25
actors by DID or handle, no metrics recording).
- bump enrichment batch sizes (identity 100→500, pds 100→500, profile 75→250).
- rewrite bulk-enrich.py to use client-side filtering (cheap rowid reads,
skip already-enriched rows, tiny write batches).
- downsize fly.io VM from 512mb to 256mb.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+123 -41
+1 -1
ingester/fly.toml
··· 29 29 processes = ['app'] 30 30 31 31 [[vm]] 32 - memory = '512mb' 32 + memory = '256mb' 33 33 cpu_kind = 'shared' 34 34 cpus = 1
+10
schema.sql
··· 16 16 CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE); 17 17 CREATE INDEX IF NOT EXISTS idx_actors_hidden ON actors(hidden) WHERE hidden != 0; 18 18 19 + -- sync index: incremental sync queries filter on updated_at 20 + CREATE INDEX IF NOT EXISTS idx_actors_updated_at ON actors(updated_at, did); 21 + 22 + -- enrichment indexes: avoid full-table scans in cron enrichment queries 23 + CREATE INDEX IF NOT EXISTS idx_actors_enrich_identity ON actors(identity_checked_at) WHERE handle = ''; 24 + CREATE INDEX IF NOT EXISTS idx_actors_enrich_pds ON actors(identity_checked_at) WHERE handle != '' AND pds = ''; 25 + CREATE INDEX IF NOT EXISTS idx_actors_enrich_profile ON actors(profile_checked_at) WHERE handle != '' AND (avatar_url = '' OR labels = '[]' OR created_at = ''); 26 + 19 27 CREATE VIRTUAL TABLE IF NOT EXISTS actors_fts USING fts5( 20 28 handle, display_name, 21 29 content='actors', content_rowid='rowid', ··· 74 82 did TEXT PRIMARY KEY, 75 83 deleted_at INTEGER NOT NULL 76 84 ); 85 + 86 + CREATE INDEX IF NOT EXISTS idx_tombstones_deleted_at ON tombstones(deleted_at); 77 87 78 88 CREATE TABLE IF NOT EXISTS mod_overrides ( 79 89 did TEXT PRIMARY KEY,
+32 -26
scripts/bulk-enrich.py
··· 4 4 # dependencies = [] 5 5 # /// 6 6 """ 7 - bulk enrich actors via bsky getProfiles API. 7 + bulk enrich actors missing handles or avatars via bsky getProfiles API. 8 8 9 - pages through all actors by rowid, calls getProfiles in concurrent batches, 10 - writes back handles, avatars, labels, createdAt, associated. 11 - 12 - writes in small batches (50 stmts) with 200ms pauses between to avoid 13 - saturating Turso and blocking production reads. 9 + only fetches+writes actors that actually need data (handle='' or avatar_url=''), 10 + skipping already-enriched rows entirely. writes in small batches with pauses. 14 11 15 12 usage: 16 13 TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/bulk-enrich.py ··· 29 26 from concurrent.futures import ThreadPoolExecutor, as_completed 30 27 31 28 BSKY_GET_PROFILES = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 32 - PAGE_SIZE = 500 # DIDs per Turso read page 29 + PAGE_SIZE = 500 # unenriched DIDs per page 33 30 BSKY_CONCURRENCY = 5 # concurrent getProfiles calls 34 - WRITE_BATCH = 50 # stmts per Turso write — keep write lock short 35 - WRITE_PAUSE = 0.2 # seconds between write batches — let reads through 31 + WRITE_BATCH = 25 # stmts per Turso write — tiny to minimize lock time 32 + WRITE_PAUSE = 0.05 # seconds between write batches 36 33 37 34 DIM = "\033[2m" 38 35 RESET = "\033[0m" ··· 79 76 "Authorization": f"Bearer {turso_token}", "Content-Type": "application/json", 80 77 }) 81 78 try: 82 - with urllib.request.urlopen(req, timeout=60) as resp: 79 + with urllib.request.urlopen(req, timeout=30) as resp: 83 80 json.loads(resp.read()) 84 81 return True 85 82 except Exception as e: ··· 156 153 turso_token = get_turso_token() 157 154 158 155 enriched = 0 159 - checked = 0 156 + skipped = 0 160 157 not_found = 0 161 158 t0 = time.time() 162 159 last_rowid = args.start_rowid 163 160 164 - print(f"bulk enriching (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH}, write_pause={WRITE_PAUSE}s)...") 161 + print(f"bulk enriching unenriched actors only (concurrency={BSKY_CONCURRENCY}, write_batch={WRITE_BATCH})...") 165 162 if args.dry_run: print(" DRY RUN — no writes") 166 163 if args.start_rowid: print(f" resuming from rowid {args.start_rowid}") 167 164 168 165 while True: 166 + # cheap rowid-paginated read — no filter scan, just walk forward 169 167 rows = turso_query( 170 - "SELECT rowid, did FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2", 168 + "SELECT rowid, did, handle, avatar_url FROM actors WHERE rowid > ?1 ORDER BY rowid ASC LIMIT ?2", 171 169 [{"type": "integer", "value": str(last_rowid)}, {"type": "integer", "value": str(PAGE_SIZE)}], 172 170 turso_url, turso_token, 173 171 ) 174 172 if not rows: 175 173 break 176 174 177 - dids = [r["did"] for r in rows] 178 175 last_rowid = int(rows[-1]["rowid"]) 179 176 177 + # filter client-side: only fetch profiles for rows missing data 178 + need = [r for r in rows if not r["handle"] or not r["avatar_url"]] 179 + skipped += len(rows) - len(need) 180 + 181 + if not need: 182 + elapsed = time.time() - t0 183 + print(f" skipped {len(rows)} already-enriched {DIM}rowid={last_rowid}{RESET}") 184 + continue 185 + 186 + dids = [r["did"] for r in need] 187 + 180 188 # getProfiles in concurrent batches of 25 181 189 batches = [dids[i:i+25] for i in range(0, len(dids), 25)] 182 190 pending = [] ··· 193 201 result = fetch_profiles(batch) 194 202 if result == "rate_limited": 195 203 print(" still limited, skipping batch") 196 - checked += len(batch) 204 + not_found += len(batch) 197 205 continue 198 206 199 - checked += len(batch) 200 207 returned = {p["did"] for p in result} 201 208 not_found += len(batch) - len(returned) 202 209 ··· 204 211 pending.append(profile_to_stmt(p)) 205 212 enriched += 1 206 213 207 - # write in small batches with pauses between each 214 + # write in small batches 208 215 if pending and not args.dry_run: 209 216 write_t0 = time.time() 210 - write_chunks = 0 217 + ok = 0 211 218 for i in range(0, len(pending), WRITE_BATCH): 212 - turso_batch_write(pending[i:i+WRITE_BATCH], turso_url, turso_token) 213 - write_chunks += 1 219 + if turso_batch_write(pending[i:i+WRITE_BATCH], turso_url, turso_token): 220 + ok += len(pending[i:i+WRITE_BATCH]) 214 221 time.sleep(WRITE_PAUSE) 215 222 write_ms = int((time.time() - write_t0) * 1000) 216 - print(f" wrote {len(pending)} stmts in {write_chunks} chunks ({write_ms}ms)") 223 + print(f" wrote {ok}/{len(pending)} stmts ({write_ms}ms)") 217 224 218 225 elapsed = time.time() - t0 219 - rate = checked / elapsed if elapsed > 0 else 0 226 + rate = enriched / elapsed if elapsed > 0 else 0 220 227 tag = "dry" if args.dry_run else "live" 221 228 print( 222 - f" [{tag}] checked={checked:,} enriched={enriched:,} " 223 - f"not_found={not_found:,} " 224 - f"{DIM}{rate:.0f} dids/s rowid={last_rowid}{RESET}" 229 + f" [{tag}] enriched={enriched:,} skipped={skipped:,} not_found={not_found:,} " 230 + f"{DIM}{rate:.0f}/s rowid={last_rowid}{RESET}" 225 231 ) 226 232 227 233 elapsed = time.time() - t0 228 - print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}, checked={checked:,}") 234 + print(f"\ndone in {elapsed:.0f}s. enriched={enriched:,}, not_found={not_found:,}") 229 235 230 236 231 237 if __name__ == "__main__":
+3 -3
src/enrichment.ts
··· 56 56 const { results: identityRows } = await db.prepare( 57 57 `SELECT did FROM actors 58 58 WHERE handle = '' AND identity_checked_at < unixepoch() - 3600 59 - ORDER BY identity_checked_at ASC LIMIT 100` 59 + ORDER BY identity_checked_at ASC LIMIT 500` 60 60 ).all<{ did: string }>(); 61 61 62 62 if (identityRows && identityRows.length > 0) { ··· 102 102 `SELECT did FROM actors 103 103 WHERE handle != '' AND pds = '' 104 104 AND identity_checked_at < unixepoch() - 86400 105 - ORDER BY identity_checked_at ASC LIMIT 100` 105 + ORDER BY identity_checked_at ASC LIMIT 500` 106 106 ).all<{ did: string }>(); 107 107 108 108 if (pdsRows && pdsRows.length > 0) { ··· 151 151 WHERE handle != '' 152 152 AND (avatar_url = '' OR labels = '[]' OR created_at = '') 153 153 AND profile_checked_at < unixepoch() - 3600 154 - ORDER BY profile_checked_at ASC LIMIT 75` 154 + ORDER BY profile_checked_at ASC LIMIT 250` 155 155 ).all<{ did: string; pds: string }>(); 156 156 157 157 if (profileRows && profileRows.length > 0) {
-9
src/handlers/ingest.ts
··· 2 2 import type { Env, IngestEvent } from "../types"; 3 3 import { clientIP, json } from "../utils"; 4 4 import { recordActorDelta } from "../metrics"; 5 - import { enrichActors } from "../enrichment"; 6 5 7 6 export async function handleIngest( 8 7 request: Request, ··· 85 84 // best-effort — don't fail the ingest if KV write fails (e.g. quota) 86 85 } 87 86 } 88 - 89 - ctx.waitUntil( 90 - enrichActors(db, env).then(({ resolved, enriched }) => { 91 - if (resolved > 0 || enriched > 0) { 92 - return recordActorDelta(db, { handles: resolved, avatars: enriched }); 93 - } 94 - }), 95 - ); 96 87 97 88 return json({ ok: true, ingested: events.length }); 98 89 }
+63
src/handlers/profiles.ts
··· 1 + import type { TursoDB } from "../db"; 2 + import type { ActorRow, Env } from "../types"; 3 + import { CORS_HEADERS } from "../types"; 4 + import { json, avatarUrl } from "../utils"; 5 + 6 + /** Batch profile lookup by DID or handle — compatible with app.bsky.actor.getProfiles response shape. */ 7 + export async function handleGetProfiles( 8 + request: Request, 9 + db: TursoDB, 10 + _env: Env, 11 + ): Promise<Response> { 12 + const url = new URL(request.url); 13 + const actors = url.searchParams.getAll("actors"); 14 + 15 + if (actors.length === 0) { 16 + return json({ error: "InvalidRequest", message: "at least one 'actors' param required" }, 400); 17 + } 18 + if (actors.length > 25) { 19 + return json({ error: "InvalidRequest", message: "max 25 actors per request" }, 400); 20 + } 21 + 22 + // split into DIDs and handles 23 + const dids: string[] = []; 24 + const handles: string[] = []; 25 + for (const a of actors) { 26 + if (a.startsWith("did:")) dids.push(a); 27 + else handles.push(a); 28 + } 29 + 30 + // build query: lookup by DID and/or handle in one shot 31 + const conditions: string[] = []; 32 + const binds: string[] = []; 33 + let idx = 1; 34 + if (dids.length > 0) { 35 + const ph = dids.map(() => `?${idx++}`).join(", "); 36 + conditions.push(`did IN (${ph})`); 37 + binds.push(...dids); 38 + } 39 + if (handles.length > 0) { 40 + const ph = handles.map(() => `?${idx++}`).join(", "); 41 + conditions.push(`handle COLLATE NOCASE IN (${ph})`); 42 + binds.push(...handles); 43 + } 44 + 45 + const { results } = await db.prepare( 46 + `SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 47 + FROM actors WHERE (${conditions.join(" OR ")}) AND hidden = 0` 48 + ).bind(...binds).all<ActorRow>(); 49 + 50 + const profiles = (results || []).map((r) => ({ 51 + did: r.did, 52 + handle: r.handle, 53 + ...(r.display_name ? { displayName: r.display_name } : {}), 54 + ...(r.avatar_url ? { avatar: avatarUrl(r.did, r.avatar_url, r.pds) } : {}), 55 + ...(r.associated && r.associated !== '{}' ? { associated: JSON.parse(r.associated) } : {}), 56 + labels: JSON.parse(r.labels || '[]'), 57 + ...(r.created_at ? { createdAt: r.created_at } : {}), 58 + })); 59 + 60 + return new Response(JSON.stringify({ profiles }), { 61 + headers: { "Content-Type": "application/json", ...CORS_HEADERS }, 62 + }); 63 + }
+14 -2
src/index.ts
··· 2 2 import { CORS_HEADERS } from "./types"; 3 3 import { getDb } from "./db"; 4 4 import { clientIP, json, html } from "./utils"; 5 - import { recordSnapshot, materializeStats } from "./enrichment"; 6 - import { enrichActors } from "./enrichment"; 5 + import { recordSnapshot, materializeStats, enrichActors } from "./enrichment"; 7 6 import { refreshModeration } from "./cron"; 8 7 import { handleSearch } from "./handlers/search"; 9 8 import { handleIngest } from "./handlers/ingest"; 10 9 import { handleDelete, handleCursor, handleRequestIndexing, handleModOverrideSet, handleModOverrideDelete, handleModOverrideList } from "./handlers/admin"; 10 + import { handleGetProfiles } from "./handlers/profiles"; 11 11 import { handleStats } from "./handlers/stats"; 12 12 import { indexPage } from "./pages/home"; 13 13 import { docsPage } from "./pages/docs"; ··· 71 71 return json({ error: "rate limited" }, 429); 72 72 } 73 73 return handleSearch(request, db, env, ctx); 74 + } 75 + 76 + if ( 77 + pathname === "/xrpc/app.bsky.actor.getProfiles" && 78 + request.method === "GET" 79 + ) { 80 + const ip = clientIP(request); 81 + const { success } = await env.RATE_LIMITER.limit({ key: ip }); 82 + if (!success) { 83 + return json({ error: "rate limited" }, 429); 84 + } 85 + return handleGetProfiles(request, db, env); 74 86 } 75 87 76 88 if (pathname === "/admin/ingest" && request.method === "POST") {