this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Link sim processes to scheduler and run root fun inside sim

garrison d22b6a5c daee0828

+119 -79
+6 -71
lib/trinity/scheduler.ex
··· 1 1 defmodule Trinity.Scheduler do 2 + alias Trinity.Scheduler.SimulationSupervisor 3 + 2 4 defmodule Simulation do 3 5 @enforce_keys [ 4 6 :queue, ··· 10 12 defstruct @enforce_keys 11 13 end 12 14 13 - defmodule SimulationSupervisor do 14 - use GenServer 15 - 16 - defmodule State do 17 - @enforce_keys [:sim, :monitors] 18 - defstruct @enforce_keys 19 - end 20 - 21 - def start_link(%Simulation{} = sim) do 22 - GenServer.start(__MODULE__, sim) 23 - end 24 - 25 - def register_self(server) do 26 - GenServer.cast(server, {:register, self()}) 27 - end 28 - 29 - def init(%Simulation{} = sim) do 30 - state = %State{ 31 - sim: sim, 32 - monitors: %{}, 33 - } 34 - {:ok, state} 35 - end 36 - 37 - def handle_cast({:register, pid}, %State{} = state) do 38 - mref = Process.monitor(pid) 39 - state = %{state | monitors: Map.put(state.monitors, pid, mref)} 40 - {:noreply, state} 41 - end 42 - 43 - def handle_info({:DOWN, _ref, :process, pid, reason}, %State{} = state) do 44 - killed = Trinity.Scheduler.handle_down(state.sim, pid, reason) 45 - killed = [pid | killed] 46 - 47 - monitors = 48 - Enum.reduce(killed, state.monitors, fn p, acc -> 49 - {mref, acc} = Map.pop!(acc, p) 50 - Process.demonitor(mref, [:flush]) 51 - acc 52 - end) 53 - 54 - state = %{state | monitors: monitors} 55 - {:noreply, state} 56 - end 57 - end 58 - 59 15 defmacro simulation_key, do: :_trinity_simulation 60 16 61 17 defp get_sim, do: Process.get(simulation_key()) 62 - defp put_sim(%Simulation{} = sim), do: Process.put(simulation_key(), sim) 63 18 64 - @spec start :: :ok 65 - def start do 66 - sim = %Simulation{ 67 - queue: :ets.new(__MODULE__, [:ordered_set, :public]), 68 - proc_queue_keys: :ets.new(__MODULE__, [:set, :public]), 69 - proc_links: :ets.new(__MODULE__, [:set, :public]), 70 - now: :atomics.new(1, signed: false), 71 - supervisor_pid: nil, 72 - } 73 - 74 - {:ok, supervisor_pid} = SimulationSupervisor.start_link(sim) 75 - SimulationSupervisor.register_self(supervisor_pid) 76 - 77 - sim = %{sim | supervisor_pid: supervisor_pid} 78 - put_sim(sim) 79 - 80 - :ok 19 + def run_simulation(fun) do 20 + SimulationSupervisor.run_simulation(fun) 81 21 end 82 22 83 23 @spec yield(non_neg_integer) :: :ok ··· 98 38 parent_pid = self() 99 39 spawn_ref = make_ref() 100 40 101 - spawned_pid = spawn(fn -> 102 - # In the new process, we first register with the supervisor 103 - # and inject the sim 104 - SimulationSupervisor.register_self(sim.supervisor_pid) 105 - put_sim(sim) 106 - 107 - # We then enqueue the new process and unsuspend the parent 41 + spawned_pid = SimulationSupervisor.spawn_child(sim.supervisor_pid, fn -> 42 + # Enqueue the new process and unsuspend the parent 108 43 # so that it can register any links before yielding back 109 44 # to us (eventually, if there are others in queue at `time=now`) 110 45 yield_ref = enqueue_self(sim.queue, sim.proc_queue_keys, sim.now, 0)
+109
lib/trinity/scheduler/simulation_supervisor.ex
··· 1 + defmodule Trinity.Scheduler.SimulationSupervisor do 2 + use GenServer 3 + 4 + alias Trinity.Scheduler.Simulation 5 + import Trinity.Scheduler, only: [simulation_key: 0] 6 + 7 + defmodule State do 8 + @enforce_keys [ 9 + :sim, 10 + :root_pid, 11 + 12 + :parent_pid, 13 + :parent_ref, 14 + ] 15 + defstruct @enforce_keys 16 + end 17 + 18 + def run_simulation(fun) do 19 + ref = make_ref() 20 + GenServer.start_link(__MODULE__, %{ 21 + fun: fun, 22 + parent_pid: self(), 23 + parent_ref: ref, 24 + }) 25 + 26 + receive do 27 + ^ref -> :ok 28 + end 29 + end 30 + 31 + @spec spawn_child(GenServer.server, function) :: pid 32 + def spawn_child(server, fun) do 33 + GenServer.call(server, {:spawn_child, fun}) 34 + end 35 + 36 + def init(%{fun: fun, parent_pid: parent_pid, parent_ref: parent_ref}) do 37 + Process.flag(:trap_exit, true) 38 + 39 + sim = %Simulation{ 40 + queue: :ets.new(__MODULE__, [:ordered_set, :public]), 41 + proc_queue_keys: :ets.new(__MODULE__, [:set, :public]), 42 + proc_links: :ets.new(__MODULE__, [:set, :public]), 43 + now: :atomics.new(1, signed: false), 44 + supervisor_pid: self(), 45 + } 46 + 47 + root_pid = spawn_sim_child(sim, fun) 48 + 49 + state = %State{ 50 + sim: sim, 51 + root_pid: root_pid, 52 + 53 + parent_pid: parent_pid, 54 + parent_ref: parent_ref, 55 + } 56 + 57 + {:ok, state} 58 + end 59 + 60 + def handle_call({:spawn_child, fun}, _from, %State{} = state) do 61 + pid = spawn_sim_child(state.sim, fun) 62 + {:reply, pid, state} 63 + end 64 + 65 + def handle_info({:EXIT, from, reason}, %State{} = state) do 66 + %State{ 67 + sim: sim, 68 + root_pid: root_pid, 69 + parent_pid: parent_pid, 70 + parent_ref: parent_ref, 71 + } = state 72 + 73 + case from do 74 + ^parent_pid -> 75 + # The parent process died, kill the simulation 76 + exit(reason) 77 + 78 + ^root_pid -> 79 + # The root has died, end the simulation 80 + send parent_pid, parent_ref 81 + {:noreply, state} 82 + 83 + _ -> 84 + killed = Trinity.Scheduler.handle_down(sim, from, reason) 85 + 86 + Enum.each(killed, fn pid -> 87 + # Flush exit messages for (simulated) linked processes 88 + receive do 89 + {:EXIT, ^pid, _reason} -> :noop 90 + end 91 + 92 + # If the root was killed, end the simulation 93 + case pid == root_pid do 94 + true -> send parent_pid, parent_ref 95 + _ -> :noop 96 + end 97 + end) 98 + 99 + {:noreply, state} 100 + end 101 + end 102 + 103 + defp spawn_sim_child(%Simulation{} = sim, fun) do 104 + spawn_link(fn -> 105 + Process.put(simulation_key(), sim) 106 + fun.() 107 + end) 108 + end 109 + end
+4 -8
test/trinity_test.exs
··· 4 4 alias Trinity.Scheduler 5 5 6 6 test "scheduler" do 7 - Scheduler.start() 8 - 9 - Scheduler.spawn_and_yield(fn -> 7 + Scheduler.run_simulation(fn -> 10 8 Enum.each(1..100, fn i -> 11 9 Scheduler.spawn_and_yield(fn -> 12 10 dbg {self(), "proc #{i}"} 13 11 Scheduler.yield(1_000_000) 14 12 end, true) 15 13 end) 16 - Scheduler.yield(1000) 17 - raise "foobar" 18 - end, false) 19 14 20 - Scheduler.yield(2000) 21 - dbg Scheduler.dump() 15 + Scheduler.yield(1000) 16 + dbg Scheduler.dump(), limit: :infinity 17 + end) 22 18 end 23 19 end