This repository has no description.
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

s/Runtime/Ring

+70 -70
+9 -9
README.md
··· 16 16 capability of io_uring to pass a completion from one ring to another. This 17 17 allows a multithreaded application to implement message passing using io_uring 18 18 (or kqueue, if that's your flavor). Multithreaded applications should plan to 19 - use one `Runtime` per thread. Submission onto the runtime is not thread safe, 19 + use one `Ring` per thread. Submission onto the runtime is not thread safe, 20 20 any message passing must occur using `msg_ring` rather than directly submitting 21 21 a task to another 22 22 ··· 61 61 .req = .{ .userfd = fd }, 62 62 }; 63 63 64 - // Send target_task from the main_rt thread to the thread_rt Runtime. The 65 - // thread_rt Runtime will then // process the task as a completion, ie 64 + // Send target_task from the main_rt thread to the thread_rt Ring. The 65 + // thread_rt Ring will then // process the task as a completion, ie 66 66 // Worker.onCompletion will be called with // this task. That thread can then 67 67 // schedule a recv, a write, etc on the file // descriptor it just received. 68 68 _ = try main_rt.msgRing(thread_rt, target_task, .{}); 69 69 ``` 70 70 71 - ### Multiple Runtimes on the same thread 71 + ### Multiple Rings on the same thread 72 72 73 - You can have multiple Runtimes in a single thread. One could be a priority 74 - Runtime, or handle specific types of tasks, etc. Poll any runtime from any other 73 + You can have multiple Rings in a single thread. One could be a priority 74 + Ring, or handle specific types of tasks, etc. Poll any runtime from any other 75 75 runtime. 
76 76 77 77 ```zig ··· 110 110 try self.buf.appendSlice(gpa, bytes); 111 111 } 112 112 113 - pub fn flush(self: *MultiWriter, rt: *io.Runtime) !void { 113 + pub fn flush(self: *MultiWriter, rt: *io.Ring) !void { 114 114 if (self.fd1_written < self.buf.items.len) { 115 115 _ = try rt.write(self.fd1, self.buf.items[self.fd1_written..], .{ 116 116 .ptr = self, ··· 128 128 } 129 129 } 130 130 131 - pub fn onCompletion(rt: *io.Runtime, task: io.Task) anyerror!void { 131 + pub fn onCompletion(rt: *io.Ring, task: io.Task) anyerror!void { 132 132 const self = task.userdataCast(MultiWriter); 133 133 const result = task.result.?; 134 134 ··· 151 151 152 152 pub fn main() !void { 153 153 var gpa: std.heap.DebugAllocator(.{}) = .init; 154 - var rt: io.Runtime = try .init(gpa.allocator(), 16); 154 + var rt: io.Ring = try .init(gpa.allocator(), 16); 155 155 defer rt.deinit(); 156 156 157 157 // Pretend I created some files
+5 -5
src/Kqueue.zig
··· 548 548 return self.kq; 549 549 } 550 550 551 - pub fn reapCompletions(self: *Kqueue, rt: *io.Runtime) anyerror!void { 551 + pub fn reapCompletions(self: *Kqueue, rt: *io.Ring) anyerror!void { 552 552 defer self.event_idx = 0; 553 553 554 554 if (self.event_idx == 0) { ··· 604 604 /// to call the callback and return the task(s) to the free list 605 605 fn handleSynchronousCompletion( 606 606 self: *Kqueue, 607 - rt: *io.Runtime, 607 + rt: *io.Ring, 608 608 task: *io.Task, 609 609 ) !void { 610 610 switch (task.req) { ··· 685 685 686 686 fn handleCompletion( 687 687 self: *Kqueue, 688 - rt: *io.Runtime, 688 + rt: *io.Ring, 689 689 task: *io.Task, 690 690 event: posix.Kevent, 691 691 ) !void { ··· 790 790 } 791 791 } 792 792 793 - fn releaseTask(self: *Kqueue, rt: *io.Runtime, task: *io.Task) void { 793 + fn releaseTask(self: *Kqueue, rt: *io.Ring, task: *io.Task) void { 794 794 rt.free_q.push(task); 795 795 if (task.deadline) |d| { 796 796 // remove the deadline ··· 805 805 } 806 806 } 807 807 808 - fn handleExpiredTimer(self: *Kqueue, rt: *io.Runtime, t: Timer) !void { 808 + fn handleExpiredTimer(self: *Kqueue, rt: *io.Ring, t: Timer) !void { 809 809 switch (t) { 810 810 .deadline => |deadline| { 811 811 defer self.releaseTask(rt, deadline.task);
+1 -1
src/Mock.zig
··· 81 81 } 82 82 } 83 83 84 - pub fn reapCompletions(self: *Mock, rt: *io.Runtime) anyerror!void { 84 + pub fn reapCompletions(self: *Mock, rt: *io.Ring) anyerror!void { 85 85 while (self.completions.pop()) |task| { 86 86 try task.callback(rt, task.*); 87 87 rt.free_q.push(task);
+3 -3
src/Task.zig
··· 4 4 const io = @import("main.zig"); 5 5 6 6 const Allocator = std.mem.Allocator; 7 - const Runtime = io.Runtime; 7 + const Ring = io.Ring; 8 8 9 9 userdata: ?*anyopaque = null, 10 10 msg: u16 = 0, ··· 37 37 38 38 pub fn setDeadline( 39 39 self: *Task, 40 - rt: *Runtime, 40 + rt: *Ring, 41 41 deadline: io.Timespec, 42 42 ) Allocator.Error!void { 43 43 std.debug.assert(!deadline.isZero()); ··· 55 55 56 56 pub fn cancel( 57 57 self: *Task, 58 - rt: *Runtime, 58 + rt: *Ring, 59 59 ctx: io.Context, 60 60 ) Allocator.Error!void { 61 61 const task = try rt.getTask();
+1 -1
src/Uring.zig
··· 243 243 return self.ring.get_sqe() catch unreachable; 244 244 } 245 245 246 - pub fn reapCompletions(self: *Uring, rt: *io.Runtime) anyerror!void { 246 + pub fn reapCompletions(self: *Uring, rt: *io.Ring) anyerror!void { 247 247 var cqes: [64]linux.io_uring_cqe = undefined; 248 248 const n = self.ring.copy_cqes(&cqes, 0) catch |err| { 249 249 switch (err) {
+36 -36
src/main.zig
··· 24 24 pub const has_io_uring = builtin.os.tag == .linux; 25 25 26 26 pub const Task = @import("Task.zig"); 27 - pub const Callback = *const fn (*Runtime, Task) anyerror!void; 28 - pub fn noopCallback(_: *Runtime, _: Task) anyerror!void {} 27 + pub const Callback = *const fn (*Ring, Task) anyerror!void; 28 + pub fn noopCallback(_: *Ring, _: Task) anyerror!void {} 29 29 30 30 pub const RunCondition = enum { 31 31 once, ··· 103 103 104 104 pub fn reapCompletions( 105 105 self: *Backend, 106 - rt: *Runtime, 106 + rt: *Ring, 107 107 ) !void { 108 108 return switch (self.*) { 109 109 inline else => |*backend| backend.reapCompletions(rt), ··· 121 121 pub const FreeQueue = Queue(Task, .free); 122 122 pub const SubmissionQueue = Queue(Task, .in_flight); 123 123 124 - pub const Runtime = struct { 124 + pub const Ring = struct { 125 125 backend: Backend, 126 126 gpa: Allocator, 127 127 ··· 129 129 submission_q: SubmissionQueue = .{}, 130 130 free_q: FreeQueue = .{}, 131 131 132 - pub fn init(gpa: Allocator, entries: u16) !Runtime { 132 + pub fn init(gpa: Allocator, entries: u16) !Ring { 133 133 return .{ 134 134 .backend = .{ .platform = try .init(gpa, entries) }, 135 135 .gpa = gpa, 136 136 }; 137 137 } 138 138 139 - pub fn initChild(self: *Runtime, entries: u16) !Runtime { 139 + pub fn initChild(self: *Ring, entries: u16) !Ring { 140 140 return .{ 141 141 .backend = try self.backend.initChild(entries), 142 142 .gpa = self.gpa, 143 143 }; 144 144 } 145 145 146 - pub fn initMock(gpa: Allocator, entries: u16) !Runtime { 146 + pub fn initMock(gpa: Allocator, entries: u16) !Ring { 147 147 return .{ 148 148 .backend = .{ .mock = try .init(entries) }, 149 149 .gpa = gpa, ··· 153 153 }; 154 154 } 155 155 156 - pub fn deinit(self: *Runtime) void { 156 + pub fn deinit(self: *Ring) void { 157 157 self.backend.deinit(self.gpa); 158 158 while (self.free_q.pop()) |task| self.gpa.destroy(task); 159 159 while (self.submission_q.pop()) |task| self.gpa.destroy(task); 160 160 while 
(self.completion_q.pop()) |task| self.gpa.destroy(task); 161 161 } 162 162 163 - pub fn run(self: *Runtime, condition: RunCondition) !void { 163 + pub fn run(self: *Ring, condition: RunCondition) !void { 164 164 while (true) { 165 165 try self.backend.submitAndWait(&self.submission_q); 166 166 try self.backend.reapCompletions(self); ··· 172 172 } 173 173 } 174 174 175 - pub fn getTask(self: *Runtime) Allocator.Error!*Task { 175 + pub fn getTask(self: *Ring) Allocator.Error!*Task { 176 176 return self.free_q.pop() orelse try self.gpa.create(Task); 177 177 } 178 178 179 179 pub fn noop( 180 - self: *Runtime, 180 + self: *Ring, 181 181 ctx: Context, 182 182 ) Allocator.Error!*Task { 183 183 const task = try self.getTask(); ··· 193 193 } 194 194 195 195 pub fn timer( 196 - self: *Runtime, 196 + self: *Ring, 197 197 duration: Timespec, 198 198 ctx: Context, 199 199 ) Allocator.Error!*Task { ··· 209 209 return task; 210 210 } 211 211 212 - pub fn cancelAll(self: *Runtime) Allocator.Error!void { 212 + pub fn cancelAll(self: *Ring) Allocator.Error!void { 213 213 const task = try self.getTask(); 214 214 task.* = .{ 215 215 .req = .{ .cancel = .all }, ··· 219 219 } 220 220 221 221 pub fn accept( 222 - self: *Runtime, 222 + self: *Ring, 223 223 fd: posix.fd_t, 224 224 ctx: Context, 225 225 ) Allocator.Error!*Task { ··· 236 236 } 237 237 238 238 pub fn msgRing( 239 - self: *Runtime, 240 - target: *Runtime, 239 + self: *Ring, 240 + target: *Ring, 241 241 target_task: *Task, // The task that the target ring will receive. 
The callbacks of 242 242 // this task are what will be called when the target receives the message 243 243 ··· 260 260 } 261 261 262 262 pub fn recv( 263 - self: *Runtime, 263 + self: *Ring, 264 264 fd: posix.fd_t, 265 265 buffer: []u8, 266 266 ctx: Context, ··· 281 281 } 282 282 283 283 pub fn write( 284 - self: *Runtime, 284 + self: *Ring, 285 285 fd: posix.fd_t, 286 286 buffer: []const u8, 287 287 ctx: Context, ··· 302 302 } 303 303 304 304 pub fn writev( 305 - self: *Runtime, 305 + self: *Ring, 306 306 fd: posix.fd_t, 307 307 vecs: []const posix.iovec_const, 308 308 ctx: Context, ··· 323 323 } 324 324 325 325 pub fn close( 326 - self: *Runtime, 326 + self: *Ring, 327 327 fd: posix.fd_t, 328 328 ctx: Context, 329 329 ) Allocator.Error!*Task { ··· 340 340 } 341 341 342 342 pub fn poll( 343 - self: *Runtime, 343 + self: *Ring, 344 344 fd: posix.fd_t, 345 345 mask: u32, 346 346 ctx: Context, ··· 358 358 } 359 359 360 360 pub fn socket( 361 - self: *Runtime, 361 + self: *Ring, 362 362 domain: u32, 363 363 socket_type: u32, 364 364 protocol: u32, ··· 377 377 } 378 378 379 379 pub fn connect( 380 - self: *Runtime, 380 + self: *Ring, 381 381 fd: posix.socket_t, 382 382 addr: *posix.sockaddr, 383 383 addr_len: posix.socklen_t, ··· 411 411 socket, 412 412 connect, 413 413 414 - /// userfd is meant to send file descriptors between Runtime instances (using msgRing) 414 + /// userfd is meant to send file descriptors between Ring instances (using msgRing) 415 415 userfd, 416 416 /// usermsg is meant to send a u16 between runtime instances (using msgRing) 417 417 usermsg, ··· 429 429 }, 430 430 accept: posix.fd_t, 431 431 msg_ring: struct { 432 - target: *Runtime, 432 + target: *Ring, 433 433 task: *Task, 434 434 }, 435 435 recv: struct { ··· 521 521 const Foo = struct { 522 522 bar: usize = 0, 523 523 524 - fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 524 + fn callback(_: *io.Ring, task: io.Task) anyerror!void { 525 525 const self = task.userdataCast(Foo); 526 
526 self.bar += 1; 527 527 } 528 528 }; 529 529 530 530 test "runtime: noop" { 531 - var rt: io.Runtime = try .init(std.testing.allocator, 16); 531 + var rt: io.Ring = try .init(std.testing.allocator, 16); 532 532 defer rt.deinit(); 533 533 534 534 var foo: Foo = .{}; ··· 546 546 } 547 547 548 548 test "runtime: timer" { 549 - var rt: io.Runtime = try .init(std.testing.allocator, 16); 549 + var rt: io.Ring = try .init(std.testing.allocator, 16); 550 550 defer rt.deinit(); 551 551 552 552 var foo: Foo = .{}; ··· 562 562 } 563 563 564 564 test "runtime: poll" { 565 - var rt: io.Runtime = try .init(std.testing.allocator, 16); 565 + var rt: io.Ring = try .init(std.testing.allocator, 16); 566 566 defer rt.deinit(); 567 567 568 568 var foo: Foo = .{}; ··· 580 580 581 581 test "runtime: deadline doesn't call user callback" { 582 582 const gpa = std.testing.allocator; 583 - var rt = try io.Runtime.init(gpa, 16); 583 + var rt = try io.Ring.init(gpa, 16); 584 584 defer rt.deinit(); 585 585 586 586 var foo: Foo = .{}; ··· 596 596 597 597 test "runtime: timeout" { 598 598 const gpa = std.testing.allocator; 599 - var rt = try io.Runtime.init(gpa, 16); 599 + var rt = try io.Ring.init(gpa, 16); 600 600 defer rt.deinit(); 601 601 602 602 var foo: Foo = .{}; ··· 613 613 614 614 test "runtime: cancel" { 615 615 const gpa = std.testing.allocator; 616 - var rt = try io.Runtime.init(gpa, 16); 616 + var rt = try io.Ring.init(gpa, 16); 617 617 defer rt.deinit(); 618 618 619 619 var foo: Foo = .{}; ··· 633 633 634 634 test "runtime: cancel all" { 635 635 const gpa = std.testing.allocator; 636 - var rt = try io.Runtime.init(gpa, 16); 636 + var rt = try io.Ring.init(gpa, 16); 637 637 defer rt.deinit(); 638 638 639 639 const Foo2 = struct { 640 640 bar: usize = 0, 641 641 642 - fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 642 + fn callback(_: *io.Ring, task: io.Task) anyerror!void { 643 643 const self = task.userdataCast(@This()); 644 644 const result = task.result.?; 645 645 _ 
= result.timer catch |err| { ··· 669 669 670 670 test "runtime: msgRing" { 671 671 const gpa = std.testing.allocator; 672 - var rt1 = try io.Runtime.init(gpa, 16); 672 + var rt1 = try io.Ring.init(gpa, 16); 673 673 defer rt1.deinit(); 674 674 675 675 var rt2 = try rt1.initChild(16); ··· 681 681 682 682 const Msg = enum { rt1, rt2 }; 683 683 684 - fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 684 + fn callback(_: *io.Ring, task: io.Task) anyerror!void { 685 685 const self = task.userdataCast(@This()); 686 686 const msg = task.msgToEnum(Msg); 687 687 switch (msg) {
+5 -5
src/net.zig
··· 8 8 const assert = std.debug.assert; 9 9 10 10 pub fn tcpConnectToHost( 11 - rt: *io.Runtime, 11 + rt: *io.Ring, 12 12 host: []const u8, 13 13 port: u16, 14 14 ctx: io.Context, ··· 26 26 } 27 27 28 28 pub fn tcpConnectToAddr( 29 - rt: *io.Runtime, 29 + rt: *io.Ring, 30 30 addr: std.net.Address, 31 31 ctx: io.Context, 32 32 ) Allocator.Error!*ConnectTask { ··· 68 68 /// Cancels the current task. Not guaranteed to actually cancel. User's callback will get an 69 69 /// error.Canceled if cancelation was successful, otherwise the operation will complete as 70 70 /// normal and this is essentially a no-op 71 - pub fn cancel(self: *ConnectTask, rt: *io.Runtime) void { 71 + pub fn cancel(self: *ConnectTask, rt: *io.Ring) void { 72 72 _ = self.task.cancel(rt, null, 0, io.noopCallback) catch {}; 73 73 } 74 74 75 - pub fn handleMsg(rt: *io.Runtime, task: io.Task) anyerror!void { 75 + pub fn handleMsg(rt: *io.Ring, task: io.Task) anyerror!void { 76 76 const self = task.userdataCast(ConnectTask); 77 77 const result = task.result.?; 78 78 switch (task.msgToEnum(Msg)) { ··· 127 127 }; 128 128 129 129 test "tcp connect" { 130 - var rt: io.Runtime = try .init(std.testing.allocator, 16); 130 + var rt: io.Ring = try .init(std.testing.allocator, 16); 131 131 defer rt.deinit(); 132 132 133 133 const addr: std.net.Address = try .parseIp4("127.0.0.1", 80);
+10 -10
src/tls.zig
··· 22 22 written: usize = 0, 23 23 24 24 userdata: ?*anyopaque = null, 25 - callback: *const fn (*io.Runtime, io.Task) anyerror!void = io.noopCallback, 25 + callback: *const fn (*io.Ring, io.Task) anyerror!void = io.noopCallback, 26 26 close_msg: u16 = 0, 27 27 write_msg: u16 = 0, 28 28 recv_msg: u16 = 0, ··· 38 38 handshake: tls.nonblock.Client, 39 39 task: *io.Task, 40 40 41 - pub fn handleMsg(rt: *io.Runtime, task: io.Task) anyerror!void { 41 + pub fn handleMsg(rt: *io.Ring, task: io.Task) anyerror!void { 42 42 const self = task.userdataCast(HandshakeTask); 43 43 const result = task.result.?; 44 44 ··· 148 148 149 149 /// Tries to cancel the handshake. Callback will receive an error.Canceled if cancelation 150 150 /// was successful, otherwise handhsake will proceed 151 - pub fn cancel(self: *HandshakeTask, rt: *io.Runtime) void { 151 + pub fn cancel(self: *HandshakeTask, rt: *io.Ring) void { 152 152 self.task.cancel(rt, null, 0, io.noopCallback) catch {}; 153 153 } 154 154 }; ··· 162 162 /// Initializes a handshake, which will ultimately deliver a Client to the callback via a 163 163 /// userptr result 164 164 pub fn init( 165 - rt: *io.Runtime, 165 + rt: *io.Ring, 166 166 fd: posix.fd_t, 167 167 opts: tls.config.Client, 168 168 ctx: io.Context, ··· 189 189 self.cleartext_buf.deinit(gpa); 190 190 } 191 191 192 - pub fn close(self: *Client, gpa: Allocator, rt: *io.Runtime) !void { 192 + pub fn close(self: *Client, gpa: Allocator, rt: *io.Ring) !void { 193 193 // close notify is 2 bytes long 194 194 const len = self.tls.encryptedLength(2); 195 195 try self.ciphertext_buf.ensureUnusedCapacity(gpa, len); ··· 209 209 } 210 210 } 211 211 212 - fn onCompletion(rt: *io.Runtime, task: io.Task) anyerror!void { 212 + fn onCompletion(rt: *io.Ring, task: io.Task) anyerror!void { 213 213 const self = task.userdataCast(Client); 214 214 const result = task.result.?; 215 215 ··· 317 317 } 318 318 } 319 319 320 - pub fn recv(self: *Client, rt: *io.Runtime) !void { 320 + pub fn 
recv(self: *Client, rt: *io.Ring) !void { 321 321 if (self.recv_task != null) return; 322 322 self.recv_task = try rt.recv( 323 323 self.fd, ··· 330 330 try self.cleartext_buf.appendSlice(gpa, bytes); 331 331 } 332 332 333 - pub fn flush(self: *Client, gpa: Allocator, rt: *io.Runtime) !void { 333 + pub fn flush(self: *Client, gpa: Allocator, rt: *io.Ring) !void { 334 334 const len = self.tls.encryptedLength(self.cleartext_buf.items.len); 335 335 try self.ciphertext_buf.ensureUnusedCapacity(gpa, len); 336 336 const slice = self.ciphertext_buf.unusedCapacitySlice(); ··· 372 372 const net = @import("net.zig"); 373 373 const gpa = std.testing.allocator; 374 374 375 - var rt = try io.Runtime.init(gpa, 16); 375 + var rt = try io.Ring.init(gpa, 16); 376 376 defer rt.deinit(); 377 377 378 378 const Foo = struct { ··· 389 389 recv, 390 390 }; 391 391 392 - fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 392 + fn callback(_: *io.Ring, task: io.Task) anyerror!void { 393 393 const self = task.userdataCast(Self); 394 394 const result = task.result.?; 395 395 errdefer {