GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 288 lines 9.8 kB view raw
//! Local SQLite read replica using zqlite
//! Provides fast FTS5 queries while Turso remains source of truth
//!
//! This file is itself the `LocalDb` struct. It holds two connections to the
//! same database file: a read/write connection (`conn`, serialized through
//! `mutex`) and a read-only connection (`read_conn`) used by `query` and
//! `countActors`.

const std = @import("std");
const io = std.Options.debug_io;
const zqlite = @import("zqlite");
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.local_db);

const LocalDb = @This();

conn: ?zqlite.Conn = null,
read_conn: ?zqlite.Conn = null, // separate read connection — never blocked by writes in WAL mode
allocator: Allocator,
is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
mutex: std.Io.Mutex = std.Io.Mutex.init, // protects write conn only
path: []const u8 = "",

/// column list for full actor row queries (sync + search)
pub const actor_cols = "did, handle, display_name, avatar_url, hidden, labels, created_at, associated, pds";

/// named column indexes matching actor_cols order
pub const Col = struct {
    pub const did: usize = 0;
    pub const handle: usize = 1;
    pub const display_name: usize = 2;
    pub const avatar_url: usize = 3;
    pub const hidden: usize = 4;
    pub const labels: usize = 5;
    pub const created_at: usize = 6;
    pub const associated: usize = 7;
    pub const pds: usize = 8;
};

/// Size of the stack buffer used to NUL-terminate the database path.
/// Paths must be strictly shorter than this so the terminator fits.
const max_path_bytes = 256;

/// Module definition of the `actors_fts` virtual table, shared by
/// `createSchema` and `repairFtsIfBroken` so both always build the same table.
const fts_definition =
    \\fts5(
    \\    did UNINDEXED, handle, display_name,
    \\    tokenize='unicode61 remove_diacritics 2'
    \\)
;

/// Look up an environment variable through libc.
/// Returns null when the variable is not set.
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    if (std.c.getenv(name)) |p| return std.mem.span(p);
    return null;
}

/// Copy `path` into `buf` and NUL-terminate it for the C-string `zqlite.open` API.
/// The returned pointer aliases `buf`, so `buf` must outlive every use of it.
/// Returns `error.PathTooLong` when `path` plus the terminator does not fit.
fn pathToZ(buf: *[max_path_bytes]u8, path: []const u8) error{PathTooLong}![*:0]const u8 {
    if (path.len >= buf.len) return error.PathTooLong;
    @memcpy(buf[0..path.len], path);
    buf[path.len] = 0;
    return buf[0..path.len :0];
}

/// Run each statement in `statements` (a tuple of string literals) on `c`.
/// A failing statement is logged as a warning and the remaining statements
/// still run; nothing is propagated to the caller.
fn execBestEffort(c: zqlite.Conn, comptime statements: anytype) void {
    inline for (statements) |sql| {
        c.exec(sql, .{}) catch |e| {
            log.warn("{s} failed: {s}", .{ sql, @errorName(e) });
        };
    }
}

/// Open a read-only connection to the database at `path_z` and apply the
/// read-side PRAGMAs. Shared by `openDb` and `reopenReadConn` so both produce
/// identically configured connections. Open errors are returned to the caller;
/// PRAGMA failures are only logged.
fn openReadConn(path_z: [*:0]const u8) !zqlite.Conn {
    const c = try zqlite.open(path_z, zqlite.OpenFlags.ReadOnly);
    execBestEffort(c, .{
        "PRAGMA busy_timeout=1000",
        "PRAGMA mmap_size=268435456", // 256MB
        "PRAGMA cache_size=-20000", // 20MB
    });
    return c;
}

/// Create an unopened `LocalDb`. Call `open` before using it.
pub fn init(allocator: Allocator) LocalDb {
    return .{ .allocator = allocator };
}

/// Clean up leftover staging tables from interrupted bootstrap.
/// Called during open so sync starts clean.
fn cleanupStagingTables(self: *LocalDb) void {
    const c = self.conn orelse return;
    execBestEffort(c, .{
        "DROP TABLE IF EXISTS actors_fts_stage",
        "DROP TABLE IF EXISTS actors_stage",
    });
}

/// Verify FTS5 table is functional. If broken (e.g. from a failed RENAME),
/// drop it and recreate empty — sync will repopulate it.
///
/// The sync markers in `sync_meta` are cleared whenever the probe fails, even
/// if the table could not be recreated, so the index is never treated as
/// up to date after a failed probe.
fn repairFtsIfBroken(self: *LocalDb) void {
    const c = self.conn orelse return;
    // probe: try a simple MATCH query — if shadow tables are broken this errors
    const probe = c.row("SELECT did FROM actors_fts WHERE actors_fts MATCH '\"test\"*' LIMIT 1", .{}) catch {
        log.warn("FTS5 table broken, dropping and recreating", .{});
        c.exec("DROP TABLE IF EXISTS actors_fts", .{}) catch |e| {
            log.err("failed to drop broken actors_fts: {s}", .{@errorName(e)});
        };

        var recreated = true;
        c.exec("CREATE VIRTUAL TABLE actors_fts USING " ++ fts_definition, .{}) catch |e| {
            log.err("failed to recreate actors_fts: {s}", .{@errorName(e)});
            recreated = false;
        };

        // clear sync state so fullSync rebuilds FTS
        execBestEffort(c, .{
            "DELETE FROM sync_meta WHERE key = 'sync_complete'",
            "DELETE FROM sync_meta WHERE key = 'last_sync'",
        });

        // only claim success when the table really exists again
        if (recreated) {
            log.info("FTS5 repaired, sync will rebuild on next run", .{});
        }
        return;
    };
    if (probe) |r| r.deinit();
}

