The unpac monorepo manager self-hosting as a monorepo using unpac
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #53 from talex5/set-fixed-buffer

Don't allocate a fixed buffer by default

authored by

Thomas Leonard and committed by
GitHub
d24c68ec 299f58f6

+66 -37
+11 -10
lib/uring/uring.ml
··· 235 235 let unregister_gc_root t = 236 236 update_gc_roots (Ring_set.remove (Generic_ring.T t)) 237 237 238 - let default_iobuf_len = 1024 * 1024 (* 1MB *) 239 - 240 - let create ?(fixed_buf_len=default_iobuf_len) ?polling_timeout ~queue_depth () = 238 + let create ?polling_timeout ~queue_depth () = 241 239 if queue_depth < 1 then Fmt.invalid_arg "Non-positive queue depth: %d" queue_depth; 242 240 let uring = Uring.create queue_depth polling_timeout in 243 - (* TODO posix memalign this to page *) 244 - let fixed_iobuf = Bigarray.(Array1.create char c_layout fixed_buf_len) in 245 - Uring.register_bigarray uring fixed_iobuf; 246 241 let data = Heap.create queue_depth in 247 242 let id = object end in 243 + let fixed_iobuf = Cstruct.empty.buffer in 248 244 let t = { id; uring; fixed_iobuf; data; dirty=false; queue_depth } in 249 245 register_gc_root t; 250 246 t ··· 254 250 | 0 -> () 255 251 | n -> Fmt.invalid_arg "%s: %d request(s) still active!" op n 256 252 257 - let realloc t iobuf = 258 - ensure_idle t "realloc"; 259 - Uring.unregister_buffers t.uring; 253 + let set_fixed_buffer t iobuf = 254 + ensure_idle t "set_fixed_buffer"; 255 + if Bigarray.Array1.dim t.fixed_iobuf > 0 then 256 + Uring.unregister_buffers t.uring; 260 257 t.fixed_iobuf <- iobuf; 261 - Uring.register_bigarray t.uring iobuf 258 + if Bigarray.Array1.dim iobuf > 0 then ( 259 + match Uring.register_bigarray t.uring iobuf with 260 + | () -> Ok () 261 + | exception Unix.Unix_error(Unix.ENOMEM, "io_uring_register_buffers", "") -> Error `ENOMEM 262 + ) else Ok () 262 263 263 264 let exit t = 264 265 ensure_idle t "exit";
+25 -14
lib/uring/uring.mli
··· 25 25 (** A handle for a submitted job, which can be used to cancel it. 26 26 If an operation returns [None], this means that submission failed because the ring is full. *) 27 27 28 - val create : ?fixed_buf_len:int -> ?polling_timeout:int -> queue_depth:int -> unit -> 'a t 29 - (** [create ?fixed_buf_len ~queue_depth] will return a fresh Io_uring structure 30 - [t]. Each [t] has associated with it a fixed region of memory that is used 31 - for the "fixed buffer" mode of io_uring to avoid data copying between 32 - userspace and the kernel. 28 + val create : ?polling_timeout:int -> queue_depth:int -> unit -> 'a t 29 + (** [create ~queue_depth] will return a fresh Io_uring structure [t]. 30 + Initially, [t] has no fixed buffer. Use {!set_fixed_buffer} if you want one. 33 31 @param polling_timeout If given, use polling mode with the given idle timeout (in ms). 34 32 This requires privileges. *) 35 33 36 34 val queue_depth : 'a t -> int 37 35 (** [queue_depth t] returns the total number of submission slots for the uring [t] *) 38 36 39 - val buf : 'a t -> Cstruct.buffer 40 - (** [buf t] is the fixed internal memory buffer associated with uring [t]. 37 + val exit : 'a t -> unit 38 + (** [exit t] will shut down the uring [t]. Any subsequent requests will fail. 39 + @raise Invalid_argument if there are any requests in progress *) 40 + 41 + (** {2 Fixed buffers} 42 + 43 + Each uring may have associated with it a fixed region of memory that is used 44 + for the "fixed buffer" mode of io_uring to avoid data copying between 45 + userspace and the kernel. *) 46 + 47 + val set_fixed_buffer : 'a t -> Cstruct.buffer -> (unit, [> `ENOMEM]) result 48 + (** [set_fixed_buffer t buf] sets [buf] as the fixed buffer for [t]. 49 + 41 50 You will normally want to wrap this with {!Region.alloc} or similar 42 - to divide the buffer into chunks. *) 51 + to divide the buffer into chunks. 52 + 53 + If [t] already has a buffer set, the old one will be removed. 54 + 55 + Returns [`ENOMEM] if insufficient kernel resources are available 56 + or the caller's RLIMIT_MEMLOCK resource limit would be exceeded. 43 57 44 - val realloc : 'a t -> Cstruct.buffer -> unit 45 - (** [realloc t buf] will replace the internal fixed buffer associated with 46 - uring [t] with a fresh one. 47 58 @raise Invalid_argument if there are any requests in progress *) 48 59 49 - val exit : 'a t -> unit 50 - (** [exit t] will shut down the uring [t]. Any subsequent requests will fail. 51 - @raise Invalid_argument if there are any requests in progress *) 60 + val buf : 'a t -> Cstruct.buffer 61 + (** [buf t] is the fixed internal memory buffer associated with uring [t] 62 + using {!set_fixed_buffer}, or a zero-length buffer if none is set. *) 52 63 53 64 (** {2 Queueing operations} *) 54 65
+17 -7
tests/main.ml
··· 123 123 check_raises ~__POS__ (Invalid_argument "Non-positive queue depth: 0") 124 124 (fun () -> ignore (Uring.create ~queue_depth:0 ())) 125 125 126 - let with_uring ?(fixed_buf_len=1024) ~queue_depth fn = 127 - let t = Uring.create ~fixed_buf_len ~queue_depth () in 126 + let with_uring ~queue_depth fn = 127 + let t = Uring.create ~queue_depth () in 128 128 fn t; 129 129 Uring.exit t (* Only free if there wasn't an error *) 130 130 ··· 210 210 check_bool ~__POS__ ~expected:true @@ get ~resolve:Uring.Resolve.empty ".."; 211 211 check_bool ~__POS__ ~expected:false @@ get ~resolve:Uring.Resolve.beneath ".." 212 212 213 + let set_fixed_buffer t size = 214 + let fbuf = Bigarray.(Array1.create char c_layout size) in 215 + match Uring.set_fixed_buffer t fbuf with 216 + | Ok () -> fbuf 217 + | Error `ENOMEM -> failwith "Resource limit exceeded" 218 + 213 219 let test_read () = 214 220 with_uring ~queue_depth:1 @@ fun t -> 221 + let fbuf = set_fixed_buffer t 1024 in 215 222 Test_data.with_fd @@ fun fd -> 216 - 217 223 let off = 3 in 218 224 let len = 5 in 219 225 let file_offset = Int63.of_int 2 in ··· 224 230 assert_ ~__POS__ (token = `Read); 225 231 check_int ~__POS__ read ~expected:len; 226 232 227 - let fbuf = Uring.buf t in 228 233 let got = Cstruct.of_bigarray fbuf ~off ~len in 229 234 check_string ~__POS__ ~expected:"test " (Cstruct.to_string got) 230 235 ··· 261 266 check_string ~__POS__ ~expected:"Gathered [A te] and [st ]" (Cstruct.to_string b) 262 267 263 268 let test_region () = 264 - with_uring ~queue_depth:1 ~fixed_buf_len:64 @@ fun t -> 269 + with_uring ~queue_depth:1 @@ fun t -> 270 + let fbuf = set_fixed_buffer t 64 in 265 271 Test_data.with_fd @@ fun fd -> 266 - let region = Uring.Region.init (Uring.buf t) 4 ~block_size:16 in 272 + let region = Uring.Region.init fbuf 4 ~block_size:16 in 267 273 let chunk = Uring.Region.alloc region in 268 274 assert_some ~__POS__ (Uring.read_chunk t fd chunk `Read ~file_offset:Int63.zero); 269 275 let token, read = consume t in ··· 283 289 (* Ask to read from a pipe (with no data available), then cancel it. *) 284 290 let test_cancel () = 285 291 with_uring ~queue_depth:5 @@ fun t -> 292 + let _fbuf = set_fixed_buffer t 1024 in 286 293 (* while true do *) 287 294 let r, w = Unix.pipe () in 288 295 let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get in ··· 313 320 (* By the time we cancel, the request has already succeeded (we just didn't process the reply yet). *) 314 321 let test_cancel_late () = 315 322 with_uring ~queue_depth:5 @@ fun t -> 323 + let _fbuf = set_fixed_buffer t 1024 in 316 324 let r = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0 in 317 325 let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get in 318 326 check_int ~__POS__ (Uring.submit t) ~expected:1; ··· 333 341 ) else ( 334 342 (* This isn't the case we want to test, but it can happen sometimes. *) 335 343 check_int ~__POS__ ~expected:(-125) r_read; (* ECANCELED *) 336 - check_int ~__POS__ ~expected:1 r_cancel; (* Success *) 344 + check_int ~__POS__ ~expected:0 r_cancel; (* Success *) 337 345 ); 338 346 Unix.close r 339 347 340 348 (* By the time we cancel, we already knew the operation was over. *) 341 349 let test_cancel_invalid () = 342 350 with_uring ~queue_depth:5 @@ fun t -> 351 + let _fbuf = set_fixed_buffer t 1024 in 343 352 let r = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0 in 344 353 let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get in 345 354 let token, r_read = consume t in ··· 353 362 354 363 let test_free_busy () = 355 364 let t = Uring.create ~queue_depth:1 () in 365 + let _fbuf = set_fixed_buffer t 1024 in 356 366 let r, w = Unix.pipe () in 357 367 Fun.protect ~finally:(fun () -> Unix.close r) @@ fun () -> 358 368 assert_some ~__POS__ (Uring.read_fixed t ~file_offset:Int63.minus_one r ~off:0 ~len:1 `Read);
+13 -6
tests/urcp_fixed_lib.ml
··· 158 158 let t = { freelist; block_size; insize; offset=Int63.zero; reads=0; writes=0; write_left=insize; read_left=insize; infd; outfd } in 159 159 Logs.debug (fun l -> l "starting: %a bs=%d qd=%d" pp t block_size queue_depth); 160 160 let fixed_buf_len = queue_depth * block_size in 161 - let uring = Uring.create ~fixed_buf_len ~queue_depth () in 162 - copy_file uring t; 163 - Unix.close infd; 164 - Unix.close outfd; 165 - Uring.exit uring; 166 - Gc.compact () (* TODO to aid debugging with valgrind, remove soon *) 161 + let uring = Uring.create ~queue_depth () in 162 + let fbuf = Bigarray.(Array1.create char c_layout fixed_buf_len) in 163 + Fun.protect 164 + (fun () -> 165 + match Uring.set_fixed_buffer uring fbuf with 166 + | Ok () -> copy_file uring t 167 + | Error `ENOMEM -> failwith "Can't lock memory (check RLIMIT_MEMLOCK)" 168 + ) 169 + ~finally:(fun () -> 170 + Unix.close infd; 171 + Unix.close outfd; 172 + Uring.exit uring 173 + )