this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Rebalance shards away from overfilled teams in Distributor

garrison c4e7ef95 a504e9b6

+75 -3
+2 -1
ROADMAP.md
··· 22 22 - [ ] Add SQLite KV for real storage 23 23 - [ ] Data distribution 24 24 - [ ] Move shards from failing teams 25 - - [ ] Move shards from overfilled teams (Mountain Chopper) 25 + - [X] Move shards from overfilled teams (Mountain Chopper) 26 26 - [ ] Move shards *to* underfilled teams (Valley Filler) 27 27 - [ ] Move undersized shards to their siblings for merging 28 28 - [ ] Merge shards ··· 30 30 ### Housekeeping 31 31 32 32 - [ ] Store replication factor per TLogGeneration and remove all hard-coded references 33 + - [ ] Add error handling to ReadWrite transactions 33 34 34 35 ### Testing 35 36
+68 -1
lib/servers/distributor.ex
··· 81 81 @tick_shard_moves_interval_ms 1000 82 82 83 83 @shard_max_size_bytes 50_000_000 84 + #@shard_max_size_bytes 20_000_000 84 85 85 86 def start_link(arg), do: SimServer.start_link(__MODULE__, arg) 86 87 ··· 221 222 end 222 223 223 224 defp scan_teams(%State{} = state) do 224 - _team_stats = 225 + team_stats = 225 226 state.storage_teams 226 227 |> Enum.map(fn %StorageTeam{} = team -> 227 228 storage_info = Enum.map(team.storage_ids, fn id -> Map.get(state.storage_servers, id) end) ··· 233 234 {team, %{used_bytes: used_bytes, free_bytes: free_bytes}} 234 235 end) 235 236 237 + total_used_bytes = Enum.sum_by(team_stats, fn {_team, %{used_bytes: used_bytes}} -> used_bytes end) 238 + avg_bytes_per_team = floor(total_used_bytes / length(team_stats)) 239 + 240 + state = chop_overfilled_teams(state, team_stats, avg_bytes_per_team) 241 + 236 242 state 243 + end 244 + 245 + # Team must have used_bytes > (avg_bytes_per_team + @overfilled_threshold_bytes) 246 + # i.e. team must be overfilled by at least N shards worth of bytes 247 + # (this also helpfully prevents rebalancing a nearly empty cluster) 248 + @overfilled_threshold_bytes (@shard_max_size_bytes * 3) 249 + # Limit shard move concurrency per-team 250 + # Prevents overloading and improves distribution since we don't currently take in-flight shard bytes into account 251 + @max_team_outgoing_shards 2 252 + @max_team_incoming_shards 1 253 + 254 + defp chop_overfilled_teams(%State{} = state, team_stats, avg_bytes_per_team) when is_list(team_stats) and is_integer(avg_bytes_per_team) do 255 + overfilled_teams = 256 + Enum.filter(team_stats, fn {_team, %{used_bytes: used_bytes}} -> 257 + used_bytes > (avg_bytes_per_team + @overfilled_threshold_bytes) 258 + end) 259 + 260 + Enum.reduce(overfilled_teams, state, fn {%StorageTeam{} = team, _stats}, %State{} = state -> 261 + movable_shards = movable_shards_for_team(state, team) 262 + 263 + # Move shards off of this overfilled team until we hit max outgoing shards or we run out of valid target teams 264 + Enum.reduce_while(movable_shards, state, fn %Shard{} = shard, %State{} = state -> 265 + with true <- (count_outgoing_shards_for_team(state, team) < @max_team_outgoing_shards), 266 + {:ok, %StorageTeam{} = target_team} <- random_underfilled_team(state, team_stats, avg_bytes_per_team) 267 + do 268 + {_result, %State{} = state} = begin_shard_move(state, shard.start_key, target_team.storage_ids) 269 + {:cont, state} 270 + else 271 + _ -> {:halt, state} 272 + end 273 + end) 274 + end) 275 + end 276 + 277 + defp random_underfilled_team(%State{} = state, team_stats, avg_bytes_per_team) when is_list(team_stats) and is_integer(avg_bytes_per_team) do 278 + team_stats 279 + |> Enum.filter(fn {team, %{used_bytes: used_bytes}} -> 280 + (used_bytes < (avg_bytes_per_team / 2)) and (count_incoming_shards_for_team(state, team) < @max_team_incoming_shards) 281 + end) 282 + |> case do 283 + [] -> 284 + :error 285 + teams -> 286 + {%StorageTeam{} = team, _stats} = Enum.random(teams) 287 + {:ok, team} 288 + end 289 + end 290 + 291 + defp movable_shards_for_team(%State{shard_map: shard_map}, %StorageTeam{} = team) do 292 + ShardInfoMap.list_shards(shard_map) 293 + |> Enum.filter(fn %Shard{} = shard -> 294 + shard.from_server_ids == team.storage_ids and shard.to_server_ids == [] 295 + end) 296 + end 297 + 298 + defp count_outgoing_shards_for_team(%State{shard_moves: shard_moves}, %StorageTeam{} = team) do 299 + Enum.count(shard_moves, fn %ShardMove{} = move -> move.from_server_ids == team.storage_ids end) 300 + end 301 + 302 + defp count_incoming_shards_for_team(%State{shard_moves: shard_moves}, %StorageTeam{} = team) do 303 + Enum.count(shard_moves, fn %ShardMove{} = move -> move.to_server_ids == team.storage_ids end) 237 304 end 238 305 239 306 defp scan_and_split_shards(%State{} = state) do
+5 -1
lib/servers/storage.ex
··· 705 705 state 706 706 end 707 707 708 + # TODO: buggify down in sim 709 + @import_key_limit 4 710 + #@import_key_limit 5_000 711 + 708 712 defp tick_import(%ShardImport{} = shard_import, %State{} = state) do 709 713 assert state.cluster.status == :normal 710 714 ··· 717 721 with {:ok, [[{_sk, _ek, {_ids, from_pids}}]]} <- CommitBuffer.get_shards_multi(buf_pid, [{shard_import.start_key, shard_import.end_key}]), 718 722 {:ok, from_storage_pid} <- random_not_nil(from_pids) 719 723 do 720 - Storage.read_range_async(from_storage_pid, shard_import.nonce, shard_import.current_read_version, shard_import.current_end_key, shard_import.end_key, limit: 4) 724 + Storage.read_range_async(from_storage_pid, shard_import.nonce, shard_import.current_read_version, shard_import.current_end_key, shard_import.end_key, limit: @import_key_limit) 721 725 722 726 SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 300) 723 727 state