GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor ingester: split main.zig into focused modules

extract bloom filter, ingest handler, and search into separate files.
main.zig is now just the entrypoint + thread orchestration (617→155 lines).
also includes two-phase search fix for FTS5 and LIKE prefix queries,
eliminating pathological full-scan sorts on broad prefixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+2378 -448
+9
ingester/build.zig
··· 9 9 .optimize = optimize, 10 10 }); 11 11 12 + const zqlite = b.dependency("zqlite", .{ 13 + .target = target, 14 + .optimize = optimize, 15 + .sqlite3 = &[_][]const u8{ "-std=c99", "-DSQLITE_ENABLE_FTS5" }, 16 + }); 17 + 12 18 const exe = b.addExecutable(.{ 13 19 .name = "typeahead-ingester", 14 20 .root_module = b.createModule(.{ ··· 17 23 .optimize = optimize, 18 24 .imports = &.{ 19 25 .{ .name = "zat", .module = zat.module("zat") }, 26 + .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 20 27 }, 21 28 }), 22 29 }); 30 + 31 + exe.linkLibC(); 23 32 24 33 b.installArtifact(exe); 25 34
+4
ingester/build.zig.zon
··· 8 8 .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.18", 9 9 .hash = "zat-0.2.18-5PuC7iVfBQAV6IUlytvAWrpwrVUbwbLlAcSTvkXwncb_", 10 10 }, 11 + .zqlite = .{ 12 + .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 13 + .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 14 + }, 11 15 }, 12 16 .paths = .{ 13 17 "build.zig",
+6 -2
ingester/fly.toml
··· 9 9 [build] 10 10 dockerfile = 'Dockerfile' 11 11 12 + [mounts] 13 + source = 'typeahead_data' 14 + destination = '/data' 15 + 12 16 [env] 13 17 TYPEAHEAD_URL = 'https://typeahead.nate-8fe.workers.dev' 18 + SQLITE_TMPDIR = '/data/tmp' 14 19 15 20 [processes] 16 21 app = './typeahead-ingester' ··· 24 29 processes = ['app'] 25 30 26 31 [[vm]] 27 - memory = '256mb' 32 + memory = '512mb' 28 33 cpu_kind = 'shared' 29 34 cpus = 1 30 - memory_mb = 256
+58
ingester/src/bloom.zig
//! Fixed-size bloom filter for DID dedup
//! Avoids re-sending already-known bare DIDs to the worker

const std = @import("std");
const Allocator = std.mem.Allocator;

/// Upper bound on hash probes per key; sizes the scratch index array
/// returned by `hashIndices`.
const BLOOM_HASHES: usize = 7;

pub const BloomFilter = struct {
    bits: std.DynamicBitSetUnmanaged,
    num_bits: usize,
    num_hashes: usize,
    // number of insert() calls so far (duplicates are counted, so this is
    // an upper bound on distinct keys)
    count: usize = 0,

    /// Allocate an empty filter with `num_bits` bits.
    /// `num_hashes` is clamped to BLOOM_HASHES: the scratch array in
    /// `hashIndices` can hold at most that many probe indices.
    pub fn init(allocator: Allocator, num_bits: usize, num_hashes: usize) !BloomFilter {
        const bits = try std.DynamicBitSetUnmanaged.initEmpty(allocator, num_bits);
        return .{
            .bits = bits,
            .num_bits = num_bits,
            .num_hashes = @min(num_hashes, BLOOM_HASHES),
        };
    }

    pub fn deinit(self: *BloomFilter, allocator: Allocator) void {
        self.bits.deinit(allocator);
    }

    /// Derive bit indices for `key` via double hashing (h1 + i*h2 mod m).
    /// Only the first `self.num_hashes` entries of the returned array are
    /// initialized — callers MUST slice with `[0..self.num_hashes]`.
    fn hashIndices(self: *const BloomFilter, key: []const u8) [BLOOM_HASHES]usize {
        const h1 = std.hash.Wyhash.hash(0, key);
        const h2 = std.hash.Wyhash.hash(1, key);
        var indices: [BLOOM_HASHES]usize = undefined;
        for (0..self.num_hashes) |i| {
            indices[i] = @intCast((h1 +% i *% h2) % self.num_bits);
        }
        return indices;
    }

    pub fn insert(self: *BloomFilter, key: []const u8) void {
        const indices = self.hashIndices(key);
        // fix: only the first num_hashes entries are initialized; iterating
        // the whole array read `undefined` slots when num_hashes < BLOOM_HASHES
        for (indices[0..self.num_hashes]) |idx| {
            self.bits.set(idx);
        }
        self.count += 1;
    }

    /// Membership probe. May return false positives, never false negatives.
    pub fn contains(self: *const BloomFilter, key: []const u8) bool {
        const indices = self.hashIndices(key);
        for (indices[0..self.num_hashes]) |idx| {
            if (!self.bits.isSet(idx)) return false;
        }
        return true;
    }

    /// Clear all bits and the insert counter.
    pub fn reset(self: *BloomFilter) void {
        self.bits.unsetAll();
        self.count = 0;
    }
};
+262
ingester/src/db/LocalDb.zig
//! Local SQLite read replica using zqlite
//! Provides fast FTS5 queries while Turso remains source of truth

const std = @import("std");
const zqlite = @import("zqlite");
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.local_db);

const LocalDb = @This();

conn: ?zqlite.Conn = null, // write connection — guarded by `mutex`
read_conn: ?zqlite.Conn = null, // separate read connection — never blocked by writes in WAL mode
allocator: Allocator,
is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
mutex: std.Thread.Mutex = .{}, // protects write conn only
path: []const u8 = "",

pub fn init(allocator: Allocator) LocalDb {
    return .{ .allocator = allocator };
}

/// Copy `path` into `buf` and null-terminate it for the SQLite C API.
fn makeZPath(path: []const u8, buf: *[256]u8) error{PathTooLong}![*:0]const u8 {
    if (path.len >= buf.len) return error.PathTooLong;
    @memcpy(buf[0..path.len], path);
    buf[path.len] = 0;
    return buf[0..path.len :0];
}

/// Open a read-only connection and apply the read-path pragmas
/// (shared by openDb and reopenReadConn).
fn openReadConnection(path: [*:0]const u8) !zqlite.Conn {
    const c = zqlite.open(path, zqlite.OpenFlags.ReadOnly) catch |err| {
        log.err("failed to open read conn: {}", .{err});
        return err;
    };
    c.exec("PRAGMA busy_timeout=1000", .{}) catch {};
    c.exec("PRAGMA mmap_size=268435456", .{}) catch {}; // 256MB
    c.exec("PRAGMA cache_size=-20000", .{}) catch {}; // 20MB page cache
    return c;
}

/// Clean up leftover staging tables from interrupted bootstrap.
/// Called during open so sync starts clean.
fn cleanupStagingTables(self: *LocalDb) void {
    const c = self.conn orelse return;
    c.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {};
    c.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {};
}

/// Verify FTS5 table is functional. If broken (e.g. from a failed RENAME),
/// drop it and recreate empty — sync will repopulate it.
fn repairFtsIfBroken(self: *LocalDb) void {
    const c = self.conn orelse return;
    // probe: try a simple MATCH query — if shadow tables are broken this errors
    const row = c.row("SELECT did FROM actors_fts WHERE actors_fts MATCH '\"test\"*' LIMIT 1", .{}) catch {
        log.warn("FTS5 table broken, dropping and recreating", .{});
        c.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {};
        c.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\  did UNINDEXED, handle, display_name,
            \\  tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch {};
        // clear sync state so fullSync rebuilds FTS
        c.exec("DELETE FROM sync_meta WHERE key = 'sync_complete'", .{}) catch {};
        c.exec("DELETE FROM sync_meta WHERE key = 'last_sync'", .{}) catch {};
        log.info("FTS5 repaired, sync will rebuild on next run", .{});
        return;
    };
    if (row) |r| r.deinit();
}

/// Resolve the database path from the environment, ensure SQLite's temp
/// directory exists on the persistent volume, then open both connections.
pub fn open(self: *LocalDb) !void {
    const path_env = std.posix.getenv("LOCAL_DB_PATH") orelse "/data/local.db";
    self.path = path_env;

    // ensure SQLite temp directory exists on the persistent volume
    const tmp_dir = std.posix.getenv("SQLITE_TMPDIR") orelse "/data/tmp";
    std.fs.cwd().makePath(tmp_dir) catch |err| {
        log.warn("failed to create temp dir {s}: {}", .{ tmp_dir, err });
    };

    try self.openDb(path_env);
}

/// Open write + read connections, apply pragmas, and initialize the schema.
/// On any failure after the write conn opens, both conns are closed and
/// nulled so a retry of open() starts from a clean state (no leaked handles).
fn openDb(self: *LocalDb, path_env: []const u8) !void {
    var path_buf: [256]u8 = undefined;
    const path = try makeZPath(path_env, &path_buf);

    log.info("opening {s}", .{path_env});

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    self.conn = zqlite.open(path, flags) catch |err| {
        log.err("failed to open write conn: {}", .{err});
        return err;
    };
    // fix: don't leak the write conn if anything below fails
    errdefer {
        if (self.conn) |c| c.close();
        self.conn = null;
    }

    const wc = self.conn.?;
    wc.exec("PRAGMA journal_mode=WAL", .{}) catch {};
    wc.exec("PRAGMA busy_timeout=5000", .{}) catch {};
    wc.exec("PRAGMA synchronous=NORMAL", .{}) catch {}; // safe with WAL
    wc.exec("PRAGMA cache_size=-20000", .{}) catch {}; // 20MB page cache
    wc.exec("PRAGMA mmap_size=268435456", .{}) catch {}; // 256MB

    // separate read connection — WAL allows concurrent reads + writes
    self.read_conn = try openReadConnection(path);
    errdefer {
        if (self.read_conn) |c| c.close();
        self.read_conn = null;
    }

    self.cleanupStagingTables();
    try self.createSchema();
    self.repairFtsIfBroken();
    log.info("initialized", .{});
}

pub fn deinit(self: *LocalDb) void {
    if (self.read_conn) |c| c.close();
    self.read_conn = null;
    if (self.conn) |c| c.close();
    self.conn = null;
}

/// Close read connection (e.g. during bootstrap to avoid WAL reader interference)
pub fn closeReadConn(self: *LocalDb) void {
    if (self.read_conn) |c| c.close();
    self.read_conn = null;
}

/// Reopen read connection after bootstrap completes. No-op if already open.
pub fn reopenReadConn(self: *LocalDb) !void {
    if (self.read_conn != null) return;

    var path_buf: [256]u8 = undefined;
    const path = try makeZPath(self.path, &path_buf);
    self.read_conn = try openReadConnection(path);
}

pub fn isReady(self: *LocalDb) bool {
    return self.is_ready.load(.acquire);
}

pub fn setReady(self: *LocalDb, ready: bool) void {
    self.is_ready.store(ready, .release);
}

