atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

host authority: reuse pooled resolvers + add diagnostics

pool of 4 pre-created DidResolvers for host authority checks,
replacing per-call create/destroy that caused fresh TLS handshakes.
new metrics: host_authority_checks, is_new/host_changed triggers,
resolution latency, and pool_queued_bytes for memory pressure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 1639565a 93512d16

+87 -4
+29
src/broadcaster.zig
··· 49 49 cache_evictions: std.atomic.Value(u64) = .{ .raw = 0 }, 50 50 chain_breaks: std.atomic.Value(u64) = .{ .raw = 0 }, 51 51 pool_backpressure: std.atomic.Value(u64) = .{ .raw = 0 }, 52 + // host authority resolution metrics 53 + host_authority_checks: std.atomic.Value(u64) = .{ .raw = 0 }, 54 + host_authority_is_new: std.atomic.Value(u64) = .{ .raw = 0 }, 55 + host_authority_host_changed: std.atomic.Value(u64) = .{ .raw = 0 }, 56 + host_authority_time_us: std.atomic.Value(u64) = .{ .raw = 0 }, 57 + // frame pool memory pressure 58 + pool_queued_bytes: std.atomic.Value(u64) = .{ .raw = 0 }, 52 59 start_time: i64 = 0, 53 60 }; 54 61 ··· 774 781 \\# HELP relay_chain_breaks_total since/prevData chain continuity failures 775 782 \\relay_chain_breaks_total {d} 776 783 \\ 784 + \\# TYPE relay_host_authority_checks_total counter 785 + \\# HELP relay_host_authority_checks_total resolveHostAuthority calls on frame workers 786 + \\relay_host_authority_checks_total {d} 787 + \\ 788 + \\# TYPE relay_host_authority_trigger counter 789 + \\# HELP relay_host_authority_trigger host authority check triggers by reason 790 + \\relay_host_authority_trigger{{reason="is_new"}} {d} 791 + \\relay_host_authority_trigger{{reason="host_changed"}} {d} 792 + \\ 793 + \\# TYPE relay_host_authority_time_us_total counter 794 + \\# HELP relay_host_authority_time_us_total cumulative microseconds in resolveHostAuthority 795 + \\relay_host_authority_time_us_total {d} 796 + \\ 797 + \\# TYPE relay_pool_queued_bytes gauge 798 + \\# HELP relay_pool_queued_bytes estimated bytes of frame data queued in worker pool 799 + \\relay_pool_queued_bytes {d} 800 + \\ 777 801 , .{ 778 802 stats.frames_in.load(.acquire), 779 803 stats.frames_out.load(.acquire), ··· 802 826 attribution.resolve_queue_len, 803 827 attribution.resolve_queued_set_count, 804 828 stats.chain_breaks.load(.acquire), 829 + stats.host_authority_checks.load(.acquire), 830 + stats.host_authority_is_new.load(.acquire), 831 + stats.host_authority_host_changed.load(.acquire), 832 + stats.host_authority_time_us.load(.acquire), 833 + stats.pool_queued_bytes.load(.acquire), 805 834 }) catch return fbs.getWritten(); 806 835 807 836 // validation failure breakdown by reason
+9
src/frame_worker.zig
··· 33 33 }; 34 34 35 35 pub fn processFrame(work: *FrameWork) void { 36 + _ = work.bc.stats.pool_queued_bytes.fetchSub(work.data.len, .monotonic); 36 37 defer work.allocator.free(work.data); 37 38 38 39 var arena = std.heap.ArenaAllocator.init(work.allocator); ··· 88 89 // #identity events are exempt — any PDS can emit them (same as indigo). 89 90 // covers both first-seen DIDs (is_new) and host migrations (host_changed). 90 91 if ((result.is_new or result.host_changed) and !is_identity) { 92 + if (result.is_new) _ = work.bc.stats.host_authority_is_new.fetchAdd(1, .monotonic); 93 + if (result.host_changed) _ = work.bc.stats.host_authority_host_changed.fetchAdd(1, .monotonic); 94 + _ = work.bc.stats.host_authority_checks.fetchAdd(1, .monotonic); 95 + const ha_t0 = std.time.microTimestamp(); 96 + defer { 97 + const elapsed: u64 = @intCast(@max(0, std.time.microTimestamp() - ha_t0)); 98 + _ = work.bc.stats.host_authority_time_us.fetchAdd(elapsed, .monotonic); 99 + } 91 100 switch (work.validator.resolveHostAuthority(d, work.host_id)) { 92 101 .migrate => { 93 102 // DID doc confirms the host — update and continue
+1
src/subscriber.zig
··· 448 448 .resyncer = sub.resyncer, 449 449 }, sub.shutdown)) { 450 450 // pool accepted — advance cursor past this frame 451 + _ = sub.bc.stats.pool_queued_bytes.fetchAdd(duped.len, .monotonic); 451 452 if (upstream_seq) |s| sub.last_upstream_seq = s; 452 453 if (std.time.nanoTimestamp() - t0 > 1_000_000) { // >1ms = had to wait 453 454 _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic);
+48 -4
src/validator.zig
··· 57 57 resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, 58 58 alive: std.atomic.Value(bool) = .{ .raw = true }, 59 59 max_cache_size: u32 = 250_000, 60 + // pool of reusable resolvers for inline host authority checks. 61 + // frame workers acquire/release via atomic flag to avoid creating 62 + // a fresh resolver (and fresh TLS handshake) per call. 63 + host_resolvers: [host_resolver_pool_size]zat.DidResolver = undefined, 64 + host_resolver_available: [host_resolver_pool_size]std.atomic.Value(bool) = .{std.atomic.Value(bool){ .raw = false }} ** host_resolver_pool_size, 65 + host_resolver_inited: bool = false, 60 66 61 67 const max_resolver_threads = 8; 62 68 const default_resolver_threads = 4; 63 69 const max_queue_size: usize = 100_000; 70 + const host_resolver_pool_size: usize = 4; 64 71 65 72 pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator { 66 73 return initWithConfig(allocator, stats, .{}); ··· 85 92 } 86 93 } 87 94 95 + if (self.host_resolver_inited) { 96 + for (&self.host_resolvers) |*r| { 97 + r.deinit(); 98 + } 99 + self.host_resolver_inited = false; 100 + } 101 + 88 102 self.cache.deinit(); 89 103 90 104 // free queued DIDs ··· 95 109 self.queued_set.deinit(self.allocator); 96 110 } 97 111 98 - /// start background resolver threads 112 + /// start background resolver threads and host authority resolver pool 99 113 pub fn start(self: *Validator) !void { 100 114 self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size); 101 115 self.cache.capacity = self.max_cache_size; ··· 104 118 for (self.resolver_threads[0..count]) |*t| { 105 119 t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self}); 106 120 } 121 + 122 + // init host authority resolver pool (reused across calls) 123 + for (&self.host_resolvers) |*r| { 124 + r.* = zat.DidResolver.initWithOptions(self.allocator, .{}); 125 + } 126 + for (&self.host_resolver_available) |*a| { 127 + a.store(true, .release); 128 + } 129 + self.host_resolver_inited = true; 107 130 } 108 131 109 132 /// validate a #sync frame: signature verification only (no ops, no MST). ··· 516 539 /// PDS endpoint matches the incoming host. retries once on failure to 517 540 /// handle transient network errors. 518 541 /// 542 + /// uses a pooled resolver to avoid creating a fresh resolver (and fresh 543 + /// TLS handshake) per call. blocks briefly if all pool slots are in use. 544 + /// 519 545 /// returns: 520 546 /// .accept — should not happen (caller should only call on new/mismatch) 521 547 /// .migrate — DID doc confirms this host, caller should update host_id 522 548 /// .reject — DID doc does not confirm, caller should drop the event 523 549 pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority { 524 550 const persist = self.persist orelse return .migrate; // no DB — can't check 551 + const parsed = zat.Did.parse(did) orelse return .reject; 525 552 526 - var resolver = zat.DidResolver.initWithOptions(self.allocator, .{}); 527 - defer resolver.deinit(); 553 + const idx = self.acquireHostResolver(); 554 + defer self.releaseHostResolver(idx); 528 555 529 - const parsed = zat.Did.parse(did) orelse return .reject; 556 + var resolver = &self.host_resolvers[idx]; 530 557 531 558 // first resolve attempt 532 559 var doc = resolver.resolve(parsed) catch { ··· 537 564 }; 538 565 defer doc.deinit(); 539 566 return self.checkPdsHost(&doc, persist, incoming_host_id); 567 + } 568 + 569 + /// acquire a resolver from the pool. spins until one is available. 570 + fn acquireHostResolver(self: *Validator) usize { 571 + while (self.alive.load(.acquire)) { 572 + for (0..host_resolver_pool_size) |i| { 573 + if (self.host_resolver_available[i].cmpxchgStrong(true, false, .acquire, .monotonic) == null) { 574 + return i; 575 + } 576 + } 577 + std.Thread.yield() catch {}; 578 + } 579 + return 0; // shutdown path — caller will exit soon 580 + } 581 + 582 + fn releaseHostResolver(self: *Validator, idx: usize) void { 583 + self.host_resolver_available[idx].store(true, .release); 540 584 } 541 585 542 586 fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority {