atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

bump per-consumer buffer 8192→65536 + host_authority reject breakdown

the 8192-entry per-consumer ring gives only ~33s of headroom at ~250
events/sec. pulsar's 60-min snapshot run accumulated repeated
ConsumerTooSlow kicks over that run (ops_changelog 2026-04-01). bumping
to 65536 gives ~4.4min of headroom — enough to absorb transient write
stalls without dropping consumers mid-run.

resolveHostAuthority and checkPdsHost had six silent-reject branches
collapsed into one failed_host_authority counter, which hid the cause of
the 100% rejection rate diagnosed 2026-04-08. split into per-branch
counters emitted as relay_host_authority_reject{branch=...}, alongside a
1-in-2048 sampled warn log, so we can tell whether the DID is
unparseable, the DID doc lookup is failing, the doc has no PDS endpoint,
the endpoint is unparseable, the host isn't in our table, or the
resolved host genuinely differs from the incoming host.

+81 -10
+32 -1
src/broadcaster.zig
··· 55 55 host_authority_is_new: std.atomic.Value(u64) = .{ .raw = 0 }, 56 56 host_authority_host_changed: std.atomic.Value(u64) = .{ .raw = 0 }, 57 57 host_authority_time_us: std.atomic.Value(u64) = .{ .raw = 0 }, 58 + // per-branch reject breakdown (subsets of failed_host_authority). 59 + // added 2026-04-08 to diagnose the 100% host_authority failure rate — 60 + // without this breakdown we can't tell whether the DID doc lookup is 61 + // failing, the endpoint is unparseable, the host isn't in our table, 62 + // or the resolved host genuinely differs from the incoming host. 63 + host_authority_reject_parse_did: std.atomic.Value(u64) = .{ .raw = 0 }, 64 + host_authority_reject_resolve: std.atomic.Value(u64) = .{ .raw = 0 }, 65 + host_authority_reject_no_endpoint: std.atomic.Value(u64) = .{ .raw = 0 }, 66 + host_authority_reject_bad_url: std.atomic.Value(u64) = .{ .raw = 0 }, 67 + host_authority_reject_unknown_host: std.atomic.Value(u64) = .{ .raw = 0 }, 68 + host_authority_reject_host_mismatch: std.atomic.Value(u64) = .{ .raw = 0 }, 58 69 // frame pool memory pressure 59 70 pool_queued_bytes: std.atomic.Value(u64) = .{ .raw = 0 }, 60 71 // persist/broadcast pipeline contention ··· 353 364 const ping_timeout_ns: u64 = 5 * std.time.ns_per_s; 354 365 355 366 pub const Consumer = struct { 356 - const BUFFER_CAP = 8192; 367 + // per-consumer ring buffer. sized to absorb transient write stalls 368 + // without kicking the consumer with ConsumerTooSlow. at steady-state 369 + // ~250 events/sec, 65536 entries = ~4.4 minutes of headroom. 370 + // previously 8192 (~33s) which was short enough that pulsar's 60-min 371 + // snapshot run accumulated repeated kicks (see ops_changelog 2026-04-01). 
372 + const BUFFER_CAP = 65536; 357 373 358 374 conn: *websocket.Conn, 359 375 allocator: Allocator, ··· 1069 1085 \\relay_validation_failed{{reason="host_authority"}} {d} 1070 1086 \\relay_validation_failed{{reason="future_rev"}} {d} 1071 1087 \\ 1088 + \\# TYPE relay_host_authority_reject counter 1089 + \\# HELP relay_host_authority_reject host authority reject breakdown by branch 1090 + \\relay_host_authority_reject{{branch="parse_did"}} {d} 1091 + \\relay_host_authority_reject{{branch="resolve"}} {d} 1092 + \\relay_host_authority_reject{{branch="no_endpoint"}} {d} 1093 + \\relay_host_authority_reject{{branch="bad_url"}} {d} 1094 + \\relay_host_authority_reject{{branch="unknown_host"}} {d} 1095 + \\relay_host_authority_reject{{branch="host_mismatch"}} {d} 1096 + \\ 1072 1097 , .{ 1073 1098 stats.failed_bad_did.load(.acquire), 1074 1099 stats.failed_bad_rev.load(.acquire), ··· 1077 1102 stats.failed_bad_structure.load(.acquire), 1078 1103 stats.failed_host_authority.load(.acquire), 1079 1104 stats.failed_future_rev.load(.acquire), 1105 + stats.host_authority_reject_parse_did.load(.acquire), 1106 + stats.host_authority_reject_resolve.load(.acquire), 1107 + stats.host_authority_reject_no_endpoint.load(.acquire), 1108 + stats.host_authority_reject_bad_url.load(.acquire), 1109 + stats.host_authority_reject_unknown_host.load(.acquire), 1110 + stats.host_authority_reject_host_mismatch.load(.acquire), 1080 1111 }) catch return w.buffered(); 1081 1112 1082 1113 // memory attribution — internal capacities help identify what's consuming RSS
+49 -9
src/validator.zig
··· 551 551 /// .reject — DID doc does not confirm, caller should drop the event 552 552 pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority { 553 553 const persist = self.persist orelse return .migrate; // no DB — can't check 554 - const parsed = zat.Did.parse(did) orelse return .reject; 554 + const parsed = zat.Did.parse(did) orelse { 555 + _ = self.stats.host_authority_reject_parse_did.fetchAdd(1, .monotonic); 556 + return .reject; 557 + }; 555 558 556 559 const idx = self.acquireHostResolver(); 557 560 defer self.releaseHostResolver(idx); ··· 561 564 // first resolve attempt 562 565 var doc = resolver.resolve(parsed) catch { 563 566 // retry once on network failure 564 - var doc2 = resolver.resolve(parsed) catch return .reject; 567 + var doc2 = resolver.resolve(parsed) catch { 568 + _ = self.stats.host_authority_reject_resolve.fetchAdd(1, .monotonic); 569 + return .reject; 570 + }; 565 571 defer doc2.deinit(); 566 - return self.checkPdsHost(&doc2, persist, incoming_host_id); 572 + return self.checkPdsHost(&doc2, persist, did, incoming_host_id); 567 573 }; 568 574 defer doc.deinit(); 569 - return self.checkPdsHost(&doc, persist, incoming_host_id); 575 + return self.checkPdsHost(&doc, persist, did, incoming_host_id); 570 576 } 571 577 572 578 /// acquire a resolver from the pool. spins until one is available. 
··· 586 592 self.host_resolver_available[idx].store(true, .release); 587 593 } 588 594 589 - fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority { 590 - _ = self; 591 - const pds_endpoint = doc.pdsEndpoint() orelse return .reject; 592 - const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject; 593 - const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject; 595 + fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, did: []const u8, incoming_host_id: u64) HostAuthority { 596 + const pds_endpoint = doc.pdsEndpoint() orelse { 597 + _ = self.stats.host_authority_reject_no_endpoint.fetchAdd(1, .monotonic); 598 + self.sampleLogReject("no_endpoint", did, "", incoming_host_id, 0); 599 + return .reject; 600 + }; 601 + const pds_host = extractHostFromUrl(pds_endpoint) orelse { 602 + _ = self.stats.host_authority_reject_bad_url.fetchAdd(1, .monotonic); 603 + self.sampleLogReject("bad_url", did, pds_endpoint, incoming_host_id, 0); 604 + return .reject; 605 + }; 606 + const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse { 607 + _ = self.stats.host_authority_reject_unknown_host.fetchAdd(1, .monotonic); 608 + self.sampleLogReject("unknown_host", did, pds_host, incoming_host_id, 0); 609 + return .reject; 610 + }; 594 611 if (pds_host_id == incoming_host_id) return .migrate; 612 + _ = self.stats.host_authority_reject_host_mismatch.fetchAdd(1, .monotonic); 613 + self.sampleLogReject("host_mismatch", did, pds_host, incoming_host_id, pds_host_id); 595 614 return .reject; 615 + } 616 + 617 + /// log a rejection sample at 1-in-N rate to avoid drowning the log at 618 + /// ~10 rejections/sec. total rejections per branch are available via 619 + /// relay_host_authority_reject{branch=...} in prometheus. 
620 + fn sampleLogReject( 621 + self: *Validator, 622 + branch: []const u8, 623 + did: []const u8, 624 + detail: []const u8, 625 + incoming_host_id: u64, 626 + resolved_host_id: u64, 627 + ) void { 628 + const count = self.stats.failed_host_authority.load(.monotonic); 629 + // sample 1 in 2048 — at 10/s that's one log line every ~3.5min. 630 + // parens are for readability only: zig binds `&` tighter than `!=` (unlike C). 631 + if ((count & 0x7ff) != 0) return; 632 + log.warn( 633 + "host_authority reject branch={s} did={s} detail={s} incoming_host_id={d} resolved_host_id={d}", 634 + .{ branch, did, detail, incoming_host_id, resolved_host_id }, 635 + ); 596 636 } 597 637 }; 598 638