/// Create the live schema if missing: actors table, standalone FTS5 index,
/// and the sync_meta key/value table used for sync bookkeeping.
fn createSchema(self: *LocalDb) !void {
    const c = self.conn orelse return error.NotOpen;

    c.exec(
        \\CREATE TABLE IF NOT EXISTS actors (
        \\  did TEXT PRIMARY KEY,
        \\  handle TEXT NOT NULL DEFAULT '',
        \\  display_name TEXT DEFAULT '',
        \\  avatar_url TEXT DEFAULT '',
        \\  hidden INTEGER NOT NULL DEFAULT 0,
        \\  labels TEXT NOT NULL DEFAULT '[]',
        \\  created_at TEXT DEFAULT '',
        \\  associated TEXT DEFAULT '{}'
        \\)
    , .{}) catch |err| {
        log.err("failed to create actors table: {}", .{err});
        return err;
    };

    c.exec("CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE)", .{}) catch {};

    // standalone FTS5 (not content-synced — avoids rowid tracking complexity with INSERT OR REPLACE)
    c.exec(
        \\CREATE VIRTUAL TABLE IF NOT EXISTS actors_fts USING fts5(
        \\  did UNINDEXED, handle, display_name,
        \\  tokenize='unicode61 remove_diacritics 2'
        \\)
    , .{}) catch |err| {
        log.err("failed to create actors_fts: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE TABLE IF NOT EXISTS sync_meta (
        \\  key TEXT PRIMARY KEY,
        \\  value TEXT
        \\)
    , .{}) catch |err| {
        log.err("failed to create sync_meta table: {}", .{err});
        return err;
    };
}

/// Thin wrapper over a zqlite row; column accessors by index.
pub const Row = struct {
    stmt: zqlite.Row,

    pub fn text(self: Row, index: usize) []const u8 {
        return self.stmt.text(index);
    }

    pub fn int(self: Row, index: usize) i64 {
        return self.stmt.int(index);
    }
};

/// Iterator over query results. Caller must call deinit(), and should
/// check err() after iteration to distinguish end-of-rows from failure.
pub const Rows = struct {
    inner: zqlite.Rows,

    pub fn next(self: *Rows) ?Row {
        if (self.inner.next()) |r| {
            return .{ .stmt = r };
        }
        return null;
    }

    pub fn deinit(self: *Rows) void {
        self.inner.deinit();
    }

    pub fn err(self: *Rows) ?anyerror {
        return self.inner.err;
    }
};

/// SELECT using read connection (never blocked by writes)
pub fn query(self: *LocalDb, comptime sql: []const u8, args: anytype) !Rows {
    const c = self.read_conn orelse return error.NotOpen;
    const rows = c.rows(sql, args) catch |e| {
        log.err("query failed: {s}", .{@errorName(e)});
        return e;
    };
    return .{ .inner = rows };
}

/// Execute a statement (INSERT, UPDATE, DELETE) — mutex-protected
pub fn exec(self: *LocalDb, comptime sql: []const u8, args: anytype) !void {
    self.mutex.lock();
    defer self.mutex.unlock();

    const c = self.conn orelse return error.NotOpen;
    c.exec(sql, args) catch |e| {
        log.err("exec failed: {s}", .{@errorName(e)});
        return e;
    };
}

/// Get raw write connection for batch operations (caller must hold lock)
pub fn getConn(self: *LocalDb) ?zqlite.Conn {
    return self.conn;
}

pub fn lock(self: *LocalDb) void {
    self.mutex.lock();
}

pub fn unlock(self: *LocalDb) void {
    self.mutex.unlock();
}

/// Get cached actor count from sync_meta (written during sync, avoids full table scan).
/// Returns 0 when not open, not yet synced, or on any read error.
pub fn countActors(self: *LocalDb) u64 {
    const c = self.read_conn orelse return 0;
    const row = c.row("SELECT value FROM sync_meta WHERE key = 'actor_count'", .{}) catch return 0;
    if (row) |r| {
        defer r.deinit();
        const val = r.text(0);
        return std.fmt.parseInt(u64, val, 10) catch 0;
    }
    return 0;
}
+224
ingester/src/db/TursoClient.zig
//! Read-only Turso HTTP API client for sync
//! Uses hrana v2 pipeline protocol

const std = @import("std");
const http = std.http;
const json = std.json;
const mem = std.mem;
const Allocator = mem.Allocator;

const log = std.log.scoped(.turso);

const TursoClient = @This();

// hrana v2 pipeline request shapes (serialized with std.json)
const Value = struct { type: []const u8 = "text", value: []const u8 };
const Stmt = struct { sql: []const u8, args: ?[]const Value = null };
const ExecuteReq = struct { type: []const u8 = "execute", stmt: Stmt };
const CloseReq = struct { type: []const u8 = "close" };

allocator: Allocator,
url: []const u8, // host only (no protocol prefix)
token: []const u8,
http_client: http.Client,
mutex: std.Thread.Mutex = .{}, // serializes executeRaw (one in-flight request)

/// Build a client from TURSO_URL / TURSO_AUTH_TOKEN env vars.
/// Strips a leading "libsql://" from the URL; fails with error.MissingEnv
/// if either variable is unset.
pub fn init(allocator: Allocator) !TursoClient {
    const url = std.posix.getenv("TURSO_URL") orelse {
        log.err("TURSO_URL not set", .{});
        return error.MissingEnv;
    };
    const token = std.posix.getenv("TURSO_AUTH_TOKEN") orelse {
        log.err("TURSO_AUTH_TOKEN not set", .{});
        return error.MissingEnv;
    };

    const libsql_prefix = "libsql://";
    const host = if (mem.startsWith(u8, url, libsql_prefix))
        url[libsql_prefix.len..]
    else
        url;

    log.info("turso client → {s}", .{host});

    return .{
        .allocator = allocator,
        .url = host,
        .token = token,
        .http_client = .{ .allocator = allocator },
    };
}

pub fn deinit(self: *TursoClient) void {
    self.http_client.deinit();
}

/// One result row; column accessors by index with permissive defaults
/// ("" / 0 on out-of-range or type mismatch).
pub const Row = struct {
    columns: []const json.Value,

    pub fn text(self: Row, index: usize) []const u8 {
        if (index >= self.columns.len) return "";
        return extractText(self.columns[index]);
    }

    pub fn int(self: Row, index: usize) i64 {
        if (index >= self.columns.len) return 0;
        return extractInt(self.columns[index]);
    }
};

/// Query result. Row text/int slices are owned by `parsed` — valid until deinit().
pub const Result = struct {
    allocator: Allocator,
    parsed: ?json.Parsed(json.Value),
    rows: []const Row,

    pub fn deinit(self: *Result) void {
        self.allocator.free(self.rows);
        if (self.parsed) |*p| p.deinit();
    }
};

/// Run a single SQL statement with positional text args.
/// Caller must deinit() the Result.
pub fn query(self: *TursoClient, sql: []const u8, args: []const []const u8) !Result {
    const response = try self.executeRaw(sql, args);
    defer self.allocator.free(response);
    return parseResult(self.allocator, response);
}

/// POST the pipeline request and return the raw response body (caller frees).
fn executeRaw(self: *TursoClient, sql: []const u8, args: []const []const u8) ![]const u8 {
    self.mutex.lock();
    defer self.mutex.unlock();

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "https://{s}/v2/pipeline", .{self.url}) catch
        return error.UrlTooLong;

    const body = try self.buildRequestBody(sql, args);
    defer self.allocator.free(body);

    var auth_buf: [512]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{self.token}) catch
        return error.AuthTooLong;

    var response_body: std.Io.Writer.Allocating = .init(self.allocator);
    errdefer response_body.deinit();

    const res = self.http_client.fetch(.{
        .location = .{ .url = url },
        .method = .POST,
        .headers = .{
            .content_type = .{ .override = "application/json" },
            .authorization = .{ .override = auth },
        },
        .payload = body,
        .response_writer = &response_body.writer,
    }) catch |err| {
        log.err("http failed: {s}", .{@errorName(err)});
        return error.HttpError;
    };

    if (res.status != .ok) {
        // best-effort error preview; toOwnedSlice resets the writer so the
        // errdefer deinit above stays safe
        const resp_text = response_body.toOwnedSlice() catch "";
        defer if (resp_text.len > 0) self.allocator.free(resp_text);
        const preview = if (resp_text.len > 200) resp_text[0..200] else resp_text;
        log.err("turso error: {} | {s}", .{ res.status, preview });
        return error.TursoError;
    }

    return try response_body.toOwnedSlice();
}

/// Serialize {"requests":[{execute...},{close}]} — caller frees returned body.
fn buildRequestBody(self: *TursoClient, sql: []const u8, args: []const []const u8) ![]const u8 {
    var body: std.Io.Writer.Allocating = .init(self.allocator);
    errdefer body.deinit();
    var jw: json.Stringify = .{ .writer = &body.writer, .options = .{ .emit_null_optional_fields = false } };

    var values: []const Value = &.{};
    defer if (values.len > 0) self.allocator.free(values);

    if (args.len > 0) {
        const v = try self.allocator.alloc(Value, args.len);
        for (args, 0..) |arg, i| {
            v[i] = .{ .value = arg };
        }
        values = v;
    }

    try jw.beginObject();
    try jw.objectField("requests");
    try jw.beginArray();
    try jw.write(ExecuteReq{
        .stmt = .{ .sql = sql, .args = if (values.len > 0) values else null },
    });
    try jw.write(CloseReq{});
    try jw.endArray();
    try jw.endObject();

    return try body.toOwnedSlice();
}

/// Parse the pipeline response into Rows. Unparseable or shape-mismatched
/// responses yield an empty (but valid) Result rather than an error.
fn parseResult(allocator: Allocator, response: []const u8) !Result {
    // fix: force the parser to copy strings into its own arena
    // (.alloc_always). The caller frees `response` right after this returns,
    // so default slice-aliasing would leave Row.text pointing at freed memory.
    const parsed = json.parseFromSlice(json.Value, allocator, response, .{
        .allocate = .alloc_always,
    }) catch {
        return .{ .allocator = allocator, .parsed = null, .rows = &.{} };
    };

    const json_rows = getRowsFromParsed(parsed.value) orelse {
        return .{ .allocator = allocator, .parsed = parsed, .rows = &.{} };
    };

    var rows: std.ArrayList(Row) = .{};
    errdefer rows.deinit(allocator);

    for (json_rows.items) |item| {
        if (item == .array) {
            try rows.append(allocator, .{ .columns = item.array.items });
        }
    }

    return .{
        .allocator = allocator,
        .parsed = parsed,
        .rows = try rows.toOwnedSlice(allocator),
    };
}

/// Navigate one pipeline result: .response.result.rows (array) or null.
fn getRowsFromResult(item: json.Value) ?json.Array {
    if (item != .object) return null;
    const resp = item.object.get("response") orelse return null;
    if (resp != .object) return null;
    const res = resp.object.get("result") orelse return null;
    if (res != .object) return null;
    const rows = res.object.get("rows") orelse return null;
    if (rows != .array) return null;
    return rows.array;
}

/// Extract the first result's rows from the top-level pipeline response.
fn getRowsFromParsed(value: json.Value) ?json.Array {
    // fix: guard the tag before .object access — a non-object root
    // (e.g. "null" or an error string) previously hit the wrong union tag
    if (value != .object) return null;
    const results = value.object.get("results") orelse return null;
    if (results != .array or results.array.items.len == 0) return null;
    return getRowsFromResult(results.array.items[0]);
}

/// hrana values arrive either as bare strings or {"type":..,"value":..} objects.
fn extractText(val: json.Value) []const u8 {
    return switch (val) {
        .string => |s| s,
        .object => |obj| {
            const v = obj.get("value") orelse return "";
            return if (v == .string) v.string else "";
        },
        else => "",
    };
}

