this repo has no description
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

ourio: add read, readv

+100
+48
src/ourio.zig
··· 413 413 return task; 414 414 } 415 415 416 + pub fn readv( 417 + self: *Ring, 418 + fd: posix.fd_t, 419 + vecs: []const posix.iovec, 420 + ctx: Context, 421 + ) Allocator.Error!*Task { 422 + const task = try self.getTask(); 423 + task.* = .{ 424 + .userdata = ctx.ptr, 425 + .msg = ctx.msg, 426 + .callback = ctx.cb, 427 + .req = .{ .readv = .{ 428 + .fd = fd, 429 + .vecs = vecs, 430 + } }, 431 + }; 432 + 433 + self.submission_q.push(task); 434 + return task; 435 + } 436 + 437 + pub fn read( 438 + self: *Ring, 439 + fd: posix.fd_t, 440 + buffer: []u8, 441 + ctx: Context, 442 + ) Allocator.Error!*Task { 443 + const task = try self.getTask(); 444 + task.* = .{ 445 + .userdata = ctx.ptr, 446 + .msg = ctx.msg, 447 + .callback = ctx.cb, 448 + .req = .{ .read = .{ 449 + .fd = fd, 450 + .buffer = buffer, 451 + } }, 452 + }; 453 + 454 + self.submission_q.push(task); 455 + return task; 456 + } 457 + 416 458 pub fn open( 417 459 self: *Ring, 418 460 path: [:0]const u8, ··· 514 556 statx, 515 557 readv, 516 558 open, 559 + read, 517 560 518 561 /// userfd is meant to send file descriptors between Ring instances (using msgRing) 519 562 userfd, ··· 576 619 flags: posix.O, 577 620 mode: posix.mode_t, 578 621 }, 622 + read: struct { 623 + fd: posix.fd_t, 624 + buffer: []u8, 625 + }, 579 626 580 627 userfd, 581 628 usermsg, ··· 599 646 statx: ResultError!*Statx, 600 647 readv: ResultError!usize, 601 648 open: ResultError!posix.fd_t, 649 + read: ResultError!usize, 602 650 603 651 userfd: anyerror!posix.fd_t, 604 652 usermsg: u16,
+36
src/ourio/Kqueue.zig
··· 352 352 } 353 353 }, 354 354 355 + .read => |req| { 356 + self.in_flight.push(task); 357 + const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task); 358 + try self.submission_queue.append(self.gpa, kevent); 359 + }, 360 + 355 361 .readv => |req| { 356 362 self.in_flight.push(task); 357 363 const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task); ··· 501 507 const kevent = evSet(@intCast(cancel_req.fd), EVFILT.WRITE, EV.DELETE, task); 502 508 try self.submission_queue.append(self.gpa, kevent); 503 509 } 510 + }, 511 + 512 + .read => |cancel_req| { 513 + self.in_flight.remove(task); 514 + task.result = .{ .read = error.Canceled }; 515 + const kevent = evSet( 516 + @intCast(cancel_req.fd), 517 + EVFILT.READ, 518 + EV.DELETE, 519 + task, 520 + ); 521 + try self.submission_queue.append(self.gpa, kevent); 504 522 }, 505 523 506 524 .readv => |cancel_req| { ··· 682 700 // async tasks. These can be handled synchronously in a cancel all 683 701 .accept, 684 702 .poll, 703 + .read, 685 704 .readv, 686 705 .recv, 687 706 .write, ··· 731 750 .noop => unreachable, 732 751 .open => .{ .open = error.Canceled }, 733 752 .poll => .{ .poll = error.Canceled }, 753 + .read => .{ .read = error.Canceled }, 734 754 .readv => .{ .readv = error.Canceled }, 735 755 .recv => .{ .recv = error.Canceled }, 736 756 .socket => .{ .socket = error.Canceled }, ··· 816 836 const err = unexpectedError(dataToE(event.data)); 817 837 task.result = .{ .poll = err }; 818 838 } else task.result = .{ .poll = {} }; 839 + return task.callback(rt, task.*); 840 + }, 841 + 842 + .read => |req| { 843 + defer self.releaseTask(rt, task); 844 + self.in_flight.remove(task); 845 + if (event.flags & EV.ERROR != 0) { 846 + // Interpret data as an errno 847 + const err = unexpectedError(dataToE(event.data)); 848 + task.result = .{ .read = err }; 849 + return task.callback(rt, task.*); 850 + } 851 + if (posix.read(req.fd, req.buffer)) |n| 852 + task.result = .{ .read = n } 853 + else |_| 854 + task.result = .{ .read = error.Unexpected }; 819 855 return task.callback(rt, task.*); 820 856 }, 821 857
+2
src/ourio/Mock.zig
··· 20 20 noop_cb: ?*const fn (*io.Task) io.Result = null, 21 21 open_cb: ?*const fn (*io.Task) io.Result = null, 22 22 poll_cb: ?*const fn (*io.Task) io.Result = null, 23 + read_cb: ?*const fn (*io.Task) io.Result = null, 23 24 readv_cb: ?*const fn (*io.Task) io.Result = null, 24 25 recv_cb: ?*const fn (*io.Task) io.Result = null, 25 26 socket_cb: ?*const fn (*io.Task) io.Result = null, ··· 72 73 .noop => if (self.noop_cb) |cb| cb(task) else return error.NoMockCallback, 73 74 .open => if (self.open_cb) |cb| cb(task) else return error.NoMockCallback, 74 75 .poll => if (self.poll_cb) |cb| cb(task) else return error.NoMockCallback, 76 + .read => if (self.read_cb) |cb| cb(task) else return error.NoMockCallback, 75 77 .readv => if (self.readv_cb) |cb| cb(task) else return error.NoMockCallback, 76 78 .recv => if (self.recv_cb) |cb| cb(task) else return error.NoMockCallback, 77 79 .socket => if (self.socket_cb) |cb| cb(task) else return error.NoMockCallback,
+14
src/ourio/Uring.zig
··· 225 225 self.prepDeadline(task, sqe); 226 226 }, 227 227 228 + .read => |req| { 229 + const sqe = self.getSqe(); 230 + sqe.prep_read(req.fd, req.buffer, 0); 231 + sqe.user_data = @intFromPtr(task); 232 + self.prepDeadline(task, sqe); 233 + }, 234 + 228 235 .readv => |req| { 229 236 const sqe = self.getSqe(); 230 237 sqe.prep_readv(req.fd, req.vecs, 0); ··· 363 370 364 371 .statx => |req| .{ .statx = switch (cqeToE(cqe.res)) { 365 372 .SUCCESS => req.result, 373 + .INVAL => io.ResultError.Invalid, 374 + .CANCELED => io.ResultError.Canceled, 375 + else => |e| unexpectedError(e), 376 + } }, 377 + 378 + .read => .{ .read = switch (cqeToE(cqe.res)) { 379 + .SUCCESS => @intCast(cqe.res), 366 380 .INVAL => io.ResultError.Invalid, 367 381 .CANCELED => io.ResultError.Canceled, 368 382 else => |e| unexpectedError(e),