···226226227227 let release t =
228228 t.free_fibers <- t.free_fibers + 1;
229229- if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ())
229229+ if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond
230230231231 let use t fn x =
232232 await_free t;
+19-12
lib_eio/core/single_waiter.ml
···11-(* Allows a single fiber to wait to be notified by another fiber in the same domain.
22- If multiple fibers need to wait at once, or the notification comes from another domain,
33- this can't be used. *)
11+type 'a state =
22+ | Running
33+ | Sleeping of (('a, exn) result -> unit)
4455-type 'a t = {
66- mutable wake : ('a, exn) result -> unit;
77-}
55+type 'a t = 'a state ref
66+77+let create () = ref Running
8899-let create () = { wake = ignore }
99+let wake t v =
1010+ match !t with
1111+ | Running -> false
1212+ | Sleeping fn ->
1313+ t := Running;
1414+ fn v;
1515+ true
10161111-let wake t v = t.wake v
1717+let wake_if_sleeping t =
1818+ ignore (wake t (Ok ()) : bool)
12191320let await t op id =
1421 let x =
1522 Suspend.enter op @@ fun ctx enqueue ->
1623 Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
1717- t.wake <- ignore;
2424+ t := Running;
1825 enqueue (Error ex)
1926 );
2020- t.wake <- (fun x ->
2727+ t := Sleeping (fun x ->
2128 Cancel.Fiber_context.clear_cancel_fn ctx;
2222- t.wake <- ignore;
2929+ t := Running;
2330 enqueue x
2431 )
2532 in
···2936let await_protect t op id =
3037 let x =
3138 Suspend.enter_unchecked op @@ fun _ctx enqueue ->
3232- t.wake <- (fun x -> t.wake <- ignore; enqueue x)
3939+ t := Sleeping (fun x -> t := Running; enqueue x)
3340 in
3441 Trace.get id;
3542 x
+25
lib_eio/core/single_waiter.mli
···11+(** Allows a single fiber to wait to be notified by another fiber in the same domain.
22+ If multiple fibers need to wait at once, or the notification comes from another domain,
33+ this can't be used. *)
44+55+type 'a t
66+(** A handle representing a fiber that might be sleeping.
77+ It is either in the Running or Sleeping state. *)
88+99+val create : unit -> 'a t
1010+(** [create ()] is a new waiter, initially in the Running state. *)
1111+1212+val wake : 'a t -> ('a, exn) result -> bool
1313+(** [wake t v] resumes [t]'s fiber with value [v] and returns [true] if it was sleeping.
1414+ If [t] is Running then this just returns [false]. *)
1515+1616+val wake_if_sleeping : unit t -> unit
1717+(** [wake_if_sleeping] is [ignore (wake t (Ok ()))]. *)
1818+1919+val await : 'a t -> string -> Trace.id -> 'a
2020+(** [await t op id] suspends the calling fiber, changing [t]'s state to Sleeping.
2121+ If the fiber is cancelled, a cancel exception is raised.
2222+ [op] and [id] are used for tracing. *)
2323+2424+val await_protect : 'a t -> string -> Trace.id -> 'a
2525+(** [await_protect] is like {!await}, but the sleep cannot be cancelled. *)
+1-1
lib_eio/core/switch.ml
···7272 if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then
7373 Cancel.cancel t.cancel Exit;
7474 if t.fibers = 0 then
7575- Single_waiter.wake t.waiter (Ok ())
7575+ Single_waiter.wake_if_sleeping t.waiter
76767777let with_op t fn =
7878 inc_fibers t;
+11-6
lib_eio_linux/low_level.ml
···226226 | exception Uring.Region.No_space ->
227227 let id = Eio.Private.Trace.mint_id () in
228228 let trigger = Eio.Private.Single_waiter.create () in
229229- Queue.push trigger s.mem_q;
230230- (* todo: remove protect; but needs to remove from queue on cancel *)
231231- Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
229229+ let node = Lwt_dllist.add_r trigger s.mem_q in
230230+ try
231231+ Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id
232232+ with ex ->
233233+ Lwt_dllist.remove node;
234234+ raise ex
232235233233-let free_fixed buf =
236236+let rec free_fixed buf =
234237 let s = Sched.get () in
235235- match Queue.take_opt s.mem_q with
238238+ match Lwt_dllist.take_opt_l s.mem_q with
236239 | None -> Uring.Region.free buf
237237- | Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
240240+ | Some k ->
241241+ if not (Eio.Private.Single_waiter.wake k (Ok buf)) then
242242+ free_fixed buf (* [k] was already cancelled, but not yet removed from the queue *)
238243239244let splice src ~dst ~len =
240245 Fd.use_exn "splice-src" src @@ fun src ->
+3-3
lib_eio_linux/sched.ml
···5050 uring: io_job Uring.t;
5151 mem: Uring.Region.t option;
5252 io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
5353- mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
5353+ mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t;
54545555 (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
5656 run_q : runnable Lf_queue.t;
···247247 ) else if timeout = None && Uring.active_ops uring = 0 then (
248248 (* Nothing further can happen at this point.
249249 If there are no events in progress but also still no memory available, something has gone wrong! *)
250250- assert (Queue.length mem_q = 0);
250250+ assert (Lwt_dllist.length mem_q = 0);
251251 Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
252252 `Exit_scheduler
253253 ) else (
···536536 Lf_queue.push run_q IO;
537537 let sleep_q = Zzz.create () in
538538 let io_q = Queue.create () in
539539- let mem_q = Queue.create () in
539539+ let mem_q = Lwt_dllist.create () in
540540 with_eventfd @@ fun eventfd ->
541541 let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
542542 fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
+43-10
lib_eio_linux/tests/test.ml
···211211 (fun () -> Eio.Condition.await_no_mutex cond)
212212 (fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status))
213213214214+let test_alloc_fixed_or_wait () =
215215+ Eio_linux.run ~n_blocks:1 @@ fun _env ->
216216+ let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
217217+ (* We have to wait for the block, but get cancelled while waiting. *)
218218+ begin
219219+ try
220220+ Fiber.both
221221+ (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait () : Uring.Region.chunk))
222222+ (fun () -> raise Exit);
223223+ with Exit -> ()
224224+ end;
225225+ (* We have to wait for the block, and get it when the old one is freed. *)
226226+ Fiber.both
227227+ (fun () ->
228228+ let x = Eio_linux.Low_level.alloc_fixed_or_wait () in
229229+ Eio_linux.Low_level.free_fixed x
230230+ )
231231+ (fun () ->
232232+ Eio_linux.Low_level.free_fixed block
233233+ );
234234+ (* The old block is passed to the waiting fiber, but it's cancelled. *)
235235+ let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
236236+ Fiber.both
237237+ (fun () ->
238238+ Fiber.first
239239+ (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()); assert false)
240240+ (fun () -> ())
241241+ )
242242+ (fun () -> Eio_linux.Low_level.free_fixed block);
243243+ let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
244244+ Eio_linux.Low_level.free_fixed block
245245+214246let () =
215247 let open Alcotest in
216248 run "eio_linux" [
217249 "io", [
218218- test_case "copy" `Quick test_copy;
219219- test_case "direct_copy" `Quick test_direct_copy;
220220- test_case "poll_add" `Quick test_poll_add;
221221- test_case "poll_add_busy" `Quick test_poll_add_busy;
222222- test_case "iovec" `Quick test_iovec;
223223- test_case "no_sqe" `Quick test_no_sqe;
224224- test_case "read_exact" `Quick test_read_exact;
225225- test_case "expose_backend" `Quick test_expose_backend;
226226- test_case "statx" `Quick test_statx;
227227- test_case "signal_race" `Quick test_signal_race;
250250+ test_case "copy" `Quick test_copy;
251251+ test_case "direct_copy" `Quick test_direct_copy;
252252+ test_case "poll_add" `Quick test_poll_add;
253253+ test_case "poll_add_busy" `Quick test_poll_add_busy;
254254+ test_case "iovec" `Quick test_iovec;
255255+ test_case "no_sqe" `Quick test_no_sqe;
256256+ test_case "read_exact" `Quick test_read_exact;
257257+ test_case "expose_backend" `Quick test_expose_backend;
258258+ test_case "statx" `Quick test_statx;
259259+ test_case "signal_race" `Quick test_signal_race;
260260+ test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait;
228261 ];
229262 ]