atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add disk usage prometheus metrics (relay_disk_total_bytes, relay_disk_available_bytes)

Disk pressure killed the pod with no warning in the metrics. This change uses statvfs on
the data directory to export the filesystem's total and available bytes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 66187087 0f30da76

+34 -5
+29 -2
src/broadcaster.zig
··· 543 543 } 544 544 }; 545 545 546 - pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, buf: []u8) []const u8 { 546 + pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, data_dir: []const u8, buf: []u8) []const u8 { 547 547 const uptime: i64 = std.time.timestamp() - stats.start_time; 548 548 var fbs = std.io.fixedBufferStream(buf); 549 549 const w = fbs.writer(); ··· 621 621 // linux-only process metrics from /proc 622 622 if (comptime builtin.os.tag == .linux) { 623 623 appendProcMetrics(w); 624 + appendDiskMetrics(w, data_dir); 624 625 } 625 626 626 627 return fbs.getWritten(); ··· 671 672 } else |_| {} 672 673 } 673 674 675 + const posix_vfs = @cImport(@cInclude("sys/statvfs.h")); 676 + 677 + fn appendDiskMetrics(w: anytype, data_dir: []const u8) void { 678 + // statvfs needs a null-terminated path 679 + var path_buf: [4096]u8 = undefined; 680 + if (data_dir.len >= path_buf.len) return; 681 + @memcpy(path_buf[0..data_dir.len], data_dir); 682 + path_buf[data_dir.len] = 0; 683 + 684 + var stat: posix_vfs.struct_statvfs = undefined; 685 + if (posix_vfs.statvfs(@ptrCast(&path_buf), &stat) != 0) return; 686 + 687 + const block_size: u64 = stat.f_frsize; 688 + const total = stat.f_blocks * block_size; 689 + const available = stat.f_bavail * block_size; 690 + 691 + std.fmt.format(w, 692 + \\# TYPE relay_disk_total_bytes gauge 693 + \\relay_disk_total_bytes {d} 694 + \\ 695 + \\# TYPE relay_disk_available_bytes gauge 696 + \\relay_disk_available_bytes {d} 697 + \\ 698 + , .{ total, available }) catch {}; 699 + } 700 + 674 701 pub fn formatStatsResponse(stats: *const Stats, buf: []u8) []const u8 { 675 702 return std.fmt.bufPrint(buf, 676 703 
\\{{"seq":{d},"relay_seq":{d},"consumers":{d},"connected_inbound":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"uptime_seconds":{d}}} ··· 781 808 stats.cache_misses.store(100, .release); 782 809 783 810 var buf: [8192]u8 = undefined; 784 - const output = formatPrometheusMetrics(&stats, 42, 3, &buf); 811 + const output = formatPrometheusMetrics(&stats, 42, 3, "/tmp", &buf); 785 812 786 813 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null); 787 814 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_broadcast_total 9000") != null);
+5 -3
src/main.zig
··· 47 47 server: std.net.Server, 48 48 stats: *broadcaster.Stats, 49 49 validator: *validator_mod.Validator, 50 + data_dir: []const u8, 50 51 51 52 fn run(self: *MetricsServer) void { 52 53 while (!shutdown_flag.load(.acquire)) { ··· 55 56 log.debug("metrics accept error: {s}", .{@errorName(err)}); 56 57 continue; 57 58 }; 58 - handleMetricsConn(conn.stream, self.stats, self.validator); 59 + handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir); 59 60 } 60 61 } 61 62 }; 62 63 63 - fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator) void { 64 + fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8) void { 64 65 defer stream.close(); 65 66 66 67 var recv_buf: [4096]u8 = undefined; ··· 75 76 const migration_queue_len = validator.migrationQueueLen(); 76 77 77 78 var metrics_buf: [8192]u8 = undefined; 78 - const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, &metrics_buf); 79 + const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, data_dir, &metrics_buf); 79 80 request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 80 81 .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 81 82 .{ .name = "server", .value = "zlay (atproto-relay)" }, ··· 180 181 }, 181 182 .stats = &bc.stats, 182 183 .validator = &val, 184 + .data_dir = data_dir, 183 185 }; 184 186 const metrics_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, MetricsServer.run, .{&metrics_srv}); 185 187