this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Separate in-memory status from simulation

garrison c7521973 3f6dc5f3

+65 -30
+4 -2
lib/cluster_node.ex
··· 18 18 :coordinator_id, 19 19 :initial_cluster_opts, 20 20 21 + :in_memory?, 21 22 :slots, 22 23 ] 23 24 defstruct @enforce_keys ··· 29 30 coordinator_names: Keyword.fetch!(opts, :coordinators), 30 31 coordinator_id: Keyword.get(opts, :coordinator_id), 31 32 initial_cluster_opts: Keyword.fetch!(opts, :initial_cluster_opts), 33 + in_memory?: Keyword.fetch!(opts, :in_memory?), 32 34 slots: Keyword.fetch!(opts, :slots), 33 35 } 34 36 end ··· 76 78 pid 77 79 end 78 80 79 - defp start_server_supervisor(%{coordinator_names: coordinators, slots: slots}) do 80 - {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, slots: slots}) 81 + defp start_server_supervisor(%{coordinator_names: coordinators, in_memory?: in_memory?, slots: slots}) do 82 + {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, in_memory?: in_memory?, slots: slots}) 81 83 pid 82 84 end 83 85
+29 -7
lib/sandbox.ex
··· 18 18 end 19 19 end 20 20 21 - defp default_slots do 21 + defp default_slots(:memory) do 22 22 [ 23 23 stateless: 6, 24 - tlog: [ 25 - "/tlog_1", 26 - ], 27 - storage: 1..4 |> Enum.map(&"/storage_#{&1}"), 24 + tlog: 1..1 |> Enum.map(fn _ -> :memory end), 25 + storage: 1..4 |> Enum.map(fn _ -> :memory end), 26 + ] 27 + end 28 + 29 + defp default_slots(path) when is_binary(path) do 30 + [ 31 + stateless: 6, 32 + tlog: 1..1 |> Enum.map(&"#{path}/tlog#{&1}"), 33 + storage: 1..4 |> Enum.map(&"#{path}/storage_#{&1}"), 28 34 ] 29 35 end 30 36 ··· 32 38 # Starting a distributed cluster is sim-only for now 33 39 assert Sim.simulated?() 34 40 41 + in_memory? = Keyword.get(opts, :in_memory?, false) 42 + cluster_path = case in_memory? do 43 + true -> :memory 44 + false -> "/cluster" 45 + end 46 + 35 47 coordinators = Enum.map(0..(num_coordinators - 1), fn i -> 36 48 # TODO: name should include cluster name 37 49 {String.to_atom("coordinator-#{i}"), String.to_atom("node-#{i}")} ··· 46 58 coordinator_id: i, 47 59 initial_cluster_opts: opts, 48 60 61 + in_memory?: in_memory?, 49 62 # TODO: configurable 50 - slots: default_slots(), 63 + slots: default_slots(cluster_path), 51 64 ] 52 65 53 66 mfa = {Hobbes.Sandbox.AppShim, :start, [nil, [node_config]]} ··· 64 77 String.to_atom("coordinator-#{i}") 65 78 end) 66 79 80 + in_memory? = Keyword.get(opts, :in_memory?, false) 81 + cluster_path = case in_memory? do 82 + true -> :memory 83 + false -> "/cluster" 84 + end 85 + 67 86 coordinators 68 87 |> Enum.with_index() 69 88 |> Enum.each(fn {_name, i} -> ··· 73 92 coordinator_id: i, 74 93 initial_cluster_opts: opts, 75 94 95 + in_memory?: in_memory?, 76 96 # TODO: configurable 77 - slots: default_slots(), 97 + slots: default_slots(cluster_path), 78 98 ] 79 99 80 100 ClusterNode.start_link(node_config) ··· 89 109 90 110 coordinators = 91 111 if Keyword.get(opts, :distributed) do 112 + opts = Keyword.put(opts, :in_memory?, false) 92 113 init_distributed_cluster(num_coordinators, opts) 93 114 else 115 + opts = Keyword.put(opts, :in_memory?, true) 94 116 init_local_cluster(num_coordinators, opts) 95 117 end 96 118
+32 -21
lib/servers/server_supervisor.ex
··· 1 1 defmodule Hobbes.Servers.ServerSupervisor do 2 2 use GenServer 3 - alias Trinity.{Sim, SimProcess, SimServer, SimFile} 3 + alias Trinity.{SimProcess, SimServer, SimFile} 4 4 5 5 import ExUnit.Assertions, only: [assert: 1] 6 6 ··· 12 12 defmodule State do 13 13 @type t :: %__MODULE__{ 14 14 coordinators: [pid], 15 + in_memory?: boolean, 16 + slots: map, 17 + 15 18 manager_pid: pid | nil, 16 19 manager_generation: non_neg_integer | -1, 17 20 cluster: Cluster.t, ··· 21 24 } 22 25 @enforce_keys [ 23 26 :coordinators, 27 + :in_memory?, 24 28 :slots, 25 29 26 30 :open_stateless, ··· 58 62 SimServer.cast(server, {:reply_current_manager, response}) 59 63 end 60 64 61 - def init(%{coordinators: coordinators, slots: config_slots}) do 65 + def init(%{coordinators: coordinators, in_memory?: in_memory?, slots: config_slots}) do 62 66 SimProcess.flag(:trap_exit, true) 63 67 64 68 slots = %{ ··· 70 74 assert is_list(slots.tlog) 71 75 assert is_list(slots.storage) 72 76 77 + if in_memory? do 78 + Enum.each(slots.tlog, fn slot -> assert slot == :memory end) 79 + Enum.each(slots.storage, fn slot -> assert slot == :memory end) 80 + else 81 + Enum.each(slots.tlog, fn slot -> assert is_binary(slot) end) 82 + Enum.each(slots.storage, fn slot -> assert is_binary(slot) end) 83 + end 84 + 73 85 state = %State{ 74 86 coordinators: coordinators, 87 + in_memory?: in_memory?, 75 88 slots: slots, 89 + 76 90 last_manager_message_timestamp: current_time(), 77 91 78 92 open_stateless: slots.stateless, ··· 211 225 end 212 226 213 227 defp load_stateful_slots(%State{} = state) do 214 - # TODO: check if cluster is in-memory instead 215 - case Sim.simulated?() do 228 + case state.in_memory? do 216 229 true -> 217 - state 218 - |> load_tlog_slots() 219 - |> load_storage_slots() 220 - 221 - false -> 222 230 %{state | 223 231 open_tlog: state.slots.tlog, 224 232 open_storage: state.slots.storage, 225 233 } 234 + 235 + false -> 236 + state 237 + |> load_tlog_slots() 238 + |> load_storage_slots() 226 239 end 227 240 end 228 241 ··· 302 315 303 316 defp start_child(%State{} = state, generation, module, arg) when module == Hobbes.Servers.TLog do 304 317 case state.open_tlog do 305 - [slot_path | rest] -> 318 + [slot | rest] -> 306 319 state = %{state | open_tlog: rest} 307 320 308 - # TODO: check for in-memory cluster instead 309 - kv_path = case Sim.simulated?() do 310 - true -> slot_path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 311 - false -> :memory 321 + kv_path = case slot do 322 + :memory -> :memory 323 + path when is_binary(path) -> path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 312 324 end 313 - if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 325 + if is_binary(kv_path), do: assert not SimFile.exists?(kv_path) 314 326 315 327 arg = Map.put(arg, :path, kv_path) 316 328 restart_arg = %{path: kv_path} ··· 324 336 325 337 defp start_child(%State{} = state, generation, module, arg) when module == Hobbes.Servers.Storage do 326 338 case state.open_storage do 327 - [slot_path | rest] -> 339 + [slot | rest] -> 328 340 state = %{state | open_storage: rest} 329 341 330 - # TODO: check for in-memory cluster instead 331 - kv_path = case Sim.simulated?() do 332 - true -> slot_path <> "/storage.kv" 333 - false -> :memory 342 + kv_path = case slot do 343 + :memory -> :memory 344 + path when is_binary(path) -> path <> "/storage.kv" 334 345 end 335 - if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 346 + if is_binary(kv_path), do: assert not SimFile.exists?(kv_path) 336 347 337 348 arg = Map.put(arg, :path, kv_path) 338 349 restart_arg = %{path: kv_path}