/// Integers may arrive as JSON integers or as decimal strings inside
/// a {"type":..,"value":..} object; anything else yields 0.
fn extractInt(val: json.Value) i64 {
    return switch (val) {
        .integer => |i| i,
        .object => |obj| {
            const v = obj.get("value") orelse return 0;
            return switch (v) {
                .integer => |i| i,
                .string => |s| std.fmt.parseInt(i64, s, 10) catch 0,
                else => 0,
            };
        },
        else => 0,
    };
}
+713
ingester/src/db/sync.zig
··· 1 + //! Sync from Turso to local SQLite 2 + //! Full sync on startup, incremental sync every 5 minutes 3 + 4 + const std = @import("std"); 5 + const zqlite = @import("zqlite"); 6 + const Allocator = std.mem.Allocator; 7 + const TursoClient = @import("TursoClient.zig"); 8 + const LocalDb = @import("LocalDb.zig"); 9 + 10 + const log = std.log.scoped(.sync); 11 + 12 + const BATCH_SIZE = 2000; 13 + const SYNC_INTERVAL_S = 300; // 5 minutes 14 + const TOMBSTONE_RETENTION_S = 7 * 24 * 3600; // 7 days — must match cron.ts pruning 15 + 16 + /// Full sync: fetch all searchable actors from Turso using keyset pagination. 17 + /// Does NOT run stale cleanup — that's handled by incremental sync (tombstones + 18 + /// "became unsearchable" queries). This avoids races with the dual-write path. 19 + /// When `wipe` is true, deletes all local data first (used by stale rebuild path). 20 + pub fn fullSync(turso: *TursoClient, local: *LocalDb, wipe: bool) !void { 21 + log.info("starting full sync (wipe={})...", .{wipe}); 22 + 23 + const conn = local.getConn() orelse return error.LocalNotOpen; 24 + 25 + // check if a previous full sync completed successfully 26 + const was_completed = blk: { 27 + local.lock(); 28 + defer local.unlock(); 29 + const row = conn.row( 30 + "SELECT value FROM sync_meta WHERE key = 'sync_complete'", 31 + .{}, 32 + ) catch break :blk false; 33 + if (row) |r| { 34 + defer r.deinit(); 35 + break :blk std.mem.eql(u8, r.text(0), "1"); 36 + } 37 + break :blk false; 38 + }; 39 + 40 + if (wipe or !was_completed) { 41 + // bootstrap path: use staging tables for fast bulk load 42 + return fullSyncBootstrap(turso, local, conn, wipe); 43 + } 44 + 45 + // re-sync path: previous sync completed, keep serving while re-syncing 46 + local.setReady(true); 47 + log.info("previous sync complete, keeping ready during re-sync", .{}); 48 + return fullSyncResync(turso, local, conn); 49 + } 50 + 51 + /// Bootstrap path: load into staging tables (no indexes), build indexes in bulk, 
52 + /// then atomically swap into place. Live schema is never in a degraded state. 53 + fn fullSyncBootstrap(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn, wipe: bool) !void { 54 + const total_t0 = std.time.milliTimestamp(); 55 + local.setReady(false); 56 + 57 + // close read connection during bootstrap — not serving traffic, 58 + // avoids WAL reader lock interference with DDL/index builds 59 + local.closeReadConn(); 60 + errdefer local.reopenReadConn() catch {}; 61 + 62 + if (wipe) { 63 + local.lock(); 64 + defer local.unlock(); 65 + conn.exec("DELETE FROM sync_meta", .{}) catch {}; 66 + } 67 + 68 + // clean up any leftover staging tables from a previous failed attempt 69 + { 70 + local.lock(); 71 + defer local.unlock(); 72 + conn.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch {}; 73 + conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 74 + } 75 + 76 + // create staging table — no PK, no indexes for fast sequential appends 77 + { 78 + local.lock(); 79 + defer local.unlock(); 80 + conn.exec( 81 + \\CREATE TABLE actors_stage ( 82 + \\ did TEXT NOT NULL, 83 + \\ handle TEXT NOT NULL DEFAULT '', 84 + \\ display_name TEXT DEFAULT '', 85 + \\ avatar_url TEXT DEFAULT '', 86 + \\ hidden INTEGER NOT NULL DEFAULT 0, 87 + \\ labels TEXT NOT NULL DEFAULT '[]', 88 + \\ created_at TEXT DEFAULT '', 89 + \\ associated TEXT DEFAULT '{}' 90 + \\) 91 + , .{}) catch |err| { 92 + log.err("failed to create actors_stage: {}", .{err}); 93 + return err; 94 + }; 95 + } 96 + 97 + // keyset pagination: fetch rows WHERE rowid > last_rowid ORDER BY rowid 98 + var actor_count: usize = 0; 99 + var error_count: usize = 0; 100 + var last_rowid: i64 = 0; 101 + var had_turso_error = false; 102 + const load_t0 = std.time.milliTimestamp(); 103 + 104 + while (true) { 105 + var rowid_buf: [20]u8 = undefined; 106 + const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break; 107 + 108 + const t0 = std.time.milliTimestamp(); 109 + 110 + // fetch from 
turso (no lock held) 111 + var result = turso.query( 112 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid 113 + \\FROM actors WHERE handle != '' AND rowid > ? 114 + \\ORDER BY rowid LIMIT 2000 115 + , &.{rowid_str}) catch |err| { 116 + log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err }); 117 + had_turso_error = true; 118 + break; 119 + }; 120 + defer result.deinit(); 121 + 122 + const t1 = std.time.milliTimestamp(); 123 + 124 + if (result.rows.len == 0) break; 125 + 126 + // write batch to staging table — no indexes, fast sequential appends 127 + { 128 + local.lock(); 129 + defer local.unlock(); 130 + conn.exec("BEGIN", .{}) catch {}; 131 + for (result.rows) |row| { 132 + insertActorStage(conn, row) catch |err| { 133 + log.err("insert actor failed: {}", .{err}); 134 + error_count += 1; 135 + continue; 136 + }; 137 + actor_count += 1; 138 + } 139 + conn.exec("COMMIT", .{}) catch {}; 140 + } 141 + 142 + const t2 = std.time.milliTimestamp(); 143 + 144 + // advance cursor to last row's rowid (column 8) 145 + last_rowid = result.rows[result.rows.len - 1].int(8); 146 + 147 + log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ 148 + result.rows.len, last_rowid, actor_count, 149 + t1 - t0, t2 - t1, 150 + }); 151 + } 152 + 153 + if (had_turso_error or error_count > 0) { 154 + log.warn("bootstrap incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error }); 155 + // clean up staging table 156 + { 157 + local.lock(); 158 + defer local.unlock(); 159 + conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 160 + } 161 + // restore read connection so health endpoint works 162 + local.reopenReadConn() catch {}; 163 + return; 164 + } 165 + 166 + const load_ms = std.time.milliTimestamp() - load_t0; 167 + 168 + // checkpoint WAL before heavy DDL — clean slate for index builds 169 + { 170 + local.lock(); 171 + defer local.unlock(); 172 
+ conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch {}; 173 + } 174 + 175 + // build indexes on staging table 176 + log.info("building indexes on {d} rows...", .{actor_count}); 177 + const idx_did_t0 = std.time.milliTimestamp(); 178 + { 179 + local.lock(); 180 + defer local.unlock(); 181 + conn.exec("CREATE UNIQUE INDEX idx_stage_did ON actors_stage(did)", .{}) catch |err| { 182 + log.err("failed to create unique index on staging: {}", .{err}); 183 + conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 184 + return err; 185 + }; 186 + } 187 + const idx_did_ms = std.time.milliTimestamp() - idx_did_t0; 188 + 189 + const idx_handle_t0 = std.time.milliTimestamp(); 190 + { 191 + local.lock(); 192 + defer local.unlock(); 193 + conn.exec("CREATE INDEX idx_stage_handle ON actors_stage(handle COLLATE NOCASE)", .{}) catch |err| { 194 + log.err("failed to create handle index on staging: {}", .{err}); 195 + conn.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch {}; 196 + return err; 197 + }; 198 + } 199 + const idx_handle_ms = std.time.milliTimestamp() - idx_handle_t0; 200 + 201 + // swap actors table first, then build FTS with final name. 202 + // FTS5 virtual tables use shadow tables (_content, _data, _idx, etc.) 203 + // that don't reliably rename with ALTER TABLE RENAME — so we never rename FTS5. 
204 + { 205 + local.lock(); 206 + defer local.unlock(); 207 + conn.exec("BEGIN", .{}) catch {}; 208 + conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {}; 209 + conn.exec("DROP TABLE IF EXISTS actors", .{}) catch {}; 210 + conn.exec("ALTER TABLE actors_stage RENAME TO actors", .{}) catch |err| { 211 + log.err("swap failed (rename actors_stage): {}", .{err}); 212 + conn.exec("ROLLBACK", .{}) catch {}; 213 + return err; 214 + }; 215 + conn.exec("COMMIT", .{}) catch |err| { 216 + log.err("swap commit failed: {}", .{err}); 217 + return err; 218 + }; 219 + } 220 + 221 + // build FTS with its final name — no rename needed 222 + const fts_create_t0 = std.time.milliTimestamp(); 223 + { 224 + local.lock(); 225 + defer local.unlock(); 226 + conn.exec( 227 + \\CREATE VIRTUAL TABLE actors_fts USING fts5( 228 + \\ did UNINDEXED, handle, display_name, 229 + \\ tokenize='unicode61 remove_diacritics 2' 230 + \\) 231 + , .{}) catch |err| { 232 + log.err("failed to create FTS table: {}", .{err}); 233 + return err; 234 + }; 235 + } 236 + const fts_create_ms = std.time.milliTimestamp() - fts_create_t0; 237 + 238 + const fts_pop_t0 = std.time.milliTimestamp(); 239 + { 240 + local.lock(); 241 + defer local.unlock(); 242 + conn.exec( 243 + \\INSERT INTO actors_fts (did, handle, display_name) 244 + \\SELECT did, handle, display_name FROM actors WHERE handle != '' 245 + , .{}) catch |err| { 246 + log.err("FTS populate failed: {}", .{err}); 247 + return err; 248 + }; 249 + } 250 + const fts_pop_ms = std.time.milliTimestamp() - fts_pop_t0; 251 + 252 + // post-swap: optimize, record completion + actor count, checkpoint 253 + { 254 + local.lock(); 255 + defer local.unlock(); 256 + conn.exec("PRAGMA optimize", .{}) catch {}; 257 + 258 + var ts_buf: [20]u8 = undefined; 259 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 260 + conn.exec( 261 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 262 + .{ts_str}, 263 + ) catch {}; 
264 + conn.exec( 265 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')", 266 + .{}, 267 + ) catch {}; 268 + 269 + // cache actor count so health endpoint doesn't need COUNT(*) 270 + var count_buf: [20]u8 = undefined; 271 + const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0"; 272 + conn.exec( 273 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)", 274 + .{count_str}, 275 + ) catch {}; 276 + 277 + conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 278 + } 279 + 280 + const total_ms = std.time.milliTimestamp() - total_t0; 281 + 282 + // reopen read connection before serving traffic 283 + local.reopenReadConn() catch |err| { 284 + log.err("failed to reopen read conn after bootstrap: {}", .{err}); 285 + }; 286 + 287 + local.setReady(true); 288 + log.info("bootstrap complete — total_rows={d} load_ms={d} idx_did_ms={d} idx_handle_ms={d} fts_create_ms={d} fts_populate_ms={d} total_ms={d}", .{ 289 + actor_count, load_ms, idx_did_ms, idx_handle_ms, fts_create_ms, fts_pop_ms, total_ms, 290 + }); 291 + } 292 + 293 + /// Re-sync path: previous sync completed, update live table directly. 294 + /// Only processes actors that may have changed — index maintenance cost is negligible. 295 + fn fullSyncResync(turso: *TursoClient, local: *LocalDb, conn: zqlite.Conn) !void { 296 + var actor_count: usize = 0; 297 + var error_count: usize = 0; 298 + var last_rowid: i64 = 0; 299 + var had_turso_error = false; 300 + 301 + while (true) { 302 + var rowid_buf: [20]u8 = undefined; 303 + const rowid_str = std.fmt.bufPrint(&rowid_buf, "{d}", .{last_rowid}) catch break; 304 + 305 + const t0 = std.time.milliTimestamp(); 306 + 307 + var result = turso.query( 308 + \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, rowid 309 + \\FROM actors WHERE handle != '' AND rowid > ? 
310 + \\ORDER BY rowid LIMIT 2000 311 + , &.{rowid_str}) catch |err| { 312 + log.err("turso query failed at rowid {d}: {}", .{ last_rowid, err }); 313 + had_turso_error = true; 314 + break; 315 + }; 316 + defer result.deinit(); 317 + 318 + const t1 = std.time.milliTimestamp(); 319 + 320 + if (result.rows.len == 0) break; 321 + 322 + { 323 + local.lock(); 324 + defer local.unlock(); 325 + conn.exec("BEGIN", .{}) catch {}; 326 + for (result.rows) |row| { 327 + insertActorOnly(conn, row) catch |err| { 328 + log.err("insert actor failed: {}", .{err}); 329 + error_count += 1; 330 + continue; 331 + }; 332 + actor_count += 1; 333 + } 334 + conn.exec("COMMIT", .{}) catch {}; 335 + } 336 + 337 + const t2 = std.time.milliTimestamp(); 338 + 339 + last_rowid = result.rows[result.rows.len - 1].int(8); 340 + 341 + log.info("batch: {d} rows, rowid={d}, total={d} | fetch={d}ms apply={d}ms", .{ 342 + result.rows.len, last_rowid, actor_count, 343 + t1 - t0, t2 - t1, 344 + }); 345 + } 346 + 347 + if (had_turso_error or error_count > 0) { 348 + log.warn("full sync incomplete — {d} synced, {d} local write errors, turso_error={}", .{ actor_count, error_count, had_turso_error }); 349 + return; 350 + } 351 + 352 + // rebuild FTS from scratch (DROP + CREATE — never reuse a potentially 353 + // broken FTS table from a previous failed rename) 354 + { 355 + log.info("building FTS index for {d} actors...", .{actor_count}); 356 + const fts_t0 = std.time.milliTimestamp(); 357 + local.lock(); 358 + defer local.unlock(); 359 + conn.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch {}; 360 + conn.exec( 361 + \\CREATE VIRTUAL TABLE actors_fts USING fts5( 362 + \\ did UNINDEXED, handle, display_name, 363 + \\ tokenize='unicode61 remove_diacritics 2' 364 + \\) 365 + , .{}) catch |err| { 366 + log.err("failed to create FTS table: {}", .{err}); 367 + }; 368 + conn.exec( 369 + \\INSERT INTO actors_fts (did, handle, display_name) 370 + \\SELECT did, handle, display_name FROM actors WHERE handle != '' 371 
+ , .{}) catch |err| { 372 + log.err("FTS bulk insert failed: {}", .{err}); 373 + }; 374 + const fts_t1 = std.time.milliTimestamp(); 375 + log.info("FTS index built in {d}ms", .{fts_t1 - fts_t0}); 376 + } 377 + 378 + // record sync time + mark complete + actor count 379 + { 380 + local.lock(); 381 + defer local.unlock(); 382 + var ts_buf: [20]u8 = undefined; 383 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 384 + conn.exec( 385 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 386 + .{ts_str}, 387 + ) catch {}; 388 + conn.exec( 389 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('sync_complete', '1')", 390 + .{}, 391 + ) catch {}; 392 + 393 + var count_buf: [20]u8 = undefined; 394 + const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{actor_count}) catch "0"; 395 + conn.exec( 396 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)", 397 + .{count_str}, 398 + ) catch {}; 399 + 400 + conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {}; 401 + } 402 + 403 + local.setReady(true); 404 + log.info("full sync complete — {d} actors", .{actor_count}); 405 + } 406 + 407 + /// Incremental sync: fetch actors updated since last sync + tombstones. 408 + /// Uses keyset pagination on (updated_at, did) to drain all updates. 409 + /// Only advances watermark when all turso queries succeed and all local writes succeed. 
pub fn incrementalSync(turso: *TursoClient, local: *LocalDb) !void {
    const conn = local.getConn() orelse return error.LocalNotOpen;

    // get last sync time (watermark). The lock is released manually on every
    // path (no defer) because fullSync below re-acquires it itself.
    local.lock();
    const last_sync_ts = blk: {
        const row = conn.row(
            "SELECT value FROM sync_meta WHERE key = 'last_sync'",
            .{},
        ) catch {
            local.unlock();
            break :blk @as(i64, 0);
        };
        if (row) |r| {
            defer r.deinit();
            const val = r.text(0);
            local.unlock();
            break :blk if (val.len == 0) 0 else std.fmt.parseInt(i64, val, 10) catch 0;
        }
        local.unlock();
        break :blk @as(i64, 0);
    };

    if (last_sync_ts == 0) {
        log.info("no last_sync found, doing full sync", .{});
        return fullSync(turso, local, false);
    }

    // if last sync is older than tombstone retention, local data is too stale —
    // tombstones may have been pruned, so incremental sync can't catch deletions.
    // wipe and rebuild from scratch.
    const now = std.time.timestamp();
    if (now - last_sync_ts > TOMBSTONE_RETENTION_S) {
        log.warn("last sync {d}s ago (>{d}s tombstone retention), wiping stale replica", .{
            now - last_sync_ts, TOMBSTONE_RETENTION_S,
        });
        return fullSync(turso, local, true);
    }

    local.setReady(true);

    // buffer: subtract 300s to catch stragglers (matches leaflet-search)
    const since_ts = last_sync_ts - 300;
    var since_buf: [20]u8 = undefined;
    const since_str = std.fmt.bufPrint(&since_buf, "{d}", .{since_ts}) catch return;

    // fetch updated searchable actors — keyset pagination to drain all
    var updated: usize = 0;
    var error_count: usize = 0;
    var had_turso_error = false;
    {
        var cursor_ts_buf: [20]u8 = undefined;
        var cursor_did_buf: [128]u8 = undefined;
        @memcpy(cursor_ts_buf[0..since_str.len], since_str);
        var cursor_ts: []const u8 = cursor_ts_buf[0..since_str.len];
        var cursor_did_len: usize = 0;

        while (true) {
            const cursor_did: []const u8 = if (cursor_did_len > 0) cursor_did_buf[0..cursor_did_len] else "";

            const args: []const []const u8 = if (cursor_did_len > 0)
                &.{ cursor_ts, cursor_ts, cursor_did }
            else
                &.{cursor_ts};

            // first page filters on updated_at alone; later pages use the
            // (updated_at, did) keyset so rows with equal timestamps aren't skipped.
            const sql = if (cursor_did_len > 0)
                \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
                \\FROM actors WHERE handle != '' AND (updated_at > ?1 OR (updated_at = ?2 AND did > ?3))
                \\ORDER BY updated_at, did LIMIT 2000
            else
                \\SELECT did, handle, display_name, avatar_url, hidden, labels, created_at, associated, updated_at
                \\FROM actors WHERE handle != '' AND updated_at > ?1
                \\ORDER BY updated_at, did LIMIT 2000
            ;

            var result = turso.query(sql, args) catch |err| {
                log.err("incremental query failed: {}", .{err});
                had_turso_error = true;
                break;
            };
            // per-iteration defer: result is freed at the end of each loop pass,
            // after the cursor has been copied out of it below.
            defer result.deinit();

            if (result.rows.len == 0) break;

            {
                local.lock();
                defer local.unlock();
                conn.exec("BEGIN", .{}) catch {};
                for (result.rows) |row| {
                    upsertActorLocal(conn, row) catch {
                        error_count += 1;
                        continue;
                    };
                    updated += 1;
                }
                conn.exec("COMMIT", .{}) catch {};
            }

            // advance cursor to last row's (updated_at, did)
            const last_row = result.rows[result.rows.len - 1];
            const last_ua_int = last_row.int(8); // updated_at is integer
            const last_did = last_row.text(0);

            // serialize updated_at integer to string for next query
            // (overwrites cursor_ts_buf, which cursor_ts points into)
            const ua_str = std.fmt.bufPrint(&cursor_ts_buf, "{d}", .{last_ua_int}) catch break;
            cursor_ts = ua_str;

            // copy DID out of result memory before result.deinit() runs;
            // a DID longer than 128 bytes is silently not advanced.
            if (last_did.len > 0 and last_did.len <= cursor_did_buf.len) {
                @memcpy(cursor_did_buf[0..last_did.len], last_did);
                cursor_did_len = last_did.len;
            }

            // safety: stop if batch was less than full (we've drained)
            // NOTE(review): the SQL hardcodes LIMIT 2000 while this compares
            // against BATCH_SIZE (declared elsewhere in this file) — confirm
            // BATCH_SIZE == 2000. If BATCH_SIZE were larger, every page would
            // look "short" and pagination would stop after one batch.
            if (result.rows.len < BATCH_SIZE) break;
        }
    }

    // fetch actors that became unsearchable (handle cleared)
    var cleared: usize = 0;
    unsearchable: {
        var result = turso.query(
            "SELECT did FROM actors WHERE updated_at > ? AND handle = ''",
            &.{since_str},
        ) catch |err| {
            log.err("unsearchable query failed: {}", .{err});
            had_turso_error = true;
            break :unsearchable;
        };
        defer result.deinit();

        if (result.rows.len > 0) {
            local.lock();
            defer local.unlock();
            for (result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
                cleared += 1;
            }
        }
    }

    // fetch tombstones
    var deleted: usize = 0;
    tombstone: {
        var tomb_result = turso.query(
            "SELECT did FROM tombstones WHERE deleted_at > ?",
            &.{since_str},
        ) catch |err| {
            log.err("tombstone query failed: {}", .{err});
            had_turso_error = true;
            break :tombstone;
        };
        defer tomb_result.deinit();

        if (tomb_result.rows.len > 0) {
            local.lock();
            defer local.unlock();
            for (tomb_result.rows) |row| {
                const did = row.text(0);
                conn.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {
                    error_count += 1;
                    continue;
                };
                conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
                deleted += 1;
            }
        }
    }

    // only advance watermark when everything succeeded — a failed run keeps
    // the old last_sync so the next pass retries the same window.
    const had_error = had_turso_error or error_count > 0;
    if (had_error) {
        log.warn("incremental sync had errors — {d} updated, {d} deleted, {d} cleared, {d} write errors, turso_error={}", .{
            updated, deleted, cleared, error_count, had_turso_error,
        });
    } else {
        local.lock();
        defer local.unlock();
        var ts_buf: [20]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0";
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)",
            .{ts_str},
        ) catch {};

        // refresh cached actor count when data changed
        if (updated > 0 or deleted > 0 or cleared > 0) {
            updateActorCount(conn);
            log.info("incremental sync — {d} updated, {d} deleted, {d} cleared", .{ updated, deleted, cleared });
        }

        conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch {};
    }
}

