atproto relay implementation in zig zlay.waow.tech

reset tree to b91382b for canary 1

forward-only rewind: every commit on main between b91382b and 4f3d1d4
has been superseded or is suspected of being implicated in the
2026-04-09 HTTP / delivery outage. rather than force-pushing history
backward, this commit creates a new snapshot whose tree matches b91382b
exactly, parented on 4f3d1d4. git pull --ff-only continues to work.
the superseded commits remain in ancestry and can be referenced via
the ops-changelog:

- 4f3d1d4 gcLoop: disable malloc_trim, bump interval 10min→1h
- 795cc41 host_authority: slot recovery + pool metrics + preload account count
- bbba92c fix build: drop unused err1 capture in resolveHostAuthority
- 584571a disable keep_alive on host authority resolver pool + log resolve errors
- ee4e368 bump per-consumer buffer 8192→65536 + host_authority reject breakdown
- 31825b2 subscriber: extract prepareFrameWork + add UAF regression test
- 1eec324 fix UAF: dupe FrameWork.hostname per submit (will be re-applied on top)
- 168d9f1 bump websocket.zig + zat: fix requestCrawl POST hang
- fbdffbe mark DB success on did_cache hits
- 3dc21b9 fix gcLoop: silently exited after one tick
- e5f415f update README, CLAUDE.md, Dockerfile for current state

this commit and the two following it (cherry-pick 1eec324 + pin zat
alpha.21) constitute canary 1 per docs/zlay-canary-plan-2026-04-09.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

zzstoatzz 284008fa 4f3d1d4c
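
The forward-only rewind described in the commit message can be reproduced with plain git plumbing. The sketch below is an assumption about how such a commit could be built, not necessarily how 284008fa was produced, and `forward_rewind` is a hypothetical helper name. It writes a commit whose tree is copied from a known-good revision, parents it on the current HEAD, and fast-forwards the branch to it, so history is only appended to and `git pull --ff-only` keeps working downstream.

```shell
set -eu

# forward_rewind GOOD_REV MESSAGE   (hypothetical helper, not part of the repo)
# Create a commit on top of HEAD whose tree is byte-for-byte the tree of
# GOOD_REV, then fast-forward the current branch to that new commit.
# Requires a clean working tree; nothing is force-pushed or rewritten.
forward_rewind() {
    good_rev=$1
    message=$2
    # commit-tree builds a commit object directly from an existing tree,
    # with HEAD as its only parent. It prints the new commit id.
    new_commit=$(git commit-tree "${good_rev}^{tree}" -p HEAD -m "$message")
    # The new commit descends from HEAD, so this is always a fast-forward.
    # merge also updates the index and working tree to the restored content.
    git merge --ff-only --quiet "$new_commit"
}
```

Run on main at 4f3d1d4, this would be invoked as `forward_rewind b91382b "reset tree to b91382b for canary 1"`. Afterwards `git diff b91382b HEAD` should print nothing, and the superseded commits are still reachable from HEAD.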

+106 -542
+6 -9
CLAUDE.md
···
 # zlay
 
-AT Protocol relay in zig 0.16. Io.Evented backend (io_uring fibers,
-~47 OS threads for ~2,800 PDS connections). ReleaseFast in production
-(Evented + ReleaseSafe GPFs on startup — upstream zig bug, see
-scripts/fiber_gpf_issue.md).
+AT Protocol relay in zig 0.16. reader thread per PDS + shared frame
+processing pool. ReleaseSafe in production. Io.Threaded backend (Evented
+attempt shelved — see docs/evented-attempt.md).
 
 ## before pushing
 
 - `zig fmt --check .` and `zig build test`
 - MUST use `-Dtarget=x86_64-linux-gnu` for production (musl breaks RocksDB)
-- MUST use `-Doptimize=ReleaseFast` — ReleaseSafe GPFs under Evented
-- do NOT use Debug builds in production (2.5 GiB vs 1.5 GiB RSS)
+- ReleaseFast has a known double-free — do not use
 
 ## deploy
 
-configs at `../@zzstoatzz.io/relay/` — `just zlay publish-remote ReleaseFast`
+configs at `../@zzstoatzz.io/relay/` — `just zlay publish-remote ReleaseSafe`
 
 KUBECONFIG is set automatically by the zlay module (`zlay/kubeconfig.yaml`).
 
···
 - [docs/deployment.md](docs/deployment.md) — build flags, infra, resource usage
 - [docs/gotchas.md](docs/gotchas.md) — zig/pg.zig/rocksdb-zig/deploy traps
 - [docs/incident-2026-03-04.md](docs/incident-2026-03-04.md) — ReleaseSafe RSS analysis
-- [docs/evented-attempt.md](docs/evented-attempt.md) — Evented backend migration story
-- [scripts/fiber_gpf_issue.md](scripts/fiber_gpf_issue.md) — upstream zig bug report
+- [docs/evented-attempt.md](docs/evented-attempt.md) — Evented backend attempt and why we reverted
+1 -1
Dockerfile
···
 # contextSwitch, confirmed 2026-04-05). This is a zig codegen bug, not our code.
 # ReleaseFast avoids the bad optimization path. The previous production SIGSEGV
 # under ReleaseFast was a websocket handshake bug, now fixed (9ac64da).
-RUN zig build -Doptimize=ReleaseFast -Dcpu=baseline -Dtarget=x86_64-linux-gnu
+RUN zig build -Doptimize=ReleaseSafe -Dcpu=baseline -Dtarget=x86_64-linux-gnu
 
 FROM --platform=linux/amd64 debian:bookworm-slim
 RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
+8 -12
README.md
··· 8 8 9 9 - **direct PDS crawl** — the bootstrap relay is called once at startup for the host list via `listHosts`, then all data flows directly from each PDS. 10 10 11 - - **Io.Evented backend** — uses zig 0.16's [`std.Io`](https://ziglang.org/documentation/master/std/#std.Io) with the [Evented](https://ziglang.org/documentation/master/std/#std.Io.Evented) backend (io_uring). ~2,800 PDS subscriber fibers run on ~47 OS threads — a 60x reduction from the 0.15 thread-per-PDS model. 12 - 13 - - **cross-Io architecture** — networking runs on Evented fibers, database access runs on Threaded workers. a lock-free [DbRequestQueue](https://tangled.org/zzstoatzz.io/zlay/blob/main/src/db_request_queue.zig) bridges the two using atomic spinlocks — no futex, no cross-Io boundary violations. see [devlog 008](https://zat.dev/#devlog/008-the-io-migration.md) for the full migration story. 14 - 15 11 - **optimistic signature validation** — on signing key cache miss, the frame passes through immediately and the DID is queued for background resolution. all subsequent commits are verified against the cached key. the cache caps at a configurable size and evicts the least recently used entry when full. 16 12 17 13 - **inline collection index** — indexes `(DID, collection)` pairs in the event processing pipeline using RocksDB. serves `listReposByCollection` from the relay process — no sidecar. the index design draws on [fig](https://tangled.org/microcosm.blue)'s work on [lightrail](https://tangled.org/microcosm.blue/lightrail). 14 + 15 + - **reader thread per PDS + frame processing pool** — each PDS gets a lightweight reader thread (cursor tracking, rate limiting, header decode). heavy work (full CBOR decode, validation, DB persist, broadcast) runs on a shared pool of frame workers (configurable, default 16). 
18 16 19 17 ## spec compliance 20 18 ··· 26 24 27 25 | dependency | purpose | 28 26 |---|---| 29 - | [zat](https://tangled.org/zat.dev/zat) | AT Protocol primitives (CBOR, CAR, signatures, DID resolution) | 30 - | [websocket.zig](https://github.com/zzstoatzz/websocket.zig) | WebSocket client/server (fork with write lock, HTTP fallback, TCP split fix) | 27 + | [zat](https://tangled.org/zzstoatzz.io/zat) | AT Protocol primitives (CBOR, CAR, signatures, DID resolution) | 28 + | [websocket.zig](https://github.com/zzstoatzz/websocket.zig) | WebSocket client/server (fork with HTTP fallback + TCP split fixes) | 31 29 | [pg.zig](https://github.com/karlseguin/pg.zig) | PostgreSQL driver | 32 30 | [rocksdb-zig](https://github.com/Syndica/rocksdb-zig) | RocksDB bindings | 33 31 34 32 ## build 35 33 36 - requires zig 0.16 and a C/C++ toolchain (for RocksDB). 34 + requires zig 0.15 and a C/C++ toolchain (for RocksDB). 37 35 38 36 ```bash 39 - zig build # build (debug) 40 - zig build test # run tests 41 - zig build -Doptimize=ReleaseFast # release build (production) 37 + zig build # build (debug) 38 + zig build test # run tests 39 + zig build -Doptimize=ReleaseSafe # release build (production default) 42 40 ``` 43 - 44 - note: `ReleaseSafe` GPFs on startup with the Evented backend due to a [zig stdlib bug](https://tangled.org/zzstoatzz.io/zlay/blob/main/scripts/fiber_gpf_issue.md) in `fiber.zig` context switching. use `ReleaseFast` for production. 45 41 46 42 ## configuration 47 43
+4 -4
build.zig.zon
···
     .minimum_zig_version = "0.16.0",
     .dependencies = .{
         .zat = .{
-            .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.23.tar.gz",
-            .hash = "zat-0.3.0-alpha.23-5PuC7k1VCACPkCoMnIstIbVu1yIrVP5Yx3l0G-lZ2Qoa",
+            .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.17.tar.gz",
+            .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQBnyDEw50Zwitd8ujG7mJumQlVtqYh08QML",
         },
         .websocket = .{
-            .url = "https://github.com/zzstoatzz/websocket.zig/archive/3c6794a.tar.gz",
-            .hash = "websocket-0.1.0-ZPISdYv9AwB5YM5SQKd7B9kRGNcQ4O8f68yfYsCMfU5h",
+            .url = "https://github.com/zzstoatzz/websocket.zig/archive/9ac64da.tar.gz",
+            .hash = "websocket-0.1.0-ZPISdebwAwAGqC8MRLe7MFedo_K_Yy2mjYVAch4Ya4hZ",
         },
         .pg = .{
             .url = "git+https://github.com/zzstoatzz/pg.zig?ref=dev#5ce2355b1d851075523709c7d3068dcdb0224322",
-108
docs/zlay-gcloop-stall-2026-04-09.md
··· 1 - # zlay gcLoop stall — 2026-04-09 2 - 3 - ## tl;dr 4 - 5 - zlay pods were flapping on a ~10 minute cadence: ~10 min healthy, then /metrics 6 - and /_readyz stop responding, kubelet marks NotReady, pod eventually restarts, 7 - cycle repeats. Initial hypothesis was broadcaster writeLoop starvation 8 - (see [zlay-broadcaster-starvation-2026-04-09.md] — now superseded as primary 9 - cause). The actual cadence lines up precisely with `gcLoop` (main.zig), which 10 - became functional again on 2026-04-06 via commit 3dc21b9 ("fix gcLoop: silently 11 - exited after one tick"). Prior to that fix, gc was silently dead after one tick 12 - per pod, masking the underlying problem. 13 - 14 - ## causal chain (suspected) 15 - 16 - 1. `gcLoop` fires every 10 minutes (main.zig). 17 - 2. `dp.gc()` holds `DiskPersist.mutex` for its entire duration 18 - (event_log.zig:977-1033). That critical section includes: 19 - - `SELECT` of expired log_file_refs 20 - - per-file `DELETE FROM log_file_refs` + `unlink` 21 - - `gcBySize()` — another pass of queries + `stat` + `unlink` 22 - 3. `DiskPersist.persist()` (event_log.zig:864-899) takes the same mutex on the 23 - frame-worker hot path. For the duration of gc, every frame worker blocks on 24 - persist → the broadcast queue dries up → consumers see ~0 events/sec. This 25 - alone explains the "zlay only delivers 0.035 events/sec" symptom that was 26 - previously blamed on writeLoop polling. 27 - 4. After `dp.gc()` returns, `gcLoop` called `malloc_trim(0)`. The pod runs with 28 - `MALLOC_ARENA_MAX=4`, so glibc holds per-arena locks while walking free 29 - lists. On a ~1.5 GiB RSS process this can stall every allocator user for 30 - seconds. The Evented fiber serving /metrics and /_readyz would stall on its 31 - next malloc → kubelet liveness/readiness probes time out → pod marked 32 - NotReady → restart. 33 - 34 - Two separable suspects (important for isolation): 35 - - **`dp.gc()` mutex hold** freezes ingest via the persist hot path. 
36 - - **`malloc_trim(0)`** freezes everything via arena locks. 37 - 38 - Either alone is sufficient to flunk probes. Don't conflate them when 39 - investigating. 40 - 41 - ## stabilization shipped 42 - 43 - Commit on top of 795cc41: 44 - 45 - - **Disabled `malloc_trim(0)`** in `gcLoop` (main.zig:502). Comment preserved 46 - so future maintainers know why. If RSS growth becomes an issue, prefer 47 - tuning `MALLOC_MMAP_THRESHOLD_` or running trim out-of-band. 48 - - **Bumped gc interval from 10 min → 1 hour** (main.zig:473). Reduces 49 - frequency and blast radius of the persist-mutex hold while the real fix 50 - (mutex narrowing) is prepared. Not a cure — a stall at hour boundaries is 51 - still possible if gc runs long. 52 - - **Added timing log** around `dp.gc()` using `clock_gettime(.MONOTONIC)` 53 - (plain thread — no Io available). Next incident will tell us how long 54 - `dp.gc()` actually runs on a production dataset. 55 - 56 - These changes are in main.zig only. No dependency or schema changes. Safe to 57 - revert by undoing the single commit. 58 - 59 - ## validation plan post-deploy 60 - 61 - After deploying on top of 795cc41: 62 - 63 - 1. Pod uptime should exceed 10 minutes with no NotReady flap. 64 - 2. Grep logs for `gc: dp.gc complete in` — verify gc runs and record its 65 - duration on production data. 66 - 3. `frames_broadcast_total` should climb at ingest rate (~300/sec), not 67 - the previously measured ~0.035/sec. 68 - 4. `tap run --relay-url https://zlay.waow.tech` for 60 seconds should 69 - deliver thousands of events, not single digits. 70 - 71 - If the pod still flaps after this change, the hypothesis is wrong or 72 - incomplete — do **not** proceed to the follow-up fixes until we re-diagnose. 73 - Most likely remaining suspect in that case is the `dp.gc()` mutex hold on 74 - a dataset big enough that even hourly gc takes long enough to trip probes. 75 - Mitigation in that case: temporarily comment out the `dp.gc()` call body to 76 - isolate. 
77 - 78 - ## follow-up work (not in this change) 79 - 80 - 1. **Narrow `DiskPersist.mutex` scope in `gc()`**. The mutex protects 81 - `evtbuf`/`outbuf`/`cur_seq`/`current_file_path`/`flushLocked`. Nothing in 82 - gc's DB iteration or per-file unlink genuinely needs that lock. The only 83 - shared state is a read of `current_file_path` to skip the active file. 84 - Plan: do DB discovery and file discovery without the lock; acquire briefly 85 - only to re-check `current_file_path` against each candidate before unlink. 86 - Same treatment for `gcBySize()` and `takeDownUser()`. 87 - 2. **Broadcaster writeLoop polling** (broadcaster.zig:447-453). Real bug — 88 - `cond.signal` at line 413 is a no-op because writeLoop polls with 89 - `io.sleep(100ms)` instead of `cond.wait`. This caps per-consumer drain at 90 - ~10/sec even under zero contention. Fix: use `cond.wait`, schedule pings 91 - via a separate timer fiber or piggyback on next wakeup. 92 - **Do NOT** move writeLoop off Evented to pool_io — commit 6674812 documents 93 - that cross-Io path crashes via `Thread.current()` NULL deref. 94 - 3. **Consider whether `malloc_trim` should ever run on-process**. For a 95 - steady-state relay, the answer is probably no; mmap threshold tuning is a 96 - better choice. 97 - 98 - ## code pointers 99 - 100 - - main.zig:467-510 — `gcLoop` 101 - - event_log.zig:977-1033 — `DiskPersist.gc` 102 - - event_log.zig:1036-1099 — `DiskPersist.gcBySize` 103 - - event_log.zig:864-899 — `DiskPersist.persist` (hot path, same mutex) 104 - - broadcaster.zig:439-477 — `Consumer.writeLoop` (secondary bug, not fixed here) 105 - - commit 3dc21b9 — "fix gcLoop: silently exited after one tick" (the fix that 106 - unmasked this) 107 - - commit 6674812 — "fix SIGSEGV: plain threads calling Evented Io.Mutex" 108 - (cross-Io landmine, read before touching Consumer)
+1 -76
src/broadcaster.zig
··· 55 55 host_authority_is_new: std.atomic.Value(u64) = .{ .raw = 0 }, 56 56 host_authority_host_changed: std.atomic.Value(u64) = .{ .raw = 0 }, 57 57 host_authority_time_us: std.atomic.Value(u64) = .{ .raw = 0 }, 58 - // host_authority resolver pool mechanics (added 2026-04-09 per external 59 - // review). these expose pool contention and the slot-recovery code path 60 - // that previous reject-branch counters couldn't see. see relay 61 - // docs/zlay-external-review-2026-04-09.md. 62 - host_resolver_acquire_wait_us_total: std.atomic.Value(u64) = .{ .raw = 0 }, 63 - host_resolver_in_use: std.atomic.Value(u32) = .{ .raw = 0 }, 64 - host_resolver_resets_total: std.atomic.Value(u64) = .{ .raw = 0 }, 65 - host_resolver_resolve_fail_total: std.atomic.Value(u64) = .{ .raw = 0 }, 66 - // background DID resolveLoop ok/fail. previously these failures were 67 - // log.debug + continue with no observability — we treated the loop as 68 - // "working" without ever measuring it. baseline the rate so we can 69 - // tell if it's silently degraded. 70 - resolve_loop_resolve_ok_total: std.atomic.Value(u64) = .{ .raw = 0 }, 71 - resolve_loop_resolve_fail_total: std.atomic.Value(u64) = .{ .raw = 0 }, 72 - // per-branch reject breakdown (subsets of failed_host_authority). 73 - // added 2026-04-08 to diagnose the 100% host_authority failure rate — 74 - // without this breakdown we can't tell whether the DID doc lookup is 75 - // failing, the endpoint is unparseable, the host isn't in our table, 76 - // or the resolved host genuinely differs from the incoming host. 
77 - host_authority_reject_parse_did: std.atomic.Value(u64) = .{ .raw = 0 }, 78 - host_authority_reject_resolve: std.atomic.Value(u64) = .{ .raw = 0 }, 79 - host_authority_reject_no_endpoint: std.atomic.Value(u64) = .{ .raw = 0 }, 80 - host_authority_reject_bad_url: std.atomic.Value(u64) = .{ .raw = 0 }, 81 - host_authority_reject_unknown_host: std.atomic.Value(u64) = .{ .raw = 0 }, 82 - host_authority_reject_host_mismatch: std.atomic.Value(u64) = .{ .raw = 0 }, 83 58 // frame pool memory pressure 84 59 pool_queued_bytes: std.atomic.Value(u64) = .{ .raw = 0 }, 85 60 // persist/broadcast pipeline contention ··· 378 353 const ping_timeout_ns: u64 = 5 * std.time.ns_per_s; 379 354 380 355 pub const Consumer = struct { 381 - // per-consumer ring buffer. sized to absorb transient write stalls 382 - // without kicking the consumer with ConsumerTooSlow. at steady-state 383 - // ~250 events/sec, 65536 entries = ~4.4 minutes of headroom. 384 - // previously 8192 (~33s) which was short enough that pulsar's 60-min 385 - // snapshot run accumulated repeated kicks (see ops_changelog 2026-04-01). 
386 - const BUFFER_CAP = 65536; 356 + const BUFFER_CAP = 8192; 387 357 388 358 conn: *websocket.Conn, 389 359 allocator: Allocator, ··· 1079 1049 \\# HELP relay_broadcast_no_consumers_total frames skipped broadcast (no consumers) 1080 1050 \\relay_broadcast_no_consumers_total {d} 1081 1051 \\ 1082 - \\# TYPE relay_host_resolver_acquire_wait_us_total counter 1083 - \\# HELP relay_host_resolver_acquire_wait_us_total cumulative microseconds spent spinning in acquireHostResolver 1084 - \\relay_host_resolver_acquire_wait_us_total {d} 1085 - \\ 1086 - \\# TYPE relay_host_resolver_in_use gauge 1087 - \\# HELP relay_host_resolver_in_use host_authority resolver pool slots currently held by callers 1088 - \\relay_host_resolver_in_use {d} 1089 - \\ 1090 - \\# TYPE relay_host_resolver_resets_total counter 1091 - \\# HELP relay_host_resolver_resets_total slot deinit+reinit on first-attempt resolve failure (slot recovery path) 1092 - \\relay_host_resolver_resets_total {d} 1093 - \\ 1094 - \\# TYPE relay_host_resolver_resolve_fail_total counter 1095 - \\# HELP relay_host_resolver_resolve_fail_total first-attempt resolve failures in the host_authority pool (before recovery/retry) 1096 - \\relay_host_resolver_resolve_fail_total {d} 1097 - \\ 1098 - \\# TYPE relay_resolve_loop_resolve_ok_total counter 1099 - \\# HELP relay_resolve_loop_resolve_ok_total successful resolves in the background signing-key resolveLoop 1100 - \\relay_resolve_loop_resolve_ok_total {d} 1101 - \\ 1102 - \\# TYPE relay_resolve_loop_resolve_fail_total counter 1103 - \\# HELP relay_resolve_loop_resolve_fail_total failed resolves in the background signing-key resolveLoop 1104 - \\relay_resolve_loop_resolve_fail_total {d} 1105 - \\ 1106 1052 , .{ 1107 1053 stats.persist_order_spins.load(.acquire), 1108 1054 stats.broadcast_queue_push_lock_spins.load(.acquire), 1109 1055 stats.broadcast_queue_full.load(.acquire), 1110 1056 stats.broadcast_queue_depth_hwm.load(.acquire), 1111 1057 
stats.broadcast_no_consumers.load(.acquire), 1112 - stats.host_resolver_acquire_wait_us_total.load(.acquire), 1113 - stats.host_resolver_in_use.load(.acquire), 1114 - stats.host_resolver_resets_total.load(.acquire), 1115 - stats.host_resolver_resolve_fail_total.load(.acquire), 1116 - stats.resolve_loop_resolve_ok_total.load(.acquire), 1117 - stats.resolve_loop_resolve_fail_total.load(.acquire), 1118 1058 }) catch {}; 1119 1059 1120 1060 // validation failure breakdown by reason ··· 1129 1069 \\relay_validation_failed{{reason="host_authority"}} {d} 1130 1070 \\relay_validation_failed{{reason="future_rev"}} {d} 1131 1071 \\ 1132 - \\# TYPE relay_host_authority_reject counter 1133 - \\# HELP relay_host_authority_reject host authority reject breakdown by branch 1134 - \\relay_host_authority_reject{{branch="parse_did"}} {d} 1135 - \\relay_host_authority_reject{{branch="resolve"}} {d} 1136 - \\relay_host_authority_reject{{branch="no_endpoint"}} {d} 1137 - \\relay_host_authority_reject{{branch="bad_url"}} {d} 1138 - \\relay_host_authority_reject{{branch="unknown_host"}} {d} 1139 - \\relay_host_authority_reject{{branch="host_mismatch"}} {d} 1140 - \\ 1141 1072 , .{ 1142 1073 stats.failed_bad_did.load(.acquire), 1143 1074 stats.failed_bad_rev.load(.acquire), ··· 1146 1077 stats.failed_bad_structure.load(.acquire), 1147 1078 stats.failed_host_authority.load(.acquire), 1148 1079 stats.failed_future_rev.load(.acquire), 1149 - stats.host_authority_reject_parse_did.load(.acquire), 1150 - stats.host_authority_reject_resolve.load(.acquire), 1151 - stats.host_authority_reject_no_endpoint.load(.acquire), 1152 - stats.host_authority_reject_bad_url.load(.acquire), 1153 - stats.host_authority_reject_unknown_host.load(.acquire), 1154 - stats.host_authority_reject_host_mismatch.load(.acquire), 1155 1080 }) catch return w.buffered(); 1156 1081 1157 1082 // memory attribution — internal capacities help identify what's consuming RSS
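
The hunks above remove three groups of series from the metrics output: `relay_host_resolver_*`, `relay_resolve_loop_*`, and `relay_host_authority_reject`. One way to confirm that a deployed pod is really running the rolled-back build is to check that a scrape no longer contains them. This is a minimal sketch, not part of the repository: the function name `removed_series` is hypothetical, and the scrape URL in the usage note assumes the `/metrics` path is reachable on the public hostname.

```shell
# removed_series   (hypothetical helper)
# Read Prometheus text exposition from stdin and print every sample line
# that belongs to a series this commit removes. "# HELP" and "# TYPE" lines
# start with '#', so the '^relay_' anchor skips them. The unrelated
# relay_validation_failed{reason="host_authority"} series does not match.
removed_series() {
    # grep exits 1 when nothing matches; '|| true' keeps that from being
    # treated as a failure, since "no matches" is the expected result.
    grep -E '^relay_(host_resolver_|resolve_loop_|host_authority_reject)' || true
}
```

A possible invocation is `curl -fsS https://zlay.waow.tech/metrics | removed_series`. Empty output means none of the removed series are being exported. Any printed line suggests the pod is still on a build from before the rewind.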
+3 -25
src/event_log.zig
··· 432 432 /// resolve a DID to a numeric UID. creates a new account row on first encounter. 433 433 /// matches indigo's Relay.DidToUid → Account.UID mapping. 434 434 pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 { 435 - // fast path: check in-memory cache. mark DB success even on hits — 436 - // a hot cache means the DB-derived data path is functioning, which 437 - // is what /_readyz cares about. without this, the 30s health window 438 - // depends entirely on cache misses + the 10-min GC tick, which can 439 - // gap during steady-state and trip k8s liveness probes. 440 - if (self.did_cache.get(did)) |uid| { 441 - self.markDbSuccess(); 442 - return uid; 443 - } 435 + // fast path: check in-memory cache 436 + if (self.did_cache.get(did)) |uid| return uid; 444 437 445 438 // check database 446 439 if (try self.db.rowUnsafe( ··· 622 615 last_seq: u64, 623 616 failed_attempts: u32, 624 617 account_limit: ?u64 = null, 625 - // computed by listActiveHosts at load time so cold-start spawn doesn't 626 - // need a per-host DbRequest round-trip. equals account_limit when set, 627 - // otherwise COUNT(account.uid) for this host. 628 - effective_account_count: u64 = 0, 629 618 }; 630 619 631 620 const HostResult = struct { id: u64, last_seq: u64 }; ··· 722 711 hosts.deinit(allocator); 723 712 } 724 713 725 - // batch the effective_account_count into the host load — same JOIN/COUNT 726 - // shape as getEffectiveAccountCountImpl but folded into one query so 727 - // spawnWorker doesn't need a per-host DbRequest round-trip during 728 - // cold-start. h.id is the primary key so other h.* columns are 729 - // functionally dependent for the GROUP BY. 
730 714 var result = try db.query( 731 - "SELECT h.id, h.hostname, h.status, h.last_seq, h.failed_attempts, h.account_limit, " ++ 732 - "COALESCE(h.account_limit, COUNT(a.uid)) AS effective_account_count " ++ 733 - "FROM host h LEFT JOIN account a ON a.host_id = h.id " ++ 734 - "WHERE h.status = 'active' " ++ 735 - "GROUP BY h.id ORDER BY h.id ASC", 715 + "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 736 716 .{}, 737 717 ); 738 718 defer result.deinit(); 739 719 740 720 while (result.nextUnsafe() catch null) |row| { 741 - const eff_count_i64 = row.get(i64, 6); 742 721 try hosts.append(allocator, .{ 743 722 .id = @intCast(row.get(i64, 0)), 744 723 .hostname = try allocator.dupe(u8, row.get([]const u8, 1)), ··· 746 725 .last_seq = @intCast(row.get(i64, 3)), 747 726 .failed_attempts = @intCast(row.get(i32, 4)), 748 727 .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null, 749 - .effective_account_count = if (eff_count_i64 > 0) @intCast(eff_count_i64) else 0, 750 728 }); 751 729 } 752 730
+1 -2
src/frame_worker.zig
···
 pub const FrameWork = struct {
     data: []u8, // raw frame bytes (heap-duped by reader, freed by worker)
     host_id: u64,
-    hostname: []const u8, // owned (heap-duped at submit, freed by worker)
+    hostname: []const u8, // borrowed from subscriber (stable lifetime)
     allocator: Allocator,
     io: Io,
     // shared references (all thread-safe, all outlive the work item)
···
 pub fn processFrame(work: *FrameWork) void {
     _ = work.bc.stats.pool_queued_bytes.fetchSub(work.data.len, .monotonic);
     defer work.allocator.free(work.data);
-    defer work.allocator.free(work.hostname);
 
     var arena = std.heap.ArenaAllocator.init(work.allocator);
     defer arena.deinit();
+14 -37
src/main.zig
··· 351 351 // start GC loop on a plain thread — dp.gc() uses pool_io (Threaded) mutex 352 352 // and pg.Pool. MUST NOT run as Evented fiber: Threaded futex on Evented 353 353 // fiber dereferences NULL Thread.current() threadlocal → heap corruption. 354 - // sleeps via std.Thread.sleep (NOT io.sleep) — io.sleep on pool_io from 355 - // a non-Io thread fails on the second tick and silently kills the loop. 356 - const gc_thread = std.Thread.spawn(.{}, gcLoop, .{&dp}) catch |err| { 354 + const gc_thread = std.Thread.spawn(.{}, gcLoop, .{ &dp, pool_io }) catch |err| { 357 355 log.err("failed to start GC thread: {s}", .{@errorName(err)}); 358 356 return err; 359 357 }; ··· 464 462 server.runIo(listener, bc); 465 463 } 466 464 467 - fn gcLoop(dp: *event_log_mod.DiskPersist) void { 468 - // gc cadence: 1 hour (was 10 min before 2026-04-09 incident). DiskPersist.gc() 469 - // currently holds DiskPersist.mutex for its entire duration — covering DB 470 - // iteration and per-file unlinks — which blocks persist() on every frame 471 - // worker. until the mutex scope is narrowed (follow-up), run hourly to 472 - // bound blast radius. see docs/zlay-gcloop-stall-2026-04-09.md. 473 - const gc_interval_s: u64 = 60 * 60; // 1 hour 465 + fn gcLoop(dp: *event_log_mod.DiskPersist, io: Io) void { 466 + const gc_interval: u64 = 10 * 60; // 10 minutes in seconds 474 467 while (!shutdown_flag.load(.acquire)) { 475 - // sleep in 1s ticks so shutdown is checked frequently. uses 476 - // std.c.nanosleep directly — this is a plain OS thread, so calling 477 - // io.sleep on pool_io would fail on the second tick and silently 478 - // exit the loop via `catch return`. zig 0.16 has no std.Thread.sleep 479 - // and std.posix.nanosleep was removed during the Io migration. 
480 - var elapsed: u64 = 0; 481 - while (elapsed < gc_interval_s and !shutdown_flag.load(.acquire)) { 482 - const ts: std.c.timespec = .{ .sec = 1, .nsec = 0 }; 483 - _ = std.c.nanosleep(&ts, null); 484 - elapsed += 1; 468 + // sleep in small increments to check shutdown 469 + var remaining: u64 = gc_interval; 470 + while (remaining > 0 and !shutdown_flag.load(.acquire)) { 471 + const chunk = @min(remaining, 1); 472 + io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch return; 473 + remaining -= chunk; 485 474 } 486 475 if (shutdown_flag.load(.acquire)) return; 487 476 488 - // time the gc call so the next incident log tells us whether gc itself 489 - // or something around it is the stall. plain-thread context — use 490 - // clock_gettime directly rather than Io.Timestamp. 491 - var ts_start: std.c.timespec = undefined; 492 - _ = std.c.clock_gettime(.MONOTONIC, &ts_start); 493 477 dp.gc() catch |err| { 494 478 log.warn("event log GC failed: {s}", .{@errorName(err)}); 495 479 }; 496 - var ts_end: std.c.timespec = undefined; 497 - _ = std.c.clock_gettime(.MONOTONIC, &ts_end); 498 - const elapsed_ns: i64 = (@as(i64, ts_end.sec) - @as(i64, ts_start.sec)) * std.time.ns_per_s + 499 - (@as(i64, ts_end.nsec) - @as(i64, ts_start.nsec)); 500 - log.info("gc: dp.gc complete in {d}ms", .{@divTrunc(elapsed_ns, std.time.ns_per_ms)}); 501 480 502 - // NOTE: malloc_trim(0) disabled 2026-04-09. with MALLOC_ARENA_MAX=4 it 503 - // walks free lists holding per-arena locks, which on a ~1.5 GiB RSS 504 - // process can stall every allocator user (including the Evented http 505 - // fiber serving /metrics and /_readyz) long enough to flunk liveness 506 - // probes. if memory reclamation becomes an issue, prefer tuning 507 - // MALLOC_MMAP_THRESHOLD_ or calling trim from a dedicated maintenance 508 - // window rather than on a hot process. 
481 + // return freed pages to OS (glibc-specific, no-op on other platforms) 482 + if (comptime malloc_trim) |trim| { 483 + _ = trim(0); 484 + log.info("gc: malloc_trim complete", .{}); 485 + } 509 486 } 510 487 } 511 488
+24 -28
src/slurper.zig
··· 526 526 db_queue.push(&reset_req.base); 527 527 reset_req.base.wait(self.io, self.shutdown); 528 528 529 - // phase 5: fetch effective account count (single DbRequest, addHost is 530 - // a rare one-off path — not the cold-start hot loop). cold-start uses 531 - // listActiveHosts which preloads this in the batch query. 532 - var account_count: u64 = 0; 533 - const GetCountReq = struct { 534 - base: event_log_mod.DbRequest = .{ .callback = &execute }, 535 - hid: u64, 536 - count: u64 = 0, 537 - 538 - fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 539 - const s: *@This() = @fieldParentPtr("base", b); 540 - s.count = dp.getEffectiveAccountCount(s.hid); 541 - } 542 - }; 543 - var count_req: GetCountReq = .{ .hid = db_req.host_id }; 544 - db_queue.push(&count_req.base); 545 - count_req.base.wait(self.io, self.shutdown); 546 - account_count = count_req.count; 547 - 548 - // phase 6: spawn worker (Evented) 549 - try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq, account_count); 529 + // phase 5: spawn worker (Evented) 530 + try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq); 550 531 log.info("added host {s} (id={d})", .{ hostname, db_req.host_id }); 551 532 } 552 533 553 - /// spawn a subscriber thread for a host. callers must pass the 554 - /// effective_account_count up front — see comment on the addHost path 555 - /// below for the one site that still computes it inline. 
556 - fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64, account_count: u64) !void { 534 + /// spawn a subscriber thread for a host 535 + fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64) !void { 557 536 const hostname_duped = try self.allocator.dupe(u8, hostname); 558 537 errdefer self.allocator.free(hostname_duped); 559 538 560 539 const sub = try self.allocator.create(subscriber_mod.Subscriber); 561 540 errdefer self.allocator.destroy(sub); 541 + 542 + // get effective account count via DbRequestQueue 543 + var account_count: u64 = 0; 544 + if (self.db_queue) |db_queue| { 545 + const GetCountReq = struct { 546 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 547 + hid: u64, 548 + count: u64 = 0, 549 + 550 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 551 + const s: *@This() = @fieldParentPtr("base", b); 552 + s.count = dp.getEffectiveAccountCount(s.hid); 553 + } 554 + }; 555 + var count_req: GetCountReq = .{ .hid = host_id }; 556 + db_queue.push(&count_req.base); 557 + count_req.base.wait(self.io, self.shutdown); 558 + account_count = count_req.count; 559 + } 562 560 563 561 sub.* = subscriber_mod.Subscriber.init( 564 562 self.allocator, ··· 672 670 var spawned: usize = 0; 673 671 for (hosts) |host| { 674 672 if (self.shutdown.load(.acquire)) break; 675 - // host.effective_account_count was preloaded by listActiveHostsImpl 676 - // — no per-host DbRequest round-trip during cold start. 677 - self.spawnWorker(host.id, host.hostname, host.last_seq, host.effective_account_count) catch |err| { 673 + self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 678 674 log.warn("failed to spawn worker for {s}: {s}", .{ host.hostname, @errorName(err) }); 679 675 }; 680 676 spawned += 1;
+15 -95
src/subscriber.zig
··· 256 256 return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire); 257 257 } 258 258 259 - /// build a FrameWork that owns its data + hostname, decoupled from the 260 - /// subscriber lifetime. returns null on OOM. 261 - /// 262 - /// UAF-safe contract: after this returns, the caller (or the slurper) 263 - /// may free `self.options.hostname` and the input `data` immediately 264 - /// without affecting the work item. the worker will free both through 265 - /// `work.allocator` when it finishes processing. 266 - /// 267 - /// see `test "prepareFrameWork dupes hostname and data (UAF regression)"` 268 - fn prepareFrameWork(self: *Subscriber, data: []const u8) ?frame_worker_mod.FrameWork { 269 - const duped = self.allocator.dupe(u8, data) catch return null; 270 - const hostname_dup = self.allocator.dupe(u8, self.options.hostname) catch { 271 - self.allocator.free(duped); 272 - return null; 273 - }; 274 - return .{ 275 - .data = duped, 276 - .host_id = self.options.host_id, 277 - .hostname = hostname_dup, 278 - .allocator = self.allocator, 279 - .io = self.pool_io orelse self.io, 280 - .bc = self.bc, 281 - .validator = self.validator, 282 - .persist = self.persist, 283 - .collection_index = self.collection_index, 284 - .resyncer = self.resyncer, 285 - }; 286 - } 287 - 288 259 /// run the subscriber loop. reconnects with exponential backoff. 289 260 /// blocks until shutdown flag is set or host is exhausted. 290 261 pub fn run(self: *Subscriber) void { ··· 506 477 const d = payload.getString("repo") orelse payload.getString("did"); 507 478 break :blk if (d) |s| std.hash.Wyhash.hash(0, s) else sub.options.host_id; 508 479 }; 509 - // dupe data + hostname per-frame: subscriber teardown (slurper.runWorker) 510 - // frees sub.options.hostname after sub.run() returns, but FrameWorks 511 - // can still be queued in the pool. borrowing the slice would be a 512 - // use-after-free (corrupt hostnames in chain-break logs, etc.). 
513 - const work = sub.prepareFrameWork(data) orelse return; 480 + const duped = sub.allocator.dupe(u8, data) catch return; 514 481 const t0 = nanoTimestamp(io); 515 - if (pool.submit(did_key, work, sub.shutdown)) { 482 + if (pool.submit(did_key, .{ 483 + .data = duped, 484 + .host_id = sub.options.host_id, 485 + .hostname = sub.options.hostname, 486 + .allocator = sub.allocator, 487 + .io = sub.pool_io orelse sub.io, // pool_io (Threaded) for worker-safe ops 488 + .bc = sub.bc, 489 + .validator = sub.validator, 490 + .persist = sub.persist, 491 + .collection_index = sub.collection_index, 492 + .resyncer = sub.resyncer, 493 + }, sub.shutdown)) { 516 494 // pool accepted — advance cursor past this frame 517 - _ = sub.bc.stats.pool_queued_bytes.fetchAdd(work.data.len, .monotonic); 495 + _ = sub.bc.stats.pool_queued_bytes.fetchAdd(duped.len, .monotonic); 518 496 if (upstream_seq) |s| sub.last_upstream_seq = s; 519 497 if (nanoTimestamp(io) - t0 > 1_000_000) { // >1ms = had to wait 520 498 _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic); 521 499 } 522 500 } else { 523 501 // shutdown requested — don't advance cursor so reconnect replays this frame 524 - sub.allocator.free(work.data); 525 - sub.allocator.free(work.hostname); 502 + sub.allocator.free(duped); 526 503 } 527 504 return; 528 505 } ··· 792 769 }; 793 770 794 771 // --- tests --- 795 - 796 - test "prepareFrameWork dupes hostname and data (UAF regression)" { 797 - // regression test for UAF: slurper.runWorker frees sub.options.hostname 798 - // after sub.run() returns, but FrameWorks can still be queued in the 799 - // frame pool with that hostname slice. prepareFrameWork must heap-dupe 800 - // both `data` and `hostname` so the work item is independent of the 801 - // subscriber's lifetime. 
802 - // 803 - // the test simulates subscriber teardown by freeing the source hostname 804 - // after the FrameWork is built, then asserts the work item is still 805 - // intact (would trip the testing allocator's use-after-free detection 806 - // if the dupe was skipped). 807 - const alloc = std.testing.allocator; 808 - 809 - const orig_hostname = try alloc.dupe(u8, "example.pds.host"); 810 - const data_input = try alloc.dupe(u8, "raw frame bytes"); 811 - defer alloc.free(data_input); 812 - 813 - var shutdown: std.atomic.Value(bool) = .{ .raw = false }; 814 - var sub: Subscriber = .{ 815 - .allocator = alloc, 816 - .io = std.testing.io, 817 - .options = .{ .hostname = orig_hostname, .host_id = 42 }, 818 - .bc = undefined, // not dereferenced by prepareFrameWork 819 - .validator = undefined, 820 - .persist = null, 821 - .collection_index = null, 822 - .resyncer = null, 823 - .pool = null, 824 - .pool_io = null, 825 - .shutdown = &shutdown, 826 - }; 827 - 828 - const work = sub.prepareFrameWork(data_input) orelse return error.OutOfMemory; 829 - defer alloc.free(work.data); 830 - defer alloc.free(work.hostname); 831 - 832 - // content is correct 833 - try std.testing.expectEqualStrings("example.pds.host", work.hostname); 834 - try std.testing.expectEqualStrings("raw frame bytes", work.data); 835 - 836 - // pointers must be distinct from caller's buffers — this is the core 837 - // UAF invariant. if the dupe was elided, these would alias. 
838 - try std.testing.expect(work.hostname.ptr != orig_hostname.ptr); 839 - try std.testing.expect(work.data.ptr != data_input.ptr); 840 - 841 - // scalar fields propagate 842 - try std.testing.expectEqual(@as(u64, 42), work.host_id); 843 - 844 - // simulate subscriber teardown (slurper.runWorker frees hostname) 845 - alloc.free(orig_hostname); 846 - 847 - // work item must still be readable and correct — if hostname was 848 - // borrowed instead of duped, the testing allocator would catch the 849 - // read-after-free above when expectEqualStrings is called again here. 850 - try std.testing.expectEqualStrings("example.pds.host", work.hostname); 851 - } 852 772 853 773 test "decode frame via SDK and extract fields" { 854 774 const cbor = zat.cbor;
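The removed `prepareFrameWork` comment describes an ownership contract: a queued work item must own copies of both `data` and `hostname`, so the subscriber can free its own buffers while the item is still in the pool. A minimal sketch of that contract, assuming a trimmed-down hypothetical `Work` type rather than the real `FrameWork`:

```zig
const std = @import("std");

// Hypothetical, trimmed-down work item; the real FrameWork has more fields.
const Work = struct {
    data: []u8,
    hostname: []u8,
    allocator: std.mem.Allocator,

    // The consumer frees both buffers when it is done with the item.
    fn deinit(self: Work) void {
        self.allocator.free(self.data);
        self.allocator.free(self.hostname);
    }
};

// Copy both slices so the work item is independent of the caller's buffers.
// Returns null on OOM and leaves nothing allocated in that case.
fn prepareWork(allocator: std.mem.Allocator, data: []const u8, hostname: []const u8) ?Work {
    const data_dup = allocator.dupe(u8, data) catch return null;
    const hostname_dup = allocator.dupe(u8, hostname) catch {
        allocator.free(data_dup);
        return null;
    };
    return .{ .data = data_dup, .hostname = hostname_dup, .allocator = allocator };
}

test "work item survives teardown of the source hostname" {
    const alloc = std.testing.allocator;
    const host = try alloc.dupe(u8, "example.pds.host");

    const work = prepareWork(alloc, "raw frame bytes", host) orelse return error.OutOfMemory;
    defer work.deinit();

    // Simulate subscriber teardown: the source buffer goes away first.
    alloc.free(host);
    try std.testing.expectEqualStrings("example.pds.host", work.hostname);
}
```

The post-revert code in this diff passes `sub.options.hostname` by reference again; per the commit message, the per-submit dupe from 1eec324 is re-applied in a follow-up commit.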
+29 -145
src/validator.zig
··· 61 61 io: Io, 62 62 // pool of reusable resolvers for inline host authority checks. 63 63 // frame workers acquire/release via atomic flag to avoid creating 64 - // a fresh resolver (and fresh TLS handshake) per call. heap-allocated 65 - // in start() so the pool size can be tuned via HOST_RESOLVER_POOL_SIZE 66 - // env var without recompiling. with keep_alive=false, pool width is a 67 - // real startup throughput knob. 68 - host_resolvers: []zat.DidResolver = &.{}, 69 - host_resolver_available: []std.atomic.Value(bool) = &.{}, 64 + // a fresh resolver (and fresh TLS handshake) per call. 65 + host_resolvers: [host_resolver_pool_size]zat.DidResolver = undefined, 66 + host_resolver_available: [host_resolver_pool_size]std.atomic.Value(bool) = .{std.atomic.Value(bool){ .raw = false }} ** host_resolver_pool_size, 70 67 host_resolver_inited: bool = false, 71 68 72 69 const max_resolver_threads = 8; 73 70 const default_resolver_threads = 4; 74 71 const max_queue_size: usize = 100_000; 75 - const default_host_resolver_pool_size: usize = 4; 76 - const max_host_resolver_pool_size: usize = 64; 72 + const host_resolver_pool_size: usize = 4; 77 73 78 74 pub fn init(allocator: Allocator, stats: *broadcaster.Stats, io: Io) Validator { 79 75 return initWithConfig(allocator, stats, .{}, io); ··· 100 96 } 101 97 102 98 if (self.host_resolver_inited) { 103 - for (self.host_resolvers) |*r| { 99 + for (&self.host_resolvers) |*r| { 104 100 r.deinit(); 105 101 } 106 - self.allocator.free(self.host_resolvers); 107 - self.allocator.free(self.host_resolver_available); 108 - self.host_resolvers = &.{}; 109 - self.host_resolver_available = &.{}; 110 102 self.host_resolver_inited = false; 111 103 } 112 104 ··· 130 122 slot.* = try self.io.concurrent(resolveLoop, .{self}); 131 123 } 132 124 133 - // init host authority resolver pool (reused across calls). 134 - // 135 - // keep_alive = false: workaround for ~99% rejection rate observed 136 - // 2026-04-08. 
root cause not yet known — local repro couldn't 137 - // reproduce the failure, the leading hypothesis is that pool slots 138 - // get poisoned by a transient network condition and never recover 139 - // because there's no slot-recovery path. slot recovery added in 140 - // resolveHostAuthority below; keep_alive can flip back to true via 141 - // canary once resolve_loop_resolve_fail and the new sampled warn 142 - // log give us the actual underlying error kind from zat 143 - // v0.3.0-alpha.23. see relay docs/zlay-external-review-2026-04-09.md. 144 - // 145 - // pool size is HOST_RESOLVER_POOL_SIZE (default 4, max 64). with 146 - // keep_alive=false, every check is a fresh TLS handshake (~tens of 147 - // ms), so pool width is a real startup throughput knob — bumping 148 - // it lets more is_new checks run concurrently during cold-start 149 - // reconnect storms. 150 - const requested_size = parseEnvInt(usize, "HOST_RESOLVER_POOL_SIZE", default_host_resolver_pool_size); 151 - const pool_size = @min(requested_size, max_host_resolver_pool_size); 152 - 153 - self.host_resolvers = try self.allocator.alloc(zat.DidResolver, pool_size); 154 - errdefer self.allocator.free(self.host_resolvers); 155 - self.host_resolver_available = try self.allocator.alloc(std.atomic.Value(bool), pool_size); 156 - errdefer self.allocator.free(self.host_resolver_available); 157 - 158 - for (self.host_resolvers) |*r| { 159 - r.* = zat.DidResolver.initWithOptions(self.io, self.allocator, .{ .keep_alive = false }); 125 + // init host authority resolver pool (reused across calls) 126 + for (&self.host_resolvers) |*r| { 127 + r.* = zat.DidResolver.initWithOptions(self.io, self.allocator, .{}); 160 128 } 161 - for (self.host_resolver_available) |*a| { 162 - a.* = .{ .raw = true }; 129 + for (&self.host_resolver_available) |*a| { 130 + a.store(true, .release); 163 131 } 164 132 self.host_resolver_inited = true; 165 - 166 - log.info("host_authority resolver pool: size={d} keep_alive=false", 
.{pool_size}); 167 133 } 168 134 169 135 /// validate a #sync frame: signature verification only (no ops, no MST). ··· 491 457 // resolve DID → signing key 492 458 const parsed = zat.Did.parse(d) orelse continue; 493 459 var doc = resolver.resolve(parsed) catch |err| { 494 - _ = self.stats.resolve_loop_resolve_fail_total.fetchAdd(1, .monotonic); 495 460 log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) }); 496 461 continue; 497 462 }; 498 463 defer doc.deinit(); 499 - _ = self.stats.resolve_loop_resolve_ok_total.fetchAdd(1, .monotonic); 500 464 501 465 // extract and decode signing key 502 466 const vm = doc.signingKey() orelse continue; ··· 575 539 576 540 /// synchronous host authority check. called on first-seen DIDs (is_new) 577 541 /// and host migrations (host_changed). resolves the DID doc to verify the 578 - /// PDS endpoint matches the incoming host. 542 + /// PDS endpoint matches the incoming host. retries once on failure to 543 + /// handle transient network errors. 579 544 /// 580 545 /// uses a pooled resolver to avoid creating a fresh resolver (and fresh 581 546 /// TLS handshake) per call. blocks briefly if all pool slots are in use. 582 547 /// 583 - /// on resolve failure, the slot is destroyed and re-initialized before 584 - /// the retry, so any state corruption (poisoned http client, half-closed 585 - /// connection, etc.) doesn't persist across calls. this is the leading 586 - /// hypothesis for the 2026-04-08 ~99% rejection rate — pool slots had 587 - /// no recovery path. see relay docs/zlay-external-review-2026-04-09.md. 
588 - /// 589 548 /// returns: 590 549 /// .accept — should not happen (caller should only call on new/mismatch) 591 550 /// .migrate — DID doc confirms this host, caller should update host_id 592 551 /// .reject — DID doc does not confirm, caller should drop the event 593 552 pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority { 594 553 const persist = self.persist orelse return .migrate; // no DB — can't check 595 - const parsed = zat.Did.parse(did) orelse { 596 - _ = self.stats.host_authority_reject_parse_did.fetchAdd(1, .monotonic); 597 - self.sampleLogReject("parse_did", did, "", incoming_host_id, 0); 598 - return .reject; 599 - }; 554 + const parsed = zat.Did.parse(did) orelse return .reject; 600 555 601 556 const idx = self.acquireHostResolver(); 602 557 defer self.releaseHostResolver(idx); 603 558 604 - // first attempt on the existing pool slot. the first-attempt error is 605 - // not captured: if state corruption was the cause, the kind from the 606 - // fresh-slot retry below is what we want to log. 607 - if (self.host_resolvers[idx].resolve(parsed)) |doc_first| { 608 - var d = doc_first; 609 - defer d.deinit(); 610 - return self.checkPdsHost(&d, persist, did, incoming_host_id); 611 - } else |_| { 612 - // first-attempt failure: count it (steady-state signal independent 613 - // of recovery success), then destroy + re-init the slot before the 614 - // retry so any internal state corruption doesn't persist. 
615 - _ = self.stats.host_resolver_resolve_fail_total.fetchAdd(1, .monotonic); 616 - self.recycleHostResolver(idx); 617 - _ = self.stats.host_resolver_resets_total.fetchAdd(1, .monotonic); 559 + var resolver = &self.host_resolvers[idx]; 618 560 619 - if (self.host_resolvers[idx].resolve(parsed)) |doc_retry| { 620 - var d = doc_retry; 621 - defer d.deinit(); 622 - return self.checkPdsHost(&d, persist, did, incoming_host_id); 623 - } else |err| { 624 - _ = self.stats.host_authority_reject_resolve.fetchAdd(1, .monotonic); 625 - self.sampleLogReject("resolve", did, @errorName(err), incoming_host_id, 0); 626 - return .reject; 627 - } 628 - } 629 - } 630 - 631 - /// destroy and re-initialize a pool slot in place. caller must hold the 632 - /// slot via acquireHostResolver — concurrent access is not safe. used by 633 - /// the slot-recovery path on resolve failure. if the re-init alloc fails, 634 - /// the slot is left in a degraded state and the next caller will see the 635 - /// failure naturally; we don't crash on OOM here. 636 - fn recycleHostResolver(self: *Validator, idx: usize) void { 637 - self.host_resolvers[idx].deinit(); 638 - self.host_resolvers[idx] = zat.DidResolver.initWithOptions( 639 - self.io, 640 - self.allocator, 641 - .{ .keep_alive = false }, 642 - ); 561 + // first resolve attempt 562 + var doc = resolver.resolve(parsed) catch { 563 + // retry once on network failure 564 + var doc2 = resolver.resolve(parsed) catch return .reject; 565 + defer doc2.deinit(); 566 + return self.checkPdsHost(&doc2, persist, incoming_host_id); 567 + }; 568 + defer doc.deinit(); 569 + return self.checkPdsHost(&doc, persist, incoming_host_id); 643 570 } 644 571 645 572 /// acquire a resolver from the pool. spins until one is available. 646 - /// records cumulative wait time + in_use gauge for diagnosing pool 647 - /// contention. matches the codebase convention of timing via Io.Timestamp 648 - /// (see frame_worker.zig microTimestamp). 
649 573 fn acquireHostResolver(self: *Validator) usize { 650 - const start_us = Io.Timestamp.now(self.io, .real).toMicroseconds(); 651 574 while (self.alive.load(.acquire)) { 652 - for (0..self.host_resolvers.len) |i| { 575 + for (0..host_resolver_pool_size) |i| { 653 576 if (self.host_resolver_available[i].cmpxchgStrong(true, false, .acquire, .monotonic) == null) { 654 - const now_us = Io.Timestamp.now(self.io, .real).toMicroseconds(); 655 - const elapsed_us: u64 = @intCast(@max(0, now_us - start_us)); 656 - _ = self.stats.host_resolver_acquire_wait_us_total.fetchAdd(elapsed_us, .monotonic); 657 - _ = self.stats.host_resolver_in_use.fetchAdd(1, .monotonic); 658 577 return i; 659 578 } 660 579 } ··· 664 583 } 665 584 666 585 fn releaseHostResolver(self: *Validator, idx: usize) void { 667 - _ = self.stats.host_resolver_in_use.fetchSub(1, .monotonic); 668 586 self.host_resolver_available[idx].store(true, .release); 669 587 } 670 588 671 - fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, did: []const u8, incoming_host_id: u64) HostAuthority { 672 - const pds_endpoint = doc.pdsEndpoint() orelse { 673 - _ = self.stats.host_authority_reject_no_endpoint.fetchAdd(1, .monotonic); 674 - self.sampleLogReject("no_endpoint", did, "", incoming_host_id, 0); 675 - return .reject; 676 - }; 677 - const pds_host = extractHostFromUrl(pds_endpoint) orelse { 678 - _ = self.stats.host_authority_reject_bad_url.fetchAdd(1, .monotonic); 679 - self.sampleLogReject("bad_url", did, pds_endpoint, incoming_host_id, 0); 680 - return .reject; 681 - }; 682 - const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse { 683 - _ = self.stats.host_authority_reject_unknown_host.fetchAdd(1, .monotonic); 684 - self.sampleLogReject("unknown_host", did, pds_host, incoming_host_id, 0); 685 - return .reject; 686 - }; 589 + fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) 
HostAuthority { 590 + _ = self; 591 + const pds_endpoint = doc.pdsEndpoint() orelse return .reject; 592 + const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject; 593 + const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject; 687 594 if (pds_host_id == incoming_host_id) return .migrate; 688 - _ = self.stats.host_authority_reject_host_mismatch.fetchAdd(1, .monotonic); 689 - self.sampleLogReject("host_mismatch", did, pds_host, incoming_host_id, pds_host_id); 690 595 return .reject; 691 - } 692 - 693 - /// log a rejection sample at 1-in-N rate to avoid drowning the log at 694 - /// ~10 rejections/sec. total rejections per branch are available via 695 - /// relay_host_authority_reject{branch=...} in prometheus. 696 - fn sampleLogReject( 697 - self: *Validator, 698 - branch: []const u8, 699 - did: []const u8, 700 - detail: []const u8, 701 - incoming_host_id: u64, 702 - resolved_host_id: u64, 703 - ) void { 704 - const count = self.stats.failed_host_authority.load(.monotonic); 705 - // sample 1 in 2048 — at 10/s that's one log line every ~3.5min. 706 - // parens mandatory: `&` and `!=` have surprising precedence in zig. 707 - if ((count & 0x7ff) != 0) return; 708 - log.warn( 709 - "host_authority reject branch={s} did={s} detail={s} incoming_host_id={d} resolved_host_id={d}", 710 - .{ branch, did, detail, incoming_host_id, resolved_host_id }, 711 - ); 712 596 } 713 597 }; 714 598
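The host resolver pool above claims a slot by flipping a per-slot atomic flag from `true` to `false` with `cmpxchgStrong`, and releases it with a `.release` store. A minimal sketch of the claim/release step, assuming a hypothetical `SlotPool` that tracks only the flags. Unlike `acquireHostResolver`, which spins until a slot frees up, this version returns `null` when every slot is busy.

```zig
const std = @import("std");

const pool_size: usize = 4;

// Hypothetical flag-only pool; the real Validator pairs each flag with a resolver.
const SlotPool = struct {
    available: [pool_size]std.atomic.Value(bool) =
        .{std.atomic.Value(bool){ .raw = true }} ** pool_size,

    // Try each slot once. cmpxchgStrong returns null when the swap succeeded,
    // meaning this caller flipped the flag and now owns the slot.
    fn tryAcquire(self: *SlotPool) ?usize {
        for (&self.available, 0..) |*flag, i| {
            if (flag.cmpxchgStrong(true, false, .acquire, .monotonic) == null) return i;
        }
        return null;
    }

    // Publish the slot as free again; pairs with the acquire in tryAcquire.
    fn release(self: *SlotPool, idx: usize) void {
        self.available[idx].store(true, .release);
    }
};

test "each slot is handed out once until released" {
    var pool: SlotPool = .{};
    var i: usize = 0;
    while (i < pool_size) : (i += 1) {
        try std.testing.expect(pool.tryAcquire() != null);
    }
    // All slots are taken now.
    try std.testing.expect(pool.tryAcquire() == null);

    pool.release(2);
    try std.testing.expectEqual(@as(?usize, 2), pool.tryAcquire());
}
```

The acquire/release pairing is what makes it safe for the next owner to see whatever state the previous owner left in the slot.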