defmodule Hobbes.KV.ByteSample do
  @moduledoc """
  In-memory byte sample backed by a private `:ordered_set` ETS table.

  The table maps a (unprefixed) user key to its *sampled size*, a float which
  estimates how many bytes of key/value data that sample stands for.

  `apply_batch/2` updates the table from data mutations and returns the
  corresponding mutations to persist, keyed under `special_byte_sample_prefix()`.
  `load/2` rebuilds the table from such persisted pairs.
  """

  alias Hobbes.Utils
  import Hobbes.Utils

  # Byte sample should be 1/250th the size of k/v data
  @byte_sample_factor 250
  # TODO: 250 is too rare for tests, buggify
  # @byte_sample_factor 2

  # Approximate overhead per sample (other than key size)
  # (currently just 8 bytes to store the size value as a string)
  @byte_sample_overhead_bytes 8

  @type t :: :ets.table()

  @doc """
  Creates an empty byte sample.

  The table is `:private`, so it is only accessible from the calling process.
  """
  @spec new :: t
  def new do
    :ets.new(__MODULE__, [:ordered_set, :private])
  end

  @doc """
  Loads persisted byte sample pairs into `table`.

  Each pair is `{key, encoded_size}` where `encoded_size` was produced by the
  encoding used in `apply_batch/2`. Persisted keys carry
  `special_byte_sample_prefix()`; the prefix is stripped here so that loaded
  entries use the same unprefixed keys as entries inserted by `apply_batch/2`
  and are therefore visible to `scan/3`. Keys without the prefix are kept as-is.
  """
  @spec load(t, [{binary, binary}]) :: :ok
  def load(table, pairs) when is_list(pairs) do
    prefix = special_byte_sample_prefix()

    Enum.each(pairs, fn {k, v} when is_binary(k) and is_binary(v) ->
      key = String.replace_prefix(k, prefix, "")
      :ets.insert(table, {key, decode_float(v)})
    end)

    :ok
  end

  @doc """
  Returns the sampled `{key, sampled_size}` pairs in key order, starting at
  `start_key` (inclusive) and stopping before `end_key` (exclusive).
  """
  @spec scan(t, binary, binary) :: [{binary, float}]
  def scan(table, start_key, end_key) do
    # next_lookup/2 only yields keys strictly after its argument, so the
    # start key itself has to be looked up separately
    acc =
      case :ets.lookup(table, start_key) do
        [{^start_key, _size}] = result -> result
        [] -> []
      end

    table
    |> do_scan(end_key, start_key, acc)
    |> Enum.reverse()
  end

  # Walks the ordered set from prev_key, accumulating pairs (in reverse order)
  # until a key >= end_key or the end of the table is reached
  defp do_scan(table, end_key, prev_key, acc) do
    case :ets.next_lookup(table, prev_key) do
      {_key, [{key, _size} = pair]} when key < end_key ->
        do_scan(table, end_key, key, [pair | acc])

      {_key, [{_past_end, _size}]} ->
        acc

      :"$end_of_table" ->
        acc
    end
  end

  @doc """
  Applies data `mutations` to the byte sample and returns the mutations which
  must be persisted to keep the stored sample in sync (in the same order).

    * `{:write, k, v}` - if the pair is selected for the sample, its sampled
      size is stored and a prefixed `:write` is returned. If it is not selected
      but `k` is currently sampled (e.g. it was overwritten by a smaller value),
      the stale sample is removed and a prefixed `:clear` is returned.
    * `{:clear, k}` - removes the sample for `k` if present and returns a
      prefixed `:clear`.
    * `{:clear_range, _, _}` - raises, not implemented.
  """
  @spec apply_batch(t, [Utils.mutation]) :: [Utils.mutation]
  def apply_batch(table, mutations) when is_list(mutations) do
    mutations
    |> Enum.reduce([], fn mutation, acc -> apply_mutation(table, mutation, acc) end)
    |> Enum.reverse()
  end

  # Applies a single mutation, prepending any resulting sample mutation to acc
  defp apply_mutation(table, {:write, k, v}, acc) do
    key_size = byte_size(k)
    pair_size = key_size + byte_size(v)
    probability = byte_sample_probability(key_size, pair_size)

    # The hash is derived from the key only, so for a given pair size the
    # decision is deterministic
    if :erlang.phash2(k, 1000) / 1000 < probability do
      # Correct for sampling probability (see comments in byte_sample_probability/2)
      sampled_size = pair_size / min(probability, 1)

      :ets.insert(table, {k, sampled_size})
      [{:write, special_byte_sample_prefix() <> k, encode_float(sampled_size)} | acc]
    else
      # The probability depends on the pair size, so a key which was sampled
      # with a previous (larger) value must be dropped or it would go stale
      drop_sample(table, k, acc)
    end
  end

  defp apply_mutation(table, {:clear, k}, acc), do: drop_sample(table, k, acc)

  defp apply_mutation(_table, {:clear_range, _k, _v}, _acc) do
    raise "Not implemented"
  end

  # Removes k from the sample if present, emitting the matching prefixed clear
  defp drop_sample(table, k, acc) do
    if :ets.member(table, k) do
      :ets.delete(table, k)
      [{:clear, special_byte_sample_prefix() <> k} | acc]
    else
      acc
    end
  end

  defp byte_sample_probability(key_size, pair_size) do
    # Probability that a key/value pair of this size belongs in the byte sample
    # This is a function of the size rather than a percentage of keys so that
    # we can maintain the byte sample as a fraction of *total KV size*
    #
    # Intuitively: the byte sample only stores keys, so if values are larger than
    # the overhead we can afford to store more samples while staying under the limit
    # Therefore, if the value is large, probability should increase
    #
    # We then correct out the probability factor by dividing size to get sampled_size
    # at the end, so that the sample is not biased by the larger pairs
    #
    # This algorithm is borrowed directly from FDB (storageserver isKeyValueInSample)
    pair_size / (key_size + @byte_sample_overhead_bytes) / @byte_sample_factor
  end

  # Sizes are persisted as the decimal string of the size in thousandths
  defp encode_float(float) when is_number(float) do
    Integer.to_string(round(float * 1000))
  end

  defp decode_float(string) when is_binary(string) do
    String.to_integer(string) / 1000
  end

  @doc false
  def dump(table) do
    :ets.tab2list(table)
  end
