atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: diagnose memory growth — mallinfo2, smaps attribution, rocksdb caps

- mallinfo() → mallinfo2() for accurate metrics past 2 GiB
- add smaps_rollup + smaps parsing: relay_smaps_rss_kb, relay_smaps_anon_kb,
relay_heap_rss_kb, relay_stack_rss_kb for heap/stack/mmap attribution
- rocksdb: max_open_files=256 (was unlimited), write_buffer_size=16M per CF
(was 64M default × 3 CFs = 192M memtable overhead)
- metrics buffer 12K → 16K

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz a3055253 213dce75

+141 -12
+97 -9
src/broadcaster.zig
··· 724 724 } 725 725 } else |_| {} 726 726 727 - // glibc malloc arena stats — mallinfo returns c_int (i32) fields which 728 - // overflow at 2 GiB; bitcast to u32 extends useful range to 4 GiB 729 - const mi = malloc_h.mallinfo(); 730 - const arena: u64 = @as(u32, @bitCast(mi.arena)); 731 - const in_use: u64 = @as(u32, @bitCast(mi.uordblks)); 732 - const free_bytes: u64 = @as(u32, @bitCast(mi.fordblks)); 733 - const mmap_bytes: u64 = @as(u32, @bitCast(mi.hblkhd)); 727 + // smaps attribution — heap vs stack vs anon mmap breakdown 728 + appendSmapsMetrics(w); 729 + 730 + // glibc malloc arena stats — mallinfo2 returns size_t fields (accurate 731 + // to full address space, unlike mallinfo which overflows at 2 GiB) 732 + const mi = malloc_h.mallinfo2(); 734 733 std.fmt.format(w, 735 734 \\# TYPE relay_malloc_arena_bytes gauge 736 735 \\# HELP relay_malloc_arena_bytes total bytes claimed from OS by malloc ··· 748 747 \\# HELP relay_malloc_mmap_bytes bytes allocated via mmap (large blocks) 749 748 \\relay_malloc_mmap_bytes {d} 750 749 \\ 751 - , .{ arena, in_use, free_bytes, mmap_bytes }) catch {}; 750 + , .{ mi.arena, mi.uordblks, mi.fordblks, mi.hblkhd }) catch {}; 751 + } 752 + 753 + fn appendSmapsMetrics(w: anytype) void { 754 + // smaps_rollup: total RSS and anonymous memory from kernel 755 + if (std.fs.openFileAbsolute("/proc/self/smaps_rollup", .{})) |f| { 756 + defer f.close(); 757 + var buf: [4096]u8 = undefined; 758 + const n = f.readAll(&buf) catch 0; 759 + if (n > 0) { 760 + const content = buf[0..n]; 761 + const rollup_fields = .{ 762 + .{ "Rss:", "relay_smaps_rss_kb" }, 763 + .{ "Anonymous:", "relay_smaps_anon_kb" }, 764 + }; 765 + inline for (rollup_fields) |entry| { 766 + if (std.mem.indexOf(u8, content, entry[0])) |pos| { 767 + const rest = content[pos + entry[0].len ..]; 768 + const trimmed = std.mem.trimLeft(u8, rest, " \t"); 769 + const end = std.mem.indexOfScalar(u8, trimmed, ' ') orelse 770 + (std.mem.indexOfScalar(u8, trimmed, '\n') orelse trimmed.len); 771 + if (std.fmt.parseInt(u64, trimmed[0..end], 10)) |val| { 772 + std.fmt.format(w, 773 + \\# TYPE {s} gauge 774 + \\{s} {d} 775 + \\ 776 + , .{ entry[1], entry[1], val }) catch {}; 777 + } else |_| {} 778 + } 779 + } 780 + } 781 + } else |_| {} 782 + 783 + // smaps: per-region RSS for [heap] and [stack] attribution 784 + if (std.fs.openFileAbsolute("/proc/self/smaps", .{})) |f| { 785 + defer f.close(); 786 + // smaps can be large — read in chunks and parse line-by-line 787 + var smaps_buf: [65536]u8 = undefined; 788 + const n = f.readAll(&smaps_buf) catch 0; 789 + if (n > 0) { 790 + const content = smaps_buf[0..n]; 791 + var heap_rss_kb: u64 = 0; 792 + var stack_rss_kb: u64 = 0; 793 + var in_heap = false; 794 + var in_stack = false; 795 + 796 + var lines = std.mem.splitScalar(u8, content, '\n'); 797 + while (lines.next()) |line| { 798 + // region header lines end with the mapping name 799 + if (std.mem.indexOf(u8, line, "[heap]") != null) { 800 + in_heap = true; 801 + in_stack = false; 802 + continue; 803 + } else if (std.mem.indexOf(u8, line, "[stack]") != null) { 804 + in_stack = true; 805 + in_heap = false; 806 + continue; 807 + } else if (line.len > 0 and !std.ascii.isWhitespace(line[0]) and 808 + std.mem.indexOfScalar(u8, line, '-') != null) 809 + { 810 + // new region header (address range like "7f...–7f...") 811 + in_heap = false; 812 + in_stack = false; 813 + continue; 814 + } 815 + 816 + // inside a region — look for Rss: line 817 + if ((in_heap or in_stack) and std.mem.startsWith(u8, std.mem.trimLeft(u8, line, " "), "Rss:")) { 818 + const rss_start = (std.mem.indexOf(u8, line, "Rss:") orelse continue) + 4; 819 + const rest = std.mem.trimLeft(u8, line[rss_start..], " \t"); 820 + const end = std.mem.indexOfScalar(u8, rest, ' ') orelse rest.len; 821 + if (std.fmt.parseInt(u64, rest[0..end], 10)) |val| { 822 + if (in_heap) heap_rss_kb += val; 823 + if (in_stack) stack_rss_kb += val; 824 + } else |_| {} 825 + } 826 + } 827 + 828 + std.fmt.format(w, 829 + \\# TYPE relay_heap_rss_kb gauge 830 + \\# HELP relay_heap_rss_kb RSS from [heap] region only 831 + \\relay_heap_rss_kb {d} 832 + \\ 833 + \\# TYPE relay_stack_rss_kb gauge 834 + \\# HELP relay_stack_rss_kb sum of all [stack] region RSS 835 + \\relay_stack_rss_kb {d} 836 + \\ 837 + , .{ heap_rss_kb, stack_rss_kb }) catch {}; 838 + } 839 + } else |_| {} 752 840 } 753 841 754 842 const posix_vfs = @cImport(@cInclude("sys/statvfs.h")); ··· 886 974 stats.cache_hits.store(400, .release); 887 975 stats.cache_misses.store(100, .release); 888 976 889 - var buf: [12288]u8 = undefined; 977 + var buf: [16384]u8 = undefined; 890 978 const output = formatPrometheusMetrics(&stats, 42, 3, .{}, "/tmp", &buf); 891 979 892 980 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null);
+43 -2
src/collection_index.zig
··· 17 17 18 18 const separator = '\x00'; 19 19 20 + // raw rocksdb C API — symbols linked via the rocksdb-zig dependency. 21 + // used to configure options the zig wrapper doesn't expose. 22 + const rdb_c = struct { 23 + extern fn rocksdb_set_options_cf( 24 + db: ?*anyopaque, 25 + cf: ?*anyopaque, 26 + count: c_int, 27 + keys: [*]const [*:0]const u8, 28 + values: [*]const [*:0]const u8, 29 + errptr: *?[*:0]u8, 30 + ) void; 31 + }; 32 + 20 33 pub const CollectionIndex = struct { 21 34 db: rocksdb.DB, 22 35 rbc: rocksdb.ColumnFamilyHandle, ··· 32 45 .{ 33 46 .create_if_missing = true, 34 47 .create_missing_column_families = true, 48 + .max_open_files = 256, 35 49 }, 36 50 &.{ 37 51 .{ .name = "default" }, ··· 57 71 if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle; 58 72 } 59 73 60 - log.info("collection index opened at {s}", .{data_dir}); 74 + log.info("collection index opened at {s} (max_open_files=256)", .{data_dir}); 61 75 62 - return .{ 76 + var ci = CollectionIndex{ 63 77 .db = db, 64 78 .rbc = rbc orelse return error.MissingColumnFamily, 65 79 .cbr = cbr orelse return error.MissingColumnFamily, 66 80 .allocator = allocator, 67 81 }; 82 + 83 + // cap write buffer per CF — default is 64 MiB × 3 CFs = 192 MiB. 84 + // 16 MiB per CF is plenty for our write pattern (small KV pairs). 85 + ci.setCfOption(ci.rbc, "write_buffer_size", "16777216"); 86 + ci.setCfOption(ci.cbr, "write_buffer_size", "16777216"); 87 + 88 + return ci; 89 + } 90 + 91 + /// set a RocksDB option on a column family via the raw C API. 92 + /// the zig wrapper's ColumnFamilyOptions is a stub, so we call 93 + /// rocksdb_set_options_cf directly (symbols linked via dependency). 94 + fn setCfOption(self: *CollectionIndex, cf: rocksdb.ColumnFamilyHandle, key: [*:0]const u8, value: [*:0]const u8) void { 95 + var err_ptr: ?[*:0]u8 = null; 96 + const keys = [_][*:0]const u8{key}; 97 + const values = [_][*:0]const u8{value}; 98 + rdb_c.rocksdb_set_options_cf( 99 + @ptrCast(self.db.db), 100 + @ptrCast(cf), 101 + 1, 102 + &keys, 103 + &values, 104 + &err_ptr, 105 + ); 106 + if (err_ptr) |e| { 107 + log.warn("rocksdb set option '{s}' failed: {s}", .{ key, std.mem.span(e) }); 108 + } 68 109 } 69 110 70 111 pub fn deinit(self: *CollectionIndex) void {
+1 -1
src/main.zig
··· 94 94 .consumer_queue_depth = bc.consumerQueueDepth(), 95 95 }; 96 96 97 - var metrics_buf: [12288]u8 = undefined; 97 + var metrics_buf: [16384]u8 = undefined; 98 98 const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, attribution, data_dir, &metrics_buf); 99 99 request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 100 100 .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" },