···3737 :ok
3838 end
39394040+ @spec list_shards(t) :: [{binary, term}]
4141+ def list_shards(table) do
4242+ :ets.tab2list(table)
4343+ end
4444+4045 @spec get(t, binary) :: term | nil
4146 def get(table, start_key) when is_binary(start_key) do
4247 case :ets.lookup(table, start_key) do
+43-80
lib/servers/distributor.ex
···44 import ExUnit.Assertions, only: [assert: 1]
5566 alias Hobbes.{Transaction, MetaStore, ShardInfoMap}
77+ alias Hobbes.ShardInfoMap.Shard
78 alias Hobbes.Structs.{Cluster, Server, ShardStats}
89 alias Hobbes.Servers.Storage
910 alias Hobbes.Transaction.TxnState
···175176 defp scan_shards(%State{} = state) do
176177 assert state.cluster.status == :normal
177178178178- with {:ok, txn} <- Transaction.new(state.cluster),
179179- {:ok, {pairs, _txn}} <- Transaction.read_range(txn, key_servers_prefix(), key_servers_end())
180180- do
181181- do_scan_shards(state, pairs)
182182- else
183183- {:error, _error} -> state
184184- end
185185- end
179179+ shards = ShardInfoMap.list_shards(state.shard_map)
186180187187- defp do_scan_shards(%State{} = state, pairs) when is_list(pairs) do
188188- shard_sizes =
189189- pairs
190190- |> Enum.map(fn {"\xFF/key_servers/" <> shard_key, servers_encoded} ->
191191- {from, to} = MetaStore.decode_key_servers(servers_encoded)
192192- {shard_key, from, to}
193193- end)
194194- |> then(fn shards ->
195195- shards ++ [{all_keys_end(), nil, nil}]
196196- end)
197197- |> Enum.chunk_every(2, 1, :discard)
198198- |> Enum.map(fn [{start_key, from, _to}, {end_key, _, _}] ->
199199- from
200200- |> Enum.map(fn id ->
201201- case Map.fetch(state.storage_servers, id) do
202202- {:ok, %StorageInfo{pid: pid}} -> pid
203203- :error -> nil
204204- end
205205- end)
206206- |> Enum.reject(&is_nil/1)
207207- |> case do
208208- [_ | _] = from_pids ->
209209- storage_pid = SimServer.deterministic_random(from_pids)
181181+ {shards_to_split, _shards} = Enum.split_with(shards, &should_split_shard?/1)
210182211211- {:ok, %ShardStats{} = stats} = too_new_backoff(fn ->
212212- Storage.get_shard_stats(storage_pid, start_key, end_key)
213213- end)
214214- {{start_key, end_key}, stats}
215215-216216- [] -> nil
217217- end
218218- end)
219219- |> Enum.reject(&is_nil/1)
220220-221221- oversize_shards = Enum.filter(shard_sizes, fn {_shard, %ShardStats{} = stats} ->
222222- stats.size_bytes > @shard_max_size_bytes
223223- end)
224224-225225- Enum.each(oversize_shards, fn {{start_key, end_key}, %ShardStats{} = stats} ->
226226- split_shard(start_key, end_key, stats.midpoint_key, state)
183183+ Enum.each(shards_to_split, fn shard ->
184184+ split_shard(state, shard)
227185 end)
228186229187 state
230188 end
231189232232- defp split_shard(shard_key, end_key, at_key, %State{} = state) when is_binary(shard_key) and is_binary(at_key) do
233233- # TODO: we will have to take a lock for this
234234- {:ok, txn} = Transaction.new(state.cluster)
235235- {:ok, {shard_value, txn}} = Transaction.read(txn, "\xFF/key_servers/" <> shard_key)
190190+ defp should_split_shard?(%Shard{stats: nil}), do: false
191191+ defp should_split_shard?(%Shard{to_server_ids: to}) when to != [], do: false
192192+ defp should_split_shard?(%Shard{} = shard), do: shard.stats.size_bytes > @shard_max_size_bytes
236193237237- case MetaStore.decode_key_servers(shard_value) do
238238- {[_ | _] = from, nil} ->
239239- pairs =
240240- Enum.map(from, fn id ->
241241- [
242242- {server_keys_prefix() <> Integer.to_string(id) <> "/" <> shard_key, "complete/" <> at_key},
243243- {server_keys_prefix() <> Integer.to_string(id) <> "/" <> at_key, "complete/" <> end_key},
244244- ]
245245- end)
246246- |> Enum.concat()
194194+ defp split_shard(%State{} = state, %Shard{} = shard) do
195195+ assert shard.to_server_ids == []
196196+ %ShardStats{midpoint_key: midpoint} = shard.stats
247197248248- split_key = "\xFF/key_servers/" <> at_key
249249- split_value = MetaStore.encode_key_servers(from, nil)
250250- pairs = [{split_key, split_value}] ++ pairs
198198+ new_shard = %Shard{
199199+ start_key: midpoint,
200200+ end_key: shard.end_key,
201201+ from_server_ids: shard.from_server_ids,
202202+ to_server_ids: [],
203203+ stats: nil,
204204+ }
205205+ shard = %{shard | end_key: midpoint, stats: nil}
251206252252- case Transaction.read(txn, split_key) do
253253- {:ok, {nil, txn}} ->
254254- txn
255255- |> Transaction.write(pairs)
256256- |> Transaction.commit()
257257- |> case do
258258- {:ok, _txn} -> :ok
259259- {:error, _} -> {:error, :commit_error}
260260- end
261261-262262- {:ok, {existing, _txn}} when is_binary(existing) ->
263263- {:error, :split_shard_already_exists}
264264- end
265265-266266- {[_ | _], [_ | _]} ->
267267- {:error, :shard_moving}
207207+ # TODO: open a transaction with no read version and only handle commit error
208208+ with {:ok, txn} <- Transaction.new(state.cluster),
209209+ txn = Transaction.write(txn, [to_ks_pair(new_shard)]),
210210+ {:ok, _txn} <- Transaction.commit(txn)
211211+ do
212212+ ShardInfoMap.put(state.shard_map, shard.start_key, shard)
213213+ ShardInfoMap.put(state.shard_map, new_shard.start_key, new_shard)
214214+ :ok
215215+ else
216216+ # TODO: we only need to exit for errors which are not retryable (e.g. commit timed out)
217217+ # This is necessary to maintain consistency of the in-memory shard map
218218+ _ -> exit(:shutdown)
268219 end
269220 end
270221···399350400351 defp server_keys_key(server_id, shard_key) when is_integer(server_id) and is_binary(shard_key) do
401352 server_keys_prefix() <> Integer.to_string(server_id) <> "/" <> shard_key
353353+ end
354354+355355+ defp to_ks_pair(%Shard{} = shard) do
356356+ to_server_ids = case shard.to_server_ids do
357357+ [] -> nil
358358+ list -> list
359359+ end
360360+361361+ key = key_servers_prefix() <> shard.start_key
362362+ value = MetaStore.encode_key_servers(shard.from_server_ids, to_server_ids)
363363+364364+ {key, value}
402365 end
403366end
+15-1
lib/shard_info_map.ex
···1111 end_key: binary,
1212 from_server_ids: [integer],
1313 to_server_ids: [integer],
1414- stats: ShardStats.t,
1414+ stats: ShardStats.t | nil,
1515 }
1616 @enforce_keys [
1717 :start_key,
···2929 @enforce_keys [:shard_map]
3030 defstruct @enforce_keys
31313232+ @spec new :: t
3233 def new do
3334 %ShardInfoMap{
3435 shard_map: DenseShardMap.new(),
3536 }
3637 end
37383939+ def put(%ShardInfoMap{shard_map: dsm}, start_key, %Shard{} = shard) do
4040+ DenseShardMap.put(dsm, start_key, shard)
4141+ end
4242+4343+ @spec list_shards(t) :: [Shard.t]
4444+ def list_shards(%ShardInfoMap{shard_map: dsm} = _sim) do
4545+ DenseShardMap.list_shards(dsm)
4646+ |> Enum.map(fn {_sk, %Shard{} = shard} -> shard end)
4747+ end
4848+4949+ @spec load(t, [{binary, binary}]) :: :ok
3850 def load(%ShardInfoMap{shard_map: dsm}, key_servers_pairs) when is_list(key_servers_pairs) do
3951 key_servers_pairs ++ [{key_servers_prefix() <> "\xFF\xFF", ""}]
4052 |> Enum.chunk_every(2, 1, :discard)
···4658 :ok
4759 end
48606161+ @spec update_shard_stats(t, ShardStats.t) :: :ok | :error
4962 def update_shard_stats(%ShardInfoMap{shard_map: dsm}, %ShardStats{} = stats) do
5063 sk = stats.start_key
5164 ek = stats.end_key
···5972 end
6073 end
61747575+ @spec shard_from_ks_pairs({binary, binary}, {binary, binary}) :: Shard.t
6276 defp shard_from_ks_pairs({key_servers_prefix() <> start_key, value}, {key_servers_prefix() <> end_key, _value}) do
6377 {from, to} = MetaStore.decode_key_servers(value)
6478 to = to || []