atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

v0.3.0: docs pass + devlog 008

update README examples for 0.16 Io parameter, add subscribe(handler)
pattern for streaming clients, add CHANGELOG 0.3.0 entry, devlog 008
covering the full migration saga.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+268 -21
+9
CHANGELOG.md
··· 1 1 # changelog 2 2 3 + ## 0.3.0 4 + 5 + - **breaking**: zig 0.16 — all networking APIs take `io: std.Io` as first parameter 6 + - **breaking**: streaming clients use `subscribe(handler)` pattern instead of `connect()` + `next()` loop 7 + - **breaking**: websocket.zig bumped — Io-native server accept loop, client write lock, TLS stream support 8 + - **feat**: `Io.Timestamp` replaces libc `gettimeofday` in JWT/OAuth 9 + - **feat**: `io.sleep()` replaces libc `nanosleep` in reconnect backoff (cancellation-aware) 10 + - **docs**: [devlog 008](devlog/008-the-io-migration.md) — the 0.16 migration 11 + 3 12 ## 0.2.18 4 13 5 14 - **feat**: export `HttpTransport` from root module — consumers can now use `zat.HttpTransport` for direct HTTP access without going through `XrpcClient`
+29 -21
README.md
··· 13 13 14 14 ## install 15 15 16 + requires zig 0.16+. 17 + 16 18 ```bash 17 19 zig fetch --save https://tangled.sh/zat.dev/zat/archive/main 18 20 ``` ··· 53 55 54 56 ```zig 55 57 // handle → DID 56 - var handle_resolver = zat.HandleResolver.init(allocator); 58 + var handle_resolver = zat.HandleResolver.init(io, allocator); 57 59 defer handle_resolver.deinit(); 58 60 const did = try handle_resolver.resolve(zat.Handle.parse("bsky.app").?); 59 61 defer allocator.free(did); 60 62 61 63 // DID → document 62 - var did_resolver = zat.DidResolver.init(allocator); 64 + var did_resolver = zat.DidResolver.init(io, allocator); 63 65 defer did_resolver.deinit(); 64 66 var doc = try did_resolver.resolve(zat.Did.parse("did:plc:z72i7hdynmk6r22z27h6tvur").?); 65 67 defer doc.deinit(); ··· 164 166 <summary><strong>repo verification</strong> - full AT Protocol trust chain</summary> 165 167 166 168 ```zig 167 - const result = try zat.verifyRepo(allocator, "pfrazee.com"); 169 + const result = try zat.verifyRepo(io, allocator, "pfrazee.com"); 168 170 defer result.deinit(); 169 171 170 172 // result.did, result.signing_key, result.pds_endpoint ··· 181 183 <summary><strong>firehose client</strong> - raw CBOR event stream from relay</summary> 182 184 183 185 ```zig 184 - var client = zat.FirehoseClient.init(allocator, .{}); 186 + var client = zat.FirehoseClient.init(io, allocator, .{}); 185 187 defer client.deinit(); 186 188 187 - try client.connect(); 188 - while (try client.next()) |event| { 189 - switch (event.header.type) { 190 - .commit => { 191 - const car_data = try zat.car.read(allocator, event.body.blocks); 192 - // process blocks... 193 - }, 194 - else => {}, 189 + const Handler = struct { 190 + pub fn onEvent(_: *@This(), event: zat.FirehoseClient.Event) void { 191 + switch (event.header.type) { 192 + .commit => { 193 + // event.body.blocks, event.body.ops, ... 194 + }, 195 + else => {}, 196 + } 195 197 } 196 - } 198 + }; 199 + var handler: Handler = .{}; 200 + try client.subscribe(&handler); 197 201 ``` 198 202 199 203 connects to `com.atproto.sync.subscribeRepos` via WebSocket. decodes binary CBOR frames into typed events. round-robin host rotation with backoff. ··· 204 208 <summary><strong>jetstream client</strong> - typed JSON event stream</summary> 205 209 206 210 ```zig 207 - var client = zat.JetstreamClient.init(allocator, .{ 211 + var client = zat.JetstreamClient.init(io, allocator, .{ 208 212 .wanted_collections = &.{"app.bsky.feed.post"}, 209 213 }); 210 214 defer client.deinit(); 211 215 212 - try client.connect(); 213 - while (try client.next()) |event| { 214 - if (event.commit) |commit| { 215 - const record = commit.record; 216 - // process... 216 + const Handler = struct { 217 + pub fn onEvent(_: *@This(), event: zat.JetstreamClient.Event) void { 218 + if (event.commit) |commit| { 219 + const record = commit.record; 220 + // process... 221 + _ = record; 222 + } 217 223 } 218 - } 224 + }; 225 + var handler: Handler = .{}; 226 + try client.subscribe(&handler); 219 227 ``` 220 228 221 229 connects to jetstream (bluesky's JSON event stream). typed events, automatic reconnection with cursor tracking, round-robin across community relays. ··· 226 234 <summary><strong>xrpc client</strong> - call AT Protocol endpoints</summary> 227 235 228 236 ```zig 229 - var client = zat.XrpcClient.init(allocator, "https://bsky.social"); 237 + var client = zat.XrpcClient.init(io, allocator, "https://bsky.social"); 230 238 defer client.deinit(); 231 239 232 240 const nsid = zat.Nsid.parse("app.bsky.actor.getProfile").?;
+229
devlog/008-the-io-migration.md
··· 1 + # the io migration 2 + 3 + zig 0.16 replaced the networking and concurrency primitives. `std.net`, `std.Thread.Pool`, `std.Thread.Mutex` — all gone, replaced by `std.Io`. this is the story of migrating zat and zlay to the new system, the eight crashes that followed, and the rule we discovered that isn't documented anywhere. 4 + 5 + ## what changed in 0.16 6 + 7 + `std.Io` is a backend-agnostic interface for all I/O and concurrency. two backends: 8 + 9 + - **Threaded** — always available. `io.concurrent()` spawns OS threads. 10 + - **Evented** — fiber-based. io_uring on linux, GCD on macOS, kqueue on BSD. `io.concurrent()` creates cheap userspace coroutines. 11 + 12 + same code runs on both. you write against `Io`, pick the backend at init, and the scheduler does the rest. `io.async()` for CPU work (bounded pool, overflow runs inline). `io.concurrent()` for I/O tasks (unbounded under Threaded, fibers under Evented). `io.sleep()` is cancellation-aware. `Io.Mutex` integrates with the scheduler's futex. 13 + 14 + the promise: write once, switch backends, get threads or fibers for free. 15 + 16 + the catch: the scheduler integration means `Io.Mutex`, `Io.Condition`, and `io.sleep()` are not just synchronization primitives — they're scheduler entry points. call them from the wrong execution context and the scheduler dereferences state that doesn't exist. 17 + 18 + ## the library migration 19 + 20 + zat's migration was straightforward. every networking type gained `io: std.Io` as its first init parameter: 21 + 22 + ```zig 23 + // 0.15 24 + var resolver = zat.HandleResolver.init(allocator); 25 + var client = zat.XrpcClient.init(allocator, "https://bsky.social"); 26 + 27 + // 0.16 28 + var resolver = zat.HandleResolver.init(io, allocator); 29 + var client = zat.XrpcClient.init(io, allocator, "https://bsky.social"); 30 + ``` 31 + 32 + the streaming clients got a bigger change. `connect()` + `next()` loops became `subscribe(handler)`: 33 + 34 + ```zig 35 + // 0.15 36 + var client = zat.JetstreamClient.init(allocator, .{...}); 37 + try client.connect(); 38 + while (try client.next()) |event| { ... } 39 + 40 + // 0.16 41 + var client = zat.JetstreamClient.init(io, allocator, .{...}); 42 + try client.subscribe(&handler); 43 + ``` 44 + 45 + `subscribe` blocks forever — reconnects with exponential backoff, rotates hosts, calls `handler.onEvent()` for each frame. the handler is `anytype` with optional `onError` and `onConnect` callbacks. cancellation propagates through `Io.Cancelable`. 46 + 47 + internally: `std.crypto.random` → `io.random()`. `std.posix.nanosleep` → `io.sleep()`. `libc.gettimeofday` → `Io.Timestamp`. websocket.zig took `io` in its client and server init. 48 + 49 + zat and websocket.zig migrated in lockstep over a day. 203 tests pass. the library side was clean. 50 + 51 + then we deployed the relay. 52 + 53 + ## the relay migration 54 + 55 + [zlay](https://tangled.sh/zzstoatzz.io/zlay) is an AT Protocol relay — ~8,400 lines of zig, ~2,750 PDS subscribers, WebSocket fan-out to downstream consumers. it was the heaviest consumer of the 0.15 API surface. migrating it to 0.16 compiled on the first try. 56 + 57 + eight crashes followed. 58 + 59 + ## crash 1: SIGSEGV on startup 60 + 61 + the build auto-selected `Io.Evented` (io_uring available on linux). the relay printed "io backend: Evented" and immediately segfaulted. exit code 139. 62 + 63 + the cause: zlay's frame processing pool spawns plain `std.Thread` workers. under Evented, any `Io.Mutex` operation calls into the Uring scheduler, which accesses `Thread.current()` — a threadlocal that's only initialized on Uring-managed fibers. plain threads have it set to `null`. in ReleaseFast, `self.?` on null gives a NULL pointer, not a panic. the mutex dereferences a field at an offset from NULL. 64 + 65 + fix: force `Backend = Io.Threaded` in main.zig. we'd come back to Evented later. 66 + 67 + ## crash 2: pool acquire panic 68 + 69 + 30-60 seconds into processing, `unreachable` panic in `Io.Event.waitTimeout`. stack trace: `event_log.zig uidForDid → pg pool.zig acquire → Io.zig`. 70 + 71 + pg.zig (the postgres driver) used `Io.Event` as a connection-available signal. `Io.Event.reset()` has an invariant in the stdlib: it assumes no pending call to `wait`. with 16 frame workers contending for 5 database connections, `set()` wakes all waiters, one calls `reset()`, others hit `unreachable`. 72 + 73 + fix (in pg.zig fork): replaced `Io.Event` with a monotonic `u32` futex counter. `release()` increments + `futexWake(1)`. `acquire()` snapshots counter + `futexWaitTimeout()` with snapshot. no `reset()`, no single-waiter constraint. also bumped pool size to 20 (was hardcoded 5). 74 + 75 + ## crash 3: GPF in websocket write 76 + 77 + 30-60 seconds in, general protection fault in `memcpy → Writer.zig → websocket client.zig writeFrame`. 78 + 79 + the websocket `Client` had no write serialization. three concurrent writers: 80 + 81 + 1. `pingLoop` — `writeFrame(.ping, ...)` every 30s 82 + 2. `readLoop` auto-pong — `writeFrame(.pong, ...)` on upstream ping 83 + 3. close path — `writeFrame(.close, ...)` on failure 84 + 85 + interleaved frame headers corrupt the shared TLS writer state. the server-side `Conn` already had a write lock; the client was missing it. 86 + 87 + fix (in websocket.zig fork): added `_write_lock: Io.Mutex` to `Client`, acquired around both `writeAll` calls in `writeFrame()`. 88 + 89 + ## crash 4: use-after-free in ping loop 90 + 91 + same GPF stack trace as crash 3, but after the write lock fix. every ~60-90 seconds. 92 + 93 + `pingLoop` runs as an `io.concurrent` task and sleeps in 1-second increments. when the connection dies, `readLoop` returns and the defer chain runs: `ping_future.cancel(io)` then `client.deinit()`. but `pingLoop` had `io.sleep(...) catch {}` — swallowing all errors, including `error.Canceled`. so `cancel()` couldn't stop it. `deinit()` freed the client's buffers while `pingLoop` was still running. 94 + 95 + fix: `catch {}` → `catch return`. makes the ping loop cancellation-cooperative. added `isClosed()` guard before `writeFrame` as defense-in-depth. 96 + 97 + ## fix 5: HTTP fallback for health probes 98 + 99 + not a crash — a deployment failure. k8s health probes on port 3000 got 400 responses. the websocket server's handshake handler sent 400 on any non-upgrade HTTP request. 100 + 101 + fix (in websocket.zig fork): intercept `MissingHeaders`, `InvalidConnection`, `InvalidUpgrade` errors from the handshake parser. for these, re-parse as plain HTTP and dispatch to `Handler.httpFallback()` if it exists (comptime check). k8s probes hit `/_health`, get a 200. 102 + 103 + ## crash 6: SIGSEGV in the resyncer 104 + 105 + after switching back to `Io.Evented`: SIGSEGV at startup. `dmesg` showed crash addresses in `Uring.zig` at `Thread` struct field offsets from NULL — the same signature as crash 1 but in a different code path. 106 + 107 + `addr2line` traced it to the resyncer thread. it was spawned via `io.concurrent()`, which under Evented creates a fiber. but the resync work called `DiskPersist` methods that lock `self.mutex` with `pool_io` (Threaded). Threaded futex from an Evented fiber → NULL `Thread.current()` → SIGSEGV. 108 + 109 + this was the moment the pattern became clear: **you cannot call Threaded Io primitives from Evented fibers, or vice versa.** the futex dispatch goes through the scheduler, and the scheduler has thread-local state that only exists in its own managed execution context. 110 + 111 + fix: run the resyncer on a plain `std.Thread` with `pool_io`. the thread checks a `shutdown_flag` atomic to exit. 112 + 113 + ## fix 7: startup connection storm 114 + 115 + ~2,750 simultaneous WebSocket connects at startup starved the io_uring submission queue. event loop couldn't process completions fast enough. 116 + 117 + fix: throttle startup — connect in batches, give the ring time to drain between waves. 118 + 119 + ## crash 8: cross-Io heap corruption 120 + 121 + the relay ran for hours, then SIGSEGV. zero downstream consumers connected. `dmesg` showed crash addresses in `Uring.zig` at `Thread` struct field offsets from NULL — same signature as crashes 1 and 6, but in steady-state operation. 122 + 123 + two cross-Io violations were active: 124 + 125 + 1. **GC loop** — ran as an Evented fiber (`io.concurrent(gcLoop, ...)`), but called `dp.gc()` which locks a mutex with `pool_io` (Threaded) and queries postgres through `pg.Pool` (also Threaded). Threaded futex from Evented fiber → NULL deref. 126 + 127 + 2. **health check endpoints** — `/_readyz`, `/_health`, `/xrpc/_health` on the metrics and API servers. executed `db.exec("SELECT 1")` through the Threaded `pg.Pool` from an Evented HTTP handler context. same violation. 128 + 129 + fix: GC loop moved from `io.concurrent()` to `std.Thread.spawn()` with `pool_io`. health checks replaced with an atomic `last_db_success` timestamp — Threaded workers set it after successful queries, Evented handlers read it. no cross-Io boundary. 130 + 131 + ## the cross-Io rule 132 + 133 + the central discovery from eight crashes across five days: 134 + 135 + **`Io.Mutex`, `Io.Condition`, `io.sleep()`, and any library that uses them internally (pg.Pool, etc.) must be called from the same Io backend they were initialized with.** 136 + 137 + the mechanism: these primitives dispatch through the Io backend's scheduler via futex. each backend has thread-local state — `Thread.current()` under Uring is a `threadlocal var self: ?*Thread = null`, only set inside `Uring.Thread.run()`. calling from outside that context dereferences NULL. 138 + 139 + ``` 140 + Evented fiber → Io.Mutex.lock(pool_io) → Threaded futex 141 + → Thread.current() → threadlocal is NULL 142 + → field access at offset from NULL → SIGSEGV 143 + ``` 144 + 145 + this isn't documented in the stdlib. the API compiles and type-checks — `Io.Mutex.lock` takes any `Io`. the crash only manifests at runtime when the calling thread's execution context doesn't match the Io's backend. 146 + 147 + **safe cross-Io patterns:** 148 + - raw atomics (`std.atomic.Value`, `fetchAdd`, CAS) 149 + - `Io.Mutex.tryLock()` — non-blocking CAS, no futex 150 + - MPSC ring buffers with atomic spinlocks 151 + - atomic timestamps for health checks 152 + 153 + **unsafe cross-Io patterns:** 154 + - `Io.Mutex.lock()` / `lockUncancelable()` with wrong Io 155 + - `Io.Condition.wait()` / `signal()` / `broadcast()` 156 + - `io.sleep()` from wrong context 157 + - any library that internally uses the above (pg.Pool, etc.) 158 + 159 + ## the fix: DbRequestQueue 160 + 161 + ~40 call sites across the relay needed database access from Evented fibers, but `pg.Pool` requires Threaded. the initial approach — a second pool on Evented Io — failed because `netLookup` is unimplemented in Uring. three deploy attempts, three rollbacks. 162 + 163 + the solution: an MPSC ring buffer with typed request structs. 164 + 165 + ```zig 166 + pub const DbRequest = struct { 167 + callback: *const fn(*DbRequest, *DiskPersist) void, 168 + done: std.atomic.Value(bool) = .{ .raw = false }, 169 + err: ?anyerror = null, 170 + 171 + pub fn wait(self: *DbRequest) void { 172 + while (!self.done.load(.acquire)) { 173 + std.atomic.spinLoopHint(); 174 + } 175 + } 176 + }; 177 + ``` 178 + 179 + callers define typed structs that embed `DbRequest` and use `@fieldParentPtr`: 180 + 181 + ```zig 182 + const ListActiveHostsReq = struct { 183 + base: DbRequest = .{ .callback = &execute }, 184 + allocator: Allocator, 185 + result: ?[]Host = null, 186 + 187 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 188 + const self: *@This() = @fieldParentPtr("base", b); 189 + self.result = dp.listActiveHosts(self.allocator) catch |e| { 190 + b.err = e; 191 + return; 192 + }; 193 + } 194 + }; 195 + ``` 196 + 197 + the queue itself: 4096 slots, CAS-based spinlock for producers (Evented fibers), 2 worker threads on `pool_io` (Threaded). workers call `req.callback(req, persist)` then `req.done.store(true, .release)`. fibers spin on `done` with `spinLoopHint()`. shutdown drain marks unprocessed requests as done with `error.ShuttingDown`. 198 + 199 + no futex. no cross-Io boundary. the queue is pure atomics — safe from any execution context. 200 + 201 + the final architecture: 202 + 203 + ``` 204 + Evented fibers atomic boundary Threaded workers 205 + ─────────────────────────── ─────────────────── ────────────────────────── 206 + PDS subscribers DbRequestQueue (2 workers) 207 + downstream consumers DbRequest.push() → pg.Pool queries 208 + broadcast loop ──────────────→ → DiskPersist writes 209 + API/admin handlers → host ops 210 + atomic timestamp 211 + health checks ←──────────── last_db_success ←── set by workers after query 212 + 213 + std.Thread.spawn() 214 + GC loop (pool_io) 215 + resyncer (pool_io) 216 + backfiller (pool_io) 217 + ``` 218 + 219 + ## what this means for zat 220 + 221 + the library held up. CBOR, CAR, commit parsing, verification, multibase — all chain correctly through the Io migration. the API change was mechanical: add `io` as first parameter, thread it through. 222 + 223 + one bug surfaced at the relay level: `tooBig` omission from passthrough frames. the lexicon requires the field on `#commit` events. some PDSes omit it (it's deprecated, always false). zlay's passthrough re-encoding preserved the omission. downstream consumers with strict deserialization (no `#[serde(default)]`) rejected the frames. fix: inject `tooBig: false` when missing during resequencing. 224 + 225 + the streaming client redesign — `subscribe(handler)` instead of `connect()` + `next()` — was the right call. the handler pattern gives the library control over reconnection, backoff, and host rotation. the caller implements `onEvent` and gets reliable delivery without managing connection lifecycle. 226 + 227 + six patches were needed against the zig stdlib or its Uring backend for zlay to run on Evented. `netLookup` is still unimplemented. the cross-Io hazard is still undocumented. but the Io abstraction itself — write once, pick your scheduler — delivered on its promise. the same relay code runs on Threaded (production) and Evented (local development on macOS via GCD) without conditional compilation. 228 + 229 + zat is v0.3.0. the Io parameter is the only breaking change.
+1
scripts/publish-docs.zig
··· 21 21 .{ .path = "/devlog/005", .file = "devlog/005-three-way-verify.md" }, 22 22 .{ .path = "/devlog/006", .file = "devlog/006-building-a-relay.md" }, 23 23 .{ .path = "/devlog/007", .file = "devlog/007-up-and-to-the-right.md" }, 24 + .{ .path = "/devlog/008", .file = "devlog/008-the-io-migration.md" }, 24 25 }; 25 26 26 27 pub fn main() !void {