GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1//! Local SQLite read replica using zqlite
2//! Provides fast FTS5 queries while Turso remains source of truth
3
4const std = @import("std");
5const io = std.Options.debug_io;
6const zqlite = @import("zqlite");
7const Allocator = std.mem.Allocator;
8
9const log = std.log.scoped(.local_db);
10
11const LocalDb = @This();
12
/// Write connection; opened by `openDb` and closed by `deinit`. `exec` uses it while holding `mutex`.
conn: ?zqlite.Conn = null,
/// Connection used by `query` and `countActors`; can be dropped and recreated via `closeReadConn` / `reopenReadConn`.
read_conn: ?zqlite.Conn = null, // separate read connection — never blocked by writes in WAL mode
/// Allocator passed to `init`; not used by any code visible in this section.
allocator: Allocator,
/// Readiness flag, read by `isReady` (acquire) and written by `setReady` (release).
is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
/// Taken by `exec` and by the public `lock` / `unlock` pair.
mutex: std.Io.Mutex = std.Io.Mutex.init, // protects write conn only
/// Database path remembered by `open`; `reopenReadConn` reuses it.
path: []const u8 = "",
19
/// column list for full actor row queries (sync + search)
/// The order here defines the positional indexes exposed by `Col`.
pub const actor_cols = "did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds";

