···5959 - [ ] Add include_start/end to Transaction.read_range
6060 - [ ] Clears
6161 - [X] Implement clears in KVs
6262- - [ ] Implement clears in database
6262+ - [ ] Implement clears in the database
6363 - [ ] Range clears
6464+ - [ ] Implement range clears in KVs
6565+ - [X] Add persistent tree (RangeForest) to hold deleted ranges
6666+ - [X] Handle deleted ranges in HybridKV.get/put
6767+ - [X] Handle deleted ranges in HybridKV.flush
6868+ - [ ] Handle deleted ranges in HybridKV.scan
6969+ - [ ] Implement range clears in the database
6470- [ ] Recovery
6571 - [ ] Monitor all transaction processes in Manager to detect failures
6672 - [ ] Lock TLogs and choose recovery version
+59-21
lib/hybrid_kv.ex
···11defmodule Hobbes.HybridKV do
22- alias Hobbes.{HybridKV, MemKV, RangeForest}
22+ alias Hobbes.{HybridKV, MemKV, RangeForest, Utils}
33 alias Hobbes.RangeForest.RangeTree
44 alias Hobbes.KV.FlatKV
55 alias Hobbes.Structs.RangeResult
6677- @enforce_keys [:mem_kv, :storage_kv, :deleted_forest]
77+ @enforce_keys [:mem_kv, :storage_kv, :deleted_forest, :flushed_version]
88 defstruct @enforce_keys
991010 def new do
···1212 mem_kv: MemKV.new(),
1313 storage_kv: FlatKV.new(),
1414 deleted_forest: RangeForest.new(),
1515+ flushed_version: 0,
1516 }
1617 end
1718···3940 @spec get(%HybridKV{}, non_neg_integer, binary) :: binary | nil
4041 def get(%HybridKV{} = kv, version, key)
4142 when is_integer(version) and version >= 0 and is_binary(key) do
4242- floor_version =
4343- case RangeForest.tree_at(kv.deleted_forest, version) |> RangeTree.intersect_key(0, key) do
4444- {_sk, _ek, v} -> v + 1
4545- nil -> 0
4646- end
4343+ kv.deleted_forest
4444+ |> RangeForest.tree_at(version)
4545+ |> RangeTree.intersect_key(kv.flushed_version + 1, key)
4646+ |> case do
4747+ nil ->
4848+ # There are no overlapping range deletes
4949+ case MemKV.get(kv.mem_kv, version, 0, key) do
5050+ nil -> FlatKV.get(kv.storage_kv, key)
5151+ :deleted -> nil
5252+ value -> value
5353+ end
47544848- case MemKV.get(kv.mem_kv, version, floor_version, key) do
4949- nil -> FlatKV.get(kv.storage_kv, key)
5050- :deleted -> nil
5151- value -> value
5555+ {_sk, _ek, range_deleted_at} ->
5656+ # There is an overlapping range delete, so we return nil if
5757+ # a newer key is not found
5858+ case MemKV.get(kv.mem_kv, version, range_deleted_at + 1, key) do
5959+ nil -> nil
6060+ :deleted -> nil
6161+ value -> value
6262+ end
5263 end
5364 end
5465···198209 @doc """
199210 Flushes keys/values with a version <= `version` to unversioned storage.
200211 """
201201- @spec flush(%HybridKV{}, non_neg_integer) :: :ok
212212+ @spec flush(%HybridKV{}, non_neg_integer) :: %HybridKV{}
202213 def flush(%HybridKV{} = kv, version) when is_integer(version) and version >= 0 do
203203- MemKV.flush(kv.mem_kv, version)
204204- # Sort here is not needed for determinism, but the btree
205205- # inserts will probably prefer sorted data in the future
206206- # TODO: maybe return sorted keys from MemKV.flush?
207207- |> Enum.sort_by(&(&1))
208208- |> Enum.map(fn
209209- {k, :deleted} -> FlatKV.delete(kv.storage_kv, k)
210210- {k, v} -> FlatKV.put(kv.storage_kv, k, v)
214214+ {ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1)
215215+216216+ # Gather all mutations (writes, deletes, and range deletes) and batch them together
217217+ # by version
218218+ ranges_per_version =
219219+ Enum.group_by(ranges, fn {_start, _end, v} -> v end)
220220+ |> Enum.sort_by(&elem(&1, 0))
221221+222222+ pairs_per_version =
223223+ MemKV.flush(kv.mem_kv, version)
224224+ |> Enum.group_by(fn {_key, _value, v} -> v end)
225225+ |> Enum.sort_by(&elem(&1, 0))
226226+227227+ batches = Utils.interleave_batches(pairs_per_version, ranges_per_version)
228228+229229+ storage_kv = kv.storage_kv
230230+ # Iterate through all batches and apply point writes first,
231231+ # then range deletes
232232+ #
233233+ # Note that position of the range deletes within the batch does
234234+ # not matter because we split them for writes which come later
235235+ # in the same version (in put/4)
236236+ #
237237+ # TODO: we should probably switch to a "mutation log" approach
238238+ # to reduce complexity (performance would probably be better anyway)
239239+ Enum.each(batches, fn {_v, pairs, ranges} ->
240240+ Enum.each(pairs, fn
241241+ {key, :deleted, _v} -> FlatKV.delete(storage_kv, key)
242242+ {key, value, _v} -> FlatKV.put(storage_kv, key, value)
243243+ end)
244244+245245+ Enum.each(ranges, fn {sk, ek, _v} ->
246246+ FlatKV.clear_range(storage_kv, sk, ek)
247247+ end)
211248 end)
212212- :ok
249249+250250+ %HybridKV{kv | deleted_forest: deleted_forest, flushed_version: version}
213251 end
214252215253 @spec put_storage(%HybridKV{}, binary, binary) :: :ok
+9-3
lib/mem_kv.ex
···208208 }
209209210210 """
211211- @spec flush(:ets.table, non_neg_integer) :: %{String.t => String.t}
211211+ @spec flush(:ets.table, non_neg_integer) :: [{binary, binary, non_neg_integer}]
212212 def flush(table, version) do
213213 # TODO: since this scans the entire table anyway, it can be rewritten
214214 # to be much faster with matchspecs
215215- do_flush_scan(table, version, {"", -1}, %{})
215215+ do_flush_scan(table, version, {"", -1}, [])
216216+ |> Enum.reverse()
216217 end
217218218219 defp do_flush_scan(table, version, prev, acc) do
···223224 [{^full_key, value}] = :ets.lookup(table, full_key)
224225 true = :ets.delete(table, full_key)
225226226226- do_flush_scan(table, version, full_key, Map.put(acc, key, value))
227227+ case acc do
228228+ [{^key, _value, _v} | rest] ->
229229+ do_flush_scan(table, version, full_key, [{key, value, ver} | rest])
230230+ acc ->
231231+ do_flush_scan(table, version, full_key, [{key, value, ver} | acc])
232232+ end
227233228234 false ->
229235 do_flush_scan(table, version, full_key, acc)
+7
lib/range_forest.ex
···162162 # tree to the end of the queue
163163 tree = RangeTree.insert_range(prev_tree, version, start_key, end_key)
164164 :gb_trees.insert(version, tree, forest)
165165+166166+ :none ->
167167+ # There is no previous tree, so we create a new one
168168+ tree = RangeTree.new() |> RangeTree.insert_range(version, start_key, end_key)
169169+ :gb_trees.insert(version, tree, forest)
165170 end
166171 end
167172···184189 def tree_at(forest, version) do
185190 case :gb_trees.smaller(version + 1, forest) do
186191 {_v, tree} -> tree
192192+ :none -> :gb_trees.empty()
187193 end
188194 end
189195···195201 {[], forest}
196202197203 {last_tree, forest} ->
204204+ # TODO: clear these ranges from the newest tree
198205 # TODO: should end_key be four \xFFs to include special space?
199206 ranges = RangeTree.intersect_range(last_tree, min_version, "", "\xFF\xFF")
200207 {ranges, forest}
+21-10
test/hybrid_kv_test.exs
···4747 e in [ExUnit.AssertionError] ->
4848 e = Map.update!(e, :message, &(&1 <> " (at op #{i}, seed=#{inspect(verifier.seed)})"))
4949 reraise e, __STACKTRACE__
5050+ e ->
5151+ require Logger
5252+ Logger.error("Error #{inspect(e)} at op=#{i}, seed=#{inspect(verifier.seed)}")
5353+ reraise e, __STACKTRACE__
5054 end
5155 end)
5256 end
···5660 dbg [
5761 hybrid_kv: HybridKV.dump(verifier.hybrid_kv),
5862 test_kv: TestKV.dump(verifier.test_kv),
6363+ durable_version: verifier.durable_version,
5964 ], limit: :infinity
6065 end
61666267 @ops [:put, :delete, :delete_range, :get]
6368 defp random_op do
6469 case Enum.random(1..100) do
6565- #1 -> :flush
7070+ 1 -> :flush
6671 _ -> Enum.random(@ops)
6772 end
6873 end
···147152148153 defp perform(:flush, %Verifier{hybrid_kv: hkv} = verifier) do
149154 flush_version = rand(verifier.durable_version, verifier.version)
150150- :ok = HybridKV.flush(hkv, flush_version)
155155+ hkv = HybridKV.flush(hkv, flush_version)
151156152152- %Verifier{verifier | durable_version: flush_version}
157157+ %Verifier{verifier | hybrid_kv: hkv, durable_version: flush_version}
153158 end
154159155160 defp inc_version(%Verifier{version: version} = verifier) do
···188193 describe "verify HybridKV" do
189194 @tag :hkv_verify
190195 test "operations" do
191191- Verifier.new({101, 101, 102})
196196+ Verifier.new({100, 101, 102})
192197 |> Verifier.run(1000)
193198 end
194199···224229 assert HybridKV.get(kv, 2, "foo") == "bar_2"
225230 assert HybridKV.get(kv, 3, "foo") == "bar_3"
226231227227- assert :ok = HybridKV.flush(kv, 2)
232232+ assert kv = HybridKV.flush(kv, 2)
228233229234 assert HybridKV.get(kv, 1, "foo") == "bar_2"
230235 assert HybridKV.get(kv, 2, "foo") == "bar_2"
···239244 {"foo", "bar"},
240245 ], more: false} = HybridKV.scan(kv, 1, "foo", "foo_z")
241246242242- assert :ok = HybridKV.flush(kv, 2)
247247+ assert kv = HybridKV.flush(kv, 2)
243248244249 assert %RangeResult{pairs: [
245250 {"foo", "bar_2"},
···264269 {"foo_c", "bar_c_3"},
265270 ], more: false} = HybridKV.scan(kv, 4, "foo", "foo_z")
266271267267- :ok = HybridKV.flush(kv, 4)
272272+ kv = HybridKV.flush(kv, 4)
268273269274 assert %RangeResult{pairs: [
270275 {"foo", "bar_3"},
···274279 end
275280276281 test "scan limit", %{kv: kv} do
277277- assert :ok = HybridKV.flush(kv, 2)
282282+ assert kv = HybridKV.flush(kv, 2)
278283279284 assert %RangeResult{pairs: [
280285 {"foo", "bar_3"},
···284289285290 test "scan deleted limit", %{kv: kv} do
286291 Enum.each(0..5, fn ver ->
287287- if ver != 0, do: HybridKV.flush(kv, ver)
292292+ kv = case ver == 0 do
293293+ true -> HybridKV.flush(kv, ver)
294294+ false -> kv
295295+ end
288296289297 assert %RangeResult{pairs: [
290298 {"foo", "bar_3"},
···299307 HybridKV.put(kv, 5, "foo_f", "bar_f_5")
300308301309 Enum.each(0..5, fn ver ->
302302- if ver != 0, do: HybridKV.flush(kv, ver)
310310+ kv = case ver == 0 do
311311+ true -> HybridKV.flush(kv, ver)
312312+ false -> kv
313313+ end
303314304315 assert %RangeResult{pairs: [
305316 {"foo", "bar_3"},