/// Background sync loop: full sync on boot, then incremental every 5 min.
/// Runs forever on its own thread; never returns except when the Turso
/// client fails to initialize (then sync is disabled entirely).
pub fn syncLoop(allocator: Allocator, local: *LocalDb) void {
    var turso = TursoClient.init(allocator) catch |err| {
        log.err("turso client init failed: {}, sync disabled", .{err});
        return;
    };
    defer turso.deinit();

    // initial sync (full or incremental depending on state)
    incrementalSync(&turso, local) catch |err| {
        log.err("initial sync failed: {}", .{err});
    };

    // periodic incremental sync
    while (true) {
        std.Thread.sleep(SYNC_INTERVAL_S * std.time.ns_per_s);

        incrementalSync(&turso, local) catch |err| {
            log.err("periodic sync failed: {}", .{err});
        };
    }
}

/// Insert actor row into staging table (no indexes, no FTS — used during bootstrap).
/// Expects the 8-column turso row layout: did, handle, display_name,
/// avatar_url, hidden, labels, created_at, associated.
fn insertActorStage(conn: zqlite.Conn, row: anytype) !void {
    conn.exec(
        \\INSERT INTO actors_stage
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        row.text(0), // did
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    }) catch |err| {
        return err;
    };
}

/// Insert actor row without FTS update (used during re-sync on live table).
/// FTS is rebuilt wholesale afterwards, so no per-row FTS write here.
fn insertActorOnly(conn: zqlite.Conn, row: anytype) !void {
    conn.exec(
        \\INSERT OR REPLACE INTO actors
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        row.text(0), // did
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    }) catch |err| {
        return err;
    };
}

