GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

upgrade ingester from zig 0.15 to 0.16

major API migrations:
- std.Thread.Mutex → std.Io.Mutex (lockUncancelable/unlock with io)
- std.time.timestamp/milliTimestamp → std.Io.Timestamp.now helpers
- std.posix.getenv → std.c.getenv + mem.span
- std.heap.GeneralPurposeAllocator → std.heap.smp_allocator
- std.net.Address → std.Io.net.IpAddress
- Thread.Pool → Thread.spawn + detach
- std.fs.* → std.Io.Dir.* (createDirPath, openFileAbsolute, readStreaming)
- std.http.Server.init takes *Io.Reader/*Io.Writer via stream.interface
- ArrayList init .{} → .empty
- HttpTransport.init now takes (io, allocator) per zat v0.3.0-alpha.4
- Dockerfile: zig 0.15.2 → 0.16.0-dev.3059

deps: zat v0.3.0-alpha.4, logfire-zig zig-0.16 branch, otel-zig zig-0.16 fork

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+206 -216
+3 -3
ingester/Dockerfile
··· 7 7 xz-utils \ 8 8 && rm -rf /var/lib/apt/lists/* 9 9 10 - # install zig 0.15.2 11 - RUN curl -L https://ziglang.org/download/0.15.2/zig-x86_64-linux-0.15.2.tar.xz | tar -xJ -C /usr/local \ 12 - && ln -s /usr/local/zig-x86_64-linux-0.15.2/zig /usr/local/bin/zig 10 + # install zig 0.16.0-dev 11 + RUN curl -L https://ziglang.org/builds/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d.tar.xz | tar -xJ -C /usr/local \ 12 + && ln -s /usr/local/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d/zig /usr/local/bin/zig 13 13 14 14 WORKDIR /app 15 15 COPY build.zig build.zig.zon ./
+2 -3
ingester/build.zig
··· 26 26 .root_source_file = b.path("src/main.zig"), 27 27 .target = target, 28 28 .optimize = optimize, 29 + .link_libc = true, 29 30 .imports = &.{ 30 31 .{ .name = "zat", .module = zat.module("zat") }, 31 32 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, ··· 33 34 }, 34 35 }), 35 36 }); 36 - 37 - exe.linkLibC(); 38 37 39 38 b.installArtifact(exe); 40 39 ··· 54 53 .root_source_file = b.path("src/test_logfire.zig"), 55 54 .target = target, 56 55 .optimize = optimize, 56 + .link_libc = true, 57 57 .imports = &.{ 58 58 .{ .name = "logfire", .module = logfire.module("logfire") }, 59 59 }, 60 60 }), 61 61 }); 62 - test_lf.linkLibC(); 63 62 64 63 const run_test_lf = b.addRunArtifact(test_lf); 65 64 run_test_lf.step.dependOn(b.getInstallStep());
+5 -5
ingester/build.zig.zon
··· 2 2 .name = .typeahead_ingester, 3 3 .version = "0.0.1", 4 4 .fingerprint = 0xc3959a06c9697d7c, 5 - .minimum_zig_version = "0.15.0", 5 + .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.18", 9 - .hash = "zat-0.2.18-5PuC7iVfBQAV6IUlytvAWrpwrVUbwbLlAcSTvkXwncb_", 8 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.3.0-alpha.4", 9 + .hash = "zat-0.3.0-alpha.4-5PuC7pNgBQCDMShv93KamG-leB6U1GOgqflbaX7Faf_1", 10 10 }, 11 11 .zqlite = .{ 12 12 .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 13 13 .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 14 14 }, 15 15 .logfire = .{ 16 - .url = "https://tangled.org/zzstoatzz.io/logfire-zig/archive/main", 17 - .hash = "logfire_zig-0.1.0-x2yDLgdwAADOXAZLNQJ8FUH5v1vfFwe5CApJtQ7c_pZd", 16 + .url = "https://tangled.sh/zzstoatzz.io/logfire-zig/archive/1ed66749bb5910d07586f6aa8641682f6b154928", 17 + .hash = "logfire_zig-0.1.0-x2yDLr5wAADNrQCuIHDTBMgyrohjJkhSSXvJRuWU5SPv", 18 18 }, 19 19 }, 20 20 .paths = .{
+30 -8
ingester/src/db/LocalDb.zig
··· 2 2 //! Provides fast FTS5 queries while Turso remains source of truth 3 3 4 4 const std = @import("std"); 5 + const io = std.Options.debug_io; 5 6 const zqlite = @import("zqlite"); 6 7 const Allocator = std.mem.Allocator; 7 8 ··· 13 14 read_conn: ?zqlite.Conn = null, // separate read connection — never blocked by writes in WAL mode 14 15 allocator: Allocator, 15 16 is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), 16 - mutex: std.Thread.Mutex = .{}, // protects write conn only 17 + mutex: std.Io.Mutex = std.Io.Mutex.init, // protects write conn only 17 18 path: []const u8 = "", 18 19 20 + /// column list for full actor row queries (sync + search) 21 + pub const actor_cols = "did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds"; 22 + 23 + /// named column indexes matching actor_cols order 24 + pub const Col = struct { 25 + pub const did: usize = 0; 26 + pub const handle: usize = 1; 27 + pub const display_name: usize = 2; 28 + pub const avatar_url: usize = 3; 29 + pub const hidden: usize = 4; 30 + pub const labels: usize = 5; 31 + pub const created_at: usize = 6; 32 + pub const associated: usize = 7; 33 + pub const pds: usize = 8; 34 + }; 35 + 36 + fn cGetenv(name: [*:0]const u8) ?[]const u8 { 37 + if (std.c.getenv(name)) |p| return std.mem.span(p); 38 + return null; 39 + } 40 + 19 41 pub fn init(allocator: Allocator) LocalDb { 20 42 return .{ .allocator = allocator }; 21 43 } ··· 52 74 } 53 75 54 76 pub fn open(self: *LocalDb) !void { 55 - const path_env = std.posix.getenv("LOCAL_DB_PATH") orelse "/data/local.db"; 77 + const path_env = cGetenv("LOCAL_DB_PATH") orelse "/data/local.db"; 56 78 self.path = path_env; 57 79 58 80 // ensure SQLite temp directory exists on the persistent volume 59 - const tmp_dir = std.posix.getenv("SQLITE_TMPDIR") orelse "/data/tmp"; 60 - std.fs.cwd().makePath(tmp_dir) catch |err| { 81 + const tmp_dir = cGetenv("SQLITE_TMPDIR") orelse "/data/tmp"; 82 + 
std.Io.Dir.createDirPath(.cwd(), io, tmp_dir) catch |err| { 61 83 log.warn("failed to create temp dir {s}: {}", .{ tmp_dir, err }); 62 84 }; 63 85 ··· 230 252 231 253 /// Execute a statement (INSERT, UPDATE, DELETE) — mutex-protected 232 254 pub fn exec(self: *LocalDb, comptime sql: []const u8, args: anytype) !void { 233 - self.mutex.lock(); 234 - defer self.mutex.unlock(); 255 + self.mutex.lockUncancelable(io); 256 + defer self.mutex.unlock(io); 235 257 236 258 const c = self.conn orelse return error.NotOpen; 237 259 c.exec(sql, args) catch |e| { ··· 246 268 } 247 269 248 270 pub fn lock(self: *LocalDb) void { 249 - self.mutex.lock(); 271 + self.mutex.lockUncancelable(io); 250 272 } 251 273 252 274 pub fn unlock(self: *LocalDb) void { 253 - self.mutex.unlock(); 275 + self.mutex.unlock(io); 254 276 } 255 277 256 278 /// Get cached actor count from sync_meta (written during sync, avoids full table scan)
+13 -7
ingester/src/db/TursoClient.zig
··· 2 2 //! Uses hrana v2 pipeline protocol 3 3 4 4 const std = @import("std"); 5 + const io = std.Options.debug_io; 5 6 const http = std.http; 6 7 const json = std.json; 7 8 const mem = std.mem; ··· 9 10 10 11 const log = std.log.scoped(.turso); 11 12 13 + fn cGetenv(name: [*:0]const u8) ?[]const u8 { 14 + if (std.c.getenv(name)) |p| return mem.span(p); 15 + return null; 16 + } 17 + 12 18 const TursoClient = @This(); 13 19 14 20 const Value = struct { type: []const u8 = "text", value: []const u8 }; ··· 20 26 url: []const u8, // host only (no protocol prefix) 21 27 token: []const u8, 22 28 http_client: http.Client, 23 - mutex: std.Thread.Mutex = .{}, 29 + mutex: std.Io.Mutex = std.Io.Mutex.init, 24 30 25 31 pub fn init(allocator: Allocator) !TursoClient { 26 - const url = std.posix.getenv("TURSO_URL") orelse { 32 + const url = cGetenv("TURSO_URL") orelse { 27 33 log.err("TURSO_URL not set", .{}); 28 34 return error.MissingEnv; 29 35 }; 30 - const token = std.posix.getenv("TURSO_AUTH_TOKEN") orelse { 36 + const token = cGetenv("TURSO_AUTH_TOKEN") orelse { 31 37 log.err("TURSO_AUTH_TOKEN not set", .{}); 32 38 return error.MissingEnv; 33 39 }; ··· 44 50 .allocator = allocator, 45 51 .url = host, 46 52 .token = token, 47 - .http_client = .{ .allocator = allocator }, 53 + .http_client = .{ .allocator = allocator, .io = io }, 48 54 }; 49 55 } 50 56 ··· 84 90 } 85 91 86 92 fn executeRaw(self: *TursoClient, sql: []const u8, args: []const []const u8) ![]const u8 { 87 - self.mutex.lock(); 88 - defer self.mutex.unlock(); 93 + self.mutex.lockUncancelable(io); 94 + defer self.mutex.unlock(io); 89 95 90 96 var url_buf: [512]u8 = undefined; 91 97 const url = std.fmt.bufPrint(&url_buf, "https://{s}/v2/pipeline", .{self.url}) catch ··· 165 171 return .{ .allocator = allocator, .parsed = parsed, .rows = &.{} }; 166 172 }; 167 173 168 - var rows: std.ArrayList(Row) = .{}; 174 + var rows: std.ArrayList(Row) = .empty; 169 175 errdefer rows.deinit(allocator); 170 176 171 177 for 
(json_rows.items) |item| {
+65 -109
ingester/src/db/sync.zig
··· 2 2 //! Full sync on startup, incremental sync every 5 minutes 3 3 4 4 const std = @import("std"); 5 + const io = std.Options.debug_io; 5 6 const zqlite = @import("zqlite"); 6 7 const logfire = @import("logfire"); 7 8 const Allocator = std.mem.Allocator; 8 9 const TursoClient = @import("TursoClient.zig"); 9 10 const LocalDb = @import("LocalDb.zig"); 11 + const Col = LocalDb.Col; 10 12 11 13 const log = std.log.scoped(.sync); 12 14 15 + fn timestamp() i64 { 16 + return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 17 + } 18 + 19 + fn milliTimestamp() i64 { 20 + return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_ms)); 21 + } 22 + 23 + /// index of the first column appended after actor_cols (rowid or updated_at) 24 + const trailing_col: usize = Col.pds + 1; 25 + 13 26 const BATCH_SIZE = 2000; 14 27 const SYNC_INTERVAL_S = 300; // 5 minutes 15 28 const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning ··· 61 74 const span = logfire.span("sync.bootstrap", .{}); 62 75 defer span.end(); 63 76 64 - const total_t0 = std.time.milliTimestamp(); 77 + const total_t0 = milliTimestamp(); 65 78 local.setReady(false); 66 79 67 80 // close read connection during bootstrap — not serving traffic, ··· 110 123 var error_count: usize = 0; 111 124 var last_rowid: i64 = 0; 112 125 var had_turso_error = false; 113 - const load_t0 = std.time.milliTimestamp(); 126 + const load_t0 = milliTimestamp(); 114 127 115 128 while (true) { 116 129 var rowid_buf: [20]u8 = undefined; 117 130 const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break; 118 131 119 - const t0 = std.time.milliTimestamp(); 132 + const t0 = milliTimestamp(); 120 133 121 134 // fetch from turso (no lock held) 122 135 var result = turso.query( 123 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, rowid 124 - \\FROM actors WHERE handle != '' AND rowid > ? 
125 - \\ORDER BY rowid LIMIT 2000 126 - , &.{rowid_str}) catch |err| { 136 + "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT 2000", 137 + &.{rowid_str}, 138 + ) catch |err| { 127 139 log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err }); 128 140 had_turso_error = true; 129 141 break; 130 142 }; 131 143 defer result.deinit(); 132 144 133 - const t1 = std.time.milliTimestamp(); 145 + const t1 = milliTimestamp(); 134 146 135 147 if (result.rows.len == 0) break; 136 148 ··· 140 152 defer local.unlock(); 141 153 conn.exec("BEGIN", .{}) catch {}; 142 154 for (result.rows) |row| { 143 - insertActorStage(conn, row) catch |err| { 155 + insertActorRow(conn, "INSERT INTO actors_stage", row) catch |err| { 144 156 log.err("insert actor failed: {}", .{err}); 145 157 error_count += 1; 146 158 continue; ··· 150 162 conn.exec("COMMIT", .{}) catch {}; 151 163 } 152 164 153 - const t2 = std.time.milliTimestamp(); 165 + const t2 = milliTimestamp(); 154 166 155 - // advance cursor to last row's rowid (column 8) 156 - last_rowid = result.rows[result.rows.len - 1].int(9); 167 + last_rowid = result.rows[result.rows.len - 1].int(trailing_col); 157 168 158 169 // high-frequency batch progress — keep on stderr only 159 170 log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ ··· 176 187 return; 177 188 } 178 189 179 - const load_ms = std.time.milliTimestamp() - load_t0; 190 + const load_ms = milliTimestamp() - load_t0; 180 191 181 192 // checkpoint WAL before heavy DDL — clean slate for index builds 182 193 { ··· 186 197 } 187 198 188 199 // build indexes on staging table 189 - const idx_did_t0 = std.time.milliTimestamp(); 200 + const idx_did_t0 = milliTimestamp(); 190 201 { 191 202 local.lock(); 192 203 defer local.unlock(); ··· 196 207 return err; 197 208 }; 198 209 } 199 - const idx_did_ms = std.time.milliTimestamp() - idx_did_t0; 210 + const idx_did_ms = milliTimestamp() - idx_did_t0; 
200 211 201 - const idx_handle_t0 = std.time.milliTimestamp(); 212 + const idx_handle_t0 = milliTimestamp(); 202 213 { 203 214 local.lock(); 204 215 defer local.unlock(); ··· 208 219 return err; 209 220 }; 210 221 } 211 - const idx_handle_ms = std.time.milliTimestamp() - idx_handle_t0; 222 + const idx_handle_ms = milliTimestamp() - idx_handle_t0; 212 223 213 224 // swap actors table first, then build FTS with final name. 214 225 { ··· 229 240 } 230 241 231 242 // build FTS with its final name — no rename needed 232 - const fts_create_t0 = std.time.milliTimestamp(); 243 + const fts_create_t0 = milliTimestamp(); 233 244 { 234 245 local.lock(); 235 246 defer local.unlock(); ··· 243 254 return err; 244 255 }; 245 256 } 246 - const fts_create_ms = std.time.milliTimestamp() - fts_create_t0; 257 + const fts_create_ms = milliTimestamp() - fts_create_t0; 247 258 248 - const fts_pop_t0 = std.time.milliTimestamp(); 259 + const fts_pop_t0 = milliTimestamp(); 249 260 { 250 261 local.lock(); 251 262 defer local.unlock(); ··· 257 268 return err; 258 269 }; 259 270 } 260 - const fts_pop_ms = std.time.milliTimestamp() - fts_pop_t0; 271 + const fts_pop_ms = milliTimestamp() - fts_pop_t0; 261 272 262 273 // post-swap: optimize, record completion + actor count, checkpoint 263 274 { ··· 266 277 conn.exec("PRAGMA optimize", .{}) catch {}; 267 278 268 279 var ts_buf: [20]u8 = undefined; 269 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 280 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0"; 270 281 conn.exec( 271 282 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 272 283 .{ts_str}, ··· 286 297 conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 287 298 } 288 299 289 - const total_ms = std.time.milliTimestamp() - total_t0; 300 + const total_ms = milliTimestamp() - total_t0; 290 301 291 302 // reopen read connection before serving traffic 292 303 local.reopenReadConn() catch {}; ··· 321 332 
const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break; 322 333 323 334 const q_span = logfire.span("sync.query.resync_batch", .{}); 324 - const t0 = std.time.milliTimestamp(); 335 + const t0 = milliTimestamp(); 325 336 326 337 var result = turso.query( 327 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, rowid 328 - \\FROM actors WHERE handle != '' AND rowid > ? 329 - \\ORDER BY rowid LIMIT 2000 330 - , &.{rowid_str}) catch |err| { 338 + "SELECT " ++ LocalDb.actor_cols ++ ", rowid FROM actors WHERE handle != '' AND rowid > ? ORDER BY rowid LIMIT 2000", 339 + &.{rowid_str}, 340 + ) catch |err| { 331 341 log.err("resync: turso query failed at rowid {d}: {}", .{ last_rowid, err }); 332 342 q_span.setStatus(.@"error", "query failed"); 333 343 q_span.end(); ··· 336 346 }; 337 347 defer result.deinit(); 338 348 339 - const t1 = std.time.milliTimestamp(); 349 + const t1 = milliTimestamp(); 340 350 q_span.setAttribute("rows", @as(i64, @intCast(result.rows.len))); 341 351 q_span.setAttribute("fetch_ms", t1 - t0); 342 352 q_span.setStatus(.ok, null); ··· 352 362 defer local.unlock(); 353 363 conn.exec("BEGIN", .{}) catch {}; 354 364 for (result.rows) |row| { 355 - insertActorOnly(conn, row) catch |err| { 365 + insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch |err| { 356 366 log.err("resync: insert actor failed: {}", .{err}); 357 367 error_count += 1; 358 368 continue; ··· 365 375 a_span.setStatus(.ok, null); 366 376 } 367 377 368 - const t2 = std.time.milliTimestamp(); 369 - last_rowid = result.rows[result.rows.len - 1].int(9); 378 + const t2 = milliTimestamp(); 379 + last_rowid = result.rows[result.rows.len - 1].int(trailing_col); 370 380 batch_num += 1; 371 381 372 382 log.info("resync: batch {d}: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ ··· 416 426 local.lock(); 417 427 defer local.unlock(); 418 428 var ts_buf: [20]u8 = undefined; 419 - const ts_str = 
std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 429 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0"; 420 430 conn.exec( 421 431 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 422 432 .{ts_str}, ··· 488 498 } 489 499 490 500 // if last sync is older than tombstone retention, local data is too stale 491 - const now = std.time.timestamp(); 501 + const now = timestamp(); 492 502 if (now - last_sync_ts > TOMBSTONE_RETENTION_S) { 493 503 log.info("incremental: stale ({d}s old), falling back to full sync with wipe", .{now - last_sync_ts}); 494 504 span.setAttribute("fallback", "stale_wipe"); ··· 522 532 &.{cursor_ts}; 523 533 524 534 const sql = if (cursor_did_len > 0) 525 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, updated_at 526 - \\FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3)) 527 - \\ORDER BY updated_at, did LIMIT 2000 535 + "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3)) ORDER BY updated_at, did LIMIT 2000" 528 536 else 529 - \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds, updated_at 530 - \\FROM actors WHERE handle != '' AND updated_at > ?1 531 - \\ORDER BY updated_at, did LIMIT 2000 537 + "SELECT " ++ LocalDb.actor_cols ++ ", updated_at FROM actors WHERE handle != '' AND updated_at > ?1 ORDER BY updated_at, did LIMIT 2000" 532 538 ; 533 539 534 540 log.info("incremental: querying updated actors (cursor_ts={s})", .{cursor_ts}); ··· 558 564 defer local.unlock(); 559 565 conn.exec("BEGIN", .{}) catch {}; 560 566 for (result.rows) |row| { 561 - upsertActorLocal(conn, row) catch { 567 + insertActorRow(conn, "INSERT OR REPLACE INTO actors", row) catch { 562 568 error_count += 1; 563 569 continue; 564 570 }; ··· 570 576 a_span.setStatus(.ok, null); 571 577 } 572 578 573 - // 
advance cursor to last row's (updated_at, did) 574 579 const last_row = result.rows[result.rows.len - 1]; 575 - const last_ua_int = last_row.int(9); // updated_at is integer 576 - const last_did = last_row.text(0); 580 + const last_ua_int = last_row.int(trailing_col); 581 + const last_did = last_row.text(Col.did); 577 582 578 583 // serialize updated_at integer to string for next query 579 584 const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break; ··· 725 730 local.lock(); 726 731 defer local.unlock(); 727 732 var ts_buf: [20]u8 = undefined; 728 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 733 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{timestamp()}) catch "0"; 729 734 conn.exec( 730 735 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 731 736 .{ts_str}, ··· 764 769 765 770 // periodic incremental sync 766 771 while (true) { 767 - std.Thread.sleep(SYNC_INTERVAL_S * std.time.ns_per_s); 772 + io.sleep(.{ .nanoseconds = SYNC_INTERVAL_S * std.time.ns_per_s }, .real) catch {}; 768 773 769 774 incrementalSync(&turso, local) catch |err| { 770 775 log.err("periodic sync failed: {}", .{err}); ··· 772 777 } 773 778 } 774 779 775 - /// Insert actor row into staging table (no indexes, no FTS — used during bootstrap) 776 - fn insertActorStage(conn: zqlite.Conn, row: anytype) !void { 777 - conn.exec( 778 - \\INSERT INTO actors_stage 779 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 780 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
781 - , .{ 782 - row.text(0), // did 783 - row.text(1), // handle 784 - row.text(2), // display_name 785 - row.text(3), // avatar_url 786 - row.int(4), // hidden 787 - row.text(5), // labels 788 - row.text(6), // created_at 789 - row.text(7), // associated 790 - row.text(8), // pds 791 - }) catch |err| { 792 - return err; 793 - }; 794 - } 795 - 796 - /// Insert actor row without FTS update (used during re-sync on live table) 797 - fn insertActorOnly(conn: zqlite.Conn, row: anytype) !void { 798 - conn.exec( 799 - \\INSERT OR REPLACE INTO actors 800 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 801 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 802 - , .{ 803 - row.text(0), // did 804 - row.text(1), // handle 805 - row.text(2), // display_name 806 - row.text(3), // avatar_url 807 - row.int(4), // hidden 808 - row.text(5), // labels 809 - row.text(6), // created_at 810 - row.text(7), // associated 811 - row.text(8), // pds 812 - }) catch |err| { 813 - return err; 814 - }; 815 - } 816 - 817 - /// Upsert a turso row into local SQLite (actors table only, no FTS). 818 - /// FTS is rebuilt in bulk at sync boundaries — per-row FTS deletes are O(N) scans 819 - /// because `did` is UNINDEXED in the FTS5 table. 820 - fn upsertActorLocal(conn: zqlite.Conn, row: anytype) !void { 821 - conn.exec( 822 - \\INSERT OR REPLACE INTO actors 823 - \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds) 824 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 825 - , .{ 826 - row.text(0), // did 827 - row.text(1), // handle 828 - row.text(2), // display_name 829 - row.text(3), // avatar_url 830 - row.int(4), // hidden 831 - row.text(5), // labels 832 - row.text(6), // created_at 833 - row.text(7), // associated 834 - row.text(8), // pds 835 - }) catch |err| { 836 - return err; 837 - }; 780 + /// Insert actor row into a table. `prefix` is the INSERT clause, e.g. 781 + /// "INSERT INTO actors_stage" or "INSERT OR REPLACE INTO actors". 
782 + fn insertActorRow(conn: zqlite.Conn, comptime prefix: []const u8, row: anytype) !void { 783 + conn.exec(prefix ++ " (" ++ LocalDb.actor_cols ++ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", .{ 784 + row.text(Col.did), 785 + row.text(Col.handle), 786 + row.text(Col.display_name), 787 + row.text(Col.avatar_url), 788 + row.int(Col.hidden), 789 + row.text(Col.labels), 790 + row.text(Col.created_at), 791 + row.text(Col.associated), 792 + row.text(Col.pds), 793 + }) catch |err| return err; 838 794 } 839 795 840 796 /// Update cached actor count in sync_meta (avoids COUNT(*) on health checks)
+24 -14
ingester/src/ingest.zig
··· 2 2 //! Dual-writes profile/identity events to local SQLite for immediate search 3 3 4 4 const std = @import("std"); 5 + const io = std.Options.debug_io; 5 6 const mem = std.mem; 6 7 const json = std.json; 7 8 const Allocator = mem.Allocator; ··· 11 12 const BloomFilter = @import("bloom.zig").BloomFilter; 12 13 13 14 const log = std.log.scoped(.ingester); 15 + 16 + fn cGetenv(name: [*:0]const u8) ?[]const u8 { 17 + if (std.c.getenv(name)) |p| return mem.span(p); 18 + return null; 19 + } 20 + 21 + fn timestamp() i64 { 22 + return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 23 + } 14 24 15 25 pub const MAX_BATCH: usize = 100; 16 26 const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed ··· 23 33 24 34 pub fn getConfig() Config { 25 35 return .{ 26 - .worker_url = std.posix.getenv("TYPEAHEAD_URL") orelse 36 + .worker_url = cGetenv("TYPEAHEAD_URL") orelse 27 37 @panic("TYPEAHEAD_URL not set"), 28 - .secret = std.posix.getenv("TYPEAHEAD_SECRET") orelse 38 + .secret = cGetenv("TYPEAHEAD_SECRET") orelse 29 39 @panic("TYPEAHEAD_SECRET not set"), 30 40 }; 31 41 } ··· 62 72 .config = config, 63 73 .transport = transport, 64 74 .local_db = local_db, 65 - .buffer = .{}, 66 - .delete_buffer = .{}, 75 + .buffer = .empty, 76 + .delete_buffer = .empty, 67 77 .arena = std.heap.ArenaAllocator.init(allocator), 68 78 .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES), 69 - .last_flush = std.time.timestamp(), 79 + .last_flush = timestamp(), 70 80 }; 71 81 } 72 82 ··· 95 105 96 106 self.pending_cursor = event.timeUs(); 97 107 98 - const now = std.time.timestamp(); 108 + const now = timestamp(); 99 109 // drop incoming events if buffers are backed up (persistent failure) 100 110 const MAX_BACKLOG = MAX_BATCH * 10; 101 111 if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) { ··· 193 203 // dual-write: delete from local (skip if sync holds the lock) 194 204 if (self.local_db) |db| { 195 205 if 
(db.mutex.tryLock()) { 196 - defer db.mutex.unlock(); 206 + defer db.mutex.unlock(io); 197 207 if (db.getConn()) |c| { 198 208 c.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {}; 199 209 } ··· 214 224 const conn = db.getConn() orelse return; 215 225 216 226 if (!db.mutex.tryLock()) return; // sync thread has the lock, skip 217 - defer db.mutex.unlock(); 227 + defer db.mutex.unlock(io); 218 228 219 229 // only write if we have searchable data (handle or display_name) 220 230 const has_handle = event.handle != null and event.handle.?.len > 0; ··· 238 248 } 239 249 240 250 fn flush(self: *IngestHandler) void { 241 - self.last_flush = std.time.timestamp(); 251 + self.last_flush = timestamp(); 242 252 var any_failure = false; 243 253 244 254 if (self.buffer.items.len > 0) { ··· 303 313 } 304 314 305 315 if (any_failure) { 306 - self.retry_after = std.time.timestamp() + 5; 316 + self.retry_after = timestamp() + 5; 307 317 } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) { 308 318 self.flushed_cursor = self.pending_cursor; 309 319 _ = self.arena.reset(.retain_capacity); ··· 399 409 for (backoff, 0..) 
|delay, attempt| { 400 410 if (fetchCursorOnce(transport, config)) |cursor| return cursor; 401 411 log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay }); 402 - std.Thread.sleep(delay * std.time.ns_per_s); 412 + io.sleep(.{ .nanoseconds = delay * std.time.ns_per_s }, .real) catch {}; 403 413 } 404 414 return null; 405 415 } ··· 407 417 // -- utilities -- 408 418 409 419 pub fn getRssKB() u64 { 410 - const f = std.fs.openFileAbsolute("/proc/self/statm", .{}) catch return 0; 411 - defer f.close(); 420 + const f = std.Io.Dir.openFileAbsolute(io, "/proc/self/statm", .{}) catch return 0; 421 + defer f.close(io); 412 422 var buf: [128]u8 = undefined; 413 - const n = f.read(&buf) catch return 0; 423 + const n = f.readStreaming(io, &.{&buf}) catch return 0; 414 424 var it = std.mem.splitScalar(u8, buf[0..n], ' '); 415 425 _ = it.next(); // size 416 426 const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0;
+35 -38
ingester/src/main.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 - const net = std.net; 4 - const posix = std.posix; 3 + const io = std.Options.debug_io; 5 4 const Allocator = mem.Allocator; 6 - const Thread = std.Thread; 7 5 const zat = @import("zat"); 8 6 const logfire = @import("logfire"); 9 7 const HttpTransport = zat.HttpTransport; ··· 14 12 15 13 const log = std.log.scoped(.ingester); 16 14 17 - const MAX_HTTP_WORKERS = 8; 18 15 const SOCKET_TIMEOUT_SECS = 5; 16 + 17 + fn cGetenv(name: [*:0]const u8) ?[]const u8 { 18 + if (std.c.getenv(name)) |p| return mem.span(p); 19 + return null; 20 + } 19 21 20 22 const JetstreamArgs = struct { 21 23 allocator: Allocator, ··· 51 53 client.subscribe(&handler); 52 54 } 53 55 54 - fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void { 55 - const timeout = mem.toBytes(posix.timeval{ 56 + fn setSocketTimeout(fd: std.posix.fd_t, secs: u32) !void { 57 + const timeout = mem.toBytes(std.posix.timeval{ 56 58 .sec = @intCast(secs), 57 59 .usec = 0, 58 60 }); 59 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout); 60 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout); 61 + try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout); 62 + try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout); 61 63 } 62 64 63 65 pub fn main() !void { 64 - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 65 - defer _ = gpa.deinit(); 66 - const allocator = gpa.allocator(); 66 + const allocator = std.heap.smp_allocator; 67 67 68 68 // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 69 69 _ = logfire.configure(.{ 70 70 .service_name = "typeahead-ingester", 71 71 .service_version = "0.1.0", 72 - .environment = posix.getenv("FLY_APP_NAME") orelse "development", 72 + .environment = cGetenv("FLY_APP_NAME") orelse "development", 73 73 }) catch |err| { 74 74 log.err("logfire configure failed: {}", .{err}); 75 75 }; ··· 90 90 91 91 // start HTTP server 
FIRST so Fly proxy doesn't timeout 92 92 const port: u16 = blk: { 93 - const port_str = posix.getenv("PORT") orelse "8080"; 93 + const port_str = cGetenv("PORT") orelse "8080"; 94 94 break :blk std.fmt.parseInt(u16, port_str, 10) catch 8080; 95 95 }; 96 96 97 - const address = try net.Address.parseIp("0.0.0.0", port); 98 - var listener = try address.listen(.{ .reuse_address = true }); 99 - defer listener.deinit(); 97 + var addr = std.Io.net.IpAddress{ .ip4 = .{ .bytes = .{ 0, 0, 0, 0 }, .port = port } }; 98 + var srv = std.Io.net.IpAddress.listen(&addr, io, .{ .reuse_address = true }) catch |err| { 99 + log.err("listen failed: {}", .{err}); 100 + return err; 101 + }; 102 + defer srv.deinit(io); 100 103 101 104 log.info("listening on port {d}", .{port}); 102 105 103 - // init thread pool for HTTP connections 104 - var pool: Thread.Pool = undefined; 105 - try pool.init(.{ 106 - .allocator = allocator, 107 - .n_jobs = MAX_HTTP_WORKERS, 108 - }); 109 - defer pool.deinit(); 110 - 111 106 // start sync thread (background — turso → local SQLite) 112 107 if (local_db_ptr) |db| { 113 - const sync_thread = try Thread.spawn(.{}, sync.syncLoop, .{ allocator, db }); 108 + const sync_thread = try std.Thread.spawn(.{}, sync.syncLoop, .{ allocator, db }); 114 109 sync_thread.detach(); 115 110 } 116 111 117 112 // start jetstream thread (background — ingestion) 118 - var transport = HttpTransport.init(allocator); 113 + var transport = HttpTransport.init(io, allocator); 119 114 transport.keep_alive = false; 120 115 defer transport.deinit(); 121 116 122 - const js_thread = try Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{ 117 + const js_thread = try std.Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{ 123 118 .allocator = allocator, 124 119 .config = config, 125 120 .transport = &transport, ··· 129 124 130 125 // main thread: HTTP accept loop 131 126 while (true) { 132 - const conn = listener.accept() catch |err| { 127 + const stream = srv.accept(io) catch |err| { 133 128 
log.err("accept error: {}", .{err}); 134 129 continue; 135 130 }; 136 131 137 - setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { 132 + setSocketTimeout(stream.socket.handle, SOCKET_TIMEOUT_SECS) catch |err| { 138 133 log.warn("failed to set socket timeout: {}", .{err}); 139 134 }; 140 135 141 136 if (local_db_ptr) |db| { 142 - pool.spawn(server.handleConnection, .{ conn, db }) catch |err| { 143 - log.err("pool spawn error: {}", .{err}); 144 - conn.stream.close(); 137 + const handler_thread = std.Thread.spawn(.{}, server.handleConnection, .{ stream, db }) catch |err| { 138 + log.err("thread spawn error: {}", .{err}); 139 + stream.close(io); 140 + continue; 145 141 }; 142 + handler_thread.detach(); 146 143 } else { 147 144 // no local DB — just serve health 148 145 var read_buffer: [1024]u8 = undefined; 149 146 var write_buffer: [1024]u8 = undefined; 150 - var reader = conn.stream.reader(&read_buffer); 151 - var writer = conn.stream.writer(&write_buffer); 152 - var srv = std.http.Server.init(reader.interface(), &writer.interface); 153 - var request = srv.receiveHead() catch { 154 - conn.stream.close(); 147 + var reader = stream.reader(io, &read_buffer); 148 + var writer = stream.writer(io, &write_buffer); 149 + var http_srv = std.http.Server.init(&reader.interface, &writer.interface); 150 + var request = http_srv.receiveHead() catch { 151 + stream.close(io); 155 152 continue; 156 153 }; 157 154 request.respond("{\"error\":\"search unavailable\"}", .{ ··· 160 157 .{ .name = "content-type", .value = "application/json" }, 161 158 }, 162 159 }) catch {}; 163 - conn.stream.close(); 160 + stream.close(io); 164 161 } 165 162 } 166 163 }
+23 -22
ingester/src/search.zig
··· 6 6 const mem = std.mem; 7 7 const Allocator = mem.Allocator; 8 8 const LocalDb = @import("db/LocalDb.zig"); 9 + const Col = LocalDb.Col; 9 10 10 11 const log = std.log.scoped(.search); 11 12 ··· 42 43 // 1. exact handle match 43 44 { 44 45 var rows = local.query( 45 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 46 - \\FROM actors WHERE handle = ? COLLATE NOCASE AND hidden = 0 LIMIT 1 47 - , .{term}) catch |err| blk: { 46 + "SELECT " ++ LocalDb.actor_cols ++ " FROM actors WHERE handle = ? COLLATE NOCASE AND hidden = 0 LIMIT 1", 47 + .{term}, 48 + ) catch |err| blk: { 48 49 log.err("exact query failed: {}", .{err}); 49 50 break :blk null; 50 51 }; ··· 53 54 if (r.next()) |row| { 54 55 if (emitted < limit) { 55 56 try writeActorFromRow(&jw, row); 56 - if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0))) {} 57 + if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(Col.did))) {} 57 58 emitted += 1; 58 59 } 59 60 } ··· 135 136 for (prefix_dids[0..prefix_count]) |did| { 136 137 if (emitted >= limit) break; 137 138 var rows = local.query( 138 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 139 - \\FROM actors WHERE did = ? 140 - , .{did}) catch continue; 139 + "SELECT " ++ LocalDb.actor_cols ++ " FROM actors WHERE did = ?", 140 + .{did}, 141 + ) catch continue; 141 142 defer rows.deinit(); 142 143 if (rows.next()) |row| { 143 144 try writeActorFromRow(&jw, row); 144 - if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0))) {} 145 + if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(Col.did))) {} 145 146 emitted += 1; 146 147 } 147 148 } ··· 186 187 for (candidate_dids[0..candidate_count]) |did| { 187 188 if (emitted >= limit) break; 188 189 var rows = local.query( 189 - \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated, pds 190 - \\FROM actors WHERE did = ? 
AND handle != '' AND hidden = 0 191 - , .{did}) catch continue; 190 + "SELECT " ++ LocalDb.actor_cols ++ " FROM actors WHERE did = ? AND handle != '' AND hidden = 0", 191 + .{did}, 192 + ) catch continue; 192 193 defer rows.deinit(); 193 194 if (rows.next()) |row| { 194 - if (!isDuplicate(seen_dids[0..seen_count], row.text(0))) { 195 + if (!isDuplicate(seen_dids[0..seen_count], row.text(Col.did))) { 195 196 try writeActorFromRow(&jw, row); 196 - if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0))) {} 197 + if (trackDid(&seen_fba, &seen_dids, &seen_count, row.text(Col.did))) {} 197 198 emitted += 1; 198 199 } 199 200 } ··· 206 207 207 208 /// Write a single actor object from a live SQLite row. 208 209 /// Must be called while the row's statement is still alive. 209 - /// Row columns: did, handle, display_name, avatar_url, labels, created_at, associated, pds 210 + /// Row uses LocalDb.actor_cols column order. 210 211 fn writeActorFromRow(jw: anytype, row: LocalDb.Row) !void { 211 - const did = row.text(0); 212 - const handle = row.text(1); 213 - const display_name = row.text(2); 214 - const avatar_url = row.text(3); 215 - const labels = row.text(4); 216 - const created_at = row.text(5); 217 - const associated = row.text(6); 218 - const pds = row.text(7); 212 + const did = row.text(Col.did); 213 + const handle = row.text(Col.handle); 214 + const display_name = row.text(Col.display_name); 215 + const avatar_url = row.text(Col.avatar_url); 216 + const labels = row.text(Col.labels); 217 + const created_at = row.text(Col.created_at); 218 + const associated = row.text(Col.associated); 219 + const pds = row.text(Col.pds); 219 220 220 221 try jw.beginObject(); 221 222
+6 -7
ingester/src/server.zig
··· 3 3 4 4 const std = @import("std"); 5 5 const http = std.http; 6 - const net = std.net; 7 6 const mem = std.mem; 8 7 const json = std.json; 8 + const io = std.Options.debug_io; 9 9 const logfire = @import("logfire"); 10 10 const LocalDb = @import("db/LocalDb.zig"); 11 11 const search = @import("search.zig"); ··· 15 15 16 16 const HTTP_BUF_SIZE = 65536; 17 17 18 - pub fn handleConnection(conn: net.Server.Connection, local_db: *LocalDb) void { 19 - defer conn.stream.close(); 18 + pub fn handleConnection(stream: std.Io.net.Stream, local_db: *LocalDb) void { 19 + defer stream.close(io); 20 20 21 21 var read_buffer: [HTTP_BUF_SIZE]u8 = undefined; 22 22 var write_buffer: [HTTP_BUF_SIZE]u8 = undefined; 23 23 24 - var reader = conn.stream.reader(&read_buffer); 25 - var writer = conn.stream.writer(&write_buffer); 24 + var reader = stream.reader(io, &read_buffer); 25 + var writer = stream.writer(io, &write_buffer); 26 26 27 - var server = http.Server.init(reader.interface(), &writer.interface); 27 + var server = http.Server.init(&reader.interface, &writer.interface); 28 28 29 29 while (true) { 30 30 var request = server.receiveHead() catch |err| { ··· 266 266 try jw.write(null); 267 267 } 268 268 } 269 -