atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

add memory attribution metrics + fix mallinfo accuracy

new prometheus gauges for internal data structure capacities:
- validator_cache_map_cap, did_cache_map_cap, queued_set_map_cap
- evtbuf_cap, outbuf_cap, workers_count

switch mallinfo() to mallinfo2() for accurate multi-arena reporting
(size_t fields, all thread arenas instead of just main arena).

pure observability — zero behavioral change.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 84108567 b2721970

+114 -9
+17 -1
EXPERIMENTS.md
··· 92 92 93 93 **revert**: just rebuild without `-Duse_gpa=true` (default is false, zero overhead). 94 94 95 - **status**: active 95 + **deployment attempt (2026-03-07)**: 96 + - GPA's per-allocation metadata tracking consumed memory ~55x faster than the base leak 97 + (~16 GiB/hour vs ~290 MiB/hour). at ~700 frames/sec × ~37 allocs/frame = ~26K tracked 98 + allocations/sec, the metadata itself dominates. 99 + - caused severe sawtooth pattern: ~7-8 OOM kills in ~3 hours (8 GiB limit) 100 + - first pod: logs lost when `kubectl delete pod` was used (should have used `kubectl scale --replicas=0`) 101 + - second pod: RocksDB lock file stale after first crash, had to clear manually 102 + - reverted to normal ReleaseSafe build after ~4 hours (relay was submitted for testing) 103 + 104 + **learnings for next attempt**: 105 + - need to reduce incoming load (fewer PDS hosts) to slow memory growth enough that GPA overhead doesn't OOM 106 + - or increase memory limit temporarily (e.g. 16 GiB) for the diagnostic window 107 + - use `kubectl scale deployment/zlay -n zlay --replicas=0` to preserve logs (not `kubectl delete pod`) 108 + - container lacks `kill` binary — need an admin endpoint or install procps in the image 109 + - consider adding `/admin/shutdown` HTTP endpoint to trigger graceful shutdown without `kill` 110 + 111 + **status**: paused — code merged (compiled out by default), needs better deployment strategy
+49 -6
src/broadcaster.zig
··· 552 552 did_cache_entries: usize = 0, 553 553 resolve_queue_len: usize = 0, 554 554 resolve_queued_set_count: usize = 0, 555 + // memory attribution — internal capacities of key data structures 556 + validator_cache_map_cap: usize = 0, 557 + did_cache_map_cap: usize = 0, 558 + queued_set_map_cap: usize = 0, 559 + evtbuf_cap: usize = 0, 560 + outbuf_cap: usize = 0, 561 + workers_count: usize = 0, 555 562 }; 556 563 557 564 pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, attribution: AttributionMetrics, data_dir: []const u8, buf: []u8) []const u8 { ··· 663 670 stats.chain_breaks.load(.acquire), 664 671 }) catch return fbs.getWritten(); 665 672 673 + // memory attribution — internal capacities help identify what's consuming RSS 674 + std.fmt.format(w, 675 + \\# TYPE relay_validator_cache_map_cap gauge 676 + \\# HELP relay_validator_cache_map_cap hashmap backing capacity of signing key LRU 677 + \\relay_validator_cache_map_cap {d} 678 + \\ 679 + \\# TYPE relay_did_cache_map_cap gauge 680 + \\# HELP relay_did_cache_map_cap hashmap backing capacity of DID-to-UID LRU 681 + \\relay_did_cache_map_cap {d} 682 + \\ 683 + \\# TYPE relay_queued_set_map_cap gauge 684 + \\# HELP relay_queued_set_map_cap hashmap backing capacity of resolver dedup set 685 + \\relay_queued_set_map_cap {d} 686 + \\ 687 + \\# TYPE relay_evtbuf_cap gauge 688 + \\# HELP relay_evtbuf_cap allocated capacity of event flush buffer (jobs) 689 + \\relay_evtbuf_cap {d} 690 + \\ 691 + \\# TYPE relay_outbuf_cap gauge 692 + \\# HELP relay_outbuf_cap allocated capacity of output byte buffer 693 + \\relay_outbuf_cap {d} 694 + \\ 695 + \\# TYPE relay_workers_count gauge 696 + \\# HELP relay_workers_count active subscriber worker threads 697 + \\relay_workers_count {d} 698 + \\ 699 + , .{ 700 + attribution.validator_cache_map_cap, 701 + attribution.did_cache_map_cap, 702 + attribution.queued_set_map_cap, 703 + attribution.evtbuf_cap, 704 + attribution.outbuf_cap, 705 + attribution.workers_count, 706 + }) catch {}; 707 + 666 708 // linux-only process metrics from /proc 667 709 if (comptime builtin.os.tag == .linux) { 668 710 appendProcMetrics(w); ··· 724 766 } 725 767 } else |_| {} 726 768 727 - // glibc malloc stats — mallinfo fields are c_int, bitcast to u32 to extend range to 4 GiB 728 - const mi = malloc_h.mallinfo(); 729 - const arena: u64 = @as(u32, @bitCast(mi.arena)); 730 - const in_use: u64 = @as(u32, @bitCast(mi.uordblks)); 731 - const free_bytes: u64 = @as(u32, @bitCast(mi.fordblks)); 732 - const mmap_bytes: u64 = @as(u32, @bitCast(mi.hblkhd)); 769 + // glibc malloc stats via mallinfo2 — uses size_t fields (no 2 GiB overflow), 770 + // reports across ALL thread arenas (mallinfo only reports the main arena) 771 + const mi = malloc_h.mallinfo2(); 772 + const arena: u64 = mi.arena; 773 + const in_use: u64 = mi.uordblks; 774 + const free_bytes: u64 = mi.fordblks; 775 + const mmap_bytes: u64 = mi.hblkhd; 733 776 std.fmt.format(w, 734 777 \\# TYPE relay_malloc_arena_bytes gauge 735 778 \\relay_malloc_arena_bytes {d}
+19
src/event_log.zig
··· 117 117 return self.did_cache.count(); 118 118 } 119 119 120 + /// DID cache hashmap backing capacity (for memory attribution) 121 + pub fn didCacheMapCap(self: *DiskPersist) u32 { 122 + return self.did_cache.mapCapacity(); 123 + } 124 + 125 + /// evtbuf allocated capacity in jobs (for memory attribution — non-blocking) 126 + pub fn evtbufCap(self: *DiskPersist) usize { 127 + if (!self.mutex.tryLock()) return 0; 128 + defer self.mutex.unlock(); 129 + return self.evtbuf.capacity; 130 + } 131 + 132 + /// outbuf allocated capacity in bytes (for memory attribution — non-blocking) 133 + pub fn outbufCap(self: *DiskPersist) usize { 134 + if (!self.mutex.tryLock()) return 0; 135 + defer self.mutex.unlock(); 136 + return self.outbuf.capacity; 137 + } 138 + 120 139 pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist { 121 140 // ensure directory exists 122 141 std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
+7
src/lru.zig
··· 115 115 return self.len; 116 116 } 117 117 118 + /// internal hashmap capacity (non-blocking — returns 0 if lock is contended) 119 + pub fn mapCapacity(self: *Self) u32 { 120 + if (!self.mutex.tryLock()) return 0; 121 + defer self.mutex.unlock(); 122 + return self.map.capacity(); 123 + } 124 + 118 125 // --- internal (caller holds mutex) --- 119 126 120 127 fn moveToHead(self: *Self, node: *Node) void {
+10 -2
src/main.zig
··· 53 53 data_dir: []const u8, 54 54 persist: *event_log_mod.DiskPersist, 55 55 bc: *broadcaster.Broadcaster, 56 + slurper: *slurper_mod.Slurper, 56 57 57 58 fn run(self: *MetricsServer) void { 58 59 while (!shutdown_flag.load(.acquire)) { ··· 64 65 // 5s read timeout — prevents stale connections from blocking the single-threaded server 65 66 const timeout = std.posix.timeval{ .sec = 5, .usec = 0 }; 66 67 std.posix.setsockopt(conn.stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, std.mem.asBytes(&timeout)) catch {}; 67 - handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir, self.persist, self.bc); 68 + handleMetricsConn(conn.stream, self.stats, self.validator, self.data_dir, self.persist, self.bc, self.slurper); 68 69 } 69 70 } 70 71 }; 71 72 72 - fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8, persist: *event_log_mod.DiskPersist, bc: *broadcaster.Broadcaster) void { 73 + fn handleMetricsConn(stream: std.net.Stream, stats: *broadcaster.Stats, validator: *validator_mod.Validator, data_dir: []const u8, persist: *event_log_mod.DiskPersist, bc: *broadcaster.Broadcaster, slurp: *slurper_mod.Slurper) void { 73 74 defer stream.close(); 74 75 75 76 var recv_buf: [4096]u8 = undefined; ··· 103 104 .did_cache_entries = persist.didCacheLen(), 104 105 .resolve_queue_len = validator.resolveQueueLen(), 105 106 .resolve_queued_set_count = validator.resolveQueuedSetCount(), 107 + .validator_cache_map_cap = validator.cacheMapCapacity(), 108 + .did_cache_map_cap = persist.didCacheMapCap(), 109 + .queued_set_map_cap = validator.resolveQueuedSetCapacity(), 110 + .evtbuf_cap = persist.evtbufCap(), 111 + .outbuf_cap = persist.outbufCap(), 112 + .workers_count = slurp.workerCount(), 106 113 }; 107 114 108 115 var metrics_buf: [65536]u8 = undefined; ··· 236 243 .data_dir = data_dir, 237 244 .persist = &dp, 238 245 .bc = &bc, 246 + .slurper = &slurper, 239 247 }; 240 248 const metrics_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, MetricsServer.run, .{&metrics_srv}); 241 249
+12
src/validator.zig
··· 489 489 defer self.queue_mutex.unlock(); 490 490 return self.queued_set.count(); 491 491 } 492 + 493 + /// signing key cache hashmap backing capacity (for memory attribution) 494 + pub fn cacheMapCapacity(self: *Validator) u32 { 495 + return self.cache.mapCapacity(); 496 + } 497 + 498 + /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking) 499 + pub fn resolveQueuedSetCapacity(self: *Validator) u32 { 500 + if (!self.queue_mutex.tryLock()) return 0; 501 + defer self.queue_mutex.unlock(); 502 + return self.queued_set.capacity(); 503 + } 492 504 }; 493 505 494 506 /// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path"