/// Upsert a turso row into local SQLite + FTS. Caller must hold the LocalDb lock.
fn upsertActorLocal(conn: zqlite.Conn, row: anytype) !void {
    const did = row.text(0);

    conn.exec(
        \\INSERT OR REPLACE INTO actors
        \\(did, handle, display_name, avatar_url, hidden, labels, created_at, associated)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        did,
        row.text(1), // handle
        row.text(2), // display_name
        row.text(3), // avatar_url
        row.int(4), // hidden
        row.text(5), // labels
        row.text(6), // created_at
        row.text(7), // associated
    }) catch |err| {
        return err;
    };

    // manual FTS update (standalone FTS, not content-synced):
    // delete-then-insert, errors intentionally best-effort.
    conn.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
    conn.exec(
        "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)",
        .{ did, row.text(1), row.text(2) },
    ) catch {};
}

/// Update cached actor count in sync_meta (avoids COUNT(*) on health checks).
/// Best-effort: any failure leaves the previous cached value in place.
fn updateActorCount(conn: zqlite.Conn) void {
    const row = conn.row("SELECT COUNT(*) FROM actors WHERE handle != ''", .{}) catch return;
    if (row) |r| {
        defer r.deinit();
        var buf: [20]u8 = undefined;
        const count_str = std.fmt.bufPrint(&buf, "{d}", .{r.int(0)}) catch return;
        conn.exec(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('actor_count', ?)",
            .{count_str},
        ) catch {};
    }
}
+429
ingester/src/ingest.zig
//! Jetstream event handler: buffers actor events, flushes batches to worker
//! Dual-writes profile/identity events to local SQLite for immediate search

const std = @import("std");
const mem = std.mem;
const json = std.json;
const Allocator = mem.Allocator;
const zat = @import("zat");
const HttpTransport = zat.HttpTransport;
const LocalDb = @import("db/LocalDb.zig");
const BloomFilter = @import("bloom.zig").BloomFilter;

const log = std.log.scoped(.ingester);

/// Max events per POST to the worker; also the unit of the backlog cap.
pub const MAX_BATCH: usize = 100;
const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed
const BLOOM_HASHES: usize = 7;

pub const Config = struct {
    worker_url: []const u8,
    secret: []const u8,
};

/// Read worker URL + secret from the environment. Panics (by design) when
/// either is missing — the process is useless without them.
pub fn getConfig() Config {
    return .{
        .worker_url = std.posix.getenv("TYPEAHEAD_URL") orelse
            @panic("TYPEAHEAD_URL not set"),
        .secret = std.posix.getenv("TYPEAHEAD_SECRET") orelse
            @panic("TYPEAHEAD_SECRET not set"),
    };
}

/// an actor event to POST to the worker
pub const ActorEvent = struct {
    did: []const u8,
    handle: ?[]const u8 = null,
    display_name: ?[]const u8 = null,
    avatar_cid: ?[]const u8 = null,
};

pub const IngestHandler = struct {
    allocator: Allocator,
    config: Config,
    transport: *HttpTransport,
    // optional local replica; null disables all dual-writes
    local_db: ?*LocalDb,
    buffer: std.ArrayList(ActorEvent),
    delete_buffer: std.ArrayList([]const u8),
    /// arena owns all string data in buffer/delete_buffer
    arena: std.heap.ArenaAllocator,
    /// bloom filter: fixed-size dedup for non-profile DIDs (~1.2MB)
    bloom: BloomFilter,
    pending_cursor: i64 = 0,
    flushed_cursor: i64 = 0,
    total_ingested: u64 = 0,
    total_deleted: u64 = 0,
    last_flush: i64 = 0,
    retry_after: i64 = 0, // timestamp before which we skip flush attempts

    pub fn init(allocator: Allocator, config: Config, transport: *HttpTransport, local_db: ?*LocalDb) !IngestHandler {
        return .{
            .allocator = allocator,
            .config = config,
            .transport = transport,
            .local_db = local_db,
            .buffer = .{},
            .delete_buffer = .{},
            .arena = std.heap.ArenaAllocator.init(allocator),
            .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES),
            .last_flush = std.time.timestamp(),
        };
    }

    pub fn deinit(self: *IngestHandler) void {
        self.bloom.deinit(self.allocator);
        self.arena.deinit();
        self.buffer.deinit(self.allocator);
        self.delete_buffer.deinit(self.allocator);
    }

    // arena-dupe a string; null on OOM (caller treats as "drop this event")
    fn dupe(self: *IngestHandler, s: []const u8) ?[]const u8 {
        return self.arena.allocator().dupe(u8, s) catch null;
    }

    fn resetBloom(self: *IngestHandler) void {
        log.info("resetting bloom filter ({d} insertions)", .{self.bloom.count});
        self.bloom.reset();
    }

    /// Jetstream callback: dispatch the event, then decide whether to flush.
    pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void {
        switch (event) {
            .commit => |c| self.handleCommit(c),
            .identity => |id| self.handleIdentity(id),
            .account => |acct| self.handleAccount(acct),
        }

        self.pending_cursor = event.timeUs();

        const now = std.time.timestamp();
        // drop incoming events if buffers are backed up (persistent failure)
        const MAX_BACKLOG = MAX_BATCH * 10;
        if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) {
            log.err("backlog overflow ({d} ingest, {d} delete), dropping oldest events", .{
                self.buffer.items.len, self.delete_buffer.items.len,
            });
            // drop everything: buffers, their backing arena, and the bloom
            // (its arena-independent state is reset so dedup restarts clean)
            self.buffer.clearRetainingCapacity();
            self.delete_buffer.clearRetainingCapacity();
            _ = self.arena.reset(.retain_capacity);
            self.resetBloom();
            self.flushed_cursor = self.pending_cursor;
            self.retry_after = 0;
        }

        // respect backoff cooldown after failures
        if (now < self.retry_after) return;

        // flush when either buffer hits a full batch, or 5s have passed
        // with anything pending
        const should_flush = self.buffer.items.len >= MAX_BATCH or
            self.delete_buffer.items.len >= MAX_BATCH or
            (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0));

        if (should_flush) {
            self.flush();
        }
    }

    pub fn onError(_: *IngestHandler, err: anyerror) void {
        log.err("jetstream: {s}", .{@errorName(err)});
    }

    pub fn onConnect(_: *IngestHandler, host: []const u8) void {
        log.info("connected to {s}", .{host});
    }

    fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void {
        if (mem.eql(u8, c.collection, "app.bsky.actor.profile")) {
            // rich extraction from profile records
            if (c.operation != .create and c.operation != .update) return;

            const record = c.record orelse return;

            const did = self.dupe(c.did) orelse return;
            var event = ActorEvent{ .did = did };

            if (zat.json.getString(record, "displayName")) |name| {
                event.display_name = self.dupe(name);
            }

            if (zat.json.getPath(record, "avatar")) |avatar| {
                if (zat.json.getString(avatar, "ref.$link")) |cid| {
                    event.avatar_cid = self.dupe(cid);
                }
            }

            // dual-write to local SQLite (profile events have searchable data)
            self.writeToLocal(event);

            // ingester only sees self-labels from profile records — it can never
            // correctly determine hidden. let refreshModeration cron handle it.

            self.buffer.append(self.allocator, event) catch return;
        } else {
            // non-profile collections: just discover the DID
            if (c.operation == .delete) return;

            // dedup: skip if bloom filter says we've seen this DID recently
            if (self.bloom.contains(c.did)) return;
            self.bloom.insert(c.did);

            const did = self.dupe(c.did) orelse return;
            self.buffer.append(self.allocator, .{ .did = did }) catch return;
        }
    }

    fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void {
        const handle = id.handle orelse return;
        const event = ActorEvent{
            .did = self.dupe(id.did) orelse return,
            .handle = self.dupe(handle),
        };

        // dual-write: identity events have handles (searchable)
        self.writeToLocal(event);

        self.buffer.append(self.allocator, event) catch return;
    }

    fn handleAccount(self: *IngestHandler, acct: zat.jetstream.AccountEvent) void {
        if (!acct.active) {
            const did = self.dupe(acct.did) orelse return;

            // dual-write: delete from local
            // NOTE(review): uses db.exec directly, unlike writeToLocal which
            // does db.lock() + conn.exec — presumably LocalDb.exec locks
            // internally; verify against LocalDb.zig.
            if (self.local_db) |db| {
                db.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {};
                db.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
            }

            self.delete_buffer.append(self.allocator, did) catch return;
        }
    }

    /// Dual-write to local SQLite for immediate search availability.
    /// No-op when no local replica is attached or it isn't ready yet.
    fn writeToLocal(self: *IngestHandler, event: ActorEvent) void {
        const db = self.local_db orelse return;
        if (!db.isReady()) return;

        const conn = db.getConn() orelse return;

        db.lock();
        defer db.unlock();

        // only write if we have searchable data (handle or display_name)
        const has_handle = event.handle != null and event.handle.?.len > 0;
        const has_name = event.display_name != null and event.display_name.?.len > 0;
        if (!has_handle and !has_name) return;

        // upsert actor — COALESCE/NULLIF keeps existing fields when the
        // incoming event carries an empty string for them
        conn.exec(
            \\INSERT INTO actors (did, handle, display_name, avatar_url)
            \\VALUES (?, ?, ?, ?)
            \\ON CONFLICT(did) DO UPDATE SET
            \\  handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle),
            \\  display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name),
            \\  avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url)
        , .{
            event.did,
            event.handle orelse "",
            event.display_name orelse "",
            event.avatar_cid orelse "",
        }) catch return;

        // update FTS
        conn.exec("DELETE FROM actors_fts WHERE did = ?", .{event.did}) catch {};

        // read back current handle + display_name for FTS (the upsert may have
        // merged the event with previously stored fields)
        const row = conn.row(
            "SELECT handle, display_name FROM actors WHERE did = ?",
            .{event.did},
        ) catch return;
        if (row) |r| {
            defer r.deinit();
            const h = r.text(0);
            const dn = r.text(1);
            if (h.len > 0 or dn.len > 0) {
                conn.exec(
                    "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)",
                    .{ event.did, h, dn },
                ) catch {};
            }
        }
    }

    // POST at most MAX_BATCH ingest events and MAX_BATCH deletes to the worker.
    // On success, consumed events are shifted out of the buffers; the arena is
    // only reset once BOTH buffers are fully drained (its strings back every
    // remaining event). On any failure, sets a 5s retry cooldown.
    fn flush(self: *IngestHandler) void {
        self.last_flush = std.time.timestamp();
        var any_failure = false;

        if (self.buffer.items.len > 0) {
            const batch_end = @min(self.buffer.items.len, MAX_BATCH);
            const batch = self.buffer.items[0..batch_end];
            const ok = postBatch(
                self.transport,
                self.config,
                batch,
                self.flushed_cursor,
            );
            if (ok) {
                self.total_ingested += batch_end;
                const rss = getRssKB();
                log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{
                    batch_end, self.total_ingested,
                    self.buffer.items.len - batch_end, self.pending_cursor, rss,
                });
                // shift the unsent tail to the front, then shrink
                if (batch_end < self.buffer.items.len) {
                    std.mem.copyForwards(
                        ActorEvent,
                        self.buffer.items[0 .. self.buffer.items.len - batch_end],
                        self.buffer.items[batch_end..],
                    );
                }
                self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end);
            } else {
                log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{
                    batch_end, self.buffer.items.len,
                });
                any_failure = true;
            }
        }

        if (self.delete_buffer.items.len > 0) {
            const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH);
            const batch = self.delete_buffer.items[0..batch_end];
            const ok = deleteActors(
                self.transport,
                self.config,
                batch,
            );
            if (ok) {
                self.total_deleted += batch_end;
                log.info("-{d} moderated (total: {d})", .{
                    batch_end, self.total_deleted,
                });
                if (batch_end < self.delete_buffer.items.len) {
                    const remaining = self.delete_buffer.items.len - batch_end;
                    var i: usize = 0;
                    while (i < remaining) : (i += 1) {
                        self.delete_buffer.items[i] = self.delete_buffer.items[batch_end + i];
                    }
                    self.delete_buffer.shrinkRetainingCapacity(remaining);
                } else {
                    self.delete_buffer.clearRetainingCapacity();
                }
            } else {
                log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end});
                any_failure = true;
            }
        }

        if (any_failure) {
            self.retry_after = std.time.timestamp() + 5;
        } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) {
            // everything acknowledged: advance the durable cursor and free all
            // event strings in one shot
            self.flushed_cursor = self.pending_cursor;
            _ = self.arena.reset(.retain_capacity);
        }
    }
};

