declarative relay deployment on hetzner relay-eval.waow.tech
atproto relay
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

relay-eval: enable WAL mode for concurrent DB access

the web server couldn't start while an eval run held the SQLite lock.
WAL mode allows concurrent readers + one writer. busy_timeout gives
5 seconds of retry before failing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+272
+272
relay-eval/src/store.zig
··· 1 + const std = @import("std"); 2 + const c = @cImport(@cInclude("sqlite3.h")); 3 + 4 + // SQLITE_STATIC: data outlives the statement (we finalize before returning). 5 + // using SQLITE_STATIC avoids the SQLITE_STATIC pointer cast issue in zig 0.15. 6 + const SQLITE_STATIC: c.sqlite3_destructor_type = null; 7 + 8 + const log = std.log.scoped(.store); 9 + 10 + pub const Store = struct { 11 + db: *c.sqlite3, 12 + 13 + pub fn open(path: [*:0]const u8) !Store { 14 + var db: ?*c.sqlite3 = null; 15 + if (c.sqlite3_open(path, &db) != c.SQLITE_OK) { 16 + if (db) |d| { 17 + log.err("sqlite open: {s}", .{c.sqlite3_errmsg(d)}); 18 + _ = c.sqlite3_close(d); 19 + } 20 + return error.SqliteOpen; 21 + } 22 + var store = Store{ .db = db.? }; 23 + // WAL mode allows concurrent readers + one writer (eval + web server) 24 + try store.exec("PRAGMA journal_mode=WAL"); 25 + try store.exec("PRAGMA busy_timeout=5000"); 26 + try store.migrate(); 27 + return store; 28 + } 29 + 30 + pub fn close(self: *Store) void { 31 + _ = c.sqlite3_close(self.db); 32 + } 33 + 34 + fn migrate(self: *Store) !void { 35 + const sql = 36 + \\CREATE TABLE IF NOT EXISTS runs ( 37 + \\ id INTEGER PRIMARY KEY, 38 + \\ timestamp TEXT NOT NULL, 39 + \\ window_seconds INTEGER NOT NULL, 40 + \\ union_dids INTEGER NOT NULL DEFAULT 0 41 + \\); 42 + \\CREATE TABLE IF NOT EXISTS relay_stats ( 43 + \\ id INTEGER PRIMARY KEY, 44 + \\ run_id INTEGER REFERENCES runs(id), 45 + \\ host TEXT NOT NULL, 46 + \\ events INTEGER NOT NULL, 47 + \\ unique_dids INTEGER NOT NULL, 48 + \\ connected BOOLEAN NOT NULL 49 + \\); 50 + \\CREATE TABLE IF NOT EXISTS diffs ( 51 + \\ id INTEGER PRIMARY KEY, 52 + \\ run_id INTEGER REFERENCES runs(id), 53 + \\ relay TEXT NOT NULL, 54 + \\ did TEXT NOT NULL, 55 + \\ classification TEXT NOT NULL 56 + \\); 57 + ; 58 + try self.exec(sql); 59 + } 60 + 61 + pub const RunStats = struct { 62 + host: []const u8, 63 + events: u64, 64 + unique_dids: u32, 65 + connected: bool, 66 + }; 67 + 68 + pub const Diff = struct { 69 + relay: []const u8, 70 + did: []const u8, 71 + classification: []const u8, 72 + }; 73 + 74 + pub fn insertRun(self: *Store, timestamp: []const u8, window_seconds: u32, union_dids: u32) !i64 { 75 + const sql = "INSERT INTO runs (timestamp, window_seconds, union_dids) VALUES (?, ?, ?)"; 76 + var stmt: ?*c.sqlite3_stmt = null; 77 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 78 + return self.sqlError(); 79 + } 80 + defer _ = c.sqlite3_finalize(stmt); 81 + 82 + _ = c.sqlite3_bind_text(stmt, 1, timestamp.ptr, @intCast(timestamp.len), SQLITE_STATIC); 83 + _ = c.sqlite3_bind_int(stmt, 2, @intCast(window_seconds)); 84 + _ = c.sqlite3_bind_int(stmt, 3, @intCast(union_dids)); 85 + 86 + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) { 87 + return self.sqlError(); 88 + } 89 + 90 + return c.sqlite3_last_insert_rowid(self.db); 91 + } 92 + 93 + pub fn insertRelayStat(self: *Store, run_id: i64, stat: RunStats) !void { 94 + const sql = "INSERT INTO relay_stats (run_id, host, events, unique_dids, connected) VALUES (?, ?, ?, ?, ?)"; 95 + var stmt: ?*c.sqlite3_stmt = null; 96 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 97 + return self.sqlError(); 98 + } 99 + defer _ = c.sqlite3_finalize(stmt); 100 + 101 + _ = c.sqlite3_bind_int64(stmt, 1, run_id); 102 + _ = c.sqlite3_bind_text(stmt, 2, stat.host.ptr, @intCast(stat.host.len), SQLITE_STATIC); 103 + _ = c.sqlite3_bind_int64(stmt, 3, @intCast(stat.events)); 104 + _ = c.sqlite3_bind_int(stmt, 4, @intCast(stat.unique_dids)); 105 + _ = c.sqlite3_bind_int(stmt, 5, @intFromBool(stat.connected)); 106 + 107 + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) { 108 + return self.sqlError(); 109 + } 110 + } 111 + 112 + pub fn insertDiff(self: *Store, run_id: i64, diff: Diff) !void { 113 + const sql = "INSERT INTO diffs (run_id, relay, did, classification) VALUES (?, ?, ?, ?)"; 114 + var stmt: ?*c.sqlite3_stmt = null; 115 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 116 + return self.sqlError(); 117 + } 118 + defer _ = c.sqlite3_finalize(stmt); 119 + 120 + _ = c.sqlite3_bind_int64(stmt, 1, run_id); 121 + _ = c.sqlite3_bind_text(stmt, 2, diff.relay.ptr, @intCast(diff.relay.len), SQLITE_STATIC); 122 + _ = c.sqlite3_bind_text(stmt, 3, diff.did.ptr, @intCast(diff.did.len), SQLITE_STATIC); 123 + _ = c.sqlite3_bind_text(stmt, 4, diff.classification.ptr, @intCast(diff.classification.len), SQLITE_STATIC); 124 + 125 + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) { 126 + return self.sqlError(); 127 + } 128 + } 129 + 130 + // --- read queries for dashboard --- 131 + 132 + pub const RunSummary = struct { 133 + id: i64, 134 + timestamp: []const u8, 135 + window_seconds: i32, 136 + union_dids: i32, 137 + }; 138 + 139 + /// get recent runs (caller owns returned memory) 140 + pub fn getRecentRuns(self: *Store, allocator: std.mem.Allocator, limit: u32) ![]RunSummary { 141 + const sql = "SELECT id, timestamp, window_seconds, union_dids FROM runs ORDER BY id DESC LIMIT ?"; 142 + var stmt: ?*c.sqlite3_stmt = null; 143 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 144 + return self.sqlError(); 145 + } 146 + defer _ = c.sqlite3_finalize(stmt); 147 + 148 + _ = c.sqlite3_bind_int(stmt, 1, @intCast(limit)); 149 + 150 + var runs: std.ArrayList(RunSummary) = .empty; 151 + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 152 + const ts_ptr = c.sqlite3_column_text(stmt, 1); 153 + const ts_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 1)); 154 + try runs.append(allocator, .{ 155 + .id = c.sqlite3_column_int64(stmt, 0), 156 + .timestamp = try allocator.dupe(u8, ts_ptr[0..ts_len]), 157 + .window_seconds = c.sqlite3_column_int(stmt, 2), 158 + .union_dids = c.sqlite3_column_int(stmt, 3), 159 + }); 160 + } 161 + return runs.toOwnedSlice(allocator); 162 + } 163 + 164 + /// get stats for a specific run 165 + pub fn getRunStats(self: *Store, allocator: std.mem.Allocator, run_id: i64) ![]RunStats { 166 + const sql = "SELECT host, events, unique_dids, connected FROM relay_stats WHERE run_id = ? ORDER BY unique_dids DESC"; 167 + var stmt: ?*c.sqlite3_stmt = null; 168 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 169 + return self.sqlError(); 170 + } 171 + defer _ = c.sqlite3_finalize(stmt); 172 + 173 + _ = c.sqlite3_bind_int64(stmt, 1, run_id); 174 + 175 + var stats: std.ArrayList(RunStats) = .empty; 176 + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 177 + const host_ptr = c.sqlite3_column_text(stmt, 0); 178 + const host_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 0)); 179 + try stats.append(allocator, .{ 180 + .host = try allocator.dupe(u8, host_ptr[0..host_len]), 181 + .events = @intCast(c.sqlite3_column_int64(stmt, 1)), 182 + .unique_dids = @intCast(c.sqlite3_column_int(stmt, 2)), 183 + .connected = c.sqlite3_column_int(stmt, 3) != 0, 184 + }); 185 + } 186 + return stats.toOwnedSlice(allocator); 187 + } 188 + 189 + /// get diffs for a specific run 190 + pub fn getRunDiffs(self: *Store, allocator: std.mem.Allocator, run_id: i64) ![]Diff { 191 + const sql = "SELECT relay, did, classification FROM diffs WHERE run_id = ?"; 192 + var stmt: ?*c.sqlite3_stmt = null; 193 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 194 + return self.sqlError(); 195 + } 196 + defer _ = c.sqlite3_finalize(stmt); 197 + 198 + _ = c.sqlite3_bind_int64(stmt, 1, run_id); 199 + 200 + var diffs: std.ArrayList(Diff) = .empty; 201 + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 202 + try diffs.append(allocator, .{ 203 + .relay = try self.colText(allocator, stmt, 0), 204 + .did = try self.colText(allocator, stmt, 1), 205 + .classification = try self.colText(allocator, stmt, 2), 206 + }); 207 + } 208 + return diffs.toOwnedSlice(allocator); 209 + } 210 + 211 + /// get the most recent run ID 212 + pub fn getLatestRunId(self: *Store) !?i64 { 213 + const sql = "SELECT id FROM runs ORDER BY id DESC LIMIT 1"; 214 + var stmt: ?*c.sqlite3_stmt = null; 215 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 216 + return self.sqlError(); 217 + } 218 + defer _ = c.sqlite3_finalize(stmt); 219 + 220 + if (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 221 + return c.sqlite3_column_int64(stmt, 0); 222 + } 223 + return null; 224 + } 225 + 226 + /// get run metadata 227 + pub fn getRun(self: *Store, allocator: std.mem.Allocator, run_id: i64) !?RunSummary { 228 + const sql = "SELECT id, timestamp, window_seconds, union_dids FROM runs WHERE id = ?"; 229 + var stmt: ?*c.sqlite3_stmt = null; 230 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 231 + return self.sqlError(); 232 + } 233 + defer _ = c.sqlite3_finalize(stmt); 234 + 235 + _ = c.sqlite3_bind_int64(stmt, 1, run_id); 236 + 237 + if (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 238 + const ts_ptr = c.sqlite3_column_text(stmt, 1); 239 + const ts_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 1)); 240 + return .{ 241 + .id = c.sqlite3_column_int64(stmt, 0), 242 + .timestamp = try allocator.dupe(u8, ts_ptr[0..ts_len]), 243 + .window_seconds = c.sqlite3_column_int(stmt, 2), 244 + .union_dids = c.sqlite3_column_int(stmt, 3), 245 + }; 246 + } 247 + return null; 248 + } 249 + 250 + fn colText(self: *Store, allocator: std.mem.Allocator, stmt: ?*c.sqlite3_stmt, col: c_int) ![]const u8 { 251 + _ = self; 252 + const ptr = c.sqlite3_column_text(stmt, col); 253 + const len: usize = @intCast(c.sqlite3_column_bytes(stmt, col)); 254 + return try allocator.dupe(u8, ptr[0..len]); 255 + } 256 + 257 + fn exec(self: *Store, sql: [*:0]const u8) !void { 258 + var err_msg: [*c]u8 = null; 259 + if (c.sqlite3_exec(self.db, sql, null, null, &err_msg) != c.SQLITE_OK) { 260 + if (err_msg) |msg| { 261 + log.err("sqlite exec: {s}", .{std.mem.span(msg)}); 262 + c.sqlite3_free(msg); 263 + } 264 + return error.SqliteExec; 265 + } 266 + } 267 + 268 + fn sqlError(self: *Store) error{SqliteError} { 269 + log.err("sqlite: {s}", .{c.sqlite3_errmsg(self.db)}); 270 + return error.SqliteError; 271 + } 272 + };