atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

Io.Evented backend via patched Uring.zig networking

Implements 6 io_uring network vtable entries (listen, accept, connect,
read, write, send) that are stubbed as Unavailable upstream (zig#31723).
The patch is applied at build time inside the Docker container only —
it modifies the zig stdlib bundled in the container image, not the host
zig installation or any downstream consumer of zat/websocket.zig.

DNS (netLookup) is deliberately NOT patched — subscribers resolve
hostnames through pool_io (Threaded) and pass the connected stream
to the websocket client for TLS + framing via Evented io.

Dep bumps:
- websocket.zig 80c6434: initWithStream() respects config.tls
(was hardcoded to null). Non-breaking — existing callers default
to tls=false and get identical behavior.
- zat v0.3.0-alpha.16: picks up the websocket bump so both zlay
and zat resolve to the same websocket version.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+551 -11
+4
Dockerfile
··· 8 8 ENV PATH=/opt/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d:$PATH 9 9 WORKDIR /build 10 10 11 + # patch Io.Uring networking (stubbed as Unavailable upstream, zig#31723) 12 + COPY patches/ patches/ 13 + RUN patch /opt/zig-x86_64-linux-0.16.0-dev.3059+42e33db9d/lib/std/Io/Uring.zig < patches/uring-networking.patch 14 + 11 15 # fetch dependencies first (cacheable — only changes when build.zig.zon changes) 12 16 COPY build.zig build.zig.zon ./ 13 17 RUN zig build --fetch-only 2>/dev/null || true
+4 -4
build.zig.zon
··· 5 5 .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.15.tar.gz", 9 - .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQCJEzz9LuzSbtLb68Wd0x_yjDgTP3EqV8dH", 8 + .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.16.tar.gz", 9 + .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQC8QDphqwrmXGqng1Xvo8ua_H5MqS-smp1T", 10 10 }, 11 11 .websocket = .{ 12 - .url = "https://github.com/zzstoatzz/websocket.zig/archive/ac3df25.tar.gz", 13 - .hash = "websocket-0.1.0-ZPISdUvvAwDQN3W3AYDxmzMj5ipuTnB3vpQinQPF9LqI", 12 + .url = "https://github.com/zzstoatzz/websocket.zig/archive/80c6434.tar.gz", 13 + .hash = "websocket-0.1.0-ZPISdTHwAwA1d45BsYRE81Z8wNwZ3RhukgNADOma4eym", 14 14 }, 15 15 .pg = .{ 16 16 .url = "git+https://github.com/zzstoatzz/pg.zig?ref=dev#5ce2355b1d851075523709c7d3068dcdb0224322",
+530
patches/uring-networking.patch
··· 1 + --- a/lib/std/Io/Uring.zig 2 + +++ b/lib/std/Io/Uring.zig 3 + @@ -771,16 +771,16 @@ 4 + .random = random, 5 + .randomSecure = randomSecure, 6 + 7 + - .netListenIp = netListenIpUnavailable, 8 + - .netAccept = netAcceptUnavailable, 9 + + .netListenIp = netListenIp, 10 + + .netAccept = netAccept, 11 + .netBindIp = netBindIp, 12 + - .netConnectIp = netConnectIpUnavailable, 13 + + .netConnectIp = netConnectIp, 14 + .netListenUnix = netListenUnixUnavailable, 15 + .netConnectUnix = netConnectUnixUnavailable, 16 + .netSocketCreatePair = netSocketCreatePairUnavailable, 17 + - .netSend = netSendUnavailable, 18 + - .netRead = netReadUnavailable, 19 + - .netWrite = netWriteUnavailable, 20 + + .netSend = netSend, 21 + + .netRead = netRead, 22 + + .netWrite = netWrite, 23 + .netWriteFile = netWriteFileUnavailable, 24 + .netClose = netClose, 25 + .netShutdown = netShutdown, 26 + @@ -4953,28 +4953,83 @@ 27 + }; 28 + } 29 + 30 + -fn netListenIpUnavailable( 31 + +fn netListenIp( 32 + userdata: ?*anyopaque, 33 + address: *const net.IpAddress, 34 + options: net.IpAddress.ListenOptions, 35 + ) net.IpAddress.ListenError!net.Socket { 36 + const ev: *Evented = @ptrCast(@alignCast(userdata)); 37 + - _ = ev; 38 + - _ = address; 39 + - _ = options; 40 + - return error.NetworkDown; 41 + + const family = posixAddressFamily(address); 42 + + var maybe_sync: CancelRegion.Sync.Maybe = .{ .cancel_region = .init() }; 43 + + defer maybe_sync.deinit(ev); 44 + + const socket_fd = try ev.socket(&maybe_sync.cancel_region, family, .{ 45 + + .mode = options.mode, 46 + + .protocol = options.protocol, 47 + + }); 48 + + errdefer ev.closeAsync(socket_fd); 49 + + if (options.reuse_address) { 50 + + try ev.setsockopt(&maybe_sync.cancel_region, socket_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, 1); 51 + + try ev.setsockopt(&maybe_sync.cancel_region, socket_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, 1); 52 + + } 53 + + var storage: PosixAddress = undefined; 54 + + var addr_len = addressToPosix(address, &storage); 55 + + try ev.bind(&maybe_sync.cancel_region, socket_fd, &storage.any, addr_len); 56 + + try ev.listen(&maybe_sync.cancel_region, socket_fd, options.kernel_backlog); 57 + + try ev.getsockname(try maybe_sync.enterSync(ev), socket_fd, &storage.any, &addr_len); 58 + + return .{ .handle = socket_fd, .address = addressFromPosix(&storage) }; 59 + } 60 + 61 + -fn netAcceptUnavailable( 62 + +fn netAccept( 63 + userdata: ?*anyopaque, 64 + listen_handle: net.Socket.Handle, 65 + options: net.Server.AcceptOptions, 66 + ) net.Server.AcceptError!net.Socket { 67 + - const ev: *Evented = @ptrCast(@alignCast(userdata)); 68 + - _ = ev; 69 + - _ = listen_handle; 70 + _ = options; 71 + - return error.NetworkDown; 72 + + const ev: *Evented = @ptrCast(@alignCast(userdata)); 73 + + var cancel_region: CancelRegion = .init(); 74 + + defer cancel_region.deinit(); 75 + + var storage: PosixAddress = undefined; 76 + + var addr_len: linux.socklen_t = @sizeOf(PosixAddress); 77 + + const accepted_fd = while (true) { 78 + + const thread = try cancel_region.awaitIoUring(); 79 + + thread.enqueue().* = .{ 80 + + .opcode = .ACCEPT, 81 + + .flags = 0, 82 + + .ioprio = 0, 83 + + .fd = listen_handle, 84 + + .off = @intFromPtr(&addr_len), 85 + + .addr = @intFromPtr(&storage.any), 86 + + .len = 0, 87 + + .rw_flags = linux.SOCK.CLOEXEC, 88 + + .user_data = @intFromPtr(cancel_region.fiber), 89 + + .buf_index = 0, 90 + + .personality = 0, 91 + + .splice_fd_in = 0, 92 + + .addr3 = 0, 93 + + .resv = 0, 94 + + }; 95 + + ev.yield(null, .nothing); 96 + + const completion = cancel_region.completion(); 97 + + switch (completion.errno()) { 98 + + .SUCCESS => break completion.result, 99 + + .INTR, .CANCELED => {}, 100 + + .AGAIN => unreachable, 101 + + .BADF => |err| return errnoBug(err), 102 + + .CONNABORTED => return error.ConnectionAborted, 103 + + .INVAL => return error.SocketNotListening, 104 + + .NOTSOCK => |err| return errnoBug(err), 105 + + .MFILE => return error.ProcessFdQuotaExceeded, 106 + + .NFILE => return error.SystemFdQuotaExceeded, 107 + + .NOBUFS => return error.SystemResources, 108 + + .NOMEM => return error.SystemResources, 109 + + .OPNOTSUPP => |err| return errnoBug(err), 110 + + .PROTO => return error.ProtocolFailure, 111 + + .PERM => return error.BlockedByFirewall, 112 + + .NETDOWN => return error.NetworkDown, 113 + + else => |err| return unexpectedErrno(err), 114 + + } 115 + + }; 116 + + return .{ .handle = accepted_fd, .address = addressFromPosix(&storage) }; 117 + } 118 + 119 + fn netBindIp( 120 + @@ -4996,16 +5051,26 @@ 121 + return .{ .handle = socket_fd, .address = addressFromPosix(&storage) }; 122 + } 123 + 124 + -fn netConnectIpUnavailable( 125 + +fn netConnectIp( 126 + userdata: ?*anyopaque, 127 + address: *const net.IpAddress, 128 + options: net.IpAddress.ConnectOptions, 129 + ) net.IpAddress.ConnectError!net.Socket { 130 + + if (options.timeout != .none) @panic("TODO: connect timeout for Io.Uring"); 131 + const ev: *Evented = @ptrCast(@alignCast(userdata)); 132 + - _ = ev; 133 + - _ = address; 134 + - _ = options; 135 + - return error.NetworkDown; 136 + + const family = posixAddressFamily(address); 137 + + var maybe_sync: CancelRegion.Sync.Maybe = .{ .cancel_region = .init() }; 138 + + defer maybe_sync.deinit(ev); 139 + + const socket_fd = try ev.socket(&maybe_sync.cancel_region, family, .{ 140 + + .mode = options.mode, 141 + + .protocol = options.protocol, 142 + + }); 143 + + errdefer ev.closeAsync(socket_fd); 144 + + var storage: PosixAddress = undefined; 145 + + var addr_len = addressToPosix(address, &storage); 146 + + try ev.connect(&maybe_sync.cancel_region, socket_fd, &storage.any, addr_len); 147 + + try ev.getsockname(try maybe_sync.enterSync(ev), socket_fd, &storage.any, &addr_len); 148 + + return .{ .handle = socket_fd, .address = addressFromPosix(&storage) }; 149 + } 150 + 151 + fn netListenUnixUnavailable( 152 + @@ -5039,20 +5104,96 @@ 153 + return error.OperationUnsupported; 154 + } 155 + 156 + -fn netSendUnavailable( 157 + +fn netSend( 158 + userdata: ?*anyopaque, 159 + handle: net.Socket.Handle, 160 + messages: []net.OutgoingMessage, 161 + flags: net.SendFlags, 162 + ) struct { ?net.Socket.SendError, usize } { 163 + const ev: *Evented = @ptrCast(@alignCast(userdata)); 164 + - _ = ev; 165 + - _ = handle; 166 + - _ = messages; 167 + - _ = flags; 168 + - return .{ error.NetworkDown, 0 }; 169 + + const posix_flags: u32 = 170 + + @as(u32, if (flags.confirm) linux.MSG.CONFIRM else 0) | 171 + + @as(u32, if (flags.dont_route) linux.MSG.DONTROUTE else 0) | 172 + + @as(u32, if (flags.eor) linux.MSG.EOR else 0) | 173 + + @as(u32, if (flags.oob) linux.MSG.OOB else 0) | 174 + + @as(u32, if (flags.fastopen) linux.MSG.FASTOPEN else 0) | 175 + + linux.MSG.NOSIGNAL; 176 + + 177 + + for (messages, 0..) |*message, i| { 178 + + ev.netSendOne(handle, message, posix_flags) catch |err| return .{ err, i }; 179 + + } 180 + + return .{ null, messages.len }; 181 + } 182 + 183 + +fn netSendOne( 184 + + ev: *Evented, 185 + + handle: net.Socket.Handle, 186 + + message: *net.OutgoingMessage, 187 + + flags: u32, 188 + +) net.Socket.SendError!void { 189 + + var addr: PosixAddress = undefined; 190 + + var iov: iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len }; 191 + + var msg: linux.msghdr_const = .{ 192 + + .name = &addr.any, 193 + + .namelen = addressToPosix(message.address, &addr), 194 + + .iov = (&iov)[0..1], 195 + + .iovlen = 1, 196 + + .control = if (message.control.len == 0) null else @constCast(message.control.ptr), 197 + + .controllen = @intCast(message.control.len), 198 + + .flags = 0, 199 + + }; 200 + + var cancel_region: CancelRegion = .init(); 201 + + defer cancel_region.deinit(); 202 + + while (true) { 203 + + const thread = try cancel_region.awaitIoUring(); 204 + + thread.enqueue().* = .{ 205 + + .opcode = .SENDMSG, 206 + + .flags = 0, 207 + + .ioprio = 0, 208 + + .fd = handle, 209 + + .off = 0, 210 + + .addr = @intFromPtr(&msg), 211 + + .len = 0, 212 + + .rw_flags = flags, 213 + + .user_data = @intFromPtr(cancel_region.fiber), 214 + + .buf_index = 0, 215 + + .personality = 0, 216 + + .splice_fd_in = 0, 217 + + .addr3 = 0, 218 + + .resv = 0, 219 + + }; 220 + + ev.yield(null, .nothing); 221 + + const completion = cancel_region.completion(); 222 + + switch (completion.errno()) { 223 + + .SUCCESS => { 224 + + message.data_len = @intCast(completion.result); 225 + + return; 226 + + }, 227 + + .INTR, .CANCELED => {}, 228 + + .ACCES => return error.AccessDenied, 229 + + .ALREADY => return error.FastOpenAlreadyInProgress, 230 + + .BADF => |err| return errnoBug(err), 231 + + .CONNRESET => return error.ConnectionResetByPeer, 232 + + .DESTADDRREQ => |err| return errnoBug(err), 233 + + .FAULT => |err| return errnoBug(err), 234 + + .INVAL => |err| return errnoBug(err), 235 + + .ISCONN => |err| return errnoBug(err), 236 + + .MSGSIZE => return error.MessageOversize, 237 + + .NOBUFS => return error.SystemResources, 238 + + .NOMEM => return error.SystemResources, 239 + + .NOTSOCK => |err| return errnoBug(err), 240 + + .OPNOTSUPP => |err| return errnoBug(err), 241 + + .PIPE => return error.SocketUnconnected, 242 + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, 243 + + .HOSTUNREACH => return error.HostUnreachable, 244 + + .NETUNREACH => return error.NetworkUnreachable, 245 + + .NOTCONN => return error.SocketUnconnected, 246 + + .NETDOWN => return error.NetworkDown, 247 + + else => |err| return unexpectedErrno(err), 248 + + } 249 + + } 250 + +} 251 + + 252 + fn netReceive( 253 + ev: *Evented, 254 + cancel_region: *CancelRegion, 255 + @@ -5142,19 +5283,67 @@ 256 + } 257 + } 258 + 259 + -fn netReadUnavailable( 260 + +fn netRead( 261 + userdata: ?*anyopaque, 262 + fd: net.Socket.Handle, 263 + data: [][]u8, 264 + ) net.Stream.Reader.Error!usize { 265 + const ev: *Evented = @ptrCast(@alignCast(userdata)); 266 + - _ = ev; 267 + - _ = fd; 268 + - _ = data; 269 + - return error.NetworkDown; 270 + + var iovecs_buffer: [max_iovecs_len]iovec = undefined; 271 + + var i: usize = 0; 272 + + for (data) |buf| { 273 + + if (iovecs_buffer.len - i == 0) break; 274 + + if (buf.len > 0) { 275 + + iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len }; 276 + + i += 1; 277 + + } 278 + + } 279 + + if (i == 0) return 0; 280 + + const dest = iovecs_buffer[0..i]; 281 + + assert(dest[0].len > 0); 282 + + var cancel_region: CancelRegion = .init(); 283 + + defer cancel_region.deinit(); 284 + + const gather = dest.len > 1 or dest[0].len > 0xfffff000; 285 + + while (true) { 286 + + const thread = try cancel_region.awaitIoUring(); 287 + + thread.enqueue().* = .{ 288 + + .opcode = if (gather) .READV else .READ, 289 + + .flags = 0, 290 + + .ioprio = 0, 291 + + .fd = fd, 292 + + .off = std.math.maxInt(u64), 293 + + .addr = if (gather) @intFromPtr(dest.ptr) else @intFromPtr(dest[0].base), 294 + + .len = @intCast(if (gather) dest.len else dest[0].len), 295 + + .rw_flags = 0, 296 + + .user_data = @intFromPtr(cancel_region.fiber), 297 + + .buf_index = 0, 298 + + .personality = 0, 299 + + .splice_fd_in = 0, 300 + + .addr3 = 0, 301 + + .resv = 0, 302 + + }; 303 + + ev.yield(null, .nothing); 304 + + const completion = cancel_region.completion(); 305 + + switch (completion.errno()) { 306 + + .SUCCESS => return @as(u32, @bitCast(completion.result)), 307 + + .INTR, .CANCELED => {}, 308 + + .INVAL => |err| return errnoBug(err), 309 + + .FAULT => |err| return errnoBug(err), 310 + + .AGAIN => unreachable, 311 + + .BADF => |err| return errnoBug(err), 312 + + .NOBUFS => return error.SystemResources, 313 + + .NOMEM => return error.SystemResources, 314 + + .NOTCONN => return error.SocketUnconnected, 315 + + .CONNRESET => return error.ConnectionResetByPeer, 316 + + .TIMEDOUT => return error.Timeout, 317 + + .PIPE => return error.SocketUnconnected, 318 + + .NETDOWN => return error.NetworkDown, 319 + + else => |err| return unexpectedErrno(err), 320 + + } 321 + + } 322 + } 323 + 324 + -fn netWriteUnavailable( 325 + +fn netWrite( 326 + userdata: ?*anyopaque, 327 + handle: net.Socket.Handle, 328 + header: []const u8, 329 + @@ -5162,12 +5351,94 @@ 330 + splat: usize, 331 + ) net.Stream.Writer.Error!usize { 332 + const ev: *Evented = @ptrCast(@alignCast(userdata)); 333 + - _ = ev; 334 + - _ = handle; 335 + - _ = header; 336 + - _ = data; 337 + - _ = splat; 338 + - return error.NetworkDown; 339 + + var iovecs: [max_iovecs_len]iovec_const = undefined; 340 + + var iovlen: iovlen_t = 0; 341 + + addBuf(&iovecs, &iovlen, header); 342 + + for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &iovlen, bytes); 343 + + const pattern = data[data.len - 1]; 344 + + var backup_buffer: [splat_buffer_size]u8 = undefined; 345 + + if (iovecs.len - iovlen != 0) switch (splat) { 346 + + 0 => {}, 347 + + 1 => addBuf(&iovecs, &iovlen, pattern), 348 + + else => switch (pattern.len) { 349 + + 0 => {}, 350 + + 1 => { 351 + + const splat_buffer = &backup_buffer; 352 + + const memset_len = @min(splat_buffer.len, splat); 353 + + const buf = splat_buffer[0..memset_len]; 354 + + @memset(buf, pattern[0]); 355 + + addBuf(&iovecs, &iovlen, buf); 356 + + var remaining_splat = splat - buf.len; 357 + + while (remaining_splat > splat_buffer.len and iovecs.len - iovlen != 0) { 358 + + assert(buf.len == splat_buffer.len); 359 + + addBuf(&iovecs, &iovlen, splat_buffer); 360 + + remaining_splat -= splat_buffer.len; 361 + + } 362 + + addBuf(&iovecs, &iovlen, splat_buffer[0..@min(remaining_splat, splat_buffer.len)]); 363 + + }, 364 + + else => for (0..@min(splat, iovecs.len - iovlen)) |_| { 365 + + addBuf(&iovecs, &iovlen, pattern); 366 + + }, 367 + + }, 368 + + }; 369 + + const iov = iovecs[0..iovlen]; 370 + + if (iov.len == 0) return 0; 371 + + var msg: linux.msghdr_const = .{ 372 + + .name = null, 373 + + .namelen = 0, 374 + + .iov = iov.ptr, 375 + + .iovlen = iovlen, 376 + + .control = null, 377 + + .controllen = 0, 378 + + .flags = 0, 379 + + }; 380 + + var cancel_region: CancelRegion = .init(); 381 + + defer cancel_region.deinit(); 382 + + while (true) { 383 + + const thread = try cancel_region.awaitIoUring(); 384 + + thread.enqueue().* = .{ 385 + + .opcode = .SENDMSG, 386 + + .flags = 0, 387 + + .ioprio = 0, 388 + + .fd = handle, 389 + + .off = 0, 390 + + .addr = @intFromPtr(&msg), 391 + + .len = 0, 392 + + .rw_flags = linux.MSG.NOSIGNAL, 393 + + .user_data = @intFromPtr(cancel_region.fiber), 394 + + .buf_index = 0, 395 + + .personality = 0, 396 + + .splice_fd_in = 0, 397 + + .addr3 = 0, 398 + + .resv = 0, 399 + + }; 400 + + ev.yield(null, .nothing); 401 + + const completion = cancel_region.completion(); 402 + + switch (completion.errno()) { 403 + + .SUCCESS => return @as(u32, @bitCast(completion.result)), 404 + + .INTR, .CANCELED => {}, 405 + + .ACCES => |err| return errnoBug(err), 406 + + .AGAIN => unreachable, 407 + + .BADF => |err| return errnoBug(err), 408 + + .DESTADDRREQ => |err| return errnoBug(err), 409 + + .FAULT => |err| return errnoBug(err), 410 + + .INVAL => |err| return errnoBug(err), 411 + + .ISCONN => |err| return errnoBug(err), 412 + + .MSGSIZE => |err| return errnoBug(err), 413 + + .OPNOTSUPP => |err| return errnoBug(err), 414 + + .ALREADY => return error.FastOpenAlreadyInProgress, 415 + + .CONNRESET => return error.ConnectionResetByPeer, 416 + + .NOBUFS => return error.SystemResources, 417 + + .NOMEM => return error.SystemResources, 418 + + .PIPE => return error.SocketUnconnected, 419 + + .NOTCONN => return error.SocketUnconnected, 420 + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, 421 + + .HOSTUNREACH => return error.HostUnreachable, 422 + + .NETUNREACH => return error.NetworkUnreachable, 423 + + .NETDOWN => return error.NetworkDown, 424 + + else => |err| return unexpectedErrno(err), 425 + + } 426 + + } 427 + } 428 + 429 + fn netWriteFileUnavailable( 430 + @@ -5303,11 +5574,100 @@ 431 + .ADDRNOTAVAIL => return error.AddressUnavailable, 432 + .FAULT => |err| return errnoBug(err), // invalid `addr` pointer 433 + .NOMEM => return error.SystemResources, 434 + + else => |err| return unexpectedErrno(err), 435 + + } 436 + + } 437 + +} 438 + + 439 + +fn listen( 440 + + ev: *Evented, 441 + + cancel_region: *CancelRegion, 442 + + socket_fd: fd_t, 443 + + backlog: u31, 444 + +) !void { 445 + + while (true) { 446 + + const thread = try cancel_region.awaitIoUring(); 447 + + thread.enqueue().* = .{ 448 + + .opcode = .LISTEN, 449 + + .flags = 0, 450 + + .ioprio = 0, 451 + + .fd = socket_fd, 452 + + .off = 0, 453 + + .addr = 0, 454 + + .len = backlog, 455 + + .rw_flags = 0, 456 + + .user_data = @intFromPtr(cancel_region.fiber), 457 + + .buf_index = 0, 458 + + .personality = 0, 459 + + .splice_fd_in = 0, 460 + + .addr3 = 0, 461 + + .resv = 0, 462 + + }; 463 + + ev.yield(null, .nothing); 464 + + switch (cancel_region.errno()) { 465 + + .SUCCESS => return, 466 + + .INTR, .CANCELED => {}, 467 + + .ADDRINUSE => return error.AddressInUse, 468 + + .BADF => |err| return errnoBug(err), 469 + + .NOTSOCK => |err| return errnoBug(err), 470 + + .OPNOTSUPP => |err| return errnoBug(err), 471 + else => |err| return unexpectedErrno(err), 472 + } 473 + } 474 + } 475 + 476 + +fn connect( 477 + + ev: *Evented, 478 + + cancel_region: *CancelRegion, 479 + + socket_fd: fd_t, 480 + + addr: *const linux.sockaddr, 481 + + addr_len: linux.socklen_t, 482 + +) !void { 483 + + while (true) { 484 + + const thread = try cancel_region.awaitIoUring(); 485 + + thread.enqueue().* = .{ 486 + + .opcode = .CONNECT, 487 + + .flags = 0, 488 + + .ioprio = 0, 489 + + .fd = socket_fd, 490 + + .off = addr_len, 491 + + .addr = @intFromPtr(addr), 492 + + .len = 0, 493 + + .rw_flags = 0, 494 + + .user_data = @intFromPtr(cancel_region.fiber), 495 + + .buf_index = 0, 496 + + .personality = 0, 497 + + .splice_fd_in = 0, 498 + + .addr3 = 0, 499 + + .resv = 0, 500 + + }; 501 + + ev.yield(null, .nothing); 502 + + switch (cancel_region.errno()) { 503 + + .SUCCESS => return, 504 + + .INTR, .CANCELED => {}, 505 + + .ADDRNOTAVAIL => return error.AddressUnavailable, 506 + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, 507 + + .AGAIN, .INPROGRESS => return, // non-blocking / TCP fast open 508 + + .ALREADY => return error.ConnectionPending, 509 + + .BADF => |err| return errnoBug(err), 510 + + .CONNREFUSED => return error.ConnectionRefused, 511 + + .CONNRESET => return error.ConnectionResetByPeer, 512 + + .FAULT => |err| return errnoBug(err), 513 + + .ISCONN => |err| return errnoBug(err), 514 + + .HOSTUNREACH => return error.HostUnreachable, 515 + + .NETUNREACH => return error.NetworkUnreachable, 516 + + .NOTSOCK => |err| return errnoBug(err), 517 + + .PROTOTYPE => |err| return errnoBug(err), 518 + + .TIMEDOUT => return error.Timeout, 519 + + .CONNABORTED => |err| return errnoBug(err), 520 + + .ACCES => return error.AccessDenied, 521 + + .PERM => |err| return errnoBug(err), 522 + + .NETDOWN => return error.NetworkDown, 523 + + else => |err| return unexpectedErrno(err), 524 + + } 525 + + } 526 + +} 527 + + 528 + fn chdir(sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void { 529 + while (true) { 530 + try sync.cancel_region.await(.nothing);
+6 -6
src/main.zig
··· 47 47 pub const default_stack_size = 8 * 1024 * 1024; 48 48 49 49 // -- Io backend selection -- 50 - // Evented (fibers): network orchestration, subscriber connections, WS server, broadcasting. 51 - // Worker threads use a dedicated pool_io (Threaded) for their sync — they never touch Evented io. 52 - // The broadcast queue bridges workers → broadcaster fiber (atomics only, no Io dependency). 50 + // Io.Evented (fibers on io_uring): ~35 threads instead of ~2,800 (one per PDS). 51 + // Networking via patched Uring.zig (patches/uring-networking.patch) — implements 52 + // listen, accept, connect, read, write, send via io_uring opcodes. DNS (netLookup) 53 + // is NOT patched; subscribers resolve hostnames through pool_io (Threaded) instead. 53 54 // 54 - // NOTE: Io.Uring has a fiber context-switch GPF under ReleaseSafe (optimizer + safety checks). 55 - // Debug and ReleaseFast both work fine. Build with ReleaseFast until the stdlib bug is fixed. 56 - // See repro_evented.zig for the minimal reproduction case. 55 + // Known issue: Io.Uring GPFs under ReleaseSafe (optimizer + safety interaction). 56 + // Build with ReleaseFast. See repro_evented.zig for minimal reproduction. 57 57 const Backend = Io.Evented; 58 58 59 59 var backend: Backend = undefined;
+7 -1
src/subscriber.zig
··· 323 323 } 324 324 const path = w.buffered(); 325 325 326 - var client = try websocket.Client.init(self.io, self.allocator, .{ 326 + // DNS + TCP connect through pool_io (Threaded — has working netLookup). 327 + // The resulting fd is used by the Evented io for TLS + WebSocket I/O. 328 + const dns_io = self.pool_io orelse self.io; 329 + const host_name = try Io.net.HostName.init(self.options.hostname); 330 + const net_stream = try host_name.connect(dns_io, 443, .{ .mode = .stream }); 331 + 332 + var client = try websocket.Client.initWithStream(self.io, self.allocator, net_stream, .{ 327 333 .host = self.options.hostname, 328 334 .port = 443, 329 335 .tls = true,