// -- HTTP client functions (worker API) --

// POST events + cursor to /admin/ingest. Returns true only on HTTP 200.
fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool {
    var output: std.Io.Writer.Allocating = .init(transport.allocator);
    defer output.deinit();

    var jw: json.Stringify = .{
        .writer = &output.writer,
        .options = .{ .emit_null_optional_fields = false },
    };
    jw.write(.{ .events = events, .cursor = cursor }) catch return false;

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/ingest", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    const result = transport.fetch(.{
        .url = url,
        .method = .POST,
        .payload = output.written(),
        .authorization = auth,
    }) catch return false;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) {
        log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(result.status), result.body });
    }

    return result.status == .ok;
}

// POST a list of DIDs to /admin/delete. Returns true only on HTTP 200.
fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool {
    var output: std.Io.Writer.Allocating = .init(transport.allocator);
    defer output.deinit();

    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.write(.{ .dids = dids }) catch return false;

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/delete", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    const result = transport.fetch(.{
        .url = url,
        .method = .POST,
        .payload = output.written(),
        .authorization = auth,
    }) catch return false;
    defer transport.allocator.free(result.body);

    return result.status == .ok;
}

// Single attempt at GET /admin/cursor; null on any transport/parse failure.
fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 {
    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null;

    const result = transport.fetch(.{
        .url = url,
        .authorization = auth,
    }) catch return null;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) return null;

    const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null;
    defer parsed.deinit();

    const cursor_val = parsed.value.object.get("cursor") orelse return null;
    return switch (cursor_val) {
        .integer => |n| n,
        .float => |f| @intFromFloat(f),
        else => null,
    };
}

/// Fetch the worker's persisted cursor with 3 attempts (1s/3s/10s backoff).
/// Returns null when all attempts fail.
pub fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 {
    const backoff = [_]u64{ 1, 3, 10 };
    for (backoff, 0..) |delay, attempt| {
        if (fetchCursorOnce(transport, config)) |cursor| return cursor;
        log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay });
        std.Thread.sleep(delay * std.time.ns_per_s);
    }
    return null;
}

// -- utilities --

/// Resident set size in KB, read from /proc/self/statm (second field, in
/// pages). Returns 0 on any failure. Assumes 4 KiB pages — true for the
/// Linux x86-64/aarch64 targets this runs on; verify if ported.
pub fn getRssKB() u64 {
    const f = std.fs.openFileAbsolute("/proc/self/statm", .{}) catch return 0;
    defer f.close();
    var buf: [128]u8 = undefined;
    const n = f.read(&buf) catch return 0;
    var it = std.mem.splitScalar(u8, buf[0..n], ' ');
    _ = it.next(); // size
    const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0;
    return rss_pages * 4;
}
+116 -446
ingester/src/main.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 - const json = std.json; 3 + const net = std.net; 4 + const posix = std.posix; 4 5 const Allocator = mem.Allocator; 6 + const Thread = std.Thread; 5 7 const zat = @import("zat"); 6 8 const HttpTransport = zat.HttpTransport; 9 + const LocalDb = @import("db/LocalDb.zig"); 10 + const sync = @import("db/sync.zig"); 11 + const server = @import("server.zig"); 12 + const ingest = @import("ingest.zig"); 7 13 8 14 const log = std.log.scoped(.ingester); 9 15 10 - const MAX_BATCH: usize = 100; 11 - const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed 12 - const BLOOM_HASHES: usize = 7; 13 - 14 - const BloomFilter = struct { 15 - bits: std.DynamicBitSetUnmanaged, 16 - num_bits: usize, 17 - num_hashes: usize, 18 - count: usize = 0, 19 - 20 - fn init(allocator: Allocator, num_bits: usize, num_hashes: usize) !BloomFilter { 21 - const bits = try std.DynamicBitSetUnmanaged.initEmpty(allocator, num_bits); 22 - return .{ 23 - .bits = bits, 24 - .num_bits = num_bits, 25 - .num_hashes = num_hashes, 26 - }; 27 - } 28 - 29 - fn deinit(self: *BloomFilter, allocator: Allocator) void { 30 - self.bits.deinit(allocator); 31 - } 16 + const MAX_HTTP_WORKERS = 8; 17 + const SOCKET_TIMEOUT_SECS = 5; 32 18 33 - fn hashIndices(self: *const BloomFilter, key: []const u8) [BLOOM_HASHES]usize { 34 - const h1 = std.hash.Wyhash.hash(0, key); 35 - const h2 = std.hash.Wyhash.hash(1, key); 36 - var indices: [BLOOM_HASHES]usize = undefined; 37 - for (0..self.num_hashes) |i| { 38 - indices[i] = @intCast((h1 +% i *% h2) % self.num_bits); 39 - } 40 - return indices; 41 - } 42 - 43 - fn insert(self: *BloomFilter, key: []const u8) void { 44 - const indices = self.hashIndices(key); 45 - for (indices) |idx| { 46 - self.bits.set(idx); 47 - } 48 - self.count += 1; 49 - } 50 - 51 - fn contains(self: *const BloomFilter, key: []const u8) bool { 52 - const indices = self.hashIndices(key); 53 - for (indices) |idx| { 54 - if (!self.bits.isSet(idx)) return 
/// Arguments handed to the background jetstream ingestion thread.
const JetstreamArgs = struct {
    allocator: Allocator,
    config: ingest.Config,
    transport: *HttpTransport,
    /// When present, ingestion also mirrors into the local SQLite replica.
    local_db: ?*LocalDb,
};

/// Background thread body: build the ingest handler, resume from the
/// remote cursor, and block inside the jetstream subscription loop.
fn jetstreamThread(args: JetstreamArgs) void {
    var handler = ingest.IngestHandler.init(
        args.allocator,
        args.config,
        args.transport,
        args.local_db,
    ) catch |err| {
        log.err("handler init failed: {}", .{err});
        return;
    };
    defer handler.deinit();

    var client = zat.JetstreamClient.init(args.allocator, .{
        .wanted_collections = &.{
            "app.bsky.actor.profile",
            "app.bsky.feed.post",
            "app.bsky.feed.like",
            "app.bsky.graph.follow",
        },
        // null cursor (fetch failed) means "start from live" — handled upstream.
        .cursor = ingest.fetchCursor(args.transport, args.config),
    });
    defer client.deinit();

    client.subscribe(&handler);
}

/// Apply receive/send timeouts to an accepted socket so a stalled client
/// cannot pin an HTTP worker thread forever.
fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
    const timeout = mem.toBytes(posix.timeval{
        .sec = @intCast(secs),
        .usec = 0,
    });
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout);
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const config = ingest.getConfig();
    log.info("typeahead ingester → {s}", .{config.worker_url});

    // open local SQLite (optional — graceful degradation if no volume)
    var local_db = LocalDb.init(allocator);
    var local_db_ptr: ?*LocalDb = null;
    local_db.open() catch |err| {
        log.warn("local db failed to open: {}, search will be unavailable", .{err});
    };
    // NOTE(review): relies on LocalDb leaving .conn null on failed open — confirm.
    if (local_db.conn != null) {
        local_db_ptr = &local_db;
    }
    defer local_db.deinit();

    // start HTTP server FIRST so Fly proxy doesn't timeout
    const port: u16 = blk: {
        const port_str = posix.getenv("PORT") orelse "8080";
        break :blk std.fmt.parseInt(u16, port_str, 10) catch 8080;
    };

    const address = try net.Address.parseIp("0.0.0.0", port);
    var listener = try address.listen(.{ .reuse_address = true });
    defer listener.deinit();

    log.info("listening on port {d}", .{port});

    // init thread pool for HTTP connections
    var pool: Thread.Pool = undefined;
    try pool.init(.{
        .allocator = allocator,
        .n_jobs = MAX_HTTP_WORKERS,
    });
    defer pool.deinit();

    // start sync thread (background — turso → local SQLite)
    if (local_db_ptr) |db| {
        const sync_thread = try Thread.spawn(.{}, sync.syncLoop, .{ allocator, db });
        sync_thread.detach();
    }

    // start jetstream thread (background — ingestion)
    var transport = HttpTransport.init(allocator);
    transport.keep_alive = false;
    defer transport.deinit();

    const js_thread = try Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{
        .allocator = allocator,
        .config = config,
        .transport = &transport,
        .local_db = local_db_ptr,
    }});
    js_thread.detach();

    // main thread: HTTP accept loop
    while (true) {
        const conn = listener.accept() catch |err| {
            log.err("accept error: {}", .{err});
            continue;
        };

        setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            log.warn("failed to set socket timeout: {}", .{err});
        };

        if (local_db_ptr) |db| {
            pool.spawn(server.handleConnection, .{ conn, db }) catch |err| {
                log.err("pool spawn error: {}", .{err});
                conn.stream.close();
            };
        } else {
            // no local DB — still answer /health with 200 so the platform's
            // health check doesn't recycle the machine; everything else gets
            // 503 (the previous code 503'd /health too, failing the check).
            var read_buffer: [1024]u8 = undefined;
            var write_buffer: [1024]u8 = undefined;
            var reader = conn.stream.reader(&read_buffer);
            var writer = conn.stream.writer(&write_buffer);
            var srv = std.http.Server.init(reader.interface(), &writer.interface);
            var request = srv.receiveHead() catch {
                conn.stream.close();
                continue;
            };
            const target = request.head.target;
            const path = if (mem.indexOfScalar(u8, target, '?')) |qi| target[0..qi] else target;
            if (mem.eql(u8, path, "/health")) {
                request.respond("{\"status\":\"ok\",\"ready\":false}", .{
                    .extra_headers = &.{
                        .{ .name = "content-type", .value = "application/json" },
                    },
                }) catch {};
            } else {
                request.respond("{\"error\":\"search unavailable\"}", .{
                    .status = .service_unavailable,
                    .extra_headers = &.{
                        .{ .name = "content-type", .value = "application/json" },
                    },
                }) catch {};
            }
            conn.stream.close();
        }
    }
}
//! Search handler: 3-query ranking strategy
//! Replicates src/handlers/search.ts logic using local SQLite FTS5

