search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: remove compat.zig — use native std.Io throughout

Thread io: std.Io through all modules instead of global compat bridge.
Replaces compat.Mutex/Condition with Io.Mutex/Io.Condition, compat.sleep*
with io.sleep(), compat.timestamp/microTimestamp with Io.Timestamp, and
inlines std.c.getenv at each call site.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+225 -265
-98
backend/src/compat.zig
··· 1 - //! Zig 0.16 compatibility helpers. 2 - //! Replaces removed APIs: posix.getenv, std.time.microTimestamp, 3 - //! std.time.timestamp, std.Thread.sleep, std.Thread.Mutex/Condition. 4 - //! Also provides global Io access for http.Client and other networking. 5 - 6 - const std = @import("std"); 7 - 8 - // --- global Io (set once in main, used by http.Client consumers) --- 9 - 10 - var global_io: ?std.Io = null; 11 - 12 - pub fn initIo(io: std.Io) void { 13 - global_io = io; 14 - } 15 - 16 - pub fn getIo() std.Io { 17 - return global_io.?; 18 - } 19 - 20 - // --- environment variables --- 21 - 22 - pub fn getenv(key: [*:0]const u8) ?[]const u8 { 23 - return if (std.c.getenv(key)) |p| std.mem.span(p) else null; 24 - } 25 - 26 - // --- timestamps --- 27 - 28 - pub fn microTimestamp() i64 { 29 - var ts: std.c.timespec = undefined; 30 - _ = std.c.clock_gettime(.REALTIME, &ts); 31 - return @as(i64, ts.sec) * 1_000_000 + @divTrunc(@as(i64, ts.nsec), 1_000); 32 - } 33 - 34 - pub fn timestamp() i64 { 35 - var ts: std.c.timespec = undefined; 36 - _ = std.c.clock_gettime(.REALTIME, &ts); 37 - return @as(i64, ts.sec); 38 - } 39 - 40 - // --- sleep --- 41 - 42 - pub fn sleep(ns: u64) void { 43 - var req: std.c.timespec = .{ 44 - .sec = @intCast(@divFloor(ns, std.time.ns_per_s)), 45 - .nsec = @intCast(@mod(ns, std.time.ns_per_s)), 46 - }; 47 - while (true) { 48 - var rem: std.c.timespec = undefined; 49 - const rc = std.c.nanosleep(&req, &rem); 50 - if (rc == 0) return; 51 - // EINTR: interrupted, retry with remaining time 52 - req = rem; 53 - } 54 - } 55 - 56 - pub fn sleepSecs(secs: u64) void { 57 - sleep(secs * std.time.ns_per_s); 58 - } 59 - 60 - pub fn sleepMs(ms: u64) void { 61 - sleep(ms * std.time.ns_per_ms); 62 - } 63 - 64 - // --- mutex (pthread-based, works without Io) --- 65 - 66 - pub const Mutex = struct { 67 - inner: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, 68 - 69 - pub fn lock(self: *Mutex) void { 70 - _ = std.c.pthread_mutex_lock(&self.inner); 71 - } 72 - 73 - pub fn unlock(self: *Mutex) void { 74 - _ = std.c.pthread_mutex_unlock(&self.inner); 75 - } 76 - 77 - pub fn tryLock(self: *Mutex) bool { 78 - return std.c.pthread_mutex_trylock(&self.inner) == .SUCCESS; 79 - } 80 - }; 81 - 82 - // --- condition variable --- 83 - 84 - pub const Condition = struct { 85 - inner: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, 86 - 87 - pub fn wait(self: *Condition, mutex: *Mutex) void { 88 - _ = std.c.pthread_cond_wait(&self.inner, &mutex.inner); 89 - } 90 - 91 - pub fn signal(self: *Condition) void { 92 - _ = std.c.pthread_cond_signal(&self.inner); 93 - } 94 - 95 - pub fn broadcast(self: *Condition) void { 96 - _ = std.c.pthread_cond_broadcast(&self.inner); 97 - } 98 - };
+11 -11
backend/src/db.zig
··· 1 1 const std = @import("std"); 2 2 const Io = std.Io; 3 - const compat = @import("compat.zig"); 4 3 5 4 const schema = @import("db/schema.zig"); 6 5 const result = @import("db/result.zig"); ··· 37 36 } 38 37 39 38 /// Initialize local SQLite replica (slow, call in background thread) 40 - pub fn initLocalDb() void { 41 - initLocal() catch |err| { 39 + pub fn initLocalDb(io: Io) void { 40 + initLocal(io) catch |err| { 42 41 std.debug.print("local db init failed (will use turso only): {}\n", .{err}); 43 42 }; 44 43 } 45 44 46 - fn initLocal() !void { 45 + fn initLocal(io: Io) !void { 47 46 // check if local db is disabled 48 - if (compat.getenv("LOCAL_DB_ENABLED")) |val| { 47 + if (std.c.getenv("LOCAL_DB_ENABLED")) |p| { 48 + const val = std.mem.span(p); 49 49 if (std.mem.eql(u8, val, "false") or std.mem.eql(u8, val, "0")) { 50 50 std.debug.print("local db disabled via LOCAL_DB_ENABLED\n", .{}); 51 51 return; 52 52 } 53 53 } 54 54 55 - local_db = LocalDb.init(allocator); 55 + local_db = LocalDb.init(allocator, io); 56 56 try local_db.?.open(); 57 57 } 58 58 ··· 80 80 } 81 81 82 82 /// Start background sync thread (call from main after db.init) 83 - pub fn startSync() void { 83 + pub fn startSync(io: Io) void { 84 84 const c = if (sync_client) |*sc| sc else { 85 85 std.debug.print("sync: no sync client, skipping\n", .{}); 86 86 return; ··· 90 90 return; 91 91 }; 92 92 93 - const thread = std.Thread.spawn(.{}, syncLoop, .{ c, local }) catch |err| { 93 + const thread = std.Thread.spawn(.{}, syncLoop, .{ c, local, io }) catch |err| { 94 94 std.debug.print("sync: failed to start thread: {}\n", .{err}); 95 95 return; 96 96 }; ··· 98 98 std.debug.print("sync: background thread started\n", .{}); 99 99 } 100 100 101 - fn syncLoop(turso: *Client, local: *LocalDb) void { 101 + fn syncLoop(turso: *Client, local: *LocalDb, io: Io) void { 102 102 // incremental sync on startup — gets new docs + cleans tombstoned deletions 103 103 // (falls back to full sync automatically if no last_sync exists, i.e., first boot) 104 104 sync.incrementalSync(turso, local) catch |err| { ··· 107 107 108 108 // get sync interval from env (default 5 minutes) 109 109 const interval_secs: u64 = blk: { 110 - const env_val = compat.getenv("SYNC_INTERVAL_SECS") orelse "300"; 110 + const env_val = if (std.c.getenv("SYNC_INTERVAL_SECS")) |p| std.mem.span(p) else "300"; 111 111 break :blk std.fmt.parseInt(u64, env_val, 10) catch 300; 112 112 }; 113 113 ··· 115 115 116 116 // periodic incremental sync 117 117 while (true) { 118 - compat.sleepSecs(interval_secs); 118 + io.sleep(Io.Duration.fromSeconds(@intCast(interval_secs)), .awake) catch {}; 119 119 sync.incrementalSync(turso, local) catch |err| { 120 120 std.debug.print("sync: incremental sync failed: {}\n", .{err}); 121 121 };
+10 -9
backend/src/db/Client.zig
··· 8 8 const mem = std.mem; 9 9 const Allocator = mem.Allocator; 10 10 const logfire = @import("logfire"); 11 - const compat = @import("../compat.zig"); 12 11 13 12 const result = @import("result.zig"); 14 13 pub const Result = result.Result; ··· 29 28 allocator: Allocator, 30 29 url: []const u8, 31 30 token: []const u8, 32 - mutex: compat.Mutex = .{}, 31 + mutex: Io.Mutex = Io.Mutex.init, 32 + io: Io, 33 33 http_client: http.Client, 34 34 35 35 pub fn init(allocator_param: Allocator, io: Io) !Client { 36 - const url = compat.getenv("TURSO_URL") orelse { 36 + const url = if (std.c.getenv("TURSO_URL")) |p| std.mem.span(p) else { 37 37 std.debug.print("TURSO_URL not set\n", .{}); 38 38 return error.MissingEnv; 39 39 }; 40 - const token = compat.getenv("TURSO_TOKEN") orelse { 40 + const token = if (std.c.getenv("TURSO_TOKEN")) |p| std.mem.span(p) else { 41 41 std.debug.print("TURSO_TOKEN not set\n", .{}); 42 42 return error.MissingEnv; 43 43 }; ··· 54 54 .allocator = allocator_param, 55 55 .url = host, 56 56 .token = token, 57 + .io = io, 57 58 .http_client = .{ .allocator = allocator_param, .io = io }, 58 59 }; 59 60 } ··· 125 126 }); 126 127 defer span.end(); 127 128 128 - self.mutex.lock(); 129 - defer self.mutex.unlock(); 129 + self.mutex.lockUncancelable(self.io); 130 + defer self.mutex.unlock(self.io); 130 131 131 132 var url_buf: [URL_BUF_SIZE]u8 = undefined; 132 133 const url = std.fmt.bufPrint(&url_buf, "https://{s}/v2/pipeline", .{self.url}) catch ··· 179 180 }); 180 181 defer span.end(); 181 182 182 - self.mutex.lock(); 183 - defer self.mutex.unlock(); 183 + self.mutex.lockUncancelable(self.io); 184 + defer self.mutex.unlock(self.io); 184 185 185 186 var url_buf: [URL_BUF_SIZE]u8 = undefined; 186 187 const url = std.fmt.bufPrint(&url_buf, "https://{s}/v2/pipeline", .{self.url}) catch ··· 335 336 336 337 fn keepaliveLoop(self: *Client) void { 337 338 while (true) { 338 - compat.sleep(KEEPALIVE_INTERVAL_NS); 339 + self.io.sleep(Io.Duration.fromNanoseconds(KEEPALIVE_INTERVAL_NS), .awake) catch return; 339 340 _ = self.exec("SELECT 1", .{}) catch |err| { 340 341 logfire.debug("turso: keepalive ping failed: {}", .{err}); 341 342 };
+10 -9
backend/src/db/LocalDb.zig
··· 2 2 //! Provides fast FTS5 queries while Turso remains source of truth 3 3 4 4 const std = @import("std"); 5 + const Io = std.Io; 5 6 const zqlite = @import("zqlite"); 6 7 const Allocator = std.mem.Allocator; 7 8 const logfire = @import("logfire"); 8 - const compat = @import("../compat.zig"); 9 9 10 10 const LocalDb = @This(); 11 11 12 12 conn: ?zqlite.Conn = null, 13 13 read_conn: ?zqlite.Conn = null, // separate read connection — never blocked by writes in WAL mode 14 14 allocator: Allocator, 15 + io: Io, 15 16 is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), 16 17 needs_resync: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), 17 - mutex: compat.Mutex = .{}, // protects write conn only 18 + mutex: Io.Mutex = Io.Mutex.init, // protects write conn only 18 19 path: []const u8 = "", 19 20 consecutive_errors: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), 20 21 21 - pub fn init(allocator: Allocator) LocalDb { 22 - return .{ .allocator = allocator }; 22 + pub fn init(allocator: Allocator, io: Io) LocalDb { 23 + return .{ .allocator = allocator, .io = io }; 23 24 } 24 25 25 26 /// Check database integrity and return false if corrupt ··· 62 63 } 63 64 64 65 pub fn open(self: *LocalDb) !void { 65 - const path_env = compat.getenv("LOCAL_DB_PATH") orelse "/data/local.db"; 66 + const path_env = if (std.c.getenv("LOCAL_DB_PATH")) |p| std.mem.span(p) else "/data/local.db"; 66 67 self.path = path_env; 67 68 68 69 try self.openDb(path_env, false); ··· 306 307 307 308 /// Execute a statement (INSERT, UPDATE, DELETE) 308 309 pub fn exec(self: *LocalDb, comptime sql: []const u8, args: anytype) !void { 309 - self.mutex.lock(); 310 - defer self.mutex.unlock(); 310 + self.mutex.lockUncancelable(self.io); 311 + defer self.mutex.unlock(self.io); 311 312 312 313 const c = self.conn orelse return error.NotOpen; 313 314 c.exec(sql, args) catch |e| { ··· 323 324 324 325 /// Lock for batch operations 325 326 pub fn lock(self: *LocalDb) void { 326 - self.mutex.lock(); 327 + self.mutex.lockUncancelable(self.io); 327 328 } 328 329 329 330 /// Unlock after batch operations 330 331 pub fn unlock(self: *LocalDb) void { 331 - self.mutex.unlock(); 332 + self.mutex.unlock(self.io); 332 333 } 333 334 334 335 fn truncateSql(sql: []const u8) []const u8 {
+3 -2
backend/src/db/schema.zig
··· 1 1 const std = @import("std"); 2 + const Io = std.Io; 2 3 const Client = @import("Client.zig"); 3 - const compat = @import("../compat.zig"); 4 4 5 5 /// Initialize database schema and run migrations 6 6 pub fn init(client: *Client) !void { ··· 78 78 79 79 // set service_started_at if not already set (first run ever) 80 80 var ts_buf: [20]u8 = undefined; 81 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 81 + const now_s: i64 = @intCast(@divFloor(Io.Timestamp.now(client.io, .real).nanoseconds, std.time.ns_per_s)); 82 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{now_s}) catch "0"; 82 83 client.exec( 83 84 "UPDATE stats SET service_started_at = ? WHERE id = 1 AND service_started_at IS NULL", 84 85 &.{ts_str},
+5 -3
backend/src/db/sync.zig
··· 2 2 //! Full sync on startup, incremental sync periodically 3 3 4 4 const std = @import("std"); 5 + const Io = std.Io; 5 6 const zqlite = @import("zqlite"); 6 7 const Allocator = std.mem.Allocator; 7 8 const Client = @import("Client.zig"); 8 9 const LocalDb = @import("LocalDb.zig"); 9 - const compat = @import("../compat.zig"); 10 10 11 11 const BATCH_SIZE = 500; 12 12 ··· 191 191 local.lock(); 192 192 defer local.unlock(); 193 193 var ts_buf: [20]u8 = undefined; 194 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 194 + const now_s: i64 = @intCast(@divFloor(Io.Timestamp.now(turso.io, .real).nanoseconds, std.time.ns_per_s)); 195 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{now_s}) catch "0"; 195 196 conn.exec( 196 197 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 197 198 .{ts_str}, ··· 296 297 297 298 // update sync time 298 299 var ts_buf: [20]u8 = undefined; 299 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 300 + const inc_now_s: i64 = @intCast(@divFloor(Io.Timestamp.now(turso.io, .real).nanoseconds, std.time.ns_per_s)); 301 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{inc_now_s}) catch "0"; 300 302 conn.exec( 301 303 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 302 304 .{ts_str},
+14 -14
backend/src/ingest/embedder.zig
··· 8 8 const json = std.json; 9 9 const mem = std.mem; 10 10 const Allocator = mem.Allocator; 11 + const Io = std.Io; 11 12 const logfire = @import("logfire"); 12 13 const zql = @import("zql"); 13 14 const db = @import("../db.zig"); 14 15 const tpuf = @import("../tpuf.zig"); 15 - const compat = @import("../compat.zig"); 16 16 17 17 // voyage-4-lite limits 18 18 const MAX_BATCH_SIZE = 20; // conservative batch size for reliability ··· 33 33 ); 34 34 35 35 /// Start the embedder background worker 36 - pub fn start(allocator: Allocator) void { 37 - const api_key = compat.getenv("VOYAGE_API_KEY") orelse { 36 + pub fn start(allocator: Allocator, io: Io) void { 37 + const api_key = if (std.c.getenv("VOYAGE_API_KEY")) |p| std.mem.span(p) else { 38 38 logfire.info("embedder: VOYAGE_API_KEY not set, embeddings disabled", .{}); 39 39 return; 40 40 }; 41 41 42 - const thread = std.Thread.spawn(.{}, worker, .{ allocator, api_key }) catch |err| { 42 + const thread = std.Thread.spawn(.{}, worker, .{ allocator, api_key, io }) catch |err| { 43 43 logfire.err("embedder: failed to start thread: {}", .{err}); 44 44 return; 45 45 }; ··· 47 47 logfire.info("embedder: background worker started", .{}); 48 48 } 49 49 50 - fn worker(allocator: Allocator, api_key: []const u8) void { 50 + fn worker(allocator: Allocator, api_key: []const u8, io: Io) void { 51 51 // wait for db to be ready 52 - compat.sleep(5 * std.time.ns_per_s); 52 + io.sleep(Io.Duration.fromSeconds(5), .awake) catch {}; 53 53 54 54 var consecutive_errors: u32 = 0; 55 55 56 56 while (true) { 57 - const processed = processNextBatch(allocator, api_key) catch |err| { 57 + const processed = processNextBatch(allocator, api_key, io) catch |err| { 58 58 consecutive_errors += 1; 59 59 const backoff: u64 = @min(ERROR_BACKOFF_SECS * consecutive_errors, 3600); 60 60 logfire.warn("embedder: error {}, backing off {d}s", .{ err, backoff }); 61 - compat.sleep(backoff * std.time.ns_per_s); 61 + io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake) catch {}; 62 62 continue; 63 63 }; 64 64 ··· 71 71 72 72 // no work, sleep 73 73 consecutive_errors = 0; 74 - compat.sleep(POLL_INTERVAL_SECS * std.time.ns_per_s); 74 + io.sleep(Io.Duration.fromSeconds(@intCast(POLL_INTERVAL_SECS)), .awake) catch {}; 75 75 } 76 76 } 77 77 ··· 105 105 } 106 106 }; 107 107 108 - fn processNextBatch(allocator: Allocator, api_key: []const u8) !usize { 108 + fn processNextBatch(allocator: Allocator, api_key: []const u8, io: Io) !usize { 109 109 const span = logfire.span("embedder.process_batch", .{}); 110 110 defer span.end(); 111 111 ··· 133 133 if (docs.items.len == 0) return 0; 134 134 135 135 // call Voyage API 136 - const embeddings = try callVoyageApi(allocator, api_key, docs.items); 136 + const embeddings = try callVoyageApi(allocator, api_key, docs.items, io); 137 137 defer { 138 138 for (embeddings) |e| allocator.free(e); 139 139 allocator.free(embeddings); ··· 176 176 177 177 // mark docs as embedded in turso 178 178 // generate timestamp in Zig (avoids any strftime/pipeline API quirks) 179 - const now_ts = compat.timestamp(); 179 + const now_ts: i64 = @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 180 180 const epoch_secs: u64 = @intCast(now_ts); 181 181 const epoch = std.time.epoch.EpochSeconds{ .secs = epoch_secs }; 182 182 const day = epoch.getDaySeconds(); ··· 241 241 } 242 242 } 243 243 244 - fn callVoyageApi(allocator: Allocator, api_key: []const u8, docs: []const DocToEmbed) ![][]f32 { 244 + fn callVoyageApi(allocator: Allocator, api_key: []const u8, docs: []const DocToEmbed, io: Io) ![][]f32 { 245 245 const span = logfire.span("embedder.voyage_api", .{ 246 246 .batch_size = @as(i64, @intCast(docs.len)), 247 247 }); 248 248 defer span.end(); 249 249 250 - var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 250 + var http_client: http.Client = .{ .allocator = allocator, .io = io }; 251 251 defer http_client.deinit(); 252 252 253 253 // build request body
+7 -3
backend/src/ingest/indexer.zig
··· 1 1 const std = @import("std"); 2 + const Io = std.Io; 2 3 const logfire = @import("logfire"); 3 4 const db = @import("../db.zig"); 4 - const compat = @import("../compat.zig"); 5 5 6 6 /// Hash title+content for cross-platform dedup. 7 7 /// Returns a 16-char hex string (wyhash of "title\x00content"). ··· 284 284 ) catch {}; 285 285 } 286 286 287 + fn currentTimestamp(io: Io) i64 { 288 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 289 + } 290 + 287 291 pub fn deleteDocument(uri: []const u8) void { 288 292 const c = db.getClient() orelse return; 289 293 290 294 // record tombstone 291 295 var ts_buf: [20]u8 = undefined; 292 - const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 296 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{currentTimestamp(c.io)}) catch "0"; 293 297 c.exec( 294 298 "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'document', ?)", 295 299 &.{ uri, ts }, ··· 305 309 306 310 // record tombstone 307 311 var ts_buf: [20]u8 = undefined; 308 - const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 312 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{currentTimestamp(c.io)}) catch "0"; 309 313 c.exec( 310 314 "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'publication', ?)", 311 315 &.{ uri, ts },
+26 -16
backend/src/ingest/reconciler.zig
··· 13 13 const json = std.json; 14 14 const mem = std.mem; 15 15 const Allocator = mem.Allocator; 16 + const Io = std.Io; 16 17 const logfire = @import("logfire"); 17 - const compat = @import("../compat.zig"); 18 18 const db = @import("../db.zig"); 19 19 const tpuf = @import("../tpuf.zig"); 20 20 const indexer = @import("indexer.zig"); 21 21 22 22 // config (env vars with defaults) 23 + fn getenv(key: [*:0]const u8) ?[]const u8 { 24 + return if (std.c.getenv(key)) |p| std.mem.span(p) else null; 25 + } 26 + 23 27 fn getIntervalSecs() u64 { 24 - const val = compat.getenv("RECONCILE_INTERVAL_SECS") orelse "1800"; 28 + const val = getenv("RECONCILE_INTERVAL_SECS") orelse "1800"; 25 29 return std.fmt.parseInt(u64, val, 10) catch 1800; 26 30 } 27 31 28 32 fn getBatchSize() usize { 29 - const val = compat.getenv("RECONCILE_BATCH_SIZE") orelse "50"; 33 + const val = getenv("RECONCILE_BATCH_SIZE") orelse "50"; 30 34 return std.fmt.parseInt(usize, val, 10) catch 50; 31 35 } 32 36 33 37 fn getReverifyDays() u64 { 34 - const val = compat.getenv("RECONCILE_REVERIFY_DAYS") orelse "7"; 38 + const val = getenv("RECONCILE_REVERIFY_DAYS") orelse "7"; 35 39 return std.fmt.parseInt(u64, val, 10) catch 7; 36 40 } 37 41 38 42 fn isEnabled() bool { 39 - const val = compat.getenv("RECONCILE_ENABLED") orelse "true"; 43 + const val = getenv("RECONCILE_ENABLED") orelse "true"; 40 44 return !mem.eql(u8, val, "false") and !mem.eql(u8, val, "0"); 41 45 } 46 + 47 + var global_io: ?Io = null; 42 48 43 49 /// AT-URI components parsed from "at://{did}/{collection}/{rkey}" 44 50 const UriParts = struct { ··· 65 71 } 66 72 67 73 /// Start the reconciler background worker. 68 - pub fn start(allocator: Allocator) void { 74 + pub fn start(allocator: Allocator, io: Io) void { 69 75 if (!isEnabled()) { 70 76 logfire.info("reconcile: disabled via RECONCILE_ENABLED", .{}); 71 77 return; 72 78 } 73 79 74 - const thread = std.Thread.spawn(.{}, worker, .{allocator}) catch |err| { 80 + global_io = io; 81 + const thread = std.Thread.spawn(.{}, worker, .{ allocator, io }) catch |err| { 75 82 logfire.err("reconcile: failed to start thread: {}", .{err}); 76 83 return; 77 84 }; ··· 79 86 logfire.info("reconcile: background worker started", .{}); 80 87 } 81 88 82 - fn worker(allocator: Allocator) void { 89 + fn worker(allocator: Allocator, io: Io) void { 83 90 // wait for db to be ready 84 - compat.sleep(10 * std.time.ns_per_s); 91 + io.sleep(Io.Duration.fromSeconds(10), .awake) catch {}; 85 92 86 93 // PDS cache: DID → PDS endpoint URL (persists across cycles) 87 94 var pds_cache = std.StringHashMap([]const u8).init(allocator); ··· 109 116 } 110 117 111 118 const interval = getIntervalSecs(); 112 - const backoff: u64 = if (consecutive_errors > 0) 119 + const backoff_secs: u64 = if (consecutive_errors > 0) 113 120 @min(interval * consecutive_errors, 3600) 114 121 else 115 122 interval; 116 - compat.sleep(backoff * std.time.ns_per_s); 123 + io.sleep(Io.Duration.fromSeconds(@intCast(backoff_secs)), .awake) catch {}; 117 124 } 118 125 } 119 126 ··· 136 143 var batch_str: [10]u8 = undefined; 137 144 const batch_str_val = std.fmt.bufPrint(&batch_str, "{d}", .{batch_size}) catch "50"; 138 145 139 - const cutoff_ts = formatTimestamp(compat.timestamp() - @as(i64, @intCast(reverify_days * 86400))); 146 + const io = global_io.?; 147 + const now_s: i64 = @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 148 + const cutoff_ts = formatTimestamp(now_s - @as(i64, @intCast(reverify_days * 86400))); 140 149 const cutoff = cutoff_ts.slice(); 141 150 142 151 var result = try client.query( ··· 220 229 } 221 230 222 231 // rate limit: 200ms between PDS requests 223 - compat.sleep(200 * std.time.ns_per_ms); 232 + io.sleep(Io.Duration.fromMilliseconds(200), .awake) catch {}; 224 233 } 225 234 226 235 // batch delete from tpuf ··· 245 254 } 246 255 247 256 fn updateVerifiedAt(client: *db.Client, uri: []const u8) void { 248 - const now = formatTimestamp(compat.timestamp()); 257 + const ts: i64 = @intCast(@divFloor(Io.Timestamp.now(global_io.?, .real).nanoseconds, std.time.ns_per_s)); 258 + const now = formatTimestamp(ts); 249 259 client.exec( 250 260 "UPDATE documents SET verified_at = ? WHERE uri = ?", 251 261 &.{ now.slice(), uri }, ··· 294 304 return .error_skip; 295 305 }; 296 306 297 - var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 307 + var http_client: http.Client = .{ .allocator = allocator, .io = global_io.? }; 298 308 defer http_client.deinit(); 299 309 300 310 var response_body: std.Io.Writer.Allocating = .init(allocator); ··· 336 346 var url_buf: [256]u8 = undefined; 337 347 const url = std.fmt.bufPrint(&url_buf, "https://plc.directory/{s}", .{did}) catch return null; 338 348 339 - var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 349 + var http_client: http.Client = .{ .allocator = allocator, .io = global_io.? }; 340 350 defer http_client.deinit(); 341 351 342 352 var response_body: std.Io.Writer.Allocating = .init(allocator);
+27 -26
backend/src/ingest/tap.zig
··· 7 7 const logfire = @import("logfire"); 8 8 const indexer = @import("indexer.zig"); 9 9 const extractor = @import("extractor.zig"); 10 + const Io = std.Io; 10 11 const tpuf = @import("../tpuf.zig"); 11 - const compat = @import("../compat.zig"); 12 12 13 13 // leaflet-specific collections 14 14 const LEAFLET_DOCUMENT = "pub.leaflet.document"; ··· 33 33 } 34 34 35 35 fn getTapHost() []const u8 { 36 - return compat.getenv("TAP_HOST") orelse "leaflet-search-tap.fly.dev"; 36 + return if (std.c.getenv("TAP_HOST")) |p| std.mem.span(p) else "leaflet-search-tap.fly.dev"; 37 37 } 38 38 39 39 fn getTapPort() u16 { 40 - const port_str = compat.getenv("TAP_PORT") orelse "443"; 40 + const port_str = if (std.c.getenv("TAP_PORT")) |p| std.mem.span(p) else "443"; 41 41 return std.fmt.parseInt(u16, port_str, 10) catch 443; 42 42 } 43 43 ··· 51 51 const QUEUE_CAPACITY = 256; 52 52 53 53 const ProcessQueue = struct { 54 - mutex: compat.Mutex = .{}, 55 - cond: compat.Condition = .{}, 54 + mutex: Io.Mutex = Io.Mutex.init, 55 + cond: Io.Condition = Io.Condition.init, 56 56 items: [QUEUE_CAPACITY]?[]u8 = .{null} ** QUEUE_CAPACITY, 57 57 head: usize = 0, 58 58 tail: usize = 0, 59 59 len: usize = 0, 60 60 stopped: bool = false, 61 61 allocator: Allocator, 62 + io: Io, 62 63 dropped: usize = 0, 63 64 processed: usize = 0, 64 65 65 66 fn push(self: *ProcessQueue, data: []u8) void { 66 - self.mutex.lock(); 67 - defer self.mutex.unlock(); 67 + self.mutex.lockUncancelable(self.io); 68 + defer self.mutex.unlock(self.io); 68 69 69 70 if (self.len == QUEUE_CAPACITY) { 70 71 // queue full — drop oldest (already ACK'd) ··· 82 83 self.items[self.tail] = data; 83 84 self.tail = (self.tail + 1) % QUEUE_CAPACITY; 84 85 self.len += 1; 85 - self.cond.signal(); 86 + self.cond.signal(self.io); 86 87 } 87 88 88 89 fn pop(self: *ProcessQueue) ?[]u8 { 89 - self.mutex.lock(); 90 - defer self.mutex.unlock(); 90 + self.mutex.lockUncancelable(self.io); 91 + defer self.mutex.unlock(self.io); 91 92 92 93 while (self.len == 0 and !self.stopped) { 93 - self.cond.wait(&self.mutex); 94 + self.cond.waitUncancelable(self.io, &self.mutex); 94 95 } 95 96 96 97 if (self.len == 0) return null; // stopped with empty queue ··· 103 104 } 104 105 105 106 fn stop(self: *ProcessQueue) void { 106 - self.mutex.lock(); 107 - defer self.mutex.unlock(); 107 + self.mutex.lockUncancelable(self.io); 108 + defer self.mutex.unlock(self.io); 108 109 self.stopped = true; 109 - self.cond.signal(); 110 + self.cond.signal(self.io); 110 111 } 111 112 112 113 fn drain(self: *ProcessQueue) void { 113 - self.mutex.lock(); 114 - defer self.mutex.unlock(); 114 + self.mutex.lockUncancelable(self.io); 115 + defer self.mutex.unlock(self.io); 115 116 for (&self.items) |*item| { 116 117 if (item.*) |data| { 117 118 self.allocator.free(data); ··· 128 129 processMessage(queue.allocator, data) catch |err| { 129 130 logfire.err("message processing error: {}", .{err}); 130 131 }; 131 - queue.mutex.lock(); 132 + queue.mutex.lockUncancelable(queue.io); 132 133 queue.processed += 1; 133 - queue.mutex.unlock(); 134 + queue.mutex.unlock(queue.io); 134 135 } 135 136 logfire.info("tap: process worker stopped (processed {d}, dropped {d})", .{ queue.processed, queue.dropped }); 136 137 } 137 138 138 - pub fn consumer(allocator: Allocator) void { 139 + pub fn consumer(allocator: Allocator, io: Io) void { 139 140 var backoff: u64 = 1; 140 141 const max_backoff: u64 = 30; 141 142 142 143 while (true) { 143 - const connected = connect(allocator); 144 + const connected = connect(allocator, io); 144 145 if (connected) |_| { 145 146 // connection succeeded then closed - reset backoff 146 147 backoff = 1; ··· 148 149 } else |err| { 149 150 // connection failed - backoff 150 151 logfire.warn("tap error: {}, reconnecting in {d}s", .{ err, backoff }); 151 - compat.sleepSecs(backoff); 152 + io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake) catch {}; 152 153 backoff = @min(backoff * 2, max_backoff); 153 154 } 154 155 } ··· 218 219 return zat.json.getInt(parsed.value, "id"); 219 220 } 220 221 221 - fn connect(allocator: Allocator) !void { 222 + fn connect(allocator: Allocator, io: Io) !void { 222 223 const host = getTapHost(); 223 224 const port = getTapPort(); 224 225 const tls = useTls(); ··· 226 227 227 228 logfire.info("connecting to {s}://{s}:{d}{s}", .{ if (tls) "wss" else "ws", host, port, path }); 228 229 229 - var client = websocket.Client.init(compat.getIo(), allocator, .{ 230 + var client = websocket.Client.init(io, allocator, .{ 230 231 .host = host, 231 232 .port = port, 232 233 .tls = tls, ··· 249 250 250 251 // processing queue + worker thread: decouples readLoop from turso writes 251 252 // so a slow/hung turso request never blocks ACKs 252 - var queue = ProcessQueue{ .allocator = allocator }; 253 - const worker = std.Thread.spawn(.{}, processWorker, .{&queue}) catch |err| { 253 + var queue = ProcessQueue{ .allocator = allocator, .io = io }; 254 + const worker_thread = std.Thread.spawn(.{}, processWorker, .{&queue}) catch |err| { 254 255 logfire.err("tap: failed to spawn process worker: {}", .{err}); 255 256 return err; 256 257 }; 257 258 defer { 258 259 queue.stop(); 259 - worker.join(); 260 + worker_thread.join(); 260 261 queue.drain(); 261 262 } 262 263
+17 -16
backend/src/main.zig
··· 7 7 const metrics = @import("metrics.zig"); 8 8 const server = @import("server.zig"); 9 9 const ingest = @import("ingest.zig"); 10 - const compat = @import("compat.zig"); 11 10 12 11 const SOCKET_TIMEOUT_SECS = 5; 13 12 ··· 21 20 // init Io backend for networking 22 21 threaded_io = Io.Threaded.init(allocator, .{}); 23 22 const io = threaded_io.io(); 24 - compat.initIo(io); 25 23 26 24 // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 27 25 _ = logfire.configure(.{ 28 26 .service_name = "leaflet-search", 29 27 .service_version = "0.1.0", 30 - .environment = compat.getenv("FLY_APP_NAME") orelse "development", 28 + .environment = if (std.c.getenv("FLY_APP_NAME")) |p| std.mem.span(p) else "development", 31 29 }) catch |err| { 32 30 std.debug.print("logfire init failed: {}, continuing without observability\n", .{err}); 33 31 }; 34 32 35 33 // start http server FIRST so Fly proxy doesn't timeout 36 34 const port: u16 = blk: { 37 - const port_str = compat.getenv("PORT") orelse "3000"; 35 + const port_str = if (std.c.getenv("PORT")) |p| std.mem.span(p) else "3000"; 38 36 break :blk std.fmt.parseInt(u16, port_str, 10) catch 3000; 39 37 }; 40 38 ··· 45 43 }; 46 44 defer listener.deinit(io); 47 45 48 - const app_name = compat.getenv("APP_NAME") orelse "leaflet-search"; 46 + const app_name = if (std.c.getenv("APP_NAME")) |p| std.mem.span(p) else "leaflet-search"; 49 47 logfire.info("{s} listening on port {d}", .{ app_name, port }); 50 48 51 49 // init turso client synchronously (fast, needed for search fallback) 52 50 try db.initTurso(io); 53 51 54 52 // init local db and other services in background (slow) 55 - const init_thread = try Thread.spawn(.{}, initServices, .{allocator}); 53 + const init_thread = try Thread.spawn(.{}, initServices, .{ allocator, io }); 56 54 init_thread.detach(); 57 55 58 56 // thread-per-connection (Thread.Pool removed in 0.16) ··· 66 64 logfire.warn("failed to set socket timeout: {}", .{err}); 67 65 }; 68 66 69 - const accepted_at = compat.microTimestamp(); 67 + const accepted_at = Io.Timestamp.now(io, .real).toMicroseconds(); 70 68 const thread = Thread.spawn(.{}, server.handleConnection, .{ stream, io, accepted_at }) catch |err| { 71 69 logfire.err("spawn error: {}", .{err}); 72 70 stream.close(io); ··· 76 74 } 77 75 } 78 76 79 - fn initServices(allocator: std.mem.Allocator) void { 77 + fn initServices(allocator: std.mem.Allocator, io: Io) void { 80 78 // run schema migrations first (idempotent, but may be slow if turso is laggy) 81 79 db.initSchema(); 82 80 83 81 // init local db (slow - turso already initialized) 84 - db.initLocalDb(); 85 - db.startSync(); 82 + db.initLocalDb(io); 83 + db.startSync(io); 86 84 87 85 // start activity tracker 88 - metrics.activity.init(); 86 + metrics.activity.init(io); 89 87 90 88 // start stats buffer (background sync to Turso) 91 - metrics.buffer.init(); 89 + metrics.buffer.init(io); 90 + 91 + // init timing module 92 + metrics.timing.setIo(io); 92 93 93 94 // init vector store (reads TURBOPUFFER_API_KEY from env) 94 - tpuf.init(); 95 + tpuf.init(io); 95 96 tpuf.startKeepalive(allocator); 96 97 97 98 // keep turso connection warm (avoids ~1s TLS handshake on first query after idle) 98 99 db.startKeepalive(); 99 100 100 101 // start reconciler (verifies documents still exist at source PDS) 101 - ingest.reconciler.start(allocator); 102 + ingest.reconciler.start(allocator, io); 102 103 103 104 // start embedder (voyage-4-lite, 1024 dims, 1 worker) 104 - ingest.embedder.start(allocator); 105 + ingest.embedder.start(allocator, io); 105 106 106 107 // start tap consumer 107 - ingest.tap.consumer(allocator); 108 + ingest.tap.consumer(allocator, io); 108 109 } 109 110 110 111 fn setSocketTimeout(fd: std.posix.fd_t, secs: u32) !void {
+17 -12
backend/src/metrics/activity.zig
··· 1 1 const std = @import("std"); 2 - const compat = @import("../compat.zig"); 2 + const Io = std.Io; 3 3 4 4 // ring buffer for real-time search activity 5 5 pub const SLOTS = 60; ··· 7 7 8 8 var counts: [SLOTS]u16 = .{0} ** SLOTS; 9 9 var slot: usize = 0; 10 - var mutex: compat.Mutex = .{}; 11 - var thread: ?std.Thread = null; 10 + var mutex: Io.Mutex = Io.Mutex.init; 11 + var global_io: ?Io = null; 12 12 13 13 fn tickLoop() void { 14 + const io = global_io.?; 14 15 while (true) { 15 - compat.sleepMs(TICK_MS); 16 - mutex.lock(); 16 + io.sleep(Io.Duration.fromMilliseconds(TICK_MS), .awake) catch {}; 17 + mutex.lockUncancelable(io); 17 18 slot = (slot + 1) % SLOTS; 18 19 counts[slot] = 0; 19 - mutex.unlock(); 20 + mutex.unlock(io); 20 21 } 21 22 } 22 23 23 - pub fn init() void { 24 - thread = std.Thread.spawn(.{}, tickLoop, .{}) catch null; 24 + pub fn init(io: Io) void { 25 + global_io = io; 26 + const thread = std.Thread.spawn(.{}, tickLoop, .{}) catch return; 27 + thread.detach(); 25 28 } 26 29 27 30 pub fn getCounts() [SLOTS]u16 { 28 - mutex.lock(); 29 - defer mutex.unlock(); 31 + const io = global_io.?; 32 + mutex.lockUncancelable(io); 33 + defer mutex.unlock(io); 30 34 var result: [SLOTS]u16 = undefined; 31 35 for (0..SLOTS) |i| { 32 36 const idx = (slot + 1 + i) % SLOTS; ··· 36 40 } 37 41 38 42 pub fn record() void { 39 - mutex.lock(); 40 - defer mutex.unlock(); 43 + const io = global_io.?; 44 + mutex.lockUncancelable(io); 45 + defer mutex.unlock(io); 41 46 counts[slot] +|= 1; 42 47 }
+14 -8
backend/src/metrics/buffer.zig
··· 2 2 //! Follows activity.zig pattern: instant local writes, periodic remote sync 3 3 4 4 const std = @import("std"); 5 + const Io = std.Io; 5 6 const db = @import("../db.zig"); 6 7 const logfire = @import("logfire"); 7 - const compat = @import("../compat.zig"); 8 8 9 9 const SYNC_INTERVAL_MS = 5000; // 5 seconds 10 10 const MAX_PENDING_SEARCHES = 256; ··· 27 27 var pending_searches: [MAX_PENDING_SEARCHES]?[]const u8 = .{null} ** MAX_PENDING_SEARCHES; 28 28 var search_write_idx: usize = 0; 29 29 var search_read_idx: usize = 0; 30 - var search_mutex: compat.Mutex = .{}; 30 + var search_mutex: Io.Mutex = Io.Mutex.init; 31 + var global_io: ?Io = null; 31 32 32 33 // allocator for search string copies 33 34 const search_allocator = std.heap.smp_allocator; 34 35 35 36 var sync_thread: ?std.Thread = null; 36 37 37 - pub fn init() void { 38 + pub fn init(io: Io) void { 39 + global_io = io; 40 + 38 41 // seed cache immediately (so first /stats request is fast) 39 42 if (db.getClient()) |c| { 40 43 refreshCachedStats(c); ··· 69 72 // queue popular search (best effort, drops if full) 70 73 pub fn queuePopularSearch(query: []const u8) void { 71 74 if (query.len < 2) return; 75 + const io = global_io.?; 72 76 73 - search_mutex.lock(); 74 - defer search_mutex.unlock(); 77 + search_mutex.lockUncancelable(io); 78 + defer search_mutex.unlock(io); 75 79 76 80 // check if buffer is full 77 81 const next_write = (search_write_idx + 1) % MAX_PENDING_SEARCHES; ··· 128 132 } 129 133 130 134 fn syncLoop() void { 135 + const io = global_io.?; 131 136 while (true) { 132 - compat.sleepMs(SYNC_INTERVAL_MS); 137 + io.sleep(Io.Duration.fromMilliseconds(SYNC_INTERVAL_MS), .awake) catch {}; 133 138 syncToTurso(); 134 139 } 135 140 } ··· 214 219 } 215 220 216 221 fn syncPopularSearches(c: *db.Client) void { 217 - search_mutex.lock(); 218 - defer search_mutex.unlock(); 222 + const io = global_io.?; 223 + search_mutex.lockUncancelable(io); 224 + defer search_mutex.unlock(io); 219 225 220 226 var synced: usize = 0; 221 227 while (search_read_idx != search_write_idx) {
+33 -12
backend/src/metrics/timing.zig
··· 1 1 const std = @import("std"); 2 - const compat = @import("../compat.zig"); 2 + const Io = std.Io; 3 3 4 4 /// endpoints we track latency for 5 5 pub const Endpoint = enum { ··· 78 78 79 79 var buffers: [ENDPOINT_COUNT]LatencyBuffer = [_]LatencyBuffer{.{}} ** ENDPOINT_COUNT; 80 80 var hourly: [ENDPOINT_COUNT][HOURS_TO_KEEP]HourlyBucket = [_][HOURS_TO_KEEP]HourlyBucket{[_]HourlyBucket{.{}} ** HOURS_TO_KEEP} ** ENDPOINT_COUNT; 81 - var mutex: compat.Mutex = .{}; 81 + var mutex: Io.Mutex = Io.Mutex.init; 82 + var global_io: ?Io = null; 82 83 var initialized: bool = false; 84 + 85 + pub fn setIo(io: Io) void { 86 + global_io = io; 87 + } 88 + 89 + fn getIo() Io { 90 + return global_io.?; 91 + } 83 92 84 93 fn getCurrentHour() i64 { 85 - const now_s = @divFloor(compat.timestamp(), 3600) * 3600; 94 + const now_s = @divFloor(timestamp(), 3600) * 3600; 86 95 return now_s; 96 + } 97 + 98 + fn timestamp() i64 { 99 + return @intCast(@divFloor(Io.Timestamp.now(getIo(), .real).nanoseconds, std.time.ns_per_s)); 100 + } 101 + 102 + fn microTimestamp() i64 { 103 + return Io.Timestamp.now(getIo(), .real).toMicroseconds(); 87 104 } 88 105 89 106 fn getHourIndex(hour: i64) usize { ··· 93 110 94 111 /// record a request latency (call after request completes) 95 112 pub fn record(endpoint: Endpoint, start_time: i64) void { 96 - const now = compat.microTimestamp(); 113 + const io = getIo(); 114 + const now = microTimestamp(); 97 115 const elapsed_us: u32 = @intCast(@max(0, now - start_time)); 98 116 const current_hour = getCurrentHour(); 99 117 const hour_idx = getHourIndex(current_hour); 100 118 101 - mutex.lock(); 102 - defer mutex.unlock(); 119 + mutex.lockUncancelable(io); 120 + defer mutex.unlock(io); 103 121 104 122 ensureInitialized(); 105 123 ··· 221 239 222 240 /// get stats for a specific endpoint 223 241 pub fn getStats(endpoint: Endpoint) EndpointStats { 224 - mutex.lock(); 225 - defer mutex.unlock(); 242 + const io = getIo(); 243 + mutex.lockUncancelable(io); 244 + defer mutex.unlock(io); 226 245 227 246 ensureInitialized(); 228 247 ··· 259 278 260 279 /// get time series for an endpoint (last 24 hours, for latency charts) 261 280 pub fn getTimeSeries(endpoint: Endpoint) [LATENCY_HISTORY_HOURS]TimeSeriesPoint { 262 - mutex.lock(); 263 - defer mutex.unlock(); 281 + const io = getIo(); 282 + mutex.lockUncancelable(io); 283 + defer mutex.unlock(io); 264 284 265 285 ensureInitialized(); 266 286 ··· 306 326 307 327 /// get aggregate traffic series (all endpoints summed, last 720 hours) 308 328 pub fn getTrafficSeries() [HOURS_TO_KEEP]TrafficPoint { 309 - mutex.lock(); 310 - defer mutex.unlock(); 329 + const io = getIo(); 330 + mutex.lockUncancelable(io); 331 + defer mutex.unlock(io); 311 332 312 333 ensureInitialized(); 313 334
+18 -15
backend/src/server.zig
··· 7 7 const logfire = @import("logfire"); 8 8 const zql = @import("zql"); 9 9 const zat = @import("zat"); 10 - const compat = @import("compat.zig"); 11 10 const db = @import("db.zig"); 12 11 const metrics = @import("metrics.zig"); 13 12 const search = @import("server/search.zig"); ··· 16 15 const HTTP_BUF_SIZE = 65536; 17 16 const QUERY_PARAM_BUF_SIZE = 64; 18 17 18 + fn microTimestamp(io: Io) i64 { 19 + return Io.Timestamp.now(io, .real).toMicroseconds(); 20 + } 21 + 19 22 pub fn handleConnection(stream: Io.net.Stream, io: Io, accepted_at: i64) void { 20 23 defer stream.close(io); 21 24 22 - const queue_us = compat.microTimestamp() - accepted_at; 25 + const queue_us = microTimestamp(io) - accepted_at; 23 26 if (queue_us > 100_000) { // > 100ms 24 27 logfire.warn("http.queue slow: {d}ms", .{@divTrunc(queue_us, 1000)}); 25 28 } ··· 33 36 var server = http.Server.init(&reader.interface, &writer.interface); 34 37 35 38 while (true) { 36 - const recv_start = compat.microTimestamp(); 39 + const recv_start = microTimestamp(io); 37 40 var request = server.receiveHead() catch |err| { 38 41 if (err != error.HttpConnectionClosing and err != error.EndOfStream) { 39 42 logfire.debug("http receive error: {}", .{err}); 40 43 } 41 44 return; 42 45 }; 43 - const recv_us = compat.microTimestamp() - recv_start; 46 + const recv_us = microTimestamp(io) - recv_start; 44 47 const target = request.head.target; 45 48 46 49 const req_span = logfire.span("http.request", .{ ··· 76 79 if (mem.startsWith(u8, path, "/search")) { 77 80 try handleSearch(request, target, io); 78 81 } else if (mem.eql(u8, path, "/tags")) { 79 - try handleTags(request, target); 82 + try handleTags(request, target, io); 80 83 } else if (mem.eql(u8, path, "/stats")) { 81 84 try handleStats(request); 82 85 } else if (mem.eql(u8, path, "/health")) { 83 86 try sendJson(request, "{\"status\":\"ok\"}"); 84 87 } else if (mem.eql(u8, path, "/popular")) { 85 - try handlePopular(request, target); 88 + try handlePopular(request, target, io); 86 89 } else if (mem.eql(u8, path, "/dashboard")) { 87 90 try handleDashboard(request); 88 91 } else if (mem.eql(u8, path, "/api/dashboard")) { 89 92 try handleDashboardApi(request); 90 93 } else if (mem.startsWith(u8, path, "/similar")) { 91 - try handleSimilar(request, target); 94 + try handleSimilar(request, target, io); 92 95 } else if (mem.eql(u8, path, "/activity")) { 93 96 try handleActivity(request); 94 97 } else { ··· 97 100 } 98 101 99 102 fn handleSearch(request: *http.Server.Request, target: []const u8, io: Io) !void { 100 - const start_time = compat.microTimestamp(); 103 + const start_time = microTimestamp(io); 101 104 102 105 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 103 106 defer arena.deinit(); ··· 168 171 } 169 172 } 170 173 171 - fn handleTags(request: *http.Server.Request, target: []const u8) !void { 172 - const start_time = compat.microTimestamp(); 174 + fn handleTags(request: *http.Server.Request, target: []const u8, io: Io) !void { 175 + const start_time = microTimestamp(io); 173 176 defer metrics.timing.record(.tags, start_time); 174 177 175 178 const span = logfire.span("http.tags", .{}); ··· 190 193 } 191 194 } 192 195 193 - fn handlePopular(request: *http.Server.Request, target: []const u8) !void { 194 - const start_time = compat.microTimestamp(); 196 + fn handlePopular(request: *http.Server.Request, target: []const u8, io: Io) !void { 197 + const start_time = microTimestamp(io); 195 198 defer metrics.timing.record(.popular, start_time); 196 199 197 200 const span = logfire.span("http.popular", .{}); ··· 448 451 } 449 452 450 453 fn getDashboardUrl() []const u8 { 451 - return compat.getenv("DASHBOARD_URL") orelse "https://pub-search.waow.tech/dashboard.html"; 454 + return if (std.c.getenv("DASHBOARD_URL")) |p| std.mem.span(p) else "https://pub-search.waow.tech/dashboard.html"; 452 455 } 453 456 454 457 fn handleDashboard(request: *http.Server.Request) !void { ··· 461 464 }); 462 465 } 463 466 464 - fn handleSimilar(request: *http.Server.Request, target: []const u8) !void { 465 - const start_time = compat.microTimestamp(); 467 + fn handleSimilar(request: *http.Server.Request, target: []const u8, io: Io) !void { 468 + const start_time = microTimestamp(io); 466 469 defer metrics.timing.record(.similar, start_time); 467 470 468 471 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+1 -3
backend/src/server/dashboard.zig
··· 28 28 traffic_json: []const u8, 29 29 }; 30 30 31 - const compat = @import("../compat.zig"); 32 - 33 31 fn getRelayUrl() []const u8 { 34 - return compat.getenv("TAP_RELAY_URL") orelse "unknown"; 32 + return if (std.c.getenv("TAP_RELAY_URL")) |p| std.mem.span(p) else "unknown"; 35 33 } 36 34 37 35 // all dashboard queries batched into one request
+12 -8
backend/src/tpuf.zig
··· 13 13 const http = std.http; 14 14 const mem = std.mem; 15 15 const Allocator = mem.Allocator; 16 - const compat = @import("compat.zig"); 16 + const Io = std.Io; 17 17 const logfire = @import("logfire"); 18 + 19 + var global_io: ?Io = null; 18 20 19 21 const Sha256 = std.crypto.hash.sha2.Sha256; 20 22 ··· 63 65 }; 64 66 65 67 /// Read config from environment. Call once at startup. 66 - pub fn init() void { 67 - api_key = compat.getenv("TURBOPUFFER_API_KEY"); 68 - if (compat.getenv("TURBOPUFFER_NAMESPACE")) |ns| { 68 + pub fn init(io: Io) void { 69 + global_io = io; 70 + api_key = if (std.c.getenv("TURBOPUFFER_API_KEY")) |p| std.mem.span(p) else null; 71 + if (if (std.c.getenv("TURBOPUFFER_NAMESPACE")) |p| std.mem.span(p) else null) |ns| { 69 72 namespace = ns; 70 73 } 71 74 ··· 86 89 logfire.info("tpuf: TURBOPUFFER_API_KEY not set, vector store disabled", .{}); 87 90 } 88 91 89 - voyage_api_key = compat.getenv("VOYAGE_API_KEY"); 92 + voyage_api_key = if (std.c.getenv("VOYAGE_API_KEY")) |p| std.mem.span(p) else null; 90 93 if (voyage_api_key != null) { 91 94 logfire.info("tpuf: voyage query embedding enabled", .{}); 92 95 } ··· 130 133 defer allocator.free(body); 131 134 132 135 // make request 133 - var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 136 + var http_client: http.Client = .{ .allocator = allocator, .io = global_io.? }; 134 137 defer http_client.deinit(); 135 138 136 139 var auth_buf: [256]u8 = undefined; ··· 553 556 // --- HTTP --- 554 557 555 558 fn doRequest(allocator: Allocator, key: []const u8, url: []const u8, body: []const u8) ![]const u8 { 556 - var client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 559 + var client: http.Client = .{ .allocator = allocator, .io = global_io.? }; 557 560 defer client.deinit(); 558 561 559 562 var auth_buf: [256]u8 = undefined; ··· 604 607 } 605 608 606 609 fn keepaliveLoop(allocator: Allocator) void { 610 + const io = global_io.?; 607 611 // minimal query body: rank by ID, top_k=1, no attributes 608 612 const ping_body = "{\"rank_by\":[\"id\",\"asc\"],\"top_k\":1,\"include_attributes\":[]}"; 609 613 while (true) { 610 - compat.sleep(KEEPALIVE_INTERVAL_NS); 614 + io.sleep(Io.Duration.fromNanoseconds(KEEPALIVE_INTERVAL_NS), .awake) catch {}; 611 615 const key = api_key orelse return; 612 616 const response = doRequest(allocator, key, query_url, ping_body) catch |err| { 613 617 logfire.debug("tpuf: keepalive ping failed: {}", .{err});