/// Open the database named by `LOCAL_DB_PATH` (default `/data/local.db`).
///
/// Also makes sure the directory named by `SQLITE_TMPDIR` (default `/data/tmp`)
/// exists; failing to create it is logged but does not abort the open.
/// Returns any error from `openDb`.
pub fn open(self: *LocalDb) !void {
    const path_env = cGetenv("LOCAL_DB_PATH") orelse "/data/local.db";
    self.path = path_env;

    // ensure SQLite temp directory exists on the persistent volume
    const tmp_dir = cGetenv("SQLITE_TMPDIR") orelse "/data/tmp";
    std.Io.Dir.createDirPath(.cwd(), io, tmp_dir) catch |err| {
        log.warn("failed to create temp dir {s}: {}", .{ tmp_dir, err });
    };

    try self.openDb(path_env);
}

/// Open both connections on `path_env`, configure them, and prepare the schema.
///
/// Order matters: the write connection is opened (creating the file if needed)
/// and switched to WAL before the read-only connection is opened; staging
/// tables are dropped, the schema is created, and the FTS table is probed last.
/// Returns `error.PathTooLong`, or the error from opening either connection or
/// from `createSchema`. On error, connections that were already opened stay
/// stored on `self` and are released by `deinit`.
fn openDb(self: *LocalDb, path_env: []const u8) !void {
    var path_buf: [max_path_bytes]u8 = undefined;
    const path_z = try pathToZ(&path_buf, path_env);

    log.info("opening {s}", .{path_env});

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    const write_conn = zqlite.open(path_z, flags) catch |err| {
        log.err("failed to open write conn: {}", .{err});
        return err;
    };
    self.conn = write_conn;

    execBestEffort(write_conn, .{
        "PRAGMA journal_mode=WAL",
        "PRAGMA busy_timeout=5000",
        "PRAGMA synchronous=NORMAL", // safe with WAL
        "PRAGMA cache_size=-20000", // 20MB page cache
        "PRAGMA mmap_size=268435456", // 256MB
    });

    // separate read connection — WAL allows concurrent reads + writes
    self.read_conn = openReadConn(path_z) catch |err| {
        log.err("failed to open read conn: {}", .{err});
        return err;
    };

    self.cleanupStagingTables();
    try self.createSchema();
    self.repairFtsIfBroken();
    log.info("initialized", .{});
}

/// Close both connections (read first, then write) and reset them to null.
/// Safe to call on a `LocalDb` that was never opened or only partially opened.
pub fn deinit(self: *LocalDb) void {
    if (self.read_conn) |c| c.close();
    self.read_conn = null;
    if (self.conn) |c| c.close();
    self.conn = null;
}

/// Close read connection (e.g. during bootstrap to avoid WAL reader interference)
pub fn closeReadConn(self: *LocalDb) void {
    if (self.read_conn) |c| c.close();
    self.read_conn = null;
}

/// Reopen read connection after bootstrap completes
///
/// Does nothing if a read connection is already open. Uses the path recorded
/// by `open`. Returns `error.PathTooLong` or the error from opening the
/// connection.
pub fn reopenReadConn(self: *LocalDb) !void {
    if (self.read_conn != null) return;

    var path_buf: [max_path_bytes]u8 = undefined;
    const path_z = try pathToZ(&path_buf, self.path);

    self.read_conn = openReadConn(path_z) catch |err| {
        log.err("failed to reopen read conn: {}", .{err});
        return err;
    };
}

/// Read the readiness flag (acquire load).
pub fn isReady(self: *LocalDb) bool {
    return self.is_ready.load(.acquire);
}

/// Set the readiness flag (release store).
pub fn setReady(self: *LocalDb, ready: bool) void {
    self.is_ready.store(ready, .release);
}