const std = @import("std");
const json = std.json;
const mem = std.mem;
const Allocator = mem.Allocator;
const LocalDb = @import("db/LocalDb.zig");

const log = std.log.scoped(.search);

const MAX_SEEN = 201; // 1 exact + 100 prefix + 100 FTS
const MAX_DID_LEN = 64;
const MAX_TERM_LEN = 512;

/// Perform the 3-query search and write JSON response into the writer.
/// Ranking: exact handle → handle prefix (LIKE) → FTS5 prefix
/// Matches the CF Worker's search.ts ranking.
///
/// Thread-safe: all scratch state lives on this call's stack, so the HTTP
/// thread pool may run searches concurrently.
///
/// Results are written inline while SQLite statements are alive to avoid
/// use-after-free on row text pointers (which are only valid until the
/// next step() or deinit() on the statement).
pub fn search(local: *LocalDb, raw_query: []const u8, limit: usize, writer: anytype) !void {
    // sanitize into a per-call stack buffer (NOT a shared global — a
    // file-scope buffer would be a data race under the thread pool)
    var term_buf: [MAX_TERM_LEN]u8 = undefined;
    const term = sanitize(raw_query, &term_buf);
    if (term.len == 0) {
        try writer.writeAll("{\"actors\":[]}");
        return;
    }

    // DID dedup: copy DIDs into stack storage so they outlive each query
    var seen_storage: [MAX_SEEN * MAX_DID_LEN]u8 = undefined;
    var seen_fba = std.heap.FixedBufferAllocator.init(&seen_storage);
    var seen_dids: [MAX_SEEN][]const u8 = undefined;
    var seen_count: usize = 0;

    var jw: json.Stringify = .{ .writer = writer };
    try jw.beginObject();
    try jw.objectField("actors");
    try jw.beginArray();

    var emitted: usize = 0;

    // 1. exact handle match
    {
        var rows = local.query(
            \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated
            \\FROM actors WHERE handle = ? COLLATE NOCASE AND hidden = 0 LIMIT 1
        , .{term}) catch |err| blk: {
            log.err("exact query failed: {}", .{err});
            break :blk null;
        };
        if (rows) |*r| {
            defer r.deinit();
            if (r.next()) |row| {
                if (emitted < limit) {
                    try writeActorFromRow(&jw, row);
                    _ = trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0));
                    emitted += 1;
                }
            }
        }
    }

    // build queries; if the term overflows either buffer, finish the
    // document gracefully with whatever tier 1 produced
    var fts_query_buf: [256]u8 = undefined;
    var like_query_buf: [256]u8 = undefined;

    const fts_query = std.fmt.bufPrint(&fts_query_buf, "\"{s}\"*", .{term}) catch {
        try jw.endArray();
        try jw.endObject();
        return;
    };
    const like_query = std.fmt.bufPrint(&like_query_buf, "{s}%", .{term}) catch {
        try jw.endArray();
        try jw.endObject();
        return;
    };

    // 2. two-phase handle prefix match
    // Phase 1: index-friendly LIKE with no ORDER BY (enables index early termination)
    // Phase 2: sort candidates by handle length in Zig, point-lookup for full rows
    // This restores the Worker's length-based ranking without the pathological full-scan sort.
    {
        const max_prefix = 100;
        const prefix_fetch = @min(limit * 5, max_prefix);

        var prefix_did_storage: [max_prefix * MAX_DID_LEN]u8 = undefined;
        var prefix_fba = std.heap.FixedBufferAllocator.init(&prefix_did_storage);
        var prefix_dids: [max_prefix][]const u8 = undefined;
        var prefix_hlens: [max_prefix]usize = undefined;
        var prefix_count: usize = 0;

        // phase 1: fast index scan (no ORDER BY — uses handle COLLATE NOCASE index)
        {
            var rows = local.query(
                \\SELECT did, handle FROM actors
                \\WHERE handle LIKE ? COLLATE NOCASE AND hidden = 0 LIMIT ?
            , .{ like_query, prefix_fetch }) catch |err| blk: {
                log.err("prefix query failed: {}", .{err});
                break :blk null;
            };
            if (rows) |*r| {
                defer r.deinit();
                while (r.next()) |row| {
                    const did = row.text(0);
                    const handle = row.text(1);
                    // skip exact match (already in tier 1)
                    if (handle.len == term.len and asciiEqlIgnoreCase(handle, term)) continue;
                    if (isDuplicate(seen_dids[0..seen_count], did)) continue;
                    const copy = prefix_fba.allocator().dupe(u8, did) catch break;
                    prefix_dids[prefix_count] = copy;
                    prefix_hlens[prefix_count] = handle.len;
                    prefix_count += 1;
                }
            }
        }

        // sort by handle length (insertion sort — at most 100 elements, <1μs)
        {
            var i: usize = 1;
            while (i < prefix_count) : (i += 1) {
                const key_did = prefix_dids[i];
                const key_hlen = prefix_hlens[i];
                var j: usize = i;
                while (j > 0 and prefix_hlens[j - 1] > key_hlen) {
                    prefix_dids[j] = prefix_dids[j - 1];
                    prefix_hlens[j] = prefix_hlens[j - 1];
                    j -= 1;
                }
                prefix_dids[j] = key_did;
                prefix_hlens[j] = key_hlen;
            }
        }

        // phase 2: point lookups in handle-length order
        for (prefix_dids[0..prefix_count]) |did| {
            if (emitted >= limit) break;
            var rows = local.query(
                \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated
                \\FROM actors WHERE did = ?
            , .{did}) catch continue;
            defer rows.deinit();
            if (rows.next()) |row| {
                try writeActorFromRow(&jw, row);
                _ = trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0));
                emitted += 1;
            }
        }
    }

    // 3. two-phase FTS5 prefix search
    // Phase 1: pure FTS5 ranked query (no JOIN — enables rank optimization + early termination)
    // Phase 2: point lookups on actors table by primary key
    // Skip if tiers 1+2 already filled the limit, or term too short (single-char prefix is pathological)
    if (emitted < limit and term.len >= 2) {
        const overfetch = limit * 5;
        const max_candidates = 500;
        const fetch_count = @min(overfetch, max_candidates);

        // collect candidate DIDs into stack buffer (row text pointers die on step/deinit)
        var candidate_storage: [max_candidates * MAX_DID_LEN]u8 = undefined;
        var candidate_fba = std.heap.FixedBufferAllocator.init(&candidate_storage);
        var candidate_dids: [max_candidates][]const u8 = undefined;
        var candidate_count: usize = 0;

        // phase 1: FTS5-only ranked query
        {
            var rows = local.query(
                \\SELECT did FROM actors_fts WHERE actors_fts MATCH ? ORDER BY rank LIMIT ?
            , .{ fts_query, fetch_count }) catch |err| blk: {
                log.err("FTS query failed for '{s}': {}", .{ fts_query, err });
                break :blk null;
            };
            if (rows) |*r| {
                defer r.deinit();
                while (r.next()) |row| {
                    const did = row.text(0);
                    if (isDuplicate(seen_dids[0..seen_count], did)) continue;
                    const copy = candidate_fba.allocator().dupe(u8, did) catch break;
                    candidate_dids[candidate_count] = copy;
                    candidate_count += 1;
                }
            }
        }

        // phase 2: point lookups by primary key
        for (candidate_dids[0..candidate_count]) |did| {
            if (emitted >= limit) break;
            var rows = local.query(
                \\SELECT did, handle, display_name, avatar_url, labels, created_at, associated
                \\FROM actors WHERE did = ? AND handle != '' AND hidden = 0
            , .{did}) catch continue;
            defer rows.deinit();
            if (rows.next()) |row| {
                if (!isDuplicate(seen_dids[0..seen_count], row.text(0))) {
                    try writeActorFromRow(&jw, row);
                    _ = trackDid(&seen_fba, &seen_dids, &seen_count, row.text(0));
                    emitted += 1;
                }
            }
        }
    }

    try jw.endArray();
    try jw.endObject();
}

/// Write a single actor object from a live SQLite row.
/// Must be called while the row's statement is still alive.
fn writeActorFromRow(jw: anytype, row: LocalDb.Row) !void {
    const did = row.text(0);
    const handle = row.text(1);
    const display_name = row.text(2);
    const avatar_url = row.text(3);
    const labels = row.text(4);
    const created_at = row.text(5);
    const associated = row.text(6);

    try jw.beginObject();

    try jw.objectField("did");
    try jw.write(did);
    try jw.objectField("handle");
    try jw.write(handle);

    if (display_name.len > 0) {
        try jw.objectField("displayName");
        try jw.write(display_name);
    }
    if (avatar_url.len > 0) {
        try jw.objectField("avatar");
        // assumes did/avatar_cid contain no JSON-special chars — TODO confirm
        try jw.print("\"https://cdn.bsky.app/img/avatar/plain/{s}/{s}\"", .{ did, avatar_url });
    }
    // associated/labels columns are spliced in verbatim — presumed to hold
    // JSON written by the sync path; verify against the writer
    if (!mem.eql(u8, associated, "{}") and associated.len > 2) {
        try jw.objectField("associated");
        try jw.print("{s}", .{associated});
    }
    try jw.objectField("labels");
    if (labels.len > 0) {
        try jw.print("{s}", .{labels});
    } else {
        // defensive: an empty column would otherwise corrupt the JSON document
        try jw.print("{s}", .{"[]"});
    }

    if (created_at.len > 0) {
        try jw.objectField("createdAt");
        try jw.write(created_at);
    }

    try jw.endObject();
}

