···3737 :ok
3838 end
39394040+ @spec get(t, binary) :: term | nil
4141+ def get(table, start_key) when is_binary(start_key) do
4242+ case :ets.lookup(table, start_key) do
4343+ [{^start_key, value}] -> value
4444+ [] -> nil
4545+ end
4646+ end
4747+4048 @spec shard_for_key(t, binary) :: {binary, binary, term}
4149 def shard_for_key(table, key) when is_database_key(key) do
4250 # Raises if table is empty, which is fine
+57-17
lib/servers/distributor.ex
···3344 import ExUnit.Assertions, only: [assert: 1]
5566- alias Hobbes.{Transaction, MetaStore}
66+ alias Hobbes.{Transaction, MetaStore, ShardInfoMap}
77 alias Hobbes.Structs.{Cluster, Server, ShardStats}
88 alias Hobbes.Servers.Storage
99 alias Hobbes.Transaction.TxnState
···3838 id: non_neg_integer,
3939 cluster: Cluster.t,
4040 storage_servers: %{non_neg_integer => StorageInfo.t},
4141+ shard_map: ShardInfoMap.t,
4142 shard_moves: [ShardMove.t],
4243 }
4344 @enforce_keys [
4445 :id,
4546 :cluster,
4647 :storage_servers,
4848+ :shard_map,
4749 :shard_moves,
4850 ]
4951 defstruct @enforce_keys
···6062 SimServer.call(server, {:move_shard, shard_key, to_servers}, 300_000)
6163 end
62646363- @spec storage_ping(pid, non_neg_integer) :: :ok
6464- def storage_ping(server, storage_id) when is_integer(storage_id) do
6565- SimServer.cast(server, {:storage_ping, self(), storage_id})
6565+ @spec storage_ping(pid, non_neg_integer, list) :: :ok
6666+ def storage_ping(server, storage_id, shard_stats) when is_integer(storage_id) when is_list(shard_stats) do
6767+ SimServer.cast(server, {:storage_ping, self(), storage_id, shard_stats})
6668 end
67696870 def init(%{id: id, cluster: %Cluster{} = cluster}) do
···7072 id: id,
7173 cluster: cluster,
7274 storage_servers: %{},
7575+ shard_map: ShardInfoMap.new(),
7376 shard_moves: [],
7477 }
75787676- SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms)
7777- SimServer.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms)
7878-7979+ SimServer.send_after self(), :tick_setup, 0
7980 {:ok, state}
8081 end
8182···8788 end
8889 end
89909090- def handle_cast({:storage_ping, storage_pid, storage_id}, %State{} = state) do
9191+ def handle_cast({:storage_ping, storage_pid, storage_id, shard_stats}, %State{} = state) do
9192 prev_info = Map.get(state.storage_servers, storage_id)
92939394 info = %StorageInfo{id: storage_id, pid: storage_pid, last_ping_timestamp: SimServer.current_time()}
9495 state = %{state | storage_servers: Map.put(state.storage_servers, storage_id, info)}
9696+9797+ :ok = update_shard_stats(state, shard_stats)
95989699 if prev_info == nil or (prev_info.id != storage_id) do
9797- # Send updated storage server pid map to CommitBuffers
9898- # Note: in theory this message could be lost, and we do not currently re-send
9999- # This is not a correctness issue, but reads would be blocked
100100- server_map = state.storage_servers |> Map.values() |> Map.new(&{&1.id, &1.pid})
101101-102102- get_servers(state.cluster, Hobbes.Servers.CommitBuffer)
103103- |> Enum.each(fn %Server{pid: buf_pid} ->
104104- SimServer.send buf_pid, {:update_storage_servers, server_map}
105105- end)
100100+ broadcast_storage_map(state)
106101 end
107102108103 {:noreply, state}
109104 end
110105106106+ def handle_info(:tick_setup, %State{} = state) do
107107+ case setup(state) do
108108+ {:ok, state} ->
109109+ SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms)
110110+ SimServer.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms)
111111+112112+ {:noreply, state}
113113+114114+ :error ->
115115+ SimServer.send_after self(), :tick_setup, 100
116116+ {:noreply, state}
117117+ end
118118+ end
119119+111120 def handle_info(:tick_scan, %State{} = state) do
112121 state = scan_shards(state)
113122 SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms)
···126135 cluster.generation == state.cluster.generation -> {:noreply, %{state | cluster: cluster}}
127136 cluster.generation > state.cluster.generation -> exit(:shutdown)
128137 end
138138+ end
139139+140140+ defp setup(%State{} = state) do
141141+ with {:ok, txn} <- Transaction.new(state.cluster),
142142+ {:ok, {key_servers_pairs, txn}} <- Transaction.read_range(txn, key_servers_prefix(), key_servers_end()),
143143+ {:ok, _txn} = Transaction.commit(txn)
144144+ do
145145+ ShardInfoMap.load(state.shard_map, key_servers_pairs)
146146+ {:ok, state}
147147+ else
148148+ _ -> :error
149149+ end
150150+ end
151151+152152+ defp update_shard_stats(%State{shard_map: shard_map}, shard_stats) when is_list(shard_stats) do
153153+ Enum.each(shard_stats, fn stats ->
154154+ ShardInfoMap.update_shard_stats(shard_map, stats)
155155+ end)
156156+ :ok
157157+ end
158158+159159+ defp broadcast_storage_map(%State{} = state) do
160160+ # Send updated storage server pid map to CommitBuffers
161161+ # Note: in theory this message could be lost, and we do not currently re-send
162162+ # This is not a correctness issue, but reads would be blocked
163163+ server_map = state.storage_servers |> Map.values() |> Map.new(&{&1.id, &1.pid})
164164+165165+ get_servers(state.cluster, Hobbes.Servers.CommitBuffer)
166166+ |> Enum.each(fn %Server{pid: buf_pid} ->
167167+ SimServer.send buf_pid, {:update_storage_servers, server_map}
168168+ end)
129169 end
130170131171 defp scan_shards(%State{} = state) when state.cluster.status != :normal do
···214214 end
215215216216 def handle_call({:get_shard_stats, start_key, end_key}, _from, %State{} = state) do
217217- pairs = ByteSample.scan(state.byte_sample, start_key, end_key)
218218-219219- size =
220220- pairs
221221- |> Enum.reduce(0, fn {_k, bytes}, acc -> acc + bytes end)
222222- |> round()
223223-224224- half_size = div(size, 2)
225225- # TODO: we use the full midpoint key, but we could instead use the shortest key which
226226- # separates the midpoint and the next key (which would make the shard map smaller)
227227- midpoint =
228228- Enum.reduce_while(pairs, 0, fn {k, bytes}, acc ->
229229- acc = acc + bytes
230230- case acc > half_size do
231231- true -> {:halt, k}
232232- false -> {:cont, acc}
233233- end
234234- end)
235235- |> case do
236236- midpoint when is_binary(midpoint) -> midpoint
237237- # If the shard is too small to have any byte sample keys
238238- 0 -> start_key
239239- end
240240-241241- stats = %ShardStats{size_bytes: size, midpoint_key: midpoint}
217217+ stats = compute_shard_stats(state, start_key, end_key)
242218 {:reply, {:ok, stats}, state}
243219 end
244220···360336 # If calls we make time out, we may receive their responses later
361337 def handle_info(_message, state), do: {:noreply, state}
362338363363- def ping_distributor(%State{} = state) do
339339+ defp ping_distributor(%State{} = state) do
364340 case get_servers(state.cluster, Hobbes.Servers.Distributor) do
365365- [%Server{pid: distributor_pid}] ->
366366- Distributor.storage_ping(distributor_pid, state.id)
367367-341341+ [%Server{pid: distributor_pid}] -> do_ping_distributor(state, distributor_pid)
368342 [] -> :noop
369343 end
370344371345 state
346346+ end
347347+348348+ defp do_ping_distributor(%State{} = state, distributor_pid) do
349349+ # TODO: this is very inefficient, we need to continuously update shard sizes in-memory
350350+ # TODO: only send oversize/undersize shards?
351351+ shard_stats =
352352+ SparseShardMap.list_shards(state.shard_map)
353353+ |> Enum.map(fn {sk, ek} -> compute_shard_stats(state, sk, ek) end)
354354+355355+ Distributor.storage_ping(distributor_pid, state.id, shard_stats)
372356 end
373357374358 defp flush(%State{kv: kv} = state) do
···691675 shard_import = %{shard_import | nonce: nil, status: :complete, completed_version: shard_import.current_read_version}
692676 put_in(state.imports[shard_import.id], shard_import)
693677 end
678678+ end
679679+680680+ defp compute_shard_stats(%State{} = state, start_key, end_key) when is_binary(start_key) and is_binary(end_key) do
681681+ pairs = ByteSample.scan(state.byte_sample, start_key, end_key)
682682+683683+ size =
684684+ pairs
685685+ |> Enum.reduce(0, fn {_k, bytes}, acc -> acc + bytes end)
686686+ |> round()
687687+688688+ half_size = div(size, 2)
689689+ # TODO: we use the full midpoint key, but we could instead use the shortest key which
690690+ # separates the midpoint and the next key (which would make the shard map smaller)
691691+ midpoint =
692692+ Enum.reduce_while(pairs, 0, fn {k, bytes}, acc ->
693693+ acc = acc + bytes
694694+ case acc > half_size do
695695+ true -> {:halt, k}
696696+ false -> {:cont, acc}
697697+ end
698698+ end)
699699+ |> case do
700700+ midpoint when is_binary(midpoint) -> midpoint
701701+ # If the shard is too small to have any byte sample keys
702702+ 0 -> start_key
703703+ end
704704+705705+ %ShardStats{start_key: start_key, end_key: end_key, size_bytes: size, midpoint_key: midpoint}
694706 end
695707696708 defp check_up_to_date(%State{} = state, version) do
+79
lib/shard_info_map.ex
···11+defmodule Hobbes.ShardInfoMap do
22+ alias Hobbes.{DenseShardMap, ShardInfoMap}
33+ alias Hobbes.Structs.ShardStats
44+ alias Hobbes.MetaStore
55+66+ import Hobbes.Utils
77+88+ defmodule Shard do
99+ @type t :: %__MODULE__{
1010+ start_key: binary,
1111+ end_key: binary,
1212+ from_server_ids: [integer],
1313+ to_server_ids: [integer],
1414+ stats: ShardStats.t,
1515+ }
1616+ @enforce_keys [
1717+ :start_key,
1818+ :end_key,
1919+ :from_server_ids,
2020+ :to_server_ids,
2121+ :stats,
2222+ ]
2323+ defstruct @enforce_keys
2424+ end
2525+2626+ @type t :: %__MODULE__{
2727+ shard_map: DenseShardMap.t,
2828+ }
2929+ @enforce_keys [:shard_map]
3030+ defstruct @enforce_keys
3131+3232+ def new do
3333+ %ShardInfoMap{
3434+ shard_map: DenseShardMap.new(),
3535+ }
3636+ end
3737+3838+ def load(%ShardInfoMap{shard_map: dsm}, key_servers_pairs) when is_list(key_servers_pairs) do
3939+ key_servers_pairs ++ [{key_servers_prefix() <> "\xFF\xFF", ""}]
4040+ |> Enum.chunk_every(2, 1, :discard)
4141+ |> Enum.each(fn [pair, next] ->
4242+ %Shard{} = info = shard_from_ks_pairs(pair, next)
4343+ DenseShardMap.put(dsm, info.start_key, info)
4444+ end)
4545+4646+ :ok
4747+ end
4848+4949+ def update_shard_stats(%ShardInfoMap{shard_map: dsm}, %ShardStats{} = stats) do
5050+ sk = stats.start_key
5151+ ek = stats.end_key
5252+5353+ case DenseShardMap.get(dsm, sk) do
5454+ %Shard{start_key: ^sk, end_key: ^ek} = existing ->
5555+ DenseShardMap.put(dsm, sk, %{existing | stats: stats})
5656+ :ok
5757+5858+ _ -> :error
5959+ end
6060+ end
6161+6262+ defp shard_from_ks_pairs({key_servers_prefix() <> start_key, value}, {key_servers_prefix() <> end_key, _value}) do
6363+ {from, to} = MetaStore.decode_key_servers(value)
6464+ to = to || []
6565+6666+ %Shard{
6767+ start_key: start_key,
6868+ end_key: end_key,
6969+ from_server_ids: from,
7070+ to_server_ids: to,
7171+ stats: nil,
7272+ }
7373+ end
7474+7575+ @doc false
7676+ def dump(%ShardInfoMap{} = sim) do
7777+ DenseShardMap.dump(sim.shard_map)
7878+ end
7979+end
+5
lib/sparse_shard_map.ex
···3434 :ok
3535 end
36363737+ @spec list_shards(t) :: [{binary, binary}]
3838+ def list_shards(table) do
3939+ :ets.tab2list(table)
4040+ end
4141+3742 @doc """
3843 Checks if any shards in the map contain a key.
3944 """