this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add create_node/start_node to allow for node restarts

garrison 2d1c8097 366f69dd

+101 -26
+49 -1
lib/trinity/scheduler.ex
··· 11 11 proc_links: :ets.table, 12 12 proc_aliases: :ets.table, 13 13 14 + nodes: :ets.table, 15 + node_procs: :ets.table, 14 16 proc_nodes: :ets.table, 15 - node_procs: :ets.table, 16 17 17 18 file_paths: :ets.table, 18 19 file_data: :ets.table, ··· 29 30 :proc_links, 30 31 :proc_aliases, 31 32 33 + :nodes, 32 34 :proc_nodes, 33 35 :node_procs, 34 36 ··· 65 67 def now do 66 68 %{now: now} = get_sim() 67 69 :atomics.get(now, 1) 70 + end 71 + 72 + @spec create_node(atom, {module, atom, list}) :: :ok 73 + def create_node(node, {_m, _f, _a} = mfa) do 74 + %{nodes: nodes} = get_sim() 75 + case :ets.lookup(nodes, node) do 76 + [] -> :ets.insert(nodes, {node, mfa}) 77 + [{_key, _existing}] -> raise ArgumentError, "Node #{inspect(node)} already exists" 78 + end 79 + :ok 80 + end 81 + 82 + @spec start_node(atom, non_neg_integer) :: :ok 83 + def start_node(node, delay) do 84 + %{queue: queue, nodes: nodes, now: now} = get_sim() 85 + case :ets.member(nodes, node) do 86 + true -> :noop 87 + false -> raise ArgumentError, "Node #{inspect(node)} cannot be started because it does not exist" 88 + end 89 + enqueue_start_node(queue, now, node, delay) 90 + :ok 68 91 end 69 92 70 93 @spec yield(non_neg_integer) :: :ok ··· 346 369 347 370 defp perform_next(queue, proc_queue_keys, now) do 348 371 case pop_next(queue) do 372 + {time, {:start_node, node}} -> 373 + :atomics.put(now, 1, time) 374 + do_start_node(node) 349 375 {time, {:timeout, pid, ref}} -> 350 376 :ets.delete(proc_queue_keys, pid) 351 377 :atomics.put(now, 1, time) ··· 369 395 _ -> 370 396 raise "Queue empty!" 371 397 end 398 + end 399 + 400 + defp do_start_node(node) do 401 + %{ 402 + nodes: nodes, 403 + node_procs: node_procs, 404 + proc_nodes: proc_nodes, 405 + supervisor_pid: supervisor_pid, 406 + } = get_sim() 407 + 408 + [{_key, mfa}] = :ets.lookup(nodes, node) 409 + {m, f, a} = mfa 410 + 411 + SimulationSupervisor.spawn_child(supervisor_pid, fn -> 412 + put_proc_node(proc_nodes, node_procs, node) 413 + apply(m, f, a) 414 + end) 415 + end 416 + 417 + defp enqueue_start_node(queue, now, node, delay) do 418 + enqueue_event(queue, now, delay, {:start_node, node}) 372 419 end 373 420 374 421 defp enqueue_timeout(_queue, proc_queue_keys, _now, :infinity) do ··· 481 528 queue: :ets.tab2list(sim.queue), 482 529 proc_queue_keys: :ets.tab2list(sim.proc_queue_keys), 483 530 proc_links: :ets.tab2list(sim.proc_links), 531 + proc_aliases: :ets.tab2list(sim.proc_aliases), 484 532 485 533 proc_nodes: :ets.tab2list(sim.proc_nodes), 486 534 node_procs: :ets.tab2list(sim.node_procs),
+2 -1
lib/trinity/scheduler/simulation_supervisor.ex
··· 53 53 proc_links: :ets.new(__MODULE__, [:set, :public]), 54 54 proc_aliases: :ets.new(__MODULE__, [:set, :public]), 55 55 56 + nodes: :ets.new(__MODULE__, [:set, :public]), 57 + node_procs: :ets.new(__MODULE__, [:set, :public]), 56 58 proc_nodes: :ets.new(__MODULE__, [:set, :public]), 57 - node_procs: :ets.new(__MODULE__, [:set, :public]), 58 59 59 60 file_paths: :ets.new(__MODULE__, [:ordered_set, :public]), 60 61 file_data: :ets.new(__MODULE__, [:ordered_set, :public]),
+10
lib/trinity/sim.ex
··· 21 21 |> Enum.sort() 22 22 end 23 23 24 + @spec create_node(atom, {module, atom, list}) :: :ok 25 + def create_node(node, {_m, _f, _a} = mfa) do 26 + Trinity.Scheduler.create_node(node, mfa) 27 + end 28 + 29 + @spec start_node(atom, non_neg_integer) :: :ok 30 + def start_node(node, delay \\ 0) do 31 + Trinity.Scheduler.start_node(node, delay) 32 + end 33 + 24 34 @spec kill_node(atom) :: :ok 25 35 def kill_node(node) do 26 36 %{supervisor_pid: supervisor_pid} = get_sim()
+40 -24
test/trinity_test.exs
··· 57 57 end 58 58 end 59 59 60 + defmodule CounterSupervisor do 61 + use GenServer 62 + alias Trinity.SimServer 63 + 64 + def start_link(children, opts), do: SimServer.start_link(__MODULE__, children, opts) 65 + def get_children(server), do: SimServer.call(server, :get_children) 66 + 67 + def init(children) do 68 + pids = Enum.map(children, fn {m, f, a} -> 69 + {:ok, pid} = apply(m, f, a) 70 + pid 71 + end) 72 + {:ok, pids} 73 + end 74 + 75 + def handle_call(:get_children, _from, pids) do 76 + {:reply, pids, pids} 77 + end 78 + end 79 + 60 80 test "scheduler" do 61 81 message = Sim.run_simulation(fn -> 62 82 nodes = [:n1, :n2, :n3] 63 - pids = 64 - Enum.map(nodes, fn node -> 65 - name = String.to_atom(Atom.to_string(node) <> "_proc") 66 - parent = self() 67 - ref = make_ref() 68 - 69 - SimProcess.spawn_node(node, fn -> 70 - SimProcess.register(self(), name) 71 83 72 - receive_yield do 73 - :begin -> :noop 74 - end 84 + names = Enum.map(nodes, fn node -> 85 + name = String.to_atom(Atom.to_string(node) <> "_proc") 75 86 76 - pids = Enum.map(1..10, fn i -> 77 - {:ok, pid} = Counter.start_link(i, i) 78 - {pid, i} 79 - end) 87 + children = Enum.map(1..10, fn i -> 88 + {TrinityTest.Counter, :start_link, [i, i]} 89 + end) 80 90 81 - SimProcess.send parent, {ref, pids} 82 - end) 91 + node_mfa = { 92 + TrinityTest.CounterSupervisor, 93 + :start_link, 94 + [children, [name: name]], 95 + } 96 + Sim.create_node(node, node_mfa) 97 + Sim.start_node(node) 98 + {name, node} 99 + end) 83 100 84 - SimProcess.send {name, node}, :begin 101 + # Wait for nodes to start/register 102 + SimProcess.sleep(100) 85 103 86 - receive_yield do 87 - {^ref, pids} -> pids 88 - end 89 - end) 104 + pids = 105 + names 106 + |> Enum.map(&CounterSupervisor.get_children/1) 90 107 |> Enum.concat() 91 108 92 - #dbg pids 93 - 109 + pids = [] 94 110 Enum.each(pids, fn {pid, id} -> 95 111 result = Counter.add(pid, 10) 96 112 assert result == (id + 10)