The unpac monorepo manager self-hosting as a monorepo using unpac
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more wip on urcp

+138 -15
+1 -1
.gitignore
··· 1 1 .merlin 2 2 .vscode 3 3 _build 4 - .*.swp 4 + .*.sw?
+4
dune
··· 23 23 (modules urcat) 24 24 (libraries bigstringaf unix uring)) 25 25 26 + (executable 27 + (name urcp) 28 + (modules urcp) 29 + (libraries bigstringaf unix uring)) 26 30 27 31 (rule 28 32 (deps
+4
iovec.ml
··· 4 4 type t = iovec * buf array 5 5 external alloc_iovec : buf array -> iovec = "ocaml_uring_alloc_iovecs" 6 6 external free_iovec : iovec -> unit = "ocaml_uring_free_iovecs" 7 + external adjust_iovec : iovec -> int -> int -> unit = "ocaml_iovec_advance_offset" 7 8 8 9 let alloc_buf len = 9 10 Bigarray.(Array1.create char c_layout len) ··· 11 12 let alloc bufs : t = 12 13 let v = alloc_iovec bufs in 13 14 v, bufs 15 + 16 + let advance (iovec,_) ~idx ~adj = 17 + adjust_iovec iovec idx adj 14 18 15 19 let free (iov,_) = free_iovec iov 16 20
+1
iovec.mli
··· 8 8 val nr_vecs : t -> int 9 9 val bufs : t -> buf array 10 10 val empty : t 11 + val advance : t -> idx:int -> adj:int -> unit
+1 -1
urcat.ml
··· 32 32 let blocks = if file_sz mod block_size <> 0 then (file_sz / block_size)+1 else file_sz/block_size in 33 33 let bufs = Array.init blocks (fun _ -> Uring.Iovec.alloc_buf block_size) in 34 34 let iov = Uring.Iovec.alloc bufs in 35 - Uring.submit_readv uring fd iov (iov :> Uring.Iovec.t); 35 + Uring.readv uring fd iov (iov :> Uring.Iovec.t); 36 36 let numreq = Uring.submit uring in 37 37 assert(numreq=1); 38 38 ()
+103
urcp.ml
··· 1 + (* cp(1) built with liburing. Queues up as many reads as the queue 2 + * depth allows and then queues up corresponding writes. 3 + OCaml version of https://unixism.net/loti/tutorial/cp_liburing.html *) 4 + 5 + let queue_depth = 64 6 + let block_size = 32*1024 7 + 8 + let get_file_size fd = 9 + Unix.handle_unix_error Unix.fstat fd |> 10 + fun {Unix.st_size; _} -> st_size 11 + (* TODO make this work with ST_ISBLK *) 12 + 13 + type t = { 14 + mutable insize: int; 15 + mutable offset: int; 16 + mutable reads: int; 17 + mutable writes: int; 18 + mutable write_left: int; 19 + mutable read_left: int; 20 + infd: Unix.file_descr; 21 + outfd: Unix.file_descr; 22 + } 23 + 24 + type req = { 25 + op: [`R | `W ]; 26 + iov: Uring.Iovec.t; 27 + len: int; 28 + mutable off: int; 29 + t : t; 30 + } 31 + 32 + let empty_req t = { op=`R; iov=Uring.Iovec.empty; len=0; off=0; t} 33 + 34 + (* Perform a complete read into bufs. *) 35 + let queue_read uring t len = 36 + let ba = Uring.Iovec.alloc_buf block_size in 37 + let iov = Uring.Iovec.alloc [|ba|] in 38 + let req = { op=`R; iov; len; off=t.offset; t } in 39 + Uring.readv uring t.infd iov req; 40 + t.offset <- t.offset + len; 41 + t.read_left <- t.read_left + len; 42 + t.reads <- t.reads + 1 43 + 44 + (* TODO compile time check *) 45 + let eagain = -11 46 + let eintr = -4 47 + 48 + (* Check that a read has completely finished, and if not 49 + * queue it up for completing the remaining amount *) 50 + let handle_read_completion uring req res = 51 + let bytes_to_read = req.len - req.off in 52 + match res with 53 + | 0 -> raise End_of_file 54 + | n when n = eagain || n = eintr -> 55 + (* requeue the request *) 56 + Uring.readv ~offset:req.off uring req.t.infd req.iov req 57 + | n when n < 0 -> 58 + raise (Failure ("unix errorno " ^ (string_of_int n))) 59 + | n when n < bytes_to_read -> 60 + (* handle short read so new iovec and resubmit *) 61 + Uring.Iovec.advance req.iov ~idx:0 ~adj:n; 62 + req.off <-req.off + n; 63 + Uring.readv ~offset:req.off uring req.t.infd req.iov req 64 + | n when n = bytes_to_read -> 65 + (* Read is complete, all bytes are read, turn it into a write *) 66 + req.t.reads <- req.t.reads - 1; 67 + req.t.writes <- req.t.writes + 1; 68 + (* reset the iovec *) 69 + Uring.Iovec.advance req.iov ~idx:0 ~adj:(req.off * -1); 70 + let req = { req with op=`W; off=0 } in 71 + Uring.writev ~offset:0 uring req.t.outfd req.iov req 72 + | n -> raise (Failure ("unexpected readv result > len " ^ (string_of_int n))) 73 + 74 + let copy_file uring t = 75 + (* Create a set of read requests that we will turn into write requests 76 + * up until the queue depth *) 77 + while t.write_left > 0 || t.read_left > 0 do 78 + let need_submit = ref false in 79 + let submit () = 80 + if t.read_left > 0 then begin 81 + if t.reads + t.writes < queue_depth then begin 82 + let size = min block_size t.read_left in 83 + queue_read uring t size; 84 + need_submit := true; 85 + end 86 + end; 87 + if !need_submit then 88 + let _ = Uring.submit uring in () 89 + 90 + in 91 + submit () 92 + done 93 + 94 + let () = 95 + let infile = Sys.argv.(1) in 96 + let outfile = Sys.argv.(2) in 97 + let infd = Unix.(handle_unix_error (openfile infile [O_RDONLY]) 0) in 98 + let outfd = Unix.(handle_unix_error (openfile outfile [O_WRONLY; O_CREAT; O_TRUNC]) 0o644) in 99 + let insize = get_file_size infd in 100 + let t = { insize; offset=0; reads=0; writes=0; write_left=insize; read_left=insize; infd; outfd } in 101 + let uring = Uring.create ~queue_depth ~default:(empty_req t) () in 102 + copy_file uring t 103 + (* TOD fd close and iouring exit *)
+6 -6
uring.ml
··· 7 7 external uring_submit : uring -> int = "ocaml_uring_submit" 8 8 9 9 type id = int 10 - external uring_submit_readv : uring -> Unix.file_descr -> id -> Iovec.t -> int64 -> unit = "ocaml_uring_submit_readv" 11 - external uring_submit_writev : uring -> Unix.file_descr -> id -> Iovec.t -> int64 -> unit = "ocaml_uring_submit_writev" 10 + external uring_submit_readv : uring -> Unix.file_descr -> id -> Iovec.t -> int -> unit = "ocaml_uring_submit_readv" 11 + external uring_submit_writev : uring -> Unix.file_descr -> id -> Iovec.t -> int -> unit = "ocaml_uring_submit_writev" 12 12 13 13 external uring_wait_cqe : uring -> id * int = "ocaml_uring_wait_cqe" 14 14 ··· 39 39 let put_id t v = 40 40 t.id_freelist <- v :: t.id_freelist 41 41 42 - let submit_readv t fd iovec user_data = 42 + let readv t ?(offset=0) fd iovec user_data = 43 43 let id = get_id t in 44 - uring_submit_readv t.uring fd id iovec 0L; 44 + uring_submit_readv t.uring fd id iovec offset; 45 45 t.user_data.(id) <- user_data 46 46 47 - let submit_writev t fd iovec user_data = 47 + let writev t ?(offset=0) fd iovec user_data = 48 48 let id = get_id t in 49 - uring_submit_writev t.uring fd id iovec 0L; 49 + uring_submit_writev t.uring fd id iovec offset; 50 50 t.user_data.(id) <- user_data 51 51 52 52 let submit {uring;_} =
+2 -2
uring.mli
··· 4 4 5 5 val create : queue_depth:int -> default:'a -> unit -> 'a t 6 6 7 - val submit_readv : 'a t -> Unix.file_descr -> Iovec.t -> 'a -> unit 8 - val submit_writev : 'a t -> Unix.file_descr -> Iovec.t -> 'a -> unit 7 + val readv : 'a t -> ?offset:int -> Unix.file_descr -> Iovec.t -> 'a -> unit 8 + val writev : 'a t -> ?offset:int -> Unix.file_descr -> Iovec.t -> 'a -> unit 9 9 val submit : 'a t -> int 10 10 11 11 val wait : 'a t -> 'a * int
+15 -4
uring_stubs.c
··· 97 97 } 98 98 99 99 value 100 + ocaml_iovec_advance_offset(value v_iovecs, value v_idx, value v_adj) 101 + { 102 + struct iovec *iovs = (struct iovec *) (v_iovecs & ~1); 103 + int idx = Int_val(v_idx); 104 + int adj = Int_val(v_adj); 105 + iovs[idx].iov_base += adj; 106 + iovs[idx].iov_len -= adj; 107 + return(Val_unit); 108 + } 109 + 110 + value 100 111 ocaml_uring_free_iovecs(value iovecs) 101 112 { 102 113 struct iovec *iovs = (struct iovec *) (iovecs & ~1); ··· 114 125 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 115 126 if (!sqe) 116 127 caml_failwith("unable to allocate SQE"); 117 - fprintf(stderr, "submit_readv: %d ents off %lu\n", len, Int64_val(v_off)); 118 - io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int64_val(v_off)); /* TODO add offset to intf */ 128 + fprintf(stderr, "submit_readv: %d ents off %d\n", len, Int_val(v_off)); 129 + io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int_val(v_off)); /* TODO add offset to intf */ 119 130 io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */ 120 131 CAMLreturn(Val_unit); 121 132 } ··· 129 140 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 130 141 if (!sqe) 131 142 caml_failwith("unable to allocate SQE"); 132 - fprintf(stderr, "submit_writev: %d ents off %lu\n", len, Int64_val(v_off)); 133 - io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int64_val(v_off)); /* TODO add offset to intf */ 143 + fprintf(stderr, "submit_writev: %d ents off %d\n", len, Int_val(v_off)); 144 + io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int_val(v_off)); /* TODO add offset to intf */ 134 145 io_uring_sqe_set_data(sqe, (void *)(uintptr_t)Int_val(v_id)); /* TODO sort out cast */ 135 146 CAMLreturn(Val_unit); 136 147 }
+1 -1
uring_test.ml
··· 4 4 let b1 = Uring.Iovec.alloc_buf 3 in 5 5 let b2 = Uring.Iovec.alloc_buf 7 in 6 6 let iov = Uring.Iovec.alloc [|b1;b2|] in 7 - Uring.submit_readv t fd iov (); 7 + Uring.readv t fd iov (); 8 8 let res = Uring.submit t in 9 9 Printf.eprintf "submitted %d\n%!" res; 10 10 let (), res = Uring.wait t in