this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add coordinators to Cluster and refresh on timeout in Cycle

+73 -11
+24 -1
lib/hobbes.ex
··· 1 1 defmodule Hobbes do 2 - alias Hobbes.Servers.{Coordinator, ServerSupervisor} 2 + alias Hobbes.Servers.{Coordinator, ServerSupervisor, Manager} 3 + alias Hobbes.Structs.Cluster 3 4 4 5 defp default_opts do 5 6 [ ··· 57 58 :ok, 58 59 coordinator_pids, 59 60 } 61 + end 62 + 63 + @spec refresh_cluster(Cluster.t) :: Cluster.t 64 + def refresh_cluster(%Cluster{} = cluster) do 65 + cluster.coordinators 66 + |> Enum.map(&Coordinator.get_manager_send/1) 67 + |> Enum.map(&Coordinator.get_manager_receive/1) 68 + |> Enum.reduce([], fn 69 + {:ok, result}, acc -> [result | acc] 70 + {:error, :not_primary}, acc -> acc 71 + end) 72 + |> case do 73 + [_ | _] = results -> 74 + {manager_pid, _generation} = Enum.max_by(results, fn {_pid, generation} -> generation end) 75 + case Manager.get_cluster(manager_pid) do 76 + {:ok, %Cluster{} = new_cluster} -> new_cluster 77 + {:error, _err} -> cluster 78 + end 79 + 80 + # Received no responses 81 + [] -> cluster 82 + end 60 83 end 61 84 end
+5 -1
lib/servers/commit_buffer.ex
··· 283 283 TLog.write_batch_send(tlog_pid, log_batch) 284 284 end) 285 285 |> Enum.each(fn req_id -> 286 - :ok = TLog.write_batch_receive(req_id) 286 + case TLog.write_batch_receive(req_id) do 287 + :ok -> :noop 288 + # If a commit fails, we trigger recovery (or are already recovering) 289 + {:error, :timeout} -> exit(:shutdown) 290 + end 287 291 end) 288 292 289 293 # Once all tlogs have replied (made durable), notify sequencer this version is committed
+22 -2
lib/servers/coordinator.ex
··· 145 145 SimServer.call(server, :get_manager) 146 146 end 147 147 148 + @spec get_manager_send(term) :: SimServer.request_id 149 + def get_manager_send(server) do 150 + SimServer.send_request(server, :get_manager) 151 + end 152 + 153 + @spec get_manager_receive(SimServer.request_id) :: {:ok, {pid, non_neg_integer}} | {:error, :not_primary | :timeout} 154 + def get_manager_receive(req_id) do 155 + case SimServer.receive_response(req_id, 1000) do 156 + {:reply, reply} -> reply 157 + :timeout -> {:error, :timeout} 158 + end 159 + end 160 + 148 161 def inc_generation(server) do 149 162 SimServer.call(server, {:request, :inc_generation}) 150 163 end ··· 298 311 end 299 312 300 313 def handle_call(:get_manager, _from, %State{} = state) do 301 - {:reply, {:ok, state.manager_pid}, state} 314 + case state.manager_pid do 315 + nil -> 316 + {:reply, {:error, :not_primary}, %State{} = state} 317 + pid when is_pid(pid) -> 318 + assert is_integer(state.manager_generation) 319 + {:reply, {:ok, {pid, state.manager_generation}}, state} 320 + end 302 321 end 303 322 304 323 def handle_call({:track_manager_generation, pid, generation}, _from, %State{} = state) when is_pid(pid) and is_integer(generation) do ··· 986 1005 assert state.manager_pid == nil 987 1006 assert state.manager_generation == nil 988 1007 989 - {:ok, manager_pid} = Manager.start_link(%{primary_coordinator: self()}) 1008 + coordinators = Enum.map(state.replica_ids, &Map.fetch!(state.replica_map, &1)) 1009 + {:ok, manager_pid} = Manager.start_link(%{coordinators: coordinators, primary_coordinator: self()}) 990 1010 %{state | manager_pid: manager_pid} 991 1011 end 992 1012 end
+11 -4
lib/servers/manager.ex
··· 11 11 12 12 defmodule State do 13 13 @type t :: %__MODULE__{ 14 + coordinators: [term], 14 15 primary_coordinator: pid, 15 16 generation: non_neg_integer | nil, 16 17 supervisors: [pid], ··· 22 23 recovery_started_timestamp: integer | nil, 23 24 } 24 25 25 - @enforce_keys [:primary_coordinator] 26 + @enforce_keys [:coordinators, :primary_coordinator] 26 27 defstruct [ 27 28 generation: nil, 28 29 supervisors: [], ··· 68 69 SimServer.cast(server, {:tlog_ping, status}) 69 70 end 70 71 71 - def get_cluster(server) do 72 - SimServer.call(server, :get_cluster) 72 + def get_cluster(server, timeout \\ 5000) do 73 + try do 74 + SimServer.call(server, :get_cluster, timeout) 75 + catch 76 + :exit, {:timeout, _} -> {:error, :timeout} 77 + end 73 78 end 74 79 75 - def init(%{primary_coordinator: primary_coordinator}) do 80 + def init(%{coordinators: coordinators, primary_coordinator: primary_coordinator}) do 76 81 state = %State{ 82 + coordinators: coordinators, 77 83 primary_coordinator: primary_coordinator, 78 84 } 79 85 SimServer.send(self(), :begin) ··· 128 134 tlog_generations = load_generations(generation_pairs) 129 135 130 136 cluster = %Cluster{ 137 + coordinators: state.coordinators, 131 138 generation: generation, 132 139 tlog_generations: tlog_generations, 133 140 status: :recovering,
+2
lib/structs.ex
··· 8 8 9 9 defmodule Cluster do 10 10 @type t :: %__MODULE__{ 11 + coordinators: [term], 11 12 generation: non_neg_integer, 12 13 tlog_generations: [TLogGeneration.t], 13 14 status: :recovering | :normal, ··· 15 16 servers: %{non_neg_integer => Hobbes.Structs.Server.t}, 16 17 } 17 18 @enforce_keys [ 19 + :coordinators, 18 20 :generation, 19 21 :tlog_generations, 20 22 :status,
+1 -1
lib/workloads.ex
··· 145 145 146 146 {:ok, coordinator_pids} = Hobbes.start_cluster(cluster_opts) 147 147 SimServer.sleep(1_000) 148 - {:ok, manager_pid} = Coordinator.get_manager(hd(coordinator_pids)) 148 + {:ok, {manager_pid, _gen}} = Coordinator.get_manager(hd(coordinator_pids)) 149 149 {:ok, %Cluster{} = cluster} = Manager.get_cluster(manager_pid) 150 150 151 151 context = %{cluster: cluster}
+5
lib/workloads/cycle.ex
··· 101 101 state = update_in(state.commit_latencies, fn list -> [duration | list] end) 102 102 inc_stat(state, :success) 103 103 else 104 + {:error, :timeout} -> 105 + cluster = Hobbes.refresh_cluster(state.cluster) 106 + state = %{state | cluster: cluster} 107 + inc_stat(state, :timeout) 108 + 104 109 {:error, :read_version_too_old} -> inc_stat(state, :rv_too_old) 105 110 {:error, :read_version_too_new} -> inc_stat(state, :rv_too_new) 106 111 {:error, :database_locked} -> inc_stat(state, :db_lock)
+3 -2
test/hobbes_test.exs
··· 13 13 14 14 @count 1 15 15 #@count 10 16 + #@count 30 16 17 #@count 100 17 18 18 19 @seed nil ··· 189 190 defp setup_cluster(_context) do 190 191 {:ok, coordinator_pids} = Hobbes.start_cluster([]) 191 192 SimServer.sleep(1_000) 192 - {:ok, manager_pid} = Coordinator.get_manager(hd(coordinator_pids)) 193 + {:ok, {manager_pid, _gen}} = Coordinator.get_manager(hd(coordinator_pids)) 193 194 {:ok, %Cluster{} = cluster} = Manager.get_cluster(manager_pid) 194 195 195 196 %{cluster: cluster} ··· 213 214 214 215 SimServer.sleep(1_000) 215 216 216 - {:ok, manager_pid} = Coordinator.get_manager(hd(coordinators)) 217 + {:ok, {manager_pid, _gen}} = Coordinator.get_manager(hd(coordinators)) 217 218 {:ok, %Cluster{} = cluster} = Manager.get_cluster(manager_pid) 218 219 219 220 Transaction.new!(cluster)