···11-module type MUTEX = sig
22-(** Locks for mutual exclusion with support for multiple concurrency backends.
33-44- This module provides a unified interface for mutexes that can work with
55- different concurrency backends (OCaml standard library, Picos, etc.).
66- It extends the standard OCaml mutex interface with additional functionality
77- for acquiring multiple mutexes atomically.
88-*)
99-1010-type t
1111-(** The type of mutexes. *)
1212-1313-val create : unit -> t
1414-(** Return a new mutex. *)
1515-1616-val lock : t -> unit
1717-(** Lock the given mutex. Only one thread can have the mutex locked
1818- at any time. A thread that attempts to lock a mutex already locked
1919- by another thread will suspend until the other thread unlocks
2020- the mutex.
2121-2222- @raise Sys_error if the mutex is already locked by the thread calling
2323- {!lock}.
2424-2525- @before 4.12 {!Sys_error} was not raised for recursive locking
2626- (platform-dependent behaviour) *)
2727-2828-val try_lock : t -> bool
2929-(** Same as {!lock}, but does not suspend the calling thread if
3030- the mutex is already locked: just return [false] immediately
3131- in that case. If the mutex is unlocked, lock it and
3232- return [true]. *)
3333-3434-val unlock : t -> unit
3535-(** Unlock the given mutex. Other threads suspended trying to lock
3636- the mutex will restart. The mutex must have been previously locked
3737- by the thread that calls {!unlock}.
3838- @raise Sys_error if the mutex is unlocked or was locked by another thread.
3939-4040- @before 4.12 {!Sys_error} was not raised when unlocking an unlocked mutex
4141- or when unlocking a mutex from a different thread. *)
4242-4343-val protect : t -> (unit -> 'a) -> 'a
4444-(** [protect mutex f] runs [f()] in a critical section where [mutex]
4545- is locked (using {!lock}); it then takes care of releasing [mutex],
4646- whether [f()] returned a value or raised an exception.
4747-4848- The unlocking operation is guaranteed to always takes place,
4949- even in the event an asynchronous exception (e.g. {!Sys.Break}) is raised
5050- in some signal handler.
5151-5252- @since 5.1 *)
5353-5454-val lock_all : t list -> bool
5555-(** [lock_all mutexes] attempts to acquire all mutexes in the list atomically.
5656- It uses {!try_lock} for each mutex in the order provided. If any mutex
5757- cannot be acquired, it releases all previously acquired mutexes and
5858- returns [false]. If all mutexes are successfully acquired, it returns [true].
5959-6060- This function is useful for avoiding deadlocks when multiple mutexes
6161- need to be acquired simultaneously.
6262-6363- @return [true] if all mutexes were successfully acquired, [false] otherwise.
6464-6565- Note: The caller is responsible for unlocking all mutexes that were
6666- successfully acquired when this function returns [true]. *)
6767-end
···11-(* Default stdlib mutex implementation, actual implementation is in lwd_impl.ml *)
22-include Lwd_impl.Make(Mutex_backend.Stdlib)
+2
forks/lwd/lib/lwd/lwd.mli
···160160161161(* For debug purposes *)
162162val dump_trace : 'a t -> unit
163163+164164+(* val to_mermaid : ?max_nodes:int -> 'a t_ -> string *)
+1101-753
forks/lwd/lib/lwd/lwd_impl.ml
···11module Make (Mutex : Mutex_backend.MUTEX) = struct
22+ let log_src = Logs.Src.create "lwd.impl" ~doc:"Lwd implementation"
33+ module Log = (val Logs.src_log log_src : Logs.LOG)
2433-(** Create-only version of [Obj.t] *)
44-module Any : sig
55- type t
66- val any : 'a -> t
77-end = struct
88- type t = Obj.t
99- let any = Obj.repr
1010-end
55+ (** Create-only version of [Obj.t] *)
66+ module Any : sig
77+ type t
88+ val any : 'a -> t
99+ end = struct
1010+ type t = Obj.t
1111+ let any = Obj.repr
1212+ end
1313+1414+ type 'a eval =
1515+ | Eval_none
1616+ | Eval_progress
1717+ | Eval_some of 'a
1818+ | Eval_invalid_next
1919+2020+ type 'a t_ =
2121+ | Pure of 'a
2222+ | Operator : {
2323+ mutex : Mutex.t;
2424+ mutable value : 'a eval; (* cached value *)
2525+ mutable trace : trace; (* list of parents this can invalidate *)
2626+ mutable trace_idx : trace_idx; (* list of direct children that can invalidate this *)
2727+ desc: 'a desc;
2828+ } -> 'a t_
2929+ | Root : {
3030+ mutex : Mutex.t;
3131+ mutable value : 'a eval; (* cached value *)
3232+ mutable trace_idx : trace_idx; (* list of direct children that can invalidate this *)
3333+ mutable on_invalidate : 'a -> unit;
3434+ mutable acquired : bool;
3535+ child : 'a t_;
3636+ } -> 'a t_
3737+3838+ and _ desc =
3939+ | Map : 'a t_ * ('a -> 'b) -> 'b desc
4040+ | Map2 : 'a t_ * 'b t_ * ('a -> 'b -> 'c) -> 'c desc
4141+ | Pair : 'a t_ * 'b t_ -> ('a * 'b) desc
4242+ | App : ('a -> 'b) t_ * 'a t_ -> 'b desc
4343+ | Join : { child : 'a t_ t_; mutable intermediate : 'a t_ option } -> 'a desc
4444+ | Var : { mutable binding : 'a } -> 'a desc
4545+ | Prim : { acquire : 'a t -> 'a;
4646+ release : 'a t -> 'a -> unit } -> 'a desc
4747+ | Fix : { doc : 'a t_; wrt : _ t_ } -> 'a desc
4848+4949+ (* a set of (active) parents for a ['a t], used during invalidation *)
5050+ and trace =
5151+ | T0
5252+ | T1 : _ t_ -> trace
5353+ | T2 : _ t_ * _ t_ -> trace
5454+ | T3 : _ t_ * _ t_ * _ t_ -> trace
5555+ | T4 : _ t_ * _ t_ * _ t_ * _ t_ -> trace
5656+ | Tn : { mutable active : int; mutable count : int;
5757+ mutable entries : Any.t t_ array } -> trace
5858+5959+ (* a set of direct children for a composite document *)
6060+ and trace_idx =
6161+ | I0
6262+ | I1 : { mutable idx : int ;
6363+ obj : 'a t_;
6464+ mutable next : trace_idx } -> trace_idx
6565+6666+ (* The type system cannot see that t is covariant in its parameter.
6767+ Use the Force to convince it. *)
6868+ and +'a t
6969+ external inj : 'a t_ -> 'a t = "%identity"
7070+ external prj : 'a t -> 'a t_ = "%identity"
7171+ external prj2 : 'a t t -> 'a t_ t_ = "%identity"
7272+7373+ (* Basic combinators *)
7474+ let return x = inj (Pure x)
7575+ let pure x = inj (Pure x)
7676+7777+ let is_pure x = match prj x with
7878+ | Pure x -> Some x
7979+ | _ -> None
8080+8181+ let dummy = Pure (Any.any ())
8282+8383+ let operator desc =
8484+ Operator { value = Eval_none; trace = T0; desc; trace_idx = I0 ;mutex= Mutex.create () }
8585+8686+ let map x ~f = inj (
8787+ match prj x with
8888+ | Pure vx -> Pure (f vx)
8989+ | x -> operator (Map (x, f))
9090+ )
9191+9292+ let map2 x y ~f = inj (
9393+ match prj x, prj y with
9494+ | Pure vx, Pure vy -> Pure (f vx vy)
9595+ | x, y -> operator (Map2 (x, y, f))
9696+ )
9797+9898+9999+ let pair x y = inj (
100100+ match prj x, prj y with
101101+ | Pure vx, Pure vy -> Pure (vx, vy)
102102+ | x, y -> operator (Pair (x, y))
103103+ )
104104+105105+ let app f x = inj (
106106+ match prj f, prj x with
107107+ | Pure vf, Pure vx -> Pure (vf vx)
108108+ | f, x -> operator (App (f, x))
109109+ )
110110+111111+ let join child = inj (
112112+ match prj2 child with
113113+ | Pure v -> v
114114+ | child -> operator (Join { child; intermediate = None })
115115+ )
116116+117117+ let bind x ~f = join (map ~f x)
118118+119119+ let pp_eval_status fmt eval =
120120+ match eval with
121121+ | Eval_none -> Format.fprintf fmt "None"
122122+ | Eval_progress -> Format.fprintf fmt "Progress"
123123+ | Eval_some _ -> Format.fprintf fmt "Some"
124124+ | Eval_invalid_next -> Format.fprintf fmt "Invalid_next"
125125+126126+ (* Management of trace indices *)
127127+128128+ let addr oc obj =
129129+ Printf.fprintf oc "0x%08x" (Obj.magic obj : int)
130130+131131+ let pp_addr fmt obj =
132132+ Format.fprintf fmt "0x%08x" (Obj.magic obj : int)
133133+134134+ external t_equal : _ t_ -> _ t_ -> bool = "%eq"
135135+ external obj_t : 'a t_ -> Any.t t_ = "%identity"
136136+137137+ let rec dump_trace_format : type a. Format.formatter -> a t_ -> unit =
138138+ fun fmt obj ->
139139+ match obj with
140140+ | Pure _ -> Format.fprintf fmt "%a: Pure _@." pp_addr obj
141141+ | Operator t ->
142142+ Format.fprintf fmt "%a: Operator _ -> %a@." pp_addr obj dump_trace_aux
143143+ t.trace;
144144+ begin
145145+ match t.trace with
146146+ | T0 -> ()
147147+ | T1 a -> dump_trace_format fmt a
148148+ | T2 (a, b) ->
149149+ dump_trace_format fmt a;
150150+ dump_trace_format fmt b
151151+ | T3 (a, b, c) ->
152152+ dump_trace_format fmt a;
153153+ dump_trace_format fmt b;
154154+ dump_trace_format fmt c
155155+ | T4 (a, b, c, d) ->
156156+ dump_trace_format fmt a;
157157+ dump_trace_format fmt b;
158158+ dump_trace_format fmt c;
159159+ dump_trace_format fmt d
160160+ | Tn t -> Array.iter (dump_trace_format fmt) t.entries
161161+ end
162162+ | Root t ->
163163+ Format.fprintf fmt "%a: Root _@." pp_addr obj
111641212-type 'a eval =
1313- | Eval_none
1414- | Eval_progress
1515- | Eval_some of 'a
1616- | Eval_invalid_next
165165+ and dump_trace_aux : type a. Format.formatter -> trace -> unit =
166166+ fun fmt -> function
167167+ | T0 -> Format.fprintf fmt "T0"
168168+ | T1 a -> Format.fprintf fmt "T1 %a" pp_addr a
169169+ | T2 (a, b) -> Format.fprintf fmt "T2 (%a, %a)" pp_addr a pp_addr b
170170+ | T3 (a, b, c) ->
171171+ Format.fprintf fmt "T3 (%a, %a, %a)" pp_addr a pp_addr b pp_addr c
172172+ | T4 (a, b, c, d) ->
173173+ Format.fprintf fmt "T4 (%a, %a, %a, %a)" pp_addr a pp_addr b pp_addr c
174174+ pp_addr d
175175+ | Tn t ->
176176+ Format.fprintf fmt "Tn {active = %d; count = %d; entries = " t.active
177177+ t.count;
178178+ Array.iter (Format.fprintf fmt "(%a)" pp_addr) t.entries;
179179+ Format.fprintf fmt "}"
180180+181181+ let dump_trace x = dump_trace_format Format.err_formatter (obj_t (prj x))
182182+183183+ let to_mermaid (type a) ?(max_nodes=100) (root : a t_) : string =
184184+ let buf = Buffer.create 1024 in
185185+ Buffer.add_string buf "graph TD;\n";
186186+ let visited : (string, unit) Hashtbl.t = Hashtbl.create 16 in
171871818-type 'a t_ =
1919- | Pure of 'a
2020- | Operator : {
2121- mutex : Mutex.t;
2222- mutable value : 'a eval; (* cached value *)
2323- mutable trace : trace; (* list of parents this can invalidate *)
2424- mutable trace_idx : trace_idx; (* list of direct children that can invalidate this *)
2525- desc: 'a desc;
2626- } -> 'a t_
2727- | Root : {
2828- mutex : Mutex.t;
2929- mutable value : 'a eval; (* cached value *)
3030- mutable trace_idx : trace_idx; (* list of direct children that can invalidate this *)
3131- mutable on_invalidate : 'a -> unit;
3232- mutable acquired : bool;
3333- child : 'a t_;
3434- } -> 'a t_
188188+ let get_id : type a. a t_ -> string = fun node -> Format.asprintf "%a" pp_addr node in
351893636-and _ desc =
3737- | Map : 'a t_ * ('a -> 'b) -> 'b desc
3838- | Map2 : 'a t_ * 'b t_ * ('a -> 'b -> 'c) -> 'c desc
3939- | Pair : 'a t_ * 'b t_ -> ('a * 'b) desc
4040- | App : ('a -> 'b) t_ * 'a t_ -> 'b desc
4141- | Join : { child : 'a t_ t_; mutable intermediate : 'a t_ option } -> 'a desc
4242- | Var : { mutable binding : 'a; mutable nextVal: 'a option } -> 'a desc
4343- | Prim : { acquire : 'a t -> 'a;
4444- release : 'a t -> 'a -> unit } -> 'a desc
4545- | Fix : { doc : 'a t_; wrt : _ t_ } -> 'a desc
190190+ let get_eval_status_str (type a) (eval : a eval) : string =
191191+ match eval with
192192+ | Eval_none -> "None"
193193+ | Eval_progress -> "Progress"
194194+ | Eval_some _ -> "Some"
195195+ | Eval_invalid_next -> "Invalid_next"
196196+ in
461974747-(* a set of (active) parents for a ['a t], used during invalidation *)
4848-and trace =
4949- | T0
5050- | T1 : _ t_ -> trace
5151- | T2 : _ t_ * _ t_ -> trace
5252- | T3 : _ t_ * _ t_ * _ t_ -> trace
5353- | T4 : _ t_ * _ t_ * _ t_ * _ t_ -> trace
5454- | Tn : { mutable active : int; mutable count : int;
5555- mutable entries : Any.t t_ array } -> trace
198198+ (* Using breadth-first traversal with a queue to prevent stack overflows on deep graphs. *)
199199+ let q : Any.t t_ Queue.t = Queue.create () in
200200+ Queue.add (obj_t ( root)) q;
201201+ let nodes_processed = ref 0 in
562025757-(* a set of direct children for a composite document *)
5858-and trace_idx =
5959- | I0
6060- | I1 : { mutable idx : int ;
6161- obj : 'a t_;
6262- mutable next : trace_idx } -> trace_idx
203203+ let rec process_queue () =
204204+ if not (Queue.is_empty q) && !nodes_processed < max_nodes then (
205205+ let node = Queue.take q in
206206+ let node_id_str = get_id node in
207207+ if not (Hashtbl.mem visited node_id_str) then (
208208+ Hashtbl.add visited node_id_str ();
209209+ incr nodes_processed;
210210+ let node_id_str = get_id node in
632116464-(* The type system cannot see that t is covariant in its parameter.
6565- Use the Force to convince it. *)
6666-and +'a t
6767-external inj : 'a t_ -> 'a t = "%identity"
6868-external prj : 'a t -> 'a t_ = "%identity"
6969-external prj2 : 'a t t -> 'a t_ t_ = "%identity"
212212+ let label, children =
213213+ match node with
214214+ | Pure _ -> ("Pure", [])
215215+ | Root t ->
216216+ let status = get_eval_status_str t.value in
217217+ (Printf.sprintf "Root\nstatus: %s" status, [ (obj_t t.child, "solid") ])
218218+ | Operator t ->
219219+ let status = get_eval_status_str t.value in
220220+ let desc_str, children =
221221+ match t.desc with
222222+ | Map (x, _) -> "Map", [ (obj_t x, "solid") ]
223223+ | Map2 (x, y, _) -> "Map2", [ (obj_t x, "solid"); (obj_t y, "solid") ]
224224+ | Pair (x, y) -> "Pair", [ (obj_t x, "solid"); (obj_t y, "solid") ]
225225+ | App (f, x) -> "App", [ (obj_t f, "solid"); (obj_t x, "solid") ]
226226+ | Join { child; intermediate } ->
227227+ let c = [ (obj_t child, "solid") ] in
228228+ let c =
229229+ match intermediate with
230230+ | None -> c
231231+ | Some i -> (obj_t i, "dotted") :: c
232232+ in
233233+ "Join", c
234234+ | Var v ->
235235+ (Printf.sprintf "Var\nhash: %d" (Hashtbl.hash v.binding), [])
236236+ | Prim _ -> "Prim", []
237237+ | Fix { doc; wrt } -> "Fix", [ (obj_t doc, "solid"); (obj_t wrt, "solid") ]
238238+ in
239239+ (Printf.sprintf "%s\nstatus: %s" desc_str status, children)
240240+ in
702417171-(* Basic combinators *)
7272-let return x = inj (Pure x)
7373-let pure x = inj (Pure x)
242242+ let label = Printf.sprintf "%s\n%s" node_id_str label in
243243+ Printf.bprintf buf " %s[\"%s\"];\n" node_id_str label;
742447575-let is_pure x = match prj x with
7676- | Pure x -> Some x
7777- | _ -> None
245245+ if !nodes_processed < max_nodes then (
246246+ List.iter (fun (child, style) ->
247247+ let child_id_str = get_id child in
248248+ let arrow = if style = "dotted" then "-.-> " else "-->" in
249249+ Printf.bprintf buf " %s %s %s;\n" node_id_str arrow child_id_str;
250250+ Queue.add child q
251251+ ) children
252252+ ) else if children <> [] then (
253253+ let ellipsis_id = node_id_str ^ "_ellipsis" in
254254+ Printf.bprintf buf " %s[\"...\"];\n" ellipsis_id;
255255+ Printf.bprintf buf " %s --> %s;\n" node_id_str ellipsis_id
256256+ );
257257+ );
258258+ process_queue ()
259259+ )
260260+ in
261261+ process_queue ();
262262+ Buffer.contents buf
782637979-let dummy = Pure (Any.any ())
264264+ let to_mermaid_trace (type a) ?(max_nodes=100) (start : a t_) : string =
265265+ let buf = Buffer.create 1024 in
266266+ Buffer.add_string buf "graph TD;\n";
267267+ let visited : (Any.t t_, unit) Hashtbl.t = Hashtbl.create 16 in
802688181-let operator desc =
8282- Operator { value = Eval_none; trace = T0; desc; trace_idx = I0 ;mutex= Mutex.create () }
269269+ let get_id : type a. a t_ -> string = fun node -> Format.asprintf "%a" pp_addr node in
832708484-let map x ~f = inj (
8585- match prj x with
8686- | Pure vx -> Pure (f vx)
8787- | x -> operator (Map (x, f))
8888- )
271271+ let get_eval_status_str (type a) (eval : a eval) : string =
272272+ match eval with
273273+ | Eval_none -> "None"
274274+ | Eval_progress -> "Progress"
275275+ | Eval_some _ -> "Some"
276276+ | Eval_invalid_next -> "Invalid_next"
277277+ in
892789090-let map2 x y ~f = inj (
9191- match prj x, prj y with
9292- | Pure vx, Pure vy -> Pure (f vx vy)
9393- | x, y -> operator (Map2 (x, y, f))
9494- )
279279+ (* Using breadth-first traversal following trace (parent) relationships *)
280280+ let q : Any.t t_ Queue.t = Queue.create () in
281281+ Queue.add (obj_t ( start)) q;
282282+ let nodes_processed = ref 0 in
95283284284+ let rec process_queue () =
285285+ if not (Queue.is_empty q) && !nodes_processed < max_nodes then (
286286+ let node = Queue.take q in
287287+ if not (Hashtbl.mem visited node) then (
288288+ Hashtbl.add visited node ();
289289+ incr nodes_processed;
290290+ let node_id_str = get_id node in
962919797-let pair x y = inj (
9898- match prj x, prj y with
9999- | Pure vx, Pure vy -> Pure (vx, vy)
100100- | x, y -> operator (Pair (x, y))
101101- )
292292+ let label, parents =
293293+ match node with
294294+ | Pure _ -> ("Pure", [])
295295+ | Root t ->
296296+ let status = get_eval_status_str t.value in
297297+ (Printf.sprintf "Root\nstatus: %s" status, [])
298298+ | Operator t ->
299299+ let status = get_eval_status_str t.value in
300300+ let desc_str =
301301+ match t.desc with
302302+ | Map (_, _) -> "Map"
303303+ | Map2 (_, _, _) -> "Map2"
304304+ | Pair (_, _) -> "Pair"
305305+ | App (_, _) -> "App"
306306+ | Join _ -> "Join"
307307+ | Var v -> Printf.sprintf "Var\nhash: %d" (Hashtbl.hash v.binding)
308308+ | Prim _ -> "Prim"
309309+ | Fix _ -> "Fix"
310310+ in
311311+ let parents =
312312+ match t.trace with
313313+ | T0 -> []
314314+ | T1 p1 -> [obj_t p1]
315315+ | T2 (p1, p2) -> [obj_t p1; obj_t p2]
316316+ | T3 (p1, p2, p3) -> [obj_t p1; obj_t p2; obj_t p3]
317317+ | T4 (p1, p2, p3, p4) -> [obj_t p1; obj_t p2; obj_t p3; obj_t p4]
318318+ | Tn t -> Array.to_list (Array.sub t.entries 0 t.active)
319319+ in
320320+ (Printf.sprintf "%s\nstatus: %s" desc_str status, parents)
321321+ in
102322103103-let app f x = inj (
104104- match prj f, prj x with
105105- | Pure vf, Pure vx -> Pure (vf vx)
106106- | f, x -> operator (App (f, x))
107107- )
323323+ let label = Printf.sprintf "%s\n%s" node_id_str label in
324324+ Printf.bprintf buf " %s[\"%s\"];\n" node_id_str label;
108325109109-let join child = inj (
110110- match prj2 child with
111111- | Pure v -> v
112112- | child -> operator (Join { child; intermediate = None })
113113- )
326326+ if !nodes_processed < max_nodes then (
327327+ List.iter (fun parent ->
328328+ let parent_id_str = get_id parent in
329329+ Printf.bprintf buf " %s --> %s;\n" parent_id_str node_id_str;
330330+ Queue.add parent q
331331+ ) parents
332332+ ) else if parents <> [] then (
333333+ let ellipsis_id = node_id_str ^ "_ellipsis" in
334334+ Printf.bprintf buf " %s[\"...\"];\n" ellipsis_id;
335335+ Printf.bprintf buf " %s --> %s;\n" ellipsis_id node_id_str
336336+ );
337337+ );
338338+ process_queue ()
339339+ )
340340+ in
341341+ process_queue ();
342342+ Buffer.contents buf
114343115115-let bind x ~f = join (map ~f x)
344344+ let to_mermaid_trace_idx (type a) ?(max_nodes=100) (start : a t_) : string =
345345+ let buf = Buffer.create 1024 in
346346+ Buffer.add_string buf "graph TD;\n";
347347+ let visited : (Any.t t_, unit) Hashtbl.t = Hashtbl.create 16 in
116348117117-(* Management of trace indices *)
349349+ let get_id : type a. a t_ -> string = fun node -> Format.asprintf "%a" pp_addr node in
118350119119-let addr oc obj =
120120- Printf.fprintf oc "0x%08x" (Obj.magic obj : int)
351351+ let get_eval_status_str (type a) (eval : a eval) : string =
352352+ match eval with
353353+ | Eval_none -> "None"
354354+ | Eval_progress -> "Progress"
355355+ | Eval_some _ -> "Some"
356356+ | Eval_invalid_next -> "Invalid_next"
357357+ in
121358122122-external t_equal : _ t_ -> _ t_ -> bool = "%eq"
123123-external obj_t : 'a t_ -> Any.t t_ = "%identity"
359359+ (* Using breadth-first traversal with a queue to prevent stack overflows on deep graphs. *)
360360+ let q : Any.t t_ Queue.t = Queue.create () in
361361+ Queue.add (obj_t (start)) q;
362362+ let nodes_processed = ref 0 in
124363125125-let rec dump_trace : type a. a t_ -> unit =
126126- fun obj -> match obj with
127127- | Pure _ -> Printf.eprintf "%a: Pure _\n%!" addr obj
128128- | Operator t ->
129129- Printf.eprintf "%a: Operator _ -> %a\n%!" addr obj dump_trace_aux t.trace;
130130- begin match t.trace with
131131- | T0 -> ()
132132- | T1 a -> dump_trace a
133133- | T2 (a,b) -> dump_trace a; dump_trace b
134134- | T3 (a,b,c) -> dump_trace a; dump_trace b; dump_trace c
135135- | T4 (a,b,c,d) -> dump_trace a; dump_trace b; dump_trace c; dump_trace d
136136- | Tn t -> Array.iter dump_trace t.entries
137137- end
138138- | Root _ -> Printf.eprintf "%a: Root _\n%!" addr obj
364364+ let rec process_queue () =
365365+ if not (Queue.is_empty q) && !nodes_processed < max_nodes then (
366366+ let node = Queue.take q in
367367+ if not (Hashtbl.mem visited node) then (
368368+ Hashtbl.add visited node ();
369369+ incr nodes_processed;
370370+ let node_id_str = get_id node in
139371140140-and dump_trace_aux oc = function
141141- | T0 -> Printf.fprintf oc "T0"
142142- | T1 a -> Printf.fprintf oc "T1 %a" addr a
143143- | T2 (a,b) ->
144144- Printf.fprintf oc "T2 (%a, %a)" addr a addr b
145145- | T3 (a,b,c) ->
146146- Printf.fprintf oc "T3 (%a, %a, %a)" addr a addr b addr c
147147- | T4 (a,b,c,d) ->
148148- Printf.fprintf oc "T4 (%a, %a, %a, %a)" addr a addr b addr c addr d
149149- | Tn t ->
150150- Printf.fprintf oc "Tn {active = %d; count = %d; entries = "
151151- t.active t.count;
152152- Array.iter (Printf.fprintf oc "(%a)" addr) t.entries;
153153- Printf.fprintf oc "}"
154154-155155-let dump_trace x = dump_trace (obj_t (prj x))
156156-157157-let add_idx obj idx = function
158158- | Pure _ -> assert false
159159- | Root t' -> t'.trace_idx <- I1 { idx; obj; next = t'.trace_idx }
160160- | Operator t' -> t'.trace_idx <- I1 { idx; obj; next = t'.trace_idx }
161161-162162-let rec rem_idx_rec obj = function
163163- | I0 -> assert false
164164- | I1 t as self ->
165165- if t_equal t.obj obj
166166- then (t.idx, t.next)
167167- else (
168168- let idx, result = rem_idx_rec obj t.next in
169169- t.next <- result;
170170- (idx, self)
171171- )
172172-173173-(* remove [obj] from the lwd's trace. *)
174174-let rem_idx obj = function
175175- | Pure _ -> assert false
176176- | Root t' ->
177177- let idx, trace_idx = rem_idx_rec obj t'.trace_idx in
178178- t'.trace_idx <- trace_idx; idx
179179- | Operator t' ->
180180- let idx, trace_idx = rem_idx_rec obj t'.trace_idx in
181181- t'.trace_idx <- trace_idx; idx
182182-183183-(* move [obj] from old index to new index. *)
184184-let rec mov_idx_rec obj oldidx newidx = function
185185- | I0 -> assert false
186186- | I1 t ->
187187- if t.idx = oldidx && t_equal t.obj obj
188188- then t.idx <- newidx
189189- else mov_idx_rec obj oldidx newidx t.next
190190-191191-let mov_idx obj oldidx newidx = function
192192- | Pure _ -> assert false
193193- | Root t' -> mov_idx_rec obj oldidx newidx t'.trace_idx
194194- | Operator t' -> mov_idx_rec obj oldidx newidx t'.trace_idx
195195-196196-let rec get_idx_rec obj = function
197197- | I0 -> assert false
198198- | I1 t ->
199199- if t_equal t.obj obj
200200- then t.idx
201201- else get_idx_rec obj t.next
372372+ let get_children_from_trace_idx trace_idx =
373373+ let rec aux acc = function
374374+ | I0 -> acc
375375+ | I1 { obj; next; _ } -> aux ((obj_t obj, "solid") :: acc) next
376376+ in
377377+ aux [] trace_idx
378378+ in
202379203203-(* find index of [obj] in the given lwd *)
204204-let get_idx obj = function
205205- | Pure _ -> assert false
206206- | Root t' -> get_idx_rec obj t'.trace_idx
207207- | Operator t' -> get_idx_rec obj t'.trace_idx
208208-209209-type status =
210210- | Neutral
211211- | Safe
212212- | Unsafe of (unit->unit) list ref
380380+ let label, children =
381381+ match node with
382382+ | Pure _ -> ("Pure", [])
383383+ | Root t ->
384384+ let status = get_eval_status_str t.value in
385385+ let children = get_children_from_trace_idx t.trace_idx in
386386+ (Printf.sprintf "Root\nstatus: %s" status, children)
387387+ | Operator t ->
388388+ let status = get_eval_status_str t.value in
389389+ let desc_str =
390390+ match t.desc with
391391+ | Map (_, _) -> "Map"
392392+ | Map2 (_, _, _) -> "Map2"
393393+ | Pair (_, _) -> "Pair"
394394+ | App (_, _) -> "App"
395395+ | Join _ -> "Join"
396396+ | Var v -> Printf.sprintf "Var\nhash: %d" (Hashtbl.hash v.binding)
397397+ | Prim _ -> "Prim"
398398+ | Fix _ -> "Fix"
399399+ in
400400+ let children = get_children_from_trace_idx t.trace_idx in
401401+ (Printf.sprintf "%s\nstatus: %s" desc_str status, children)
402402+ in
213403214214-(*
215215-Sensitivity is used to indicate to when reading a root node, that one of the child operater nodes was being evaluated.
216216-I think this is needed because the child cound have multiple roots and we need to indicate that to all of them
217217-*)
218218-type sensitivity =
219219- | Strong
220220- | Fragile
404404+ let label = Printf.sprintf "%s\n%s" node_id_str label in
405405+ Printf.bprintf buf " %s[\"%s\"];\n" node_id_str label;
221406222222-(* Propagating invalidation recursively.
223223- Each document is invalidated at most once,
224224- and only if it has [t.value = Some _]. *)
225225-let rec invalidate_node : type a . status ref -> sensitivity -> a t_ -> unit =
226226- (*sensitivity indicates that a parent is being evaluated*)
227227- fun status sensitivity node ->
228228- match node, sensitivity with
229229- | Pure _, _ -> assert false
230230- | Root ({value; on_invalidate; _} as t), _ ->
231231- (match value with
232232- | Eval_none | Eval_invalid_next -> ()
233233- | Eval_progress ->
234234- t.value <- Eval_invalid_next
235235- | Eval_some x ->
236236- t.value <- Eval_none;
237237- on_invalidate x
407407+ if !nodes_processed < max_nodes then (
408408+ List.iter (fun (child, style) ->
409409+ let child_id_str = get_id child in
410410+ let arrow = if style = "dotted" then "-.-> " else "-->" in
411411+ Printf.bprintf buf " %s %s %s;\n" node_id_str arrow child_id_str;
412412+ Queue.add child q
413413+ ) children
414414+ ) else if children <> [] then (
415415+ let ellipsis_id = node_id_str ^ "_ellipsis" in
416416+ Printf.bprintf buf " %s[\"...\"];\n" ellipsis_id;
417417+ Printf.bprintf buf " %s --> %s;\n" node_id_str ellipsis_id
418418+ );
419419+ );
420420+ process_queue ()
238421 )
239239- | Operator { value = Eval_none | Eval_invalid_next; _ }, _ -> ()
240240- | Operator { desc = Fix { wrt = Operator { value = Eval_none | Eval_invalid_next; _ }; _ }; _ }, Fragile ->
241241- (match !status with
242242- | Safe | Unsafe _ -> ()
243243- | Neutral -> status := Safe)
244244- | Operator { desc = Fix { wrt = Operator { value = Eval_some _; _ }; _ }; _ }, Fragile
245245- -> ()
246246- | Operator t, _ ->
247247- let sensitivity =
248248- match t.value with Eval_progress -> Fragile | _ -> sensitivity
249422 in
250250- t.value <- Eval_none;
251251- (* invalidate parents recursively *)
252252- invalidate_trace status sensitivity t.trace
423423+ process_queue ();
424424+ Buffer.contents buf
253425254254-(* invalidate recursively documents in the given trace *)
255255-and invalidate_trace status sensitivity = function
256256- | T0 -> ()
257257- | T1 x -> invalidate_node status sensitivity x
258258- | T2 (x, y) ->
259259- invalidate_node status sensitivity x;
260260- invalidate_node status sensitivity y
261261- | T3 (x, y, z) ->
262262- invalidate_node status sensitivity x;
263263- invalidate_node status sensitivity y;
264264- invalidate_node status sensitivity z
265265- | T4 (x, y, z, w) ->
266266- invalidate_node status sensitivity x;
267267- invalidate_node status sensitivity y;
268268- invalidate_node status sensitivity z;
269269- invalidate_node status sensitivity w
270270- | Tn t ->
271271- let active = t.active in
272272- t.active <- 0;
273273- for i = 0 to active - 1 do
274274- invalidate_node status sensitivity t.entries.(i)
275275- done
426426+ let add_idx obj idx = function
427427+ | Pure _ -> assert false
428428+ | Root t' -> t'.trace_idx <- I1 { idx; obj; next = t'.trace_idx }
429429+ | Operator t' -> t'.trace_idx <- I1 { idx; obj; next = t'.trace_idx }
430430+431431+ let rec rem_idx_rec obj = function
432432+ | I0 -> assert false
433433+ | I1 t as self ->
434434+ if t_equal t.obj obj
435435+ then (t.idx, t.next)
436436+ else (
437437+ let idx, result = rem_idx_rec obj t.next in
438438+ t.next <- result;
439439+ (idx, self)
440440+ )
441441+442442+ (* remove [obj] from the lwd's trace. *)
443443+ let rem_idx obj = function
444444+ | Pure _ -> assert false
445445+ | Root t' ->
446446+ let idx, trace_idx = rem_idx_rec obj t'.trace_idx in
447447+ t'.trace_idx <- trace_idx; idx
448448+ | Operator t' ->
449449+ let idx, trace_idx = rem_idx_rec obj t'.trace_idx in
450450+ t'.trace_idx <- trace_idx; idx
451451+452452+ (* move [obj] from old index to new index. *)
453453+ let rec mov_idx_rec obj oldidx newidx = function
454454+ | I0 -> assert false
455455+ | I1 t ->
456456+ if t.idx = oldidx && t_equal t.obj obj
457457+ then t.idx <- newidx
458458+ else mov_idx_rec obj oldidx newidx t.next
459459+460460+ let mov_idx obj oldidx newidx = function
461461+ | Pure _ -> assert false
462462+ | Root t' -> mov_idx_rec obj oldidx newidx t'.trace_idx
463463+ | Operator t' -> mov_idx_rec obj oldidx newidx t'.trace_idx
464464+465465+ let rec get_idx_rec obj = function
466466+ | I0 -> assert false
467467+ | I1 t ->
468468+ if t_equal t.obj obj
469469+ then t.idx
470470+ else get_idx_rec obj t.next
471471+472472+ (* find index of [obj] in the given lwd *)
473473+ let get_idx obj = function
474474+ | Pure _ -> assert false
475475+ | Root t' -> get_idx_rec obj t'.trace_idx
476476+ | Operator t' -> get_idx_rec obj t'.trace_idx
477477+478478+ type status =
479479+ | Neutral
480480+ | Safe
481481+ | Unsafe of (unit->unit) list ref
482482+483483+ (*
484484+ Sensitivity is used to indicate to when reading a root node, that one of the child operater nodes was being evaluated.
485485+ I think this is needed because the child cound have multiple roots and we need to indicate that to all of them
486486+ *)
487487+ type sensitivity =
488488+ | Strong
489489+ | Fragile
490490+491491+ let pp_sensitivity ppf = function
492492+ | Strong -> Format.fprintf ppf "Strong"
493493+ | Fragile -> Format.fprintf ppf "Fragile"
494494+495495+ (* Propagating invalidation recursively.
496496+ Each document is invalidated at most once,
497497+ and only if it has [t.value = Some _]. *)
498498+ let rec invalidate_node : type a . status ref -> sensitivity -> a t_ -> unit =
499499+ (*sensitivity indicates that a parent is being evaluated*)
500500+ fun status sensitivity node ->
276501277277-let default_unsafe_mutation_logger () =
278278- let callstack = Printexc.get_callstack 20 in
279279- Printf.fprintf stderr
280280- "Lwd: unsafe mutation (variable invalidated during evaluation) at\n%a"
281281- Printexc.print_raw_backtrace callstack
502502+ match node, sensitivity with
503503+ | Pure _, _ -> assert false
504504+ | Root ({value; on_invalidate; _} as t), _ ->
505505+ (match value with
506506+ | Eval_none | Eval_invalid_next -> ()
507507+ | Eval_progress ->
508508+ t.value <- Eval_invalid_next
509509+ | Eval_some x ->
510510+ t.value <- Eval_none;
511511+ on_invalidate x
512512+ )
513513+ | Operator { value = Eval_none | Eval_invalid_next; _ }, _ -> ()
514514+ | Operator { desc = Fix { wrt = Operator { value = Eval_none | Eval_invalid_next; _ }; _ }; _ }, Fragile ->
515515+ (match !status with
516516+ | Safe | Unsafe _ -> ()
517517+ | Neutral -> status := Safe)
518518+ | Operator { desc = Fix { wrt = Operator { value = Eval_some _; _ }; _ }; _ }, Fragile
519519+ -> ()
520520+ | Operator t, _ ->
521521+ let sensitivity =
522522+ match t.value with Eval_progress -> Fragile | _ -> sensitivity
523523+ in
524524+ t.value <- Eval_none;
525525+ (* invalidate parents recursively *)
526526+ invalidate_trace status sensitivity t.trace
527527+528528+ (* invalidate recursively documents in the given trace *)
529529+ and invalidate_trace status sensitivity = function
530530+ | T0 -> ()
531531+ | T1 x -> invalidate_node status sensitivity x
532532+ | T2 (x, y) ->
533533+ invalidate_node status sensitivity x;
534534+ invalidate_node status sensitivity y
535535+ | T3 (x, y, z) ->
536536+ invalidate_node status sensitivity x;
537537+ invalidate_node status sensitivity y;
538538+ invalidate_node status sensitivity z
539539+ | T4 (x, y, z, w) ->
540540+ invalidate_node status sensitivity x;
541541+ invalidate_node status sensitivity y;
542542+ invalidate_node status sensitivity z;
543543+ invalidate_node status sensitivity w
544544+ | Tn t ->
545545+ let active = t.active in
546546+ t.active <- 0;
547547+ for i = 0 to active - 1 do
548548+ invalidate_node status sensitivity t.entries.(i)
549549+ done
550550+551551+ let default_unsafe_mutation_logger () =
552552+ let callstack = Printexc.get_callstack 20 in
553553+ Printf.fprintf stderr
554554+ "Lwd: unsafe mutation (variable invalidated during evaluation) at\n%a"
555555+ Printexc.print_raw_backtrace callstack
556556+557557+ let unsafe_mutation_logger = ref default_unsafe_mutation_logger
558558+559559+560560+ let do_invalidate sensitivity (node : 'a t_) =
561561+ let status = ref Neutral in
562562+ invalidate_node status sensitivity node;
563563+ (* Variables *)
564564+ type 'a var = 'a t_
565565+ let var x = operator (Var {binding = x})
566566+ let get x = inj x
567567+568568+ let get_parents_from_trace (trace:trace) : (Any.t var) list =
569569+ match trace with
570570+ | T0 -> []
571571+ | T1 p1 -> [obj_t p1]
572572+ | T2 (p1, p2) -> [obj_t p1; obj_t p2]
573573+ | T3 (p1, p2, p3) -> [obj_t p1; obj_t p2; obj_t p3]
574574+ | T4 (p1, p2, p3, p4) -> [obj_t p1; obj_t p2; obj_t p3; obj_t p4]
575575+ | Tn t ->
576576+ let res = ref [] in
577577+ for i = t.active - 1 downto 0 do
578578+ res := t.entries.(i) :: !res
579579+ done;
580580+ !res
282581283283-let unsafe_mutation_logger = ref default_unsafe_mutation_logger
582582+ let set (vx:'a var) x : unit =
583583+ match vx with
584584+ | (Operator ({desc = Var v; _;} as inner )) ->
585585+ v.binding <- x;
586586+ inner.value <- Eval_some x;
587587+ (* [climb] traverses the graph upwards from a changed variable, invalidating
588588+ parent nodes. The `parents` list is a list of work to do, containing tuples
589589+ of `(node, has_seen_invalid_next)`.
284590591591+ The `has_seen_invalid_next` flag is crucial for correctness. It tracks
592592+ whether we have already encountered a node marked `Eval_invalid_next`
593593+ in the current upward traversal path.
285594286286-(**
287287-595595+ When a node is being evaluated (`Eval_progress`), and a variable it
596596+ depends on is `set`, the evaluation might be happening in a different
597597+ thread. If another invalidation has already marked a node higher up in
598598+ the chain as `Eval_invalid_next`, we don't want to also mark the
599599+ current `Eval_progress` node as `Eval_invalid_next`. The higher-up
600600+ invalidation will already cause a re-evaluation that will deal with
601601+ this node. Setting it to `Eval_invalid_next` here could lead to
602602+ incorrect state transitions.
288603289289- @param ~was_delayed: set to true if the function call was put on hold untill the current root had finished being evaluated
290290-*)
291291-let do_invalidate sensitivity (node : 'a t_) =
292292- let status = ref Neutral in
293293- invalidate_node status sensitivity node;
294294-(* Variables *)
295295-type 'a var = 'a t_
296296-let var x = operator (Var {binding = x;nextVal=None})
297297-let get x = inj x
298298-299299-let set (vx:'a var) x : unit =
300300- match vx with
301301- | (Operator ({desc = Var v; _} )as node) ->
302302- v.binding <- x;
303303- v.nextVal <- None;
304304- (let rec climb (parents: Any.t t_ list) =
305305- let new_parents : Any.t t_ list = List.fold_left (fun acc p ->
306306- match p with
307307- | Pure _ -> acc
308308- | Root r ->
309309- if Mutex.try_lock r.mutex then (
310310- (match r.value with
311311- | Eval_some v ->
312312- r.value <- Eval_none;
313313- r.on_invalidate v
314314- | Eval_none | Eval_invalid_next -> ()
315315- | Eval_progress -> r.value <- Eval_invalid_next
316316- );
317317- Mutex.unlock r.mutex;
318318- acc
319319- ) else (
320320- Mutex.protect r.mutex (fun () ->
321321- if r.value = Eval_progress then r.value <- Eval_invalid_next
604604+ Therefore, `has_seen_invalid_next` is propagated upwards. Once it's
605605+ `true` for a given path, it remains `true` for all ancestors in that
606606+ path. We only set a node from `Eval_progress` to `Eval_invalid_next`
607607+ if `has_seen_invalid_next` is `false`.
608608+ We should actually be able to stop propagating invalidation once we have seen an Eval_invalid_next node, but in reality things seem to be messier than that and so we keep going.
609609+ Instead i reset the seen_eval node flag if we ever encounter a node that is not Eval_progress.
610610+ *)
611611+ (let rec climb (parents: (Any.t t_ * bool) list) =
612612+ let new_parents : (Any.t t_ * bool) list = List.fold_left (fun acc (p, seen_eval_node) ->
613613+ match p with
614614+ | Pure _ -> acc
615615+ | Root r ->
616616+ if Mutex.try_lock r.mutex then (
617617+ (match r.value with
618618+ | Eval_some v ->
619619+ r.value <- Eval_none;
620620+ r.on_invalidate v
621621+ | Eval_none | Eval_invalid_next -> ()
622622+ | Eval_progress -> if not seen_eval_node then r.value <- Eval_invalid_next
322623 );
323323- acc
324324- )
325325- | Operator o ->
326326- if Mutex.try_lock o.mutex then (
327327- let continue =
328328- match o.value with
329329- | Eval_some _ -> o.value <- Eval_none; true
330330- | Eval_none | Eval_invalid_next -> false
331331- | Eval_progress -> o.value <- Eval_invalid_next; false
624624+ Mutex.unlock r.mutex;
625625+ acc
626626+ ) else (
627627+ (* if the root is currently being evaluated, we pro*)
628628+ (* Mutex.protect r.mutex (fun () -> *)
629629+ (* ); *)
630630+ (* ELI: try just skipping the lock and invalidating the root, it should be safe *)
631631+ if r.value = Eval_progress && not seen_eval_node then r.value <- Eval_invalid_next;
632632+ acc
633633+ )
634634+ | Operator o ->
635635+ let (continue, seen_eval_node) =
636636+ if Mutex.try_lock o.mutex then (
637637+ let current_value = o.value in
638638+ let continue,this_node_has_seen_invalid_next =
639639+ match current_value with
640640+ | Eval_some _ -> o.value <- Eval_none; true,false
641641+ (* This shouldn't be needed, but sometimes it is so we do it anyway*)
642642+ | Eval_none -> true,false
643643+ | Eval_invalid_next -> true,true
644644+ | Eval_progress -> (if not seen_eval_node then
645645+ o.value <- Eval_invalid_next); true,true
646646+ in
647647+ Mutex.unlock o.mutex;
648648+ (continue, this_node_has_seen_invalid_next )
649649+ ) else (
650650+ Mutex.protect o.mutex (fun () ->
651651+ if o.value = Eval_progress && not seen_eval_node then o.value <- Eval_invalid_next
652652+ );
653653+ (true, true)
654654+ )
332655 in
333333- Mutex.unlock o.mutex;
334656 if continue then (
335335- match o.trace with
336336- | T0 -> acc
337337- | T1 p1 -> obj_t p1 :: acc
338338- | T2 (p1, p2) -> obj_t p1 :: obj_t p2 :: acc
339339- | T3 (p1, p2, p3) -> obj_t p1 :: obj_t p2 :: obj_t p3 :: acc
340340- | T4 (p1, p2, p3, p4) -> obj_t p1 :: obj_t p2 :: obj_t p3 :: obj_t p4 :: acc
341341- | Tn t -> Array.to_list t.entries |> List.rev_append acc
657657+ let parents_of_o = get_parents_from_trace o.trace in
658658+ let new_acc_entries = List.map (fun p -> (obj_t p, seen_eval_node)) parents_of_o in
659659+ List.rev_append new_acc_entries acc
342660 ) else acc
343343- ) else (
344344- Mutex.protect o.mutex (fun () ->
345345- if o.value = Eval_progress then o.value <- Eval_invalid_next
346346- );
347347- acc
348348- )
349349- ) [] parents in
350350- if new_parents <> [] then climb new_parents
351351- in
352352- match node with
353353- | Operator o ->
354354- let initial_parents : Any.t t_ list =
355355- match o.trace with
356356- | T0 -> []
357357- | T1 p1 -> [obj_t p1]
358358- | T2 (p1, p2) -> [obj_t p1; obj_t p2]
359359- | T3 (p1, p2, p3) -> [obj_t p1; obj_t p2; obj_t p3]
360360- | T4 (p1, p2, p3, p4) -> [obj_t p1; obj_t p2; obj_t p3; obj_t p4]
361361- | Tn t -> Array.to_list t.entries
661661+ ) [] parents in
662662+ if new_parents <> [] then climb new_parents
663663+ in
664664+ let initial_parents = get_parents_from_trace inner.trace in
665665+ climb (List.map (fun p -> (obj_t p, false)) initial_parents)
666666+ )
667667+ | _ -> assert false
668668+669669+ let peek_stable = function
670670+ | Operator ({desc = Var v; _}) -> v.binding
671671+ | _ -> assert false
672672+673673+ let peek = function
674674+ | Operator ({desc = Var v; _}) -> v.binding
675675+ | _ -> assert false
676676+677677+ let update f v = set v (f (peek v))
678678+679679+ let may_update f v =
680680+ match f (peek v) with
681681+ | None -> ()
682682+ | Some x -> set v x
683683+684684+ (* Primitives *)
685685+ type 'a prim = 'a t
686686+ let prim ~acquire ~release =
687687+ inj (operator (Prim { acquire; release }))
688688+ let get_prim x = x
689689+690690+ let invalidate x = match prj x with
691691+ | Operator {desc = Prim p; value; _} as t ->
692692+ (* the value is invalidated, be sure to invalidate all parents as well *)
693693+ begin match value with
694694+ | Eval_none | Eval_invalid_next -> ()
695695+ | Eval_progress -> do_invalidate Fragile t;
696696+ | Eval_some v ->
697697+ do_invalidate Strong t;
698698+ p.release x v
699699+ end
700700+ | _ -> assert false
701701+702702+ (* Fix point *)
703703+704704+ let fix doc ~wrt = match prj wrt with
705705+ | Root _ -> assert false
706706+ | Pure _ -> doc
707707+ | Operator _ as wrt -> inj (operator (Fix {doc = prj doc; wrt}))
708708+709709+ type release_list =
710710+ | Release_done
711711+ | Release_more :
712712+ { origin : 'a t_; element : 'b t_; next : release_list } -> release_list
713713+714714+ type release_queue = release_list ref
715715+ let make_release_queue () = ref Release_done
716716+717717+ type release_failure = exn * Printexc.raw_backtrace
718718+719719+ (* [sub_release [] origin self] is called when [origin] is released,
720720+ where [origin] is reachable from [self]'s trace.
721721+ We're going to remove [origin] from that trace as [origin] is now dead.
722722+723723+ [sub_release] cannot raise.
724724+ If a primitive raises, the exception is caught and a warning is emitted. *)
725725+ let rec sub_release
726726+ : type a b . release_failure list -> a t_ -> b t_ -> release_failure list
727727+ = fun failures origin -> function
728728+ | Root _ -> assert false
729729+ | Pure _ -> failures
730730+ | Operator t as self ->
731731+ Mutex.protect t.mutex @@ fun () ->
732732+ (* compute [t.trace \ {origin}] *)
733733+ let trace = match t.trace with
734734+ | T0 -> assert false
735735+ | T1 x -> assert (t_equal x origin); T0
736736+ | T2 (x, y) ->
737737+ if t_equal x origin then T1 y
738738+ else if t_equal y origin then T1 x
739739+ else assert false
740740+ | T3 (x, y, z) ->
741741+ if t_equal x origin then T2 (y, z)
742742+ else if t_equal y origin then T2 (x, z)
743743+ else if t_equal z origin then T2 (x, y)
744744+ else assert false
745745+ | T4 (x, y, z, w) ->
746746+ if t_equal x origin then T3 (y, z, w)
747747+ else if t_equal y origin then T3 (x, z, w)
748748+ else if t_equal z origin then T3 (x, y, w)
749749+ else if t_equal w origin then T3 (x, y, z)
750750+ else assert false
751751+ | Tn tn as trace ->
752752+ let revidx = rem_idx self origin in
753753+ assert (t_equal tn.entries.(revidx) origin);
754754+ let count = tn.count - 1 in
755755+ tn.count <- count;
756756+ if revidx < count then (
757757+ let obj = tn.entries.(count) in
758758+ tn.entries.(revidx) <- obj;
759759+ tn.entries.(count) <- dummy;
760760+ mov_idx self count revidx obj
761761+ ) else
762762+ tn.entries.(revidx) <- dummy;
763763+ if tn.active > count then tn.active <- count;
764764+ if count = 4 then (
765765+ (* downgrade to [T4] to save space *)
766766+ let a = tn.entries.(0) and b = tn.entries.(1) in
767767+ let c = tn.entries.(2) and d = tn.entries.(3) in
768768+ ignore (rem_idx self a : int);
769769+ ignore (rem_idx self b : int);
770770+ ignore (rem_idx self c : int);
771771+ ignore (rem_idx self d : int);
772772+ T4 (a, b, c, d)
773773+ ) else (
774774+ let len = Array.length tn.entries in
775775+ if count <= len lsr 2 then
776776+ Tn { active = tn.active; count = tn.count;
777777+ entries = Array.sub tn.entries 0 (len lsr 1) }
778778+ else
779779+ trace
780780+ )
362781 in
363363- climb initial_parents
782782+ t.trace <- trace;
783783+ match trace with
784784+ | T0 ->
785785+ (* [self] is not active anymore, since it's not reachable
786786+ from any root. We can release its cached value and
787787+ recursively release its subtree. *)
788788+ let value = t.value in
789789+ t.value <- Eval_none;
790790+ begin match t.desc with
791791+ | Map (x, _) -> sub_release failures self x
792792+ | Map2 (x, y, _) ->
793793+ sub_release (sub_release failures self x) self y
794794+ | Pair (x, y) ->
795795+ sub_release (sub_release failures self x) self y
796796+ | App (x, y) ->
797797+ sub_release (sub_release failures self x) self y
798798+ | Join ({ child; intermediate } as t) ->
799799+ let failures = sub_release failures self child in
800800+ begin match intermediate with
801801+ | None -> failures
802802+ | Some child' ->
803803+ t.intermediate <- None;
804804+ sub_release failures self child'
805805+ end
806806+ | Var _ -> failures
807807+ | Fix {doc; wrt} ->
808808+ sub_release (sub_release failures self wrt) self doc
809809+ | Prim t ->
810810+ begin match value with
811811+ | Eval_none | Eval_invalid_next | Eval_progress -> failures
812812+ | Eval_some x ->
813813+ begin match t.release (inj self) x with
814814+ | () -> failures
815815+ | exception exn ->
816816+ let bt = Printexc.get_raw_backtrace () in
817817+ (exn, bt) :: failures
818818+ end
819819+ end
820820+ end
821821+ | _ -> failures
822822+823823+ (* [sub_acquire] cannot raise *)
824824+ let rec sub_acquire : type a b . a t_ -> b t_ -> unit = fun origin ->
825825+ function
826826+ | Root _ -> assert false
827827+ | Pure _ -> ()
828828+ | Operator t as self ->
829829+ (*lock the mutex, because we are making changes within this node *)
830830+831831+ Mutex.protect t.mutex @@ fun _->
832832+ (* [acquire] is true if this is the first time this operator
833833+ is used, in which case we need to acquire its children *)
834834+ let acquire = match t.trace with T0 -> true | _ -> false in
835835+ let trace = match t.trace with
836836+ | T0 -> T1 origin
837837+ | T1 x -> T2 (origin, x)
838838+ | T2 (x, y) -> T3 (origin, x, y)
839839+ | T3 (x, y, z) -> T4 (origin, x, y, z)
840840+ | T4 (x, y, z, w) ->
841841+ let obj_origin = obj_t origin in
842842+ let entries =
843843+ [| obj_t x; obj_t y; obj_t z; obj_t w; obj_t origin; dummy; dummy; dummy |]
844844+ in
845845+ for i = 0 to 4 do add_idx self i entries.(i) done;
846846+ Tn { active = 5; count = 5; entries }
847847+ | Tn tn as trace ->
848848+ let index = tn.count in
849849+ let entries, trace =
850850+ (* possibly resize array [entries] *)
851851+ if index < Array.length tn.entries then (
852852+ tn.count <- tn.count + 1;
853853+ (tn.entries, trace)
854854+ ) else (
855855+ let entries = Array.make (index * 2) dummy in
856856+ Array.blit tn.entries 0 entries 0 index;
857857+ (entries, Tn { active = tn.active; count = index + 1; entries })
858858+ )
859859+ in
860860+ let obj_origin = obj_t origin in
861861+ entries.(index) <- obj_origin;
862862+ add_idx self index obj_origin;
863863+ trace
864864+ in
865865+ t.trace <- trace;
866866+ if acquire then (
867867+ (* acquire immediate children, and so on recursively *)
868868+ match t.desc with
869869+ | Map (x, _) -> sub_acquire self x
870870+ | Map2 (x, y, _) ->
871871+ sub_acquire self x;
872872+ sub_acquire self y
873873+ | Pair (x, y) ->
874874+ sub_acquire self x;
875875+ sub_acquire self y
876876+ | App (x, y) ->
877877+ sub_acquire self x;
878878+ sub_acquire self y
879879+ | Fix {doc; wrt} ->
880880+ sub_acquire self doc;
881881+ sub_acquire self wrt
882882+ | Join { child; intermediate } ->
883883+ sub_acquire self child;
884884+ begin match intermediate with
885885+ | None -> ()
886886+ | Some _ ->
887887+ assert false (* this can't initialized already, first-time acquire *)
888888+ end
889889+ | Var _ -> ()
890890+ | Prim _ -> ()
891891+ )
892892+893893+ (* make sure that [origin] is in [self.trace], passed as last arg. *)
894894+ let activate_tracing self origin = function
895895+ | Tn tn ->
896896+ let idx = get_idx self origin in (* index of [self] in [origin.trace_idx] *)
897897+ let active = tn.active in
898898+ (* [idx < active] means [self] is already traced by [origin].
899899+ We only have to add [self] to the entries if [idx >= active]. *)
900900+ if idx >= active then (
901901+ tn.active <- active + 1;
902902+ );
903903+ if idx > active then (
904904+ (* swap with last entry in [tn.entries] *)
905905+ let old = tn.entries.(active) in
906906+ tn.entries.(idx) <- old;
907907+ tn.entries.(active) <- obj_t origin;
908908+ mov_idx self active idx old;
909909+ mov_idx self idx active origin
910910+ )
364911 | _ -> ()
365365- )
366366- | _ -> assert false
912912+913913+ let sub_is_damaged = function
914914+ | Root _ -> assert false
915915+ | Pure _ -> false
916916+ | Operator {value; _} ->
917917+ match value with
918918+ | Eval_none | Eval_invalid_next -> true
919919+ | Eval_some _ -> false
920920+ | Eval_progress -> assert false
921921+922922+ (* [sub_sample origin self] computes a value for [self].
923923+924924+ [sub_sample] raise if any user-provided computation raises.
925925+ Graph will be left in a coherent state but exception will be propagated
926926+ to the observer. *)
927927+ let rec sub_sample queue =
928928+ let rec aux : type a b . a t_ -> b t_ -> b = fun origin ->
929929+ function
930930+ | Root _ -> assert false
931931+ | Pure x -> x
932932+ | Operator t as self ->
933933+ (* lock the mutex, examine cached value *)
367934368368-let peek_stable = function
369369- | Operator ({desc = Var v; _}) -> v.binding
370370- | _ -> assert false
935935+ Mutex.lock t.mutex;
371936372372-let peek = function
373373- | Operator ({desc = Var v; _}) -> v.nextVal |>Option.value ~default: v.binding
374374- | _ -> assert false
937937+ match t.value with
938938+ | Eval_some value ->
939939+ Mutex.unlock t.mutex;
940940+ activate_tracing self origin t.trace;
941941+ value
942942+ | Eval_none ->
943943+ t.value <- Eval_progress;
944944+ Mutex.unlock t.mutex;
945945+946946+ (* compute value without holding the lock *)
375947376376-let update f v = set v (f (peek v))
948948+ let result : b =
949949+ match t.desc with
950950+ | Map (x, f) -> f (aux self x)
951951+ | Map2 (x, y, f) -> f (aux self x) (aux self y)
952952+ | Pair (x, y) -> (aux self x, aux self y)
953953+ | App (f, x) -> (aux self f) (aux self x)
954954+ | Fix { doc; wrt } ->
955955+ let _ = aux self wrt in
956956+ let result = aux self doc in
957957+ if sub_is_damaged wrt then aux origin self
958958+ else (
959959+ if sub_is_damaged doc then do_invalidate Fragile self;
960960+ result)
961961+ | Join x ->
377962378378-let may_update f v =
379379- match f (peek v) with
380380- | None -> ()
381381- | Some x -> set v x
963963+ let intermediate =
964964+ (* We haven't touched any state yet,
965965+ it is safe for [aux] to raise *)
966966+ aux self x.child
967967+ in
968968+ begin
969969+ match x.intermediate with
970970+ | None ->
971971+ x.intermediate <- Some intermediate;
382972383383-(* Primitives *)
384384-type 'a prim = 'a t
385385-let prim ~acquire ~release =
386386- inj (operator (Prim { acquire; release }))
387387-let get_prim x = x
973973+ sub_acquire self intermediate
974974+ | Some x' when x' != intermediate ->
975975+ queue :=
976976+ Release_more
977977+ { origin = self; element = x'; next = !queue };
978978+ x.intermediate <- Some intermediate;
388979389389-let invalidate x = match prj x with
390390- | Operator {desc = Prim p; value; _} as t ->
391391- (* the value is invalidated, be sure to invalidate all parents as well *)
392392- begin match value with
393393- | Eval_none | Eval_invalid_next -> ()
394394- | Eval_progress -> do_invalidate Fragile t;
395395- | Eval_some v ->
396396- do_invalidate Strong t;
397397- p.release x v
398398- end
399399- | _ -> assert false
980980+ sub_acquire self intermediate
981981+ | Some _ -> ()
982982+ end;
983983+ (*print mermaid*)
400984401401-(* Fix point *)
985985+ let mermaid=to_mermaid_trace_idx ~max_nodes:200 ( intermediate) in
986986+ (* let mermaid_2=to_mermaid ~max_nodes:200 ( intermediate) in *)
987987+ let mermaid_3=to_mermaid_trace_idx ~max_nodes:200 ( intermediate) in
402988403403-let fix doc ~wrt = match prj wrt with
404404- | Root _ -> assert false
405405- | Pure _ -> doc
406406- | Operator _ as wrt -> inj (operator (Fix {doc = prj doc; wrt}))
407989408408-type release_list =
409409- | Release_done
410410- | Release_more :
411411- { origin : 'a t_; element : 'b t_; next : release_list } -> release_list
412990413413-type release_queue = release_list ref
414414-let make_release_queue () = ref Release_done
991991+ aux self intermediate
992992+ | Var x ->
415993416416-type release_failure = exn * Printexc.raw_backtrace
994994+ x.binding
995995+ | Prim t -> t.acquire (inj self)
996996+ in
997997+998998+ (* lock again and finalize *)
417999418418-(* [sub_release [] origin self] is called when [origin] is released,
419419- where [origin] is reachable from [self]'s trace.
420420- We're going to remove [origin] from that trace as [origin] is now dead.
10001000+ Mutex.lock t.mutex;
10011001+ begin
10021002+ match t.value with
10031003+ | Eval_progress -> t.value <- Eval_some result
10041004+ | Eval_invalid_next -> t.value <- Eval_none
10051005+ | Eval_none | Eval_some _ -> ()
10061006+ end;
10071007+ Mutex.unlock t.mutex;
4211008422422- [sub_release] cannot raise.
423423- If a primitive raises, the exception is caught and a warning is emitted. *)
424424-let rec sub_release
425425- : type a b . release_failure list -> a t_ -> b t_ -> release_failure list
426426- = fun failures origin -> function
427427- | Root _ -> assert false
428428- | Pure _ -> failures
429429- | Operator t as self ->
430430- (* compute [t.trace \ {origin}] *)
431431- let trace = match t.trace with
432432- | T0 -> assert false
433433- | T1 x -> assert (t_equal x origin); T0
434434- | T2 (x, y) ->
435435- if t_equal x origin then T1 y
436436- else if t_equal y origin then T1 x
437437- else assert false
438438- | T3 (x, y, z) ->
439439- if t_equal x origin then T2 (y, z)
440440- else if t_equal y origin then T2 (x, z)
441441- else if t_equal z origin then T2 (x, y)
442442- else assert false
443443- | T4 (x, y, z, w) ->
444444- if t_equal x origin then T3 (y, z, w)
445445- else if t_equal y origin then T3 (x, z, w)
446446- else if t_equal z origin then T3 (x, y, w)
447447- else if t_equal w origin then T3 (x, y, z)
448448- else assert false
449449- | Tn tn as trace ->
450450- let revidx = rem_idx self origin in
451451- assert (t_equal tn.entries.(revidx) origin);
452452- let count = tn.count - 1 in
453453- tn.count <- count;
454454- if revidx < count then (
455455- let obj = tn.entries.(count) in
456456- tn.entries.(revidx) <- obj;
457457- tn.entries.(count) <- dummy;
458458- mov_idx self count revidx obj
459459- ) else
460460- tn.entries.(revidx) <- dummy;
461461- if tn.active > count then tn.active <- count;
462462- if count = 4 then (
463463- (* downgrade to [T4] to save space *)
464464- let a = tn.entries.(0) and b = tn.entries.(1) in
465465- let c = tn.entries.(2) and d = tn.entries.(3) in
466466- ignore (rem_idx self a : int);
467467- ignore (rem_idx self b : int);
468468- ignore (rem_idx self c : int);
469469- ignore (rem_idx self d : int);
470470- T4 (a, b, c, d)
471471- ) else (
472472- let len = Array.length tn.entries in
473473- if count <= len lsr 2 then
474474- Tn { active = tn.active; count = tn.count;
475475- entries = Array.sub tn.entries 0 (len lsr 1) }
476476- else
477477- trace
478478- )
479479- in
480480- t.trace <- trace;
481481- match trace with
482482- | T0 ->
483483- (* [self] is not active anymore, since it's not reachable
484484- from any root. We can release its cached value and
485485- recursively release its subtree. *)
486486- let value = t.value in
487487- t.value <- Eval_progress;
488488- begin match t.desc with
489489- | Map (x, _) -> sub_release failures self x
490490- | Map2 (x, y, _) ->
491491- sub_release (sub_release failures self x) self y
492492- | Pair (x, y) ->
493493- sub_release (sub_release failures self x) self y
494494- | App (x, y) ->
495495- sub_release (sub_release failures self x) self y
496496- | Join ({ child; intermediate } as t) ->
497497- let failures = sub_release failures self child in
498498- begin match intermediate with
499499- | None -> failures
500500- | Some child' ->
501501- t.intermediate <- None;
502502- sub_release failures self child'
503503- end
504504- | Var _ -> failures
505505- | Fix {doc; wrt} ->
506506- sub_release (sub_release failures self wrt) self doc
507507- | Prim t ->
508508- begin match value with
509509- | Eval_none | Eval_invalid_next | Eval_progress -> failures
510510- | Eval_some x ->
511511- begin match t.release (inj self) x with
512512- | () -> failures
513513- | exception exn ->
514514- let bt = Printexc.get_raw_backtrace () in
515515- (exn, bt) :: failures
516516- end
517517- end
518518- end
519519- | _ -> failures
10091009+5201010521521-(* [sub_acquire] cannot raise *)
522522-let rec sub_acquire : type a b . a t_ -> b t_ -> unit = fun origin ->
523523- function
524524- | Root _ -> assert false
525525- | Pure _ -> ()
526526- | Operator t as self ->
527527- (*lock the mutex, because we are making changes within this node *)
528528- Mutex.protect t.mutex @@ fun _->
529529- (* [acquire] is true if this is the first time this operator
530530- is used, in which case we need to acquire its children *)
531531- let acquire = match t.trace with T0 -> true | _ -> false in
532532- let trace = match t.trace with
533533- | T0 -> T1 origin
534534- | T1 x -> T2 (origin, x)
535535- | T2 (x, y) -> T3 (origin, x, y)
536536- | T3 (x, y, z) -> T4 (origin, x, y, z)
537537- | T4 (x, y, z, w) ->
538538- let obj_origin = obj_t origin in
539539- let entries =
540540- [| obj_t x; obj_t y; obj_t z; obj_t w; obj_origin; dummy; dummy; dummy |]
541541- in
542542- for i = 0 to 4 do add_idx self i entries.(i) done;
543543- Tn { active = 5; count = 5; entries }
544544- | Tn tn as trace ->
545545- let index = tn.count in
546546- let entries, trace =
547547- (* possibly resize array [entries] *)
548548- if index < Array.length tn.entries then (
549549- tn.count <- tn.count + 1;
550550- (tn.entries, trace)
551551- ) else (
552552- let entries = Array.make (index * 2) dummy in
553553- Array.blit tn.entries 0 entries 0 index;
554554- (entries, Tn { active = tn.active; count = index + 1; entries })
555555- )
556556- in
557557- let obj_origin = obj_t origin in
558558- entries.(index) <- obj_origin;
559559- add_idx self index obj_origin;
560560- trace
561561- in
562562- t.trace <- trace;
563563- if acquire then (
564564- (* acquire immediate children, and so on recursively *)
565565- match t.desc with
566566- | Map (x, _) -> sub_acquire self x
567567- | Map2 (x, y, _) ->
568568- sub_acquire self x;
569569- sub_acquire self y
570570- | Pair (x, y) ->
571571- sub_acquire self x;
572572- sub_acquire self y
573573- | App (x, y) ->
574574- sub_acquire self x;
575575- sub_acquire self y
576576- | Fix {doc; wrt} ->
577577- sub_acquire self doc;
578578- sub_acquire self wrt
579579- | Join { child; intermediate } ->
580580- sub_acquire self child;
581581- begin match intermediate with
582582- | None -> ()
583583- | Some _ ->
584584- assert false (* this can't initialized already, first-time acquire *)
585585- end
586586- | Var _ -> ()
587587- | Prim _ -> ()
588588- )
10111011+ (* Re-evaluate if the node was invalidated during computation *)
10121012+ if t.value == Eval_none then (
5891013590590-(* make sure that [origin] is in [self.trace], passed as last arg. *)
591591-let activate_tracing self origin = function
592592- | Tn tn ->
593593- let idx = get_idx self origin in (* index of [self] in [origin.trace_idx] *)
594594- let active = tn.active in
595595- (* [idx < active] means [self] is already traced by [origin].
596596- We only have to add [self] to the entries if [idx >= active]. *)
597597- if idx >= active then (
598598- tn.active <- active + 1;
599599- );
600600- if idx > active then (
601601- (* swap with last entry in [tn.entries] *)
602602- let old = tn.entries.(active) in
603603- tn.entries.(idx) <- old;
604604- tn.entries.(active) <- obj_t origin;
605605- mov_idx self active idx old;
606606- mov_idx self idx active origin
607607- )
608608- | _ -> ()
10141014+ aux origin self
10151015+ ) else (
6091016610610-let sub_is_damaged = function
611611- | Root _ -> assert false
612612- | Pure _ -> false
613613- | Operator {value; _} ->
614614- match value with
615615- | Eval_none | Eval_invalid_next -> true
616616- | Eval_some _ -> false
617617- | Eval_progress -> assert false
10171017+ (* [self] just became active, so it may invalidate [origin] in case its
10181018+ value changes because of [t.desc], like if it's a variable and gets
10191019+ mutated, or if it's a primitive that gets invalidated.
10201020+ We need to put [origin] into [self.trace] in case it isn't there yet. *)
10211021+ activate_tracing self origin t.trace;
10221022+ result)
10231023+ | Eval_progress | Eval_invalid_next ->
10241024+ Mutex.unlock t.mutex;
6181025619619-(* [sub_sample origin self] computes a value for [self].
10261026+ (* spin and retry *)
10271027+ let rec spin () =
10281028+ match t.value with
10291029+ | Eval_progress | Eval_invalid_next ->
10301030+ Domain.cpu_relax ();
10311031+ spin ()
10321032+ | Eval_none | Eval_some _ -> ()
10331033+ in
10341034+ spin ();
6201035621621- [sub_sample] raise if any user-provided computation raises.
622622- Graph will be left in a coherent state but exception will be propagated
623623- to the observer. *)
624624-let sub_sample queue =
625625- let rec aux : type a b . a t_ -> b t_ -> b = fun origin ->
626626- function
627627- | Root _ -> assert false
628628- | Pure x -> x
629629- | Operator t as self ->
630630- (* lock the mutex, examine cached value *)
631631- Mutex.lock t.mutex;
632632- match t.value with
633633- | Eval_some value ->
634634- Mutex.unlock t.mutex;
635635- activate_tracing self origin t.trace;
636636- value
637637- | Eval_none ->
638638- t.value <- Eval_progress;
639639- Mutex.unlock t.mutex;
10361036+ aux origin self
10371037+ in
10381038+ aux
10391039+10401040+ type 'a root = 'a t
10411041+10421042+ let observe ?(on_invalidate = ignore) child : _ root =
10431043+ let root =
10441044+ Root
10451045+ { child = prj child
10461046+ ; value = Eval_none
10471047+ ; on_invalidate
10481048+ ; trace_idx = I0
10491049+ ; acquired = false
10501050+ ; mutex= Mutex.create()
10511051+ }
10521052+ in
10531053+ inj root
10541054+10551055+ exception Release_failure of exn option * release_failure list
10561056+10571057+ let raw_flush_release_queue queue =
10581058+ let rec aux failures = function
10591059+ | Release_done -> failures
10601060+ | Release_more t ->
6401061641641- (* compute value without holding the lock *)
642642- let result : b =
643643- match t.desc with
644644- | Map (x, f) -> f (aux self x)
645645- | Map2 (x, y, f) -> f (aux self x) (aux self y)
646646- | Pair (x, y) -> (aux self x, aux self y)
647647- | App (f, x) -> (aux self f) (aux self x)
648648- | Fix { doc; wrt } ->
649649- let _ = aux self wrt in
650650- let result = aux self doc in
651651- if sub_is_damaged wrt then aux origin self
652652- else (
653653- if sub_is_damaged doc then do_invalidate Fragile self;
654654- result)
655655- | Join x ->
656656- let intermediate =
657657- (* We haven't touched any state yet,
658658- it is safe for [aux] to raise *)
659659- aux self x.child
660660- in
661661- begin
662662- match x.intermediate with
663663- | None ->
664664- x.intermediate <- Some intermediate;
665665- sub_acquire self intermediate
666666- | Some x' when x' != intermediate ->
667667- queue :=
668668- Release_more
669669- { origin = self; element = x'; next = !queue };
670670- x.intermediate <- Some intermediate;
671671- sub_acquire self intermediate
672672- | Some _ -> ()
673673- end;
674674- aux self intermediate
675675- | Var x -> x.binding
676676- | Prim t -> t.acquire (inj self)
677677- in
10621062+ let failures = sub_release failures t.origin t.element in
10631063+ let return = aux failures t.next in
6781064679679- (* lock again and finalize *)
680680- Mutex.lock t.mutex;
681681- begin
682682- match t.value with
683683- | Eval_progress -> t.value <- Eval_some result
684684- | Eval_invalid_next -> t.value <- Eval_none
685685- | Eval_none | Eval_some _ -> ()
686686- end;
687687- Mutex.unlock t.mutex;
10651065+ return
10661066+ in
10671067+ aux [] queue
10681068+10691069+ let flush_release_queue queue =
10701070+ let queue' = !queue in
10711071+ queue := Release_done;
10721072+ raw_flush_release_queue queue'
10731073+10741074+ let sample queue x = match prj x with
10751075+ | Pure _ | Operator _ -> assert false
10761076+ | Root t as self ->
6881077689689- (* Re-evaluate if the node was invalidated during computation *)
690690- if t.value == Eval_none then aux origin self
691691- else (
692692- (* [self] just became active, so it may invalidate [origin] in case its
693693- value changes because of [t.desc], like if it's a variable and gets
694694- mutated, or if it's a primitive that gets invalidated.
695695- We need to put [origin] into [self.trace] in case it isn't there yet. *)
696696- activate_tracing self origin t.trace;
697697- result)
698698- | Eval_progress | Eval_invalid_next ->
699699- Mutex.unlock t.mutex;
700700- (* spin and retry *)
701701- let rec spin () =
702702- match t.value with
703703- | Eval_progress | Eval_invalid_next ->
704704- Domain.cpu_relax ();
705705- spin ()
706706- | Eval_none | Eval_some _ -> ()
707707- in
708708- spin ();
709709- aux origin self
710710- in
711711- aux
10781078+ (* debug log the whole tree *)
7121079713713-type 'a root = 'a t
10801080+ let mermaid=to_mermaid_trace_idx ~max_nodes:200 ( t.child) in
10811081+ (* m "sample: graph: \n %s" mermaid); *)
10821082+7141083715715-let observe ?(on_invalidate = ignore) child : _ root =
716716- let root =
717717- Root
718718- { child = prj child
719719- ; value = Eval_none
720720- ; on_invalidate
721721- ; trace_idx = I0
722722- ; acquired = false
723723- ; mutex= Mutex.create()
724724- }
725725- in
726726- inj root
7271084728728-exception Release_failure of exn option * release_failure list
10851085+ (*lock the root mutex while sampling*)
7291086730730-let raw_flush_release_queue queue =
731731- let rec aux failures = function
732732- | Release_done -> failures
733733- | Release_more t ->
734734- let failures = sub_release failures t.origin t.element in
735735- aux failures t.next
736736- in
737737- aux [] queue
10871087+ let a=Mutex.protect t.mutex @@ fun _->
10881088+ match t.value with
10891089+ | Eval_some value -> value
10901090+ | _ ->
10911091+ (
10921092+ (* no cached value, compute it now *)
10931093+ if not t.acquired then (
7381094739739-let flush_release_queue queue =
740740- let queue' = !queue in
741741- queue := Release_done;
742742- raw_flush_release_queue queue'
10951095+ t.acquired <- true;
10961096+ let res = sub_acquire self t.child in
7431097744744-let sample queue x = match prj x with
745745- | Pure _ | Operator _ -> assert false
746746- | Root t as self ->
747747- (*lock the root mutex while sampling*)
748748- Mutex.protect t.mutex @@ fun _->
749749- match t.value with
750750- | Eval_some value -> value
751751- | _ ->
752752- (
753753- (* no cached value, compute it now *)
754754- if not t.acquired then (
755755- t.acquired <- true;
756756- sub_acquire self t.child;
757757- );
758758- t.value <- Eval_progress;
759759- let value = sub_sample queue self t.child in
760760- begin match t.value with
761761- | Eval_progress -> t.value <- Eval_some value; (* cache value *)
762762- | Eval_none | Eval_some _ | Eval_invalid_next -> ()
763763- end;
764764- value
765765- )
10981098+ res
10991099+ );
11001100+11011101+ t.value <- Eval_progress;
11021102+ let value = sub_sample queue self t.child in
11031103+ begin match t.value with
11041104+ | Eval_progress ->
7661105767767-let is_damaged x =
768768- match prj x with
769769- | Pure _ | Operator _ -> assert false
770770- | Root {value;_}->
771771- (* NOTE: I don't think i need a mutex here*)
772772- (match value with
773773- | Eval_some _ -> false
774774- | Eval_none | Eval_progress | Eval_invalid_next -> true
11061106+ t.value <- Eval_some value; (* cache value *)
11071107+ | Eval_none | Eval_some _ | Eval_invalid_next -> ()
11081108+ end;
11091109+ value
7751110 )
11111111+ in
7761112777777-let release queue x = match prj x with
778778- | Pure _ | Operator _ -> assert false
779779- | Root t as self ->
780780- Mutex.protect t.mutex @@ fun _->
781781- if t.acquired then (
782782- (* release subtree, remove cached value *)
783783- t.value <- Eval_none;
784784- t.acquired <- false;
785785- queue := Release_more { origin = self; element = t.child; next = !queue }
786786- )
787787-788788-let set_on_invalidate x f =
789789- match prj x with
790790- | Pure _ | Operator _ -> assert false
791791- | Root t ->
792792- t.on_invalidate <- f
793793-794794-let flush_or_fail main_exn queue =
795795- match flush_release_queue queue with
796796- | [] -> ()
797797- | failures -> raise (Release_failure (main_exn, failures))
798798-799799-let quick_sample root =
800800- let queue = ref Release_done in
801801- match sample queue root with
802802- | result -> flush_or_fail None queue; result
803803- | exception exn -> flush_or_fail (Some exn) queue; raise exn
804804-805805-let quick_release root =
806806- let queue = ref Release_done in
807807- release queue root;
808808- flush_or_fail None queue
809809-810810-module Infix = struct
811811- let (>>=) x f = bind x ~f
812812- let (>|=) x f = map x ~f
813813- let (<*>) = app
814814-end
815815-816816-(*$R
817817- let x = var 0 in
818818- let y = map ~f:succ (get x) in
819819- let o_y = Lwd.observe y in
820820- assert_equal 1 (quick_sample o_y);
821821- set x 10;
822822- assert_equal 11 (quick_sample o_y);
823823- *)
824824-end11131113+ a
11141114+11151115+ let is_damaged x =
11161116+ match prj x with
11171117+ | Pure _ | Operator _ -> assert false
11181118+ | Root {value;_}->
11191119+ (* NOTE: I don't think i need a mutex here*)
11201120+ (match value with
11211121+ | Eval_some _ -> false
11221122+ | Eval_none | Eval_progress | Eval_invalid_next -> true
11231123+ )
11241124+11251125+ let release queue x = match prj x with
11261126+ | Pure _ | Operator _ -> assert false
11271127+ | Root t as self ->
11281128+ Mutex.protect t.mutex @@ fun _->
11291129+ if t.acquired then (
11301130+ (* release subtree, remove cached value *)
11311131+ t.value <- Eval_none;
11321132+ t.acquired <- false;
11331133+ queue := Release_more { origin = self; element = t.child; next = !queue }
11341134+ )
11351135+11361136+ let set_on_invalidate x f =
11371137+ match prj x with
11381138+ | Pure _ | Operator _ -> assert false
11391139+ | Root t ->
11401140+ t.on_invalidate <- f
11411141+11421142+ let flush_or_fail main_exn queue =
11431143+ match flush_release_queue queue with
11441144+ | [] -> ()
11451145+ | failures -> raise (Release_failure (main_exn, failures))
11461146+11471147+ let quick_sample root =
11481148+ let queue = ref Release_done in
11491149+ match sample queue root with
11501150+ | result -> flush_or_fail None queue; result
11511151+ | exception exn -> flush_or_fail (Some exn) queue; raise exn
11521152+11531153+ let quick_release root =
11541154+ let queue = ref Release_done in
11551155+ release queue root;
11561156+ flush_or_fail None queue
11571157+11581158+ module Infix = struct
11591159+ let (>>=) x f = bind x ~f
11601160+ let (>|=) x f = map x ~f
11611161+ let (<*>) = app
11621162+ end
11631163+11641164+ (*$R
11651165+ let x = var 0 in
11661166+ let y = map ~f:succ (get x) in
11671167+ let o_y = Lwd.observe y in
11681168+ assert_equal 1 (quick_sample o_y);
11691169+ set x 10;
11701170+ assert_equal 11 (quick_sample o_y);
11711171+ *)
11721172+ end
+6-1
forks/lwd/lib/lwd/mutex_backend.ml
···11(** Backend selection for mutex implementations. *)
2233module type MUTEX = sig
44- include module type of Mutex
44+ type t
55+ val create : unit -> t
66+ val lock : t -> unit
77+ val unlock : t -> unit
88+ val try_lock : t -> bool
99+ val protect : t -> (unit -> 'a) -> 'a
510 val lock_all : t list -> bool
611end
712
···11-include Lwd_impl.Make(Mutex_picos.Default)11+include Lwd_impl.Make(Mutex_picos)
22+33+(* This should prevent the set from being cancelled and leaving hanging locks*)
44+let set vx x =
55+ Picos_std_structured.Control.protect (fun () ->
66+ set vx x;
77+ )
···88 Since Picos doesn't have a direct mutex equivalent, we'll implement
99 one using Picos's basic synchronization primitives. *)
10101111-type t = {
1212- mutable locked : bool;
1313- mutable owner : Picos.Fiber.t option;
1414- waiters : Picos.Trigger.t list ref;
1515-}
1111+include Picos_std_sync.Mutex
16121717-let create () = {
1818- locked = false;
1919- owner = None;
2020- waiters = ref [];
2121-}
1313+let create () = create ()
22142323-let lock mutex =
2424- let rec try_acquire () =
2525- if not mutex.locked then begin
2626- mutex.locked <- true;
2727- mutex.owner <- Some (Picos.Fiber.current ());
2828- end else begin
2929- (* Create a trigger to wait for the mutex to be released *)
3030- let trigger = Picos.Trigger.create () in
3131- mutex.waiters := trigger :: !(mutex.waiters);
3232- match Picos.await trigger with
3333- | None -> (* Cancelled *) raise (Sys_error "Mutex lock cancelled")
3434- | Some (exn, _) -> raise exn
3535- end
3636- in
3737- try_acquire ()
1515+let lock mut = lock mut
38163939-let try_lock mutex =
4040- if not mutex.locked then begin
4141- mutex.locked <- true;
4242- mutex.owner <- Some (Picos.Fiber.current ());
4343- true
4444- end else
4545- false
4646-4747-let unlock mutex =
4848- if not mutex.locked then
4949- raise (Sys_error "Mutex is not locked")
5050- else
5151- let current_fiber = Picos.Fiber.current () in
5252- match mutex.owner with
5353- | None -> raise (Sys_error "Mutex has no owner")
5454- | Some owner ->
5555- if not (Picos.Fiber.equal current_fiber owner) then
5656- raise (Sys_error "Mutex was locked by another fiber")
5757- else begin
5858- mutex.locked <- false;
5959- mutex.owner <- None;
6060- (* Wake up one waiter if any *)
6161- match !(mutex.waiters) with
6262- | [] -> ()
6363- | trigger :: rest ->
6464- mutex.waiters := rest;
6565- Picos.Trigger.signal trigger ()
6666- end
6767-6868-let protect mutex f =
6969- lock mutex;
7070- try
7171- let result = f () in
7272- unlock mutex;
7373- result
7474- with exn ->
7575- unlock mutex;
7676- raise exn
1717+let unlock mut = unlock mut
1818+let try_lock mut = try_lock mut
1919+let protect mut f = protect mut f
77207821let lock_all mutexes =
7922 let rec try_lock_all acc = function
···8932 false
9033 end
9134 in
9292- try_lock_all [] mutexes 3535+ try_lock_all [] mutexes