search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: upgrade backend from zig 0.15 to 0.16

- smp_allocator replaces GeneralPurposeAllocator (removed in 0.16)
- std.Io.Threaded backend for async I/O (networking, http)
- std.Io.net replaces std.net for TCP server
- thread-per-connection replaces Thread.Pool (removed)
- compat.zig wraps removed APIs: getenv, sleep, timestamp, Mutex
- C file I/O replaces std.fs (removed) in timing.zig, LocalDb.zig
- ArrayList .empty replaces .{} init syntax
- all deps updated to 0.16-compatible hashes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+346 -183
+1
.gitignore
··· 7 7 zig-out/ 8 8 .loq_cache 9 9 site/atlas.json 10 + zig-pkg/
+4 -3
backend/Dockerfile
··· 7 7 xz-utils \ 8 8 && rm -rf /var/lib/apt/lists/* 9 9 10 - # install zig 0.15.2 11 - RUN curl -L https://ziglang.org/download/0.15.2/zig-x86_64-linux-0.15.2.tar.xz | tar -xJ -C /usr/local \ 12 - && ln -s /usr/local/zig-x86_64-linux-0.15.2/zig /usr/local/bin/zig 10 + # install zig 0.16.0-dev 11 + ARG ZIG_VERSION=0.16.0-dev.3059+42e33db9d 12 + RUN curl -L https://ziglang.org/builds/zig-x86_64-linux-${ZIG_VERSION}.tar.xz | tar -xJ -C /usr/local \ 13 + && ln -s /usr/local/zig-x86_64-linux-${ZIG_VERSION}/zig /usr/local/bin/zig 13 14 14 15 WORKDIR /app 15 16 COPY build.zig build.zig.zon ./
+2 -1
backend/build.zig
··· 44 44 .root_source_file = b.path("src/main.zig"), 45 45 .target = target, 46 46 .optimize = optimize, 47 + .link_libc = true, 47 48 .imports = imports, 48 49 }), 49 50 }); ··· 65 66 .root_source_file = b.path("src/main.zig"), 66 67 .target = target, 67 68 .optimize = optimize, 69 + .link_libc = true, 68 70 .imports = imports, 69 71 }), 70 72 }); 71 - unit_tests.linkLibC(); 72 73 73 74 const run_tests = b.addRunArtifact(unit_tests); 74 75 const test_step = b.step("test", "Run unit tests");
+7 -7
backend/build.zig.zon
··· 2 2 .name = .leaflet_search, 3 3 .version = "0.0.1", 4 4 .fingerprint = 0x4a432eb7171f22eb, 5 - .minimum_zig_version = "0.15.0", 5 + .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .websocket = .{ 8 - .url = "https://github.com/zzstoatzz/websocket.zig/archive/9e6d732b207bdb0cb5fe5efb37a8173ac9638051.tar.gz", 9 - .hash = "websocket-0.1.0-ZPISdeJ2AwC8rczCVo9NwFzIzW7EdvoXlNkNR_P-bdaf", 8 + .url = "https://github.com/zzstoatzz/websocket.zig/archive/80c6434.tar.gz", 9 + .hash = "websocket-0.1.0-ZPISdTHwAwA1d45BsYRE81Z8wNwZ3RhukgNADOma4eym", 10 10 }, 11 11 .zql = .{ 12 12 .url = "https://github.com/zzstoatzz/zql/archive/main.tar.gz", 13 13 .hash = "zql-0.0.1-alpha-xNRI4IRNAABUb9gLat5FWUaZDD5HvxAxet_-elgR_A_y", 14 14 }, 15 15 .zat = .{ 16 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.13", 17 - .hash = "zat-0.2.13-5PuC7tDBBAAchi_u_Myjr1hVhDbOollod03nbXqXHFn_", 16 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.3.0-alpha.16", 17 + .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQC8QDphqwrmXGqng1Xvo8ua_H5MqS-smp1T", 18 18 }, 19 19 .zqlite = .{ 20 20 .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 21 21 .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 22 22 }, 23 23 .logfire = .{ 24 - .url = "https://tangled.org/zzstoatzz.io/logfire-zig/archive/main", 25 - .hash = "logfire_zig-0.1.0-x2yDLgdwAADOXAZLNQJ8FUH5v1vfFwe5CApJtQ7c_pZd", 24 + .url = "https://tangled.org/zzstoatzz.io/logfire-zig/archive/zig-0.16", 25 + .hash = "logfire_zig-0.1.0-x2yDLr5wAADNrQCuIHDTBMgyrohjJkhSSXvJRuWU5SPv", 26 26 }, 27 27 }, 28 28 .paths = .{
+98
backend/src/compat.zig
··· 1 + //! Zig 0.16 compatibility helpers. 2 + //! Replaces removed APIs: posix.getenv, std.time.microTimestamp, 3 + //! std.time.timestamp, std.Thread.sleep, std.Thread.Mutex/Condition. 4 + //! Also provides global Io access for http.Client and other networking. 5 + 6 + const std = @import("std"); 7 + 8 + // --- global Io (set once in main, used by http.Client consumers) --- 9 + 10 + var global_io: ?std.Io = null; 11 + 12 + pub fn initIo(io: std.Io) void { 13 + global_io = io; 14 + } 15 + 16 + pub fn getIo() std.Io { 17 + return global_io.?; 18 + } 19 + 20 + // --- environment variables --- 21 + 22 + pub fn getenv(key: [*:0]const u8) ?[]const u8 { 23 + return if (std.c.getenv(key)) |p| std.mem.span(p) else null; 24 + } 25 + 26 + // --- timestamps --- 27 + 28 + pub fn microTimestamp() i64 { 29 + var ts: std.c.timespec = undefined; 30 + _ = std.c.clock_gettime(.REALTIME, &ts); 31 + return @as(i64, ts.sec) * 1_000_000 + @divTrunc(@as(i64, ts.nsec), 1_000); 32 + } 33 + 34 + pub fn timestamp() i64 { 35 + var ts: std.c.timespec = undefined; 36 + _ = std.c.clock_gettime(.REALTIME, &ts); 37 + return @as(i64, ts.sec); 38 + } 39 + 40 + // --- sleep --- 41 + 42 + pub fn sleep(ns: u64) void { 43 + var req: std.c.timespec = .{ 44 + .sec = @intCast(@divFloor(ns, std.time.ns_per_s)), 45 + .nsec = @intCast(@mod(ns, std.time.ns_per_s)), 46 + }; 47 + while (true) { 48 + var rem: std.c.timespec = undefined; 49 + const rc = std.c.nanosleep(&req, &rem); 50 + if (rc == 0) return; 51 + // EINTR: interrupted, retry with remaining time 52 + req = rem; 53 + } 54 + } 55 + 56 + pub fn sleepSecs(secs: u64) void { 57 + sleep(secs * std.time.ns_per_s); 58 + } 59 + 60 + pub fn sleepMs(ms: u64) void { 61 + sleep(ms * std.time.ns_per_ms); 62 + } 63 + 64 + // --- mutex (pthread-based, works without Io) --- 65 + 66 + pub const Mutex = struct { 67 + inner: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, 68 + 69 + pub fn lock(self: *Mutex) void { 70 + _ = std.c.pthread_mutex_lock(&self.inner); 71 + } 72 + 73 + pub fn unlock(self: *Mutex) void { 74 + _ = std.c.pthread_mutex_unlock(&self.inner); 75 + } 76 + 77 + pub fn tryLock(self: *Mutex) bool { 78 + return std.c.pthread_mutex_trylock(&self.inner) == .SUCCESS; 79 + } 80 + }; 81 + 82 + // --- condition variable --- 83 + 84 + pub const Condition = struct { 85 + inner: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, 86 + 87 + pub fn wait(self: *Condition, mutex: *Mutex) void { 88 + _ = std.c.pthread_cond_wait(&self.inner, &mutex.inner); 89 + } 90 + 91 + pub fn signal(self: *Condition) void { 92 + _ = std.c.pthread_cond_signal(&self.inner); 93 + } 94 + 95 + pub fn broadcast(self: *Condition) void { 96 + _ = std.c.pthread_cond_broadcast(&self.inner); 97 + } 98 + };
+10 -9
backend/src/db.zig
··· 1 1 const std = @import("std"); 2 - const posix = std.posix; 2 + const Io = std.Io; 3 + const compat = @import("compat.zig"); 3 4 4 5 const schema = @import("db/schema.zig"); 5 6 const result = @import("db/result.zig"); ··· 13 14 pub const BatchResult = result.BatchResult; 14 15 15 16 // global state 16 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 17 + const allocator = std.heap.smp_allocator; 17 18 var client: ?Client = null; 18 19 var sync_client: ?Client = null; 19 20 var local_db: ?LocalDb = null; ··· 21 22 /// Initialize Turso client only (fast, call synchronously at startup). 22 23 /// Schema migrations run separately via initSchema() in the background thread 23 24 /// so a slow/unreachable turso doesn't block the accept loop. 24 - pub fn initTurso() !void { 25 - client = try Client.init(gpa.allocator()); 26 - sync_client = try Client.init(gpa.allocator()); 25 + pub fn initTurso(io: Io) !void { 26 + client = try Client.init(allocator, io); 27 + sync_client = try Client.init(allocator, io); 27 28 } 28 29 29 30 /// Run schema migrations (idempotent). Call from background thread. ··· 44 45 45 46 fn initLocal() !void { 46 47 // check if local db is disabled 47 - if (posix.getenv("LOCAL_DB_ENABLED")) |val| { 48 + if (compat.getenv("LOCAL_DB_ENABLED")) |val| { 48 49 if (std.mem.eql(u8, val, "false") or std.mem.eql(u8, val, "0")) { 49 50 std.debug.print("local db disabled via LOCAL_DB_ENABLED\n", .{}); 50 51 return; 51 52 } 52 53 } 53 54 54 - local_db = LocalDb.init(gpa.allocator()); 55 + local_db = LocalDb.init(allocator); 55 56 try local_db.?.open(); 56 57 } 57 58 ··· 106 107 107 108 // get sync interval from env (default 5 minutes) 108 109 const interval_secs: u64 = blk: { 109 - const env_val = posix.getenv("SYNC_INTERVAL_SECS") orelse "300"; 110 + const env_val = compat.getenv("SYNC_INTERVAL_SECS") orelse "300"; 110 111 break :blk std.fmt.parseInt(u64, env_val, 10) catch 300; 111 112 }; 112 113 ··· 114 115 115 116 // periodic incremental sync 116 117 while (true) { 117 - std.Thread.sleep(interval_secs * std.time.ns_per_s); 118 + compat.sleepSecs(interval_secs); 118 119 sync.incrementalSync(turso, local) catch |err| { 119 120 std.debug.print("sync: incremental sync failed: {}\n", .{err}); 120 121 };
+9 -7
backend/src/db/Client.zig
··· 2 2 //! https://docs.turso.tech/sdk/http/reference 3 3 4 4 const std = @import("std"); 5 + const Io = std.Io; 5 6 const http = std.http; 6 7 const json = std.json; 7 8 const mem = std.mem; 8 9 const Allocator = mem.Allocator; 9 10 const logfire = @import("logfire"); 11 + const compat = @import("../compat.zig"); 10 12 11 13 const result = @import("result.zig"); 12 14 pub const Result = result.Result; ··· 27 29 allocator: Allocator, 28 30 url: []const u8, 29 31 token: []const u8, 30 - mutex: std.Thread.Mutex = .{}, 32 + mutex: compat.Mutex = .{}, 31 33 http_client: http.Client, 32 34 33 - pub fn init(allocator: Allocator) !Client { 34 - const url = std.posix.getenv("TURSO_URL") orelse { 35 + pub fn init(allocator_param: Allocator, io: Io) !Client { 36 + const url = compat.getenv("TURSO_URL") orelse { 35 37 std.debug.print("TURSO_URL not set\n", .{}); 36 38 return error.MissingEnv; 37 39 }; 38 - const token = std.posix.getenv("TURSO_TOKEN") orelse { 40 + const token = compat.getenv("TURSO_TOKEN") orelse { 39 41 std.debug.print("TURSO_TOKEN not set\n", .{}); 40 42 return error.MissingEnv; 41 43 }; ··· 49 51 std.debug.print("turso client initialized: {s}\n", .{host}); 50 52 51 53 return .{ 52 - .allocator = allocator, 54 + .allocator = allocator_param, 53 55 .url = host, 54 56 .token = token, 55 - .http_client = .{ .allocator = allocator }, 57 + .http_client = .{ .allocator = allocator_param, .io = io }, 56 58 }; 57 59 } 58 60 ··· 333 335 334 336 fn keepaliveLoop(self: *Client) void { 335 337 while (true) { 336 - std.Thread.sleep(KEEPALIVE_INTERVAL_NS); 338 + compat.sleep(KEEPALIVE_INTERVAL_NS); 337 339 _ = self.exec("SELECT 1", .{}) catch |err| { 338 340 logfire.debug("turso: keepalive ping failed: {}", .{err}); 339 341 };
+15 -6
backend/src/db/LocalDb.zig
··· 2 2 //! Provides fast FTS5 queries while Turso remains source of truth 3 3 4 4 const std = @import("std"); 5 - const posix = std.posix; 6 5 const zqlite = @import("zqlite"); 7 6 const Allocator = std.mem.Allocator; 8 7 const logfire = @import("logfire"); 8 + const compat = @import("../compat.zig"); 9 9 10 10 const LocalDb = @This(); 11 11 ··· 14 14 allocator: Allocator, 15 15 is_ready: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), 16 16 needs_resync: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), 17 - mutex: std.Thread.Mutex = .{}, // protects write conn only 17 + mutex: compat.Mutex = .{}, // protects write conn only 18 18 path: []const u8 = "", 19 19 consecutive_errors: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), 20 20 ··· 40 40 41 41 /// Delete the database file and WAL/SHM files 42 42 fn deleteDbFiles(path: []const u8) void { 43 - std.fs.cwd().deleteFile(path) catch {}; 43 + unlinkPath(path); 44 44 // also delete WAL and SHM files 45 45 var wal_buf: [260]u8 = undefined; 46 46 var shm_buf: [260]u8 = undefined; 47 47 if (path.len < 252) { 48 48 const wal_path = std.fmt.bufPrint(&wal_buf, "{s}-wal", .{path}) catch return; 49 49 const shm_path = std.fmt.bufPrint(&shm_buf, "{s}-shm", .{path}) catch return; 50 - std.fs.cwd().deleteFile(wal_path) catch {}; 51 - std.fs.cwd().deleteFile(shm_path) catch {}; 50 + unlinkPath(wal_path); 51 + unlinkPath(shm_path); 52 52 } 53 53 } 54 54 55 + /// Delete a file by path using C unlink (std.fs.cwd removed in 0.16) 56 + fn unlinkPath(path: []const u8) void { 57 + var buf: [260]u8 = undefined; 58 + if (path.len >= buf.len) return; 59 + @memcpy(buf[0..path.len], path); 60 + buf[path.len] = 0; 61 + _ = std.c.unlink(@ptrCast(&buf)); 62 + } 63 + 55 64 pub fn open(self: *LocalDb) !void { 56 - const path_env = posix.getenv("LOCAL_DB_PATH") orelse "/data/local.db"; 65 + const path_env = compat.getenv("LOCAL_DB_PATH") orelse "/data/local.db"; 57 66 self.path = path_env; 58 67 59 68 try self.openDb(path_env, false);
+3 -3
backend/src/db/result.zig
··· 32 32 return .{ .allocator = allocator, .parsed = parsed, .rows = &.{} }; 33 33 }; 34 34 35 - var rows: std.ArrayList(Row) = .{}; 35 + var rows: std.ArrayList(Row) = .empty; 36 36 errdefer rows.deinit(allocator); 37 37 38 38 for (json_rows.items) |item| { ··· 87 87 return .{ .allocator = allocator, .parsed = parsed, .results = &.{} }; 88 88 } 89 89 90 - var all_results: std.ArrayList([]const Row) = .{}; 90 + var all_results: std.ArrayList([]const Row) = .empty; 91 91 errdefer { 92 92 for (all_results.items) |rows| allocator.free(rows); 93 93 all_results.deinit(allocator); ··· 100 100 const item = turso_results.array.items[i]; 101 101 const json_rows = getRowsFromResult(item); 102 102 103 - var rows: std.ArrayList(Row) = .{}; 103 + var rows: std.ArrayList(Row) = .empty; 104 104 if (json_rows) |jr| { 105 105 for (jr.items) |row_item| { 106 106 if (row_item == .array) {
+2 -2
backend/src/db/schema.zig
··· 1 1 const std = @import("std"); 2 2 const Client = @import("Client.zig"); 3 + const compat = @import("../compat.zig"); 3 4 4 5 /// Initialize database schema and run migrations 5 6 pub fn init(client: *Client) !void { ··· 77 78 78 79 // set service_started_at if not already set (first run ever) 79 80 var ts_buf: [20]u8 = undefined; 80 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 81 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 81 82 client.exec( 82 83 "UPDATE stats SET service_started_at = ? WHERE id = 1 AND service_started_at IS NULL", 83 84 &.{ts_str}, ··· 99 100 \\ deleted_at INTEGER NOT NULL 100 101 \\) 101 102 , &.{}); 102 - 103 103 } 104 104 105 105 fn runMigrations(client: *Client) !void {
+15 -14
backend/src/db/sync.zig
··· 6 6 const Allocator = std.mem.Allocator; 7 7 const Client = @import("Client.zig"); 8 8 const LocalDb = @import("LocalDb.zig"); 9 + const compat = @import("../compat.zig"); 9 10 10 11 const BATCH_SIZE = 500; 11 12 ··· 190 191 local.lock(); 191 192 defer local.unlock(); 192 193 var ts_buf: [20]u8 = undefined; 193 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 194 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 194 195 conn.exec( 195 196 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 196 197 .{ts_str}, ··· 295 296 296 297 // update sync time 297 298 var ts_buf: [20]u8 = undefined; 298 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 299 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 299 300 conn.exec( 300 301 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 301 302 .{ts_str}, ··· 354 355 \\ platform, source_collection, path, base_path, has_publication, indexed_at, embedded_at, cover_image, is_bridgyfed) 355 356 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 356 357 , .{ 357 - row.text(0), // uri 358 - row.text(1), // did 359 - row.text(2), // rkey 360 - row.text(3), // title 361 - row.text(4), // content 362 - row.text(5), // created_at 363 - row.text(6), // publication_uri 364 - row.text(7), // platform 365 - row.text(8), // source_collection 366 - row.text(9), // path 358 + row.text(0), // uri 359 + row.text(1), // did 360 + row.text(2), // rkey 361 + row.text(3), // title 362 + row.text(4), // content 363 + row.text(5), // created_at 364 + row.text(6), // publication_uri 365 + row.text(7), // platform 366 + row.text(8), // source_collection 367 + row.text(9), // path 367 368 row.text(10), // base_path 368 - row.int(11), // has_publication 369 + row.int(11), // has_publication 369 370 row.text(12), // indexed_at 370 371 row.text(13), // embedded_at 371 372 row.text(14), // cover_image 372 - row.int(15), // is_bridgyfed 373 + row.int(15), // is_bridgyfed 373 374 }) catch |err| { 374 375 return err; 375 376 };
+8 -9
backend/src/ingest/embedder.zig
··· 7 7 const http = std.http; 8 8 const json = std.json; 9 9 const mem = std.mem; 10 - const posix = std.posix; 11 10 const Allocator = mem.Allocator; 12 11 const logfire = @import("logfire"); 13 12 const zql = @import("zql"); 14 13 const db = @import("../db.zig"); 15 14 const tpuf = @import("../tpuf.zig"); 15 + const compat = @import("../compat.zig"); 16 16 17 17 // voyage-4-lite limits 18 18 const MAX_BATCH_SIZE = 20; // conservative batch size for reliability ··· 34 34 35 35 /// Start the embedder background worker 36 36 pub fn start(allocator: Allocator) void { 37 - const api_key = posix.getenv("VOYAGE_API_KEY") orelse { 37 + const api_key = compat.getenv("VOYAGE_API_KEY") orelse { 38 38 logfire.info("embedder: VOYAGE_API_KEY not set, embeddings disabled", .{}); 39 39 return; 40 40 }; ··· 49 49 50 50 fn worker(allocator: Allocator, api_key: []const u8) void { 51 51 // wait for db to be ready 52 - std.Thread.sleep(5 * std.time.ns_per_s); 52 + compat.sleep(5 * std.time.ns_per_s); 53 53 54 54 var consecutive_errors: u32 = 0; 55 55 ··· 58 58 consecutive_errors += 1; 59 59 const backoff: u64 = @min(ERROR_BACKOFF_SECS * consecutive_errors, 3600); 60 60 logfire.warn("embedder: error {}, backing off {d}s", .{ err, backoff }); 61 - std.Thread.sleep(backoff * std.time.ns_per_s); 61 + compat.sleep(backoff * std.time.ns_per_s); 62 62 continue; 63 63 }; 64 64 ··· 71 71 72 72 // no work, sleep 73 73 consecutive_errors = 0; 74 - std.Thread.sleep(POLL_INTERVAL_SECS * std.time.ns_per_s); 74 + compat.sleep(POLL_INTERVAL_SECS * std.time.ns_per_s); 75 75 } 76 76 } 77 77 ··· 176 176 177 177 // mark docs as embedded in turso 178 178 // generate timestamp in Zig (avoids any strftime/pipeline API quirks) 179 - const now_ts = std.time.timestamp(); 179 + const now_ts = compat.timestamp(); 180 180 const epoch_secs: u64 = @intCast(now_ts); 181 181 const epoch = std.time.epoch.EpochSeconds{ .secs = epoch_secs }; 182 182 const day = epoch.getDaySeconds(); ··· 184 184 const md = yd.calculateMonthDay(); 185 185 var ts_buf: [20]u8 = undefined; 186 186 const ts = std.fmt.bufPrint(&ts_buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}", .{ 187 - yd.year, md.month.numeric(), @as(u32, md.day_index) + 1, 187 + yd.year, md.month.numeric(), @as(u32, md.day_index) + 1, 188 188 day.getHoursIntoDay(), day.getMinutesIntoHour(), day.getSecondsIntoMinute(), 189 189 }) catch "1970-01-01T00:00:00"; 190 190 ··· 247 247 }); 248 248 defer span.end(); 249 249 250 - var http_client: http.Client = .{ .allocator = allocator }; 250 + var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 251 251 defer http_client.deinit(); 252 252 253 253 // build request body ··· 363 363 364 364 return embeddings; 365 365 } 366 -
+1 -2
backend/src/ingest/extractor.zig
··· 26 26 pub fn name(self: Platform) []const u8 { 27 27 return @tagName(self); 28 28 } 29 - 30 29 }; 31 30 32 31 /// Extracted document data ready for indexing. ··· 151 150 } 152 151 153 152 fn extractContent(allocator: Allocator, record: json.Value) ![]u8 { 154 - var buf: std.ArrayList(u8) = .{}; 153 + var buf: std.ArrayList(u8) = .empty; 155 154 errdefer buf.deinit(allocator); 156 155 157 156 // try textContent first (site.standard.document has this pre-flattened)
+3 -2
backend/src/ingest/indexer.zig
··· 1 1 const std = @import("std"); 2 2 const logfire = @import("logfire"); 3 3 const db = @import("../db.zig"); 4 + const compat = @import("../compat.zig"); 4 5 5 6 /// Hash title+content for cross-platform dedup. 6 7 /// Returns a 16-char hex string (wyhash of "title\x00content"). ··· 288 289 289 290 // record tombstone 290 291 var ts_buf: [20]u8 = undefined; 291 - const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 292 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 292 293 c.exec( 293 294 "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'document', ?)", 294 295 &.{ uri, ts }, ··· 304 305 305 306 // record tombstone 306 307 var ts_buf: [20]u8 = undefined; 307 - const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 308 + const ts = std.fmt.bufPrint(&ts_buf, "{d}", .{compat.timestamp()}) catch "0"; 308 309 c.exec( 309 310 "INSERT OR REPLACE INTO tombstones (uri, record_type, deleted_at) VALUES (?, 'publication', ?)", 310 311 &.{ uri, ts },
+13 -13
backend/src/ingest/reconciler.zig
··· 12 12 const http = std.http; 13 13 const json = std.json; 14 14 const mem = std.mem; 15 - const posix = std.posix; 16 15 const Allocator = mem.Allocator; 17 16 const logfire = @import("logfire"); 17 + const compat = @import("../compat.zig"); 18 18 const db = @import("../db.zig"); 19 19 const tpuf = @import("../tpuf.zig"); 20 20 const indexer = @import("indexer.zig"); 21 21 22 22 // config (env vars with defaults) 23 23 fn getIntervalSecs() u64 { 24 - const val = posix.getenv("RECONCILE_INTERVAL_SECS") orelse "1800"; 24 + const val = compat.getenv("RECONCILE_INTERVAL_SECS") orelse "1800"; 25 25 return std.fmt.parseInt(u64, val, 10) catch 1800; 26 26 } 27 27 28 28 fn getBatchSize() usize { 29 - const val = posix.getenv("RECONCILE_BATCH_SIZE") orelse "50"; 29 + const val = compat.getenv("RECONCILE_BATCH_SIZE") orelse "50"; 30 30 return std.fmt.parseInt(usize, val, 10) catch 50; 31 31 } 32 32 33 33 fn getReverifyDays() u64 { 34 - const val = posix.getenv("RECONCILE_REVERIFY_DAYS") orelse "7"; 34 + const val = compat.getenv("RECONCILE_REVERIFY_DAYS") orelse "7"; 35 35 return std.fmt.parseInt(u64, val, 10) catch 7; 36 36 } 37 37 38 38 fn isEnabled() bool { 39 - const val = posix.getenv("RECONCILE_ENABLED") orelse "true"; 39 + const val = compat.getenv("RECONCILE_ENABLED") orelse "true"; 40 40 return !mem.eql(u8, val, "false") and !mem.eql(u8, val, "0"); 41 41 } 42 42 ··· 81 81 82 82 fn worker(allocator: Allocator) void { 83 83 // wait for db to be ready 84 - std.Thread.sleep(10 * std.time.ns_per_s); 84 + compat.sleep(10 * std.time.ns_per_s); 85 85 86 86 // PDS cache: DID → PDS endpoint URL (persists across cycles) 87 87 var pds_cache = std.StringHashMap([]const u8).init(allocator); ··· 113 113 @min(interval * consecutive_errors, 3600) 114 114 else 115 115 interval; 116 - std.Thread.sleep(backoff * std.time.ns_per_s); 116 + compat.sleep(backoff * std.time.ns_per_s); 117 117 } 118 118 } 119 119 ··· 136 136 var batch_str: [10]u8 = undefined; 137 137 const batch_str_val = std.fmt.bufPrint(&batch_str, "{d}", .{batch_size}) catch "50"; 138 138 139 - const cutoff_ts = formatTimestamp(std.time.timestamp() - @as(i64, @intCast(reverify_days * 86400))); 139 + const cutoff_ts = formatTimestamp(compat.timestamp() - @as(i64, @intCast(reverify_days * 86400))); 140 140 const cutoff = cutoff_ts.slice(); 141 141 142 142 var result = try client.query( ··· 220 220 } 221 221 222 222 // rate limit: 200ms between PDS requests 223 - std.Thread.sleep(200 * std.time.ns_per_ms); 223 + compat.sleep(200 * std.time.ns_per_ms); 224 224 } 225 225 226 226 // batch delete from tpuf ··· 245 245 } 246 246 247 247 fn updateVerifiedAt(client: *db.Client, uri: []const u8) void { 248 - const now = formatTimestamp(std.time.timestamp()); 248 + const now = formatTimestamp(compat.timestamp()); 249 249 client.exec( 250 250 "UPDATE documents SET verified_at = ? WHERE uri = ?", 251 251 &.{ now.slice(), uri }, ··· 272 272 const md = yd.calculateMonthDay(); 273 273 var result: TimestampBuf = undefined; 274 274 const formatted = std.fmt.bufPrint(&result.buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}", .{ 275 - yd.year, md.month.numeric(), @as(u32, md.day_index) + 1, 275 + yd.year, md.month.numeric(), @as(u32, md.day_index) + 1, 276 276 day.getHoursIntoDay(), day.getMinutesIntoHour(), day.getSecondsIntoMinute(), 277 277 }) catch { 278 278 // fallback: epoch (will cause re-verify, which is safe) ··· 294 294 return .error_skip; 295 295 }; 296 296 297 - var http_client: http.Client = .{ .allocator = allocator }; 297 + var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 298 298 defer http_client.deinit(); 299 299 300 300 var response_body: std.Io.Writer.Allocating = .init(allocator); ··· 336 336 var url_buf: [256]u8 = undefined; 337 337 const url = std.fmt.bufPrint(&url_buf, "https://plc.directory/{s}", .{did}) catch return null; 338 338 339 - var http_client: http.Client = .{ .allocator = allocator }; 339 + var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 340 340 defer http_client.deinit(); 341 341 342 342 var response_body: std.Io.Writer.Allocating = .init(allocator);
+7 -7
backend/src/ingest/tap.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 3 const json = std.json; 4 - const posix = std.posix; 5 4 const Allocator = mem.Allocator; 6 5 const websocket = @import("websocket"); 7 6 const zat = @import("zat"); ··· 9 8 const indexer = @import("indexer.zig"); 10 9 const extractor = @import("extractor.zig"); 11 10 const tpuf = @import("../tpuf.zig"); 11 + const compat = @import("../compat.zig"); 12 12 13 13 // leaflet-specific collections 14 14 const LEAFLET_DOCUMENT = "pub.leaflet.document"; ··· 33 33 } 34 34 35 35 fn getTapHost() []const u8 { 36 - return posix.getenv("TAP_HOST") orelse "leaflet-search-tap.fly.dev"; 36 + return compat.getenv("TAP_HOST") orelse "leaflet-search-tap.fly.dev"; 37 37 } 38 38 39 39 fn getTapPort() u16 { 40 - const port_str = posix.getenv("TAP_PORT") orelse "443"; 40 + const port_str = compat.getenv("TAP_PORT") orelse "443"; 41 41 return std.fmt.parseInt(u16, port_str, 10) catch 443; 42 42 } 43 43 ··· 51 51 const QUEUE_CAPACITY = 256; 52 52 53 53 const ProcessQueue = struct { 54 - mutex: std.Thread.Mutex = .{}, 55 - cond: std.Thread.Condition = .{}, 54 + mutex: compat.Mutex = .{}, 55 + cond: compat.Condition = .{}, 56 56 items: [QUEUE_CAPACITY]?[]u8 = .{null} ** QUEUE_CAPACITY, 57 57 head: usize = 0, 58 58 tail: usize = 0, ··· 148 148 } else |err| { 149 149 // connection failed - backoff 150 150 logfire.warn("tap error: {}, reconnecting in {d}s", .{ err, backoff }); 151 - posix.nanosleep(backoff, 0); 151 + compat.sleepSecs(backoff); 152 152 backoff = @min(backoff * 2, max_backoff); 153 153 } 154 154 } ··· 226 226 227 227 logfire.info("connecting to {s}://{s}:{d}{s}", .{ if (tls) "wss" else "ws", host, port, path }); 228 228 229 - var client = websocket.Client.init(allocator, .{ 229 + var client = websocket.Client.init(compat.getIo(), allocator, .{ 230 230 .host = host, 231 231 .port = port, 232 232 .tls = tls,
+36 -32
backend/src/main.zig
··· 1 1 const std = @import("std"); 2 - const net = std.net; 3 - const posix = std.posix; 2 + const Io = std.Io; 4 3 const Thread = std.Thread; 5 4 const logfire = @import("logfire"); 6 5 const db = @import("db.zig"); ··· 8 7 const metrics = @import("metrics.zig"); 9 8 const server = @import("server.zig"); 10 9 const ingest = @import("ingest.zig"); 10 + const compat = @import("compat.zig"); 11 11 12 - const MAX_HTTP_WORKERS = 16; 13 12 const SOCKET_TIMEOUT_SECS = 5; 13 + 14 + // multi-threaded debug_io — required for safe std.debug.print from worker threads 15 + var threaded_io: Io.Threaded = undefined; 16 + pub const std_options_debug_threaded_io: ?*Io.Threaded = &threaded_io; 14 17 15 18 pub fn main() !void { 16 - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 17 - defer _ = gpa.deinit(); 18 - const allocator = gpa.allocator(); 19 + const allocator = std.heap.smp_allocator; 20 + 21 + // init Io backend for networking 22 + threaded_io = Io.Threaded.init(allocator, .{}); 23 + const io = threaded_io.io(); 24 + compat.initIo(io); 19 25 20 26 // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 21 27 _ = logfire.configure(.{ 22 28 .service_name = "leaflet-search", 23 29 .service_version = "0.1.0", 24 - .environment = posix.getenv("FLY_APP_NAME") orelse "development", 30 + .environment = compat.getenv("FLY_APP_NAME") orelse "development", 25 31 }) catch |err| { 26 32 std.debug.print("logfire init failed: {}, continuing without observability\n", .{err}); 27 33 }; 28 34 29 35 // start http server FIRST so Fly proxy doesn't timeout 30 36 const port: u16 = blk: { 31 - const port_str = posix.getenv("PORT") orelse "3000"; 37 + const port_str = compat.getenv("PORT") orelse "3000"; 32 38 break :blk std.fmt.parseInt(u16, port_str, 10) catch 3000; 33 39 }; 34 40 35 - const address = try net.Address.parseIp("0.0.0.0", port); 36 - var listener = try address.listen(.{ .reuse_address = true }); 37 - defer listener.deinit(); 41 + const address = Io.net.Ip4Address.unspecified(port); 42 + var listener = (Io.net.IpAddress{ .ip4 = address }).listen(io, .{ .reuse_address = true }) catch |err| { 43 + logfire.err("failed to listen on port {d}: {}", .{ port, err }); 44 + return err; 45 + }; 46 + defer listener.deinit(io); 38 47 39 - const app_name = posix.getenv("APP_NAME") orelse "leaflet-search"; 40 - logfire.info("{s} listening on port {d} (max {d} workers)", .{ app_name, port, MAX_HTTP_WORKERS }); 48 + const app_name = compat.getenv("APP_NAME") orelse "leaflet-search"; 49 + logfire.info("{s} listening on port {d}", .{ app_name, port }); 41 50 42 51 // init turso client synchronously (fast, needed for search fallback) 43 - try db.initTurso(); 44 - 45 - // init thread pool for http connections 46 - var pool: Thread.Pool = undefined; 47 - try pool.init(.{ 48 - .allocator = allocator, 49 - .n_jobs = MAX_HTTP_WORKERS, 50 - }); 51 - defer pool.deinit(); 52 + try db.initTurso(io); 52 53 53 54 // init local db and other services in background (slow) 54 55 const init_thread = try Thread.spawn(.{}, initServices, .{allocator}); 55 56 init_thread.detach(); 56 57 58 + // thread-per-connection (Thread.Pool removed in 0.16) 57 59 while (true) { 58 - const conn = listener.accept() catch |err| { 60 + const stream = listener.accept(io) catch |err| { 59 61 logfire.err("accept error: {}", .{err}); 60 62 continue; 61 63 }; 62 64 63 - setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { 65 + setSocketTimeout(stream.socket.handle, SOCKET_TIMEOUT_SECS) catch |err| { 64 66 logfire.warn("failed to set socket timeout: {}", .{err}); 65 67 }; 66 68 67 - const accepted_at = std.time.microTimestamp(); 68 - pool.spawn(server.handleConnection, .{ conn, accepted_at }) catch |err| { 69 - logfire.err("pool spawn error: {}", .{err}); 70 - conn.stream.close(); 69 + const accepted_at = compat.microTimestamp(); 70 + const thread = Thread.spawn(.{}, server.handleConnection, .{ stream, io, accepted_at }) catch |err| { 71 + logfire.err("spawn error: {}", .{err}); 72 + stream.close(io); 73 + continue; 71 74 }; 75 + thread.detach(); 72 76 } 73 77 } 74 78 ··· 103 107 ingest.tap.consumer(allocator); 104 108 } 105 109 106 - fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void { 107 - const timeout = std.mem.toBytes(posix.timeval{ 110 + fn setSocketTimeout(fd: std.posix.fd_t, secs: u32) !void { 111 + const timeout = std.mem.toBytes(std.posix.timeval{ 108 112 .sec = @intCast(secs), 109 113 .usec = 0, 110 114 }); 111 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout); 112 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout); 115 + try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout); 116 + try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout); 113 117 }
+3 -2
backend/src/metrics/activity.zig
··· 1 1 const std = @import("std"); 2 + const compat = @import("../compat.zig"); 2 3 3 4 // ring buffer for real-time search activity 4 5 pub const SLOTS = 60; ··· 6 7 7 8 var counts: [SLOTS]u16 = .{0} ** SLOTS; 8 9 var slot: usize = 0; 9 - var mutex: std.Thread.Mutex = .{}; 10 + var mutex: compat.Mutex = .{}; 10 11 var thread: ?std.Thread = null; 11 12 12 13 fn tickLoop() void { 13 14 while (true) { 14 - std.Thread.sleep(TICK_MS * std.time.ns_per_ms); 15 + compat.sleepMs(TICK_MS); 15 16 mutex.lock(); 16 17 slot = (slot + 1) % SLOTS; 17 18 counts[slot] = 0;
+7 -6
backend/src/metrics/buffer.zig
··· 4 4 const std = @import("std"); 5 5 const db = @import("../db.zig"); 6 6 const logfire = @import("logfire"); 7 + const compat = @import("../compat.zig"); 7 8 8 9 const SYNC_INTERVAL_MS = 5000; // 5 seconds 9 10 const MAX_PENDING_SEARCHES = 256; ··· 26 27 var pending_searches: [MAX_PENDING_SEARCHES]?[]const u8 = .{null} ** MAX_PENDING_SEARCHES; 27 28 var search_write_idx: usize = 0; 28 29 var search_read_idx: usize = 0; 29 - var search_mutex: std.Thread.Mutex = .{}; 30 + var search_mutex: compat.Mutex = .{}; 30 31 31 32 // allocator for search string copies 32 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 33 + const search_allocator = std.heap.smp_allocator; 33 34 34 35 var sync_thread: ?std.Thread = null; 35 36 ··· 77 78 if (next_write == search_read_idx) { 78 79 // buffer full, drop oldest 79 80 if (pending_searches[search_read_idx]) |old| { 80 - gpa.allocator().free(old); 81 + search_allocator.free(old); 81 82 pending_searches[search_read_idx] = null; 82 83 } 83 84 search_read_idx = (search_read_idx + 1) % MAX_PENDING_SEARCHES; 84 85 } 85 86 86 87 // allocate copy 87 - const copy = gpa.allocator().dupe(u8, query) catch return; 88 + const copy = search_allocator.dupe(u8, query) catch return; 88 89 pending_searches[search_write_idx] = copy; 89 90 search_write_idx = next_write; 90 91 } ··· 128 129 129 130 fn syncLoop() void { 130 131 while (true) { 131 - std.Thread.sleep(SYNC_INTERVAL_MS * std.time.ns_per_ms); 132 + compat.sleepMs(SYNC_INTERVAL_MS); 132 133 syncToTurso(); 133 134 } 134 135 } ··· 226 227 ) catch {}; 227 228 228 229 // free and clear 229 - gpa.allocator().free(query); 230 + search_allocator.free(query); 230 231 pending_searches[search_read_idx] = null; 231 232 synced += 1; 232 233 }
+49 -18
backend/src/metrics/timing.zig
··· 1 1 const std = @import("std"); 2 + const compat = @import("../compat.zig"); 2 3 3 4 /// endpoints we track latency for 4 5 pub const Endpoint = enum { ··· 77 78 78 79 var buffers: [ENDPOINT_COUNT]LatencyBuffer = [_]LatencyBuffer{.{}} ** ENDPOINT_COUNT; 79 80 var hourly: [ENDPOINT_COUNT][HOURS_TO_KEEP]HourlyBucket = [_][HOURS_TO_KEEP]HourlyBucket{[_]HourlyBucket{.{}} ** HOURS_TO_KEEP} ** ENDPOINT_COUNT; 80 - var mutex: std.Thread.Mutex = .{}; 81 + var mutex: compat.Mutex = .{}; 81 82 var initialized: bool = false; 82 83 83 84 fn getCurrentHour() i64 { 84 - const now_s = @divFloor(std.time.timestamp(), 3600) * 3600; 85 + const now_s = @divFloor(compat.timestamp(), 3600) * 3600; 85 86 return now_s; 86 87 } 87 88 ··· 92 93 93 94 /// record a request latency (call after request completes) 94 95 pub fn record(endpoint: Endpoint, start_time: i64) void { 95 - const now = std.time.microTimestamp(); 96 + const now = compat.microTimestamp(); 96 97 const elapsed_us: u32 = @intCast(@max(0, now - start_time)); 97 98 const current_hour = getCurrentHour(); 98 99 const hour_idx = getHourIndex(current_hour); ··· 112 113 } 113 114 114 115 fn loadLocked() void { 115 - const file = std.fs.openFileAbsolute(PERSIST_PATH, .{}) catch return; 116 - defer file.close(); 116 + const fd = openForRead(PERSIST_PATH) orelse return; 117 + defer _ = std.c.close(fd); 117 118 118 119 // read entire file at once (small file, ~16KB per endpoint) 119 120 var file_buf: [ENDPOINT_COUNT * (@sizeOf([SAMPLE_COUNT]u32) + @sizeOf(usize) * 2 + @sizeOf(u64))]u8 = undefined; 120 - const bytes_read = file.readAll(&file_buf) catch return; 121 + const bytes_read = readAll(fd, &file_buf) orelse return; 121 122 if (bytes_read != file_buf.len) return; // incomplete file 122 123 123 124 var offset: usize = 0; ··· 138 139 } 139 140 140 141 fn persistLocked() void { 141 - const file = std.fs.createFileAbsolute(PERSIST_PATH, .{}) catch return; 142 - defer file.close(); 142 + const fd = openForWrite(PERSIST_PATH) orelse return; 143 + defer _ = std.c.close(fd); 143 144 144 145 // write all buffers 145 146 for (buffers) |buf| { 146 - file.writeAll(std.mem.asBytes(&buf.samples)) catch return; 147 - file.writeAll(std.mem.asBytes(&buf.count)) catch return; 148 - file.writeAll(std.mem.asBytes(&buf.head)) catch return; 149 - file.writeAll(std.mem.asBytes(&buf.total_count)) catch return; 147 + writeAll(fd, std.mem.asBytes(&buf.samples)); 148 + writeAll(fd, std.mem.asBytes(&buf.count)); 149 + writeAll(fd, std.mem.asBytes(&buf.head)); 150 + writeAll(fd, std.mem.asBytes(&buf.total_count)); 150 151 } 151 152 } 152 153 153 154 fn loadHourlyLocked() void { 154 - const file = std.fs.openFileAbsolute(PERSIST_PATH_HOURLY, .{}) catch return; 155 - defer file.close(); 155 + const fd = openForRead(PERSIST_PATH_HOURLY) orelse return; 156 + defer _ = std.c.close(fd); 156 157 157 158 const bucket_size = @sizeOf(HourlyBucket); 158 159 const total_size = ENDPOINT_COUNT * HOURS_TO_KEEP * bucket_size; 159 160 var file_buf: [total_size]u8 = undefined; 160 - const bytes_read = file.readAll(&file_buf) catch return; 161 + const bytes_read = readAll(fd, &file_buf) orelse return; 161 162 if (bytes_read != total_size) return; 162 163 163 164 var offset: usize = 0; ··· 170 171 } 171 172 172 173 fn persistHourlyLocked() void { 173 - const file = std.fs.createFileAbsolute(PERSIST_PATH_HOURLY, .{}) catch return; 174 - defer file.close(); 174 + const fd = openForWrite(PERSIST_PATH_HOURLY) orelse return; 175 + defer _ = std.c.close(fd); 175 176 176 177 for (hourly) |ep_buckets| { 177 178 for (ep_buckets) |bucket| { 178 - file.writeAll(std.mem.asBytes(&bucket)) catch return; 179 + writeAll(fd, std.mem.asBytes(&bucket)); 179 180 } 181 + } 182 + } 183 + 184 + // C file helpers (std.fs removed in 0.16) 185 + fn openForRead(path: [*:0]const u8) ?std.c.fd_t { 186 + const fd = std.c.open(path, .{ .ACCMODE = .RDONLY }, @as(std.c.mode_t, 0)); 187 + return if (fd < 0) null else fd; 188 + } 189 + 190 + fn openForWrite(path: [*:0]const u8) ?std.c.fd_t { 191 + const fd = std.c.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true, .TRUNC = true }, @as(std.c.mode_t, 0o644)); 192 + return if (fd < 0) null else fd; 193 + } 194 + 195 + fn readAll(fd: std.c.fd_t, buf: []u8) ?usize { 196 + var total: usize = 0; 197 + while (total < buf.len) { 198 + const n = std.c.read(fd, buf[total..].ptr, buf.len - total); 199 + if (n <= 0) break; 200 + total += @intCast(n); 201 + } 202 + return total; 203 + } 204 + 205 + fn writeAll(fd: std.c.fd_t, data: []const u8) void { 206 + var total: usize = 0; 207 + while (total < data.len) { 208 + const n = std.c.write(fd, data[total..].ptr, data.len - total); 209 + if (n <= 0) return; 210 + total += @intCast(n); 180 211 } 181 212 } 182 213
+35 -30
backend/src/server.zig
··· 1 1 const std = @import("std"); 2 - const net = std.net; 2 + const Io = std.Io; 3 3 const http = std.http; 4 4 const mem = std.mem; 5 5 const json = std.json; ··· 7 7 const logfire = @import("logfire"); 8 8 const zql = @import("zql"); 9 9 const zat = @import("zat"); 10 + const compat = @import("compat.zig"); 10 11 const db = @import("db.zig"); 11 12 const metrics = @import("metrics.zig"); 12 13 const search = @import("server/search.zig"); ··· 15 16 const HTTP_BUF_SIZE = 65536; 16 17 const QUERY_PARAM_BUF_SIZE = 64; 17 18 18 - pub fn handleConnection(conn: net.Server.Connection, accepted_at: i64) void { 19 - defer conn.stream.close(); 19 + pub fn handleConnection(stream: Io.net.Stream, io: Io, accepted_at: i64) void { 20 + defer stream.close(io); 20 21 21 - const queue_us = std.time.microTimestamp() - accepted_at; 22 + const queue_us = compat.microTimestamp() - accepted_at; 22 23 if (queue_us > 100_000) { // > 100ms 23 24 logfire.warn("http.queue slow: {d}ms", .{@divTrunc(queue_us, 1000)}); 24 25 } ··· 26 27 var read_buffer: [HTTP_BUF_SIZE]u8 = undefined; 27 28 var write_buffer: [HTTP_BUF_SIZE]u8 = undefined; 28 29 29 - var reader = conn.stream.reader(&read_buffer); 30 - var writer = conn.stream.writer(&write_buffer); 30 + var reader = stream.reader(io, &read_buffer); 31 + var writer = stream.writer(io, &write_buffer); 31 32 32 - var server = http.Server.init(reader.interface(), &writer.interface); 33 + var server = http.Server.init(&reader.interface, &writer.interface); 33 34 34 35 while (true) { 35 - const recv_start = std.time.microTimestamp(); 36 + const recv_start = compat.microTimestamp(); 36 37 var request = server.receiveHead() catch |err| { 37 38 if (err != error.HttpConnectionClosing and err != error.EndOfStream) { 38 39 logfire.debug("http receive error: {}", .{err}); 39 40 } 40 41 return; 41 42 }; 42 - const recv_us = std.time.microTimestamp() - recv_start; 43 + const recv_us = compat.microTimestamp() - recv_start; 43 44 const target = request.head.target; 44 45 45 46 const req_span = logfire.span("http.request", .{ ··· 48 49 .receive_ms = @divTrunc(recv_us, 1000), 49 50 }); 50 51 51 - handleRequest(&server, &request) catch |err| { 52 + handleRequest(&server, &request, io) catch |err| { 52 53 logfire.err("request error: {}", .{err}); 53 54 req_span.end(); 54 55 return; ··· 59 60 } 60 61 } 61 62 62 - fn handleRequest(server: *http.Server, request: *http.Server.Request) !void { 63 + fn handleRequest(server: *http.Server, request: *http.Server.Request, io: Io) !void { 63 64 _ = server; 64 65 const target = request.head.target; 65 66 ··· 73 74 const path = if (mem.indexOf(u8, target, "?")) |qi| target[0..qi] else target; 74 75 75 76 if (mem.startsWith(u8, path, "/search")) { 76 - try handleSearch(request, target); 77 + try handleSearch(request, target, io); 77 78 } else if (mem.eql(u8, path, "/tags")) { 78 79 try handleTags(request, target); 79 80 } else if (mem.eql(u8, path, "/stats")) { ··· 95 96 } 96 97 } 97 98 98 - fn handleSearch(request: *http.Server.Request, target: []const u8) !void { 99 - const start_time = std.time.microTimestamp(); 99 + fn handleSearch(request: *http.Server.Request, target: []const u8, io: Io) !void { 100 + const start_time = compat.microTimestamp(); 100 101 101 102 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 102 103 defer arena.deinit(); ··· 118 119 // resolve author param: if it's a handle (not a DID), resolve via AT Protocol 119 120 const author_filter: ?[]const u8 = if (author_param) |ap| blk: { 120 121 if (mem.startsWith(u8, ap, "did:")) break :blk ap; 121 - break :blk resolveHandle(alloc, ap) catch null; 122 + break :blk resolveHandle(alloc, ap, io) catch null; 122 123 } else null; 123 124 124 125 // record per-mode latency ··· 168 169 } 169 170 170 171 fn handleTags(request: *http.Server.Request, target: []const u8) !void { 171 - const start_time = std.time.microTimestamp(); 172 + const start_time = compat.microTimestamp(); 172 173 defer metrics.timing.record(.tags, start_time); 173 174 174 175 const span = logfire.span("http.tags", .{}); ··· 190 191 } 191 192 192 193 fn handlePopular(request: *http.Server.Request, target: []const u8) !void { 193 - const start_time = std.time.microTimestamp(); 194 + const start_time = compat.microTimestamp(); 194 195 defer metrics.timing.record(.popular, start_time); 195 196 196 197 const span = logfire.span("http.popular", .{}); ··· 447 448 } 448 449 449 450 fn getDashboardUrl() []const u8 { 450 - return std.posix.getenv("DASHBOARD_URL") orelse "https://pub-search.waow.tech/dashboard.html"; 451 + return compat.getenv("DASHBOARD_URL") orelse "https://pub-search.waow.tech/dashboard.html"; 451 452 } 452 453 453 454 fn handleDashboard(request: *http.Server.Request) !void { ··· 461 462 } 462 463 463 464 fn handleSimilar(request: *http.Server.Request, target: []const u8) !void { 464 - const start_time = std.time.microTimestamp(); 465 + const start_time = compat.microTimestamp(); 465 466 defer metrics.timing.record(.similar, start_time); 466 467 467 468 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); ··· 577 578 578 579 /// Resolve an AT Protocol handle to a DID via zat's HandleResolver. 579 580 /// Tries HTTP .well-known first, falls back to DNS-over-HTTPS. 580 - fn resolveHandle(alloc: std.mem.Allocator, handle: []const u8) ![]const u8 { 581 + fn resolveHandle(alloc: std.mem.Allocator, handle: []const u8, io: Io) ![]const u8 { 581 582 const parsed = zat.Handle.parse(handle) orelse { 582 583 logfire.warn("resolveHandle: invalid handle: {s}", .{handle}); 583 584 return error.InvalidHandle; 584 585 }; 585 586 586 - var resolver = zat.HandleResolver.init(alloc); 587 + var resolver = zat.HandleResolver.init(io, alloc); 587 588 defer resolver.deinit(); 588 589 589 590 return resolver.resolve(parsed) catch |err| { ··· 595 596 fn handleActivity(request: *http.Server.Request) !void { 596 597 const counts = metrics.activity.getCounts(); 597 598 598 - // format as JSON array 599 + // format as JSON array manually into buffer 599 600 var buf: [512]u8 = undefined; 600 - var stream = std.io.fixedBufferStream(&buf); 601 - const writer = stream.writer(); 602 - 603 - writer.writeByte('[') catch return; 601 + var pos: usize = 0; 602 + buf[pos] = '['; 603 + pos += 1; 604 604 for (counts, 0..) |c, i| { 605 - if (i > 0) writer.writeByte(',') catch return; 606 - writer.print("{d}", .{c}) catch return; 605 + if (i > 0) { 606 + buf[pos] = ','; 607 + pos += 1; 608 + } 609 + const written = std.fmt.bufPrint(buf[pos..], "{d}", .{c}) catch return; 610 + pos += written.len; 607 611 } 608 - writer.writeByte(']') catch return; 612 + buf[pos] = ']'; 613 + pos += 1; 609 614 610 - try sendJson(request, stream.getWritten()); 615 + try sendJson(request, buf[0..pos]); 611 616 }
+3 -1
backend/src/server/dashboard.zig
··· 28 28 traffic_json: []const u8, 29 29 }; 30 30 31 + const compat = @import("../compat.zig"); 32 + 31 33 fn getRelayUrl() []const u8 { 32 - return std.posix.getenv("TAP_RELAY_URL") orelse "unknown"; 34 + return compat.getenv("TAP_RELAY_URL") orelse "unknown"; 33 35 } 34 36 35 37 // all dashboard queries batched into one request
+8 -2
backend/src/server/search.zig
··· 775 775 while (bp_rows.next()) |row| { 776 776 const doc = Doc.fromLocalRow(row); 777 777 if (author_filter) |af| { 778 - if (!std.mem.eql(u8, doc.did, af)) { bp_count += 1; continue; } 778 + if (!std.mem.eql(u8, doc.did, af)) { 779 + bp_count += 1; 780 + continue; 781 + } 779 782 } 780 783 if (since_filter) |since| { 781 784 if (doc.createdAt.len > 0 and std.mem.order(u8, doc.createdAt, since) == .lt) { ··· 810 813 while (pub_rows.next()) |row| { 811 814 const pub_result = Pub.fromLocalRow(row); 812 815 if (author_filter) |af| { 813 - if (!std.mem.eql(u8, pub_result.did, af)) { pub_count += 1; continue; } 816 + if (!std.mem.eql(u8, pub_result.did, af)) { 817 + pub_count += 1; 818 + continue; 819 + } 814 820 } 815 821 try jw.write(pub_result.toJson(alloc)); 816 822 pub_count += 1;
+7 -7
backend/src/tpuf.zig
··· 12 12 const json = std.json; 13 13 const http = std.http; 14 14 const mem = std.mem; 15 - const posix = std.posix; 16 15 const Allocator = mem.Allocator; 16 + const compat = @import("compat.zig"); 17 17 const logfire = @import("logfire"); 18 18 19 19 const Sha256 = std.crypto.hash.sha2.Sha256; ··· 64 64 65 65 /// Read config from environment. Call once at startup. 66 66 pub fn init() void { 67 - api_key = posix.getenv("TURBOPUFFER_API_KEY"); 68 - if (posix.getenv("TURBOPUFFER_NAMESPACE")) |ns| { 67 + api_key = compat.getenv("TURBOPUFFER_API_KEY"); 68 + if (compat.getenv("TURBOPUFFER_NAMESPACE")) |ns| { 69 69 namespace = ns; 70 70 } 71 71 ··· 86 86 logfire.info("tpuf: TURBOPUFFER_API_KEY not set, vector store disabled", .{}); 87 87 } 88 88 89 - voyage_api_key = posix.getenv("VOYAGE_API_KEY"); 89 + voyage_api_key = compat.getenv("VOYAGE_API_KEY"); 90 90 if (voyage_api_key != null) { 91 91 logfire.info("tpuf: voyage query embedding enabled", .{}); 92 92 } ··· 130 130 defer allocator.free(body); 131 131 132 132 // make request 133 - var http_client: http.Client = .{ .allocator = allocator }; 133 + var http_client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 134 134 defer http_client.deinit(); 135 135 136 136 var auth_buf: [256]u8 = undefined; ··· 553 553 // --- HTTP --- 554 554 555 555 fn doRequest(allocator: Allocator, key: []const u8, url: []const u8, body: []const u8) ![]const u8 { 556 - var client: http.Client = .{ .allocator = allocator }; 556 + var client: http.Client = .{ .allocator = allocator, .io = compat.getIo() }; 557 557 defer client.deinit(); 558 558 559 559 var auth_buf: [256]u8 = undefined; ··· 607 607 // minimal query body: rank by ID, top_k=1, no attributes 608 608 const ping_body = "{\"rank_by\":[\"id\",\"asc\"],\"top_k\":1,\"include_attributes\":[]}"; 609 609 while (true) { 610 - std.Thread.sleep(KEEPALIVE_INTERVAL_NS); 610 + compat.sleep(KEEPALIVE_INTERVAL_NS); 611 611 const key = api_key orelse return; 612 612 const response = doRequest(allocator, key, query_url, ping_body) catch |err| { 613 613 logfire.debug("tpuf: keepalive ping failed: {}", .{err});