···606606607607 module Cells = Cells
608608 module Broadcast = Broadcast
609609+ module Single_waiter = Single_waiter
609610610611 (** Every fiber has an associated context. *)
611612 module Fiber_context : sig
+26-3
vendor/opam/eio/lib_eio_linux/low_level.ml
···207207 raise @@ Err.wrap (Uring.error_of_errno res) "write" ""
208208 )
209209210210-let alloc_fixed () = Effect.perform Sched.Alloc
210210+let alloc_fixed () =
211211+ let s = Sched.get () in
212212+ match s.mem with
213213+ | None -> None
214214+ | Some mem ->
215215+ match Uring.Region.alloc mem with
216216+ | buf -> Some buf
217217+ | exception Uring.Region.No_space -> None
211218212212-let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait
219219+let alloc_fixed_or_wait () =
220220+ let s = Sched.get () in
221221+ match s.mem with
222222+ | None -> failwith "No fixed buffer available"
223223+ | Some mem ->
224224+ match Uring.Region.alloc mem with
225225+ | buf -> buf
226226+ | exception Uring.Region.No_space ->
227227+ let id = Eio.Private.Trace.mint_id () in
228228+ let trigger = Eio.Private.Single_waiter.create () in
229229+ Queue.push trigger s.mem_q;
230230+ (* todo: remove protect; but needs to remove from queue on cancel *)
231231+ Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
213232214214-let free_fixed buf = Effect.perform (Sched.Free buf)
233233+let free_fixed buf =
234234+ let s = Sched.get () in
235235+ match Queue.take_opt s.mem_q with
236236+ | None -> Uring.Region.free buf
237237+ | Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
215238216239let splice src ~dst ~len =
217240 Fd.use_exn "splice-src" src @@ fun src ->
+6-36
vendor/opam/eio/lib_eio_linux/sched.ml
···5050 uring: io_job Uring.t;
5151 mem: Uring.Region.t option;
5252 io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
5353- mem_q : Uring.Region.chunk Suspended.t Queue.t;
5353+ mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
54545555 (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
5656 run_q : runnable Lf_queue.t;
···7474type _ Effect.t +=
7575 | Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
7676 | Cancel : io_job Uring.job -> unit Effect.t
7777- | Alloc : Uring.Region.chunk option Effect.t
7878- | Alloc_or_wait : Uring.Region.chunk Effect.t
7979- | Free : Uring.Region.chunk -> unit Effect.t
7777+ | Get : t Effect.t
7878+7979+let get () = Effect.perform Get
80808181let wake_buffer =
8282 let b = Bytes.create 8 in
···339339 | _, Exactly len -> Suspended.continue action len
340340 | n, Upto _ -> Suspended.continue action n
341341342342-let alloc_buf_or_wait st k =
343343- match st.mem with
344344- | None -> Suspended.discontinue k (Failure "No fixed buffer available")
345345- | Some mem ->
346346- match Uring.Region.alloc mem with
347347- | buf -> Suspended.continue k buf
348348- | exception Uring.Region.No_space ->
349349- Queue.push k st.mem_q;
350350- schedule st
351351-352352-let free_buf st buf =
353353- match Queue.take_opt st.mem_q with
354354- | None -> Uring.Region.free buf
355355- | Some k -> enqueue_thread st k buf
356356-357342let rec enqueue_poll_add fd poll_mask st action =
358343 Trace.log "poll_add";
359344 let retry = with_cancel_hook ~action st (fun () ->
···411396 Fiber_context.destroy fiber;
412397 Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
413398 );
414414- effc = fun (type a) (e : a Effect.t) ->
399399+ effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option ->
415400 match e with
401401+ | Get -> Some (fun k -> continue k st)
416402 | Enter fn -> Some (fun k ->
417403 match Fiber_context.get_error fiber with
418404 | Some e -> discontinue k e
···466452 let enqueue x = enqueue_thread st k (x, st.thread_pool) in
467453 Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
468454 schedule st
469469- )
470470- | Alloc -> Some (fun k ->
471471- match st.mem with
472472- | None -> continue k None
473473- | Some mem ->
474474- match Uring.Region.alloc mem with
475475- | buf -> continue k (Some buf)
476476- | exception Uring.Region.No_space -> continue k None
477477- )
478478- | Alloc_or_wait -> Some (fun k ->
479479- let k = { Suspended.k; fiber } in
480480- alloc_buf_or_wait st k
481481- )
482482- | Free buf -> Some (fun k ->
483483- free_buf st buf;
484484- continue k ()
485455 )
486456 | e -> extra_effects.effc e
487457 }