···6767 - [X] Handle deleted ranges in HybridKV.flush
6868 - [X] Handle deleted ranges in HybridKV.scan
6969 - [ ] Implement range clears in the database
7070+- [ ] Use ETS to store the Resolver mutation log and pop unneeded entries
7071- [ ] Recovery
7172 - [ ] Monitor all transaction processes in Manager to detect failures
7273 - [ ] Lock TLogs and choose recovery version
+7-5
lib/meta_store.ex
···3535 alias Hobbes.{MemKV, MetaStore}
3636 alias Hobbes.Structs.{Cluster, RangeResult}
37373838+ alias Hobbes.Utils
3839 import Hobbes.Utils
39404041 @enforce_keys [:kv]
···187188 Returns a list of diffs of the form `{key, {previous_value, new_value}}`
188189 for use in implementing side-effects.
189190 """
190190- @spec apply_meta_mutations(%MetaStore{}, non_neg_integer, [{binary, binary}]) :: [{binary, {binary, binary}}]
191191+ @spec apply_meta_mutations(%MetaStore{}, non_neg_integer, [Utils.mutation]) :: [{binary, {binary, binary}}]
191192 def apply_meta_mutations(%MetaStore{} = store, commit_version, meta_mutations) when is_integer(commit_version) and is_list(meta_mutations) do
192193 # TODO: should the return values be deduplicated?
193194 # A key could be written multiple times in a batch...
194194- Enum.map(meta_mutations, fn {@meta_prefix <> _ = k, v} when is_binary(k) and is_binary(v) ->
195195- existing = get(store.kv, commit_version, k)
196196- put(store.kv, commit_version, k, v)
197197- {k, {existing, v}}
195195+ Enum.map(meta_mutations, fn
196196+ {:write, @meta_prefix <> _ = k, v} when is_binary(k) and is_binary(v) ->
197197+ existing = get(store.kv, commit_version, k)
198198+ put(store.kv, commit_version, k, v)
199199+ {k, {existing, v}}
198200 end)
199201 end
200202
+41-39
lib/servers/commit_buffer.ex
···114114 end
115115116116 def handle_call({:commit, %CommitTxn{} = txn}, from, state) do
117117- # TODO: error handling?
118118- validate_transaction!(txn)
119119-120117 txn = %CommitTxn{txn | from: from}
121118 state = %State{state | buffer: [txn | state.buffer], buffer_size: state.buffer_size + 1}
122119 state = maybe_commit_batch(state)
···136133 # in order to keep meta mutations flowing from resolvers
137134 @spec commit_batch(%State{}) :: %State{}
138135 defp commit_batch(%State{} = state) do
139139- transactions =
140140- state.buffer
141141- |> Enum.reverse()
142142- |> Enum.with_index()
143143- |> Enum.map(fn {%CommitTxn{} = txn, i} ->
144144- %CommitTxn{txn |
145145- id: i,
146146- meta: txn.mutations |> Enum.any?(fn {k, _v} -> MetaStore.is_meta?(k) end),
147147- }
148148- end)
149149-150136 [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)
151137 {commit_version, prev_commit_version} = Sequencer.get_commit_version(seq_pid)
152138139139+ transactions_reversed = state.buffer
140140+141141+ # Buffer is reversed, so these are in the correct order
142142+ resolver_txns =
143143+ Enum.reduce(transactions_reversed, [], fn %CommitTxn{} = txn, acc ->
144144+ [{
145145+ txn.read_version,
146146+ txn.read_conflicts,
147147+ txn.write_conflicts,
148148+ extract_meta(txn.mutations),
149149+ } | acc]
150150+ end)
151151+153152 batch = %CommitBatch{
154153 commit_buffer_id: state.id,
155154 commit_version: commit_version,
156155 prev_commit_version: prev_commit_version,
157157- transactions: transactions,
156156+ transactions: resolver_txns,
158157 }
159158160159 [%Server{pid: resolver_pid}] = get_servers(state.cluster, Hobbes.Servers.Resolver)
161161- {allowed_txn_ids, meta_log} = Resolver.resolve_batch(resolver_pid, batch)
160160+ {txn_results_reversed, meta_log} = Resolver.resolve_batch(resolver_pid, batch)
162161162162+ # Apply meta mutations received, including our own from this batch
163163 Enum.each(meta_log, fn {commit_version, mutations} ->
164164 MetaStore.apply_meta_mutations(state.meta_store, commit_version, mutations)
165165 end)
166166167167 {allowed_transactions, rejected_transactions} =
168168- batch.transactions
169169- |> Enum.split_with(fn %CommitTxn{} = txn ->
170170- txn.id in allowed_txn_ids
168168+ # Both are reversed, so they come out of this the right way around
169169+ Enum.zip(transactions_reversed, txn_results_reversed)
170170+ |> Enum.reduce({[], []}, fn
171171+ {txn, true}, {a, r} -> {[txn | a], r}
172172+ {txn, false}, {a, r} -> {a, [txn | r]}
171173 end)
172174173175 # If the database is locked, filter out all non-meta transactions
174176 # TODO: use a lock-aware flag instead like FDB
175177 {allowed_transactions, locked_transactions} =
176178 case MetaStore.locked?(state.meta_store, commit_version) do
177177- true -> Enum.split_with(allowed_transactions, fn %CommitTxn{} = txn -> txn.meta end)
179179+ true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
178180 false -> {allowed_transactions, []}
179181 end
180182···184186 allowed_transactions
185187 |> Enum.map(fn %CommitTxn{} = txn -> txn.mutations end)
186188 |> Enum.concat()
187187- |> Enum.map(fn {k, _v} = mut ->
188188- tags = MetaStore.get_key_server_mutation_tags(meta_store, commit_version, k)
189189- {tags, mut}
189189+ |> Enum.map(fn
190190+ {:write, k, _v} = mut ->
191191+ tags = MetaStore.get_key_server_mutation_tags(meta_store, commit_version, k)
192192+ {tags, mut}
193193+ # TODO: clears
194194+ # TODO: range clears (will have to be split)
190195 end)
191196192197 # TODO: get latest generation only
···201206 # Note: prepending reverses the mutations, hence the name
202207 # (they are reversed again when the batch is created below)
203208 log_mutations_reversed =
204204- Enum.reduce(tagged_mutations, log_mutations_reversed, fn {tags, {k, _v}} = tm, acc ->
209209+ Enum.reduce(tagged_mutations, log_mutations_reversed, fn {tags, _mut} = tm, acc ->
205210 tlogs =
206206- # Note: could also use (-1 in tags) to check if meta - which is faster?
207207- case MetaStore.is_meta?(k) do
208208- # Send meta mutations (already tagged with -1 above) to all tlogs
211211+ case -1 in tags do
212212+ # Send meta mutations (tagged with -1) to all tlogs
209213 true -> all_tlogs
210214 false -> tlogs_for_tags(num_tlogs, @tlog_replication_factor, tags)
211215 end
···255259 %State{state | last_commit_version: commit_version, buffer: [], buffer_size: 0}
256260 end
257261258258- defp validate_transaction!(%CommitTxn{} = txn) do
259259- unless is_integer(txn.read_version) and txn.read_version >= 0 do
260260- raise "Invalid read_version #{inspect(txn.read_version)} found in transaction #{inspect(txn)}"
261261- end
262262-263263- Enum.each(txn.reads, fn
264264- k when is_binary(k) -> :noop
265265- {s, e} when is_binary(s) and is_binary(e) -> :noop
266266- other -> raise "Invalid read #{inspect(other)} found in transaction #{inspect(txn)}"
262262+ defp has_meta?(mutations) do
263263+ Enum.any?(mutations, fn
264264+ {:write, "\xFF" <> _, _v} -> true
265265+ {:write, _k, _v} -> false
267266 end)
267267+ end
268268269269- Enum.each(txn.mutations, fn
270270- {k, v} when is_binary(k) and is_binary(v) -> :noop
271271- other -> raise "Invalid mutation #{inspect(other)} found in transaction #{inspect(txn)}"
269269+ defp extract_meta(mutations) do
270270+ Enum.filter(mutations, fn
271271+ # TODO: clears, range clears
272272+ {:write, "\xFF" <> _, _v} -> true
273273+ {:write, _k, _v} -> false
272274 end)
273275 end
274276end
+53-38
lib/servers/resolver.ex
···11defmodule Hobbes.Servers.Resolver do
22 use Hobbes.Construct.SimServer
3344- alias Hobbes.Structs.{Cluster, CommitBatch, CommitTxn}
44+ alias Hobbes.Structs.{Cluster, CommitBatch}
55 alias Hobbes.VersionMap
66+77+ alias Hobbes.Utils
6879 defmodule State do
810 @enforce_keys [
···23252426 def start_link, do: SimServer.start_link(__MODULE__, nil)
25272626- @spec resolve_batch(pid, %CommitBatch{}) :: {:allow, [non_neg_integer]}
2828+ @doc """
2929+ Resolves a batch of transactions.
3030+3131+ Returns a tuple of `{results, meta_log}`.
3232+3333+ Results is a **reversed** list of booleans where each entry is
3434+ true if the transaction was allowed and false if not.
3535+3636+ Meta log is a list of batches of mutations applied by other
3737+ CommitBuffers since the last request from this CommitBuffer.
3838+ """
3939+ @spec resolve_batch(pid, %CommitBatch{}) :: {[boolean], [{non_neg_integer, [Utils.mutation]}]}
2740 def resolve_batch(server, %CommitBatch{} = batch) do
2841 SimServer.call(server, {:resolve_batch, batch})
2942 end
···6275 {{from, %CommitBatch{} = batch}, buffer} ->
6376 state = %State{state | buffer: buffer}
64776565- do_resolve_batch(state, from, batch)
7878+ do_resolve_batch(batch, from, state)
6679 |> maybe_resolve_next()
6780 {nil, _buffer} ->
6881 state
6982 end
7083 end
71847272- defp do_resolve_batch(%State{} = state, from, %CommitBatch{} = batch) do
7373- {state, allowed_ids, allowed_meta} =
7474- batch.transactions
7575- |> Enum.reduce({state, [], []}, fn %CommitTxn{} = txn, {%State{} = state, allowed_ids, allowed_meta} ->
7676- case allow_transaction?(state, txn.read_version, txn.reads) do
8585+ defp do_resolve_batch(%CommitBatch{} = batch, from, %State{} = state) do
8686+ vm = state.version_map
8787+ commit_version = batch.commit_version
8888+8989+ {results, allowed_meta_mutations} =
9090+ Enum.reduce(batch.transactions, {[], []}, fn txn, acc ->
9191+ {results, allowed_meta_mutations} = acc
9292+ {read_version, read_conflicts, write_conflicts, meta_mutations} = txn
9393+9494+ case allow_read_conflicts?(vm, read_version, read_conflicts) do
7795 true ->
7878- nil
9696+ update_write_conflicts(vm, commit_version, write_conflicts)
9797+7998 {
8080- update_last_commits(state, batch.commit_version, txn.mutations),
8181- [txn.id | allowed_ids],
8282- case txn.meta do
8383- true -> [txn | allowed_meta]
8484- false -> allowed_meta
8585- end
9999+ [true | results],
100100+ [meta_mutations | allowed_meta_mutations],
86101 }
8710288103 false ->
8989- {state, allowed_ids, allowed_meta}
104104+ {
105105+ [false | results],
106106+ allowed_meta_mutations,
107107+ }
90108 end
91109 end)
921109393- batch_meta_mutations =
9494- allowed_meta
9595- |> Enum.map(fn %CommitTxn{} = txn -> txn.mutations end)
9696- |> Enum.concat()
111111+ # TODO: we will need to keep track of the index of each allowed transaction once
112112+ # we have multiple Resolvers so that CommitBuffers can discern which transactions
113113+ # passed on all Resolvers
114114+ meta_mutations_log = [{commit_version, Enum.concat(allowed_meta_mutations)} | state.meta_mutations_log]
971159898- meta_mutations_log = [{batch.commit_version, batch_meta_mutations} | state.meta_mutations_log]
116116+ # TODO: store the log in ets, maybe reuse TaggedQueue?
99117 buffer_last_version = Map.get(state.commit_buffer_last_versions, batch.commit_buffer_id, 0)
100100-101118 reply_meta_mutations =
102102- meta_mutations_log
103103- |> Enum.reduce_while([], fn {commit_ver, _mutations} = meta_log_entry, acc ->
104104- case commit_ver >= buffer_last_version do
105105- true -> {:cont, [meta_log_entry | acc]}
119119+ Enum.reduce_while(meta_mutations_log, [], fn {ver, _mutations} = entry, acc ->
120120+ case ver > buffer_last_version do
121121+ true -> {:cont, [entry | acc]}
106122 false -> {:halt, acc}
107123 end
108124 end)
109125110110- SimServer.reply(from, {allowed_ids, reply_meta_mutations})
126126+ # Note: we intentionally reply with results reversed because it's
127127+ # better to reverse them on the CommitBuffer as it's less of a bottleneck
128128+ SimServer.reply(from, {results, reply_meta_mutations})
111129112130 %State{state |
113113- version: batch.commit_version,
114114- commit_buffer_last_versions: Map.put(state.commit_buffer_last_versions, batch.commit_buffer_id, batch.commit_version),
131131+ version: commit_version,
115132 meta_mutations_log: meta_mutations_log,
133133+ commit_buffer_last_versions: Map.put(state.commit_buffer_last_versions, batch.commit_buffer_id, commit_version),
116134 }
117135 end
118136119119- defp allow_transaction?(%State{version_map: vm}, read_version, reads)
120120- when is_integer(read_version) and is_list(reads) do
121121- Enum.all?(reads, fn read ->
122122- not VersionMap.written_after?(vm, read_version, read)
137137+ defp allow_read_conflicts?(version_map, read_version, read_conflicts) do
138138+ Enum.all?(read_conflicts, fn key_or_range ->
139139+ not VersionMap.written_after?(version_map, read_version, key_or_range)
123140 end)
124141 end
125142126126- defp update_last_commits(%State{} = state, commit_version, mutations) when is_integer(commit_version) and is_list(mutations) do
127127- keys = Enum.map(mutations, fn {k, _v} -> k end)
128128- VersionMap.add_writes(state.version_map, commit_version, keys)
129129-130130- state
143143+ defp update_write_conflicts(version_map, commit_version, write_conflicts) do
144144+ # TODO: handle ranges
145145+ VersionMap.add_writes(version_map, commit_version, write_conflicts)
131146 end
132147end
+14-9
lib/servers/storage.ex
···66 alias Hobbes.Structs.{Cluster, Server, RangeResult, ShardStats}
77 alias Hobbes.Servers.{TLog, Storage}
8899+ alias Hobbes.Utils
910 import Hobbes.Utils
10111112 defmodule ShardImport do
···293294 %State{state | data_version: largest_version}
294295 end
295296296296- @spec apply_batch(%State{}, non_neg_integer, [{binary, binary}], [{binary, binary}]) :: %State{}
297297+ @spec apply_batch(%State{}, non_neg_integer, [Utils.mutation], [Utils.mutation]) :: %State{}
297298 defp apply_batch(%State{} = state, commit_version, meta_mutations, data_mutations)
298299 when is_integer(commit_version) and is_list(meta_mutations) and is_list(data_mutations) do
299300 # Note: meta mutations are always applied first in a batch
···304305 end
305306306307 defp apply_data_mutations(%State{kv: kv} = state, version, data_mutations) when is_list(data_mutations) do
307307- Enum.each(data_mutations, fn {k, v} ->
308308- HybridKV.put(kv, version, k, v)
308308+ Enum.each(data_mutations, fn
309309+ {:write, k, v} ->
310310+ HybridKV.put(kv, version, k, v)
309311310310- case byte_sample_pair(k, v) do
311311- {sk, sv} -> HybridKV.put(kv, version, sk, sv)
312312- :not_in_sample -> :noop
313313- end
312312+ case byte_sample_pair(k, v) do
313313+ {sk, sv} -> HybridKV.put(kv, version, sk, sv)
314314+ :not_in_sample -> :noop
315315+ end
316316+ # TODO: clear
317317+ # TODO: range clear
314318 end)
315319316320 state
···350354 end
351355 end
352356353353- @spec apply_meta_mutations(%State{}, non_neg_integer, [{binary, binary}]) :: %State{}
357357+ @spec apply_meta_mutations(%State{}, non_neg_integer, [Utils.mutation]) :: %State{}
354358 defp apply_meta_mutations(%State{} = state, _commit_version, []), do: state
355359356360 defp apply_meta_mutations(%State{} = state, commit_version, meta_mutations)
···472476 |> case do
473477 {:ok, %RangeResult{pairs: pairs, more: more}} ->
474478 # Write imported KV pairs at `read_version`
475475- apply_data_mutations(state, read_version, pairs)
479479+ # TODO: use a separate function for this to avoid the map
480480+ apply_data_mutations(state, read_version, Enum.map(pairs, fn {k, v} -> {:write, k, v} end))
476481477482 case more do
478483 true ->
+3-1
lib/servers/tlog.ex
···44 alias Hobbes.TaggedQueue
55 alias Hobbes.Structs.{Cluster, LogBatch}
6677+ alias Hobbes.Utils
88+79 @meta_tag -1
810911 defmodule State do
···3234 Returns `{{start_version, end_version}, {meta_mutations, data_mutations}}`.
3335 """
3436 @spec peek(pid, term, non_neg_integer | nil) ::
3535- {{non_neg_integer, non_neg_integer}, {[{non_neg_integer, [{binary, binary}]}], [{non_neg_integer, [{binary, binary}]}]}}
3737+ {{non_neg_integer, non_neg_integer}, {[{non_neg_integer, [Utils.mutation]}], [{non_neg_integer, [Utils.mutation]}]}}
3638 def peek(server, tag, start_version) do
3739 SimServer.call(server, {:peek, tag, start_version})
3840 end