this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add node restarts to Construct and RestartNodes workload

garrison fb6fcb93 5f51d701

+148 -35
+85 -24
lib/construct/scheduler.ex
··· 31 31 end 32 32 33 33 defmodule StartNode do 34 - @enforce_keys [:name, :app_module, :args] 34 + @enforce_keys [:name] 35 35 defstruct @enforce_keys 36 36 end 37 37 38 38 defmodule Node do 39 39 @type t :: %__MODULE__{ 40 40 pid: pid | nil, 41 + status: :running | :stopped, 41 42 name: atom, 42 43 app_module: module, 43 44 args: term, 44 45 } 45 - @enforce_keys [:pid, :name, :app_module, :args] 46 + @enforce_keys [:pid, :status, :name, :app_module, :args] 46 47 defstruct @enforce_keys 47 48 end 48 49 ··· 145 146 146 147 @spec start_node(pid, atom, module, term) :: :ok 147 148 def start_node(scheduler, name, app_module, args) when is_atom(name) and is_atom(app_module) do 148 - start_node = %StartNode{name: name, app_module: app_module, args: args} 149 - GenServer.cast(scheduler, {:queue_task, self(), 0, start_node}) 150 - # TODO: yield? 149 + GenServer.call(scheduler, {:create_node, name, app_module, args}) 151 150 :ok 151 + end 152 + 153 + def restart_node(scheduler, name, delay) when is_atom(name) do 154 + GenServer.call(scheduler, {:restart_node, name, delay}) 155 + end 156 + 157 + def list_nodes(scheduler) do 158 + GenServer.call(scheduler, :list_nodes) 152 159 end 153 160 154 161 @spec monitor(pid, pid) :: reference ··· 301 308 end 302 309 end 303 310 311 + def handle_call({:create_node, name, app_module, args}, _from, %State{} = state) when is_atom(name) and is_atom(app_module) do 312 + node = %Node{pid: nil, status: :stopped, name: name, app_module: app_module, args: args} 313 + state = %{state | nodes: [{node.name, node} | state.nodes]} 314 + 315 + queue_task(state, state.clock, %StartNode{name: node.name}) 316 + 317 + {:reply, :ok, state} 318 + end 319 + 320 + def handle_call({:restart_node, name, delay}, _from, %State{} = state) when is_atom(name) and is_integer(delay) do 321 + case List.keyfind(state.nodes, name, 0) do 322 + {^name, %Node{status: :running} = node} -> 323 + :ok = kill_node(state, node) 324 + 325 + node = %{node | pid: nil, status: :stopped} 326 + state = %{state | nodes: List.keyreplace(state.nodes, name, 0, {name, node})} 327 + 328 + start_node = %StartNode{name: node.name} 329 + state = queue_task(state, state.clock + delay, start_node) 330 + 331 + {:reply, :ok, state} 332 + 333 + {^name, %Node{status: :stopped}} -> {:reply, {:error, :node_stopped}, state} 334 + [] -> {:reply, {:error, :node_not_found}, state} 335 + end 336 + end 337 + 338 + def handle_call(:list_nodes, _from, %State{} = state) do 339 + nodes = 340 + state.nodes 341 + |> Enum.map(fn {_name, %Node{} = node} -> node.name end) 342 + |> Enum.reverse() 343 + {:reply, nodes, state} 344 + end 345 + 304 346 def handle_cast({:queue_task, pid, delay, %Send{} = send}, %State{} = state) when is_pid(pid) and is_integer(delay) do 305 347 delay = 306 348 case send.dest == pid do ··· 435 477 end 436 478 437 479 defp perform(%State{} = state, %StartNode{} = start_node) do 438 - pid = do_spawn_apply(start_node.app_module, :start, [:temporary, start_node.args]) 439 - node = %Node{pid: pid, name: start_node.name, app_module: start_node.app_module, args: start_node.args} 480 + {_name, %Node{} = node} = List.keyfind(state.nodes, start_node.name, 0) 481 + pid = do_spawn_apply(node.app_module, :start, [:temporary, node.args]) 440 482 441 483 ProcStore.add_process(state.proc_store, pid, node.name) 442 484 log(state, {:start_node, pid, {node.name, node.app_module, node.args}}) 443 485 444 - %{state | nodes: [node | state.nodes]} 486 + node = %{node | pid: pid, status: :running} 487 + state = %{state | nodes: List.keyreplace(state.nodes, node.name, 0, {node.name, node})} 488 + 489 + state 445 490 |> set_current_process(pid) 446 491 end 447 492 ··· 572 617 state 573 618 end 574 619 575 - defp remove_process(%State{} = state, pid) when is_pid(pid) do 576 - {:ok, %ProcState{queue_key: queue_key}} = ProcStore.fetch_state(state.proc_store, pid) 577 - if queue_key, do: ProcQueue.remove(state.proc_queue, queue_key) 578 - 579 - ProcStore.remove_process(state.proc_store, pid) 580 - ProcRegistry.remove_process(state.proc_registry, pid) 581 - :ok 582 - end 583 - 584 620 # Don't send exits for reason :normal 585 621 defp dispatch_links(%State{} = state, target_pid, :normal) do 586 622 {:ok, %ProcState{} = target_proc} = ProcStore.fetch_state(state.proc_store, target_pid) ··· 611 647 612 648 # Kill processes 613 649 Enum.each(pids_to_kill, fn pid -> 614 - Process.exit(pid, reason) 615 - receive do 616 - {:EXIT, ^pid, ^reason} -> :noop 617 - after 618 - # Sanity to stop the scheduler hanging, this should never happen 619 - 1000 -> raise "Process #{inspect(pid)} failed to exit" 620 - end 650 + kill_process(pid, reason) 621 651 end) 622 652 623 653 # Dispatch monitors for all killed processes ··· 688 718 :rand.seed(:exsss, seed) 689 719 apply(m, f, a) 690 720 end) 721 + end 722 + 723 + defp kill_node(%State{} = state, %Node{} = node) do 724 + ProcStore.list_processes(state.proc_store) 725 + |> Enum.filter(&(&1.node == node.name)) 726 + |> Enum.each(fn %ProcState{pid: pid} -> 727 + :ok = kill_process(pid, :shutdown) 728 + :ok = remove_process(state, pid) 729 + end) 730 + :ok 731 + end 732 + 733 + # Note: remember to call remove_process() afterwards to clear the process from the ProcStore 734 + defp kill_process(pid, reason) do 735 + Process.exit(pid, reason) 736 + receive do 737 + {:EXIT, ^pid, ^reason} -> :noop 738 + after 739 + # Sanity to stop the scheduler hanging, this should never happen 740 + 1000 -> raise "Process #{inspect(pid)} failed to exit" 741 + end 742 + :ok 743 + end 744 + 745 + defp remove_process(%State{} = state, pid) when is_pid(pid) do 746 + {:ok, %ProcState{queue_key: queue_key}} = ProcStore.fetch_state(state.proc_store, pid) 747 + if queue_key, do: ProcQueue.remove(state.proc_queue, queue_key) 748 + 749 + ProcStore.remove_process(state.proc_store, pid) 750 + ProcRegistry.remove_process(state.proc_registry, pid) 751 + :ok 691 752 end 692 753 693 754 defp random_seed, do: Enum.random(1..1_000_000)
+7
lib/construct/scheduler/proc_store.ex
··· 27 27 28 28 @type t :: %__MODULE__{ 29 29 proc_table: :ets.table, 30 + alias_table: :ets.table, 30 31 } 31 32 32 33 @enforce_keys [:proc_table, :alias_table] ··· 209 210 210 211 :ets.insert(ps.proc_table, {pid, %{state | trap_exit: value}}) 211 212 {:ok, state.trap_exit} 213 + end 214 + 215 + @spec list_processes(t) :: [ProcState.t] 216 + def list_processes(%ProcStore{} = ps) do 217 + :ets.tab2list(ps.proc_table) 218 + |> Enum.map(fn {_pid, %ProcState{} = proc} -> proc end) 212 219 end 213 220 end
+14
lib/construct/sim_server.ex
··· 337 337 end 338 338 end 339 339 340 + def list_nodes do 341 + if not simulated?(), do: raise "list_nodes/0 can only be called in simulation" 342 + scheduler_pid = fetch_scheduler_pid!() 343 + 344 + Scheduler.list_nodes(scheduler_pid) 345 + end 346 + 340 347 @spec start_node(atom, module, term) :: :ok 341 348 def start_node(name, app_module, args) when is_atom(name) and is_atom(app_module) do 342 349 if not simulated?(), do: raise "start_node/3 can only be called in simulation" 343 350 scheduler_pid = fetch_scheduler_pid!() 344 351 345 352 Scheduler.start_node(scheduler_pid, name, app_module, args) 353 + end 354 + 355 + def restart_node(name, delay_ms \\ 0) when is_atom(name) and is_integer(delay_ms) do 356 + if not simulated?(), do: raise "restart_node/1 can only be called in simulation" 357 + scheduler_pid = fetch_scheduler_pid!() 358 + 359 + Scheduler.restart_node(scheduler_pid, name, delay_ms) 346 360 end 347 361 348 362 @spec monitor(pid) :: reference
+6 -8
lib/servers/coordinator.ex
··· 347 347 state = update_view(state, state.view_number) 348 348 state = save_state_and_commit(state) 349 349 350 - state = 351 - case state.role do 352 - :primary -> start_new_manager(state) 353 - _ -> state 354 - end 350 + state = start_manager_if_primary(state) 355 351 356 352 SimServer.send_after(self(), :tick, @tick_interval_ms) 357 353 ··· 443 439 true -> 444 440 state = 445 441 %{state | manager_pid: nil, manager_generation: nil} 446 - |> start_new_manager() 442 + |> start_manager_if_primary() 447 443 448 444 {:noreply, state} 449 445 ··· 856 852 857 853 state 858 854 |> send_start_view(other_replica_pids(state)) 859 - |> start_new_manager() 855 + |> start_manager_if_primary() 860 856 end 861 857 862 858 defp on_request_start_view(%State{} = state, %RequestStartView{} = rsv) when rsv.view_number != state.view_number do ··· 1117 1113 1118 1114 # Hobbes/Cluster 1119 1115 1120 - defp start_new_manager(%State{} = state) do 1116 + defp start_manager_if_primary(%State{role: :backup} = state), do: state 1117 + 1118 + defp start_manager_if_primary(%State{} = state) do 1121 1119 assert state.role == :primary 1122 1120 assert state.manager_pid == nil 1123 1121 assert state.manager_generation == nil
+6 -1
lib/servers/distributor.ex
··· 157 157 |> Enum.chunk_every(2, 1, :discard) 158 158 |> Enum.map(fn [{start_key, from, _to}, {end_key, _, _}] -> 159 159 from 160 - |> Enum.map(fn id -> Map.get(state.storage_servers, id).pid end) 160 + |> Enum.map(fn id -> 161 + case Map.fetch(state.storage_servers, id) do 162 + {:ok, %StorageInfo{pid: pid}} -> pid 163 + :error -> nil 164 + end 165 + end) 161 166 |> Enum.reject(&is_nil/1) 162 167 |> case do 163 168 [_ | _] = from_pids ->
+28
lib/workloads/restart_nodes.ex
··· 1 + defmodule Hobbes.Workloads.RestartNodes do 2 + alias Hobbes.Construct.SimServer 3 + 4 + @behaviour Hobbes.Workloads.Workload 5 + 6 + defp restart_random_node do 7 + nodes = SimServer.list_nodes() 8 + #node = Enum.random(nodes) 9 + node = hd(nodes) 10 + 11 + result = SimServer.restart_node(node, 4000) 12 + {node, result} 13 + end 14 + 15 + def run(%{}, opts) do 16 + delay = Keyword.get(opts, :delay, 3000) 17 + 18 + SimServer.sleep(delay) 19 + result = restart_random_node() 20 + 21 + { 22 + :ok, 23 + """ 24 + Result: #{inspect(result)} 25 + """, 26 + } 27 + end 28 + end
+2 -2
test/hobbes_test.exs
··· 166 166 {Workloads.CoordinatorReadWrite, [ 167 167 clients: 10, 168 168 client_tick_ms: 300, 169 - duration_ms: 10_000, 169 + duration_ms: 20_000, 170 170 ]}, 171 - {Workloads.KillServers, [delay_ms: 3000, server_type: Hobbes.Servers.Coordinator]}, 171 + {Workloads.RestartNodes, [delay_ms: 3000, server_type: Hobbes.Servers.Coordinator]}, 172 172 ], HobbesTest.SimOpts.sim_opts(name: test, preset: :read_write)) 173 173 end 174 174 end