GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

add tangled profile support, domain indexing, and pds-aware avatars

- subscribe to sh.tangled.actor.profile in ingester (jetstream)
- fetchProfileFromPds tries tangled profile collection as fallback
- store bare CIDs, reconstruct avatar URLs at search time using pds column
- sync pds column from turso to local SQLite for zig search backend
- enrichment: try PDS fallback for non-bsky PDS actors (not just takedowns)
- add admin auth bypass for /request-indexing rate limiter
- add scripts/index-domain.py for bulk domain handle discovery + indexing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+815 -170
+25
ingester/build.zig
··· 15 15 .sqlite3 = &[_][]const u8{ "-std=c99", "-DSQLITE_ENABLE_FTS5" }, 16 16 }); 17 17 18 + const logfire = b.dependency("logfire", .{ 19 + .target = target, 20 + .optimize = optimize, 21 + }); 22 + 18 23 const exe = b.addExecutable(.{ 19 24 .name = "typeahead-ingester", 20 25 .root_module = b.createModule(.{ ··· 24 29 .imports = &.{ 25 30 .{ .name = "zat", .module = zat.module("zat") }, 26 31 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 32 + .{ .name = "logfire", .module = logfire.module("logfire") }, 27 33 }, 28 34 }), 29 35 }); ··· 40 46 41 47 const run_step = b.step("run", "Run the ingester"); 42 48 run_step.dependOn(&run_cmd.step); 49 + 50 + // minimal logfire test binary 51 + const test_lf = b.addExecutable(.{ 52 + .name = "test-logfire", 53 + .root_module = b.createModule(.{ 54 + .root_source_file = b.path("src/test_logfire.zig"), 55 + .target = target, 56 + .optimize = optimize, 57 + .imports = &.{ 58 + .{ .name = "logfire", .module = logfire.module("logfire") }, 59 + }, 60 + }), 61 + }); 62 + test_lf.linkLibC(); 63 + 64 + const run_test_lf = b.addRunArtifact(test_lf); 65 + run_test_lf.step.dependOn(b.getInstallStep()); 66 + const test_lf_step = b.step("test-logfire", "Run logfire test"); 67 + test_lf_step.dependOn(&run_test_lf.step); 43 68 }
+4
ingester/build.zig.zon
··· 12 12 .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 13 13 .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 14 14 }, 15 + .logfire = .{ 16 + .url = "https://tangled.org/zzstoatzz.io/logfire-zig/archive/main", 17 + .hash = "logfire_zig-0.1.0-x2yDLgdwAADOXAZLNQJ8FUH5v1vfFwe5CApJtQ7c_pZd", 18 + }, 15 19 }, 16 20 .paths = .{ 17 21 "build.zig",
+5 -1
ingester/src/db/LocalDb.zig
··· 152 152 \\ hidden INTEGER NOT NULL DEFAULT 0, 153 153 \\ labels TEXT NOT NULL DEFAULT '[]', 154 154 \\ created_at TEXT DEFAULT '', 155 - \\ associated TEXT DEFAULT '{}' 155 + \\ associated TEXT DEFAULT '{}', 156 + \\ pds TEXT DEFAULT '' 156 157 \\) 157 158 , .{}) catch |err| { 158 159 log.err("failed to create actors table: {}", .{err}); 159 160 return err; 160 161 }; 162 + 163 + // migration: add pds column if missing (existing deployments) 164 + c.exec("ALTER TABLE actors ADD COLUMN pds TEXT DEFAULT ''", .{}) catch {}; 161 165 162 166 c.exec("CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE)", .{}) catch {}; 163 167
+1
ingester/src/db/TursoClient.zig
··· 110 110 }, 111 111 .payload = body, 112 112 .response_writer = &response_body.writer, 113 + .keep_alive = false, 113 114 }) catch |err| { 114 115 log.err("http failed: {s}", .{@errorName(err)}); 115 116 return error.HttpError;
+219 -80
ingester/src/db/sync.zig
··· 3 3 4 4 const std = @import("std"); 5 5 const zqlite = @import("zqlite"); 6 + const logfire = @import("logfire"); 6 7 const Allocator = std.mem.Allocator; 7 8 const TursoClient = @import("TursoClient.zig"); 8 9 const LocalDb = @import("LocalDb.zig"); ··· 18 19 /// "became unsearchable" queries). This avoids races with the dual-write path. 19 20 /// When `wipe` is true, deletes all local data first (used by stale rebuild path). 20 21 pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void { 21 - log.info("starting full sync (wipe={})...", .{wipe}); 22 + const span = logfire.span("sync.full", .{}); 23 + defer span.end(); 22 24 23 - const conn = local.getConn() orelse return error.LocalNotOpen; 25 + const conn = local.getConn() orelse { 26 + span.setStatus(.@"error", "LocalNotOpen"); 27 + return error.LocalNotOpen; 28 + }; 24 29 25 30 // check if a previous full sync completed successfully 26 31 const was_completed = blk: { ··· 38 43 }; 39 44 40 45 if (wipe or !was_completed) { 41 - // bootstrap path: use staging tables for fast bulk load 46 + log.info("full_sync: taking bootstrap path (wipe={}, was_completed={})", .{ wipe, was_completed }); 47 + span.setAttribute("mode", "bootstrap"); 42 48 return fullSyncBootstrap(turso, local, conn, wipe); 43 49 } 44 50 45 51 // re-sync path: previous sync completed, keep serving while re-syncing 52 + log.info("full_sync: taking resync path", .{}); 46 53 local.setReady(true); 47 - log.info("previous sync complete, keeping ready during re-sync", .{}); 54 + span.setAttribute("mode", "resync"); 48 55 return fullSyncResync(turso, local, conn); 49 56 } 50 57 51 58 /// Bootstrap path: load into staging tables (no indexes), build indexes in bulk, 52 59 /// then atomically swap into place. Live schema is never in a degraded state. 53 60 fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void { 61 + const span = logfire.span("sync.bootstrap", .{}); 62 + defer span.end(); 63 + 54 64 const total_t0 = std.time.milliTimestamp(); 55 65 local.setReady(false); 56 66 ··· 86 96 \\ hidden INTEGER NOT NULL DEFAULT 0, 87 97 \\ labels TEXT NOT NULL DEFAULT '[]', 88 98 \\ created_at TEXT DEFAULT '', 89 - \\ associated TEXT DEFAULT '{}' 99 + \\ associated TEXT DEFAULT '{}', 100 + \\ pds TEXT DEFAULT '' 90 101 \\) 91 102 , .{}) catch |err| { 92 - log.err("failed to create actors_stage: {}", .{err}); 103 + span.recordError(err); 93 104 return err; 94 105 }; 95 106 } ··· 109 120 110 121 // fetch from turso (no lock held) 111 122 var result = turso.query( 112 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid 123 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, rowid 113 124 \\FROM actors WHERE handle != '' AND rowid > ? 114 125 \\ORDER BY rowid LIMIT 2000 115 126 , &.{rowid_str}) catch |err| { ··· 142 153 const t2 = std.time.milliTimestamp(); 143 154 144 155 // advance cursor to last row's rowid (column 8) 145 - last_rowid = result.rows[result.rows.len - 1].int(8); 156 + last_rowid = result.rows[result.rows.len - 1].int(9); 146 157 158 + // high-frequency batch progress — keep on stderr only 147 159 log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ 148 160 result.rows.len, last_rowid, actor_count, 149 161 t1 - t0, t2 - t1, ··· 151 163 } 152 164 153 165 if (had_turso_error or error_count > 0) { 154 - log.warn("bootstrap incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error }); 166 + span.setAttribute("error_count", @as(i64, @intCast(error_count))); 167 + span.setStatus(.@"error", "bootstrap incomplete"); 155 168 // clean up staging table 156 169 { 157 170 local.lock(); ··· 173 186 } 174 187 175 188 // build indexes on staging table 176 - log.info("building indexes on {d} rows...", .{actor_count}); 177 189 const idx_did_t0 = std.time.milliTimestamp(); 178 190 { 179 191 local.lock(); 180 192 defer local.unlock(); 181 193 conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| { 182 - log.err("failed to create unique index on staging: {}", .{err}); 194 + span.recordError(err); 183 195 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 184 196 return err; 185 197 }; ··· 191 203 local.lock(); 192 204 defer local.unlock(); 193 205 conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| { 194 - log.err("failed to create handle index on staging: {}", .{err}); 206 + span.recordError(err); 195 207 conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 196 208 return err; 197 209 }; ··· 199 211 const idx_handle_ms = std.time.milliTimestamp() - idx_handle_t0; 200 212 201 213 // swap actors table first, then build FTS with final name. 202 - // FTS5 virtual tables use shadow tables (_content, _data, _idx, etc.) 203 - // that don't reliably rename with ALTER TABLE RENAME — so we never rename FTS5. 204 214 { 205 215 local.lock(); 206 216 defer local.unlock(); ··· 208 218 conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {}; 209 219 conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {}; 210 220 conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| { 211 - log.err("swap failed (rename actors_stage): {}", .{err}); 221 + span.recordError(err); 212 222 conn.exec("ROLLBACK", .{}) catch {}; 213 223 return err; 214 224 }; 215 225 conn.exec("COMMIT", .{}) catch |err| { 216 - log.err("swap commit failed: {}", .{err}); 226 + span.recordError(err); 217 227 return err; 218 228 }; 219 229 } ··· 229 239 \\ tokenize='unicode61 remove_diacritics 2' 230 240 \\) 231 241 , .{}) catch |err| { 232 - log.err("failed to create FTS table: {}", .{err}); 242 + span.recordError(err); 233 243 return err; 234 244 }; 235 245 } ··· 243 253 \\INSERT INTO actors_fts (did, handle, display_name) 244 254 \\SELECT did, handle, display_name FROM actors WHERE handle != '' 245 255 , .{}) catch |err| { 246 - log.err("FTS populate failed: {}", .{err}); 256 + span.recordError(err); 247 257 return err; 248 258 }; 249 259 } ··· 266 276 .{}, 267 277 ) catch {}; 268 278 269 - // cache actor count so health endpoint doesn't need COUNT(*) 270 279 var count_buf: [20]u8 = undefined; 271 280 const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0"; 272 281 conn.exec( ··· 280 289 const total_ms = std.time.milliTimestamp() - total_t0; 281 290 282 291 // reopen read connection before serving traffic 283 - local.reopenReadConn() catch |err| { 284 - log.err("failed to reopen read conn after bootstrap: {}", .{err}); 285 - }; 292 + local.reopenReadConn() catch {}; 286 293 287 294 local.setReady(true); 288 - log.info("bootstrap complete — total_rows={d} load_ms={d} idx_did_ms={d} idx_handle_ms={d} fts_create_ms={d} fts_populate_ms={d} total_ms={d}", .{ 289 - actor_count, load_ms, idx_did_ms, idx_handle_ms, fts_create_ms, fts_pop_ms, total_ms, 290 - }); 295 + span.setAttribute("actors", @as(i64, @intCast(actor_count))); 296 + span.setAttribute("load_ms", load_ms); 297 + span.setAttribute("idx_did_ms", idx_did_ms); 298 + span.setAttribute("idx_handle_ms", idx_handle_ms); 299 + span.setAttribute("fts_create_ms", fts_create_ms); 300 + span.setAttribute("fts_populate_ms", fts_pop_ms); 301 + span.setAttribute("total_ms", total_ms); 302 + span.setStatus(.ok, null); 291 303 } 292 304 293 305 /// Re-sync path: previous sync completed, update live table directly. 294 306 /// Only processes actors that may have changed — index maintenance cost is negligible. 295 307 fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void { 308 + const span = logfire.span("sync.resync", .{}); 309 + defer span.end(); 310 + 311 + log.info("resync: starting full table resync", .{}); 312 + 296 313 var actor_count: usize = 0; 297 314 var error_count: usize = 0; 298 315 var last_rowid: i64 = 0; 299 316 var had_turso_error = false; 317 + var batch_num: usize = 0; 300 318 301 319 while (true) { 302 320 var rowid_buf: [20]u8 = undefined; 303 321 const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break; 304 322 323 + const q_span = logfire.span("sync.query.resync_batch", .{}); 305 324 const t0 = std.time.milliTimestamp(); 306 325 307 326 var result = turso.query( 308 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid 327 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, rowid 309 328 \\FROM actors WHERE handle != '' AND rowid > ? 310 329 \\ORDER BY rowid LIMIT 2000 311 330 , &.{rowid_str}) catch |err| { 312 - log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err }); 331 + log.err("resync: turso query failed at rowid {d}: {}", .{ last_rowid, err }); 332 + q_span.setStatus(.@"error", "query failed"); 333 + q_span.end(); 313 334 had_turso_error = true; 314 335 break; 315 336 }; 316 337 defer result.deinit(); 317 338 318 339 const t1 = std.time.milliTimestamp(); 340 + q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 341 + q_span.setAttribute("fetch_ms", t1 - t0); 342 + q_span.setStatus(.ok, null); 343 + q_span.end(); 319 344 320 345 if (result.rows.len == 0) break; 321 346 322 347 { 348 + const a_span = logfire.span("sync.apply.resync_batch", .{}); 349 + defer a_span.end(); 350 + 323 351 local.lock(); 324 352 defer local.unlock(); 325 353 conn.exec("BEGIN", .{}) catch {}; 326 354 for (result.rows) |row| { 327 355 insertActorOnly(conn, row) catch |err| { 328 - log.err("insert actor failed: {}", .{err}); 356 + log.err("resync: insert actor failed: {}", .{err}); 329 357 error_count += 1; 330 358 continue; 331 359 }; 332 360 actor_count += 1; 333 361 } 334 362 conn.exec("COMMIT", .{}) catch {}; 363 + 364 + a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 365 + a_span.setStatus(.ok, null); 335 366 } 336 367 337 368 const t2 = std.time.milliTimestamp(); 369 + last_rowid = result.rows[result.rows.len - 1].int(9); 370 + batch_num += 1; 338 371 339 - last_rowid = result.rows[result.rows.len - 1].int(8); 340 - 341 - log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ 342 - result.rows.len, last_rowid, actor_count, 372 + log.info("resync: batch {d}: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ 373 + batch_num, result.rows.len, last_rowid, actor_count, 343 374 t1 - t0, t2 - t1, 344 375 }); 345 376 } 346 377 347 378 if (had_turso_error or error_count > 0) { 348 - log.warn("full sync incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error }); 379 + span.setAttribute("error_count", @as(i64, @intCast(error_count))); 380 + span.setStatus(.@"error", "resync incomplete"); 381 + log.err("resync: incomplete (actors={d}, errors={d})", .{ actor_count, error_count }); 349 382 return; 350 383 } 351 384 352 - // rebuild FTS from scratch (DROP + CREATE — never reuse a potentially 353 - // broken FTS table from a previous failed rename) 385 + log.info("resync: download complete ({d} actors), rebuilding FTS", .{actor_count}); 386 + 387 + // rebuild FTS from scratch 354 388 { 355 - log.info("building FTS index for {d} actors...", .{actor_count}); 356 - const fts_t0 = std.time.milliTimestamp(); 389 + const fts_span = logfire.span("sync.fts_rebuild", .{}); 390 + defer fts_span.end(); 391 + 357 392 local.lock(); 358 393 defer local.unlock(); 359 394 conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {}; ··· 363 398 \\ tokenize='unicode61 remove_diacritics 2' 364 399 \\) 365 400 , .{}) catch |err| { 366 - log.err("failed to create FTS table: {}", .{err}); 401 + fts_span.recordError(err); 367 402 }; 368 403 conn.exec( 369 404 \\INSERT INTO actors_fts (did, handle, display_name) 370 405 \\SELECT did, handle, display_name FROM actors WHERE handle != '' 371 406 , .{}) catch |err| { 372 - log.err("FTS bulk insert failed: {}", .{err}); 407 + fts_span.recordError(err); 373 408 }; 374 - const fts_t1 = std.time.milliTimestamp(); 375 - log.info("FTS index built in {d}ms", .{fts_t1 - fts_t0}); 409 + fts_span.setAttribute("actors", @as(i64, @intCast(actor_count))); 376 410 } 411 + 412 + log.info("resync: FTS rebuild complete", .{}); 377 413 378 414 // record sync time + mark complete + actor count 379 415 { ··· 401 437 } 402 438 403 439 local.setReady(true); 404 - log.info("full sync complete — {d} actors", .{actor_count}); 440 + span.setAttribute("actors", @as(i64, @intCast(actor_count))); 441 + span.setStatus(.ok, null); 442 + log.info("resync: complete ({d} actors)", .{actor_count}); 405 443 } 406 444 407 445 /// Incremental sync: fetch actors updated since last sync + tombstones. 408 446 /// Uses keyset pagination on (updated_at, did) to drain all updates. 409 447 /// Only advances watermark when all turso queries succeed and all local writes succeed. 410 448 pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void { 411 - const conn = local.getConn() orelse return error.LocalNotOpen; 449 + const span = logfire.span("sync.incremental", .{}); 450 + defer span.end(); 451 + 452 + const conn = local.getConn() orelse { 453 + span.setStatus(.@"error", "LocalNotOpen"); 454 + return error.LocalNotOpen; 455 + }; 412 456 413 457 // get last sync time 414 - local.lock(); 415 458 const last_sync_ts = blk: { 459 + const s = logfire.span("sync.read_last_sync", .{}); 460 + defer s.end(); 461 + local.lock(); 462 + defer local.unlock(); 416 463 const row = conn.row( 417 464 "SELECT value FROM sync_meta WHERE key = 'last_sync'", 418 465 .{}, 419 466 ) catch { 420 - local.unlock(); 467 + s.setStatus(.@"error", "query failed"); 421 468 break :blk @as(i64, 0); 422 469 }; 423 470 if (row) |r| { 424 471 defer r.deinit(); 425 472 const val = r.text(0); 426 - local.unlock(); 427 - break :blk if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0; 473 + const ts = if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0; 474 + s.setAttribute("last_sync", ts); 475 + s.setStatus(.ok, null); 476 + break :blk ts; 428 477 } 429 - local.unlock(); 478 + s.setStatus(.ok, null); 430 479 break :blk @as(i64, 0); 431 480 }; 432 481 482 + log.info("incremental: last_sync={d}", .{last_sync_ts}); 483 + 433 484 if (last_sync_ts == 0) { 434 - log.info("no last_sync found, doing full sync", .{}); 485 + log.info("incremental: no last_sync, falling back to full sync", .{}); 486 + span.setAttribute("fallback", "full_sync"); 435 487 return fullSync(turso, local, false); 436 488 } 437 489 438 - // if last sync is older than tombstone retention, local data is too stale — 439 - // tombstones may have been pruned, so incremental sync can't catch deletions. 440 - // wipe and rebuild from scratch. 490 + // if last sync is older than tombstone retention, local data is too stale 441 491 const now = std.time.timestamp(); 442 492 if (now - last_sync_ts > TOMBSTONE_RETENTION_S) { 443 - log.warn("last sync {d}s ago (>{d}s tombstone retention), wiping stale replica", .{ 444 - now - last_sync_ts, TOMBSTONE_RETENTION_S, 445 - }); 493 + log.info("incremental: stale ({d}s old), falling back to full sync with wipe", .{now - last_sync_ts}); 494 + span.setAttribute("fallback", "stale_wipe"); 446 495 return fullSync(turso, local, true); 447 496 } 448 497 ··· 473 522 &.{cursor_ts}; 474 523 475 524 const sql = if (cursor_did_len > 0) 476 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at 525 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, updated_at 477 526 \\FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3)) 478 527 \\ORDER BY updated_at, did LIMIT 2000 479 528 else 480 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at 529 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, updated_at 481 530 \\FROM actors WHERE handle != '' AND updated_at > ?1 482 531 \\ORDER BY updated_at, did LIMIT 2000 483 532 ; 484 533 534 + log.info("incremental: querying updated actors (cursor_ts={s})", .{cursor_ts}); 535 + const q_span = logfire.span("sync.query.updated", .{}); 536 + 485 537 var result = turso.query(sql, args) catch |err| { 486 538 log.err("incremental query failed: {}", .{err}); 539 + q_span.setStatus(.@"error", "query failed"); 540 + q_span.end(); 487 541 had_turso_error = true; 488 542 break; 489 543 }; 490 544 defer result.deinit(); 491 545 546 + q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 547 + q_span.setStatus(.ok, null); 548 + q_span.end(); 549 + 492 550 if (result.rows.len == 0) break; 493 551 552 + log.info("incremental: applying {d} updated actors", .{result.rows.len}); 494 553 { 554 + const a_span = logfire.span("sync.apply.updated", .{}); 555 + defer a_span.end(); 556 + 495 557 local.lock(); 496 558 defer local.unlock(); 497 559 conn.exec("BEGIN", .{}) catch {}; ··· 503 565 updated += 1; 504 566 } 505 567 conn.exec("COMMIT", .{}) catch {}; 568 + 569 + a_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 570 + a_span.setStatus(.ok, null); 506 571 } 507 572 508 573 // advance cursor to last row's (updated_at, did) 509 574 const last_row = result.rows[result.rows.len - 1]; 510 - const last_ua_int = last_row.int(8); // updated_at is integer 575 + const last_ua_int = last_row.int(9); // updated_at is integer 511 576 const last_did = last_row.text(0); 512 577 513 578 // serialize updated_at integer to string for next query ··· 527 592 // fetch actors that became unsearchable (handle cleared) 528 593 var cleared: usize = 0; 529 594 unsearchable: { 595 + log.info("incremental: querying unsearchable actors", .{}); 596 + const q_span = logfire.span("sync.query.unsearchable", .{}); 597 + 530 598 var result = turso.query( 531 599 "SELECT did FROM actors WHERE updated_at > ? AND handle = ''", 532 600 &.{since_str}, 533 601 ) catch |err| { 534 602 log.err("unsearchable query failed: {}", .{err}); 603 + q_span.setStatus(.@"error", "query failed"); 604 + q_span.end(); 535 605 had_turso_error = true; 536 606 break :unsearchable; 537 607 }; 538 608 defer result.deinit(); 539 609 610 + q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 611 + q_span.setStatus(.ok, null); 612 + q_span.end(); 613 + 540 614 if (result.rows.len > 0) { 615 + log.info("incremental: clearing {d} unsearchable actors", .{result.rows.len}); 541 616 local.lock(); 542 617 defer local.unlock(); 618 + conn.exec("BEGIN", .{}) catch {}; 543 619 for (result.rows) |row| { 544 620 const did = row.text(0); 545 621 conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch { 546 622 error_count += 1; 547 623 continue; 548 624 }; 549 - conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {}; 550 625 cleared += 1; 551 626 } 627 + conn.exec("COMMIT", .{}) catch {}; 552 628 } 553 629 } 554 630 555 631 // fetch tombstones 556 632 var deleted: usize = 0; 557 633 tombstone: { 634 + log.info("incremental: querying tombstones", .{}); 635 + const q_span = logfire.span("sync.query.tombstones", .{}); 636 + 558 637 var tomb_result = turso.query( 559 638 "SELECT did FROM tombstones WHERE deleted_at > ?", 560 639 &.{since_str}, 561 640 ) catch |err| { 562 641 log.err("tombstone query failed: {}", .{err}); 642 + q_span.setStatus(.@"error", "query failed"); 643 + q_span.end(); 563 644 had_turso_error = true; 564 645 break :tombstone; 565 646 }; 566 647 defer tomb_result.deinit(); 648 + 649 + q_span.setAttribute("rows", @as(i64, @intCast(tomb_result.rows.len))); 650 + q_span.setStatus(.ok, null); 651 + q_span.end(); 567 652 568 653 if (tomb_result.rows.len > 0) { 654 + log.info("incremental: deleting {d} tombstoned actors", .{tomb_result.rows.len}); 569 655 local.lock(); 570 656 defer local.unlock(); 657 + conn.exec("BEGIN", .{}) catch {}; 571 658 for (tomb_result.rows) |row| { 572 659 const did = row.text(0); 573 660 conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch { 574 661 error_count += 1; 575 662 continue; 576 663 }; 577 - conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {}; 578 664 deleted += 1; 579 665 } 666 + conn.exec("COMMIT", .{}) catch {}; 580 667 } 581 668 } 582 669 670 + // rebuild FTS from actors table if anything changed. 671 + // per-row FTS maintenance is O(N) per delete because `did` is UNINDEXED — 672 + // bulk rebuild is O(N) total regardless of how many rows changed. 673 + if (updated > 0 or cleared > 0 or deleted > 0) { 674 + // checkpoint WAL before FTS rebuild — large WAL from batch deletes 675 + // makes the INSERT SELECT scan extremely slow 676 + { 677 + local.lock(); 678 + defer local.unlock(); 679 + conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 680 + } 681 + 682 + const fts_span = logfire.span("sync.fts_rebuild", .{}); 683 + log.info("incremental: rebuilding FTS ({d} changes)", .{updated + cleared + deleted}); 684 + { 685 + local.lock(); 686 + defer local.unlock(); 687 + conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {}; 688 + conn.exec( 689 + \\CREATE VIRTUAL TABLE actors_fts USING fts5( 690 + \\ did UNINDEXED, handle, display_name, 691 + \\ tokenize='unicode61 remove_diacritics 2' 692 + \\) 693 + , .{}) catch |err| { 694 + log.err("FTS rebuild: create failed: {}", .{err}); 695 + fts_span.recordError(err); 696 + fts_span.setStatus(.@"error", "create failed"); 697 + fts_span.end(); 698 + return; 699 + }; 700 + conn.exec( 701 + \\INSERT INTO actors_fts (did, handle, display_name) 702 + \\SELECT did, handle, display_name FROM actors WHERE handle != '' 703 + , .{}) catch |err| { 704 + log.err("FTS rebuild: populate failed: {}", .{err}); 705 + fts_span.recordError(err); 706 + fts_span.setStatus(.@"error", "populate failed"); 707 + fts_span.end(); 708 + return; 709 + }; 710 + } 711 + fts_span.setStatus(.ok, null); 712 + fts_span.end(); 713 + log.info("incremental: FTS rebuild complete", .{}); 714 + } 715 + 583 716 // only advance watermark when everything succeeded 584 717 const had_error = had_turso_error or error_count > 0; 585 718 if (had_error) { 586 - log.warn("incremental sync had errors — {d} updated, {d} deleted, {d} cleared, {d} write errors, turso_error={}", .{ 587 - updated, deleted, cleared, error_count, had_turso_error, 719 + span.setAttribute("error_count", @as(i64, @intCast(error_count))); 720 + span.setStatus(.@"error", "incremental sync had errors"); 721 + log.err("incremental: done with errors (updated={d}, cleared={d}, deleted={d}, errors={d})", .{ 722 + updated, cleared, deleted, error_count, 588 723 }); 589 724 } else { 590 725 local.lock(); ··· 599 734 // refresh cached actor count when data changed 600 735 if (updated > 0 or deleted > 0 or cleared > 0) { 601 736 updateActorCount(conn); 602 - log.info("incremental sync — {d} updated, {d} deleted, {d} cleared", .{ updated, deleted, cleared }); 603 737 } 604 738 605 739 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 740 + span.setStatus(.ok, null); 741 + log.info("incremental: done (updated={d}, cleared={d}, deleted={d})", .{ 742 + updated, cleared, deleted, 743 + }); 606 744 } 745 + 746 + span.setAttribute("updated", @as(i64, @intCast(updated))); 747 + span.setAttribute("deleted", @as(i64, @intCast(deleted))); 748 + span.setAttribute("cleared", @as(i64, @intCast(cleared))); 607 749 } 608 750 609 751 /// Background sync loop: full sync on boot, then incremental every 5 min 610 752 pub fn syncLoop(allocator: Allocator, local: *LocalDb) void { 753 + log.info("sync loop started", .{}); 611 754 var turso = TursoClient.init(allocator) catch |err| { 612 755 log.err("turso client init failed: {}, sync disabled", .{err}); 613 756 return; ··· 633 776 fn insertActorStage(conn: zqlite.Conn, row: anytype) !void { 634 777 conn.exec( 635 778 \\INSERT INTO actors_stage 636 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated) 637 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 779 + \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 780 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 638 781 , .{ 639 782 row.text(0), // did 640 783 row.text(1), // handle ··· 644 787 row.text(5), // labels 645 788 row.text(6), // created_at 646 789 row.text(7), // associated 790 + row.text(8), // pds 647 791 }) catch |err| { 648 792 return err; 649 793 }; ··· 653 797 fn insertActorOnly(conn: zqlite.Conn, row: anytype) !void { 654 798 conn.exec( 655 799 \\INSERT OR REPLACE INTO actors 656 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated) 657 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 800 + \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 801 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 658 802 , .{ 659 803 row.text(0), // did 660 804 row.text(1), // handle ··· 664 808 row.text(5), // labels 665 809 row.text(6), // created_at 666 810 row.text(7), // associated 811 + row.text(8), // pds 667 812 }) catch |err| { 668 813 return err; 669 814 }; 670 815 } 671 816 672 - /// Upsert a turso row into local SQLite + FTS 817 + /// Upsert a turso row into local SQLite (actors table only, no FTS). 818 + /// FTS is rebuilt in bulk at sync boundaries — per-row FTS deletes are O(N) scans 819 + /// because `did` is UNINDEXED in the FTS5 table. 673 820 fn upsertActorLocal(conn: zqlite.Conn, row: anytype) !void { 674 - const did = row.text(0); 675 - 676 821 conn.exec( 677 822 \\INSERT OR REPLACE INTO actors 678 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated) 679 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 823 + \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 824 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 680 825 , .{ 681 - did, 826 + row.text(0), // did 682 827 row.text(1), // handle 683 828 row.text(2), // display_name 684 829 row.text(3), // avatar_url ··· 686 831 row.text(5), // labels 687 832 row.text(6), // created_at 688 833 row.text(7), // associated 834 + row.text(8), // pds 689 835 }) catch |err| { 690 836 return err; 691 837 }; 692 - 693 - // manual FTS update (standalone FTS, not content-synced) 694 - conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {}; 695 - conn.exec( 696 - "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)", 697 - .{ did, row.text(1), row.text(2) }, 698 - ) catch {}; 699 838 } 700 839 701 840 /// Update cached actor count in sync_meta (avoids COUNT(*) on health checks)
+18 -29
ingester/src/ingest.zig
··· 131 131 } 132 132 133 133 fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void { 134 - if (mem.eql(u8, c.collection, "app.bsky.actor.profile")) { 135 - // rich extraction from profile records 134 + if (mem.eql(u8, c.collection, "app.bsky.actor.profile") or 135 + mem.eql(u8, c.collection, "sh.tangled.actor.profile")) 136 + { 137 + // rich extraction from profile records (bsky and tangled) 136 138 if (c.operation != .create and c.operation != .update) return; 137 139 138 140 const record = c.record orelse return; ··· 140 142 const did = self.dupe(c.did) orelse return; 141 143 var event = ActorEvent{ .did = did }; 142 144 145 + // bsky profiles have displayName; tangled profiles don't 143 146 if (zat.json.getString(record, "displayName")) |name| { 144 147 event.display_name = self.dupe(name); 145 148 } ··· 187 190 if (!acct.active) { 188 191 const did = self.dupe(acct.did) orelse return; 189 192 190 - // dual-write: delete from local 193 + // dual-write: delete from local (skip if sync holds the lock) 191 194 if (self.local_db) |db| { 192 - db.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {}; 193 - db.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {}; 195 + if (db.mutex.tryLock()) { 196 + defer db.mutex.unlock(); 197 + if (db.getConn()) |c| { 198 + c.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {}; 199 + } 200 + } 194 201 } 195 202 196 203 self.delete_buffer.append(self.allocator, did) catch return; 197 204 } 198 205 } 199 206 200 - /// Dual-write to local SQLite for immediate search availability 207 + /// Dual-write to local SQLite for immediate search availability. 208 + /// Uses tryLock to avoid blocking ingester when sync holds the write lock. 209 + /// FTS is rebuilt by sync at cycle boundaries — no per-row FTS maintenance here. 201 210 fn writeToLocal(self: *IngestHandler, event: ActorEvent) void { 202 211 const db = self.local_db orelse return; 203 212 if (!db.isReady()) return; 204 213 205 214 const conn = db.getConn() orelse return; 206 215 207 - db.lock(); 208 - defer db.unlock(); 216 + if (!db.mutex.tryLock()) return; // sync thread has the lock, skip 217 + defer db.mutex.unlock(); 209 218 210 219 // only write if we have searchable data (handle or display_name) 211 220 const has_handle = event.handle != null and event.handle.?.len > 0; 212 221 const has_name = event.display_name != null and event.display_name.?.len > 0; 213 222 if (!has_handle and !has_name) return; 214 223 215 - // upsert actor 224 + // upsert actor (FTS will be rebuilt by next sync cycle) 216 225 conn.exec( 217 226 \\INSERT INTO actors (did, handle, display_name, avatar_url) 218 227 \\VALUES (?, ?, ?, ?) ··· 226 235 event.display_name orelse "", 227 236 event.avatar_cid orelse "", 228 237 }) catch return; 229 - 230 - // update FTS 231 - conn.exec("DELETE FROM actors_fts WHERE did = ?", .{event.did}) catch {}; 232 - 233 - // read back current handle + display_name for FTS 234 - const row = conn.row( 235 - "SELECT handle, display_name FROM actors WHERE did = ?", 236 - .{event.did}, 237 - ) catch return; 238 - if (row) |r| { 239 - defer r.deinit(); 240 - const h = r.text(0); 241 - const dn = r.text(1); 242 - if (h.len > 0 or dn.len > 0) { 243 - conn.exec( 244 - "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)", 245 - .{ event.did, h, dn }, 246 - ) catch {}; 247 - } 248 - } 249 238 } 250 239 251 240 fn flush(self: *IngestHandler) void {
+11
ingester/src/main.zig
··· 5 5 const Allocator = mem.Allocator; 6 6 const Thread = std.Thread; 7 7 const zat = @import("zat"); 8 + const logfire = @import("logfire"); 8 9 const HttpTransport = zat.HttpTransport; 9 10 const LocalDb = @import("db/LocalDb.zig"); 10 11 const sync = @import("db/sync.zig"); ··· 41 42 "app.bsky.feed.post", 42 43 "app.bsky.feed.like", 43 44 "app.bsky.graph.follow", 45 + "sh.tangled.actor.profile", 44 46 }, 45 47 .cursor = ingest.fetchCursor(args.transport, args.config), 46 48 }); ··· 62 64 var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 63 65 defer _ = gpa.deinit(); 64 66 const allocator = gpa.allocator(); 67 + 68 + // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 69 + _ = logfire.configure(.{ 70 + .service_name = "typeahead-ingester", 71 + .service_version = "0.1.0", 72 + .environment = posix.getenv("FLY_APP_NAME") orelse "development", 73 + }) catch |err| { 74 + log.err("logfire configure failed: {}", .{err}); 75 + }; 65 76 66 77 const config = ingest.getConfig(); 67 78 log.info("typeahead ingester → {s}", .{config.worker_url});
+8 -3
ingester/src/search.zig
··· 42 42 // 1. exact handle match 43 43 { 44 44 var rows = local.query( 45 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated 45 + \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 46 46 \\FROM actors WHERE handle = ? COLLATE NOCASE AND hidden = 0 LIMIT 1 47 47 , .{term}) catch |err| blk: { 48 48 log.err("exact query failed: {}", .{err}); ··· 135 135 for (prefix_dids[0..prefix_count]) |did| { 136 136 if (emitted >= limit) break; 137 137 var rows = local.query( 138 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated 138 + \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 139 139 \\FROM actors WHERE did = ? 140 140 , .{did}) catch continue; 141 141 defer rows.deinit(); ··· 186 186 for (candidate_dids[0..candidate_count]) |did| { 187 187 if (emitted >= limit) break; 188 188 var rows = local.query( 189 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated 189 + \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 190 190 \\FROM actors WHERE did = ? AND handle != '' AND hidden = 0 191 191 , .{did}) catch continue; 192 192 defer rows.deinit(); ··· 206 206 207 207 /// Write a single actor object from a live SQLite row. 208 208 /// Must be called while the row's statement is still alive. 209 + /// Row columns: did, handle, display_name, avatar_url, labels, created_at, associated, pds 209 210 fn writeActorFromRow(jw: anytype, row: LocalDb.Row) !void { 210 211 const did = row.text(0); 211 212 const handle = row.text(1); ··· 214 215 const labels = row.text(4); 215 216 const created_at = row.text(5); 216 217 const associated = row.text(6); 218 + const pds = row.text(7); 217 219 218 220 try jw.beginObject(); 219 221 ··· 230 232 try jw.objectField("avatar"); 231 233 if (mem.startsWith(u8, avatar_url, "https://")) { 232 234 try jw.write(avatar_url); 235 + } else if (pds.len > 0 and !mem.endsWith(u8, pds, ".bsky.network")) { 236 + // non-bsky PDS: construct blob URL from PDS + DID + CID 237 + try jw.print("\"{s}/xrpc/com.atproto.sync.getBlob?did={s}&cid={s}\"", .{ pds, did, avatar_url }); 233 238 } else { 234 239 try jw.print("\"https://cdn.bsky.app/img/avatar/plain/{s}/{s}\"", .{ did, avatar_url }); 235 240 }
+8
ingester/src/server.zig
··· 6 6 const net = std.net; 7 7 const mem = std.mem; 8 8 const json = std.json; 9 + const logfire = @import("logfire"); 9 10 const LocalDb = @import("db/LocalDb.zig"); 10 11 const search = @import("search.zig"); 11 12 const ingest = @import("ingest.zig"); ··· 109 110 } 110 111 111 112 fn handleHealth(request: *http.Server.Request, local_db: *LocalDb) !void { 113 + const span = logfire.span("http.health", .{}); 114 + defer span.end(); 115 + 112 116 const ready = local_db.isReady(); 113 117 const actors = local_db.countActors(); 114 118 const rss = ingest.getRssKB(); ··· 121 125 actors, 122 126 rss, 123 127 }) catch "{\"status\":\"ok\"}"; 128 + 129 + span.setAttribute("ready", if (ready) "true" else "false"); 130 + span.setAttribute("actors", @as(i64, @intCast(actors))); 131 + span.setStatus(.ok, null); 124 132 125 133 try sendJson(request, .ok, body); 126 134 }
+7
schema.sql
··· 74 74 did TEXT PRIMARY KEY, 75 75 deleted_at INTEGER NOT NULL 76 76 ); 77 + 78 + CREATE TABLE IF NOT EXISTS mod_overrides ( 79 + did TEXT PRIMARY KEY, 80 + action TEXT NOT NULL, -- 'show' or 'hide' 81 + reason TEXT DEFAULT '', 82 + created_at INTEGER NOT NULL DEFAULT (unixepoch()) 83 + );
+319
scripts/index-domain.py
··· 1 + #!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = [] 5 + # /// 6 + """ 7 + discover and index all handles under a domain suffix. 8 + 9 + discovery methods: 10 + bsky — search bluesky's public API (fast, but misses handles bluesky hasn't indexed) 11 + plc — stream PLC directory export (comprehensive, slower — use --after to limit range) 12 + pds — enumerate repos on a PDS, resolve handles via PLC (best for domains that run their own PDS) 13 + 14 + usage: 15 + ./scripts/index-domain.py tngl.sh --source pds --pds https://tngl.sh 16 + ./scripts/index-domain.py tngl.sh --source plc --after 2026-01-01 17 + ./scripts/index-domain.py bsky.team --dry-run 18 + ./scripts/index-domain.py mycustomdomain.com --concurrency 10 19 + """ 20 + 21 + import argparse 22 + import json 23 + import sys 24 + import urllib.parse 25 + import urllib.request 26 + import urllib.error 27 + from concurrent.futures import ThreadPoolExecutor, as_completed 28 + from typing import TypedDict 29 + 30 + 31 + class Actor(TypedDict): 32 + did: str 33 + handle: str 34 + 35 + 36 + class IndexResult(TypedDict, total=False): 37 + handle: str 38 + did: str 39 + hidden: bool 40 + error: str 41 + 42 + BSKY_SEARCH = "https://public.api.bsky.app/xrpc/app.bsky.actor.searchActors" 43 + PLC_EXPORT = "https://plc.directory/export" 44 + TYPEAHEAD_URL = "https://typeahead.waow.tech" 45 + SEARCH_LIMIT = 100 46 + PLC_PAGE_SIZE = 1000 47 + 48 + DIM = "\033[2m" 49 + GREEN = "\033[32m" 50 + YELLOW = "\033[33m" 51 + RED = "\033[31m" 52 + RESET = "\033[0m" 53 + 54 + 55 + def search_bsky(suffix: str) -> list[Actor]: 56 + """paginate through bluesky searchActors for handles ending in .suffix""" 57 + found = {} 58 + cursor = None 59 + page = 0 60 + 61 + while True: 62 + page += 1 63 + url = f"{BSKY_SEARCH}?q={urllib.parse.quote(suffix)}&limit={SEARCH_LIMIT}" 64 + if cursor: 65 + url += f"&cursor={urllib.parse.quote(cursor)}" 66 + 67 + try: 68 + req = urllib.request.Request(url) 69 + with urllib.request.urlopen(req, timeout=15) as resp: 70 + data = json.loads(resp.read()) 71 + except Exception as e: 72 + print(f"{RED}search error page {page}: {e}{RESET}") 73 + break 74 + 75 + actors = data.get("actors", []) 76 + if not actors: 77 + break 78 + 79 + new = 0 80 + for actor in actors: 81 + handle = actor.get("handle", "") 82 + did = actor.get("did", "") 83 + if handle.endswith(f".{suffix}") and did not in found: 84 + found[did] = handle 85 + new += 1 86 + 87 + print(f"{DIM}page {page}: {len(actors)} results, {new} new matches ({len(found)} total){RESET}") 88 + 89 + cursor = data.get("cursor") 90 + if not cursor or len(actors) < SEARCH_LIMIT: 91 + break 92 + 93 + return [{"did": did, "handle": handle} for did, handle in found.items()] 94 + 95 + 96 + def search_plc(suffix: str, after: str | None = None) -> list[Actor]: 97 + """stream PLC directory export, filtering for handles ending in .suffix""" 98 + found = {} 99 + cursor = after or "1970-01-01T00:00:00Z" 100 + pages = 0 101 + total_ops = 0 102 + 103 + while True: 104 + pages += 1 105 + url = f"{PLC_EXPORT}?count={PLC_PAGE_SIZE}&after={urllib.parse.quote(cursor)}" 106 + 107 + try: 108 + req = urllib.request.Request(url) 109 + with urllib.request.urlopen(req, timeout=30) as resp: 110 + lines = resp.read().decode().strip().split("\n") 111 + except Exception as e: 112 + print(f"{RED}PLC export error at cursor {cursor}: {e}{RESET}") 113 + break 114 + 115 + if not lines or lines == [""]: 116 + break 117 + 118 + batch_new = 0 119 + last_created = cursor 120 + for line in lines: 121 + if not line.strip(): 122 + continue 123 + total_ops += 1 124 + try: 125 + entry = json.loads(line) 126 + except json.JSONDecodeError: 127 + continue 128 + 129 + last_created = entry.get("createdAt", last_created) 130 + did = entry.get("did", "") 131 + op = entry.get("operation", {}) 132 + 133 + # newer format: alsoKnownAs 134 + for aka in op.get("alsoKnownAs", []): 135 + handle = aka.removeprefix("at://") 136 + if handle.endswith(f".{suffix}") and did not in found: 137 + found[did] = handle 138 + batch_new += 1 139 + 140 + # older format: handle field 141 + handle = op.get("handle", "") 142 + if handle.endswith(f".{suffix}") and did not in found: 143 + found[did] = handle 144 + batch_new += 1 145 + 146 + if batch_new: 147 + print(f"{DIM}page {pages}: scanned {total_ops} ops, +{batch_new} new ({len(found)} total){RESET}") 148 + elif pages % 100 == 0: 149 + print(f"{DIM}page {pages}: scanned {total_ops} ops, {len(found)} matches so far (at {last_created[:10]}){RESET}") 150 + 151 + if len(lines) < PLC_PAGE_SIZE: 152 + break 153 + 154 + cursor = last_created 155 + 156 + print(f"{DIM}scanned {total_ops} PLC operations across {pages} pages{RESET}") 157 + return [{"did": did, "handle": handle} for did, handle in found.items()] 158 + 159 + 160 + def search_pds(suffix: str, pds_url: str) -> list[Actor]: 161 + """enumerate all repos on a PDS via com.atproto.sync.listRepos, resolve handles via PLC""" 162 + # step 1: collect all DIDs from the PDS 163 + all_dids = [] 164 + cursor = "" 165 + page = 0 166 + 167 + while True: 168 + page += 1 169 + url = f"{pds_url}/xrpc/com.atproto.sync.listRepos?limit=1000" 170 + if cursor: 171 + url += f"&cursor={urllib.parse.quote(cursor)}" 172 + 173 + try: 174 + req = urllib.request.Request(url) 175 + with urllib.request.urlopen(req, timeout=30) as resp: 176 + data = json.loads(resp.read()) 177 + except Exception as e: 178 + print(f"{RED}PDS listRepos error page {page}: {e}{RESET}") 179 + break 180 + 181 + repos = data.get("repos", []) 182 + all_dids.extend(r["did"] for r in repos) 183 + print(f"{DIM}page {page}: {len(repos)} repos ({len(all_dids)} total){RESET}") 184 + 185 + cursor = data.get("cursor", "") 186 + if not cursor or len(repos) == 0: 187 + break 188 + 189 + print(f"found {len(all_dids)} accounts on PDS, resolving handles via PLC...") 190 + 191 + # step 2: resolve each DID via PLC directory (concurrent) 192 + found = {} 193 + errors = 0 194 + 195 + def resolve_did(did): 196 + req = urllib.request.Request(f"https://plc.directory/{did}") 197 + with urllib.request.urlopen(req, timeout=10) as resp: 198 + doc = json.loads(resp.read()) 199 + for aka in doc.get("alsoKnownAs", []): 200 + handle = aka.removeprefix("at://") 201 + if handle.endswith(f".{suffix}"): 202 + return did, handle 203 + return None, None 204 + 205 + with ThreadPoolExecutor(max_workers=20) as pool: 206 + futures = {pool.submit(resolve_did, did): did for did in all_dids} 207 + done = 0 208 + for future in as_completed(futures): 209 + done += 1 210 + try: 211 + did, handle = future.result() 212 + if did: 213 + found[did] = handle 214 + except Exception: 215 + errors += 1 216 + if done % 200 == 0: 217 + print(f"{DIM} resolved {done}/{len(all_dids)} ({len(found)} matches){RESET}") 218 + 219 + print(f"{DIM}resolved {len(all_dids)} DIDs: {len(found)} matches, {errors} errors{RESET}") 220 + return [{"did": did, "handle": handle} for did, handle in found.items()] 221 + 222 + 223 + def index_one(handle: str, token: str | None = None) -> IndexResult: 224 + """call /request-indexing for a single handle""" 225 + url = f"{TYPEAHEAD_URL}/request-indexing?handle={urllib.parse.quote(handle)}" 226 + headers: dict[str, str] = {"User-Agent": "typeahead-index-domain/1.0"} 227 + if token: 228 + headers["Authorization"] = f"Bearer {token}" 229 + req = urllib.request.Request(url, method="POST", headers=headers) 230 + try: 231 + with urllib.request.urlopen(req, timeout=15) as resp: 232 + return json.loads(resp.read()) 233 + except urllib.error.HTTPError as e: 234 + body = e.read().decode() if e.fp else "" 235 + return {"error": f"HTTP {e.code}: {body}"} 236 + except Exception as e: 237 + return {"error": str(e)} 238 + 239 + 240 + def main(): 241 + parser = argparse.ArgumentParser(description="discover and index handles under a domain suffix") 242 + parser.add_argument("suffix", help="domain suffix, e.g. tngl.io") 243 + parser.add_argument("--source", choices=["bsky", "plc", "pds"], default="bsky", 244 + help="discovery method (default: bsky)") 245 + parser.add_argument("--pds", help="for pds source: the PDS URL (e.g. https://tngl.sh)") 246 + parser.add_argument("--after", help="for plc source: only scan operations after this date (e.g. 2026-01-01)") 247 + parser.add_argument("--dry-run", action="store_true", help="discover only, don't index") 248 + parser.add_argument("--token", help="admin token to bypass rate limiting (reads ADMIN_SECRET env var if not set)") 249 + parser.add_argument("--concurrency", type=int, default=5, help="concurrent indexing requests (default: 5)") 250 + args = parser.parse_args() 251 + 252 + suffix = args.suffix.lstrip("*.") 253 + 254 + if args.source == "pds": 255 + if not args.pds: 256 + print(f"{RED}--pds URL required for pds source{RESET}") 257 + sys.exit(1) 258 + print(f"enumerating repos on {args.pds} for *.{suffix} handles...") 259 + actors = search_pds(suffix, args.pds.rstrip("/")) 260 + elif args.source == "plc": 261 + after = args.after 262 + if after and "T" not in after: 263 + after += "T00:00:00Z" 264 + print(f"scanning PLC directory for *.{suffix}" + (f" (after {after})" if after else "") + "...") 265 + actors = search_plc(suffix, after) 266 + else: 267 + print(f"searching bluesky for *.{suffix} handles...") 268 + actors = search_bsky(suffix) 269 + 270 + if not actors: 271 + print(f"no handles found matching *.{suffix}") 272 + if args.source == "bsky": 273 + print(f"{DIM}tip: try --source plc --after 2025-01-01 for a more comprehensive scan{RESET}") 274 + return 275 + 276 + print(f"\nfound {len(actors)} handles:") 277 + for a in sorted(actors, key=lambda x: x["handle"]): 278 + print(f" {a['handle']} ({a['did']})") 279 + 280 + if args.dry_run: 281 + print(f"\n{YELLOW}dry run — skipping indexing{RESET}") 282 + return 283 + 284 + import os 285 + token = args.token or os.environ.get("ADMIN_SECRET") 286 + 287 + print(f"\nindexing {len(actors)} handles (concurrency={args.concurrency}" + (", admin auth" if token else ", no auth — may hit rate limit") + ")...") 288 + indexed = 0 289 + hidden = 0 290 + errors = 0 291 + done = 0 292 + verbose = len(actors) <= 50 293 + 294 + with ThreadPoolExecutor(max_workers=args.concurrency) as pool: 295 + futures = {pool.submit(index_one, a["handle"], token): a for a in actors} 296 + for future in as_completed(futures): 297 + actor = futures[future] 298 + result = future.result() 299 + done += 1 300 + if "error" in result: 301 + errors += 1 302 + if verbose: 303 + print(f" {RED}✗ {actor['handle']}: {result['error']}{RESET}") 304 + elif result.get("hidden"): 305 + hidden += 1 306 + if verbose: 307 + print(f" {YELLOW}· {actor['handle']} (hidden){RESET}") 308 + else: 309 + indexed += 1 310 + if verbose: 311 + print(f" {GREEN}✓ {actor['handle']}{RESET}") 312 + if not verbose and done % 100 == 0: 313 + print(f"{DIM} {done}/{len(actors)} ({indexed} ok, {hidden} hidden, {errors} errors){RESET}") 314 + 315 + print(f"\ndone: {indexed} indexed, {hidden} hidden, {errors} errors") 316 + 317 + 318 + if __name__ == "__main__": 319 + main()
+105
scripts/smoke.py
··· 17 17 18 18 import argparse 19 19 import json 20 + import os 20 21 import sys 21 22 import urllib.request 22 23 import urllib.error ··· 260 261 return False 261 262 262 263 264 + LINK_DID = "did:plc:63hvnyjvqi2nzzcsjgnry5we" 265 + LINK_HANDLE = "spacelawshitpost.me" 266 + 267 + 268 + def admin_fetch(url: str, secret: str, method: str = "GET", body: dict | None = None) -> tuple[dict | None, int]: 269 + """fetch with admin auth. returns (body, status_code).""" 270 + data = json.dumps(body).encode() if body else None 271 + req = urllib.request.Request( 272 + url, 273 + data=data, 274 + headers={ 275 + "User-Agent": "typeahead-smoke/1.0", 276 + "Authorization": f"Bearer {secret}", 277 + **({"Content-Type": "application/json"} if data else {}), 278 + }, 279 + method=method, 280 + ) 281 + try: 282 + with urllib.request.urlopen(req, timeout=15) as resp: 283 + return json.loads(resp.read()), resp.status 284 + except urllib.error.HTTPError as e: 285 + try: 286 + return json.loads(e.read()), e.code 287 + except Exception: 288 + return None, e.code 289 + 290 + 291 + def test_link_visible(base_url: str): 292 + """verify Link (spacelawshitpost.me) is visible — requires a permanent show override.""" 293 + print("\n--- Link visibility ---") 294 + data, _ = fetch(f"{base_url}{XRPC_PATH}?q=spacelawshitpost&limit=5") 295 + if not data or "_error" in data or "_http_error" in data: 296 + check("search for Link", False, f"got {data}") 297 + return 298 + handles = [a.get("handle", "") for a in data.get("actors", [])] 299 + check(f"Link visible in search", LINK_HANDLE in handles, f"got {handles}") 300 + 301 + 302 + def test_mod_overrides(base_url: str, secret: str): 303 + """exercise the override admin CRUD lifecycle.""" 304 + print("\n--- mod_overrides admin API ---") 305 + 306 + # 1. ensure Link is indexed 307 + data, _ = fetch(f"{base_url}/request-indexing?handle={LINK_HANDLE}", method="POST") 308 + if not data or "_error" in data: 309 + check("index Link", False, f"got {data}") 310 + return 311 + check("index Link", data.get("did") == LINK_DID, f"got {data.get('did')}") 312 + 313 + # 2. set show override 314 + data, status = admin_fetch( 315 + f"{base_url}/admin/mod-override", secret, "POST", 316 + {"did": LINK_DID, "action": "show", "reason": "FREE LINK"}, 317 + ) 318 + check("set show override", status == 200 and data and data.get("action") == "show", 319 + f"status={status}, got {data}") 320 + if data: 321 + check("override sets hidden=0", data.get("hidden") == 0, f"got hidden={data.get('hidden')}") 322 + 323 + # 3. validate input — bad action should 400 324 + data, status = admin_fetch( 325 + f"{base_url}/admin/mod-override", secret, "POST", 326 + {"did": LINK_DID, "action": "yolo"}, 327 + ) 328 + check("bad action returns 400", status == 400, f"status={status}") 329 + 330 + # 4. list overrides — Link should be there 331 + data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret) 332 + if data: 333 + override_dids = [o.get("did") for o in data.get("overrides", [])] 334 + check("Link in override list", LINK_DID in override_dids) 335 + else: 336 + check("list overrides", False, f"status={status}") 337 + 338 + # 5. delete override — hidden recomputed from labels 339 + data, status = admin_fetch(f"{base_url}/admin/mod-override?did={LINK_DID}", secret, "DELETE") 340 + check("delete override", status == 200, f"status={status}, got {data}") 341 + 342 + # 6. verify override is gone 343 + data, status = admin_fetch(f"{base_url}/admin/mod-overrides", secret) 344 + if data: 345 + override_dids = [o.get("did") for o in data.get("overrides", [])] 346 + check("Link removed from overrides", LINK_DID not in override_dids) 347 + else: 348 + check("list overrides after delete", False, f"status={status}") 349 + 350 + # 7. re-set the override — Link should stay visible permanently 351 + data, status = admin_fetch( 352 + f"{base_url}/admin/mod-override", secret, "POST", 353 + {"did": LINK_DID, "action": "show", "reason": "FREE LINK"}, 354 + ) 355 + check("re-set show override", status == 200 and data and data.get("hidden") == 0, 356 + f"status={status}, got {data}") 357 + 358 + 263 359 def test_comparison(base_url: str, queries: list[str]): 264 360 print("\n--- comparison vs public.api.bsky.app ---") 265 361 ··· 288 384 def main(): 289 385 parser = argparse.ArgumentParser(description="typeahead smoke tests") 290 386 parser.add_argument("--url", required=True, help="typeahead service URL") 387 + parser.add_argument("--secret", default=os.environ.get("TYPEAHEAD_SECRET", ""), 388 + help="admin secret (or set TYPEAHEAD_SECRET env var)") 291 389 parser.add_argument("--compare", action="store_true", help="compare results vs public.api.bsky.app") 292 390 parser.add_argument( 293 391 "--queries", ··· 310 408 test_stats_page(args.url) 311 409 test_request_indexing(args.url) 312 410 test_moderation_filtering(args.url) 411 + test_link_visible(args.url) 412 + 413 + if args.secret: 414 + test_mod_overrides(args.url, args.secret) 415 + else: 416 + print(f"\n--- mod_overrides admin API ---") 417 + print(f" [{SKIP}] skipped (use --secret or set TYPEAHEAD_SECRET)") 313 418 314 419 if args.compare: 315 420 test_comparison(args.url, args.queries)
+3 -1
src/backfill.ts
··· 2 2 import type { Env } from "./types"; 3 3 import { BSKY_TYPEAHEAD_URL } from "./types"; 4 4 import { extractProfileFields } from "./utils"; 5 + import { getOverrides } from "./moderation"; 5 6 6 7 // --- backfill: remove this block once at parity with Bluesky --- 7 8 ··· 29 30 30 31 // upsert all — fills in missing actors AND enriches existing ones 31 32 // (e.g. actors ingested via Jetstream that lack avatar/displayName) 33 + const overrides = await getOverrides(db, actors.map((a) => a.did)); 32 34 const stmts = actors.map((a) => { 33 - const f = extractProfileFields(a); 35 + const f = extractProfileFields(a, overrides.get(a.did) ?? null); 34 36 return db.prepare( 35 37 `INSERT INTO actors (did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at) 36 38 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch())
+20 -14
src/enrichment.ts
··· 196 196 ); 197 197 if (f.avatarCid) enriched++; 198 198 } 199 - // PDS fallback for unreturned DIDs — check if bsky banned them 199 + // PDS fallback for unreturned DIDs 200 200 for (const r of batch) { 201 201 if (returned.has(r.did)) continue; 202 202 const override = overrides.get(r.did) ?? null; 203 - // if we have a PDS, check if this is a takedown (or has show override) 204 203 if (r.pds && (override === 'show' || !override)) { 205 - let isTakedown = override === 'show'; // trust override 206 - if (!isTakedown) { 207 - try { 208 - const probe = await fetch( 209 - `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(r.did)}` 210 - ); 211 - if (!probe.ok) { 212 - const err: any = await probe.json().catch(() => ({})); 213 - isTakedown = err.error === 'AccountTakedown'; 214 - } 215 - } catch {} 204 + // non-bsky PDS (e.g. tngl.sh): always try PDS fallback directly 205 + const isNonBsky = !r.pds.includes('.bsky.network'); 206 + let tryPds = isNonBsky; 207 + if (!tryPds) { 208 + // bsky PDS: check if this is a takedown 209 + if (override === 'show') { 210 + tryPds = true; 211 + } else { 212 + try { 213 + const probe = await fetch( 214 + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(r.did)}` 215 + ); 216 + if (!probe.ok) { 217 + const err: any = await probe.json().catch(() => ({})); 218 + tryPds = err.error === 'AccountTakedown'; 219 + } 220 + } catch {} 221 + } 216 222 } 217 - if (isTakedown) { 223 + if (tryPds) { 218 224 const pdsProfile = await fetchProfileFromPds(r.did, r.pds); 219 225 if (pdsProfile) { 220 226 const f = extractProfileFields(pdsProfile, override);
+4 -4
src/handlers/search.ts
··· 75 75 const ftsQuery = `"${term}"*`; 76 76 const [exactRes, prefixRes, ftsRes] = await db.batch([ 77 77 db.prepare( 78 - `SELECT did, handle, display_name, avatar_url, labels, created_at, associated 78 + `SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 79 79 FROM actors WHERE handle = ?1 COLLATE NOCASE AND hidden = 0 LIMIT 1` 80 80 ).bind(term), 81 81 db.prepare( 82 - `SELECT did, handle, display_name, avatar_url, labels, created_at, associated 82 + `SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 83 83 FROM actors WHERE handle LIKE ?1 COLLATE NOCASE AND handle != ?2 COLLATE NOCASE AND hidden = 0 84 84 ORDER BY length(handle) LIMIT ?3` 85 85 ).bind(term + '%', term, limit), 86 86 db.prepare( 87 - `SELECT a.did, a.handle, a.display_name, a.avatar_url, a.labels, a.created_at, a.associated 87 + `SELECT a.did, a.handle, a.display_name, a.avatar_url, a.labels, a.created_at, a.associated, a.pds 88 88 FROM actors_fts 89 89 JOIN actors a ON a.rowid = actors_fts.rowid 90 90 WHERE actors_fts MATCH ?1 AND a.handle != '' AND a.hidden = 0 ··· 110 110 did: r.did, 111 111 handle: r.handle, 112 112 ...(r.display_name ? { displayName: r.display_name } : {}), 113 - ...(r.avatar_url ? { avatar: avatarUrl(r.did, r.avatar_url) } : {}), 113 + ...(r.avatar_url ? { avatar: avatarUrl(r.did, r.avatar_url, r.pds) } : {}), 114 114 ...(r.associated && r.associated !== '{}' ? { associated: JSON.parse(r.associated) } : {}), 115 115 labels: JSON.parse(r.labels || '[]'), 116 116 ...(r.created_at ? { createdAt: r.created_at } : {}),
+19 -6
src/index.ts
··· 7 7 import { refreshModeration } from "./cron"; 8 8 import { handleSearch } from "./handlers/search"; 9 9 import { handleIngest } from "./handlers/ingest"; 10 - import { handleDelete, handleCursor, handleRequestIndexing } from "./handlers/admin"; 10 + import { handleDelete, handleCursor, handleRequestIndexing, handleModOverrideSet, handleModOverrideDelete, handleModOverrideList } from "./handlers/admin"; 11 11 import { handleStats } from "./handlers/stats"; 12 12 import { indexPage } from "./pages/home"; 13 13 import { docsPage } from "./pages/docs"; ··· 48 48 } 49 49 50 50 if (pathname === "/request-indexing" && request.method === "POST") { 51 - const ip = clientIP(request); 52 - const { success } = await env.RATE_LIMITER.limit({ key: `index:${ip}` }); 53 - if (!success) { 54 - console.log(JSON.stringify({ event: "rate_limited", endpoint: "/request-indexing", ip })); 55 - return json({ error: "slow down — try again in a minute." }, 429); 51 + const isAdmin = request.headers.get("Authorization") === `Bearer ${env.ADMIN_SECRET}`; 52 + if (!isAdmin) { 53 + const ip = clientIP(request); 54 + const { success } = await env.RATE_LIMITER.limit({ key: `index:${ip}` }); 55 + if (!success) { 56 + console.log(JSON.stringify({ event: "rate_limited", endpoint: "/request-indexing", ip })); 57 + return json({ error: "slow down — try again in a minute." }, 429); 58 + } 56 59 } 57 60 return handleRequestIndexing(request, db, env); 58 61 } ··· 76 79 77 80 if (pathname === "/admin/delete" && request.method === "POST") { 78 81 return handleDelete(request, db, env, ctx); 82 + } 83 + 84 + if (pathname === "/admin/mod-override" && request.method === "POST") { 85 + return handleModOverrideSet(request, db, env); 86 + } 87 + if (pathname === "/admin/mod-override" && request.method === "DELETE") { 88 + return handleModOverrideDelete(request, db, env); 89 + } 90 + if (pathname === "/admin/mod-overrides" && request.method === "GET") { 91 + return handleModOverrideList(request, db, env); 79 92 } 80 93 81 94 return json({ error: "not found" }, 404);
+1
src/types.ts
··· 16 16 labels: string; 17 17 created_at: string; 18 18 associated: string; 19 + pds: string; 19 20 } 20 21 21 22 export interface IngestEvent {
+38 -32
src/utils.ts
··· 13 13 return Response.json(data, { status, headers: CORS_HEADERS }); 14 14 } 15 15 16 - export function avatarUrl(did: string, cidOrUrl: string): string { 16 + export function avatarUrl(did: string, cidOrUrl: string, pds?: string): string { 17 17 if (cidOrUrl.startsWith("https://")) return cidOrUrl; 18 + if (pds && !pds.includes('.bsky.network')) { 19 + return `${pds}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cidOrUrl)}`; 20 + } 18 21 return `https://cdn.bsky.app/img/avatar/plain/${did}/${cidOrUrl}`; 19 22 } 20 23 21 - export function extractAvatarCid(url: string): string { 22 - const match = url.match(/\/([^/]+?)(?:@[a-z]+)?$/); 24 + export function extractAvatarCid(urlOrCid: string): string { 25 + if (!urlOrCid) return ''; 26 + // bare CID (from PDS fallback) — no slashes 27 + if (!urlOrCid.includes('/')) return urlOrCid; 28 + // PDS getBlob URL — extract cid param 29 + const cidParam = urlOrCid.match(/[?&]cid=([^&]+)/); 30 + if (cidParam) return decodeURIComponent(cidParam[1]); 31 + // bsky CDN URL — extract last path segment 32 + const match = urlOrCid.match(/\/([^/]+?)(?:@[a-z]+)?$/); 23 33 return match?.[1] ?? ''; 24 34 } 25 35 26 - /** fetch profile directly from an actor's PDS — fallback for banned/suspended accounts */ 36 + /** fetch profile directly from an actor's PDS — tries bsky profile first, then tangled. 37 + * returns bare CID for avatar (not full URL) — caller reconstructs at query time via pds column. */ 27 38 export async function fetchProfileFromPds(did: string, pds: string): Promise<any | null> { 28 - try { 29 - const res = await fetch( 30 - `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=app.bsky.actor.profile&rkey=self` 31 - ); 32 - if (!res.ok) return null; 33 - const data: any = await res.json(); 34 - const val = data?.value; 35 - if (!val) return null; 39 + const collections = ['app.bsky.actor.profile', 'sh.tangled.actor.profile']; 40 + for (const collection of collections) { 41 + try { 42 + const res = await fetch( 43 + `${pds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=${collection}&rkey=self` 44 + ); 45 + if (!res.ok) continue; 46 + const data: any = await res.json(); 47 + const val = data?.value; 48 + if (!val) continue; 36 49 37 - let avatar = ''; 38 - const cid = val.avatar?.ref?.$link || val.avatar?.ref?.['$link']; 39 - if (cid) { 40 - avatar = `${pds}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`; 50 + return { 51 + did, 52 + handle: '', 53 + displayName: val.displayName || '', 54 + avatar: val.avatar?.ref?.$link || val.avatar?.ref?.['$link'] || '', 55 + labels: [], 56 + createdAt: val.createdAt || '', 57 + associated: {}, 58 + }; 59 + } catch { 60 + continue; 41 61 } 42 - 43 - return { 44 - did, 45 - handle: '', 46 - displayName: val.displayName || '', 47 - avatar, 48 - labels: [], 49 - createdAt: val.createdAt || '', 50 - associated: {}, 51 - }; 52 - } catch { 53 - return null; 54 62 } 63 + return null; 55 64 } 56 65 57 66 /** strip zero/false fields from associated object to match bsky's typeahead shape */ ··· 82 91 if (override === 'show') hidden = 0; 83 92 if (override === 'hide') hidden = 1; 84 93 const raw = profile.avatar || ''; 85 - // PDS blob URLs are already full URLs — store as-is; CDN URLs get CID extracted 86 - const avatarCid = raw.startsWith('https://') && !raw.includes('cdn.bsky.app') 87 - ? raw 88 - : extractAvatarCid(raw); 94 + const avatarCid = extractAvatarCid(raw); 89 95 return { 90 96 handle: profile.handle || '', 91 97 displayName: profile.displayName || '',