···11defmodule Trinity.Scheduler do
22 defmodule Simulation do
33- @enforce_keys [:queue, :now, :supervisor_pid]
33+ @enforce_keys [
44+ :queue,
55+ :proc_links,
66+ :now,
77+ :supervisor_pid,
88+ ]
49 defstruct @enforce_keys
510 end
611···3944 def start do
4045 sim = %Simulation{
4146 queue: :ets.new(__MODULE__, [:ordered_set, :public]),
4747+ proc_links: :ets.new(__MODULE__, [:set, :public]),
4248 now: :atomics.new(1, signed: false),
4349 supervisor_pid: nil,
4450 }
···6571 end
66726773 @spec spawn_and_yield(function) :: pid
6868- def spawn_and_yield(fun) do
6969- %Simulation{queue: queue, now: now} = sim = get_sim()
7070- yield_ref = enqueue_self(queue, now, 0)
7474+ def spawn_and_yield(fun, link? \\ false) do
7575+ sim = get_sim()
7676+ parent_pid = self()
7777+ spawn_ref = make_ref()
71787279 spawned_pid = spawn(fn ->
8080+ # In the new process, we first register with the supervisor
8181+ # and inject the sim
7382 SimulationSupervisor.register_self(sim.supervisor_pid)
7483 put_sim(sim)
75848585+ # We then enqueue the new process and unsuspend the parent
8686+ # so that it can register any links before yielding back
8787+ # to us (eventually, if there are others in queue at `time=now`)
8888+ yield_ref = enqueue_self(sim.queue, sim.now, 0)
8989+ send parent_pid, spawn_ref
9090+ receive do
9191+ ^yield_ref -> :noop
9292+ end
9393+9494+ # Once we are properly resumed we can execute `fun`
7695 fun.()
7796 end)
78979898+ # We suspend the parent process while waiting for the child
9999+ # to register itself
79100 receive do
8080- ^yield_ref -> spawned_pid
101101+ ^spawn_ref -> :noop
102102+ end
103103+ # Register the link, if requested
104104+ # Note that at this point the child process has registered itself
105105+ # but has not begun executing its `fun` (and so cannot have crashed)
106106+ if link?, do: add_link(spawned_pid)
107107+108108+ # Yield to the child (or anything else in queue before it)
109109+ yield(0)
110110+ spawned_pid
111111+ end
112112+113113+ @spec add_link(pid) :: :ok
114114+ def add_link(to_pid) do
115115+ # TODO: noproc if `to_pid` has exited
116116+ %Simulation{proc_links: proc_links} = get_sim()
117117+ from_pid = self()
118118+119119+ from_links =
120120+ case :ets.lookup(proc_links, from_pid) do
121121+ [{^from_pid, links}] -> links
122122+ _ -> []
123123+ end
124124+125125+ to_links =
126126+ case :ets.lookup(proc_links, to_pid) do
127127+ [{^to_pid, links}] -> links
128128+ _ -> []
129129+ end
130130+131131+ case to_pid in from_links do
132132+ false ->
133133+ :ets.insert(proc_links, {from_pid, [to_pid | from_links]})
134134+ :ets.insert(proc_links, {to_pid, [from_pid | to_links]})
135135+ _ -> :noop
81136 end
137137+138138+ :ok
82139 end
8314084141 defp perform_next(queue, now) do
···124181 end
125182126183 @doc false
127127- def dump(%Simulation{} = sim) do
184184+ def dump do
185185+ sim = get_sim()
128186 %{
129187 queue: :ets.tab2list(sim.queue),
188188+ proc_links: :ets.tab2list(sim.proc_links),
130189 now: read_now(sim.now),
131190 supervisor_pid: sim.supervisor_pid,
132191 }