this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add initial storage teams to config and rename teams prefix

garrison 32ba7fbb c4e7ef95

+28 -32
+2 -2
ROADMAP.md
··· 15 15 - [X] Spawn servers using slots in Manager 16 16 - [ ] Add unknown servers to meta keyspace in Distributor 17 17 - [ ] Build teams from servers in Distributor 18 + - [ ] Remove dead storage servers 18 19 - [ ] Logs 19 20 - [ ] Add logging system 20 21 - [ ] Add omniscient logger to Construct ··· 23 24 - [ ] Data distribution 24 25 - [ ] Move shards from failing teams 25 26 - [X] Move shards from overfilled teams (Mountain Chopper) 26 - - [ ] Move shards *to* underfilled teams (Valley Filler) 27 - - [ ] Move undersized shards to their siblings for merging 27 + - [ ] Move shards to underfilled teams (Valley Filler) 28 28 - [ ] Merge shards 29 29 30 30 ### Housekeeping
+3 -12
lib/hobbes.ex
··· 39 39 tlog: [ 40 40 "/tlog_1", 41 41 ], 42 - storage: [ 43 - "/storage_1", 44 - "/storage_2", 45 - "/storage_3", 46 - "/storage_4", 47 - ], 42 + storage: 1..4 |> Enum.map(&"/storage_#{&1}"), 48 43 ], 49 44 ] 50 45 ··· 72 67 tlog: [ 73 68 "/tlog_1", 74 69 ], 75 - storage: [ 76 - "/storage_1", 77 - "/storage_2", 78 - "/storage_3", 79 - "/storage_4", 80 - ], 70 + storage: 1..4 |> Enum.map(&"/storage_#{&1}"), 81 71 ], 82 72 ] 83 73 ··· 94 84 num_commit_buffers: 3, 95 85 num_tlogs: 3, 96 86 num_storage: 6, 87 + num_storage_teams: 2, 97 88 ] 98 89 end 99 90
+2 -2
lib/servers/distributor.ex
··· 99 99 SimServer.cast(server, {:storage_ping, self(), storage_id, storage_stats, shard_stats}) 100 100 end 101 101 102 - def init(%{id: id, cluster: %Cluster{} = cluster, team_servers_pairs: team_servers_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}) do 102 + def init(%{id: id, cluster: %Cluster{} = cluster, storage_teams_pairs: storage_teams_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}) do 103 103 assert is_list(key_servers_pairs) 104 104 assert is_integer(next_shard_move_id) 105 105 assert next_shard_move_id >= 0 106 106 107 107 storage_teams = 108 - Enum.map(team_servers_pairs, fn {team_servers_prefix() <> k, v} -> 108 + Enum.map(storage_teams_pairs, fn {storage_teams_prefix() <> k, v} -> 109 109 [team_id] = Keyset.unpack(k) 110 110 storage_ids = Keyset.unpack(v) 111 111 %StorageTeam{id: team_id, storage_ids: storage_ids}
+14 -9
lib/servers/manager.ex
··· 44 44 num_commit_buffers: pos_integer, 45 45 num_tlogs: pos_integer, 46 46 num_storage: pos_integer, 47 + num_storage_teams: pos_integer, 47 48 num_replicas: 1 | 2 | 3, 48 49 initial_shards: [binary], 49 50 } ··· 52 53 :num_commit_buffers, 53 54 :num_tlogs, 54 55 :num_storage, 56 + :num_storage_teams, 55 57 :num_replicas, 56 58 :initial_shards, 57 59 ] ··· 263 265 # to form the seed storage team 264 266 min_storage_zones = state.config.num_replicas 265 267 268 + # TODO: use explicit failure zones rather than assuming nodes are independent 266 269 collected_storage_zones = 267 270 Enum.count(state.supervisors, fn {_pid, %{storage: storage}} -> 268 271 storage > 0 ··· 329 332 open_tlog_slots = Enum.sum_by(supervisors_tlogs, fn {_pid, slots} -> slots end) 330 333 open_storage_slots = Enum.sum_by(supervisors_storage, fn {_pid, slots} -> slots end) 331 334 335 + # 5 -> BeginBuffer, CommitBuffer, Sequencer, Resolver, Distributor 332 336 assert open_stateless_slots >= 5 333 337 assert open_tlog_slots >= state.config.num_replicas 334 - # TODO: assert storage slots are in independent zones 335 - assert open_storage_slots >= state.config.num_replicas 338 + # TODO: update this assert once we have configurable zones (see maybe_bootstrap TODO) 339 + assert Enum.count(supervisors_storage, fn {_pid, slots} -> slots > 0 end) >= state.config.num_replicas 336 340 337 341 # 3 -> Sequencer, Resolver, Distributor 338 342 {num_begin_buffers, num_commit_buffers} = compute_buffer_counts(open_stateless_slots - 3) ··· 340 344 %Config{} = config = state.config 341 345 342 346 ids = allocate_server_ids(state, 343 - storage: open_storage_slots, 347 + storage: min(open_storage_slots, config.num_storage_teams * config.num_replicas), 344 348 tlog: min(open_tlog_slots, config.num_tlogs), 345 349 begin_buffer: min(num_begin_buffers, config.num_begin_buffers), 346 350 commit_buffer: min(num_commit_buffers, config.num_commit_buffers), ··· 613 617 %Cluster{} = cluster = state.cluster 614 618 615 619 # Load meta pairs from meta_kv 616 - team_servers_pairs = FlatKV.scan(meta_kv, team_servers_prefix(), team_servers_end()).pairs 620 + storage_teams_pairs = FlatKV.scan(meta_kv, storage_teams_prefix(), storage_teams_end()).pairs 617 621 key_servers_pairs = FlatKV.scan(meta_kv, key_servers_prefix(), key_servers_end()).pairs 618 622 shard_moves_pairs = FlatKV.scan(meta_kv, shard_moves_prefix(), shard_moves_end()).pairs 619 623 [next_shard_move_id] = FlatKV.get(meta_kv, next_shard_move_id_key()) |> Keyset.unpack() ··· 622 626 [ 623 627 [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: prev_version}}], 624 628 [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: prev_version}}], 625 - [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, team_servers_pairs: team_servers_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 629 + [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, storage_teams_pairs: storage_teams_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 626 630 Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 627 631 Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 628 632 ] ··· 752 756 num_commit_buffers: Map.fetch!(config_map, "config/num_commit_buffers") |> String.to_integer(), 753 757 num_tlogs: Map.fetch!(config_map, "config/num_tlogs") |> String.to_integer(), 754 758 num_storage: Map.fetch!(config_map, "config/num_storage") |> String.to_integer(), 759 + num_storage_teams: Map.fetch!(config_map, "config/num_storage_teams") |> String.to_integer(), 755 760 num_replicas: 3, 756 761 initial_shards: initial_shards, 757 762 } ··· 801 806 } 802 807 end) 803 808 804 - team_servers_pairs = 809 + storage_teams_pairs = 805 810 storage_ids 806 811 |> Enum.chunk_every(num_replicas, num_replicas, :discard) 807 812 |> Enum.with_index() 808 813 |> Enum.map(fn {server_ids, team_id} -> 809 814 assert length(server_ids) == num_replicas 810 815 { 811 - team_servers_prefix() <> Keyset.pack([team_id]), 816 + storage_teams_prefix() <> Keyset.pack([team_id]), 812 817 Keyset.pack(server_ids), 813 818 } 814 819 end) 815 820 816 - assert length(team_servers_pairs) > 0 821 + assert length(storage_teams_pairs) > 0 817 822 818 - server_tag_pairs ++ team_servers_pairs 823 + server_tag_pairs ++ storage_teams_pairs 819 824 end 820 825 821 826 defp build_key_meta(%Config{initial_shards: shards}, storage_ids) do
+2 -2
lib/utils.ex
··· 23 23 defmacro all_keys_prefix, do: "" 24 24 defmacro all_keys_end, do: meta_end() 25 25 26 - defmacro team_servers_prefix, do: "\xFF/team_servers/" 27 - defmacro team_servers_end, do: "\xFF/team_servers0" 26 + defmacro storage_teams_prefix, do: "\xFF/storage_teams/" 27 + defmacro storage_teams_end, do: "\xFF/storage_teams0" 28 28 defmacro server_tags_prefix, do: "\xFF/st/" 29 29 defmacro server_tags_end, do: "\xFF/st0" 30 30
+5 -5
test/hobbes_test.exs
··· 108 108 duration_ms: 20_000, 109 109 ]}, 110 110 ], HobbesTest.SimOpts.sim_opts(name: test, cluster_opts: [ 111 - num_storage: 12, 111 + num_storage_teams: 4, 112 112 num_tlogs: 6, 113 113 114 114 initial_shards: [ ··· 144 144 ], HobbesTest.SimOpts.sim_opts(name: test, cluster_opts: [ 145 145 num_commit_buffers: 6, 146 146 num_tlogs: 6, 147 - num_storage: 12, 147 + num_storage_teams: 4, 148 148 initial_shards: [ 149 149 "", 150 150 "key10", ··· 183 183 ], HobbesTest.SimOpts.sim_opts(name: test, cluster_opts: [ 184 184 num_commit_buffers: 6, 185 185 num_tlogs: 6, 186 - num_storage: 12, 186 + num_storage_teams: 4, 187 187 initial_shards: [ 188 188 "", 189 189 "key10", ··· 256 256 ], HobbesTest.SimOpts.sim_opts(name: test, simulated: false, count: 1, cluster_opts: [ 257 257 num_commit_buffers: 6, 258 258 num_tlogs: 6, 259 - num_storage: 12, 259 + num_storage_teams: 4, 260 260 initial_shards: [ 261 261 "", 262 262 "4", "8", "C", ··· 290 290 291 291 num_commit_buffers: 3, 292 292 num_tlogs: 3, 293 - num_storage: 6, 293 + num_storage_teams: 2, 294 294 ] 295 295 296 296 {:ok, coordinators} = Hobbes.start_cluster(opts)