this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix non-sim tests, exit CommitBuffer cleanly, and test node restarts

garrison dbc5bb11 72875cd8

+83 -33
+4
ROADMAP.md
··· 6 6 7 7 - [X] BeginBuffer server (batch get_read_version requests) 8 8 - [X] Use slots to spawn servers 9 + - [ ] Range clears 10 + - [ ] Add clear_range() to Transaction 11 + - [ ] Slice range clear mutations in CommitBuffer 12 + - [ ] Apply range clears on Storage servers 9 13 - [ ] Storage server handling 10 14 - [X] Spawn servers using slots in Manager 11 15 - [ ] Add unknown servers to meta keyspace in Distributor
+11 -1
lib/hobbes.ex
··· 68 68 coordinators: coordinators, 69 69 coordinator_id: i, 70 70 slots: [ 71 + stateless: 6, 72 + tlog: [ 73 + "/tlog_1", 74 + ], 75 + storage: [ 76 + "/storage_1", 77 + "/storage_2", 78 + "/storage_3", 79 + "/storage_4", 80 + ], 71 81 ], 72 82 ] 73 83 ··· 134 144 |> Enum.map(&Coordinator.get_manager_receive/1) 135 145 |> Enum.reduce([], fn 136 146 {:ok, result}, acc -> [result | acc] 137 - {:error, :not_primary}, acc -> acc 147 + {:error, _err}, acc -> acc 138 148 end) 139 149 |> case do 140 150 [_ | _] = results ->
+15 -3
lib/servers/commit_buffer.ex
··· 199 199 assert state.cluster.status == :normal 200 200 201 201 [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer) 202 - {commit_version, prev_commit_version} = Sequencer.get_commit_version(seq_pid) 202 + 203 + {commit_version, prev_commit_version} = 204 + case Sequencer.get_commit_version(seq_pid) do 205 + {:ok, {_cv, _pcv} = versions} -> versions 206 + {:error, :timeout} -> exit(:shutdown) 207 + end 203 208 204 209 transactions_reversed = state.buffer 205 210 ··· 234 239 } 235 240 236 241 [%Server{pid: resolver_pid}] = get_servers(state.cluster, Hobbes.Servers.Resolver) 237 - {txn_results_reversed, meta_log} = Resolver.resolve_batch(resolver_pid, batch) 242 + {txn_results_reversed, meta_log} = 243 + case Resolver.resolve_batch(resolver_pid, batch) do 244 + {:ok, result} -> result 245 + {:error, :timeout} -> exit(:shutdown) 246 + end 238 247 239 248 # Apply meta mutations received, including our own from this batch 240 249 Enum.each(meta_log, fn {_commit_version, mutations} -> ··· 298 307 end) 299 308 300 309 # Once all tlogs have replied (made durable), notify sequencer this version is committed 301 - :ok = Sequencer.notify_committed(seq_pid, commit_version) 310 + case Sequencer.notify_committed(seq_pid, commit_version) do 311 + :ok -> :noop 312 + {:error, :timeout} -> exit(:shutdown) 313 + end 302 314 303 315 # Reply to clients 304 316
+5 -5
lib/servers/coordinator.ex
··· 367 367 end 368 368 369 369 def handle_call(:get_manager, _from, %State{} = state) do 370 - case state.manager_pid do 371 - nil -> 372 - {:reply, {:error, :not_primary}, %State{} = state} 373 - pid when is_pid(pid) -> 374 - assert is_integer(state.manager_generation) 370 + case {state.manager_pid, state.manager_generation} do 371 + {pid, generation} when is_pid(pid) and is_integer(generation) -> 375 372 {:reply, {:ok, {pid, state.manager_generation}}, state} 373 + 374 + {_pid, _generation} -> 375 + {:reply, {:error, :no_manager}, %State{} = state} 376 376 end 377 377 end 378 378
+7 -3
lib/servers/resolver.ex
··· 36 36 Meta log is a list of batches of mutations applied by other 37 37 CommitBuffers since the last request from this CommitBuffer. 38 38 """ 39 - @spec resolve_batch(pid, %ResolveBatch{}) :: {[boolean], [{non_neg_integer, [Utils.mutation]}]} 39 + @spec resolve_batch(pid, %ResolveBatch{}) :: {:ok, {[boolean], [{non_neg_integer, [Utils.mutation]}]}} | {:error, :timeout} 40 40 def resolve_batch(server, %ResolveBatch{} = batch) do 41 - SimServer.call(server, {:resolve_batch, batch}) 41 + try do 42 + {:ok, _} = SimServer.call(server, {:resolve_batch, batch}) 43 + catch 44 + :exit, {:timeout, _mfa} -> {:error, :timeout} 45 + end 42 46 end 43 47 44 48 def init(%{id: id, cluster: %Cluster{} = cluster, prev_version: prev_version}) do ··· 132 136 133 137 # Note: we intentionally reply with results reversed because it's 134 138 # more efficient on CommitBuffer 135 - SimServer.reply(from, {results, reply_meta_mutations}) 139 + SimServer.reply(from, {:ok, {results, reply_meta_mutations}}) 136 140 137 141 %State{state | 138 142 version: commit_version,
+22 -13
lib/servers/sequencer.ex
··· 25 25 {100, 99} 26 26 27 27 """ 28 - @spec get_commit_version(pid) :: {integer, integer} 29 - def get_commit_version(server), do: SimServer.call(server, :get_commit_version) 28 + @spec get_commit_version(pid) :: {:ok, {non_neg_integer, non_neg_integer}} | {:error, :timeout} 29 + def get_commit_version(server) do 30 + try do 31 + SimServer.call(server, :get_commit_version) 32 + catch 33 + :exit, {:timeout, _mfa} -> {:error, :timeout} 34 + end 35 + end 30 36 31 37 @spec get_read_version(pid) :: {:ok, integer} | {:error, :timeout} 32 38 def get_read_version(server) do ··· 38 44 end 39 45 end 40 46 41 - @spec notify_committed(pid, integer) :: :ok 47 + @spec notify_committed(pid, integer) :: :ok | {:error, :timeout} 42 48 def notify_committed(server, commit_version) when is_integer(commit_version) do 43 - SimServer.call(server, {:notify_committed, commit_version}) 49 + try do 50 + :ok = SimServer.call(server, {:notify_committed, commit_version}) 51 + catch 52 + :exit, {:timeout, _mfa} -> {:error, :timeout} 53 + end 44 54 end 45 55 46 56 def init(%{id: id, cluster: %Cluster{} = cluster, prev_version: prev_version}) do ··· 56 66 {:ok, state} 57 67 end 58 68 59 - def handle_call(:get_commit_version, _from, state) do 69 + def handle_call(:get_commit_version, _from, %State{} = state) do 60 70 time = SimServer.current_time() 61 71 elapsed_us = time - state.last_time 62 72 ··· 67 77 last_version = state.last_version 68 78 version = last_version + advance_by 69 79 70 - { 71 - :reply, 72 - {version, last_version}, 73 - %State{state | last_version: version, last_time: time}, 74 - } 80 + state = %{state | last_version: version, last_time: time} 81 + {:reply, {:ok, {version, last_version}}, state} 75 82 end 76 83 77 - def handle_call(:get_read_version, _from, state) do 84 + def handle_call(:get_read_version, _from, %State{} = state) do 78 85 {:reply, state.known_commit_version, state} 79 86 end 80 87 81 - def handle_call({:notify_committed, commit_version}, _from, state) when is_integer(commit_version) and commit_version > 0 do 88 + def handle_call({:notify_committed, commit_version}, _from, %State{} = state) when is_integer(commit_version) and commit_version > 0 do 82 89 max_known = max(state.known_commit_version, commit_version) 83 - {:reply, :ok, %State{state | known_commit_version: max_known}} 90 + 91 + state = %{state | known_commit_version: max_known} 92 + {:reply, :ok, state} 84 93 end 85 94 86 95 def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
+15 -5
lib/servers/server_supervisor.ex
··· 204 204 end 205 205 206 206 defp load_stateful_slots(%State{} = state) do 207 - state 208 - |> load_tlog_slots() 209 - |> load_storage_slots() 207 + # TODO: check if cluster is in-memory instead 208 + case SimServer.simulated?() do 209 + true -> 210 + state 211 + |> load_tlog_slots() 212 + |> load_storage_slots() 213 + 214 + false -> 215 + %{state | 216 + open_tlog: state.slots.tlog, 217 + open_storage: state.slots.storage, 218 + } 219 + end 210 220 end 211 221 212 222 defp load_tlog_slots(%State{} = state) do ··· 257 267 state = %{state | open_tlog: rest} 258 268 259 269 kv_path = slot_path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 260 - assert not SimFile.exists?(kv_path) 270 + if SimServer.simulated?(), do: assert not SimFile.exists?(kv_path) 261 271 262 272 arg = Map.put(arg, :path, kv_path) 263 273 restart_arg = %{path: kv_path} ··· 275 285 state = %{state | open_storage: rest} 276 286 277 287 kv_path = slot_path <> "/storage.kv" 278 - assert not SimFile.exists?(kv_path) 288 + if SimServer.simulated?(), do: assert not SimFile.exists?(kv_path) 279 289 280 290 arg = Map.put(arg, :path, kv_path) 281 291 restart_arg = %{path: kv_path}
+4 -3
test/hobbes_test.exs
··· 119 119 client_tick_ms: 100, 120 120 duration_ms: 14_000, 121 121 ]}, 122 - {Workloads.KillServers, [ 123 - delay_ms: 7_000, 124 - duration_ms: 0, 122 + {Workloads.RestartNodes, [ 123 + delay_ms: 7000, 124 + tick_ms: 1000, 125 + count: 1, 125 126 ]}, 126 127 ], HobbesTest.SimOpts.sim_opts(name: test, cluster_opts: [ 127 128 num_commit_buffers: 6,