/// Create the `actors`, `actors_fts` and `sync_meta` tables if they are missing.
///
/// Returns `error.NotOpen` without a write connection, or the underlying error
/// if one of the three tables cannot be created. The `pds` column migration
/// and the handle index are best-effort and never fail the call.
fn createSchema(self: *LocalDb) !void {
    const c = self.conn orelse return error.NotOpen;

    c.exec(
        \\CREATE TABLE IF NOT EXISTS actors (
        \\    did TEXT PRIMARY KEY,
        \\    handle TEXT NOT NULL DEFAULT '',
        \\    display_name TEXT DEFAULT '',
        \\    avatar_url TEXT DEFAULT '',
        \\    hidden INTEGER NOT NULL DEFAULT 0,
        \\    labels TEXT NOT NULL DEFAULT '[]',
        \\    created_at TEXT DEFAULT '',
        \\    associated TEXT DEFAULT '{}',
        \\    pds TEXT DEFAULT ''
        \\)
    , .{}) catch |err| {
        log.err("failed to create actors table: {}", .{err});
        return err;
    };

    // migration: add pds column if missing (existing deployments).
    // Failure is the expected outcome once the column exists, so it is only
    // reported at debug level.
    c.exec("ALTER TABLE actors ADD COLUMN pds TEXT DEFAULT ''", .{}) catch |e| {
        log.debug("pds column migration skipped: {s}", .{@errorName(e)});
    };

    execBestEffort(c, .{
        "CREATE INDEX IF NOT EXISTS idx_actors_handle ON actors(handle COLLATE NOCASE)",
    });

    // standalone FTS5 (not content-synced — avoids rowid tracking complexity with INSERT OR REPLACE)
    c.exec("CREATE VIRTUAL TABLE IF NOT EXISTS actors_fts USING " ++ fts_definition, .{}) catch |err| {
        log.err("failed to create actors_fts: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE TABLE IF NOT EXISTS sync_meta (
        \\    key TEXT PRIMARY KEY,
        \\    value TEXT
        \\)
    , .{}) catch |err| {
        log.err("failed to create sync_meta table: {}", .{err});
        return err;
    };
}

/// Thin wrapper around a `zqlite.Row` exposing column accessors by index.
pub const Row = struct {
    stmt: zqlite.Row,

    /// Text value of the column at `index`.
    pub fn text(self: Row, index: usize) []const u8 {
        return self.stmt.text(index);
    }

    /// Integer value of the column at `index`.
    pub fn int(self: Row, index: usize) i64 {
        return self.stmt.int(index);
    }
};

/// Thin wrapper around a `zqlite.Rows` result cursor.
/// Callers release it with `deinit` when done.
pub const Rows = struct {
    inner: zqlite.Rows,

    /// Advance to the next row, or return null when the cursor yields no more rows.
    pub fn next(self: *Rows) ?Row {
        if (self.inner.next()) |r| {
            return .{ .stmt = r };
        }
        return null;
    }

    /// Release the underlying cursor.
    pub fn deinit(self: *Rows) void {
        self.inner.deinit();
    }

    /// Error recorded on the underlying cursor, if any.
    pub fn err(self: *Rows) ?anyerror {
        return self.inner.err;
    }
};

/// SELECT using read connection (never blocked by writes)
///
/// Returns `error.NotOpen` when the read connection is closed, or the error
/// from preparing the statement (also logged). The caller owns the returned
/// `Rows` and must `deinit` it.
pub fn query(self: *LocalDb, comptime sql: []const u8, args: anytype) !Rows {
    const c = self.read_conn orelse return error.NotOpen;
    const rows = c.rows(sql, args) catch |e| {
        log.err("query failed: {s}", .{@errorName(e)});
        return e;
    };
    return .{ .inner = rows };
}

/// Execute a statement (INSERT, UPDATE, DELETE) — mutex-protected
///
/// Takes `mutex` for the duration of the call. Returns `error.NotOpen` without
/// a write connection, or the statement error (also logged).
pub fn exec(self: *LocalDb, comptime sql: []const u8, args: anytype) !void {
    self.mutex.lockUncancelable(io);
    defer self.mutex.unlock(io);

    const c = self.conn orelse return error.NotOpen;
    c.exec(sql, args) catch |e| {
        log.err("exec failed: {s}", .{@errorName(e)});
        return e;
    };
}

/// Get raw write connection for batch operations (caller must hold lock)
pub fn getConn(self: *LocalDb) ?zqlite.Conn {
    return self.conn;
}

/// Acquire the write-connection mutex. Pair with `unlock`.
pub fn lock(self: *LocalDb) void {
    self.mutex.lockUncancelable(io);
}

/// Release the write-connection mutex taken by `lock`.
pub fn unlock(self: *LocalDb) void {
    self.mutex.unlock(io);
}

/// Get cached actor count from sync_meta (written during sync, avoids full table scan)
///
/// Returns 0 when the read connection is closed, the query fails, the
/// `actor_count` row is missing, or its value is not a base-10 `u64`.
pub fn countActors(self: *LocalDb) u64 {
    const c = self.read_conn orelse return 0;
    const row = c.row("SELECT value FROM sync_meta WHERE key = 'actor_count'", .{}) catch return 0;
    if (row) |r| {
        defer r.deinit();
        const val = r.text(0);
        return std.fmt.parseInt(u64, val, 10) catch 0;
    }
    return 0;
}