atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix netListenIp: sync syscalls for bind/listen (kernel <6.11)

IORING_OP_BIND and IORING_OP_LISTEN require kernel 6.11+. Our server
is on 6.8 — the kernel returns EINVAL for unknown opcodes, surfacing
as error.Unexpected. Use direct linux.bind()/linux.listen() syscalls
instead (instant, non-blocking — same approach as getsockname).

IORING_OP_ACCEPT (5.5), IORING_OP_CONNECT (5.5), IORING_OP_SOCKET
(5.19), and IORING_OP_SENDMSG/RECVMSG/READV (5.3-5.6) all work fine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+36 -61
+36 -61
patches/uring-networking.patch
··· 23 23 .netWriteFile = netWriteFileUnavailable, 24 24 .netClose = netClose, 25 25 .netShutdown = netShutdown, 26 - @@ -4953,28 +4953,83 @@ 26 + @@ -4953,28 +4953,102 @@ 27 27 }; 28 28 } 29 29 ··· 52 52 + } 53 53 + var storage: PosixAddress = undefined; 54 54 + var addr_len = addressToPosix(address, &storage); 55 - + try ev.bind(&maybe_sync.cancel_region, socket_fd, &storage.any, addr_len); 56 - + try ev.listen(&maybe_sync.cancel_region, socket_fd, options.kernel_backlog); 55 + + // bind + listen: sync syscalls (IORING_OP_BIND/LISTEN require kernel 6.11+) 56 + + switch (linux.errno(linux.bind(socket_fd, &storage.any, addr_len))) { 57 + + .SUCCESS => {}, 58 + + .ADDRINUSE => return error.AddressInUse, 59 + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, 60 + + .ADDRNOTAVAIL => return error.AddressUnavailable, 61 + + .BADF => |err| return errnoBug(err), 62 + + .INVAL => |err| return errnoBug(err), 63 + + .NOTSOCK => |err| return errnoBug(err), 64 + + .FAULT => |err| return errnoBug(err), 65 + + .NOMEM => return error.SystemResources, 66 + + else => |err| return unexpectedErrno(err), 67 + + } 68 + + switch (linux.errno(linux.listen(socket_fd, options.kernel_backlog))) { 69 + + .SUCCESS => {}, 70 + + .ADDRINUSE => return error.AddressInUse, 71 + + .BADF => |err| return errnoBug(err), 72 + + .NOTSOCK => |err| return errnoBug(err), 73 + + .OPNOTSUPP => |err| return errnoBug(err), 74 + + else => |err| return unexpectedErrno(err), 75 + + } 57 76 + try ev.getsockname(try maybe_sync.enterSync(ev), socket_fd, &storage.any, &addr_len); 58 77 + return .{ .handle = socket_fd, .address = addressFromPosix(&storage) }; 59 78 } ··· 117 136 } 118 137 119 138 fn netBindIp( 120 - @@ -4996,16 +5051,26 @@ 139 + @@ -4996,16 +5070,26 @@ 121 140 return .{ .handle = socket_fd, .address = addressFromPosix(&storage) }; 122 141 } 123 142 ··· 149 168 } 150 169 151 170 fn netListenUnixUnavailable( 152 - @@ -5039,20 +5104,96 @@ 171 + @@ -5039,18 +5123,94 @@ 153 172 return error.OperationUnsupported; 154 173 } 155 174 ··· 178 197 + ev.netSendOne(handle, message, posix_flags) catch |err| return .{ err, i }; 179 198 + } 180 199 + return .{ null, messages.len }; 181 - } 182 - 200 + +} 201 + + 183 202 +fn netSendOne( 184 203 + ev: *Evented, 185 204 + handle: net.Socket.Handle, ··· 247 266 + else => |err| return unexpectedErrno(err), 248 267 + } 249 268 + } 250 - +} 251 - + 269 + } 270 + 252 271 fn netReceive( 253 - ev: *Evented, 254 - cancel_region: *CancelRegion, 255 - @@ -5142,19 +5283,67 @@ 272 + @@ -5142,19 +5302,67 @@ 256 273 } 257 274 } 258 275 ··· 326 343 userdata: ?*anyopaque, 327 344 handle: net.Socket.Handle, 328 345 header: []const u8, 329 - @@ -5162,12 +5351,94 @@ 346 + @@ -5162,12 +5370,94 @@ 330 347 splat: usize, 331 348 ) net.Stream.Writer.Error!usize { 332 349 const ev: *Evented = @ptrCast(@alignCast(userdata)); ··· 427 444 } 428 445 429 446 fn netWriteFileUnavailable( 430 - @@ -5303,11 +5574,100 @@ 431 - .ADDRNOTAVAIL => return error.AddressUnavailable, 432 - .FAULT => |err| return errnoBug(err), // invalid `addr` pointer 433 - .NOMEM => return error.SystemResources, 434 - + else => |err| return unexpectedErrno(err), 435 - + } 436 - + } 437 - +} 438 - + 439 - +fn listen( 440 - + ev: *Evented, 441 - + cancel_region: *CancelRegion, 442 - + socket_fd: fd_t, 443 - + backlog: u31, 444 - +) !void { 445 - + while (true) { 446 - + const thread = try cancel_region.awaitIoUring(); 447 - + thread.enqueue().* = .{ 448 - + .opcode = .LISTEN, 449 - + .flags = 0, 450 - + .ioprio = 0, 451 - + .fd = socket_fd, 452 - + .off = 0, 453 - + .addr = 0, 454 - + .len = backlog, 455 - + .rw_flags = 0, 456 - + .user_data = @intFromPtr(cancel_region.fiber), 457 - + .buf_index = 0, 458 - + .personality = 0, 459 - + .splice_fd_in = 0, 460 - + .addr3 = 0, 461 - + .resv = 0, 462 - + }; 463 - + ev.yield(null, .nothing); 464 - + switch (cancel_region.errno()) { 465 - + .SUCCESS => return, 466 - + .INTR, .CANCELED => {}, 467 - + .ADDRINUSE => return error.AddressInUse, 468 - + .BADF => |err| return errnoBug(err), 469 - + .NOTSOCK => |err| return errnoBug(err), 470 - + .OPNOTSUPP => |err| return errnoBug(err), 447 + @@ -5306,6 +5596,58 @@ 471 448 else => |err| return unexpectedErrno(err), 472 449 } 473 450 } 474 - } 475 - 451 + +} 452 + + 476 453 +fn connect( 477 454 + ev: *Evented, 478 455 + cancel_region: *CancelRegion, ··· 504 481 + .INTR, .CANCELED => {}, 505 482 + .ADDRNOTAVAIL => return error.AddressUnavailable, 506 483 + .AFNOSUPPORT => return error.AddressFamilyUnsupported, 507 - + .AGAIN, .INPROGRESS => return, // non-blocking / TCP fast open 484 + + .AGAIN, .INPROGRESS => return, 508 485 + .ALREADY => return error.ConnectionPending, 509 486 + .BADF => |err| return errnoBug(err), 510 487 + .CONNREFUSED => return error.ConnectionRefused, ··· 523 500 + else => |err| return unexpectedErrno(err), 524 501 + } 525 502 + } 526 - +} 527 - + 503 + } 504 + 528 505 fn chdir(sync: *CancelRegion.Sync, path: [*:0]const u8) ChdirError!void { 529 - while (true) { 530 - try sync.cancel_region.await(.nothing);