this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve distribution heuristics

garrison 80ff67ea 32ba7fbb

+57 -17
+4
config/config.exs
··· 1 + import Config 2 + 3 + config :logger, 4 + truncate: :infinity
+25 -13
lib/servers/distributor.ex
··· 81 81 @tick_shard_moves_interval_ms 1000 82 82 83 83 @shard_max_size_bytes 50_000_000 84 - #@shard_max_size_bytes 20_000_000 84 + #@shard_max_size_bytes 10_000_000 85 85 86 86 def start_link(arg), do: SimServer.start_link(__MODULE__, arg) 87 87 ··· 246 246 # i.e. team must be overfilled by at least N shards worth of bytes 247 247 # (this also helpfully prevents rebalancing a nearly empty cluster) 248 248 @overfilled_threshold_bytes (@shard_max_size_bytes * 3) 249 + # Only move shards from overfilled teams to teams with this many fewer bytes 250 + # (the emptiest available team will still be picked) 251 + @move_to_below_difference_bytes (@shard_max_size_bytes * 2) 249 252 # Limit shard move concurrency per-team 250 253 # Prevents overloading and improves distribution since we don't currently take in-flight shard bytes into account 251 254 @max_team_outgoing_shards 2 252 255 @max_team_incoming_shards 1 253 256 254 257 defp chop_overfilled_teams(%State{} = state, team_stats, avg_bytes_per_team) when is_list(team_stats) and is_integer(avg_bytes_per_team) do 255 - overfilled_teams = 256 - Enum.filter(team_stats, fn {_team, %{used_bytes: used_bytes}} -> 257 - used_bytes > (avg_bytes_per_team + @overfilled_threshold_bytes) 258 - end) 259 - 260 - Enum.reduce(overfilled_teams, state, fn {%StorageTeam{} = team, _stats}, %State{} = state -> 261 - movable_shards = movable_shards_for_team(state, team) 258 + team_stats 259 + |> Enum.filter(fn {_team, %{used_bytes: used_bytes}} -> 260 + used_bytes > (avg_bytes_per_team + @overfilled_threshold_bytes) 261 + end) 262 + # Prioritize the most-filled teams 263 + |> Enum.sort_by(fn {_team, %{used_bytes: used_bytes}} -> used_bytes end, :desc) 264 + |> Enum.reduce(state, fn {%StorageTeam{} = team, stats}, %State{} = state -> 265 + below_threshold = stats.used_bytes - @move_to_below_difference_bytes 262 266 263 267 # Move shards off of this overfilled team until we hit max outgoing shards or we run out of valid target teams 264 - Enum.reduce_while(movable_shards, state, fn %Shard{} = shard, %State{} = state -> 268 + movable_shards_for_team(state, team) 269 + |> Enum.shuffle() 270 + |> Enum.reduce_while(state, fn %Shard{} = shard, %State{} = state -> 265 271 with true <- (count_outgoing_shards_for_team(state, team) < @max_team_outgoing_shards), 266 - {:ok, %StorageTeam{} = target_team} <- random_underfilled_team(state, team_stats, avg_bytes_per_team) 272 + {:ok, %StorageTeam{} = target_team} <- best_team_below(state, team_stats, below_threshold) 267 273 do 274 + # Should be impossible due to threshold 275 + assert target_team != team 276 + 268 277 {_result, %State{} = state} = begin_shard_move(state, shard.start_key, target_team.storage_ids) 269 278 {:cont, state} 270 279 else ··· 274 283 end) 275 284 end 276 285 277 - defp random_underfilled_team(%State{} = state, team_stats, avg_bytes_per_team) when is_list(team_stats) and is_integer(avg_bytes_per_team) do 286 + defp best_team_below(%State{} = state, team_stats, threshold_bytes) when is_list(team_stats) and is_integer(threshold_bytes) do 278 287 team_stats 279 288 |> Enum.filter(fn {team, %{used_bytes: used_bytes}} -> 280 - (used_bytes < (avg_bytes_per_team / 2)) and (count_incoming_shards_for_team(state, team) < @max_team_incoming_shards) 289 + (used_bytes < threshold_bytes) and (count_incoming_shards_for_team(state, team) < @max_team_incoming_shards) 281 290 end) 291 + # This prevents a bias when multiple teams have the same used_bytes 292 + # (in practice this will only ever happen when there are multiple completely empty teams) 293 + |> Enum.shuffle() 282 294 |> case do 283 295 [] -> 284 296 :error 285 297 teams -> 286 - {%StorageTeam{} = team, _stats} = Enum.random(teams) 298 + {%StorageTeam{} = team, _stats} = Enum.min_by(teams, fn {_team, %{used_bytes: used_bytes}} -> used_bytes end) 287 299 {:ok, team} 288 300 end 289 301 end
+28 -4
lib/workloads/read_write.ex
··· 221 221 shards = list_shards(cluster) 222 222 shards_table = 223 223 shards 224 - |> Enum.map(fn {shard_key, servers, size_bytes} -> 225 - [inspect(shard_key), servers, pretty_number(size_bytes / 1_000_000) <> " MB"] 224 + |> Enum.map(fn {shard_key, {from_ids, to_ids}, size_bytes} -> 225 + [ 226 + inspect(shard_key), 227 + "#{inspect(from_ids)} -> #{inspect(to_ids)}", 228 + pretty_number(size_bytes / 1_000_000) <> " MB", 229 + ] 226 230 end) 227 231 |> Hobbes.Workloads.table( 228 232 cols: ["Shard", "Servers", "Size (MB)"], 229 233 align: [:left, :left, :right] 230 234 ) 231 235 236 + teams_table = 237 + shards 238 + |> Enum.group_by(fn {_shard_key, {from_ids, _to_ids}, _size_bytes} -> from_ids end) 239 + |> Enum.sort_by(&elem(&1, 0)) 240 + |> Enum.map(fn {from_ids, shards} -> 241 + shard_count = length(shards) 242 + total_size = Enum.sum_by(shards, fn {_, _, size_bytes} -> size_bytes end) 243 + [ 244 + inspect(from_ids), 245 + pretty_number(shard_count), 246 + pretty_number(total_size / 1_000_000) <> " MB", 247 + ] 248 + end) 249 + |> Hobbes.Workloads.table( 250 + cols: ["Team", "Shards", "MB (Total)"], 251 + align: [:left, :right, :right] 252 + ) 253 + 232 254 if check do 233 255 check_database(cluster, Map.fetch!(total_stats, :writes)) 234 256 end ··· 250 272 251 273 Shards (#{length(shards)}): 252 274 #{shards_table} 275 + 276 + Teams 277 + #{teams_table} 253 278 """, 254 279 } 255 280 end ··· 283 308 %{pid: pid} = Map.fetch!(cluster.servers, hd(from_ids)) 284 309 {:ok, %{size_bytes: size_bytes}} = Hobbes.Servers.Storage.get_shard_stats(pid, start_key, end_key) 285 310 286 - servers = "#{inspect(from_ids)} -> #{inspect(to_ids)}" 287 - {start_key, servers, size_bytes} 311 + {start_key, {from_ids, to_ids}, size_bytes} 288 312 end) 289 313 end 290 314