atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: add attribution metrics, malloc_trim

4 new prometheus gauges to attribute memory usage:
- relay_history_entries (ring buffer depth)
- relay_evtbuf_entries (pending flush buffer)
- relay_did_cache_entries (DID→UID cache)
- relay_consumer_queue_depth (total consumer send buffers)

malloc_trim(0) in GC loop returns freed glibc arena pages to OS.

mallinfo stays as-is (u32 bitcast, 4 GiB range) — mallinfo2 requires
glibc 2.33 but zig cross-compiles against glibc 2.2.5 by default.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz c8754237 1ec352da

+81 -10
+45 -6
src/broadcaster.zig
··· 480 480 defer self.consumers_mutex.unlock(); 481 481 return self.consumers.items.len; 482 482 } 483 + 484 + /// sum of all consumer send buffer depths (for metrics) 485 + pub fn consumerQueueDepth(self: *Broadcaster) usize { 486 + self.consumers_mutex.lock(); 487 + defer self.consumers_mutex.unlock(); 488 + var total: usize = 0; 489 + for (self.consumers.items) |c| { 490 + c.mutex.lock(); 491 + total += c.buf_len; 492 + c.mutex.unlock(); 493 + } 494 + return total; 495 + } 483 496 }; 484 497 485 498 // --- websocket handler --- ··· 546 559 } 547 560 }; 548 561 549 - pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, data_dir: []const u8, buf: []u8) []const u8 { 562 + pub const AttributionMetrics = struct { 563 + history_entries: usize = 0, 564 + evtbuf_entries: usize = 0, 565 + did_cache_entries: usize = 0, 566 + consumer_queue_depth: usize = 0, 567 + }; 568 + 569 + pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, attribution: AttributionMetrics, data_dir: []const u8, buf: []u8) []const u8 { 550 570 const uptime: i64 = std.time.timestamp() - stats.start_time; 551 571 var fbs = std.io.fixedBufferStream(buf); 552 572 const w = fbs.writer(); ··· 600 620 \\# TYPE relay_validator_cache_evictions_total counter 601 621 \\relay_validator_cache_evictions_total {d} 602 622 \\ 623 + \\# TYPE relay_history_entries gauge 624 + \\# HELP relay_history_entries in-memory frame history ring buffer entries 625 + \\relay_history_entries {d} 626 + \\ 627 + \\# TYPE relay_evtbuf_entries gauge 628 + \\# HELP relay_evtbuf_entries pending flush buffer entries 629 + \\relay_evtbuf_entries {d} 630 + \\ 631 + \\# TYPE relay_did_cache_entries gauge 632 + \\# HELP relay_did_cache_entries DID-to-UID cache entries 633 + \\relay_did_cache_entries {d} 634 + \\ 635 + \\# TYPE relay_consumer_queue_depth gauge 636 + \\# HELP relay_consumer_queue_depth total frames queued across all consumer send buffers 637 + \\relay_consumer_queue_depth {d} 638 + \\ 603 639 , .{ 604 640 stats.frames_in.load(.acquire), 605 641 stats.frames_out.load(.acquire), ··· 619 655 cache_entries, 620 656 migration_queue_len, 621 657 stats.cache_evictions.load(.acquire), 658 + attribution.history_entries, 659 + attribution.evtbuf_entries, 660 + attribution.did_cache_entries, 661 + attribution.consumer_queue_depth, 622 662 }) catch return fbs.getWritten(); 623 663 624 664 // linux-only process metrics from /proc ··· 684 724 } 685 725 } else |_| {} 686 726 687 - // glibc malloc arena stats — distinguishes in-use heap from fragmentation 688 - // mallinfo returns c_int (i32) fields which overflow at 2 GiB; bitcast to u32 689 - // extends useful range to 4 GiB per field 727 + // glibc malloc arena stats — mallinfo returns c_int (i32) fields which 728 + // overflow at 2 GiB; bitcast to u32 extends useful range to 4 GiB 690 729 const mi = malloc_h.mallinfo(); 691 730 const arena: u64 = @as(u32, @bitCast(mi.arena)); 692 731 const in_use: u64 = @as(u32, @bitCast(mi.uordblks)); ··· 847 886 stats.cache_hits.store(400, .release); 848 887 stats.cache_misses.store(100, .release); 849 888 850 - var buf: [8192]u8 = undefined; 851 - const output = formatPrometheusMetrics(&stats, 42, 3, "/tmp", &buf); 889 + var buf: [12288]u8 = undefined; 890 + const output = formatPrometheusMetrics(&stats, 42, 3, .{}, "/tmp", &buf); 852 891 853 892 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null); 854 893 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_broadcast_total 9000") != null);
+14
src/event_log.zig
··· 105 105 alive: std.atomic.Value(bool) = .{ .raw = true }, 106 106 flush_cond: std.Thread.Condition = .{}, 107 107 108 + /// current evtbuf entry count (for metrics) 109 + pub fn evtbufLen(self: *DiskPersist) usize { 110 + self.mutex.lock(); 111 + defer self.mutex.unlock(); 112 + return self.evtbuf.items.len; 113 + } 114 + 115 + /// current DID cache entry count (for metrics) 116 + pub fn didCacheLen(self: *DiskPersist) usize { 117 + self.did_cache_mutex.lock(); 118 + defer self.did_cache_mutex.unlock(); 119 + return self.did_cache.count(); 120 + } 121 + 108 122 pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist { 109 123 // ensure directory exists 110 124 std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
+22 -4
src/main.zig
··· 50 50 validator: *validator_mod.Validator, 51 51 data_dir: []const u8, 52 52 persist: *event_log_mod.DiskPersist, 53 + bc: *broadcaster.Broadcaster, 53 54 54 55 fn run(self: *MetricsServer) void { 55 56 while (!shutdown_flag.load(.acquire)) { ··· 58 59 log.debug("metrics accept error: {s}", .{@errorName(err)}); 59 60 continue; 60 61 }; 61 - handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir, self.persist); 62 + handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir, self.persist, self.bc); 62 63 } 63 64 } 64 65 }; 65 66 66 - fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8, persist: *event_log_mod.DiskPersist) void { 67 + fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8, persist: *event_log_mod.DiskPersist, bc: *broadcaster.Broadcaster) void { 67 68 defer stream.close(); 68 69 69 70 var recv_buf: [4096]u8 = undefined; ··· 86 87 } else if (std.mem.eql(u8, path, "/metrics")) { 87 88 const cache_entries = validator.cacheSize(); 88 89 const migration_queue_len = validator.migrationQueueLen(); 90 + const attribution = broadcaster.AttributionMetrics{ 91 + .history_entries = bc.history.count(), 92 + .evtbuf_entries = persist.evtbufLen(), 93 + .did_cache_entries = persist.didCacheLen(), 94 + .consumer_queue_depth = bc.consumerQueueDepth(), 95 + }; 89 96 90 - var metrics_buf: [8192]u8 = undefined; 91 - const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, data_dir, &metrics_buf); 97 + var metrics_buf: [12288]u8 = undefined; 98 + const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, attribution, data_dir, &metrics_buf); 92 99 request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 93 100 .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 94 101 .{ .name = "server", .value = "zlay (atproto-relay)" }, ··· 201 208 .validator = &val, 202 209 .data_dir = data_dir, 203 210 .persist = &dp, 211 + .bc = &bc, 204 212 }; 205 213 const metrics_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, MetricsServer.run, .{&metrics_srv}); 206 214 ··· 240 248 log.info("relay stopped cleanly", .{}); 241 249 } 242 250 251 + const builtin = @import("builtin"); 252 + const malloc_h = if (builtin.os.tag == .linux) @cImport(@cInclude("malloc.h")) else struct {}; 253 + 243 254 fn gcLoop(dp: *event_log_mod.DiskPersist) void { 244 255 const gc_interval: u64 = 10 * 60; // 10 minutes in seconds 245 256 while (!shutdown_flag.load(.acquire)) { ··· 255 266 dp.gc() catch |err| { 256 267 log.warn("event log GC failed: {s}", .{@errorName(err)}); 257 268 }; 269 + 270 + // return free heap pages to the OS — glibc retains freed pages in 271 + // per-thread arenas indefinitely; with ~2,700 threads this accumulates 272 + // significant RSS that the application no longer needs. 273 + if (comptime builtin.os.tag == .linux) { 274 + _ = malloc_h.malloc_trim(0); 275 + } 258 276 } 259 277 } 260 278