this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Re-replicate batches to new TLogs during recovery

+162 -21
+9
lib/meta_store.ex
··· 38 38 alias Hobbes.Utils 39 39 import Hobbes.Utils 40 40 41 + @type t :: %__MODULE__{ 42 + kv: term, 43 + } 41 44 @enforce_keys [:kv] 42 45 defstruct @enforce_keys 43 46 ··· 116 119 @spec new(:ets.table) :: %MetaStore{} 117 120 def new(kv \\ MemKV.new()) do 118 121 %MetaStore{kv: kv} 122 + end 123 + 124 + @spec load(t, [{binary, binary}]) :: :ok 125 + def load(%MetaStore{} = ms, pairs) when is_list(pairs) do 126 + apply_meta_mutations(ms, 0, Enum.map(pairs, fn {k, v} -> {:write, k, v} end)) 127 + :ok 119 128 end 120 129 121 130 @doc """
+64 -12
lib/servers/manager.ex
··· 318 318 319 319 # Copy mutations to new TLogs 320 320 new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, tlog_ids: tlog_ids} 321 - tags = extract_tags(meta_pairs) 322 - state = copy_mutations_to_new_generation(state, hd(state.cluster.tlog_generations), new_tlog_generation, tags, max_kcv, recovery_version) 321 + state = copy_mutations_to_new_generation(state, hd(state.cluster.tlog_generations), new_tlog_generation, meta_pairs, max_kcv, recovery_version) 323 322 324 323 # Recruit the rest of the new transaction system 325 324 state = ··· 351 350 |> Enum.sort() 352 351 end 353 352 354 - defp copy_mutations_to_new_generation(%State{} = state, %TLogGeneration{} = old_gen, %TLogGeneration{} = _new_gen, tags, start_version, end_version) do 353 + defp copy_mutations_to_new_generation(%State{} = state, %TLogGeneration{} = old_gen, %TLogGeneration{} = new_gen, meta_pairs, start_version, end_version) do 354 + tags = extract_tags(meta_pairs) 355 + 355 356 recovered_old_tlog_pids = 356 357 old_gen.tlog_ids 357 358 |> Enum.filter(fn id -> ··· 362 363 pid 363 364 end) 364 365 365 - Enum.each(tags, fn tag -> 366 - batches = peek_merge(recovered_old_tlog_pids, tag, start_version, end_version) 367 - dbg {tag, batches} 368 - # TODO: write to new TLogs 366 + batches = 367 + recovered_old_tlog_pids 368 + |> Enum.map(fn pid -> 369 + TLog.peek_many_send(pid, tags, start_version, end_version) 370 + end) 371 + |> Enum.map(&TLog.peek_many_receive/1) 372 + |> merge_batches() 373 + |> then(fn batches -> 374 + [{0, []} | batches] 375 + end) 376 + |> Enum.chunk_every(2, 1, :discard) 377 + |> Enum.map(fn [{prev_version, _}, {commit_version, mutations}] -> 378 + %{ 379 + prev_version: prev_version, 380 + commit_version: commit_version, 381 + mutations: mutations, 382 + } 383 + end) 384 + 385 + meta_store = MetaStore.new() 386 + MetaStore.load(meta_store, meta_pairs) 387 + 388 + Enum.each(batches, fn batch -> 389 + write_batch_to_tlogs(state.cluster, new_gen, batch, meta_store) 369 390 end) 370 391 371 392 state 372 393 end 373 394 374 - defp peek_merge(tlog_pids, tag, start_version, end_version) when is_list(tlog_pids) and is_integer(tag) do 375 - Enum.map(tlog_pids, fn pid -> 376 - {{_sv, _ev}, batches} = TLog.peek(pid, tag, start_version, end_version) 377 - batches 395 + defp write_batch_to_tlogs(%Cluster{} = cluster, %TLogGeneration{} = tlog_gen, batch, meta_store) when is_map(batch) do 396 + %{prev_version: prev_version, commit_version: commit_version, mutations: mutations} = batch 397 + 398 + # Apply meta mutations from the batch first 399 + meta_mutations = 400 + mutations 401 + |> Enum.filter(fn {_i, mut} -> meta_mutation?(mut) end) 402 + |> Enum.map(fn {_i, mut} -> mut end) 403 + MetaStore.apply_meta_mutations(meta_store, commit_version, meta_mutations) 404 + 405 + tagged_mutations = 406 + mutations 407 + |> Enum.map(fn {_i, mut} = num_mut -> 408 + tags = MetaStore.get_key_server_mutation_tags(meta_store, commit_version, mutation_key(mut)) 409 + {tags, num_mut} 410 + end) 411 + 412 + sliced_mutations = slice_mutations_for_tlogs(tagged_mutations, tlog_gen.tlog_ids, 3) 413 + 414 + tlog_gen.tlog_ids 415 + |> Enum.map(fn tlog_id -> 416 + log_batch = %LogBatch{ 417 + commit_buffer_id: nil, 418 + commit_version: commit_version, 419 + prev_commit_version: prev_version, 420 + tagged_mutations: Map.fetch!(sliced_mutations, tlog_id), 421 + # This is safe because we guarantee all of these mutations are replicated before completing the recovery 422 + last_committed_version: commit_version, 423 + } 424 + 425 + %Server{pid: tlog_pid} = Map.fetch!(cluster.servers, tlog_id) 426 + TLog.write_batch_send(tlog_pid, log_batch) 378 427 end) 379 - |> merge_batches() 428 + |> Enum.map(fn req_id -> 429 + :ok = TLog.write_batch_receive(req_id) 430 + end) 431 + :ok 380 432 end 381 433 382 434 defp recruit(%State{} = state, generation, module, ids, args) when is_atom(module) and is_list(ids) and is_map(args) do
+43
lib/servers/tlog.ex
··· 69 69 SimServer.receive_response(request_id) 70 70 end 71 71 72 + @doc """ 73 + Peeks many tags at once and merges the resulting batches to avoid duplicating mutations over the wire. 74 + 75 + Used by the Manager during recovery to quickly copy mutations. 76 + Split into send/receive functions for concurrent use. 77 + 78 + ## Examples 79 + 80 + iex> peek_many_send(server, [1, 2, 3], 100, 200) |> peek_many_receive() 81 + [ 82 + { 83 + 100, 84 + [ 85 + {0, {:write, "foo", "bar"}}, 86 + {1, {:clear, "foo"}}, 87 + ], 88 + }, 89 + {110, ...}, 90 + {130, ...}, 91 + ] 92 + 93 + """ 94 + @spec peek_many_send(term, [integer], non_neg_integer, non_neg_integer) :: SimServer.request_id 95 + def peek_many_send(server, tags, start_version, end_version) 96 + when is_list(tags) and is_integer(start_version) and is_integer(end_version) do 97 + SimServer.send_request(server, {:peek_many, tags, start_version, end_version}) 98 + end 99 + 100 + @spec peek_many_receive(SimServer.request_id) :: [{non_neg_integer, [Utils.numbered_mutation]}] 101 + def peek_many_receive(request_id) do 102 + SimServer.receive_response(request_id) 103 + end 104 + 72 105 def lock(server) do 73 106 SimServer.cast(server, :lock) 74 107 end ··· 99 132 100 133 batches = TaggedQueue.peek(state.tagged_queue, tag, start_version, end_version) 101 134 {:reply, {{start_version, end_version}, batches}, state} 135 + end 136 + 137 + def handle_call({:peek_many, tags, start_version, end_version}, _from, state) do 138 + batches = 139 + tags 140 + |> Enum.map(fn tag -> 141 + TaggedQueue.peek(state.tagged_queue, tag, start_version, end_version) 142 + end) 143 + |> merge_batches() 144 + {:reply, batches, state} 102 145 end 103 146 104 147 def handle_call({:write_batch, %LogBatch{} = batch}, from, %State{} = state) do
+23
lib/utils.ex
··· 91 91 end 92 92 end 93 93 94 + defp tlog_ids_for_tags(tlog_ids, tags, replication_factor) do 95 + # TODO: refactor tlogs_for_tags to do this 96 + tlogs_for_tags(length(tlog_ids), replication_factor, tags) 97 + |> Enum.map(&Enum.at(tlog_ids, &1)) 98 + end 99 + 100 + def slice_mutations_for_tlogs(tagged_mutations, all_tlog_ids, replication_factor) 101 + when is_list(tagged_mutations) and is_list(all_tlog_ids) and is_integer(replication_factor) and replication_factor >= 1 do 102 + tlog_mutations = Map.new(all_tlog_ids, fn id -> {id, []} end) 103 + 104 + tagged_mutations 105 + |> Enum.reduce(tlog_mutations, fn {tags, _mut} = tm, acc -> 106 + case meta_tag() in tags do 107 + true -> all_tlog_ids 108 + false -> tlog_ids_for_tags(all_tlog_ids, tags, replication_factor) 109 + end 110 + |> Enum.reduce(acc, fn id, acc -> 111 + Map.update!(acc, id, &[tm | &1]) 112 + end) 113 + end) 114 + |> Map.new(fn {id, mutations} -> {id, Enum.reverse(mutations)} end) 115 + end 116 + 94 117 @doc """ 95 118 Merges two sorted lists of key/value pairs. 96 119
+7 -5
lib/workloads/cycle.ex
··· 29 29 use Hobbes.Construct.SimServer 30 30 31 31 defmodule State do 32 - @enforce_keys [:cluster, :count, :stopped, :stats] 32 + @enforce_keys [:cluster, :count, :tick_ms, :stopped, :stats] 33 33 defstruct @enforce_keys 34 34 end 35 35 ··· 37 37 SimServer.start_link(__MODULE__, opts) 38 38 end 39 39 40 - def init(%{cluster: %Cluster{} = cluster, count: count}) when is_integer(count) do 41 - SimServer.send_after(self(), :tick, 100) 40 + def init(%{cluster: %Cluster{} = cluster, count: count, tick_ms: tick_ms}) when is_integer(count) do 41 + SimServer.send_after(self(), :tick, 0) 42 42 43 43 {:ok, %State{ 44 44 cluster: cluster, 45 45 count: count, 46 + tick_ms: tick_ms, 46 47 stopped: false, 47 48 stats: %{}, 48 49 }} ··· 60 61 61 62 def handle_info(:tick, %State{} = state) do 62 63 state = work(state) 63 - SimServer.send_after(self(), :tick, 100) 64 + SimServer.send_after(self(), :tick, state.tick_ms) 64 65 {:noreply, state} 65 66 end 66 67 ··· 137 138 def run(%{cluster: %Cluster{} = cluster}, opts) do 138 139 key_count = Keyword.get(opts, :keys, 20) 139 140 client_count = Keyword.get(opts, :clients, 10) 141 + client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 140 142 duration_ms = Keyword.get(opts, :duration_ms, 10_000) 141 143 142 144 build_cycle(cluster, key_count) 143 145 144 146 clients = Enum.map(1..client_count, fn _i -> 145 - {:ok, pid} = Client.start_link(%{cluster: cluster, count: key_count}) 147 + {:ok, pid} = Client.start_link(%{cluster: cluster, count: key_count, tick_ms: client_tick_ms}) 146 148 pid 147 149 end) 148 150
+16 -4
test/hobbes_test.exs
··· 112 112 test "Cycle Recovery", %{test: test} do 113 113 Workloads.run([ 114 114 {Workloads.Cycle, [ 115 - keys: 20, 116 - clients: 10, 117 - duration_ms: 20_000, 115 + keys: 80, 116 + clients: 100, 117 + client_tick_ms: 100, 118 + duration_ms: 30_000, 118 119 ]}, 119 120 {Workloads.KillServers, [ 120 - delay_ms: 3_000, 121 + delay_ms: 10_000, 121 122 duration_ms: 20_000, 122 123 ]}, 123 124 ], HobbesTest.SimOpts.sim_opts(name: test, cluster_opts: [ 125 + num_commit_buffers: 6, 124 126 num_tlogs: 6, 125 127 num_storage: 12, 128 + initial_shards: [ 129 + "", 130 + "key10", 131 + "key20", 132 + "key30", 133 + "key40", 134 + "key50", 135 + "key60", 136 + "key70", 137 + ], 126 138 ])) 127 139 end 128 140 end