The unpac monorepo manager self-hosting as a monorepo using unpac
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

eio_linux: allow alloc_fixed_or_wait to be cancelled

+103 -33
+1 -1
lib_eio/core/fiber.ml
··· 226 226 227 227 let release t = 228 228 t.free_fibers <- t.free_fibers + 1; 229 - if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ()) 229 + if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond 230 230 231 231 let use t fn x = 232 232 await_free t;
+19 -12
lib_eio/core/single_waiter.ml
··· 1 - (* Allows a single fiber to wait to be notified by another fiber in the same domain. 2 - If multiple fibers need to wait at once, or the notification comes from another domain, 3 - this can't be used. *) 1 + type 'a state = 2 + | Running 3 + | Sleeping of (('a, exn) result -> unit) 4 4 5 - type 'a t = { 6 - mutable wake : ('a, exn) result -> unit; 7 - } 5 + type 'a t = 'a state ref 6 + 7 + let create () = ref Running 8 8 9 - let create () = { wake = ignore } 9 + let wake t v = 10 + match !t with 11 + | Running -> false 12 + | Sleeping fn -> 13 + t := Running; 14 + fn v; 15 + true 10 16 11 - let wake t v = t.wake v 17 + let wake_if_sleeping t = 18 + ignore (wake t (Ok ()) : bool) 12 19 13 20 let await t op id = 14 21 let x = 15 22 Suspend.enter op @@ fun ctx enqueue -> 16 23 Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> 17 - t.wake <- ignore; 24 + t := Running; 18 25 enqueue (Error ex) 19 26 ); 20 - t.wake <- (fun x -> 27 + t := Sleeping (fun x -> 21 28 Cancel.Fiber_context.clear_cancel_fn ctx; 22 - t.wake <- ignore; 29 + t := Running; 23 30 enqueue x 24 31 ) 25 32 in ··· 29 36 let await_protect t op id = 30 37 let x = 31 38 Suspend.enter_unchecked op @@ fun _ctx enqueue -> 32 - t.wake <- (fun x -> t.wake <- ignore; enqueue x) 39 + t := Sleeping (fun x -> t := Running; enqueue x) 33 40 in 34 41 Trace.get id; 35 42 x
+25
lib_eio/core/single_waiter.mli
··· 1 + (** Allows a single fiber to wait to be notified by another fiber in the same domain. 2 + If multiple fibers need to wait at once, or the notification comes from another domain, 3 + this can't be used. *) 4 + 5 + type 'a t 6 + (** A handle representing a fiber that might be sleeping. 7 + It is either in the Running or Sleeping state. *) 8 + 9 + val create : unit -> 'a t 10 + (** [create ()] is a new waiter, initially in the Running state. *) 11 + 12 + val wake : 'a t -> ('a, exn) result -> bool 13 + (** [wake t v] resumes [t]'s fiber with value [v] and returns [true] if it was sleeping. 14 + If [t] is Running then this just returns [false]. *) 15 + 16 + val wake_if_sleeping : unit t -> unit 17 + (** [wake_if_sleeping] is [ignore (wake t (Ok ()))]. *) 18 + 19 + val await : 'a t -> string -> Trace.id -> 'a 20 + (** [await t op id] suspends the calling fiber, changing [t]'s state to Sleeping. 21 + If the fiber is cancelled, a cancel exception is raised. 22 + [op] and [id] are used for tracing. *) 23 + 24 + val await_protect : 'a t -> string -> Trace.id -> 'a 25 + (** [await_protect] is like {!await}, but the sleep cannot be cancelled. *)
+1 -1
lib_eio/core/switch.ml
··· 72 72 if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then 73 73 Cancel.cancel t.cancel Exit; 74 74 if t.fibers = 0 then 75 - Single_waiter.wake t.waiter (Ok ()) 75 + Single_waiter.wake_if_sleeping t.waiter 76 76 77 77 let with_op t fn = 78 78 inc_fibers t;
+11 -6
lib_eio_linux/low_level.ml
··· 226 226 | exception Uring.Region.No_space -> 227 227 let id = Eio.Private.Trace.mint_id () in 228 228 let trigger = Eio.Private.Single_waiter.create () in 229 - Queue.push trigger s.mem_q; 230 - (* todo: remove protect; but needs to remove from queue on cancel *) 231 - Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id 229 + let node = Lwt_dllist.add_r trigger s.mem_q in 230 + try 231 + Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id 232 + with ex -> 233 + Lwt_dllist.remove node; 234 + raise ex 232 235 233 - let free_fixed buf = 236 + let rec free_fixed buf = 234 237 let s = Sched.get () in 235 - match Queue.take_opt s.mem_q with 238 + match Lwt_dllist.take_opt_l s.mem_q with 236 239 | None -> Uring.Region.free buf 237 - | Some k -> Eio.Private.Single_waiter.wake k (Ok buf) 240 + | Some k -> 241 + if not (Eio.Private.Single_waiter.wake k (Ok buf)) then 242 + free_fixed buf (* [k] was already cancelled, but not yet removed from the queue *) 238 243 239 244 let splice src ~dst ~len = 240 245 Fd.use_exn "splice-src" src @@ fun src ->
+3 -3
lib_eio_linux/sched.ml
··· 50 50 uring: io_job Uring.t; 51 51 mem: Uring.Region.t option; 52 52 io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) 53 - mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t; 53 + mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t; 54 54 55 55 (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) 56 56 run_q : runnable Lf_queue.t; ··· 247 247 ) else if timeout = None && Uring.active_ops uring = 0 then ( 248 248 (* Nothing further can happen at this point. 249 249 If there are no events in progress but also still no memory available, something has gone wrong! *) 250 - assert (Queue.length mem_q = 0); 250 + assert (Lwt_dllist.length mem_q = 0); 251 251 Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *) 252 252 `Exit_scheduler 253 253 ) else ( ··· 536 536 Lf_queue.push run_q IO; 537 537 let sleep_q = Zzz.create () in 538 538 let io_q = Queue.create () in 539 - let mem_q = Queue.create () in 539 + let mem_q = Lwt_dllist.create () in 540 540 with_eventfd @@ fun eventfd -> 541 541 let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in 542 542 fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
+43 -10
lib_eio_linux/tests/test.ml
··· 211 211 (fun () -> Eio.Condition.await_no_mutex cond) 212 212 (fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status)) 213 213 214 + let test_alloc_fixed_or_wait () = 215 + Eio_linux.run ~n_blocks:1 @@ fun _env -> 216 + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in 217 + (* We have to wait for the block, but get cancelled while waiting. *) 218 + begin 219 + try 220 + Fiber.both 221 + (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait () : Uring.Region.chunk)) 222 + (fun () -> raise Exit); 223 + with Exit -> () 224 + end; 225 + (* We have to wait for the block, and get it when the old one is freed. *) 226 + Fiber.both 227 + (fun () -> 228 + let x = Eio_linux.Low_level.alloc_fixed_or_wait () in 229 + Eio_linux.Low_level.free_fixed x 230 + ) 231 + (fun () -> 232 + Eio_linux.Low_level.free_fixed block 233 + ); 234 + (* The old block is passed to the waiting fiber, but it's cancelled. *) 235 + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in 236 + Fiber.both 237 + (fun () -> 238 + Fiber.first 239 + (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()); assert false) 240 + (fun () -> ()) 241 + ) 242 + (fun () -> Eio_linux.Low_level.free_fixed block); 243 + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in 244 + Eio_linux.Low_level.free_fixed block 245 + 214 246 let () = 215 247 let open Alcotest in 216 248 run "eio_linux" [ 217 249 "io", [ 218 - test_case "copy" `Quick test_copy; 219 - test_case "direct_copy" `Quick test_direct_copy; 220 - test_case "poll_add" `Quick test_poll_add; 221 - test_case "poll_add_busy" `Quick test_poll_add_busy; 222 - test_case "iovec" `Quick test_iovec; 223 - test_case "no_sqe" `Quick test_no_sqe; 224 - test_case "read_exact" `Quick test_read_exact; 225 - test_case "expose_backend" `Quick test_expose_backend; 226 - test_case "statx" `Quick test_statx; 227 - test_case "signal_race" `Quick test_signal_race; 250 + test_case "copy" `Quick test_copy; 251 + test_case "direct_copy" `Quick test_direct_copy; 252 + test_case "poll_add" `Quick test_poll_add; 253 + test_case "poll_add_busy" `Quick test_poll_add_busy; 254 + test_case "iovec" `Quick test_iovec; 255 + test_case "no_sqe" `Quick test_no_sqe; 256 + test_case "read_exact" `Quick test_read_exact; 257 + test_case "expose_backend" `Quick test_expose_backend; 258 + test_case "statx" `Quick test_statx; 259 + test_case "signal_race" `Quick test_signal_race; 260 + test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait; 228 261 ]; 229 262 ]