The unpac monorepo manager self-hosting as a monorepo using unpac
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

updated interfaces

+121 -62
+1 -2
dune
··· 3 3 (library 4 4 (name uring) 5 5 (public_name uring) 6 - (libraries bigstringaf) 7 - (modules uring) 6 + (modules iovec uring) 8 7 (foreign_archives uring) 9 8 (foreign_stubs 10 9 (language c)
+21
iovec.ml
··· 1 + type buf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t 2 + 3 + type iovec 4 + type t = iovec * buf array 5 + external alloc_iovec : buf array -> iovec = "ocaml_uring_alloc_iovecs" 6 + external free_iovec : iovec -> unit = "ocaml_uring_free_iovecs" 7 + 8 + let alloc_buf len = 9 + Bigarray.(Array1.create char c_layout len) 10 + 11 + let alloc bufs : t = 12 + let v = alloc_iovec bufs in 13 + v, bufs 14 + 15 + let free (iov,_) = free_iovec iov 16 + 17 + let nr_vecs (_,bufs) = Array.length bufs 18 + 19 + let bufs t = snd t 20 + 21 + let empty = alloc [||]
+10
iovec.mli
··· 1 + type buf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t 2 + type iovec 3 + type t 4 + 5 + val alloc : buf array -> t 6 + val alloc_buf : int -> buf 7 + val free : t -> unit 8 + val nr_vecs : t -> int 9 + val bufs : t -> buf array 10 + val empty : t
+6 -5
urcat.ml
··· 9 9 (* TODO make this work with ST_ISBLK *) 10 10 11 11 let get_completion_and_print uring = 12 - let bufs, len = Uring.wait_cqe uring in 12 + let iov, len = Uring.wait uring in 13 + let bufs = Uring.Iovec.bufs iov in 13 14 let remaining = ref len in 14 15 Printf.eprintf "%d bytes read\n%!" len; 15 16 Array.iter (fun buf -> ··· 29 30 let fd = Unix.(handle_unix_error (openfile fname [O_RDONLY]) 0) in 30 31 let file_sz = get_file_size fd in 31 32 let blocks = if file_sz mod block_size <> 0 then (file_sz / block_size)+1 else file_sz/block_size in 32 - let bufs = Array.init blocks (fun _ -> Uring.iobuf_alloc block_size) in 33 - Uring.submit_readv uring fd bufs; 33 + let bufs = Array.init blocks (fun _ -> Uring.Iovec.alloc_buf block_size) in 34 + let iov = Uring.Iovec.alloc bufs in 35 + Uring.submit_readv uring fd iov (iov :> Uring.Iovec.t); 34 36 let numreq = Uring.submit uring in 35 37 assert(numreq=1); 36 38 () 37 39 38 40 let () = 39 41 let fname = Sys.argv.(1) in 40 - let uring = Uring.create ~queue_depth:1 () in 42 + let uring = Uring.create ~queue_depth:1 ~default:Uring.Iovec.empty () in 41 43 submit_read_request fname uring; 42 44 get_completion_and_print uring 43 -
+38 -32
uring.ml
··· 1 + module Iovec = Iovec 2 + 1 3 type uring 2 - 3 - type iobuf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t 4 - 5 - let iobuf_alloc len = Bigarray.(Array1.create char c_layout len) 6 - 7 - (** [ring_setup i] allocates a new io_uring with queue depth [i]. 8 - @raise [Failure] *) 9 4 external uring_create : int -> uring = "ocaml_uring_setup" 10 5 external uring_exit : uring -> unit = "ocaml_uring_exit" 11 6 (* external uring_register_bigarray : uring -> iobuf -> unit = "ocaml_uring_register_ba" *) 12 7 external uring_submit : uring -> int = "ocaml_uring_submit" 13 8 14 - type iovecs 15 - external uring_alloc_iovecs : iobuf array -> iovecs = "ocaml_uring_alloc_iovecs" 16 - external uring_free_iovecs : iovecs -> unit = "ocaml_uring_free_iovecs" 9 + type id = int 10 + external uring_submit_readv : uring -> Unix.file_descr -> id -> Iovec.t -> int64 -> unit = "ocaml_uring_submit_readv" 11 + external uring_submit_writev : uring -> Unix.file_descr -> id -> Iovec.t -> int64 -> unit = "ocaml_uring_submit_writev" 17 12 18 - external uring_submit_readv : uring -> Unix.file_descr -> iovecs -> int -> unit = "ocaml_uring_submit_readv" 19 - external uring_wait_cqe : uring -> iovecs * int = "ocaml_uring_wait_cqe" 13 + external uring_wait_cqe : uring -> id * int = "ocaml_uring_wait_cqe" 20 14 21 - type t = { 15 + type 'a t = { 22 16 uring: uring; 23 - iobuf: iobuf; 24 - pending: (iovecs, iobuf array) Hashtbl.t; 17 + iobuf: Iovec.buf; 18 + mutable id_freelist: int list; 19 + user_data: 'a array; 25 20 } 26 21 27 22 let default_iobuf_len = 1024 * 1024 (* 1MB *) 28 23 29 - let create ~queue_depth () = 24 + let create ~queue_depth ~default () = 30 25 let uring = uring_create queue_depth in 31 26 (* TODO posix memalign this to page *) 32 - let iobuf = Bigarray.(Array1.create char c_layout default_iobuf_len) in 27 + let iobuf = Iovec.alloc_buf default_iobuf_len in 33 28 (* uring_register_bigarray uring iobuf; *) 34 29 Gc.finalise uring_exit uring; 35 - let pending = Hashtbl.create 1 in 36 - { uring; iobuf; pending } 30 + let id_freelist = List.init queue_depth (fun i -> i) in 31 + let user_data = Array.init queue_depth (fun _ -> default) in 32 + { uring; iobuf; id_freelist; user_data } 37 33 38 - let submit_readv {uring;pending;_} fd bufs = 39 - let len = Array.length bufs in 40 - let iovs = uring_alloc_iovecs bufs in 41 - uring_submit_readv uring fd iovs len; 42 - Hashtbl.add pending iovs bufs; 43 - () 34 + let get_id t = 35 + match t.id_freelist with 36 + | [] -> raise Not_found 37 + | hd::tl -> t.id_freelist <- tl; hd 38 + 39 + let put_id t v = 40 + t.id_freelist <- v :: t.id_freelist 41 + 42 + let submit_readv t fd iovec user_data = 43 + let id = get_id t in 44 + uring_submit_readv t.uring fd id iovec 0L; 45 + t.user_data.(id) <- user_data 46 + 47 + let submit_writev t fd iovec user_data = 48 + let id = get_id t in 49 + uring_submit_writev t.uring fd id iovec 0L; 50 + t.user_data.(id) <- user_data 44 51 45 52 let submit {uring;_} = 46 53 uring_submit uring 47 54 48 - let wait_cqe {uring;pending;_} = 49 - let iovecs, len = uring_wait_cqe uring in 50 - let bas = Hashtbl.find pending iovecs in 51 - uring_free_iovecs iovecs; 52 - bas, len 53 - 55 + let wait t = 56 + let id, res = uring_wait_cqe t.uring in 57 + let data = t.user_data.(id) in 58 + put_id t id; 59 + data, res 54 60 55 61 (* 56 62 external ring_queue_write_full : t -> Unix.file_descr -> (Bigstringaf.t -> int -> unit) -> Bigstringaf.t -> int -> unit = "ring_queue_write_full" ··· 60 66 external ring_submit : t -> int = "ring_submit" 61 67 external ring_exit : t -> unit = "ring_exit" 62 68 external ring_wait : t -> unit = "ring_wait" 63 - *) 69 + *)
+9 -7
uring.mli
··· 1 + module Iovec = Iovec 1 2 2 - type t 3 - type iobuf = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t 3 + type 'a t 4 4 5 - val iobuf_alloc : int -> iobuf 6 - val create : queue_depth:int -> unit -> t 7 - val submit_readv : t -> Unix.file_descr -> iobuf array -> unit 8 - val submit : t -> int 9 - val wait_cqe : t -> iobuf array * int 5 + val create : queue_depth:int -> default:'a -> unit -> 'a t 6 + 7 + val submit_readv : 'a t -> Unix.file_descr -> Iovec.t -> 'a -> unit 8 + val submit_writev : 'a t -> Unix.file_descr -> Iovec.t -> 'a -> unit 9 + val submit : 'a t -> int 10 + 11 + val wait : 'a t -> 'a * int
+28 -10
uring_stubs.c
··· 61 61 value ocaml_uring_register_ba(value v_uring, value v_ba) { 62 62 CAMLparam2(v_uring, v_ba); 63 63 struct io_uring *ring = Ring_val(v_uring); 64 + // TODO broken needs malloc 64 65 struct iovec iov[1]; 65 66 iov[0].iov_base = Caml_ba_data_val(v_ba); 66 67 iov[0].iov_len = Caml_ba_array_val(v_ba)->dim[0]; ··· 105 106 } 106 107 107 108 value 108 - ocaml_uring_submit_readv(value v_uring, value v_fd, value iovecs, value v_len) { 109 + ocaml_uring_submit_readv(value v_uring, value v_fd, value v_id, value v_iov, value v_off) { 109 110 CAMLparam1(v_uring); 110 111 struct io_uring *ring = Ring_val(v_uring); 111 - struct iovec *iovs = (struct iovec *) (iovecs & ~1); 112 + struct iovec *iovs = (struct iovec *) (Field(v_iov, 0) & ~1); 113 + int len = Wosize_val(Field(v_iov, 1)); 112 114 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 113 115 if (!sqe) 114 116 caml_failwith("unable to allocate SQE"); 115 - io_uring_prep_readv(sqe, Int_val(v_fd), iovs, Int_val(v_len), 0); 116 - io_uring_sqe_set_data(sqe, iovs); 117 + fprintf(stderr, "submit_readv: %d ents off %lu\n", len, Int64_val(v_off)); 118 + io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int64_val(v_off)); /* TODO add offset to intf */ 119 + io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */ 120 + CAMLreturn(Val_unit); 121 + } 122 + 123 + value 124 + ocaml_uring_submit_writev(value v_uring, value v_fd, value v_id, value v_iov, value v_off) { 125 + CAMLparam1(v_uring); 126 + struct io_uring *ring = Ring_val(v_uring); 127 + struct iovec *iovs = (struct iovec *) (Field(v_iov, 0) & ~1); 128 + int len = Wosize_val(Field(v_iov, 1)); 129 + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 130 + if (!sqe) 131 + caml_failwith("unable to allocate SQE"); 132 + fprintf(stderr, "submit_writev: %d ents off %lu\n", len, Int64_val(v_off)); 133 + io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int64_val(v_off)); /* TODO add offset to intf */ 134 + io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */ 117 135 CAMLreturn(Val_unit); 118 136 } 119 137 ··· 129 147 { 130 148 CAMLparam1(v_uring); 131 149 CAMLlocal1(v_ret); 150 + long id; 132 151 struct io_uring *ring = Ring_val(v_uring); 133 152 struct io_uring_cqe *cqe; 134 153 fprintf(stderr, "cqe: waiting\n"); 135 154 io_uring_wait_cqe(ring, &cqe); 136 - fprintf(stderr, "cqe: %p res = %d\n", cqe, cqe->res); 137 155 if (cqe->res < 0) 138 156 caml_failwith(strerror(-cqe->res)); 139 - fprintf(stderr, "ceq %p: res=%d\n", cqe, cqe->res); 140 - struct iovec *iovs = io_uring_cqe_get_data(cqe); 157 + fprintf(stderr, "cqe %p: res=%d\n", cqe, cqe->res); 158 + id = (long)io_uring_cqe_get_data(cqe); 141 159 io_uring_cqe_seen(ring, cqe); 142 - v_ret = caml_alloc(3, 0); 143 - Store_field(v_ret, 0, (value)iovs | 1); 160 + v_ret = caml_alloc(2, 0); 161 + Store_field(v_ret, 0, Val_int(id)); 144 162 Store_field(v_ret, 1, Val_int(cqe->res)); 145 163 CAMLreturn(v_ret); 146 164 } ··· 338 356 CAMLreturn(Val_int(submitted)); 339 357 } 340 358 341 - #endif 359 + #endif
+8 -6
uring_test.ml
··· 1 1 let () = 2 - let t = Uring.create ~queue_depth:1 () in 2 + let t = Uring.create ~queue_depth:1 ~default:() () in 3 3 let fd = Unix.(handle_unix_error (openfile "test.txt" [O_RDONLY]) 0) in 4 - let b1 = Uring.iobuf_alloc 3 in 5 - let b2 = Uring.iobuf_alloc 5 in 6 - Uring.submit_readv t fd [|b1;b2|]; 4 + let b1 = Uring.Iovec.alloc_buf 3 in 5 + let b2 = Uring.Iovec.alloc_buf 7 in 6 + let iov = Uring.Iovec.alloc [|b1;b2|] in 7 + Uring.submit_readv t fd iov (); 7 8 let res = Uring.submit t in 8 9 Printf.eprintf "submitted %d\n%!" res; 9 - let _,res = Uring.wait_cqe t in 10 + let (), res = Uring.wait t in 11 + Uring.Iovec.free iov; 10 12 Printf.eprintf "res %d\n%!" res; 11 13 Printf.eprintf "%s -- %s\n%!" (Bigstringaf.to_string b1) (Bigstringaf.to_string b2); 12 14 () ··· 49 51 Printf.printf "Go connection!\n%!"; 50 52 client_fd := Some(new_client_fd); 51 53 server_loop () 52 - *) 54 + *)