this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Compute team storage utilization in Distributor

garrison a504e9b6 86457a1a

+80 -17
+54 -11
lib/servers/distributor.ex
··· 5 5 6 6 alias Hobbes.{Transaction, ShardInfoMap} 7 7 alias Hobbes.ShardInfoMap.Shard 8 - alias Hobbes.Structs.{Cluster, Server, ShardStats} 8 + alias Hobbes.Structs.{Cluster, Server, StorageStats, ShardStats} 9 9 alias Hobbes.Servers.Storage 10 10 alias Hobbes.Encoding.Keyset 11 11 12 12 import Hobbes.Utils 13 13 14 + defmodule StorageTeam do 15 + @type t :: %__MODULE__{ 16 + id: non_neg_integer, 17 + storage_ids: non_neg_integer, 18 + } 19 + @enforce_keys [:id, :storage_ids] 20 + defstruct @enforce_keys 21 + end 22 + 14 23 defmodule StorageInfo do 15 24 @type t :: %__MODULE__{ 16 25 id: non_neg_integer, 17 26 pid: pid, 27 + stats: StorageStats.t, 18 28 last_ping_timestamp: non_neg_integer, 19 29 } 20 - @enforce_keys [:id, :pid, :last_ping_timestamp] 30 + @enforce_keys [:id, :pid, :stats, :last_ping_timestamp] 21 31 defstruct @enforce_keys 22 32 end 23 33 ··· 44 54 id: non_neg_integer, 45 55 cluster: Cluster.t, 46 56 57 + storage_teams: [StorageTeam.t], 47 58 storage_servers: %{non_neg_integer => StorageInfo.t}, 48 59 49 60 shard_map: ShardInfoMap.t, ··· 55 66 :id, 56 67 :cluster, 57 68 69 + :storage_teams, 58 70 :storage_servers, 59 71 60 72 :shard_map, ··· 81 93 end 82 94 end 83 95 84 - @spec storage_ping(pid, non_neg_integer, list) :: :ok 85 - def storage_ping(server, storage_id, shard_stats) when is_integer(storage_id) when is_list(shard_stats) do 86 - SimServer.cast(server, {:storage_ping, self(), storage_id, shard_stats}) 96 + @spec storage_ping(pid, non_neg_integer, StorageStats.t, list) :: :ok 97 + def storage_ping(server, storage_id, %StorageStats{} = storage_stats, shard_stats) when is_integer(storage_id) when is_list(shard_stats) do 98 + SimServer.cast(server, {:storage_ping, self(), storage_id, storage_stats, shard_stats}) 87 99 end 88 100 89 - def init(%{id: id, cluster: %Cluster{} = cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}) do 101 + def init(%{id: id, cluster: %Cluster{} = cluster, team_servers_pairs: team_servers_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}) do 90 102 assert is_list(key_servers_pairs) 91 103 assert is_integer(next_shard_move_id) 92 104 assert next_shard_move_id >= 0 93 105 106 + storage_teams = 107 + Enum.map(team_servers_pairs, fn {team_servers_prefix() <> k, v} -> 108 + [team_id] = Keyset.unpack(k) 109 + storage_ids = Keyset.unpack(v) 110 + %StorageTeam{id: team_id, storage_ids: storage_ids} 111 + end) 112 + 94 113 shard_map = ShardInfoMap.new() 95 114 ShardInfoMap.load(shard_map, key_servers_pairs) 96 115 ··· 119 138 id: id, 120 139 cluster: cluster, 121 140 141 + storage_teams: storage_teams, 122 142 storage_servers: %{}, 123 143 124 144 shard_map: shard_map, ··· 137 157 {:reply, result, state} 138 158 end 139 159 140 - def handle_cast({:storage_ping, storage_pid, storage_id, shard_stats}, %State{} = state) do 160 + def handle_cast({:storage_ping, storage_pid, storage_id, %StorageStats{} = storage_stats, shard_stats}, %State{} = state) do 141 161 prev_info = Map.get(state.storage_servers, storage_id) 142 162 143 - info = %StorageInfo{id: storage_id, pid: storage_pid, last_ping_timestamp: SimServer.current_time()} 163 + info = %StorageInfo{id: storage_id, pid: storage_pid, stats: storage_stats, last_ping_timestamp: SimServer.current_time()} 144 164 state = %{state | storage_servers: Map.put(state.storage_servers, storage_id, info)} 145 165 146 166 :ok = update_shard_stats(state, shard_stats) ··· 153 173 end 154 174 155 175 def handle_info(:tick_scan, %State{} = state) do 156 - state = scan_shards(state) 176 + state = scan(state) 157 177 SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms) 158 178 {:noreply, state} 159 179 end ··· 191 211 end) 192 212 end 193 213 194 - defp scan_shards(%State{} = state) when state.cluster.status != :normal, do: state 214 + defp scan(%State{} = state) when state.cluster.status != :normal, do: state 215 + 216 + defp scan(%State{} = state) do 217 + state = scan_and_split_shards(state) 218 + state = scan_teams(state) 219 + 220 + state 221 + end 195 222 196 - defp scan_shards(%State{} = state) do 223 + defp scan_teams(%State{} = state) do 224 + _team_stats = 225 + state.storage_teams 226 + |> Enum.map(fn %StorageTeam{} = team -> 227 + storage_info = Enum.map(team.storage_ids, fn id -> Map.get(state.storage_servers, id) end) 228 + # The storage disk state will not be byte-for-byte identical (though probably fairly close in practice) 229 + # so we use the worst case (min) across the team for these stats 230 + # TODO: handle missing stats (server has not pinged) 231 + used_bytes = storage_info |> Enum.map(&(&1.stats.used_bytes)) |> Enum.min() 232 + free_bytes = storage_info |> Enum.map(&(&1.stats.free_bytes)) |> Enum.min() 233 + {team, %{used_bytes: used_bytes, free_bytes: free_bytes}} 234 + end) 235 + 236 + state 237 + end 238 + 239 + defp scan_and_split_shards(%State{} = state) do 197 240 assert state.cluster.status == :normal 198 241 199 242 shards = ShardInfoMap.list_shards(state.shard_map)
+2 -1
lib/servers/manager.ex
··· 613 613 %Cluster{} = cluster = state.cluster 614 614 615 615 # Load meta pairs from meta_kv 616 + team_servers_pairs = FlatKV.scan(meta_kv, team_servers_prefix(), team_servers_end()).pairs 616 617 key_servers_pairs = FlatKV.scan(meta_kv, key_servers_prefix(), key_servers_end()).pairs 617 618 shard_moves_pairs = FlatKV.scan(meta_kv, shard_moves_prefix(), shard_moves_end()).pairs 618 619 [next_shard_move_id] = FlatKV.get(meta_kv, next_shard_move_id_key()) |> Keyset.unpack() ··· 621 622 [ 622 623 [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: prev_version}}], 623 624 [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: prev_version}}], 624 - [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 625 + [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, team_servers_pairs: team_servers_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 625 626 Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 626 627 Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 627 628 ]
+13 -2
lib/servers/storage.ex
··· 6 6 7 7 alias Hobbes.{HybridKV, SparseShardMap} 8 8 alias Hobbes.KV.{MutationLog, ByteSample} 9 - alias Hobbes.Structs.{Cluster, TLogGeneration, Server, PeekResult, RangeResult, ShardStats} 9 + alias Hobbes.Structs.{Cluster, TLogGeneration, Server, PeekResult, RangeResult, StorageStats, ShardStats} 10 10 alias Hobbes.Servers.{CommitBuffer, TLog, Storage, Distributor} 11 11 alias Hobbes.Encoding.Keyset 12 12 ··· 411 411 SparseShardMap.list_shards(state.shard_map) 412 412 |> Enum.map(fn {sk, ek} -> compute_shard_stats(state, sk, ek) end) 413 413 414 - Distributor.storage_ping(distributor_pid, state.id, shard_stats) 414 + # TODO: real storage capacity 415 + total_storage_bytes = 1_000_000_000 416 + used_bytes = Enum.sum_by(shard_stats, &(&1.size_bytes)) 417 + free_bytes = total_storage_bytes - used_bytes 418 + 419 + storage_stats = %StorageStats{ 420 + server_id: state.id, 421 + used_bytes: used_bytes, 422 + free_bytes: free_bytes, 423 + } 424 + 425 + Distributor.storage_ping(distributor_pid, state.id, storage_stats, shard_stats) 415 426 end 416 427 417 428 defp flush(%State{kv: kv} = state) do
+10
lib/structs.ex
··· 134 134 defstruct [:pairs, :count, :range, :more] 135 135 end 136 136 137 + defmodule StorageStats do 138 + @type t :: %__MODULE__{ 139 + server_id: non_neg_integer, 140 + used_bytes: non_neg_integer, 141 + free_bytes: non_neg_integer, 142 + } 143 + @enforce_keys [:server_id, :used_bytes, :free_bytes] 144 + defstruct @enforce_keys 145 + end 146 + 137 147 defmodule ShardStats do 138 148 @type t :: %__MODULE__{ 139 149 start_key: binary,
+1 -3
test/hobbes_test.exs
··· 259 259 num_storage: 12, 260 260 initial_shards: [ 261 261 "", 262 - "4", 263 - "8", 264 - "C", 262 + "4", "8", "C", 265 263 ], 266 264 ])) 267 265 end