/// named column indexes matching actor_cols order
/// Each constant is the zero-based position of the same-named column in
/// `actor_cols`; the two must be kept in sync by hand.
pub const Col = struct {
    pub const did: usize = 0;
    pub const handle: usize = 1;
    pub const display_name: usize = 2;
    pub const avatar_url: usize = 3;
    pub const hidden: usize = 4;
    pub const labels: usize = 5;
    pub const created_at: usize = 6;
    pub const associated: usize = 7;
    pub const pds: usize = 8;
};
35
/// Look up environment variable `name` through libc.
/// Returns the value as a slice (without the terminating NUL), or null when
/// the variable is not set.
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    const raw = std.c.getenv(name) orelse return null;
    return std.mem.span(raw);
}
40
/// Create an unopened `LocalDb` that stores `allocator`.
/// All other fields keep their defaults; call `open` to connect.
pub fn init(allocator: Allocator) LocalDb {
    return LocalDb{ .allocator = allocator };
}
44
/// Clean up leftover staging tables from interrupted bootstrap.
/// Called during open so sync starts clean.
///
/// Best-effort: does nothing when the write connection is not open, and a
/// failed DROP is logged rather than propagated.
fn cleanupStagingTables(self: *LocalDb) void {
    const c = self.conn orelse return;
    c.exec("DROP TABLE IF EXISTS actors_fts_stage", .{}) catch |e| {
        log.warn("failed to drop actors_fts_stage: {s}", .{@errorName(e)});
    };
    c.exec("DROP TABLE IF EXISTS actors_stage", .{}) catch |e| {
        log.warn("failed to drop actors_stage: {s}", .{@errorName(e)});
    };
}
52
/// Verify FTS5 table is functional. If broken (e.g. from a failed RENAME),
/// drop it and recreate empty — sync will repopulate it.
///
/// Does nothing when the write connection is not open. Every step of the
/// repair is best-effort: failures are logged, never propagated, and the
/// "repaired" message is only emitted when the table was actually recreated.
fn repairFtsIfBroken(self: *LocalDb) void {
    const c = self.conn orelse return;

    // probe: try a simple MATCH query — if shadow tables are broken this errors
    const probe = c.row("SELECT did FROM actors_fts WHERE actors_fts MATCH '\"test\"*' LIMIT 1", .{}) catch |probe_err| {
        log.warn("FTS5 probe failed: {s}", .{@errorName(probe_err)});
        log.warn("FTS5 table broken, dropping and recreating", .{});

        c.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch |e| {
            log.err("failed to drop broken actors_fts: {s}", .{@errorName(e)});
        };

        var recreated = true;
        c.exec(
            \\CREATE VIRTUAL TABLE actors_fts USING fts5(
            \\ did UNINDEXED, handle, display_name,
            \\ tokenize='unicode61 remove_diacritics 2'
            \\)
        , .{}) catch |e| {
            recreated = false;
            log.err("failed to recreate actors_fts: {s}", .{@errorName(e)});
        };

        // clear sync state so fullSync rebuilds FTS
        // (done even when the recreate failed, so a later successful open
        // followed by a sync does not leave the index empty)
        c.exec("DELETE FROM sync_meta WHERE key = 'sync_complete'", .{}) catch |e| {
            log.warn("failed to clear sync_complete: {s}", .{@errorName(e)});
        };
        c.exec("DELETE FROM sync_meta WHERE key = 'last_sync'", .{}) catch |e| {
            log.warn("failed to clear last_sync: {s}", .{@errorName(e)});
        };

        if (recreated) {
            log.info("FTS5 repaired, sync will rebuild on next run", .{});
        } else {
            log.err("FTS5 repair incomplete, actors_fts could not be recreated", .{});
        }
        return;
    };

    // probe succeeded; release the row if one came back
    if (probe) |r| r.deinit();
}
75
/// Resolve the database location from the environment and open it.
///
/// The path is taken from `LOCAL_DB_PATH` (default `/data/local.db`) and
/// remembered in `self.path`. The directory named by `SQLITE_TMPDIR`
/// (default `/data/tmp`) is created first; failing to create it is only
/// logged. Errors from `openDb` are returned to the caller.
pub fn open(self: *LocalDb) !void {
    const db_path = cGetenv("LOCAL_DB_PATH") orelse "/data/local.db";
    const scratch_dir = cGetenv("SQLITE_TMPDIR") orelse "/data/tmp";

    self.path = db_path;

    // ensure SQLite temp directory exists on the persistent volume
    std.Io.Dir.createDirPath(.cwd(), io, scratch_dir) catch |mkdir_err| {
        log.warn("failed to create temp dir {s}: {}", .{ scratch_dir, mkdir_err });
    };

    return self.openDb(db_path);
}
88
/// Open the write and read connections on `path_env`, configure them, and
/// make sure the schema exists.
///
/// Returns `error.PathTooLong` when the path does not fit the 256-byte
/// NUL-terminated buffer, or the underlying error when a connection cannot be
/// opened or the schema cannot be created. On any failure after the write
/// connection is open, both connections are closed and reset to null so a
/// failed open can be retried without leaking handles. PRAGMA failures are
/// logged and otherwise ignored.
fn openDb(self: *LocalDb, path_env: []const u8) !void {
    var path_buf: [256]u8 = undefined;
    if (path_env.len >= path_buf.len) return error.PathTooLong;
    @memcpy(path_buf[0..path_env.len], path_env);
    path_buf[path_env.len] = 0;
    const path: [*:0]const u8 = path_buf[0..path_env.len :0];

    log.info("opening {s}", .{path_env});

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    const write_conn = zqlite.open(path, flags) catch |err| {
        log.err("failed to open write conn: {}", .{err});
        return err;
    };
    self.conn = write_conn;
    // from here on, any error return closes whatever has been opened
    errdefer self.deinit();

    const write_pragmas = [_][]const u8{
        "PRAGMA journal_mode=WAL",
        "PRAGMA busy_timeout=5000",
        "PRAGMA synchronous=NORMAL", // safe with WAL
        "PRAGMA cache_size=-20000", // 20MB page cache
        "PRAGMA mmap_size=268435456", // 256MB
    };
    inline for (write_pragmas) |pragma| {
        write_conn.exec(pragma, .{}) catch |err| {
            log.warn("write conn: {s} failed: {s}", .{ pragma, @errorName(err) });
        };
    }

    // separate read connection — WAL allows concurrent reads + writes
    const reader = zqlite.open(path, zqlite.OpenFlags.ReadOnly) catch |err| {
        log.err("failed to open read conn: {}", .{err});
        return err;
    };
    self.read_conn = reader;

    const read_pragmas = [_][]const u8{
        "PRAGMA busy_timeout=1000",
        "PRAGMA mmap_size=268435456", // 256MB
        "PRAGMA cache_size=-20000", // 20MB
    };
    inline for (read_pragmas) |pragma| {
        reader.exec(pragma, .{}) catch |err| {
            log.warn("read conn: {s} failed: {s}", .{ pragma, @errorName(err) });
        };
    }

    self.cleanupStagingTables();
    try self.createSchema();
    self.repairFtsIfBroken();
    log.info("initialized", .{});
}
124
/// Close whichever connections are open (read connection first, then the
/// write connection) and reset both fields to null. Safe to call when
/// nothing is open.
pub fn deinit(self: *LocalDb) void {
    if (self.read_conn) |reader| {
        reader.close();
        self.read_conn = null;
    }
    if (self.conn) |writer| {
        writer.close();
        self.conn = null;
    }
}
131
/// Close read connection (e.g. during bootstrap to avoid WAL reader interference)
/// No-op when the read connection is already closed.
pub fn closeReadConn(self: *LocalDb) void {
    const reader = self.read_conn orelse return;
    reader.close();
    self.read_conn = null;
}
137
/// Reopen read connection after bootstrap completes
///
/// No-op when a read connection is already open. Uses the path remembered in
/// `self.path`. Returns `error.PathTooLong` when that path does not fit the
/// 256-byte NUL-terminated buffer, or the underlying error when the
/// connection cannot be opened. PRAGMA failures are logged and otherwise
/// ignored.
pub fn reopenReadConn(self: *LocalDb) !void {
    if (self.read_conn != null) return;

    var path_buf: [256]u8 = undefined;
    if (self.path.len >= path_buf.len) return error.PathTooLong;
    @memcpy(path_buf[0..self.path.len], self.path);
    path_buf[self.path.len] = 0;
    const path: [*:0]const u8 = path_buf[0..self.path.len :0];

    const reader = zqlite.open(path, zqlite.OpenFlags.ReadOnly) catch |err| {
        log.err("failed to reopen read conn: {}", .{err});
        return err;
    };
    self.read_conn = reader;

    const read_pragmas = [_][]const u8{
        "PRAGMA busy_timeout=1000",
        "PRAGMA mmap_size=268435456",
        "PRAGMA cache_size=-20000",
    };
    inline for (read_pragmas) |pragma| {
        reader.exec(pragma, .{}) catch |err| {
            log.warn("read conn: {s} failed: {s}", .{ pragma, @errorName(err) });
        };
    }
}
156
/// Return the current value of the readiness flag (atomic load with
/// acquire ordering). The flag is only changed through `setReady`.
pub fn isReady(self: *LocalDb) bool {
    return self.is_ready.load(.acquire);
}
160
/// Set the readiness flag to `ready` (atomic store with release ordering),
/// pairing with the acquire load in `isReady`.
pub fn setReady(self: *LocalDb, ready: bool) void {
    self.is_ready.store(ready, .release);
}
164
/// Create the `actors`, `actors_fts` and `sync_meta` tables (plus the handle
/// index) on the write connection if they do not exist yet.
///
/// Returns `error.NotOpen` when the write connection is not open, or the
/// underlying error when one of the three tables cannot be created. The
/// `pds` column migration and the index creation are best-effort: failures
/// are logged and do not abort schema setup.
fn createSchema(self: *LocalDb) !void {
    const c = self.conn orelse return error.NotOpen;

    c.exec(
        \\CREATE TABLE IF NOT EXISTS actors (
        \\ did TEXT PRIMARY KEY,
        \\ handle TEXT NOT NULL DEFAULT '',
        \\ display_name TEXT DEFAULT '',
        \\ avatar_url TEXT DEFAULT '',
        \\ hidden INTEGER NOT NULL DEFAULT 0,
        \\ labels TEXT NOT NULL DEFAULT '[]',
        \\ created_at TEXT DEFAULT '',
        \\ associated TEXT DEFAULT '{}',
        \\ pds TEXT DEFAULT ''
        \\)
    , .{}) catch |err| {
        log.err("failed to create actors table: {}", .{err});
        return err;
    };

    // migration: add pds column if missing (existing deployments)
    // this statement fails whenever the column already exists, so the error
    // is expected on most runs and only logged at debug level
    c.exec("ALTER TABLE actors ADD COLUMN pds TEXT DEFAULT ''", .{}) catch |err| {
        log.debug("pds column migration not applied: {s}", .{@errorName(err)});
    };

    c.exec("CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE)", .{}) catch |err| {
        log.warn("failed to create idx_actors_handle: {s}", .{@errorName(err)});
    };

    // standalone FTS5 (not content-synced — avoids rowid tracking complexity with INSERT OR REPLACE)
    c.exec(
        \\CREATE VIRTUAL TABLE IF NOT EXISTS actors_fts USING fts5(
        \\ did UNINDEXED, handle, display_name,
        \\ tokenize='unicode61 remove_diacritics 2'
        \\)
    , .{}) catch |err| {
        log.err("failed to create actors_fts: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE TABLE IF NOT EXISTS sync_meta (
        \\ key TEXT PRIMARY KEY,
        \\ value TEXT
        \\)
    , .{}) catch |err| {
        log.err("failed to create sync_meta table: {}", .{err});
        return err;
    };
}
211
/// One result row; forwards column access to the wrapped `zqlite.Row`.
pub const Row = struct {
    stmt: zqlite.Row,

    /// Text value of the column at `index`, as returned by the wrapped row.
    pub fn text(self: Row, index: usize) []const u8 {
        const value = self.stmt.text(index);
        return value;
    }

    /// Integer value of the column at `index`, as returned by the wrapped row.
    pub fn int(self: Row, index: usize) i64 {
        const value = self.stmt.int(index);
        return value;
    }
};
223
/// Result-set iterator; wraps `zqlite.Rows` and yields `Row` values.
pub const Rows = struct {
    inner: zqlite.Rows,

    /// Advance to the next row, or return null when the wrapped iterator
    /// yields nothing further.
    pub fn next(self: *Rows) ?Row {
        const raw = self.inner.next() orelse return null;
        return Row{ .stmt = raw };
    }

    /// Release the wrapped result set.
    pub fn deinit(self: *Rows) void {
        self.inner.deinit();
    }

    /// Error recorded on the wrapped result set, if any.
    pub fn err(self: *Rows) ?anyerror {
        const recorded = self.inner.err;
        return recorded;
    }
};
242
/// SELECT using read connection (never blocked by writes)
///
/// Returns `error.NotOpen` when the read connection is closed. Any error from
/// the underlying query is logged and returned. The caller must call
/// `deinit` on the returned `Rows`.
pub fn query(self: *LocalDb, comptime sql: []const u8, args: anytype) !Rows {
    const reader = self.read_conn orelse return error.NotOpen;
    if (reader.rows(sql, args)) |result| {
        return Rows{ .inner = result };
    } else |e| {
        log.err("query failed: {s}", .{@errorName(e)});
        return e;
    }
}
252
/// Execute a statement (INSERT, UPDATE, DELETE) — mutex-protected
///
/// The write mutex is held for the duration of the call. Returns
/// `error.NotOpen` when the write connection is closed; any error from the
/// statement is logged and returned.
pub fn exec(self: *LocalDb, comptime sql: []const u8, args: anytype) !void {
    self.lock();
    defer self.unlock();

    const writer = self.conn orelse return error.NotOpen;
    return writer.exec(sql, args) catch |e| {
        log.err("exec failed: {s}", .{@errorName(e)});
        return e;
    };
}
264
/// Get raw write connection for batch operations (caller must hold lock)
/// Returns null when the write connection is not open. This function does
/// not take the mutex itself; pair it with `lock` / `unlock`.
pub fn getConn(self: *LocalDb) ?zqlite.Conn {
    return self.conn;
}
269
/// Acquire the write-connection mutex (uncancelable lock on the module's
/// `io`). Must be balanced by a call to `unlock`.
pub fn lock(self: *LocalDb) void {
    self.mutex.lockUncancelable(io);
}
273
/// Release the write-connection mutex previously taken with `lock`.
pub fn unlock(self: *LocalDb) void {
    self.mutex.unlock(io);
}
277
/// Get cached actor count from sync_meta (written during sync, avoids full table scan)
///
/// Reads the `actor_count` entry through the read connection. Returns 0 when
/// the read connection is closed, the lookup fails, the entry is missing, or
/// its value does not parse as a base-10 `u64`.
pub fn countActors(self: *LocalDb) u64 {
    const reader = self.read_conn orelse return 0;
    const maybe_row = reader.row("SELECT value FROM sync_meta WHERE key = 'actor_count'", .{}) catch return 0;
    const found = maybe_row orelse return 0;
    defer found.deinit();

    const raw_count = found.text(0);
    return std.fmt.parseInt(u64, raw_count, 10) catch 0;
}