···133133(* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *)
134134module Msghdr = struct
135135 type msghdr
136136- type t = msghdr * Sockaddr.t * Iovec.t
137137- external make_msghdr : Sockaddr.t -> Iovec.t-> msghdr = "ocaml_uring_make_msghdr"
136136+ type t = msghdr * Sockaddr.t option * Iovec.t
137137+ external make_msghdr : int -> Unix.file_descr list -> Sockaddr.t option -> Iovec.t-> msghdr = "ocaml_uring_make_msghdr"
138138+ external get_msghdr_fds : msghdr -> Unix.file_descr list = "ocaml_uring_get_msghdr_fds"
138139139139- let get_sockaddr (_, addr, _) = addr
140140+ let get_fds (hdr, _, _) = get_msghdr_fds hdr
140141141141- let create_with_addr addr buffs =
142142+ (* Create a value with space for [n_fds] file descriptors.
143143+ When sending, [fds] is used to fill those slots. When receiving, they can be left blank. *)
144144+ let create_with_addr ~n_fds ~fds ?addr buffs =
142145 let iovs = Iovec.make buffs in
143143- make_msghdr addr iovs, addr, iovs
146146+ make_msghdr n_fds fds addr iovs, addr, iovs
144147145145- let create buffs =
146146- create_with_addr (Sockaddr.create ()) buffs
148148+ let create ?(n_fds=0) ?addr buffs =
149149+ create_with_addr ~n_fds ~fds:[] ?addr buffs
147150end
148151149152type 'a job = 'a Heap.entry
···333336let accept t fd addr user_data =
334337 with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) user_data ~extra_data:addr
335338336336-let send_msg t fd addr buffers user_data =
337337- let addr = Sockaddr.of_unix addr in
338338- let msghdr = Msghdr.create_with_addr addr buffers in
339339+let send_msg ?(fds=[]) ?dst t fd buffers user_data =
340340+ let addr = Option.map Sockaddr.of_unix dst in
341341+ let n_fds = List.length fds in
342342+ let msghdr = Msghdr.create_with_addr ~n_fds ~fds ?addr buffers in
339343 with_id_full t (fun id -> Uring.submit_send_msg t.uring id fd msghdr) user_data ~extra_data:msghdr
340344341345let recv_msg t fd msghdr user_data =
+12-11
lib/uring/uring.mli
···202202module Msghdr : sig
203203 type t
204204205205- val create : Cstruct.t list -> t
205205+ val create : ?n_fds:int -> ?addr:Sockaddr.t -> Cstruct.t list -> t
206206 (** [create buffs] makes a new [msghdr] using the [buffs]
207207- for the underlying [iovec]. A dummy socket address is used
208208- and will be filled when data is received.*)
207207+ for the underlying [iovec].
208208+ @param addr The remote address.
209209+ Use {!Sockaddr.create} to create a dummy address that will be filled when data is received.
210210+ @param n_fds Reserve space to receive this many FDs (default 0) *)
209211210210- val get_sockaddr : t -> Sockaddr.t
211211- (** [get_sockaddr t] gets the socket address from [t]. When used
212212- with {!recv_msg} the socket will only be the sender address once the message
213213- is received, until then the address will be a dummy address. *)
212212+ val get_fds : t -> Unix.file_descr list
214213end
215214216216-val send_msg : 'a t -> Unix.file_descr -> Unix.sockaddr -> Cstruct.t list -> 'a -> 'a job option
217217-(** [send_msg t fd addr buffs d] will submit a [sendmsg(2)] request. The [Msghdr] will be constructed
218218- from the address ([addr]) and the buffers ([buffs]). *)
215215+val send_msg : ?fds:Unix.file_descr list -> ?dst:Unix.sockaddr -> 'a t -> Unix.file_descr -> Cstruct.t list -> 'a -> 'a job option
216216+(** [send_msg t fd buffs d] will submit a [sendmsg(2)] request. The [Msghdr] will be constructed
217217+ from the FDs ([fds]), address ([dst]) and buffers ([buffs]).
218218+ @param dst Destination address.
219219+ @param fds Extra file descriptors to attach to the message. *)
219220220221val recv_msg : 'a t -> Unix.file_descr -> Msghdr.t -> 'a -> 'a job option
221222(** [recv_msg t fd msghdr d] will submit a [recvmsg(2)] request. If the request is
222222- successful then the [msghdr] will contain the sender address and the data sent. *)
223223+ successful then the [msghdr] will contain the sender address and the data received. *)
223224224225(** {2 Submitting operations} *)
225226
+71-10
lib/uring/uring_stubs.c
···2727#include <caml/signals.h>
2828#include <caml/unixsupport.h>
2929#include <caml/socketaddr.h>
3030+#include <sys/socket.h>
3031#include <errno.h>
3132#include <string.h>
3233#include <poll.h>
···423424 custom_fixed_length_default
424425};
425426427427+struct msghdr_with_cmsg {
428428+ struct msghdr msg;
429429+ struct cmsghdr cmsg;
430430+};
431431+426432// v_sockaddr and v_iov must not be freed before the msghdr as it contains pointers to them
427433value
428428-ocaml_uring_make_msghdr(value v_sockaddr, value v_iov) {
429429- CAMLparam2(v_sockaddr, v_iov);
434434+ocaml_uring_make_msghdr(value v_n_fds, value v_fds, value v_sockaddr_opt, value v_iov) {
435435+ CAMLparam3(v_fds, v_sockaddr_opt, v_iov);
430436 CAMLlocal1(v);
431437 struct msghdr *msg;
432438 struct iovec *iovs = Iovec_val(Field(v_iov, 0));
433439 int iovs_len = Int_val(Field(v_iov, 1));
440440+ int n_fds = Int_val(v_n_fds);
441441+ int cmsg_offset, controllen, total_size;
442442+ cmsg_offset = sizeof(struct msghdr_with_cmsg) - sizeof(struct cmsghdr);
443443+ controllen = n_fds > 0 ? CMSG_SPACE(sizeof(int) * n_fds) : 0;
444444+ total_size = cmsg_offset + controllen;
445445+ //dprintf("using %d bytes to hold %d FDs\n", total_size, n_fds);
434446 // Allocate a pointer on the OCaml heap for the msghdr
435435- v = caml_alloc_custom_mem(&msghdr_ops, sizeof(struct msghdr *), sizeof(struct msghdr));
447447+ v = caml_alloc_custom_mem(&msghdr_ops, sizeof(struct msghdr *), total_size);
436448 Msghdr_val(v) = NULL;
437437- msg = (struct msghdr *) caml_stat_alloc(sizeof(struct msghdr));
438438- // The msghdr must zero-ed to avoid unwanted errors
439439- memset(msg, 0, sizeof(struct msghdr));
449449+ msg = (struct msghdr *) caml_stat_alloc(total_size);
450450+ // The msghdr and cmsghdr must zero-ed to avoid unwanted errors
451451+ memset(msg, 0, total_size);
440452 Msghdr_val(v) = msg;
441441- struct sock_addr_data *addr = Sock_addr_val(v_sockaddr);
442442- // Store the address and iovec data in the message
443443- msg->msg_name = &(addr->sock_addr_addr);
444444- msg->msg_namelen = sizeof(addr->sock_addr_addr);
453453+ if (Is_some(v_sockaddr_opt)) {
454454+ struct sock_addr_data *addr = Sock_addr_val(Some_val(v_sockaddr_opt));
455455+ // Store the address and iovec data in the message
456456+ msg->msg_name = &(addr->sock_addr_addr);
457457+ msg->msg_namelen = sizeof(addr->sock_addr_addr);
458458+ } else {
459459+ msg->msg_name = NULL;
460460+ }
445461 msg->msg_iov = iovs;
446462 msg->msg_iovlen = iovs_len;
463463+ // Add the FDs to the message
464464+ if (n_fds > 0) {
465465+ int i;
466466+ struct cmsghdr *cm;
467467+ msg->msg_control = &(((struct msghdr_with_cmsg *) msg)->cmsg);
468468+ msg->msg_controllen = controllen;
469469+ if (Is_block(v_fds)) {
470470+ cm = CMSG_FIRSTHDR(msg);
471471+ cm->cmsg_level = SOL_SOCKET;
472472+ cm->cmsg_type = SCM_RIGHTS;
473473+ cm->cmsg_len = CMSG_LEN(n_fds * sizeof(int));
474474+ for (i = 0; i < n_fds; i++) {
475475+ int fd = -1;
476476+ if (Is_block(v_fds)) {
477477+ fd = Int_val(Field(v_fds, 0));
478478+ v_fds = Field(v_fds, 1);
479479+ }
480480+ ((int *)CMSG_DATA(cm))[i] = fd;
481481+ }
482482+ }
483483+ }
447484 CAMLreturn(v);
485485+}
486486+487487+value
488488+ocaml_uring_get_msghdr_fds(value v_msghdr) {
489489+ CAMLparam1(v_msghdr);
490490+ CAMLlocal2(v_list, v_cons);
491491+ struct msghdr *msg = Msghdr_val(v_msghdr);
492492+ struct cmsghdr *cm;
493493+ v_list = Val_int(0);
494494+ for (cm = CMSG_FIRSTHDR(msg); cm; cm = CMSG_NXTHDR(msg, cm)) {
495495+ if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) {
496496+ int *fds = (int *) CMSG_DATA(cm);
497497+ int n_fds = (cm->cmsg_len - CMSG_LEN(0)) / sizeof(int);
498498+ int i;
499499+ for (i = n_fds - 1; i >= 0; i--) {
500500+ int fd = Val_int(fds[i]);
501501+ value v_cons = caml_alloc_tuple(2);
502502+ Store_field(v_cons, 0, fd);
503503+ Store_field(v_cons, 1, v_list);
504504+ v_list = v_cons;
505505+ }
506506+ }
507507+ }
508508+ CAMLreturn(v_list);
448509}
449510450511// v_sockaddr must not be GC'd while the call is in progress
+29
tests/main.ml
···366366 check_int ~__POS__ ~expected:0 r_read;
367367 Uring.exit t
368368369369+let test_send_msg () =
370370+ let r, w = Unix.pipe () in
371371+ let t = Uring.create ~queue_depth:2 () in
372372+ let a, b = Unix.(socketpair PF_UNIX SOCK_STREAM 0) in
373373+ let bufs = [Cstruct.of_string "hi"] in
374374+ assert_some ~__POS__ (Uring.send_msg t a ~fds:[r; w] bufs `Send);
375375+ check_int ~__POS__ (Uring.submit t) ~expected:1;
376376+ let _, r_send = consume t in
377377+ check_int ~__POS__ ~expected:2 r_send;
378378+ let recv_buf = Cstruct.of_string "XX" in
379379+ let recv = Uring.Msghdr.create ~n_fds:2 [recv_buf] in
380380+ check_int ~__POS__ ~expected:0 (List.length (Uring.Msghdr.get_fds recv));
381381+ assert_some ~__POS__ (Uring.recv_msg t b recv `Recv);
382382+ check_int ~__POS__ (Uring.submit t) ~expected:1;
383383+ let _, r_recv = consume t in
384384+ check_int ~__POS__ ~expected:2 r_recv;
385385+ check_string ~__POS__ ~expected:"hi" (Cstruct.to_string recv_buf);
386386+ let r2, w2 =
387387+ match Uring.Msghdr.get_fds recv with
388388+ | [r2; w2] -> r2, w2
389389+ | _ -> failwith "Expected two FDs!"
390390+ in
391391+ check_int ~__POS__ ~expected:5 (Unix.write_substring w2 "to-w2" 0 5);
392392+ check_string ~__POS__ ~expected:"to-w2" (really_input_string (Unix.in_channel_of_descr r) 5);
393393+ check_int ~__POS__ ~expected:4 (Unix.write_substring w "to-w" 0 4);
394394+ check_string ~__POS__ ~expected:"to-w" (really_input_string (Unix.in_channel_of_descr r2) 4);
395395+ List.iter Unix.close [r; w; r2; w2]
396396+369397let () =
370398 Test_data.setup ();
371399 Random.self_init ();
···389417 tc "cancel" test_cancel;
390418 tc "cancel_late" test_cancel_late;
391419 tc "cancel_invalid" test_cancel_invalid;
420420+ tc "send_msg" test_send_msg;
392421 tc "free_busy" test_free_busy;
393422 ];
394423 ]