atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: harden liveness probe — DB health check, serve on metrics port

/_health on port 3000 competed with consumer WebSocket traffic, causing
k8s probe timeouts under load. it also returned an unconditional 200 without
actually checking database connectivity.

- /_health now runs SELECT 1 against postgres and returns 500 on failure
- metrics port (3001) serves /_health alongside /metrics, routing by request path (unknown paths return 404)
- k8s probes move to port 3001 (in relay deploy config)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz bef8dad0 a60ef373

+34 -11
+4
src/api/router.zig
··· 52 52 53 53 fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 54 54 if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 55 + _ = ctx.persist.db.exec("SELECT 1", .{}) catch { 56 + h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"database unavailable\"}"); 57 + return; 58 + }; 55 59 h.respondJson(conn, .ok, "{\"status\":\"ok\"}"); 56 60 } else if (std.mem.eql(u8, path, "/_stats")) { 57 61 var stats_buf: [4096]u8 = undefined;
+30 -11
src/main.zig
··· 19 19 //! /admin/hosts/unblock — unblock a host (POST, admin) 20 20 //! /_health, /_stats — health, stats 21 21 //! 22 - //! port 3001 (RELAY_METRICS_PORT): internal metrics only 22 + //! port 3001 (RELAY_METRICS_PORT): internal metrics + health 23 23 //! /metrics — prometheus metrics 24 + //! /_health — liveness probe (DB check) 24 25 25 26 const std = @import("std"); 26 27 const http = std.http; ··· 48 49 stats: *broadcaster.Stats, 49 50 validator: *validator_mod.Validator, 50 51 data_dir: []const u8, 52 + persist: *event_log_mod.DiskPersist, 51 53 52 54 fn run(self: *MetricsServer) void { 53 55 while (!shutdown_flag.load(.acquire)) { ··· 56 58 log.debug("metrics accept error: {s}", .{@errorName(err)}); 57 59 continue; 58 60 }; 59 - handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir); 61 + handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir, self.persist); 60 62 } 61 63 } 62 64 }; 63 65 64 - fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8) void { 66 + fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8, persist: *event_log_mod.DiskPersist) void { 65 67 defer stream.close(); 66 68 67 69 var recv_buf: [4096]u8 = undefined; ··· 71 73 var server = http.Server.init(connection_reader.interface(), &connection_writer.interface); 72 74 73 75 var request = server.receiveHead() catch return; 76 + const path = request.head.target; 74 77 75 - const cache_entries = validator.cacheSize(); 76 - const migration_queue_len = validator.migrationQueueLen(); 78 + if (std.mem.eql(u8, path, "/_health")) { 79 + const db_ok = if (persist.db.exec("SELECT 1", .{})) |_| true else |_| false; 80 + const status: http.Status = if (db_ok) .ok else .internal_server_error; 81 + const body = if (db_ok) "{\"status\":\"ok\"}" else "{\"status\":\"error\",\"msg\":\"database unavailable\"}"; 82 + 
request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{ 83 + .{ .name = "content-type", .value = "application/json" }, 84 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 85 + } }) catch {}; 86 + } else if (std.mem.eql(u8, path, "/metrics")) { 87 + const cache_entries = validator.cacheSize(); 88 + const migration_queue_len = validator.migrationQueueLen(); 77 89 78 - var metrics_buf: [8192]u8 = undefined; 79 - const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, data_dir, &metrics_buf); 80 - request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 81 - .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 82 - .{ .name = "server", .value = "zlay (atproto-relay)" }, 83 - } }) catch {}; 90 + var metrics_buf: [8192]u8 = undefined; 91 + const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, data_dir, &metrics_buf); 92 + request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 93 + .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 94 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 95 + } }) catch {}; 96 + } else { 97 + request.respond("not found", .{ .status = .not_found, .keep_alive = false, .extra_headers = &.{ 98 + .{ .name = "content-type", .value = "text/plain" }, 99 + .{ .name = "server", .value = "zlay (atproto-relay)" }, 100 + } }) catch {}; 101 + } 84 102 } 85 103 86 104 pub fn main() !void { ··· 182 200 .stats = &bc.stats, 183 201 .validator = &val, 184 202 .data_dir = data_dir, 203 + .persist = &dp, 185 204 }; 186 205 const metrics_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, MetricsServer.run, .{&metrics_srv}); 187 206