···9191- [X] Update TaggedQueue to use ets instead of erlang queues (tlog batch appends would randomly spike to 50ms, maybe gc? this fixed it)
9292- [ ] Optimize resolver
9393- [ ] Store byte sample in a separate FlatKV and add mutations to HybridKV mutation log
9494+- [X] Optimize CommitBuffer to use unversioned shard map with pre-computed tlogs and tags
9595+ - CommitBuffer reductions on bench_rw went from 85 million down to 25 million!
9696+ - Txn/s went from about 24.8k to about 25.8k! (obviously CommitBuffer is not the bottleneck)
949795989699### Construct
+79
lib/dense_shard_map.ex
···11+defmodule Hobbes.DenseShardMap do
22+ @moduledoc ~S"""
33+ This module implements a *Dense* Shard Map, where the entire key space of the database is covered.
44+55+ Shards are stored in a tree with keys of `start_key`, with the `end_key` implied by
66+ the `start_key` of the following shard.
77+ The final `end_key` is implied to be `Utils.all_keys_end()` (`\xFF\xFF`).
88+99+ This mirrors how shards are stored in the actual Hobbes meta keyspace (the `key_servers` space),
1010+ and `DenseShardMap` is designed to function as a local coherent cache of that space.
1111+ """
1212+1313+ import Hobbes.Utils
1414+1515+ @type t :: :ets.table
1616+1717+ @spec new :: t
1818+ def new do
1919+ :ets.new(__MODULE__, [:ordered_set, :private])
2020+ end
2121+2222+ @spec put(t, binary, term) :: :ok
2323+ def put(table, start_key, value) when is_database_key(start_key) do
2424+ :ets.insert(table, {start_key, value})
2525+ :ok
2626+ end
2727+2828+ @spec delete(t, binary) :: :ok
2929+ def delete(table, start_key) when is_database_key(start_key) do
3030+ :ets.delete(table, start_key)
3131+ :ok
3232+ end
3333+3434+ @spec shard_for_key(t, binary) :: {binary, binary, term}
3535+ def shard_for_key(table, key) when is_database_key(key) do
3636+ # Raises if table is empty, which is fine
3737+ {_sk, [{sk, value}]} = :ets.prev_lookup(table, next_key(key))
3838+ ek = shard_end_key(table, sk)
3939+4040+ {sk, ek, value}
4141+ end
4242+4343+ @spec shards_for_range(t, binary, binary) :: [{{binary, binary}, term}]
4444+ def shards_for_range(table, start_key, end_key) when is_database_range(start_key, end_key) do
4545+ {_sk, [{first_sk, _value} = first]} = :ets.prev_lookup(table, next_key(start_key))
4646+4747+ scan_shards(table, end_key, first_sk, [first])
4848+ |> Enum.reverse()
4949+ |> Enum.chunk_every(2, 1, :discard)
5050+ |> Enum.map(fn [{sk, value}, {ek, _value}] ->
5151+ {sk, ek, value}
5252+ end)
5353+ end
5454+5555+ defp scan_shards(table, end_key, prev_key, acc) do
5656+ case :ets.next_lookup(table, prev_key) do
5757+ {_sk, [{sk, _value} = tup]} ->
5858+ case sk < end_key do
5959+ true -> scan_shards(table, end_key, sk, [tup | acc])
6060+ false -> [tup | acc]
6161+ end
6262+6363+ :"$end_of_table" -> [{all_keys_end(), nil} | acc]
6464+ end
6565+ end
6666+6767+ @spec shard_end_key(t, binary) :: binary
6868+ defp shard_end_key(table, start_key) when is_binary(start_key) do
6969+ case :ets.next(table, start_key) do
7070+ :"$end_of_table" -> all_keys_end()
7171+ ek when is_binary(ek) -> ek
7272+ end
7373+ end
7474+7575+ @doc false
7676+ def dump(table) do
7777+ :ets.tab2list(table)
7878+ end
7979+end
+45-68
lib/servers/commit_buffer.ex
···3344 import ExUnit.Assertions, only: [assert: 1]
5566- alias Hobbes.MetaStore
77- alias Hobbes.Structs.{Cluster, Server, ResolveBatch, CommitTxn, LogBatch}
66+ alias Hobbes.ShardTagMap
77+ alias Hobbes.Structs.{Cluster, TLogGeneration, Server, ResolveBatch, CommitTxn, LogBatch}
8899 alias Hobbes.Servers.{Sequencer, Resolver, TLog}
1010···1616 @flush_interval_ms 1
1717 @max_buffer_size 300
18181919- @tlog_replication_factor 3
2020-2119 defmodule State do
2020+ @type t :: %__MODULE__{
2121+ id: non_neg_integer,
2222+ cluster: Cluster.t,
2323+ shard_map: ShardTagMap.t,
2424+ last_committed_version: non_neg_integer,
2525+2626+ buffer: list,
2727+ buffer_size: non_neg_integer,
2828+2929+ storage_servers: map,
3030+ }
3131+2232 @enforce_keys [
2333 :id,
2434 :cluster,
2525- :meta_store,
3535+ :shard_map,
2636 :last_committed_version,
27372838 :buffer,
···8797 end
88988999 def init(%{id: id, cluster: %Cluster{} = cluster, meta_pairs: meta_pairs}) do
100100+ %TLogGeneration{} = current_generation = hd(cluster.tlog_generations)
101101+ assert current_generation.generation == cluster.generation
102102+90103 state = %State{
91104 id: id,
92105 cluster: cluster,
9393- meta_store: MetaStore.new(),
106106+ shard_map: ShardTagMap.new(current_generation),
94107 last_committed_version: 0,
9510896109 buffer: [],
···100113 }
101114102115 seed_meta = Enum.map(meta_pairs, fn {k, v} -> {:write, k, v} end)
103103- MetaStore.apply_meta_mutations(state.meta_store, 0, seed_meta)
116116+ ShardTagMap.apply_metadata_mutations(state.shard_map, seed_meta)
104117105118 SimServer.send_after(self(), :flush, @flush_interval_ms)
106119 {:ok, state}
107120 end
108121109122 def handle_call({:get_shards, keys_or_ranges}, _from, state) when is_list(keys_or_ranges) do
110110- ms = state.meta_store
111111- version = state.last_committed_version
123123+ stm = state.shard_map
112124 storage_servers = state.storage_servers
113125114126 shard_lists =
115115- Enum.map(keys_or_ranges, &MetaStore.get_shards(ms, version, &1))
116116- |> Enum.map(fn shards ->
117117- Enum.map(shards, fn {sk, ek, ids} ->
118118- pids = Enum.map(ids, &Map.get(storage_servers, &1))
119119- {sk, ek, {ids, pids}}
127127+ Enum.map(keys_or_ranges, fn key_or_range ->
128128+ ShardTagMap.shards_for_key_or_range(stm, key_or_range)
129129+ |> Enum.map(fn {sk, ek, {_tlogs, _all_tags, from_tags}} ->
130130+ pids = Enum.map(from_tags, &Map.get(storage_servers, &1))
131131+ {sk, ek, {from_tags, pids}}
120132 end)
121133 end)
134134+122135 {:reply, {:ok, shard_lists}, state}
123136 end
124137···187200 txn.read_version,
188201 txn.read_conflicts,
189202 txn.write_conflicts,
190190- extract_meta(txn.mutations),
203203+ Enum.filter(txn.mutations, &meta_mutation?/1),
191204 } | acc]
192205 end)
193206···202215 {txn_results_reversed, meta_log} = Resolver.resolve_batch(resolver_pid, batch)
203216204217 # Apply meta mutations received, including our own from this batch
205205- Enum.each(meta_log, fn {commit_version, mutations} ->
206206- MetaStore.apply_meta_mutations(state.meta_store, commit_version, mutations)
218218+ Enum.each(meta_log, fn {_commit_version, mutations} ->
219219+ ShardTagMap.apply_metadata_mutations(state.shard_map, mutations)
207220 end)
208221209222 {allowed_transactions, rejected_transactions} =
···216229217230 # If the database is locked, filter out all non-meta transactions
218231 # TODO: use a lock-aware flag instead like FDB
219219- {allowed_transactions, locked_transactions} =
220220- case MetaStore.locked?(state.meta_store, commit_version) do
221221- true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
222222- false -> {allowed_transactions, []}
223223- end
232232+ # TODO: removed for now, bring back later without MetaStore
233233+ #{allowed_transactions, locked_transactions} =
234234+ # case MetaStore.locked?(state.meta_store, commit_version) do
235235+ # true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
236236+ # false -> {allowed_transactions, []}
237237+ # end
238238+ locked_transactions = []
224239225240 # Add storage tags to each mutation (including special meta tag for meta mutations)
226226- meta_store = state.meta_store
227227- tagged_mutations =
241241+ tlog_mutations =
228242 allowed_transactions
229243 |> Enum.map(fn %CommitTxn{} = txn -> txn.mutations end)
230244 |> Enum.concat()
231245 |> then(fn mutations ->
232246 mutations ++ compute_special_mutations(mutations)
233247 end)
234234- |> Enum.with_index()
235235- |> Enum.map(fn {mut, i} ->
236236- # TODO: will have to split range clears
237237- tags = MetaStore.get_key_server_mutation_tags(meta_store, commit_version, mutation_key(mut))
238238- {tags, {i, mut}}
248248+ |> then(fn mutations ->
249249+ ShardTagMap.tag_and_slice_mutations(state.shard_map, mutations)
239250 end)
240251241241- # TODO: get latest generation only
242242- assert state.cluster.status == :normal
243243- tlogs =
244244- hd(state.cluster.tlog_generations).tlog_ids
245245- |> Enum.map(fn id -> Map.fetch!(state.cluster.servers, id) end)
246246-247247- num_tlogs = length(tlogs)
248248- all_tlogs = Enum.to_list(0..(num_tlogs - 1))
249249- # TODO: maybe a tuple is better? there are not that many tlogs
250250- log_mutations_reversed = Map.new(all_tlogs, fn i -> {i, []} end)
251251-252252- # Slice up mutations for tlogs
253253- # Note: prepending reverses the mutations, hence the name
254254- # (they are reversed again when the batch is created below)
255255- log_mutations_reversed =
256256- Enum.reduce(tagged_mutations, log_mutations_reversed, fn {tags, _mut} = tm, acc ->
257257- tlogs =
258258- case -1 in tags do
259259- # Send meta mutations (tagged with -1) to all tlogs
260260- true -> all_tlogs
261261- false -> tlogs_for_tags(num_tlogs, @tlog_replication_factor, tags)
262262- end
263263- Enum.reduce(tlogs, acc, fn tlog_i, acc ->
264264- # Prepend mutation onto list for each tlog batch
265265- Map.put(acc, tlog_i, [tm | Map.fetch!(acc, tlog_i)])
266266- end)
267267- end)
252252+ tlog_ids = hd(state.cluster.tlog_generations).tlog_ids
268253269254 # Send sliced mutations to each tlog
270270- all_tlogs
271271- |> Enum.map(fn tlog_i ->
272272- tagged_mutations = Enum.reverse(Map.fetch!(log_mutations_reversed, tlog_i))
255255+ tlog_ids
256256+ |> Enum.map(fn tlog_id ->
257257+ tagged_mutations = Map.fetch!(tlog_mutations, tlog_id)
273258274259 log_batch = %LogBatch{
275260 commit_buffer_id: state.id,
···279264 last_committed_version: state.last_committed_version,
280265 }
281266282282- %Server{pid: tlog_pid} = Enum.at(tlogs, tlog_i)
267267+ %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
283268 TLog.write_batch_send(tlog_pid, log_batch)
284269 end)
285270 |> Enum.each(fn req_id ->
···313298 end)
314299315300 %State{state | last_committed_version: commit_version, buffer: [], buffer_size: 0}
316316- end
317317-318318- defp has_meta?(mutations) do
319319- Enum.any?(mutations, &meta_mutation?/1)
320320- end
321321-322322- defp extract_meta(mutations) do
323323- Enum.filter(mutations, &meta_mutation?/1)
324301 end
325302end
+11-11
lib/servers/manager.ex
···267267268268 meta_pairs = build_seed_meta(config, storage_ids)
269269270270+ first_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: 0, tlog_ids: tlog_ids}
271271+ state = put_in(state.cluster.tlog_generations, [first_tlog_generation])
272272+270273 cluster = state.cluster
271274 gen = cluster.generation
272275···283286284287 # Write the first generation into the Coordinators
285288 # Once complete, any future recoveries will have to start from these TLogs
286286- first_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: 0, tlog_ids: tlog_ids}
287289 {:ok, :ok} = Coordinator.write_generation(state.primary_coordinator, first_tlog_generation.generation, first_tlog_generation.start_version, first_tlog_generation.tlog_ids)
288290289291 state = put_in(state.cluster.tlog_generations, [first_tlog_generation])
···311313 %Server{pid: old_tlog_pid} = Map.fetch!(state.cluster.servers, hd(state.recovered_tlogs).id)
312314 {:ok, meta_pairs} = TLog.read_meta_store(old_tlog_pid, min_dv)
313315314314- #dbg {state.cluster.generation, max_kcv, min_dv}
316316+ dbg {"Recovery", state.cluster.generation, max_kcv, min_dv}
315317 #dbg {meta_pairs, state.cluster.tlog_generations}
316318317319 ids = allocate_server_ids(state, state.config.num_tlogs + state.config.num_commit_buffers + 3)
···319321 {commit_buffer_ids, ids} = Enum.split(ids, state.config.num_commit_buffers)
320322 [sequencer_id, resolver_id, distributor_id] = ids
321323322322- cluster = state.cluster
323323- gen = state.cluster.generation
324324+ # Create new TLog generation
325325+ new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: max_kcv + 1, tlog_ids: tlog_ids}
326326+ assert new_tlog_generation.start_version > hd(state.cluster.tlog_generations).start_version
327327+328328+ state = update_in(state.cluster.tlog_generations, &[new_tlog_generation | &1])
324329325330 # Recruit new TLogs
331331+ cluster = state.cluster
332332+ gen = state.cluster.generation
326333 state = recruit(state, gen, Hobbes.Servers.TLog, tlog_ids, %{cluster: state.cluster, meta_pairs: meta_pairs})
327327-328328- new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: max_kcv + 1, tlog_ids: tlog_ids}
329329- assert new_tlog_generation.start_version > hd(state.cluster.tlog_generations).start_version
330334331335 # Copy mutations in range [max_kcv + 1, min_dv] to the new TLogs
332336 {state, last_batch_version} = copy_mutations_to_new_generation(state, hd(state.cluster.tlog_generations), new_tlog_generation, meta_pairs, new_tlog_generation.start_version, min_dv)
···359363 # Write the new TLog generation to the Coordinators
360364 # Once complete, the effect of this recovery is permanent, and any future recoveries will have to start from these TLogs
361365 {:ok, :ok} = Coordinator.write_generation(state.primary_coordinator, new_tlog_generation.generation, new_tlog_generation.start_version, new_tlog_generation.tlog_ids)
362362-363363- state = update_in(state.cluster.tlog_generations, fn generations when is_list(generations) ->
364364- [new_tlog_generation | generations]
365365- end)
366366367367 put_in(state.cluster.status, :normal)
368368 end
+120
lib/shard_tag_map.ex
···11+defmodule Hobbes.ShardTagMap do
22+ alias Hobbes.{DenseShardMap, ShardTagMap, Utils}
33+ alias Hobbes.Structs.TLogGeneration
44+55+ import Hobbes.Utils
66+ import Hobbes.MetaStore, only: [decode_server_ids: 1]
77+88+ @type t :: %__MODULE__{
99+ tlog_ids: [non_neg_integer],
1010+ replication_factor: non_neg_integer,
1111+ shard_map: DenseShardMap.t,
1212+ }
1313+ @enforce_keys [:tlog_ids, :replication_factor, :shard_map]
1414+ defstruct @enforce_keys
1515+1616+ @spec new(TLogGeneration.t) :: t
1717+ def new(%TLogGeneration{} = generation) do
1818+ %ShardTagMap{
1919+ # TODO: parameterize
2020+ tlog_ids: generation.tlog_ids,
2121+ replication_factor: 3,
2222+ shard_map: DenseShardMap.new(),
2323+ }
2424+ end
2525+2626+ @spec apply_metadata_mutations(t, [Utils.mutation]) :: :ok
2727+ def apply_metadata_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do
2828+ %{shard_map: shard_map, tlog_ids: tlog_ids, replication_factor: rf} = stm
2929+ Enum.each(mutations, fn
3030+ {:write, key_servers_prefix() <> start_key, value} ->
3131+ {shard_all_tags, shard_from_tags} = server_tags_from_key_servers_value(value)
3232+ shard_tlog_ids = tlog_ids_for_servers(tlog_ids, shard_all_tags, rf)
3333+3434+ DenseShardMap.put(shard_map, start_key, {shard_tlog_ids, shard_all_tags, shard_from_tags})
3535+3636+ {:clear, key_servers_prefix() <> start_key} ->
3737+ DenseShardMap.delete(shard_map, start_key)
3838+3939+ _mut -> :noop
4040+ end)
4141+4242+ :ok
4343+ end
4444+4545+ @spec tag_and_slice_mutations(t, [Utils.mutation]) :: %{non_neg_integer => [Utils.tagged_mutation]}
4646+ def tag_and_slice_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do
4747+ %{shard_map: shard_map, tlog_ids: all_tlog_ids} = stm
4848+ tlog_mutations = Map.new(stm.tlog_ids, fn id -> {id, []} end)
4949+5050+ Enum.reduce(mutations, {tlog_mutations, 0}, fn mut, {acc, i} ->
5151+ {tlogs, tags} =
5252+ case mutation_key(mut) do
5353+ special_server_keys_prefix() <> rest ->
5454+ # Special server_keys mutations are sent only to that particular Storage server
5555+ [id_str, _] = String.split(rest, "/")
5656+ tags = [String.to_integer(id_str)]
5757+ # Send to all TLogs for simplicity (shard moves should never be a bottleneck)
5858+ {all_tlog_ids, tags}
5959+6060+ meta_prefix() <> _ = key ->
6161+ {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key)
6262+ # Tag meta mutations with meta tag (-1) and send to all tlogs
6363+ tags = [meta_tag() | tags]
6464+ {all_tlog_ids, tags}
6565+6666+ key ->
6767+ {_sk, _ek, {tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key)
6868+ # Send to pre-computed tlogs/tags for this shard
6969+ {tlogs, tags}
7070+ end
7171+7272+ tagged_mut = {tags, {i, mut}}
7373+ acc = Enum.reduce(tlogs, acc, fn tlog_id, acc ->
7474+ Map.update!(acc, tlog_id, &[tagged_mut | &1])
7575+ end)
7676+ {acc, i + 1}
7777+ end)
7878+ |> then(fn {tlog_mutations, _i} ->
7979+ Map.new(tlog_mutations, fn {tlog_id, mutations_reversed} -> {tlog_id, Enum.reverse(mutations_reversed)} end)
8080+ end)
8181+ end
8282+8383+ @spec shards_for_key_or_range(t, binary | {binary, binary}) :: [{binary, binary, {list, Utils.tag_list, Utils.tag_list}}]
8484+ def shards_for_key_or_range(%ShardTagMap{} = stm, key) when is_binary(key) do
8585+ [DenseShardMap.shard_for_key(stm.shard_map, key)]
8686+ end
8787+8888+ def shards_for_key_or_range(%ShardTagMap{} = stm, {start_key, end_key}) do
8989+ DenseShardMap.shards_for_range(stm.shard_map, start_key, end_key)
9090+ end
9191+9292+ # Returns {all_tags, from_tags} where all_tags is (from_tags ++ to_tags)
9393+ defp server_tags_from_key_servers_value(value) do
9494+ case String.split(value, "/") do
9595+ [from_str, to_str] ->
9696+ from_ids = decode_server_ids(from_str)
9797+ {from_ids ++ decode_server_ids(to_str), from_ids}
9898+9999+ [from_str] ->
100100+ from_ids = decode_server_ids(from_str)
101101+ {from_ids, from_ids}
102102+ end
103103+ end
104104+105105+ @doc false
106106+ @spec tlog_ids_for_servers([non_neg_integer], [non_neg_integer], non_neg_integer) :: [non_neg_integer]
107107+ def tlog_ids_for_servers(tlog_ids, server_ids, min_replicas) do
108108+ ids = server_ids |> Enum.map(&buddy_tlog_id(tlog_ids, &1)) |> Enum.uniq()
109109+ length = length(ids)
110110+111111+ case length < min_replicas do
112112+ true ->
113113+ needed = min_replicas - length
114114+ remaining = tlog_ids -- ids
115115+ ids ++ Enum.take_random(remaining, needed)
116116+117117+ false -> ids
118118+ end
119119+ end
120120+end
+3-2
lib/sparse_shard_map.ex
···11defmodule Hobbes.SparseShardMap do
22 @moduledoc """
33- This module implements a sparse shard map on top of an ETS table.
44- The shard map is used by storage servers to track which shards it can safely serve reads for.
33+ This module implements a *Sparse* Shard Map where only some shards are present.
5465 Shards are stored in a tree as key/value pairs of `{start_key, end_key}`.
76 The shards are required to be disjoint, but no safety checks are performed as it is assumed
87 that all shard updates coming from the database are correct.
88+99+ `SparseShardMap` is used by `Storage` servers to keep track of which shards they can serve.
910 """
10111112 @type t :: :ets.table
···11+defmodule Hobbes.ShardTagMapTest do
22+ use ExUnit.Case, async: true
33+44+ alias Hobbes.ShardTagMap
55+66+ @moduletag :shard_tag_map
77+88+ describe "tlog_ids_for_servers/2" do
99+ test "returns tlog ids" do
1010+ # TODO: this test will be flaky if it ever fails, it should really be a fuzz test
1111+ # but for now the function is so simple it's not worth testing further
1212+ tlog_ids = [0, 1, 2, 3, 4, 5]
1313+1414+ assert [0, 1, 2] = ShardTagMap.tlog_ids_for_servers(tlog_ids, [0, 1, 2], 3)
1515+1616+ assert [1, 2, _] = ids1 = ShardTagMap.tlog_ids_for_servers(tlog_ids, [1, 7, 8], 3)
1717+ assert length(Enum.uniq(ids1)) == 3
1818+1919+ assert [1, _, _] = ids2 = ShardTagMap.tlog_ids_for_servers(tlog_ids, [1, 7, 13], 3)
2020+ assert length(Enum.uniq(ids2)) == 3
2121+ end
2222+ end
2323+end