···404404 tag = state.id
405405 start_version = state.data_version + 1
406406407407- buddy_server = Map.get(state.cluster.servers, buddy_tlog_id(generation, state.id))
407407+ buddy_server = Map.get(state.cluster.servers, buddy_tlog_id(generation.tlog_ids, state.id))
408408409409 case state.buddy_tlog_failed or is_nil(buddy_server) do
410410 true ->
···431431 %TLogGeneration{} = generation = current_tlog_generation(state)
432432 assert result.tlog_id in generation.tlog_ids
433433434434- case result.tlog_id == buddy_tlog_id(generation, state.id) do
434434+ case result.tlog_id == buddy_tlog_id(generation.tlog_ids, state.id) do
435435 true ->
436436 apply_buddy_peek_result(state, result)
437437
+2-53
lib/utils.ex
···33 Hobbes utils.
44 """
5566- alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
66+ alias Hobbes.Structs.{Cluster, Server}
77 alias Hobbes.Construct.SimServer
8899 @type mutation :: {:write, binary, binary} | {:clear, binary} | {:clear_range, binary, binary}
···119119 end
120120 end
121121122122- @doc """
123123- Returns the preferred tlog index for `tag`.
124124- """
125125- @spec preferred_tlog_i(pos_integer, integer) :: integer
126126- def preferred_tlog_i(num_tlogs, tag) when is_integer(num_tlogs) and num_tlogs > 0 and is_integer(tag) do
127127- rem(tag, num_tlogs)
128128- end
129129-130130- @spec buddy_tlog_id(TLogGeneration.t | [non_neg_integer], non_neg_integer) :: non_neg_integer
131131- def buddy_tlog_id(%TLogGeneration{} = generation, server_id), do: buddy_tlog_id(generation.tlog_ids, server_id)
132132-122122+ @spec buddy_tlog_id([non_neg_integer], non_neg_integer) :: non_neg_integer
133123 def buddy_tlog_id(tlog_ids, server_id) when is_list(tlog_ids) and is_integer(server_id) do
134124 index = rem(server_id, length(tlog_ids))
135125 Enum.at(tlog_ids, index)
136136- end
137137-138138- @spec tlogs_for_tags(pos_integer, pos_integer, [integer]) :: [non_neg_integer]
139139- def tlogs_for_tags(num_tlogs, num_replicas, tags)
140140- when is_integer(num_tlogs) and is_integer(num_replicas) and (num_tlogs >= num_replicas) and is_list(tags) do
141141- # TODO: this function is called for literally every mutation - can we optimize it?
142142- tlogs = Enum.map(tags, fn t -> rem(t, num_tlogs) end) |> Enum.uniq()
143143- missing = num_replicas - length(tlogs)
144144- case missing <= 0 do
145145- true ->
146146- tlogs
147147- false ->
148148- all_tlogs = Enum.to_list(0..(num_tlogs - 1))
149149- Enum.reduce(1..missing, tlogs, fn _i, acc ->
150150- choice = SimServer.deterministic_random(all_tlogs -- acc)
151151- [choice | acc]
152152- end)
153153- end
154154- end
155155-156156- defp tlog_ids_for_tags(tlog_ids, tags, replication_factor) do
157157- # TODO: refactor tlogs_for_tags to do this
158158- tlogs_for_tags(length(tlog_ids), replication_factor, tags)
159159- |> Enum.map(&Enum.at(tlog_ids, &1))
160160- end
161161-162162- def slice_mutations_for_tlogs(tagged_mutations, all_tlog_ids, replication_factor)
163163- when is_list(tagged_mutations) and is_list(all_tlog_ids) and is_integer(replication_factor) and replication_factor >= 1 do
164164- tlog_mutations = Map.new(all_tlog_ids, fn id -> {id, []} end)
165165-166166- tagged_mutations
167167- |> Enum.reduce(tlog_mutations, fn {tags, _mut} = tm, acc ->
168168- case meta_tag() in tags do
169169- true -> all_tlog_ids
170170- false -> tlog_ids_for_tags(all_tlog_ids, tags, replication_factor)
171171- end
172172- |> Enum.reduce(acc, fn id, acc ->
173173- Map.update!(acc, id, &[tm | &1])
174174- end)
175175- end)
176176- |> Map.new(fn {id, mutations} -> {id, Enum.reverse(mutations)} end)
177126 end
178127179128 @doc """
-23
test/utils_test.exs
···3131 end
3232 end
33333434- describe "tlogs_for_tags/3" do
3535- test "returns tlogs" do
3636- assert [1, 2, 3] = Utils.tlogs_for_tags(10, 3, [101, 102, 103])
3737- assert [_, 0, 1] = l1 = Utils.tlogs_for_tags(10, 3, [100, 200, 201])
3838- assert length(Enum.uniq(l1)) == 3
3939- end
4040-4141- test "handles large number of collisions" do
4242- # All 100 tags will have the same preferred tlog, so the function must
4343- # choose the other 99 tlogs randomly, one by one
4444- tags = Enum.map(1..100, fn i -> i * 100 end)
4545- tlogs = Utils.tlogs_for_tags(100, 100, tags)
4646-4747- assert length(Enum.uniq(tlogs)) == 100
4848- end
4949-5050- test "raises if there are not enough tlogs" do
5151- assert_raise FunctionClauseError, fn ->
5252- Utils.tlogs_for_tags(9, 10, [1, 2, 3])
5353- end
5454- end
5555- end
5656-5734 describe "merge_pairs/2" do
5835 test "merges" do
5936 l1 = [