this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add nodes to Construct and bootstrap multi-node cluster in simulation

garrison ef7e69de c55fc2da

+266 -59
+63
lib/cluster_node.ex
··· 1 + defmodule Hobbes.ClusterNode do 2 + use Hobbes.Construct.SimServer 3 + 4 + alias Hobbes.Servers.{Coordinator, ServerSupervisor} 5 + 6 + import ExUnit.Assertions, only: [assert: 1] 7 + 8 + @spec start_link(keyword) :: term 9 + def start_link(config) when is_list(config), do: SimServer.start_link(__MODULE__, config) 10 + 11 + defmodule State do 12 + @enforce_keys [ 13 + :config, 14 + :coordinator_pid, 15 + :server_supervisor_pid, 16 + ] 17 + defstruct @enforce_keys 18 + end 19 + 20 + def init(config_opts) do 21 + #SimServer.flag(:trap_exit, true) 22 + 23 + # TODO: validate config with helpful error messages 24 + config = parse_config(config_opts) 25 + 26 + coordinator_pid = start_coordinator(config) 27 + server_supervisor_pid = start_server_supervisor(config) 28 + 29 + state = %State{ 30 + config: config, 31 + coordinator_pid: coordinator_pid, 32 + server_supervisor_pid: server_supervisor_pid, 33 + } 34 + 35 + {:ok, state} 36 + end 37 + 38 + defp start_coordinator(%{coordinator_id: nil}), do: nil 39 + 40 + defp start_coordinator(%{coordinator_id: id, coordinator_names: coordinator_names}) 41 + when is_integer(id) and is_list(coordinator_names) do 42 + assert id < length(coordinator_names) 43 + 44 + # TODO: use cluster name 45 + name = String.to_atom("coordinator-#{id}") 46 + {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name) 47 + pid 48 + end 49 + 50 + defp start_server_supervisor(%{coordinator_names: coordinators}) do 51 + {:ok, pid} = ServerSupervisor.start_link(coordinators: coordinators) 52 + pid 53 + end 54 + 55 + defp parse_config(config) do 56 + %{ 57 + cluster_name: Keyword.fetch!(config, :cluster), 58 + coordinator_names: Keyword.fetch!(config, :coordinators), 59 + coordinator_id: Keyword.get(config, :coordinator_id), 60 + slots: Keyword.fetch!(config, :slots), 61 + } 62 + end 63 + end
+91 -30
lib/construct/scheduler.ex
··· 30 30 defstruct @enforce_keys 31 31 end 32 32 33 + defmodule StartNode do 34 + @enforce_keys [:name, :app_module, :args] 35 + defstruct @enforce_keys 36 + end 37 + 38 + defmodule Node do 39 + @type t :: %__MODULE__{ 40 + pid: pid | nil, 41 + name: atom, 42 + app_module: module, 43 + args: term, 44 + } 45 + @enforce_keys [:pid, :name, :app_module, :args] 46 + defstruct @enforce_keys 47 + end 48 + 33 49 defmodule State do 34 50 @type t :: %__MODULE__{ 35 51 clock: non_neg_integer, 36 - current: {pid, reference} | nil, 52 + current: pid | nil, 53 + 54 + nodes: [{atom, Node.t}], 37 55 proc_store: ProcStore.t, 38 56 proc_queue: ProcQueue.t, 39 57 proc_registry: ProcRegistry.t, 40 58 file_stores: %{atom => FileStore.t}, 59 + 41 60 log_server_pid: pid, 61 + 42 62 resumes_without_send: non_neg_integer, 43 63 } 44 64 45 65 defstruct [ 46 66 clock: 0, 67 + 68 + nodes: [], 47 69 current: nil, 48 70 proc_store: nil, 49 71 proc_queue: nil, ··· 121 143 :ok 122 144 end 123 145 146 + @spec start_node(pid, atom, module, term) :: :ok 147 + def start_node(scheduler, name, app_module, args) when is_atom(name) and is_atom(app_module) do 148 + start_node = %StartNode{name: name, app_module: app_module, args: args} 149 + GenServer.cast(scheduler, {:queue_task, self(), 0, start_node}) 150 + # TODO: yield? 151 + :ok 152 + end 153 + 124 154 @spec monitor(pid, pid) :: reference 125 155 def monitor(scheduler, target_pid) do 126 156 ref = make_ref() ··· 177 207 end 178 208 179 209 def init(%{initial_pid: initial_pid, seed: seed}) do 210 + Process.flag(:trap_exit, true) 180 211 :rand.seed(:exsss, seed) 181 212 182 213 state = %State{ ··· 186 217 # TODO: spawn per node instead 187 218 file_stores: %{nonode: FileStore.new()}, 188 219 } 189 - ProcStore.add_process(state.proc_store, initial_pid) 220 + ProcStore.add_process(state.proc_store, initial_pid, :nonode) 190 221 191 222 state = set_current_process(state, initial_pid) 192 223 {:ok, state} ··· 210 241 {:reply, {:ok, old_value}, state} 211 242 end 212 243 213 - def handle_call({:register_process, _from_pid, pid, name}, _from, %State{} = state) do 214 - # TODO: ensure from_pid and pid are on the same node and use that node 244 + def handle_call({:register_process, from_pid, pid, name}, _from, %State{} = state) do 215 245 # TODO: ensure name is valid? (not nil/true/false/:undefined) 216 - node = :nonode 246 + {:ok, %ProcState{node: from_node}} = ProcStore.fetch_state(state.proc_store, from_pid) 247 + {:ok, %ProcState{node: node}} = ProcStore.fetch_state(state.proc_store, pid) 217 248 218 - case ProcRegistry.register(state.proc_registry, pid, node, name) do 219 - :ok -> 220 - {:reply, :ok, state} 221 - {:error, _error} -> 222 - {:reply, :error, state} 249 + case node == from_node do 250 + true -> 251 + case ProcRegistry.register(state.proc_registry, pid, node, name) do 252 + :ok -> {:reply, :ok, state} 253 + {:error, _error} -> {:reply, :error, state} 254 + end 255 + 256 + false -> {:reply, :error, state} 223 257 end 224 258 end 225 259 226 - def handle_call({:whereis, _from_pid, name}, _from, %State{} = state) do 227 - # TODO: use node of from_pid 228 - node = :nonode 260 + def handle_call({:whereis, from_pid, name}, _from, %State{} = state) do 261 + {:ok, %ProcState{node: node}} = ProcStore.fetch_state(state.proc_store, from_pid) 229 262 230 263 case ProcRegistry.whereis(state.proc_registry, node, name) do 231 264 {:ok, pid} -> {:reply, pid, state} ··· 308 341 309 342 def handle_cast({:yield, pid}, %State{} = state) when is_pid(pid) do 310 343 case state.current do 311 - {^pid, mref} -> Process.demonitor(mref, [:flush]) 344 + ^pid -> :noop 312 345 end 313 346 {:noreply, %{state | current: nil} |> perform_next_task()} 314 347 end 315 348 316 - def handle_info({:DOWN, mref, :process, pid, reason} = message, %State{} = state) do 349 + def handle_info({:EXIT, pid, reason} = message, %State{} = state) do 317 350 case state.current do 318 - {^pid, ^mref} -> 351 + ^pid -> 319 352 state = clean_up_dead_process(state, pid, reason) 320 353 {:noreply, %{state | current: nil} |> perform_next_task()} 321 354 322 355 _ -> 323 356 raise """ 324 - Received invalid DOWN message: #{inspect(message)} 357 + Received invalid EXIT message: #{inspect(message)} 325 358 326 359 Scheduler state: 327 360 ··· 385 418 386 419 defp perform(%State{} = state, %Spawn{} = spawn) do 387 420 parent_pid = spawn.parent_resume.pid 421 + {:ok, %ProcState{node: node}} = ProcStore.fetch_state(state.proc_store, parent_pid) 388 422 389 - scheduler_pid = self() 390 - seed = random_seed() 391 - %Spawn{module: m, function: f, args: a} = spawn 392 - child_pid = spawn(fn -> 393 - Hobbes.Construct.SimServer.set_scheduler_pid(scheduler_pid) 394 - :rand.seed(:exsss, seed) 395 - apply(m, f, a) 396 - end) 423 + child_pid = do_spawn_apply(spawn.module, spawn.function, spawn.args) 397 424 398 - ProcStore.add_process(state.proc_store, child_pid) 425 + ProcStore.add_process(state.proc_store, child_pid, node) 399 426 if spawn.link, do: ProcStore.add_link(state.proc_store, parent_pid, child_pid) 400 427 401 428 log(state, {:spawn, child_pid, {spawn.module, spawn.function, spawn.args}}) ··· 405 432 state 406 433 |> set_current_process(child_pid) 407 434 |> queue_task(state.clock, parent_resume) 435 + end 436 + 437 + defp perform(%State{} = state, %StartNode{} = start_node) do 438 + pid = do_spawn_apply(start_node.app_module, :start, [:temporary, start_node.args]) 439 + node = %Node{pid: pid, name: start_node.name, app_module: start_node.app_module, args: start_node.args} 440 + 441 + ProcStore.add_process(state.proc_store, pid, node.name) 442 + log(state, {:start_node, pid, {node.name, node.app_module, node.args}}) 443 + 444 + %{state | nodes: [node | state.nodes]} 445 + |> set_current_process(pid) 408 446 end 409 447 410 448 defp perform(%State{} = state, %Send{} = send) do ··· 464 502 state 465 503 end 466 504 467 - defp set_current_process(%State{} = state, pid) do 468 - mref = Process.monitor(pid) 469 - %{state | current: {pid, mref}} 505 + defp set_current_process(%State{} = state, pid) when is_pid(pid) do 506 + %{state | current: pid} 470 507 end 471 508 472 509 defp send_message(%State{} = state, %Send{} = send) do 473 510 dest_pid = 474 511 case send.dest do 512 + # TODO: for local names, send/2 actually raises if the name is not registered 475 513 name when is_atom(name) -> 514 + # TODO: use node from sender 476 515 case ProcRegistry.whereis(state.proc_registry, :nonode, name) do 516 + {:ok, pid} -> pid 517 + {:error, :not_found} -> nil 518 + end 519 + 520 + {node, name} when is_atom(node) and is_atom(name) -> 521 + case ProcRegistry.whereis(state.proc_registry, node, name) do 477 522 {:ok, pid} -> pid 478 523 {:error, :not_found} -> nil 479 524 end ··· 567 612 # Kill processes 568 613 Enum.each(pids_to_kill, fn pid -> 569 614 Process.exit(pid, reason) 570 - # TODO: receive :EXIT 615 + receive do 616 + {:EXIT, ^pid, ^reason} -> :noop 617 + after 618 + # Sanity to stop the scheduler hanging, this should never happen 619 + 1000 -> raise "Process #{inspect(pid)} failed to exit" 620 + end 571 621 end) 572 622 573 623 # Dispatch monitors for all killed processes ··· 627 677 628 678 defp log(%State{} = state, event) do 629 679 SimLog.log(state.log_server_pid, event) 680 + end 681 + 682 + @spec do_spawn_apply(module, atom, list) :: pid 683 + defp do_spawn_apply(m, f, a) do 684 + scheduler_pid = self() 685 + seed = random_seed() 686 + spawn_link(fn -> 687 + Hobbes.Construct.SimServer.set_scheduler_pid(scheduler_pid) 688 + :rand.seed(:exsss, seed) 689 + apply(m, f, a) 690 + end) 630 691 end 631 692 632 693 defp random_seed, do: Enum.random(1..1_000_000)
+5 -4
lib/construct/scheduler/proc_store.ex
··· 6 6 defmodule ProcState do 7 7 @type t :: %__MODULE__{ 8 8 pid: pid, 9 + node: atom, 9 10 monitors: %{reference => pid}, 10 11 monitored_by: [{reference, pid}], 11 12 linked_to: [pid], ··· 13 14 queue_key: ProcQueue.key | nil, 14 15 await: {ProcStore.check_fun, term} | nil, 15 16 } 16 - @enforce_keys [:pid] 17 + @enforce_keys [:pid, :node] 17 18 defstruct [ 18 19 monitors: %{}, 19 20 monitored_by: [], ··· 38 39 } 39 40 end 40 41 41 - @spec add_process(t, pid) :: :ok 42 - def add_process(%ProcStore{} = ps, pid) when is_pid(pid) do 43 - state = %ProcState{pid: pid} 42 + @spec add_process(t, pid, atom) :: :ok 43 + def add_process(%ProcStore{} = ps, pid, node) when is_pid(pid) and is_atom(node) do 44 + state = %ProcState{pid: pid, node: node} 44 45 45 46 :ets.insert(ps.proc_table, {pid, state}) 46 47 :ok
+8
lib/construct/sim_server.ex
··· 337 337 end 338 338 end 339 339 340 + @spec start_node(atom, module, term) :: :ok 341 + def start_node(name, app_module, args) when is_atom(name) and is_atom(app_module) do 342 + if not simulated?(), do: raise "start_node/3 can only be called in simulation" 343 + scheduler_pid = fetch_scheduler_pid!() 344 + 345 + Scheduler.start_node(scheduler_pid, name, app_module, args) 346 + end 347 + 340 348 @spec monitor(pid) :: reference 341 349 def monitor(monitor_pid) do 342 350 case get_scheduler_pid() do
+18
lib/construct/sim_supervisor.ex
··· 1 + defmodule Hobbes.Construct.SimSupervisor do 2 + use Hobbes.Construct.SimServer 3 + 4 + def start_link(children, opts) do 5 + SimServer.start_link(__MODULE__, {children, opts}) 6 + end 7 + 8 + def init({children, _opts}) do 9 + # TODO: trap exits? 10 + Enum.map(children, &spawn_child/1) 11 + {:ok, nil} 12 + end 13 + 14 + defp spawn_child({module, arg} = _child_spec) do 15 + %{start: {m, f, a}} = module.child_spec(arg) 16 + apply(m, f, a) 17 + end 18 + end
+70 -17
lib/hobbes.ex
··· 1 1 defmodule Hobbes do 2 - alias Hobbes.Servers.{Coordinator, ServerSupervisor, Manager} 2 + alias Hobbes.Construct.SimServer 3 + alias Hobbes.ClusterNode 4 + alias Hobbes.Servers.{Coordinator, Manager} 3 5 alias Hobbes.Structs.Cluster 4 6 7 + defmodule AppShim do 8 + use Application 9 + 10 + alias Hobbes.Construct.SimSupervisor 11 + 12 + def start(_type, [config]) do 13 + children = [ 14 + {Hobbes.ClusterNode, config} 15 + ] 16 + SimSupervisor.start_link(children, max_restarts: 0) 17 + end 18 + end 19 + 20 + # Sim-only, relies on simulated nodes 21 + defp init_distributed_cluster(num_coordinators) do 22 + coordinators = 23 + Enum.map(0..(num_coordinators - 1), fn i -> 24 + {String.to_atom("node#{i}"), String.to_atom("coordinator-#{i}")} 25 + end) 26 + 27 + coordinators 28 + |> Enum.with_index() 29 + |> Enum.each(fn {{node, _name}, i} -> 30 + config = [ 31 + cluster: "cluster", 32 + coordinators: coordinators, 33 + coordinator_id: i, 34 + slots: [ 35 + ], 36 + ] 37 + 38 + SimServer.start_node(node, Hobbes.AppShim, [config]) 39 + end) 40 + 41 + coordinators 42 + end 43 + 44 + defp init_local_cluster(num_coordinators) do 45 + coordinators = 46 + Enum.map(0..(num_coordinators - 1), fn i -> 47 + String.to_atom("coordinator-#{i}") 48 + end) 49 + 50 + coordinators 51 + |> Enum.with_index() 52 + |> Enum.each(fn {_name, i} -> 53 + config = [ 54 + cluster: "cluster", 55 + coordinators: coordinators, 56 + coordinator_id: i, 57 + slots: [ 58 + ], 59 + ] 60 + 61 + ClusterNode.start_link(config) 62 + end) 63 + 64 + coordinators 65 + end 66 + 5 67 defp default_opts do 6 68 [ 7 69 num_coordinators: 3, ··· 34 96 def start_cluster(opts) do 35 97 num_coordinators = Keyword.get(opts, :num_coordinators, 3) 36 98 37 - coordinator_names = 38 - Enum.map(0..(num_coordinators - 1), fn i -> 39 - {i, String.to_atom("coordinator-#{i}")} 40 - end) 41 - 42 99 coordinators = 43 - Enum.map(coordinator_names, fn {id, name} -> 44 - {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name) 45 - %{id: id, name: name, pid: pid} 46 - end) 100 + case SimServer.simulated?() do 101 + true -> init_distributed_cluster(num_coordinators) 102 + false -> init_local_cluster(num_coordinators) 103 + end 47 104 48 - coordinator_pids = Enum.map(coordinators, fn %{pid: pid} -> pid end) 105 + SimServer.sleep(1000) 49 106 50 107 config = config_pairs(opts) 51 - Coordinator.write(hd(coordinator_pids), config) 52 - 53 - for _i <- 1..Keyword.get(opts, :num_supervisors, 3) do 54 - {:ok, _pid} = ServerSupervisor.start_link(coordinators: coordinator_pids) 55 - end 108 + {:ok, :ok} = Coordinator.write(hd(coordinators), config) 56 109 57 110 { 58 111 :ok, 59 - coordinator_pids, 112 + coordinators, 60 113 } 61 114 end 62 115
+6 -4
lib/servers/coordinator.ex
··· 288 288 def init(%{id: id, replicas: replicas}) do 289 289 SimServer.flag(:trap_exit, true) 290 290 291 - replica_map = Map.new(replicas, fn {id, name} when is_integer(id) and is_atom(name) -> 292 - {id, name} 293 - end) 294 - replica_ids = replica_map |> Map.keys() |> Enum.sort() 291 + replica_map = 292 + replicas 293 + |> Enum.with_index() 294 + |> Map.new(fn {name, i} -> {i, name} end) 295 + 296 + replica_ids = Enum.to_list(0..(length(replicas) - 1)) 295 297 296 298 {kv_mod, kv} = case SimServer.simulated?() do 297 299 true -> {FlatStorageKV, FlatStorageKV.new("/coordinator_#{Integer.to_string(id)}.kv")}
+2 -1
lib/servers/manager.ex
··· 124 124 } = 125 125 case Coordinator.inc_generation(state.primary_coordinator) do 126 126 {:ok, {:ok, result}} -> result 127 - {:ok, {:error, :no_config}} -> exit(:normal) 127 + {:ok, {:error, :no_config}} -> exit(:shutdown) 128 + {:error, _err} -> exit(:shutdown) 128 129 end 129 130 130 131 assert is_integer(generation)
+3 -3
test/hobbes_test.exs
··· 261 261 262 262 SimServer.sleep(1_000) 263 263 264 - assert {:error, {:not_primary, :"coordinator-0"}} = Coordinator.write(c2, "foo", "bar") 265 - assert {:error, {:not_primary, :"coordinator-0"}} = Coordinator.write(c3, "foo", "bar") 264 + assert {:error, {:not_primary, {_, :"coordinator-0"}}} = Coordinator.write(c2, "foo", "bar") 265 + assert {:error, {:not_primary, {_, :"coordinator-0"}}} = Coordinator.write(c3, "foo", "bar") 266 266 267 267 assert {:ok, _} = Coordinator.write(c1, "foo", "bar") 268 268 assert {:ok, _} = Coordinator.write(c1, "hello", "world") ··· 274 274 275 275 SimServer.sleep(3_000) 276 276 277 - assert {:error, {:not_primary, :"coordinator-1"}} = Coordinator.read(c3, "foo") 277 + assert {:error, {:not_primary, {_, :"coordinator-1"}}} = Coordinator.read(c3, "foo") 278 278 279 279 assert {:ok, "bar"} = Coordinator.read(c2, "foo") 280 280