/// Copy a DID into the fixed buffer for cross-query dedup.
/// Returns false (and records nothing) when capacity or storage is exhausted.
fn trackDid(fba: *std.heap.FixedBufferAllocator, seen: *[MAX_SEEN][]const u8, count: *usize, did: []const u8) bool {
    if (count.* >= MAX_SEEN) return false;
    const copy = fba.allocator().dupe(u8, did) catch return false;
    seen[count.*] = copy;
    count.* += 1;
    return true;
}

/// ASCII-only case-insensitive equality (matches SQLite's NOCASE collation).
fn asciiEqlIgnoreCase(a: []const u8, b: []const u8) bool {
    if (a.len != b.len) return false;
    for (a, b) |ac, bc| {
        if (toLower(ac) != toLower(bc)) return false;
    }
    return true;
}

fn toLower(c: u8) u8 {
    return if (c >= 'A' and c <= 'Z') c + 32 else c;
}

/// Linear scan — seen set is bounded by MAX_SEEN (201), so this is cheap.
fn isDuplicate(seen: []const []const u8, did: []const u8) bool {
    for (seen) |s| {
        if (mem.eql(u8, s, did)) return true;
    }
    return false;
}

/// Keep only ASCII letters/digits, whitespace, '.', '-', and pass through
/// bytes >= 0x80 (UTF-8 sequences for unicode letters/digits).
/// Matches the worker's sanitize() in src/utils.ts.
/// Writes into the caller-provided buffer so concurrent searches on the
/// HTTP thread pool never share scratch state (the previous file-scope
/// static buffer was a data race).
fn sanitize(input: []const u8, buf: []u8) []const u8 {
    var out: usize = 0;
    for (input) |c| {
        if (out >= buf.len) break;
        if ((c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z') or (c >= '0' and c <= '9') or c == '.' or c == '-' or c == ' ' or c == '\t') {
            buf[out] = c;
            out += 1;
        } else if (c >= 0x80) {
            // pass through UTF-8 bytes (unicode letters/digits)
            buf[out] = c;
            out += 1;
        }
        // else: strip it (FTS5 special chars, punctuation, etc.)
    }
    return mem.trim(u8, buf[0..out], " \t\n\r");
}
//! HTTP server for search and health endpoints
//! Uses std.http.Server with thread pool

const std = @import("std");
const http = std.http;
const net = std.net;
const mem = std.mem;
const json = std.json;
const LocalDb = @import("db/LocalDb.zig");
const search = @import("search.zig");
const ingest = @import("ingest.zig");

const log = std.log.scoped(.server);

const HTTP_BUF_SIZE = 65536;

/// JSON + permissive CORS response headers — single source of truth
/// (previously duplicated in three call sites).
const json_cors_headers = [_]http.Header{
    .{ .name = "content-type", .value = "application/json" },
    .{ .name = "access-control-allow-origin", .value = "*" },
    .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" },
    .{ .name = "access-control-allow-headers", .value = "content-type" },
};

/// CORS headers only (for OPTIONS preflight — no body, no content-type).
const cors_only_headers = json_cors_headers[1..];

/// Per-connection loop: serve requests until the client closes or an error
/// occurs. Runs on a Thread.Pool worker; the connection socket has
/// recv/send timeouts set by the accept loop.
pub fn handleConnection(conn: net.Server.Connection, local_db: *LocalDb) void {
    defer conn.stream.close();

    var read_buffer: [HTTP_BUF_SIZE]u8 = undefined;
    var write_buffer: [HTTP_BUF_SIZE]u8 = undefined;

    var reader = conn.stream.reader(&read_buffer);
    var writer = conn.stream.writer(&write_buffer);

    var server = http.Server.init(reader.interface(), &writer.interface);

    while (true) {
        var request = server.receiveHead() catch |err| {
            // normal connection teardown is not worth logging at error level
            if (err != error.HttpConnectionClosing and err != error.EndOfStream) {
                log.debug("http receive error: {}", .{err});
            }
            return;
        };

        handleRequest(&request, local_db) catch |err| {
            log.err("request error: {}", .{err});
            return;
        };

        if (!request.head.keep_alive) return;
    }
}

/// Route a single request by path; OPTIONS is answered with CORS preflight
/// regardless of path.
fn handleRequest(request: *http.Server.Request, local_db: *LocalDb) !void {
    const target = request.head.target;

    if (request.head.method == .OPTIONS) {
        try sendCors(request);
        return;
    }

    const path = if (mem.indexOf(u8, target, "?")) |qi| target[0..qi] else target;

    if (mem.eql(u8, path, "/search")) {
        try handleSearch(request, target, local_db);
    } else if (mem.eql(u8, path, "/health")) {
        try handleHealth(request, local_db);
    } else if (mem.eql(u8, path, "/debug/fts")) {
        try handleDebugFts(request, target, local_db);
    } else {
        try sendJson(request, .not_found, "{\"error\":\"not found\"}");
    }
}

/// GET /search?q=term[&limit=n] — run the 3-tier typeahead search and
/// return {"actors":[...]}. Accepts `term` as an alias for `q`.
fn handleSearch(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
    if (!local_db.isReady()) {
        try sendJson(request, .service_unavailable, "{\"error\":\"not ready\"}");
        return;
    }

    const raw_q = parseQueryParam(target, "q") orelse parseQueryParam(target, "term") orelse {
        try sendJson(request, .bad_request, "{\"error\":\"missing q parameter\"}");
        return;
    };

    var decode_buf: [512]u8 = undefined;
    // on decode overflow fall back to the raw (still-encoded) value
    const q = percentDecode(raw_q, &decode_buf) orelse raw_q;

    if (q.len == 0) {
        try sendJson(request, .bad_request, "{\"error\":\"empty query\"}");
        return;
    }

    // limit defaults to 10, clamped to [1, 100]
    const limit_str = parseQueryParam(target, "limit");
    const limit: usize = if (limit_str) |s|
        std.fmt.parseInt(usize, s, 10) catch 10
    else
        10;
    const clamped_limit = @min(@max(limit, 1), 100);

    var output: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
    defer output.deinit();

    search.search(local_db, q, clamped_limit, &output.writer) catch {
        try sendJson(request, .internal_server_error, "{\"error\":\"search failed\"}");
        return;
    };

    try request.respond(output.written(), .{
        .extra_headers = &json_cors_headers,
    });
}

/// GET /health — readiness flag, actor count, and process RSS.
fn handleHealth(request: *http.Server.Request, local_db: *LocalDb) !void {
    const ready = local_db.isReady();
    const actors = local_db.countActors();
    const rss = ingest.getRssKB();

    var buf: [256]u8 = undefined;
    const body = std.fmt.bufPrint(&buf,
        \\{{"status":"ok","ready":{s},"actors":{d},"rss_kb":{d}}}
    , .{
        if (ready) "true" else "false",
        actors,
        rss,
    }) catch "{\"status\":\"ok\"}";

    try sendJson(request, .ok, body);
}

/// Send a JSON body with the shared CORS headers.
fn sendJson(request: *http.Server.Request, status: http.Status, body: []const u8) !void {
    try request.respond(body, .{
        .status = status,
        .extra_headers = &json_cors_headers,
    });
}

/// Answer an OPTIONS preflight: 204 with CORS headers only.
fn sendCors(request: *http.Server.Request) !void {
    try request.respond("", .{
        .status = .no_content,
        .extra_headers = cors_only_headers,
    });
}

/// Extract a query parameter value from the request target.
/// Splits the query string on '&' and compares parameter names exactly —
/// the previous substring scan for "?name="/"&name=" could match a "?q="
/// occurring inside another parameter's VALUE ('?' is legal there per
/// RFC 3986). Returns the still-percent-encoded value, or null when the
/// parameter is absent or empty.
fn parseQueryParam(target: []const u8, param: []const u8) ?[]const u8 {
    const qmark = mem.indexOfScalar(u8, target, '?') orelse return null;
    var pairs = mem.splitScalar(u8, target[qmark + 1 ..], '&');
    while (pairs.next()) |pair| {
        const eq = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!mem.eql(u8, pair[0..eq], param)) continue;
        const value = pair[eq + 1 ..];
        if (value.len == 0) return null;
        return value;
    }
    return null;
}

/// Decode percent-encoded URL query value into caller-provided buffer.
/// Handles %XX hex pairs and '+' as space; malformed escapes are copied
/// through literally. Returns null if the output would overflow `buf`.
/// Thread-safe (no shared state).
fn percentDecode(input: []const u8, buf: []u8) ?[]const u8 {
    var out: usize = 0;
    var i: usize = 0;
    while (i < input.len) {
        if (out >= buf.len) return null;
        if (input[i] == '%' and i + 2 < input.len) {
            const hi = hexVal(input[i + 1]) orelse {
                buf[out] = input[i];
                out += 1;
                i += 1;
                continue;
            };
            const lo = hexVal(input[i + 2]) orelse {
                buf[out] = input[i];
                out += 1;
                i += 1;
                continue;
            };
            buf[out] = (@as(u8, hi) << 4) | @as(u8, lo);
            out += 1;
            i += 3;
        } else if (input[i] == '+') {
            buf[out] = ' ';
            out += 1;
            i += 1;
        } else {
            buf[out] = input[i];
            out += 1;
            i += 1;
        }
    }
    return buf[0..out];
}

/// Map one hex digit to its value, or null for a non-hex byte.
fn hexVal(c: u8) ?u4 {
    return switch (c) {
        '0'...'9' => @intCast(c - '0'),
        'a'...'f' => @intCast(c - 'a' + 10),
        'A'...'F' => @intCast(c - 'A' + 10),
        else => null,
    };
}

/// Debug endpoint: compare actors vs actors_fts for a DID
/// Usage: /debug/fts?did=did:plc:xxx
fn handleDebugFts(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
    const did = parseQueryParam(target, "did") orelse {
        try sendJson(request, .bad_request, "{\"error\":\"missing did parameter\"}");
        return;
    };

    var decode_buf: [512]u8 = undefined;
    const term = percentDecode(did, &decode_buf) orelse did;

    var out: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
    defer out.deinit();
    var jw: json.Stringify = .{ .writer = &out.writer };

    try jw.beginObject();

    // actors table row
    try jw.objectField("actors");
    try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors WHERE did = ?", term);

    // FTS table row for same DID
    try jw.objectField("fts");
    try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors_fts WHERE did = ?", term);

    try jw.endObject();

    try sendJson(request, .ok, out.written());
}

/// Run a single-row debug query and emit {did, handle, display_name},
/// "error" on query failure, or null when no row matches.
fn writeDebugRow(jw: anytype, local_db: *LocalDb, comptime sql: []const u8, param: []const u8) !void {
    var rows = local_db.query(sql, .{param}) catch {
        try jw.write("error");
        return;
    };
    defer rows.deinit();
    if (rows.next()) |row| {
        try jw.beginObject();
        try jw.objectField("did");
        try jw.write(row.text(0));
        try jw.objectField("handle");
        try jw.write(row.text(1));
        try jw.objectField("display_name");
        try jw.write(row.text(2));
        try jw.endObject();
    } else {
        try jw.write(null);
    }
}