GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

labels support: schema, ingester fixes, backfill scripts

- add labels column to actors, hidden index to schema
- remove stale !no-unauthenticated self-label check from ingester
(only API-based paths can correctly determine hidden)
- fix toArrayList() → written() memory leak in ingester HTTP calls
- add RSS logging to ingester flush loop
- disable HTTP keep-alive (fly.io proxy compat)
- add add-labels-column.sql migration and backfill-profiles.py script
- update architecture docs for labels, enrichment phase 2, moderation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+362 -32
+34 -13
docs/architecture.md
··· 33 33 - **phase 1 — identity**: slingshot resolves DID → handle + PDS endpoint. 34 34 100 DIDs/run, 20 concurrent. attempt tracking (`identity_checked_at`) 35 35 backs off failures for 1 hour. 36 - - **phase 2 — profile**: PDS-native `getRecord` fetches avatar CID + 37 - display_name directly from the actor's PDS. 20/run. attempt tracking 38 - (`profile_checked_at`) backs off failures for 1 hour. 36 + - **phase 2 — profile + labels**: `getProfiles` batch call (25 DIDs/call, 37 + 75/run) fetches avatar, display_name, and labels in one shot. also 38 + recomputes `hidden` from full label data. attempt tracking 39 + (`profile_checked_at`) backs off failures for 1 hour. actors not 40 + returned by getProfiles are marked to avoid retrying. 39 41 - lease-coordinated via KV (`enrich_lock`, 30s TTL) — no stampede from 40 42 overlapping ingest batches. 41 43 - converges to zero work as gaps fill (gap-driven queries return fewer ··· 76 78 77 79 - identity phase queries actors with `handle = ''` and 78 80 `identity_checked_at < now - 1hr` 79 - - profile phase queries actors with `handle != ''`, `avatar_url = ''`, 80 - `pds != ''`, and `profile_checked_at < now - 1hr` 81 + - profile phase queries actors with `handle != ''` and 82 + (`avatar_url = ''` OR `labels = '[]'`), `profile_checked_at < now - 1hr` 81 83 - as actors get enriched, these queries return fewer rows 82 84 - system quiesces when all actors are resolved or backed off 83 85 84 - AppView (public.api.bsky.app) is used only for moderation label checks. 85 - profile data comes from PDS-native fetches. 86 + AppView (public.api.bsky.app) is used for both enrichment (phase 2 87 + getProfiles) and moderation label checks (hourly cron). phase 2 gets 88 + avatar, displayName, labels, and hidden in one batch call. 86 89 87 90 ### read path 88 91 89 92 search query -> cache API (hit?) -> FTS5 prefix match -> reconstruct avatar 90 - URLs from DID + CID -> return `{did, handle, displayName?, avatar?}` 93 + URLs from DID + CID -> return `{did, handle, displayName?, avatar?, labels}` 91 94 92 95 ## storage 93 96 ··· 98 101 display_name TEXT -- ~20 bytes 99 102 avatar_url TEXT -- ~59 bytes (CID only, URL reconstructed at query time) 100 103 hidden INTEGER -- 1 byte 104 + labels TEXT -- 2 bytes ('[]') for ~92% of actors; 105 + -- ~256 bytes per label for the ~8% that have them 106 + -- stored as raw JSON array matching bsky's API shape 101 107 updated_at INTEGER -- 8 bytes 102 108 pds TEXT -- ~40 bytes (PDS endpoint URL) 103 109 identity_checked_at INTEGER -- 8 bytes (last slingshot attempt) 104 110 profile_checked_at INTEGER -- 8 bytes (last PDS profile attempt) 105 111 106 - plus FTS5 index overhead. roughly ~320 bytes/row total. 112 + plus FTS5 index overhead. roughly ~320 bytes/row for unlabeled actors, 113 + ~580 bytes/row for labeled ones (~8% of index). at 1.33M actors this 114 + adds ~30MB over the baseline. 107 115 108 116 storage is [Turso](https://turso.tech) (hosted libSQL). previously used 109 117 Cloudflare D1 but migrated to Turso to avoid D1's 10GB hard limit and ··· 120 128 content, not identity. bluesky's own public typeahead API returns these 121 129 accounts, and so do we. 122 130 123 - the hourly cron refreshes moderation labels by walking the full index over 124 - multiple runs (1000 actors/run via `mod_cursor` in KV). it piggybacks 125 - enrichment data (handle, display_name, avatar) from the same getProfiles 126 - response at no additional API cost. 131 + the full labels array (matching bsky's API shape) is stored per actor and 132 + returned in search results. clients can use this for rendering decisions 133 + (e.g. blur avatars for `porn`/`sexual` labels). the ingester only sees 134 + self-labels from profile records — it cannot see bsky mod labels, so it 135 + does not set `hidden`. only API-based paths (cron, backfill, request-indexing) 136 + set `hidden` from full label data. 137 + 138 + the hourly cron refreshes moderation labels and persists the full labels 139 + array by walking the full index over multiple runs (1000 actors/run via 140 + `mod_cursor` in KV). it piggybacks enrichment data (handle, display_name, 141 + avatar) from the same getProfiles response at no additional API cost. 127 142 128 143 ## scripts 129 144 ··· 133 148 - `scripts/add-enrichment-columns.sql` — one-shot migration adding pds, 134 149 identity_checked_at, profile_checked_at columns (run once before deploying 135 150 enrichment pipeline) 151 + - `scripts/add-labels-column.sql` — one-shot migration adding labels column 152 + to actors table (run once before deploying labels support) 153 + - `scripts/backfill-profiles.py` — bulk enrichment: walks actors table and 154 + populates labels, handle, display_name, avatar_url via getProfiles API. 155 + also recomputes hidden from full label data, fixing actors incorrectly 156 + hidden by stale !no-unauthenticated logic. 136 157 - `scripts/add-actor-deltas.sql` — one-shot migration adding actor_deltas 137 158 table for 5-min granularity delta tracking 138 159 - `scripts/migrate-avatar-cid.sql` — one-shot migration from full avatar
+20 -18
ingester/src/main.zig
··· 82 82 handle: ?[]const u8 = null, 83 83 display_name: ?[]const u8 = null, 84 84 avatar_cid: ?[]const u8 = null, 85 - hidden: ?bool = null, 86 85 }; 87 86 88 87 const IngestHandler = struct { ··· 195 194 } 196 195 } 197 196 198 - // check self-labels for !no-unauthenticated 199 - event.hidden = blk: { 200 - const values = zat.json.getArray(record, "labels.values") orelse break :blk false; 201 - for (values) |item| { 202 - const val = zat.json.getString(item, "val") orelse continue; 203 - if (mem.eql(u8, val, "!no-unauthenticated")) break :blk true; 204 - } 205 - break :blk false; 206 - }; 197 + // ingester only sees self-labels from profile records — it can never 198 + // correctly determine hidden. let refreshModeration cron handle it. 207 199 208 200 self.buffer.append(self.allocator, event) catch return; 209 201 } else { ··· 249 241 ); 250 242 if (ok) { 251 243 self.total_ingested += batch_end; 252 - log.info("+{d} actors (total: {d}, pending: {d}) cursor={d}", .{ 244 + const rss = getRssKB(); 245 + log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{ 253 246 batch_end, self.total_ingested, 254 - self.buffer.items.len - batch_end, self.pending_cursor, 247 + self.buffer.items.len - batch_end, self.pending_cursor, rss, 255 248 }); 256 249 // shift remaining items forward 257 250 if (batch_end < self.buffer.items.len) { ··· 310 303 } 311 304 }; 312 305 306 + fn getRssKB() u64 { 307 + const f = std.fs.openFileAbsolute("/proc/self/statm", .{}) catch return 0; 308 + defer f.close(); 309 + var buf: [128]u8 = undefined; 310 + const n = f.read(&buf) catch return 0; 311 + // statm: size resident shared text lib data dt (in pages) 312 + var it = std.mem.splitScalar(u8, buf[0..n], ' '); 313 + _ = it.next(); // size 314 + const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0; 315 + return rss_pages * 4; // pages are 4KB on Linux 316 + } 317 + 313 318 fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool { 314 319 var output: std.Io.Writer.Allocating = .init(transport.allocator); 315 320 defer output.deinit(); ··· 326 331 var auth_buf: [256]u8 = undefined; 327 332 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false; 328 333 329 - const body = output.toArrayList(); 330 - 331 334 const result = transport.fetch(.{ 332 335 .url = url, 333 336 .method = .POST, 334 - .payload = body.items, 337 + .payload = output.written(), 335 338 .authorization = auth, 336 339 }) catch return false; 337 340 defer transport.allocator.free(result.body); ··· 356 359 var auth_buf: [256]u8 = undefined; 357 360 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false; 358 361 359 - const body = output.toArrayList(); 360 - 361 362 const result = transport.fetch(.{ 362 363 .url = url, 363 364 .method = .POST, 364 - .payload = body.items, 365 + .payload = output.written(), 365 366 .authorization = auth, 366 367 }) catch return false; 367 368 defer transport.allocator.free(result.body); ··· 414 415 log.info("typeahead ingester → {s}", .{config.worker_url}); 415 416 416 417 var transport = HttpTransport.init(allocator); 418 + transport.keep_alive = false; 417 419 defer transport.deinit(); 418 420 419 421 const cursor = fetchCursor(&transport, config);
+4 -1
schema.sql
··· 5 5 avatar_url TEXT DEFAULT '', -- stores CID only (e.g. bafkrei...); reconstruct URL at query time 6 6 updated_at INTEGER NOT NULL DEFAULT (unixepoch()), 7 7 hidden INTEGER NOT NULL DEFAULT 0, 8 + labels TEXT NOT NULL DEFAULT '[]', 8 9 pds TEXT DEFAULT '', 9 10 identity_checked_at INTEGER DEFAULT 0, 10 11 profile_checked_at INTEGER DEFAULT 0 11 12 ); 12 13 13 14 CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE); 15 + CREATE INDEX IF NOT EXISTS idx_actors_hidden ON actors(hidden) WHERE hidden != 0; 14 16 15 17 CREATE VIRTUAL TABLE IF NOT EXISTS actors_fts USING fts5( 16 18 handle, display_name, ··· 46 48 hour INTEGER PRIMARY KEY, 47 49 total INTEGER NOT NULL DEFAULT 0, 48 50 with_handles INTEGER NOT NULL DEFAULT 0, 49 - with_avatars INTEGER NOT NULL DEFAULT 0 51 + with_avatars INTEGER NOT NULL DEFAULT 0, 52 + hidden INTEGER NOT NULL DEFAULT 0 50 53 ); 51 54 52 55 CREATE TABLE IF NOT EXISTS actor_deltas (
+1
scripts/add-labels-column.sql
··· 1 + ALTER TABLE actors ADD COLUMN labels TEXT NOT NULL DEFAULT '[]';
+303
scripts/backfill-profiles.py
··· 1 + #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = [] 5 + # /// 6 + """ 7 + bulk enrichment: walk actors table and populate labels, handle, display_name, 8 + avatar_url via bsky's public getProfiles API (25 DIDs per call). 9 + 10 + targets actors with labels='[]' (default). also recomputes hidden from full 11 + label data, fixing actors incorrectly hidden by stale !no-unauthenticated logic. 12 + 13 + usage: 14 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py 15 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --dry-run --limit 100 16 + TURSO_URL=... TURSO_AUTH_TOKEN=... ./scripts/backfill-profiles.py --limit 1000 --offset 50000 17 + """ 18 + 19 + import argparse 20 + import json 21 + import os 22 + import re 23 + import sys 24 + import time 25 + import urllib.request 26 + 27 + GET_PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles" 28 + BSKY_MOD_DID = "did:plc:ar7c4by46qjdydhdevvrndac" 29 + MOD_HIDE_VALS = {"!hide", "!takedown", "spam"} 30 + 31 + BATCH_SIZE = 25 # getProfiles limit 32 + PAGE_SIZE = 500 # actors per Turso query 33 + TURSO_BATCH_SIZE = 200 # statements per pipeline call 34 + DELAY = 0.3 # seconds between getProfiles calls 35 + 36 + DIM = "\033[2m" 37 + RESET = "\033[0m" 38 + 39 + 40 + # --- turso helpers (from migrate-to-turso.py) --- 41 + 42 + def get_turso_url() -> str: 43 + url = os.environ.get("TURSO_URL", "") 44 + if not url: 45 + print("error: TURSO_URL not set", file=sys.stderr) 46 + sys.exit(1) 47 + return url.replace("libsql://", "https://") 48 + 49 + 50 + def get_turso_token() -> str: 51 + token = os.environ.get("TURSO_AUTH_TOKEN", "") 52 + if not token: 53 + print("error: TURSO_AUTH_TOKEN not set", file=sys.stderr) 54 + sys.exit(1) 55 + return token 56 + 57 + 58 + def turso_query(sql: str, args: list, turso_url: str, turso_token: str) -> list[dict]: 59 + """single query via pipeline API, returns rows as dicts.""" 60 + body = json.dumps({ 61 + "requests": [ 62 + {"type": "execute", "stmt": {"sql": sql, "args": args}}, 63 + {"type": "close"}, 64 + ] 65 + }).encode() 66 + req = urllib.request.Request( 67 + f"{turso_url}/v3/pipeline", 68 + data=body, 69 + headers={ 70 + "Authorization": f"Bearer {turso_token}", 71 + "Content-Type": "application/json", 72 + }, 73 + ) 74 + try: 75 + with urllib.request.urlopen(req, timeout=30) as resp: 76 + result = json.loads(resp.read()) 77 + res = result["results"][0] 78 + if res.get("type") == "error": 79 + print(f" turso error: {res['error']['message']}", file=sys.stderr) 80 + return [] 81 + cols = [c["name"] for c in res["response"]["result"]["cols"]] 82 + rows = [] 83 + for row in res["response"]["result"]["rows"]: 84 + rows.append({c: (v["value"] if v["type"] != "null" else None) for c, v in zip(cols, row)}) 85 + return rows 86 + except Exception as e: 87 + print(f" turso query failed: {e}", file=sys.stderr) 88 + return [] 89 + 90 + 91 + def turso_batch(stmts: list[dict], turso_url: str, turso_token: str) -> bool: 92 + """execute a batch of statements against Turso via HTTP pipeline API.""" 93 + requests = [{"type": "execute", "stmt": s} for s in stmts] 94 + requests.append({"type": "close"}) 95 + 96 + body = json.dumps({"requests": requests}).encode() 97 + req = urllib.request.Request( 98 + f"{turso_url}/v3/pipeline", 99 + data=body, 100 + headers={ 101 + "Authorization": f"Bearer {turso_token}", 102 + "Content-Type": "application/json", 103 + }, 104 + ) 105 + try: 106 + with urllib.request.urlopen(req, timeout=60) as resp: 107 + result = json.loads(resp.read()) 108 + for r in result.get("results", []): 109 + if r.get("type") == "error": 110 + print(f" turso error: {r.get('error', {}).get('message', 'unknown')}") 111 + return False 112 + return True 113 + except urllib.error.HTTPError as e: 114 + err_body = e.read().decode()[:300] 115 + print(f" turso HTTP {e.code}: {err_body}", file=sys.stderr) 116 + return False 117 + except Exception as e: 118 + print(f" turso request failed: {e}", file=sys.stderr) 119 + return False 120 + 121 + 122 + # --- bsky helpers --- 123 + 124 + def fetch_profiles(dids: list[str]) -> list[dict]: 125 + params = "&".join(f"actors={d}" for d in dids) 126 + url = f"{GET_PROFILES_URL}?{params}" 127 + req = urllib.request.Request(url, headers={"User-Agent": "typeahead-profile-backfill/1.0"}) 128 + try: 129 + with urllib.request.urlopen(req, timeout=15) as resp: 130 + data = json.loads(resp.read()) 131 + return data.get("profiles", []) 132 + except urllib.error.HTTPError as e: 133 + if e.code == 429: 134 + print("\n rate limited — pausing 60s") 135 + time.sleep(60) 136 + return fetch_profiles(dids) # retry once 137 + print(f"\n HTTP {e.code}") 138 + return [] 139 + except Exception as e: 140 + print(f"\n error: {e}") 141 + return [] 142 + 143 + 144 + _AVATAR_CID_RE = re.compile(r"/([^/]+)@jpeg$") 145 + 146 + 147 + def extract_avatar_cid(url: str) -> str: 148 + """extract CID from bsky CDN avatar URL — matches worker's extractAvatarCid.""" 149 + m = _AVATAR_CID_RE.search(url) 150 + return m.group(1) if m else "" 151 + 152 + 153 + def _parse_ts(s: str) -> float: 154 + from datetime import datetime 155 + try: 156 + return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() * 1000 157 + except Exception: 158 + return 0 159 + 160 + 161 + def should_hide(labels: list | None) -> bool: 162 + """matches worker's shouldHide (src/index.ts:133) — only bsky mod service labels.""" 163 + if not labels: 164 + return False 165 + now = time.time() * 1000 166 + for l in labels: 167 + if l.get("neg"): 168 + continue 169 + if l.get("exp") and _parse_ts(l["exp"]) <= now: 170 + continue 171 + if l.get("src") == BSKY_MOD_DID and l.get("val") in MOD_HIDE_VALS: 172 + return True 173 + return False 174 + 175 + 176 + # --- main --- 177 + 178 + def main(): 179 + parser = argparse.ArgumentParser(description="bulk profile backfill via getProfiles") 180 + parser.add_argument("--dry-run", action="store_true", help="query + fetch but don't write") 181 + parser.add_argument("--limit", type=int, default=0, help="stop after N actors (0 = all)") 182 + parser.add_argument("--offset", type=int, default=0, help="start from rowid offset") 183 + args = parser.parse_args() 184 + 185 + turso_url = get_turso_url() 186 + turso_token = get_turso_token() 187 + 188 + # stats 189 + total_queried = 0 190 + total_enriched = 0 191 + total_hidden = 0 192 + total_missing = 0 # DIDs not returned by getProfiles 193 + offset = args.offset 194 + 195 + print(f"backfill-profiles: {'DRY RUN' if args.dry_run else 'LIVE'}") 196 + if args.limit: 197 + print(f" limit: {args.limit} actors") 198 + if args.offset: 199 + print(f" starting at offset: {args.offset}") 200 + 201 + while True: 202 + # check limit 203 + if args.limit and total_queried >= args.limit: 204 + break 205 + 206 + page_limit = PAGE_SIZE 207 + if args.limit: 208 + page_limit = min(PAGE_SIZE, args.limit - total_queried) 209 + 210 + rows = turso_query( 211 + "SELECT did FROM actors WHERE labels = '[]' ORDER BY rowid ASC LIMIT ? OFFSET ?", 212 + [{"type": "integer", "value": str(page_limit)}, {"type": "integer", "value": str(offset)}], 213 + turso_url, turso_token, 214 + ) 215 + if not rows: 216 + break 217 + 218 + page_enriched = 0 219 + page_hidden = 0 220 + page_missing = 0 221 + stmts = [] 222 + 223 + for i in range(0, len(rows), BATCH_SIZE): 224 + batch = rows[i : i + BATCH_SIZE] 225 + dids = [r["did"] for r in batch] 226 + 227 + if i > 0 or total_queried > 0: 228 + time.sleep(DELAY) 229 + 230 + profiles = fetch_profiles(dids) 231 + returned_dids = {p["did"] for p in profiles} 232 + 233 + for p in profiles: 234 + handle = p.get("handle", "") 235 + display_name = p.get("displayName", "") 236 + avatar_cid = extract_avatar_cid(p.get("avatar", "")) 237 + labels = p.get("labels", []) 238 + hide = 1 if should_hide(labels) else 0 239 + 240 + if hide: 241 + page_hidden += 1 242 + 243 + stmts.append({ 244 + "sql": ( 245 + "UPDATE actors SET " 246 + "handle = COALESCE(NULLIF(?2, ''), handle), " 247 + "display_name = COALESCE(NULLIF(?3, ''), display_name), " 248 + "avatar_url = COALESCE(NULLIF(?4, ''), avatar_url), " 249 + "labels = ?5, " 250 + "hidden = ?6, " 251 + "updated_at = unixepoch() " 252 + "WHERE did = ?1" 253 + ), 254 + "args": [ 255 + {"type": "text", "value": p["did"]}, 256 + {"type": "text", "value": handle}, 257 + {"type": "text", "value": display_name}, 258 + {"type": "text", "value": avatar_cid}, 259 + {"type": "text", "value": json.dumps(labels)}, 260 + {"type": "integer", "value": str(hide)}, 261 + ], 262 + }) 263 + page_enriched += 1 264 + 265 + # mark missing DIDs so cron skips them 266 + for did in dids: 267 + if did not in returned_dids: 268 + page_missing += 1 269 + stmts.append({ 270 + "sql": "UPDATE actors SET identity_checked_at = unixepoch() WHERE did = ?1", 271 + "args": [{"type": "text", "value": did}], 272 + }) 273 + 274 + # flush writes 275 + if stmts and not args.dry_run: 276 + for i in range(0, len(stmts), TURSO_BATCH_SIZE): 277 + batch = stmts[i : i + TURSO_BATCH_SIZE] 278 + if not turso_batch(batch, turso_url, turso_token): 279 + print(f"\n batch write failed at offset={offset}") 280 + return 281 + 282 + total_queried += len(rows) 283 + total_enriched += page_enriched 284 + total_hidden += page_hidden 285 + total_missing += page_missing 286 + offset += len(rows) 287 + 288 + tag = "dry" if args.dry_run else "live" 289 + sys.stdout.write( 290 + f"\r [{tag}] queried={total_queried} enriched={total_enriched} " 291 + f"hidden={total_hidden} missing={total_missing} " 292 + f"{DIM}offset={offset}{RESET} " 293 + ) 294 + sys.stdout.flush() 295 + 296 + if len(rows) < page_limit: 297 + break 298 + 299 + print(f"\n\ndone. queried={total_queried}, enriched={total_enriched}, hidden={total_hidden}, missing={total_missing}") 300 + 301 + 302 + if __name__ == "__main__": 303 + main()