this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add nodes

garrison 76275f8c 57f04b27

+93 -7
+63 -2
lib/trinity/scheduler.ex
··· 8 8 proc_links: :ets.table, 9 9 proc_aliases: :ets.table, 10 10 11 + proc_nodes: :ets.table, 12 + node_procs: :ets.table, 13 + 11 14 now: :atomics.atomics_ref, 12 15 supervisor_pid: pid, 13 16 } ··· 16 19 :proc_queue_keys, 17 20 :proc_links, 18 21 :proc_aliases, 22 + 23 + :proc_nodes, 24 + :node_procs, 19 25 20 26 :now, 21 27 :supervisor_pid, ··· 64 70 65 71 @spec spawn_and_yield(function) :: pid 66 72 def spawn_and_yield(fun, link? \\ false) do 67 - sim = get_sim() 73 + spawn_node_and_yield(fun, nil, link?) 74 + end 75 + 76 + @spec spawn_node_and_yield(function, atom, boolean) :: pid 77 + def spawn_node_and_yield(fun, node, link?) do 78 + %{proc_nodes: proc_nodes} = sim = get_sim() 79 + node = node || get_proc_node(proc_nodes) 80 + 68 81 parent_pid = self() 69 82 spawn_ref = make_ref() 70 83 71 84 spawned_pid = SimulationSupervisor.spawn_child(sim.supervisor_pid, fn -> 85 + %{ 86 + queue: queue, 87 + proc_queue_keys: proc_queue_keys, 88 + proc_nodes: proc_nodes, 89 + node_procs: node_procs, 90 + now: now, 91 + } = sim 92 + 93 + # Update the process/node mapping in both directions 94 + put_proc_node(proc_nodes, node_procs, node) 72 95 # Enqueue the new process and unsuspend the parent 73 96 # so that it can register any links before yielding back 74 97 # to us (eventually, if there are others in queue at `time=now`) 75 - yield_ref = enqueue_resume(sim.queue, sim.proc_queue_keys, sim.now, 0) 98 + yield_ref = enqueue_resume(queue, proc_queue_keys, now, 0) 76 99 send parent_pid, spawn_ref 77 100 receive do 78 101 ^yield_ref -> :noop ··· 181 204 :ok 182 205 end 183 206 207 + @spec set_up_root(atom) :: :ok 208 + def set_up_root(root_node) do 209 + %{proc_nodes: proc_nodes, node_procs: node_procs} = get_sim() 210 + put_proc_node(proc_nodes, node_procs, root_node) 211 + :ok 212 + end 213 + 184 214 def handle_down(%Simulation{} = sim, _pid, :normal) do 185 215 %{queue: queue, proc_queue_keys: proc_queue_keys} = sim 186 216 perform_next(queue, proc_queue_keys, sim.now) ··· 297 327 queue: queue, 298 328 proc_links: proc_links, 299 329 proc_queue_keys: proc_queue_keys, 330 + proc_nodes: proc_nodes, 331 + node_procs: node_procs, 300 332 } = sim 301 333 302 334 destroy_links(proc_links, pid) 335 + destroy_proc_node(proc_nodes, node_procs, pid) 336 + # Note that we do not currently bother to destroy aliases here 303 337 304 338 case :ets.lookup(proc_queue_keys, pid) do 305 339 [{_event, ^pid, qk}] -> ··· 352 386 end 353 387 end 354 388 389 + defp get_proc_node(proc_nodes) do 390 + [{_pid, node}] = :ets.lookup(proc_nodes, self()) 391 + node 392 + end 393 + 394 + defp put_proc_node(proc_nodes, node_procs, node) do 395 + pid = self() 396 + :ets.insert(proc_nodes, {pid, node}) 397 + existing_procs = case :ets.lookup(node_procs, node) do 398 + [{_node, procs}] -> procs 399 + [] -> [] 400 + end 401 + :ets.insert(node_procs, {node, [pid | existing_procs]}) 402 + end 403 + 404 + defp destroy_proc_node(proc_nodes, node_procs, pid) do 405 + [{_pid, node}] = :ets.lookup(proc_nodes, pid) 406 + :ets.delete(proc_nodes, pid) 407 + 408 + [{_node, existing_procs}] = :ets.lookup(node_procs, node) 409 + :ets.insert(node_procs, {node, List.delete(existing_procs, pid)}) 410 + end 411 + 355 412 defp read_now(now), do: :atomics.get(now, 1) 356 413 defp set_now(now, time), do: :atomics.put(now, 1, time) 357 414 ··· 361 418 %{ 362 419 queue: :ets.tab2list(sim.queue), 363 420 proc_links: :ets.tab2list(sim.proc_links), 421 + 422 + proc_nodes: :ets.tab2list(sim.proc_nodes), 423 + node_procs: :ets.tab2list(sim.node_procs), 424 + 364 425 now: read_now(sim.now), 365 426 supervisor_pid: sim.supervisor_pid, 366 427 }
+9 -1
lib/trinity/scheduler/simulation_supervisor.ex
··· 46 46 proc_links: :ets.new(__MODULE__, [:set, :public]), 47 47 proc_aliases: :ets.new(__MODULE__, [:set, :public]), 48 48 49 + proc_nodes: :ets.new(__MODULE__, [:set, :public]), 50 + node_procs: :ets.new(__MODULE__, [:set, :public]), 51 + 49 52 now: :atomics.new(1, signed: false), 50 53 supervisor_pid: self(), 51 54 } 52 55 53 - root_pid = spawn_sim_child(sim, fun) 56 + # TODO: configurable? 57 + root_node = :nonode 58 + root_pid = spawn_sim_child(sim, fn -> 59 + Trinity.Scheduler.set_up_root(root_node) 60 + fun.() 61 + end) 54 62 55 63 state = %State{ 56 64 sim: sim,
+8
lib/trinity/sim_process.ex
··· 12 12 Kernel.send(dest, message) 13 13 end 14 14 15 + @spec spawn_node(atom, function) :: pid 16 + def spawn_node(node, fun) do 17 + case get_sim() do 18 + nil -> raise "spawn_node() only works in simulation" 19 + _sim -> Scheduler.spawn_node_and_yield(fun, node, false) 20 + end 21 + end 22 + 15 23 @spec alias :: reference 16 24 def alias do 17 25 case get_sim() do
+4
lib/trinity/sim_server.ex
··· 11 11 SimGen.start_link(module, arg, options) 12 12 end 13 13 14 + def start(module, arg, options \\ []) do 15 + SimGen.start(module, arg, options) 16 + end 17 + 14 18 @spec call(GenServer.server, term, timeout) :: term 15 19 def call(server, request, timeout \\ 5000) do 16 20 case get_sim() do
+9 -4
test/trinity_test.exs
··· 1 1 defmodule TrinityTest do 2 2 use ExUnit.Case 3 3 4 - alias Trinity.Scheduler 4 + alias Trinity.{SimProcess, Scheduler} 5 5 6 6 defmodule Counter do 7 7 use GenServer ··· 22 22 23 23 test "scheduler" do 24 24 Scheduler.run_simulation(fn -> 25 + nodes = [:n1, :n2, :n3] 25 26 pids = 26 - Enum.map(1..10, fn i -> 27 - {:ok, pid} = Counter.start_link(i) 28 - pid 27 + Enum.map(nodes, fn i -> 28 + Enum.map(1..10, fn i -> 29 + {:ok, pid} = Counter.start_link(i) 30 + pid 31 + end) 29 32 end) 33 + |> Enum.concat() 34 + 30 35 dbg pids 31 36 32 37 Enum.each(pids, fn pid ->