end
+17-65
lib/servers/storage.ex
···55 import ExUnit.Assertions, only: [assert: 1]
6677 alias Hobbes.{HybridKV, MetaStore, SparseShardMap}
88- alias Hobbes.KV.MutationLog
88+ alias Hobbes.KV.{MutationLog, ByteSample}
99 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, PeekResult, RangeResult, ShardStats}
1010 alias Hobbes.Servers.{CommitBuffer, TLog, Storage, Distributor}
1111···4848 kv: term,
4949 meta_store: term,
5050 shard_map: SparseShardMap.t,
5151+ byte_sample: ByteSample.t,
51525253 imports: [ShardImport.t],
5354 shard_clears: [ShardClear.t],
···6768 :kv,
6869 :meta_store,
6970 :shard_map,
7171+ :byte_sample,
70727173 :imports,
7274 :shard_clears,
···7981 @flush_interval_ms 250
8082 @import_interval_ms 10
81838282- # Byte sample should be 1/250th the size of k/v data
8383- @byte_sample_factor 250
8484- # Approximate overhead per sample (other than key size)
8585- # (currently just 8 bytes to store the size value as a string)
8686- @byte_sample_overhead_bytes 8
8787- # Prefix under which to store byte sample data
8888- @byte_sample_prefix "\xFF\xFF\xFE"
8989-9084 def start_link(arg), do: SimServer.start_link(__MODULE__, arg)
91859286 # TODO: get rid of single-key reads entirely and just send a multi-key read from Transaction
···179173 kv: kv,
180174 meta_store: MetaStore.new(kv.mem_kv),
181175 shard_map: shard_map,
176176+ byte_sample: ByteSample.new(),
182177183178 imports: %{},
184179 shard_clears: [],
185180 }
181181+182182+ byte_sample_pairs = HybridKV.scan(kv, 1, special_byte_sample_prefix(), special_byte_sample_end()).pairs
183183+ ByteSample.load(state.byte_sample, byte_sample_pairs)
186184187185 SimServer.send_after(self(), :tick_ping, 0)
188186 SimServer.send_after(self(), :flush, @flush_interval_ms)
···218216 end
219217220218 def handle_call({:get_shard_stats, start_key, end_key}, _from, %State{} = state) do
221221- read_version = state.data_version
222222-223223- bs_sk = @byte_sample_prefix <> start_key
224224- bs_ek = @byte_sample_prefix <> end_key
225225- %RangeResult{pairs: pairs, more: false} = HybridKV.scan(state.kv, read_version, bs_sk, bs_ek)
219219+ pairs = ByteSample.scan(state.byte_sample, start_key, end_key)
226220227221 size =
228222 pairs
229229- |> Enum.reduce(0, fn {_k, v}, acc -> acc + decode_float(v) end)
223223+ |> Enum.reduce(0, fn {_k, bytes}, acc -> acc + bytes end)
230224 |> round()
231225232226 half_size = div(size, 2)
233227 # TODO: we use the full midpoint key, but we could instead use the shortest key which
234228 # separates the midpoint and the next key (which would make the shard map smaller)
235229 midpoint =
236236- Enum.reduce_while(pairs, 0, fn {k, v}, acc ->
237237- acc = acc + decode_float(v)
230230+ Enum.reduce_while(pairs, 0, fn {k, bytes}, acc ->
231231+ acc = acc + bytes
238232 case acc > half_size do
239233 true -> {:halt, k}
240234 false -> {:cont, acc}
241235 end
242236 end)
243237 |> case do
244244- @byte_sample_prefix <> midpoint -> midpoint
238238+ midpoint when is_binary(midpoint) -> midpoint
245239 # If the shard is too small to have any byte sample keys
246240 0 -> start_key
247241 end
···367361 HybridKV.flush(kv, adv)
368362369363 HybridKV.delete_range_storage(kv, start_key, end_key)
370370- HybridKV.delete_range_storage(kv, @byte_sample_prefix <> start_key, @byte_sample_prefix <> end_key)
364364+ # TODO: clear byte sample
371365 end)
372366 state = %State{state | shard_clears: remaining_clears}
373367···511505 end
512506513507 defp apply_data_mutations(%State{kv: kv} = state, version, data_mutations) when is_list(data_mutations) do
514514- # TODO: append byte sample mutations to batch
515508 kv = HybridKV.apply_batch(kv, version, data_mutations)
516516- %{state | kv: kv}
517517- end
518518-519519- defp byte_sample_key(key), do: @byte_sample_prefix <> key
520509521521- @spec byte_sample_pair(binary, binary) :: {:in_sample, binary, binary} | {:not_in_sample, binary}
522522- defp byte_sample_pair(key, value) when is_binary(key) and is_binary(value) do
523523- # Note: this byte sample algorithm is borrowed directly from FDB
524524- # See isKeyValueInSample in storageserver
525525- key_size = byte_size(key)
526526- pair_size = key_size + byte_size(value)
510510+ byte_sample_mutations = ByteSample.apply_batch(state.byte_sample, data_mutations)
511511+ MutationLog.append(state.kv.mutation_log, version, byte_sample_mutations)
527512528528- # Probability that a key/value pair of this size belongs in the byte sample
529529- # This is a function of the size rather than a percentage of keys so that
530530- # we can maintain the byte sample as a fraction of *total KV size*
531531- #
532532- # Intuitively: the byte sample only stores keys, so if values are larger than
533533- # the overhead we can afford to store more samples while staying under the limit
534534- # Therefore, if the value is large, probability should increase
535535- #
536536- # We then correct out the probability factor by dividing size to get sampled_size
537537- # at the end, so that the sample is not biased by the larger pairs
538538- probability = (pair_size / (key_size + @byte_sample_overhead_bytes)) / @byte_sample_factor
539539-540540- hash = :erlang.phash2(key, 1000) / 1000
541541- case hash < probability do
542542- true ->
543543- sampled_size = pair_size / min(probability, 1)
544544-545545- s_key = @byte_sample_prefix <> key
546546- s_value = encode_float(sampled_size)
547547-548548- {:in_sample, s_key, s_value}
549549-550550- false ->
551551- {:not_in_sample, @byte_sample_prefix <> key}
552552- end
513513+ %{state | kv: kv}
553514 end
554515555516 defp apply_special_mutations(%State{} = state, version, mutations) do
···633594 if cancelled? do
634595 %ShardImport{start_key: start_key, end_key: end_key} = si
635596 HybridKV.nuke_range(state.kv, start_key, end_key)
636636- HybridKV.nuke_range(state.kv, @byte_sample_prefix <> start_key, @byte_sample_prefix <> end_key)
597597+ # TODO: clear byte sample
637598 end
638599639600 %State{state | imports: Map.delete(state.imports, si.ref)}
···738699 true -> :ok
739700 false -> {:error, :wrong_server}
740701 end
741741- end
742742-743743- # TODO: switch to a real encoding, obviously
744744- defp encode_float(float) when is_number(float) do
745745- Integer.to_string(round(float * 1000))
746746- end
747747-748748- defp decode_float(string) when is_binary(string) do
749749- String.to_integer(string) / 1000
750702 end
751703end
···148148 end
149149 end
150150151151- defp inc_version(%Verifier{version: version} = verifier) do
152152- %Verifier{verifier | version: version + rand(3)}
153153- end
154154-155151 defp rand(s \\ 0, e), do: Enum.random(s..e)
156152157153 defp rand_read_version(%Verifier{} = verifier) do
+22
test/kv/byte_sample_test.exs
defmodule Hobbes.KV.ByteSampleTest do
  use ExUnit.Case, async: true

  alias Hobbes.KV.ByteSample

  @moduletag :byte_sample

  # Each test gets its own byte sample table
  setup do
    {:ok, bs: ByteSample.new()}
  end

  describe "ByteSample" do
    test "samples", %{bs: bs} do
      # 4000 writes with zero-padded keys "k0001".."k4000"
      writes =
        for i <- 1..4000 do
          padded = String.pad_leading(to_string(i), 4, "0")
          {:write, "k#{padded}", "v#{i}"}
        end

      ByteSample.apply_batch(bs, writes)

      sampled = ByteSample.scan(bs, "k1000", "k2000")
      # Anything else will flake if we change the parameters
      assert is_list(sampled)
    end
  end
end