The unpac monorepo manager self-hosting as a monorepo using unpac
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #49 from patricoferris/sendmsg-recvmsg

Add sendmsg and recvmsg

authored by

Thomas Leonard and committed by
GitHub
5fbb71e7 5f9b9142

+118
+26
lib/uring/uring.ml
··· 130 130 (iovec, len, buffers) 131 131 end 132 132 133 + (* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *) 134 + module Msghdr = struct 135 + type msghdr 136 + type t = msghdr * Sockaddr.t * Iovec.t 137 + external make_msghdr : Sockaddr.t -> Iovec.t-> msghdr = "ocaml_uring_make_msghdr" 138 + 139 + let get_sockaddr (_, addr, _) = addr 140 + 141 + let create_with_addr addr buffs = 142 + let iovs = Iovec.make buffs in 143 + make_msghdr addr iovs, addr, iovs 144 + 145 + let create buffs = 146 + create_with_addr (Sockaddr.create ()) buffs 147 + end 148 + 133 149 type 'a job = 'a Heap.entry 134 150 135 151 module Uring = struct ··· 157 173 external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_accept" [@@noalloc] 158 174 external submit_cancel : t -> id -> id -> bool = "ocaml_uring_submit_cancel" [@@noalloc] 159 175 external submit_openat2 : t -> id -> Unix.file_descr -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc] 176 + external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> bool = "ocaml_uring_submit_send_msg" [@@noalloc] 177 + external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> bool = "ocaml_uring_submit_recv_msg" [@@noalloc] 160 178 161 179 type cqe_option = private 162 180 | Cqe_none ··· 314 332 315 333 let accept t fd addr user_data = 316 334 with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) user_data ~extra_data:addr 335 + 336 + let send_msg t fd addr buffers user_data = 337 + let addr = Sockaddr.of_unix addr in 338 + let msghdr = Msghdr.create_with_addr addr buffers in 339 + with_id_full t (fun id -> Uring.submit_send_msg t.uring id fd msghdr) user_data ~extra_data:msghdr 340 + 341 + let recv_msg t fd msghdr user_data = 342 + with_id_full t (fun id -> Uring.submit_recv_msg t.uring id fd msghdr) user_data ~extra_data:msghdr 317 343 318 344 let cancel t job user_data = 319 345 ignore (Heap.ptr job : Uring.id); (* Check it's still valid *)
+22
lib/uring/uring.mli
··· 199 199 if [job] had already completed by the time the kernel processed the cancellation request. 200 200 @raise Invalid_argument if the job has already been returned by e.g. {!wait}. *) 201 201 202 + module Msghdr : sig 203 + type t 204 + 205 + val create : Cstruct.t list -> t 206 + (** [create buffs] makes a new [msghdr] using the [buffs] 207 + for the underlying [iovec]. A dummy socket address is used 208 + and will be filled when data is received.*) 209 + 210 + val get_sockaddr : t -> Sockaddr.t 211 + (** [get_sockaddr t] gets the socket address from [t]. When used 212 + with {!recv_msg} the socket will only be the sender address once the message 213 + is received, until then the address will be a dummy address. *) 214 + end 215 + 216 + val send_msg : 'a t -> Unix.file_descr -> Unix.sockaddr -> Cstruct.t list -> 'a -> 'a job option 217 + (** [send_msg t fd addr buffs d] will submit a [sendmsg(2)] request. The [Msghdr] will be constructed 218 + from the address ([addr]) and the buffers ([buffs]). *) 219 + 220 + val recv_msg : 'a t -> Unix.file_descr -> Msghdr.t -> 'a -> 'a job option 221 + (** [recv_msg t fd msghdr d] will submit a [recvmsg(2)] request. If the request is 222 + successful then the [msghdr] will contain the sender address and the data sent. *) 223 + 202 224 (** {2 Submitting operations} *) 203 225 204 226 val submit : 'a t -> int
+70
lib/uring/uring_stubs.c
··· 405 405 CAMLreturn(v_sockaddr); 406 406 } 407 407 408 + #define Msghdr_val(v) (*((struct msghdr **) Data_custom_val(v))) 409 + 410 + static void finalize_msghdr(value v) { 411 + caml_stat_free(Msghdr_val(v)); 412 + Msghdr_val(v) = NULL; 413 + } 414 + 415 + static struct custom_operations msghdr_ops = { 416 + "uring.msghdr_ops", 417 + finalize_msghdr, 418 + custom_compare_default, 419 + custom_hash_default, 420 + custom_serialize_default, 421 + custom_deserialize_default, 422 + custom_compare_ext_default, 423 + custom_fixed_length_default 424 + }; 425 + 426 + // v_sockaddr and v_iov must not be freed before the msghdr as it contains pointers to them 427 + value 428 + ocaml_uring_make_msghdr(value v_sockaddr, value v_iov) { 429 + CAMLparam2(v_sockaddr, v_iov); 430 + CAMLlocal1(v); 431 + struct msghdr *msg; 432 + struct iovec *iovs = Iovec_val(Field(v_iov, 0)); 433 + int iovs_len = Int_val(Field(v_iov, 1)); 434 + // Allocate a pointer on the OCaml heap for the msghdr 435 + v = caml_alloc_custom_mem(&msghdr_ops, sizeof(struct msghdr *), sizeof(struct msghdr)); 436 + Msghdr_val(v) = NULL; 437 + msg = (struct msghdr *) caml_stat_alloc(sizeof(struct msghdr)); 438 + // The msghdr must zero-ed to avoid unwanted errors 439 + memset(msg, 0, sizeof(struct msghdr)); 440 + Msghdr_val(v) = msg; 441 + struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); 442 + // Store the address and iovec data in the message 443 + msg->msg_name = &(addr->sock_addr_addr); 444 + msg->msg_namelen = sizeof(addr->sock_addr_addr); 445 + msg->msg_iov = iovs; 446 + msg->msg_iovlen = iovs_len; 447 + CAMLreturn(v); 448 + } 449 + 408 450 // v_sockaddr must not be GC'd while the call is in progress 409 451 value 410 452 ocaml_uring_submit_connect(value v_uring, value v_id, value v_fd, value v_sockaddr) { ··· 415 457 sqe = io_uring_get_sqe(ring); 416 458 if (!sqe) CAMLreturn(Val_false); 417 459 io_uring_prep_connect(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), addr->sock_addr_len); 460 + io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); 461 + CAMLreturn(Val_true); 462 + } 463 + 464 + // v_msghdr must not be GC'd while the call is in progress 465 + value 466 + ocaml_uring_submit_send_msg(value v_uring, value v_id, value v_fd, value v_msghdr) { 467 + CAMLparam2(v_uring, v_msghdr); 468 + struct io_uring *ring = Ring_val(v_uring); 469 + struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); 470 + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 471 + if (!sqe) CAMLreturn(Val_false); 472 + dprintf("submit_sendmsg\n"); 473 + io_uring_prep_sendmsg(sqe, Int_val(v_fd), msg, 0); 474 + io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); 475 + CAMLreturn(Val_true); 476 + } 477 + 478 + // v_msghdr must not be GC'd while the call is in progress 479 + value 480 + ocaml_uring_submit_recv_msg(value v_uring, value v_id, value v_fd, value v_msghdr) { 481 + CAMLparam2(v_uring, v_msghdr); 482 + struct io_uring *ring = Ring_val(v_uring); 483 + struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); 484 + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 485 + if (!sqe) CAMLreturn(Val_false); 486 + dprintf("submit_recvmsg:msghdr %p: registering iobuf base %p len %lu\n", msg, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len); 487 + io_uring_prep_recvmsg(sqe, Int_val(v_fd), msg, 0); 418 488 io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); 419 489 CAMLreturn(Val_true); 420 490 }