···11+/*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ *
44+ * Permission to use, copy, modify, and distribute this software for any
55+ * purpose with or without fee is hereby granted, provided that the above
66+ * copyright notice and this permission notice appear in all copies.
77+ *
88+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
99+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1010+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1111+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1212+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1313+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1414+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1515+ */
+7-1
README.md
···11-This is a work in progress, it may never work!
11+# ocaml-uring -- bindings to Linux io_uring
22+33+These are OCaml bindings for the Linux io_uring stack.
44+They are intended to eventually be used with the multicore OCaml stack,
55+but may also be useful for sequential code too.
66+77+- Status: work in progress, unreleased.
···11+(* cp(1) built with liburing. Queues up as many reads as the queue
22+ * depth allows and then queues up corresponding writes.
33+ OCaml version of https://unixism.net/loti/tutorial/cp_liburing.html *)
44+55+let get_file_size fd =
66+ Unix.handle_unix_error Unix.fstat fd |>
77+ fun {Unix.st_size; _} -> st_size
88+(* TODO make this work with ST_ISBLK *)
99+1010+type t = {
1111+ mutable insize: int;
1212+ mutable offset: int;
1313+ mutable reads: int;
1414+ mutable writes: int;
1515+ mutable write_left: int;
1616+ mutable read_left: int;
1717+ block_size: int;
1818+ infd: Unix.file_descr;
1919+ outfd: Unix.file_descr;
2020+}
2121+2222+let pp ppf {insize;offset;reads;writes;write_left; read_left;_} =
2323+ Fmt.pf ppf "insize %d offset %d reads %d writes %d rleft %d wleft %d"
2424+ insize offset reads writes read_left write_left
2525+2626+type req = {
2727+ op: [`R | `W ];
2828+ iov: Uring.Iovec.t;
2929+ len: int;
3030+ fileoff: int;
3131+ mutable off: int;
3232+ t : t;
3333+}
3434+3535+let pp_req ppf {op; len; off; fileoff; t; _ } =
3636+ Fmt.pf ppf "[%s fileoff %d len %d off %d] [%a]" (match op with |`R -> "r" |`W -> "w") fileoff len off pp t
3737+3838+let empty_req t = { op=`R; iov=Uring.Iovec.empty; len=0; off=0; fileoff=0; t}
3939+4040+(* Perform a complete read into bufs. *)
4141+let queue_read uring t len =
4242+ let ba = Uring.Iovec.alloc_buf len in
4343+ let iov = Uring.Iovec.alloc [|ba|] in
4444+ let req = { op=`R; iov; fileoff=t.offset; len; off=0; t } in
4545+ Fmt.epr "queue_read: %a\n%!" pp_req req;
4646+ Uring.readv uring ~offset:t.offset t.infd iov req;
4747+ t.offset <- t.offset + len;
4848+ t.read_left <- t.read_left - len;
4949+ t.reads <- t.reads + 1
5050+5151+(* TODO compile time check *)
5252+let eagain = -11
5353+let eintr = -4
5454+5555+(* Check that a read has completely finished, and if not
5656+ * queue it up for completing the remaining amount *)
5757+let handle_read_completion uring req res =
5858+ Fmt.epr "read_completion: res=%d %a\n%!" res pp_req req;
5959+ let bytes_to_read = req.len - req.off in
6060+ match res with
6161+ | 0 ->
6262+ Fmt.epr "eof %a\n%!" pp_req req
6363+ | n when n = eagain || n = eintr ->
6464+ (* requeue the request *)
6565+ Uring.readv ~offset:req.fileoff uring req.t.infd req.iov req;
6666+ Fmt.epr "requeued eintr read: %a\n%!" pp_req req
6767+ | n when n < 0 ->
6868+ raise (Failure ("unix errorno " ^ (string_of_int n)))
6969+ | n when n < bytes_to_read ->
7070+ (* handle short read so new iovec and resubmit *)
7171+ Uring.Iovec.advance req.iov ~idx:0 ~adj:n;
7272+ req.off <-req.off + n;
7373+ Uring.readv ~offset:req.off uring req.t.infd req.iov req;
7474+ Fmt.epr "requeued short read: %a\n%!" pp_req req
7575+ | n when n = bytes_to_read ->
7676+ (* Read is complete, all bytes are read, turn it into a write *)
7777+ req.t.reads <- req.t.reads - 1;
7878+ req.t.writes <- req.t.writes + 1;
7979+ (* reset the iovec *)
8080+ Uring.Iovec.advance req.iov ~idx:0 ~adj:(req.off * -1);
8181+ let req = { req with op=`W; off=0 } in
8282+ Uring.writev uring ~offset:req.fileoff req.t.outfd req.iov req;
8383+ Fmt.epr "queued write: %a\n%!" pp_req req
8484+ | n -> raise (Failure (Printf.sprintf "unexpected readv result %d > %d " bytes_to_read n))
8585+8686+let handle_write_completion uring req res =
8787+ Fmt.epr "write_completion: res=%d %a\n%!" res pp_req req;
8888+ let bytes_to_write = req.len - req.off in
8989+ match res with
9090+ | 0 -> raise End_of_file
9191+ | n when n = eagain || n = eintr ->
9292+ (* requeue the request *)
9393+ Uring.writev ~offset:req.fileoff uring req.t.infd req.iov req;
9494+ Fmt.epr "requeued eintr read: %a\n%!" pp_req req
9595+ | n when n < bytes_to_write ->
9696+ (* handle short write so new iovec and resubmit *)
9797+ Uring.Iovec.advance req.iov ~idx:0 ~adj:n;
9898+ req.off <-req.off + n;
9999+ Uring.writev ~offset:req.off uring req.t.infd req.iov req;
100100+ Fmt.epr "requeued write read: %a\n%!" pp_req req
101101+ | n when n = bytes_to_write ->
102102+ req.t.writes <- req.t.writes - 1;
103103+ req.t.write_left <- req.t.write_left - req.len;
104104+ Fmt.epr "write done: %a\n%!" pp_req req;
105105+ Uring.Iovec.free req.iov
106106+ | n -> raise (Failure (Printf.sprintf "unexpected writev result %d > %d " bytes_to_write n))
107107+108108+let handle_completion uring req res =
109109+ match req.op with
110110+ |`R -> handle_read_completion uring req res
111111+ |`W -> handle_write_completion uring req res
112112+113113+let copy_file uring t =
114114+ (* Create a set of read requests that we will turn into write requests
115115+ * up until the queue depth *)
116116+ while t.write_left > 0 || t.read_left > 0 do
117117+ let rec submit_reads () =
118118+ if t.read_left > 0 then begin
119119+ if t.reads + t.writes < (Uring.queue_depth uring) then begin
120120+ let size = min t.block_size t.read_left in
121121+ queue_read uring t size;
122122+ submit_reads ()
123123+ end
124124+ end;
125125+ in
126126+ submit_reads ();
127127+ let num = Uring.submit uring in
128128+ Fmt.(epr "%a: %d\n%!" (styled `Yellow string) "submit") num;
129129+ (* Queue now full, find at least one completion *)
130130+ let got_completion = ref false in
131131+ let rec handle_completions () =
132132+ if t.write_left > 0 then begin
133133+ let check_q = if !got_completion then Uring.peek uring else Uring.wait uring in
134134+ match check_q with
135135+ |None -> Fmt.epr "completions: retry so finishing loop\n%!"
136136+ |Some (req, res) ->
137137+ handle_completion uring req res;
138138+ got_completion := true;
139139+ handle_completions ();
140140+ end
141141+ in
142142+ handle_completions ();
143143+ let num = Uring.submit uring in
144144+ Fmt.(epr "%a: %d\n%!" (styled `Yellow string) "submit") num;
145145+ done
146146+147147+let () =
148148+ let infile = Sys.argv.(1) in
149149+ let outfile = Sys.argv.(2) in
150150+ let infd = Unix.(handle_unix_error (openfile infile [O_RDONLY]) 0) in
151151+ let outfd = Unix.(handle_unix_error (openfile outfile [O_WRONLY; O_CREAT; O_TRUNC]) 0o644) in
152152+ let insize = get_file_size infd in
153153+ let block_size = 32 * 1024 in
154154+ let queue_depth = 64 in
155155+ let t = { block_size; insize; offset=0; reads=0; writes=0; write_left=insize; read_left=insize; infd; outfd } in
156156+ Fmt.epr "\nstarting: %a bs=%d qd=%d\n%!" pp t block_size queue_depth;
157157+ let uring = Uring.create ~queue_depth ~default:(empty_req t) () in
158158+ copy_file uring t;
159159+ Unix.close infd;
160160+ Unix.close outfd;
161161+ Uring.exit uring
+16
iovec.ml
···11+(*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ *
44+ * Permission to use, copy, modify, and distribute this software for any
55+ * purpose with or without fee is hereby granted, provided that the above
66+ * copyright notice and this permission notice appear in all copies.
77+ *
88+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
99+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1010+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1111+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1212+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1313+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1414+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1515+ *)
1616+117type buf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
218319type iovec
+16
iovec.mli
···11+(*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ *
44+ * Permission to use, copy, modify, and distribute this software for any
55+ * purpose with or without fee is hereby granted, provided that the above
66+ * copyright notice and this permission notice appear in all copies.
77+ *
88+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
99+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1010+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1111+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1212+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1313+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1414+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1515+ *)
1616+117type buf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
218type iovec
319type t
···99(* TODO make this work with ST_ISBLK *)
10101111let get_completion_and_print uring =
1212- let iov, len = Uring.wait uring in
1212+ let iov, len = match Uring.wait uring with Some v -> v | None -> failwith "retry" in
1313 let bufs = Uring.Iovec.bufs iov in
1414 let remaining = ref len in
1515 Printf.eprintf "%d bytes read\n%!" len;
-103
urcp.ml
···11-(* cp(1) built with liburing. Queues up as many reads as the queue
22- * depth allows and then queues up corresponding writes.
33- OCaml version of https://unixism.net/loti/tutorial/cp_liburing.html *)
44-55-let queue_depth = 64
66-let block_size = 32*1024
77-88-let get_file_size fd =
99- Unix.handle_unix_error Unix.fstat fd |>
1010- fun {Unix.st_size; _} -> st_size
1111-(* TODO make this work with ST_ISBLK *)
1212-1313-type t = {
1414- mutable insize: int;
1515- mutable offset: int;
1616- mutable reads: int;
1717- mutable writes: int;
1818- mutable write_left: int;
1919- mutable read_left: int;
2020- infd: Unix.file_descr;
2121- outfd: Unix.file_descr;
2222-}
2323-2424-type req = {
2525- op: [`R | `W ];
2626- iov: Uring.Iovec.t;
2727- len: int;
2828- mutable off: int;
2929- t : t;
3030-}
3131-3232-let empty_req t = { op=`R; iov=Uring.Iovec.empty; len=0; off=0; t}
3333-3434-(* Perform a complete read into bufs. *)
3535-let queue_read uring t len =
3636- let ba = Uring.Iovec.alloc_buf block_size in
3737- let iov = Uring.Iovec.alloc [|ba|] in
3838- let req = { op=`R; iov; len; off=t.offset; t } in
3939- Uring.readv uring t.infd iov req;
4040- t.offset <- t.offset + len;
4141- t.read_left <- t.read_left + len;
4242- t.reads <- t.reads + 1
4343-4444-(* TODO compile time check *)
4545-let eagain = -11
4646-let eintr = -4
4747-4848-(* Check that a read has completely finished, and if not
4949- * queue it up for completing the remaining amount *)
5050-let handle_read_completion uring req res =
5151- let bytes_to_read = req.len - req.off in
5252- match res with
5353- | 0 -> raise End_of_file
5454- | n when n = eagain || n = eintr ->
5555- (* requeue the request *)
5656- Uring.readv ~offset:req.off uring req.t.infd req.iov req
5757- | n when n < 0 ->
5858- raise (Failure ("unix errorno " ^ (string_of_int n)))
5959- | n when n < bytes_to_read ->
6060- (* handle short read so new iovec and resubmit *)
6161- Uring.Iovec.advance req.iov ~idx:0 ~adj:n;
6262- req.off <-req.off + n;
6363- Uring.readv ~offset:req.off uring req.t.infd req.iov req
6464- | n when n = bytes_to_read ->
6565- (* Read is complete, all bytes are read, turn it into a write *)
6666- req.t.reads <- req.t.reads - 1;
6767- req.t.writes <- req.t.writes + 1;
6868- (* reset the iovec *)
6969- Uring.Iovec.advance req.iov ~idx:0 ~adj:(req.off * -1);
7070- let req = { req with op=`W; off=0 } in
7171- Uring.writev ~offset:0 uring req.t.outfd req.iov req
7272- | n -> raise (Failure ("unexpected readv result > len " ^ (string_of_int n)))
7373-7474-let copy_file uring t =
7575- (* Create a set of read requests that we will turn into write requests
7676- * up until the queue depth *)
7777- while t.write_left > 0 || t.read_left > 0 do
7878- let need_submit = ref false in
7979- let submit () =
8080- if t.read_left > 0 then begin
8181- if t.reads + t.writes < queue_depth then begin
8282- let size = min block_size t.read_left in
8383- queue_read uring t size;
8484- need_submit := true;
8585- end
8686- end;
8787- if !need_submit then
8888- let _ = Uring.submit uring in ()
8989-9090- in
9191- submit ()
9292- done
9393-9494-let () =
9595- let infile = Sys.argv.(1) in
9696- let outfile = Sys.argv.(2) in
9797- let infd = Unix.(handle_unix_error (openfile infile [O_RDONLY]) 0) in
9898- let outfd = Unix.(handle_unix_error (openfile outfile [O_WRONLY; O_CREAT; O_TRUNC]) 0o644) in
9999- let insize = get_file_size infd in
100100- let t = { insize; offset=0; reads=0; writes=0; write_left=insize; read_left=insize; infd; outfd } in
101101- let uring = Uring.create ~queue_depth ~default:(empty_req t) () in
102102- copy_file uring t
103103- (* TOD fd close and iouring exit *)
+49-17
uring.ml
···11+(*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ *
44+ * Permission to use, copy, modify, and distribute this software for any
55+ * purpose with or without fee is hereby granted, provided that the above
66+ * copyright notice and this permission notice appear in all copies.
77+ *
88+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
99+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1010+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1111+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1212+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1313+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1414+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1515+ *)
1616+117module Iovec = Iovec
218319type uring
···1127external uring_submit_writev : uring -> Unix.file_descr -> id -> Iovec.t -> int -> unit = "ocaml_uring_submit_writev"
12281329external uring_wait_cqe : uring -> id * int = "ocaml_uring_wait_cqe"
3030+external uring_peek_cqe : uring -> id * int = "ocaml_uring_peek_cqe"
3131+14321533type 'a t = {
1634 uring: uring;
1735 iobuf: Iovec.buf;
1836 mutable id_freelist: int list;
1937 user_data: 'a array;
3838+ queue_depth: int;
3939+ mutable dirty: bool; (* has outstanding requests that need to be submitted *)
2040}
21412242let default_iobuf_len = 1024 * 1024 (* 1MB *)
···2949 Gc.finalise uring_exit uring;
3050 let id_freelist = List.init queue_depth (fun i -> i) in
3151 let user_data = Array.init queue_depth (fun _ -> default) in
3232- { uring; iobuf; id_freelist; user_data }
5252+ { uring; iobuf; id_freelist; user_data; dirty=false; queue_depth }
5353+5454+let exit {uring;_} = uring_exit uring
33553456let get_id t =
3557 match t.id_freelist with
···4264let readv t ?(offset=0) fd iovec user_data =
4365 let id = get_id t in
4466 uring_submit_readv t.uring fd id iovec offset;
6767+ t.dirty <- true;
4568 t.user_data.(id) <- user_data
46694770let writev t ?(offset=0) fd iovec user_data =
4871 let id = get_id t in
4972 uring_submit_writev t.uring fd id iovec offset;
7373+ t.dirty <- true;
5074 t.user_data.(id) <- user_data
51755252-let submit {uring;_} =
5353- uring_submit uring
7676+let submit t =
7777+ if t.dirty then begin
7878+ t.dirty <- false;
7979+ uring_submit t.uring
8080+ end else
8181+ 0
54825555-let wait t =
5656- let id, res = uring_wait_cqe t.uring in
5757- let data = t.user_data.(id) in
5858- put_id t id;
5959- data, res
8383+(* TODO use unixsupport.h *)
8484+let errno_is_retry = function -11 | -4 -> true |_ -> false
8585+8686+let fn_on_ring fn t =
8787+ let id, res = fn t.uring in
8888+ match id, res with
8989+ | -1, res when errno_is_retry res ->
9090+ None
9191+ | -1, res when res < 0 ->
9292+ failwith ("wait error " ^ (string_of_int res))
9393+ (* TODO switch to unixsupport.h to raise Unix_error *)
9494+ | id, res ->
9595+ let data = t.user_data.(id) in
9696+ put_id t id;
9797+ Some (data, res)
60986161-(*
6262-external ring_queue_write_full : t -> Unix.file_descr -> (Bigstringaf.t -> int -> unit) -> Bigstringaf.t -> int -> unit = "ring_queue_write_full"
6363-external ring_queue_read : t -> Unix.file_descr -> (Bigstringaf.t -> int -> unit) -> Bigstringaf.t -> int -> unit = "ring_queue_read"
6464-external ring_queue_accept : t -> Unix.file_descr -> (Unix.file_descr -> unit) -> unit = "ring_queue_accept"
6565-external ring_queue_close : t -> Unix.file_descr -> unit = "ring_queue_close"
6666-external ring_submit : t -> int = "ring_submit"
6767-external ring_exit : t -> unit = "ring_exit"
6868-external ring_wait : t -> unit = "ring_wait"
6969-*)
9999+let peek t = fn_on_ring uring_peek_cqe t
100100+let wait t = fn_on_ring uring_wait_cqe t
101101+let queue_depth {queue_depth;_} = queue_depth
+20-1
uring.mli
···11+(*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ *
44+ * Permission to use, copy, modify, and distribute this software for any
55+ * purpose with or without fee is hereby granted, provided that the above
66+ * copyright notice and this permission notice appear in all copies.
77+ *
88+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
99+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1010+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1111+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1212+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1313+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1414+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1515+ *)
1616+117module Iovec = Iovec
218319type 'a t
420521val create : queue_depth:int -> default:'a -> unit -> 'a t
2222+val queue_depth : 'a t -> int
2323+val exit : 'a t -> unit
624725val readv : 'a t -> ?offset:int -> Unix.file_descr -> Iovec.t -> 'a -> unit
826val writev : 'a t -> ?offset:int -> Unix.file_descr -> Iovec.t -> 'a -> unit
927val submit : 'a t -> int
10281111-val wait : 'a t -> 'a * int
2929+val wait : 'a t -> ('a * int) option
3030+val peek : 'a t -> ('a * int) option
+31
uring.opam
···11+# This file is generated by dune, edit dune-project instead
22+opam-version: "2.0"
33+synopsis: "OCaml bindings for Linux io_uring"
44+description: "Bindings to the Linux io_uring kernel IO interfaces."
55+maintainer: ["anil@recoil.org"]
66+authors: ["Anil Madhavapeddy" "Sadiq Jaffer"]
77+license: "ISC"
88+homepage: "https://github.com/ocaml-multicore/ocaml-uring"
99+bug-reports: "https://github.com/ocaml-multicore/ocaml-uring/issues"
1010+depends: [
1111+ "dune" {>= "2.7"}
1212+ "fmt" {>= "0.8.4"}
1313+ "bigstringaf"
1414+ "alcotest" {with-test}
1515+ "odoc" {with-doc}
1616+]
1717+build: [
1818+ ["dune" "subst"] {dev}
1919+ [
2020+ "dune"
2121+ "build"
2222+ "-p"
2323+ name
2424+ "-j"
2525+ jobs
2626+ "@install"
2727+ "@runtest" {with-test}
2828+ "@doc" {with-doc}
2929+ ]
3030+]
3131+dev-repo: "git+https://github.com/ocaml-multicore/ocaml-uring.git"
+69-216
uring_stubs.c
···11+/*
22+ * Copyright (C) 2020-2021 Anil Madhavapeddy
33+ * Copyright (C) 2020-2021 Sadiq Jaffer
44+ *
55+ * Permission to use, copy, modify, and distribute this software for any
66+ * purpose with or without fee is hereby granted, provided that the above
77+ * copyright notice and this permission notice appear in all copies.
88+ *
99+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
1010+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1111+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1212+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1313+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1414+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1515+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1616+ */
1717+118#include <liburing.h>
219#include <caml/alloc.h>
320#include <caml/bigarray.h>
···724#include <caml/memory.h>
825#include <caml/mlvalues.h>
926#include <caml/signals.h>
2727+#include <caml/unixsupport.h>
1028#include <string.h>
11291212-#define Ring_val(v) *((struct io_uring**)Data_custom_val(v))
3030+#undef URING_DEBUG
3131+#ifdef URING_DEBUG
3232+#define dprintf(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__)
3333+#else
3434+#define dprintf(fmt, ...) ((void)0)
3535+#endif
13361414-#define EVENT_TYPE_READ 0
1515-#define EVENT_TYPE_WRITE 1
1616-#define EVENT_TYPE_ACCEPT 2
1717-#define EVENT_TYPE_CLOSE 3
3737+#define Ring_val(v) *((struct io_uring**)Data_custom_val(v))
18381939static struct custom_operations ring_ops = {
2040 "uring.ring",
···5171 value ring_custom = caml_alloc_custom_mem(&ring_ops, sizeof(struct io_uring*), sizeof(struct io_uring));
5272 *((struct io_uring**)Data_custom_val(ring_custom)) = ring;
5373 CAMLreturn(ring_custom);
5454- } else {
5555- caml_failwith(strerror(-status));
5656- }
7474+ } else
7575+ unix_error(-status, "io_uring_queue_init", Nothing);
5776}
58775978// TODO also add an unregister ba
···6584 struct iovec iov[1];
6685 iov[0].iov_base = Caml_ba_data_val(v_ba);
6786 iov[0].iov_len = Caml_ba_array_val(v_ba)->dim[0];
6868- fprintf(stderr,"uring %p: registering iobuf base %p len %lu\n", ring, iov[0].iov_base, iov[0].iov_len);
8787+ dprintf("uring %p: registering iobuf base %p len %lu\n", ring, iov[0].iov_base, iov[0].iov_len);
6988 int ret = io_uring_register_buffers(ring, iov, 1);
7089 if (ret)
7171- caml_failwith(strerror(-ret));
9090+ unix_error(-ret, "io_uring_register_buffers", Nothing);
7291 CAMLreturn(Val_unit);
7392}
74937594value ocaml_uring_exit(value v_uring) {
7695 CAMLparam1(v_uring);
7796 struct io_uring *ring = Ring_val(v_uring);
7878- fprintf(stderr, "uring %p: exit\n", ring);
9797+ dprintf("uring %p: exit\n", ring);
7998 io_uring_queue_exit(ring);
8099 caml_stat_free(ring);
100100+ ring = NULL;
81101 CAMLreturn(Val_unit);
82102}
83103···90110 value v_ba = Field(v_ba_arr,i);
91111 iovs[i].iov_base = Caml_ba_data_val(v_ba);
92112 iovs[i].iov_len = Caml_ba_array_val(v_ba)->dim[0];
9393- fprintf(stderr, "iov %d: %p %lu\n", i, iovs[i].iov_base, iovs[i].iov_len);
113113+ dprintf( "iov %d: %p %lu\n", i, iovs[i].iov_base, iovs[i].iov_len);
94114 }
95115 if (((uintptr_t) iovs & 1) == 1) caml_failwith("unaligned alloc??");
96116 CAMLreturn ((value) iovs | 1);
···125145 struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
126146 if (!sqe)
127147 caml_failwith("unable to allocate SQE");
128128- fprintf(stderr, "submit_readv: %d ents off %d\n", len, Int_val(v_off));
148148+ dprintf("submit_readv: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int_val(v_off));
129149 io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int_val(v_off)); /* TODO add offset to intf */
130150 io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */
131151 CAMLreturn(Val_unit);
···140160 struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
141161 if (!sqe)
142162 caml_failwith("unable to allocate SQE");
143143- fprintf(stderr, "submit_writev: %d ents off %d\n", len, Int_val(v_off));
163163+ dprintf("submit_writev: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int_val(v_off));
144164 io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int_val(v_off)); /* TODO add offset to intf */
145165 io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */
146166 CAMLreturn(Val_unit);
···161181 long id;
162182 struct io_uring *ring = Ring_val(v_uring);
163183 struct io_uring_cqe *cqe;
164164- fprintf(stderr, "cqe: waiting\n");
165165- io_uring_wait_cqe(ring, &cqe);
166166- if (cqe->res < 0)
167167- caml_failwith(strerror(-cqe->res));
168168- fprintf(stderr, "cqe %p: res=%d\n", cqe, cqe->res);
169169- id = (long)io_uring_cqe_get_data(cqe);
170170- io_uring_cqe_seen(ring, cqe);
171171- v_ret = caml_alloc(2, 0);
172172- Store_field(v_ret, 0, Val_int(id));
173173- Store_field(v_ret, 1, Val_int(cqe->res));
184184+ int res;
185185+ dprintf("cqe: waiting\n");
186186+ res = io_uring_wait_cqe(ring, &cqe);
187187+ if (res < 0) {
188188+ v_ret = caml_alloc(2, 0);
189189+ Store_field(v_ret, 0, Val_int(-1));
190190+ Store_field(v_ret, 1, Val_int(res));
191191+ } else {
192192+ id = (long)io_uring_cqe_get_data(cqe);
193193+ io_uring_cqe_seen(ring, cqe);
194194+ v_ret = caml_alloc(2, 0);
195195+ Store_field(v_ret, 0, Val_int(id));
196196+ Store_field(v_ret, 1, Val_int(cqe->res));
197197+ }
174198 CAMLreturn(v_ret);
175199}
176176-#if 0
177200178178-void ring_queue_write_full(value ring_custom, value fd, value callback, value buffer_bigarray, value nbytes) {
179179- CAMLparam5(ring_custom, fd, callback, buffer_bigarray, nbytes);
180180-181181- struct io_uring* ring = Ring_val(ring_custom);
182182- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
183183-184184- char* buf = (char*)Caml_ba_data_val(buffer_bigarray);
185185-186186- struct request* req = (struct request*)caml_stat_alloc(sizeof(struct request));
187187-188188- req->fd = Int_val(fd);
189189- req->write_length = Long_val(nbytes);
190190- req->written_length = 0;
191191- req->iov.iov_base = buf;
192192- req->iov.iov_len = req->write_length;
193193-194194- io_uring_prep_writev(sqe, req->fd, &req->iov, 1, 0);
195195-196196- req->event_type = EVENT_TYPE_WRITE;
197197- req->callback = callback;
198198- req->buffer = buffer_bigarray;
199199-200200- caml_register_generational_global_root(&req->buffer);
201201- caml_register_generational_global_root(&req->callback);
202202-203203- io_uring_sqe_set_data(sqe, req);
204204-205205- CAMLreturn0;
206206-}
207207-208208-// For now we use readv instead because it's available in 5.1
209209-void ring_queue_read(value ring_custom, value fd, value callback, value buffer_bigarray, value offset) {
210210- CAMLparam5(ring_custom, fd, callback, buffer_bigarray, offset);
211211-212212- struct io_uring* ring = Ring_val(ring_custom);
213213- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
214214-215215- char* buf = (char*)Caml_ba_data_val(buffer_bigarray);
216216- size_t buf_len = Caml_ba_array_val(buffer_bigarray)->dim[0];
217217-218218- printf("buf_len: %ld\n", buf_len);
219219-220220- struct request* req = (struct request*)caml_stat_alloc(sizeof(struct request));
221221-222222- req->fd = Int_val(fd);
223223- req->iov.iov_base = buf;
224224- req->iov.iov_len = buf_len;
225225-226226- io_uring_prep_readv(sqe, req->fd, &req->iov, 1, Long_val(offset));
227227-228228- req->event_type = EVENT_TYPE_READ;
229229- req->callback = callback;
230230- req->buffer = buffer_bigarray;
231231-232232- caml_register_generational_global_root(&req->buffer);
233233- caml_register_generational_global_root(&req->callback);
234234-235235- io_uring_sqe_set_data(sqe, req);
236236-237237- CAMLreturn0;
238238-}
239239-240240-241241-void ring_queue_close(value ring_custom, value fd) {
242242- CAMLparam2(ring_custom, fd);
243243-244244- struct io_uring* ring = Ring_val(ring_custom);
245245- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
246246-247247- io_uring_prep_close(sqe, Int_val(fd));
248248-249249- struct request* req = (struct request*)caml_stat_alloc(sizeof(struct request));
250250-251251- req->event_type = EVENT_TYPE_CLOSE;
252252-253253- io_uring_sqe_set_data(sqe, req);
254254-255255- CAMLreturn0;
256256-}
257257-258258-void ring_queue_accept(value ring_custom, value fd, value callback) {
259259- CAMLparam3(ring_custom, fd, callback);
260260-261261- struct io_uring* ring = Ring_val(ring_custom);
262262- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
263263-264264- struct request* req = (struct request*)caml_stat_alloc(sizeof(struct request));
265265- req->sockaddr = (struct sockaddr*)caml_stat_alloc(sizeof(struct sockaddr));
266266- req->socklen = sizeof(struct sockaddr);
267267-268268- io_uring_prep_accept(sqe, Int_val(fd), req->sockaddr, &req->socklen, 0);
269269-270270- req->event_type = EVENT_TYPE_ACCEPT;
271271- req->callback = callback;
272272-273273- caml_register_generational_global_root(&req->caml_callback);
274274-275275- io_uring_sqe_set_data(sqe, req);
276276-277277- CAMLreturn0;
278278-}
279279-280280-void ring_wait(value ring_custom) {
281281- CAMLparam1(ring_custom);
282282-283283- struct io_uring* ring = Ring_val(ring_custom);
284284- struct io_uring_cqe *cqe;
285285-286286- caml_enter_blocking_section();
287287- int ret = io_uring_wait_cqe(ring, &cqe);
288288- caml_leave_blocking_section();
289289-290290- printf("got event! ret: %d, cqe->res: %d\n", ret, cqe->res);
291291-292292- if( ret < 0 ) {
293293- caml_failwith(strerror(-ret));
294294- }
295295-296296- struct request* req = io_uring_cqe_get_data(cqe);
297297-298298- if( cqe->res < 0 ) {
299299- caml_failwith(strerror(-cqe->res));
300300- }
301301-302302- int cleanup_req = 0;
303303-304304- if( req->event_type == EVENT_TYPE_READ || req->event_type == EVENT_TYPE_WRITE ) {
305305- switch( req->event_type ) {
306306- case EVENT_TYPE_READ:
307307- caml_callback2(req->callback, req->buffer, Val_long(cqe->res));
308308-309309- cleanup_req = 1;
310310- break;
311311- case EVENT_TYPE_WRITE:
312312- /* check we actually wrote the full length we tried to write */
313313- if(cqe->res + req->written_length == req->write_length) {
314314- /* call the callback if it exists */
315315- if( Is_block(req->callback) ) {
316316- caml_callback2(req->callback, req->buffer, req->write_length);
317317- }
318318-319319- cleanup_req = 1;
320320- } else {
321321- // Here we wrote less than the amount we requested
322322-323323- // Store how much we wrote
324324- req->written_length += cqe->res;
325325-326326- // Now we queue up a new write
327327- req->iov.iov_base = req->iov.iov_base + cqe->res;
328328- req->iov.iov_len = req->write_length - req->written_length;
329329-330330- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
331331-332332- io_uring_prep_writev(sqe, req->fd, &req->iov, 1, 0);
333333- }
334334- }
335335-336336- if( cleanup_req ) {
337337- caml_remove_generational_global_root(&req->callback);
338338- caml_remove_generational_global_root(&req->buffer);
339339- }
340340- }
341341- else if( req->event_type == EVENT_TYPE_ACCEPT ) {
342342- caml_callback(req->callback, Val_int(cqe->res));
343343-344344- free(req->sockaddr);
345345-346346- caml_remove_generational_global_root(&req->callback);
347347- }
348348-201201+value ocaml_uring_peek_cqe(value v_uring)
202202+{
203203+ CAMLparam1(v_uring);
204204+ CAMLlocal1(v_ret);
205205+ long id;
206206+ struct io_uring *ring = Ring_val(v_uring);
207207+ struct io_uring_cqe *cqe;
208208+ int res;
209209+ dprintf("cqe: peeking\n");
210210+ res = io_uring_peek_cqe(ring, &cqe);
211211+ if (res < 0) {
212212+ v_ret = caml_alloc(2, 0);
213213+ Store_field(v_ret, 0, Val_int(-1));
214214+ Store_field(v_ret, 1, Val_int(res));
215215+ } else {
216216+ id = (long)io_uring_cqe_get_data(cqe);
349217 io_uring_cqe_seen(ring, cqe);
350350-351351- if( cleanup_req ) {
352352- caml_stat_free(req);
353353- }
354354-355355- CAMLreturn0;
218218+ v_ret = caml_alloc(2, 0);
219219+ Store_field(v_ret, 0, Val_int(id));
220220+ Store_field(v_ret, 1, Val_int(cqe->res));
221221+ }
222222+ CAMLreturn(v_ret);
356223}
357357-358358-value ring_submit(value ring_custom) {
359359- CAMLparam1(ring_custom);
360360-361361- struct io_uring* ring = Ring_val(ring_custom);
362362-363363- int submitted = io_uring_submit(ring);
364364-365365- printf("submitted: %d\n", submitted);
366366-367367- CAMLreturn(Val_int(submitted));
368368-}
369369-370370-#endif
+7-1
uring_test.ml
tests/basic_file_read.ml
···77 Uring.readv t fd iov ();
88 let res = Uring.submit t in
99 Printf.eprintf "submitted %d\n%!" res;
1010- let (), res = Uring.wait t in
1010+ let (), res =
1111+ let rec retry () =
1212+ match Uring.wait t with
1313+ | None -> retry ()
1414+ | Some v -> v
1515+ in retry ()
1616+ in
1117 Uring.Iovec.free iov;
1218 Printf.eprintf "res %d\n%!" res;
1319 Printf.eprintf "%s -- %s\n%!" (Bigstringaf.to_string b1) (Bigstringaf.to_string b2);