this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Seed shards by storage team and refactor ShardTagMap load

garrison febfe410 726ca9f7

+75 -83
+2 -2
lib/servers/commit_buffer.ex
··· 102 102 end 103 103 end 104 104 105 - def init(%{id: id, cluster: %Cluster{} = cluster, key_servers_pairs: key_servers_pairs}) do 105 + def init(%{id: id, cluster: %Cluster{} = cluster, key_storage_pairs: key_storage_pairs}) do 106 106 %TLogGeneration{} = current_generation = hd(cluster.tlog_generations) 107 107 assert current_generation.generation == cluster.generation 108 108 ··· 119 119 storage_servers: %{}, 120 120 } 121 121 122 - ShardTagMap.load_meta_pairs(state.shard_map, key_servers_pairs) 122 + ShardTagMap.load_meta_pairs(state.shard_map, key_storage_pairs) 123 123 124 124 SimServer.send_after(self(), :flush, @flush_interval_ms) 125 125 {:ok, state}
+25 -36
lib/servers/manager.ex
··· 671 671 672 672 # Load meta pairs from meta_kv 673 673 storage_teams_pairs = FlatKV.scan(meta_kv, storage_teams_prefix(), storage_teams_end()).pairs 674 - key_servers_pairs = FlatKV.scan(meta_kv, key_servers_prefix(), key_servers_end()).pairs 674 + key_storage_pairs = FlatKV.scan(meta_kv, key_storage_prefix(), key_storage_end()).pairs 675 675 shard_moves_pairs = FlatKV.scan(meta_kv, shard_moves_prefix(), shard_moves_end()).pairs 676 676 [next_shard_move_id] = FlatKV.get(meta_kv, next_shard_move_id_key()) |> Keyset.unpack() 677 + # TODO: remove once distributor can handle key_storage_pairs 678 + key_servers_pairs = [] 677 679 678 680 # Recruit stateless servers 679 681 [ ··· 681 683 [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: prev_version}}], 682 684 [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, storage_teams_pairs: storage_teams_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 683 685 Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 684 - Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 686 + Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_storage_pairs: key_storage_pairs}} end), 685 687 ] 686 688 |> Enum.concat() 687 689 |> recruit_servers(supervisor_slots, cluster.generation) ··· 831 833 |> Enum.reverse() 832 834 end 833 835 834 - defp build_seed_meta(%Config{} = config, storage_ids) do 835 - server_pairs = build_server_meta(config, storage_ids) 836 - key_pairs = build_key_meta(config, storage_ids) 836 + defp build_seed_meta(%Config{} = config, storage_server_ids) do 837 + num_storage_teams = div(length(storage_server_ids), config.num_replicas) 838 + storage_team_ids = Enum.to_list(0..(num_storage_teams - 1)) 839 + 840 + storage_pairs = build_storage_meta(config, storage_team_ids, storage_server_ids) 841 + shard_pairs = build_shard_meta(config, storage_team_ids) 837 842 838 843 other_pairs = [ 839 844 {next_shard_move_id_key(), Keyset.pack([0])}, 840 845 ] 841 846 842 - Enum.sort(server_pairs ++ key_pairs ++ other_pairs) 847 + Enum.sort(storage_pairs ++ shard_pairs ++ other_pairs) 843 848 end 844 849 845 - defp build_server_meta(%Config{num_replicas: num_replicas}, storage_ids) do 850 + defp build_storage_meta(%Config{num_replicas: num_replicas}, storage_team_ids, storage_server_ids) do 846 851 server_tag_pairs = 847 - storage_ids 848 - |> Enum.map(fn id -> 852 + Enum.map(storage_server_ids, fn id -> 849 853 { 854 + # TODO: move to utils and pack id 850 855 server_tags_prefix() <> Integer.to_string(id), 851 856 "", 852 857 } 853 858 end) 854 859 860 + storage_server_chunks = Enum.chunk_every(storage_server_ids, num_replicas, num_replicas, :discard) 861 + assert length(storage_server_chunks) == length(storage_team_ids) 862 + 855 863 storage_teams_pairs = 856 - storage_ids 857 - |> Enum.chunk_every(num_replicas, num_replicas, :discard) 858 - |> Enum.with_index() 859 - |> Enum.map(fn {server_ids, team_id} -> 864 + Enum.zip(storage_team_ids, storage_server_chunks) 865 + |> Enum.map(fn {team_id, server_ids} -> 860 866 assert length(server_ids) == num_replicas 861 - { 862 - storage_teams_prefix() <> Keyset.pack([team_id]), 863 - Keyset.pack(server_ids), 864 - } 867 + pack_storage_team_pair(team_id, server_ids) 865 868 end) 866 869 867 870 assert length(storage_teams_pairs) > 0 ··· 869 872 server_tag_pairs ++ storage_teams_pairs 870 873 end 871 874 872 - defp build_key_meta(%Config{initial_shards: shards}, storage_ids) do 873 - teams = Enum.chunk_every(storage_ids, 3, 3, :discard) 874 - 875 + defp build_shard_meta(%Config{initial_shards: shards}, storage_team_ids) do 875 876 shards 876 877 |> Enum.with_index() 877 878 |> Enum.map(fn {shard_key, i} -> 878 - shard_server_ids = Enum.at(teams, rem(i, length(teams))) 879 - end_key = Enum.at(shards, i + 1, all_keys_end()) 879 + team_id = Enum.at(storage_team_ids, rem(i, length(storage_team_ids))) 880 + _end_key = Enum.at(shards, i + 1, all_keys_end()) 880 881 881 - key_servers_pair = { 882 - key_servers_prefix() <> shard_key, 883 - pack_key_servers(shard_server_ids, []), 884 - } 885 - 886 - server_keys_pairs = 887 - Enum.map(shard_server_ids, fn id -> 888 - { 889 - server_keys_prefix() <> pack_server_keys_key(id, shard_key), 890 - pack_server_keys_value(nil, end_key, "complete"), 891 - } 892 - end) 893 - 894 - [key_servers_pair] ++ server_keys_pairs 882 + key_storage_pair = pack_key_storage_pair(shard_key, team_id, nil) 883 + [key_storage_pair] 895 884 end) 896 885 |> Enum.concat() 897 886 end
+23 -27
lib/shard_tag_map.ex
··· 37 37 @spec apply_metadata_mutations(t, [Utils.mutation]) :: :ok 38 38 def apply_metadata_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do 39 39 %{shard_map: shard_map, tlog_ids: tlog_ids, replication_factor: rf} = stm 40 + tlog_teams = Enum.chunk_every(tlog_ids, rf, rf, :discard) 41 + tlog_team_count = length(tlog_teams) 42 + 40 43 Enum.each(mutations, fn 41 - {:write, key_servers_prefix() <> start_key, value} -> 42 - {shard_all_tags, shard_from_tags} = server_tags_from_key_servers_value(value) 43 - shard_tlog_ids = tlog_ids_for_servers(tlog_ids, shard_all_tags, rf) 44 + {:write, key_storage_prefix() <> start_key, value} -> 45 + [from_storage_team_id, to_storage_team_id] = unpack_key_storage_value(value) 44 46 45 - DenseShardMap.put(shard_map, start_key, {shard_tlog_ids, shard_all_tags, shard_from_tags}) 47 + value = {_, _, _} = 48 + case to_storage_team_id do 49 + nil -> 50 + { 51 + Enum.at(tlog_teams, rem(from_storage_team_id, tlog_team_count)), 52 + [from_storage_team_id], 53 + [from_storage_team_id], 54 + } 55 + _ -> 56 + { 57 + Enum.at(tlog_teams, rem(from_storage_team_id, tlog_team_count)) + Enum.at(tlog_teams, rem(to_storage_team_id, tlog_team_count)), 58 + [from_storage_team_id, to_storage_team_id], 59 + [from_storage_team_id], 60 + } 61 + end 46 62 47 - {:clear, key_servers_prefix() <> start_key} -> 63 + DenseShardMap.put(shard_map, start_key, value) 64 + 65 + {:clear, key_storage_prefix() <> start_key} -> 48 66 DenseShardMap.delete(shard_map, start_key) 49 67 50 68 _mut -> :noop ··· 115 133 116 134 def shards_for_key_or_range(%ShardTagMap{} = stm, {start_key, end_key}) do 117 135 DenseShardMap.shards_for_range(stm.shard_map, start_key, end_key) 118 - end 119 - 120 - # Returns {all_tags, from_tags} where all_tags is (from_tags ++ to_tags) 121 - defp server_tags_from_key_servers_value(value) do 122 - [from_ids, to_ids] = unpack_key_servers(value) 123 - {from_ids ++ to_ids, from_ids} 124 - end 125 - 126 - @doc false 127 - @spec tlog_ids_for_servers([non_neg_integer], [non_neg_integer], non_neg_integer) :: [non_neg_integer] 128 - def tlog_ids_for_servers(tlog_ids, server_ids, min_replicas) do 129 - ids = server_ids |> Enum.map(&buddy_tlog_id(tlog_ids, &1)) |> Enum.uniq() 130 - length = length(ids) 131 - 132 - case length < min_replicas do 133 - true -> 134 - needed = min_replicas - length 135 - remaining = tlog_ids -- ids 136 - ids ++ Enum.take_random(remaining, needed) 137 - 138 - false -> ids 139 - end 140 136 end 141 137 142 138 @doc false
+25
lib/utils.ex
··· 30 30 defmacro server_tags_prefix, do: "\xFF/st/" 31 31 defmacro server_tags_end, do: "\xFF/st0" 32 32 33 + defmacro key_storage_prefix, do: "\xFF/ks/" 34 + defmacro key_storage_end, do: "\xFF/ks0" 35 + 33 36 defmacro key_servers_prefix, do: "\xFF/key_servers/" 34 37 defmacro key_servers_end, do: "\xFF/key_servers0" 35 38 defmacro server_keys_prefix, do: "\xFF/sk/" ··· 91 94 92 95 # Special range clears are not currently supported 93 96 def special_mutation?({:clear_range, _sk, _ek}), do: false 97 + 98 + @spec pack_storage_team_pair(non_neg_integer, [non_neg_integer]) :: {binary, binary} 99 + def pack_storage_team_pair(storage_team_id, storage_server_ids) 100 + when is_integer(storage_team_id) and is_list(storage_server_ids) do 101 + { 102 + storage_teams_prefix() <> Keyset.pack([storage_team_id]), 103 + Keyset.pack(storage_server_ids), 104 + } 105 + end 106 + 107 + def pack_key_storage_pair(shard_start_key, from_storage_team_id, to_storage_team_id) 108 + when is_binary(shard_start_key) and is_integer(from_storage_team_id) and (is_integer(to_storage_team_id) or is_nil(to_storage_team_id)) do 109 + { 110 + key_storage_prefix() <> shard_start_key, 111 + Keyset.pack([from_storage_team_id, to_storage_team_id]), 112 + } 113 + end 114 + 115 + @spec unpack_key_storage_value(binary) :: [non_neg_integer | nil] 116 + def unpack_key_storage_value(value) when is_binary(value) do 117 + [_to_storage_team_id, _from_storage_team_id] = Keyset.unpack(value) 118 + end 94 119 95 120 @spec pack_key_servers([integer], [integer]) :: binary 96 121 def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do
-18
test/shard_tag_map_test.exs
··· 1 1 defmodule Hobbes.ShardTagMapTest do 2 2 use ExUnit.Case, async: true 3 3 4 - alias Hobbes.ShardTagMap 5 - 6 4 @moduletag :shard_tag_map 7 - 8 - describe "tlog_ids_for_servers/2" do 9 - test "returns tlog ids" do 10 - # TODO: this test will be flaky if it ever fails, it should really be a fuzz test 11 - # but for now the function is so simple it's not worth testing further 12 - tlog_ids = [0, 1, 2, 3, 4, 5] 13 - 14 - assert [0, 1, 2] = ShardTagMap.tlog_ids_for_servers(tlog_ids, [0, 1, 2], 3) 15 - 16 - assert [1, 2, _] = ids1 = ShardTagMap.tlog_ids_for_servers(tlog_ids, [1, 7, 8], 3) 17 - assert length(Enum.uniq(ids1)) == 3 18 - 19 - assert [1, _, _] = ids2 = ShardTagMap.tlog_ids_for_servers(tlog_ids, [1, 7, 13], 3) 20 - assert length(Enum.uniq(ids2)) == 3 21 - end 22 - end 23 5 end