this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add mutation log to HybridKV

garrison 55783096 a3daf6ae

+186 -224
+30 -34
lib/hybrid_kv.ex
··· 1 1 defmodule Hobbes.HybridKV do 2 2 alias Hobbes.{HybridKV, MemKV, RangeForest, Utils} 3 3 alias Hobbes.RangeForest.RangeTree 4 - alias Hobbes.KV.{FlatKV, FlatStorageKV} 4 + alias Hobbes.KV.{FlatKV, FlatStorageKV, MutationLog} 5 5 alias Hobbes.Structs.RangeResult 6 6 7 7 @type t :: %__MODULE__{ ··· 9 9 storage_module: module, 10 10 storage_kv: FlatKV.t | FlatStorageKV.t, 11 11 deleted_forest: term, 12 + mutation_log: MutationLog.t, 12 13 flushed_version: non_neg_integer, 13 14 } 14 15 15 - @enforce_keys [:mem_kv, :storage_module, :storage_kv, :deleted_forest, :flushed_version] 16 + @enforce_keys [:mem_kv, :storage_module, :storage_kv, :deleted_forest, :mutation_log, :flushed_version] 16 17 defstruct @enforce_keys 17 18 18 19 def new(opts \\ []) do ··· 27 28 storage_module: storage_module, 28 29 storage_kv: storage_kv, 29 30 deleted_forest: RangeForest.new(), 31 + mutation_log: MutationLog.new(), 30 32 flushed_version: 0, 31 33 } 32 34 end ··· 274 276 defp acc_pair({_key, :deleted}, acc), do: acc 275 277 defp acc_pair({_key, _value} = pair, acc), do: [pair | acc] 276 278 279 + @spec apply_batch(t, non_neg_integer, [Utils.mutation]) :: :ok 280 + def apply_batch(kv, version, mutations) when is_list(mutations) do 281 + MutationLog.insert(kv.mutation_log, version, mutations) 282 + 283 + Enum.reduce(mutations, kv, fn 284 + {:write, k, v}, kv -> 285 + put(kv, version, k, v) 286 + {:clear, k}, kv -> 287 + delete(kv, version, k) 288 + kv 289 + {:clear_range, sk, ek}, kv -> 290 + delete_range(kv, version, sk, ek) 291 + end) 292 + end 293 + 277 294 @doc """ 278 295 Flushes keys/values with a version <= `version` to unversioned storage. 279 296 """ 280 297 @spec flush(%HybridKV{}, non_neg_integer) :: %HybridKV{} 281 298 def flush(%HybridKV{storage_module: s_mod, storage_kv: s_kv} = kv, version) when is_integer(version) and version >= 0 do 282 - {ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1) 283 - 284 - # Gather all mutations (writes, deletes, and range deletes) and batch them together 285 - # by version 286 - ranges_per_version = 287 - Enum.group_by(ranges, fn {_start, _end, v} -> v end) 288 - |> Enum.sort_by(&elem(&1, 0)) 289 - 290 - pairs_per_version = 291 - MemKV.flush(kv.mem_kv, version) 292 - |> Enum.group_by(fn {_key, _value, v} -> v end) 293 - |> Enum.sort_by(&elem(&1, 0)) 294 - 295 - batches = Utils.interleave_batches(pairs_per_version, ranges_per_version) 296 - 297 - # Iterate through all batches and apply point writes first, 298 - # then range deletes 299 - # 300 - # Note that position of the range deletes within the batch does 301 - # not matter because we split them for writes which come later 302 - # in the same version (in put/4) 303 - # 304 - # TODO: we should probably switch to a "mutation log" approach 305 - # to reduce complexity (performance would probably be better anyway) 306 - Enum.each(batches, fn {_v, pairs, ranges} -> 307 - Enum.each(pairs, fn 308 - {key, :deleted, _v} -> s_mod.delete(s_kv, key) 309 - {key, value, _v} -> s_mod.put(s_kv, key, value) 310 - end) 299 + batches = MutationLog.pop_up_to(kv.mutation_log, version) 311 300 312 - Enum.each(ranges, fn {sk, ek, _v} -> 313 - s_mod.delete_range(s_kv, sk, ek) 301 + Enum.each(batches, fn {_v, mutations} -> 302 + Enum.each(mutations, fn 303 + {:write, k, v} -> s_mod.put(s_kv, k, v) 304 + {:clear, k} -> s_mod.delete(s_kv, k) 305 + {:clear_range, sk, ek} -> s_mod.delete_range(s_kv, sk, ek) 314 306 end) 315 307 end) 316 308 317 - %HybridKV{kv | deleted_forest: deleted_forest, flushed_version: version} 309 + {_ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1) 310 + MemKV.flush(kv.mem_kv, version) 311 + 312 + %{kv | deleted_forest: deleted_forest, flushed_version: version} 318 313 end 319 314 320 315 @spec put_storage(t, binary, binary) :: :ok ··· 355 350 mem_kv: MemKV.dump(kv.mem_kv), 356 351 storage_kv: kv.storage_module.dump(kv.storage_kv), 357 352 deleted_forest: RangeForest.dump(kv.deleted_forest), 353 + mutation_log: MutationLog.dump(kv.mutation_log), 358 354 } 359 355 end 360 356 end
+53
lib/kv/mutation_log.ex
··· 1 + defmodule Hobbes.KV.MutationLog do 2 + alias Hobbes.Utils 3 + 4 + @type t :: :ets.table 5 + 6 + @spec new :: t 7 + def new do 8 + :ets.new(__MODULE__, [:ordered_set, :private]) 9 + end 10 + 11 + @spec insert(t, non_neg_integer, [Utils.mutation]) :: :ok 12 + def insert(table, version, mutations) when is_integer(version) and is_list(mutations) do 13 + :ets.insert(table, {version, mutations}) 14 + :ok 15 + end 16 + 17 + @spec append(t, non_neg_integer, [Utils.mutation]) :: :ok 18 + def append(table, version, mutations) when is_integer(version) and is_list(mutations) do 19 + case :ets.lookup(table, version) do 20 + [{^version, existing}] -> :ets.insert(table, {version, existing ++ mutations}) 21 + [] -> :ets.insert(table, {version, mutations}) 22 + end 23 + :ok 24 + end 25 + 26 + @spec pop_up_to(t, non_neg_integer) :: [{non_neg_integer, [Utils.mutation]}] 27 + def pop_up_to(table, end_version) when is_integer(end_version) do 28 + scan_pop(table, end_version, -1, []) 29 + |> Enum.reverse() 30 + end 31 + 32 + defp scan_pop(table, end_version, prev_version, acc) do 33 + case :ets.next(table, prev_version) do 34 + ver when is_integer(ver) -> 35 + case ver <= end_version do 36 + true -> 37 + [{^ver, mutations}] = :ets.lookup(table, ver) 38 + :ets.delete(table, ver) 39 + 40 + scan_pop(table, end_version, ver, [{ver, mutations} | acc]) 41 + 42 + false -> acc 43 + end 44 + 45 + :"$end_of_table" -> acc 46 + end 47 + end 48 + 49 + @doc false 50 + def dump(table) do 51 + :ets.tab2list(table) 52 + end 53 + end
+8
lib/kv/test_kv.ex
··· 89 89 end) 90 90 end 91 91 92 + def apply_batch(%TestKV{} = kv, version, mutations) do 93 + Enum.reduce(mutations, kv, fn 94 + {:write, k, v}, kv -> put(kv, version, k, v) 95 + {:clear, k}, kv -> delete(kv, version, k) 96 + {:clear_range, sk, ek}, kv -> delete_range(kv, version, sk, ek) 97 + end) 98 + end 99 + 92 100 def dump(%TestKV{} = kv) do 93 101 kv.maps[kv.version] 94 102 |> Enum.sort_by(&elem(&1, 0))
+7 -27
lib/servers/storage.ex
··· 5 5 import ExUnit.Assertions, only: [assert: 1] 6 6 7 7 alias Hobbes.{HybridKV, MetaStore, SparseShardMap} 8 + alias Hobbes.KV.MutationLog 8 9 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, PeekResult, RangeResult, ShardStats} 9 10 alias Hobbes.Servers.{CommitBuffer, TLog, Storage, Distributor} 10 11 ··· 510 511 end 511 512 512 513 defp apply_data_mutations(%State{kv: kv} = state, version, data_mutations) when is_list(data_mutations) do 513 - Enum.each(data_mutations, fn 514 - {:write, k, v} -> 515 - HybridKV.put(kv, version, k, v) 516 - 517 - case byte_sample_pair(k, v) do 518 - {:in_sample, sk, sv} -> HybridKV.put(kv, version, sk, sv) 519 - {:not_in_sample, sk} -> 520 - case HybridKV.get(kv, version, sk) do 521 - nil -> :noop 522 - _value -> HybridKV.delete(kv, version, sk) 523 - end 524 - end 525 - {:clear, k} -> 526 - HybridKV.delete(kv, version, k) 527 - 528 - sk = byte_sample_key(k) 529 - case HybridKV.get(kv, version, sk) do 530 - nil -> :noop 531 - _value -> HybridKV.delete(kv, version, sk) 532 - end 533 - # TODO: range clear 534 - end) 535 - 536 - state 514 + # TODO: append byte sample mutations to batch 515 + kv = HybridKV.apply_batch(kv, version, data_mutations) 516 + %{state | kv: kv} 537 517 end 538 518 539 519 defp byte_sample_key(key), do: @byte_sample_prefix <> key ··· 699 679 end 700 680 |> case do 701 681 {:ok, %RangeResult{pairs: pairs, more: more}} -> 702 - # Write imported KV pairs at `read_version` 703 - # TODO: use a separate function for this to avoid the map 704 - apply_data_mutations(state, read_version, Enum.map(pairs, fn {k, v} -> {:write, k, v} end)) 682 + # Write imported KV pairs into the mutation log at `read_version` 683 + mutations = Enum.map(pairs, fn {k, v} -> {:write, k, v} end) 684 + MutationLog.append(state.kv.mutation_log, read_version, mutations) 705 685 706 686 case more do 707 687 true ->
+3 -3
lib/workloads/read_write.ex
··· 103 103 if expected_value != received_value do 104 104 raise """ 105 105 Received unexpected value! 106 - Key: #{key} 107 - Expected: #{expected_value} 108 - Received: #{received_value} 106 + Key: #{inspect(key)} 107 + Expected: #{inspect(expected_value)} 108 + Received: #{inspect(received_value)} 109 109 """ 110 110 end 111 111 end)
+37 -160
test/hybrid_kv_test.exs
··· 3 3 4 4 alias Hobbes.HybridKV 5 5 alias Hobbes.KV.TestKV 6 - alias Hobbes.Structs.RangeResult 7 6 8 7 @moduletag :hybrid_kv 9 8 ··· 64 63 ], limit: :infinity 65 64 end 66 65 67 - @ops [:put, :delete, :delete_range, :get, :scan] 66 + @ops [:apply_batch, :get, :scan] 68 67 defp random_op do 69 68 case Enum.random(1..100) do 70 69 1 -> :flush ··· 72 71 end 73 72 end 74 73 75 - defp perform(:put, %Verifier{hybrid_kv: hkv, test_kv: tkv} = verifier) do 76 - verifier = inc_version(verifier) 77 - 78 - version = verifier.version 79 - key = rand_hash() 80 - value = rand_hash() 81 - 82 - hkv = HybridKV.put(hkv, version, key, value) 83 - tkv = TestKV.put(tkv, version, key, value) 84 - 85 - %Verifier{verifier | hybrid_kv: hkv, test_kv: tkv} 86 - end 87 - 88 - defp perform(:delete, %Verifier{hybrid_kv: hkv, test_kv: tkv} = verifier) do 89 - verifier = inc_version(verifier) 90 - 91 - version = verifier.version 92 - key = rand_hash() 93 - 94 - :ok = HybridKV.delete(hkv, version, key) 95 - tkv = TestKV.delete(tkv, version, key) 96 - 97 - %Verifier{verifier | hybrid_kv: hkv, test_kv: tkv} 98 - end 99 - 100 - defp perform(:delete_range, %Verifier{hybrid_kv: hkv, test_kv: tkv} = verifier) do 101 - verifier = inc_version(verifier) 102 - 103 - version = verifier.version 104 - start_key = rand_hash() 105 - end_key = rand_hash() 106 - {start_key, end_key} = 107 - cond do 108 - start_key < end_key -> {start_key, end_key} 109 - start_key > end_key -> {end_key, start_key} 110 - start_key == end_key -> {start_key, start_key <> "\x00"} 111 - end 112 - 113 - hkv = HybridKV.delete_range(hkv, version, start_key, end_key) 114 - tkv = TestKV.delete_range(tkv, version, start_key, end_key) 115 - 116 - %Verifier{verifier | hybrid_kv: hkv, test_kv: tkv} 117 - end 118 - 119 74 defp perform(:get, %Verifier{hybrid_kv: hkv, test_kv: tkv} = verifier) do 120 75 read_version = rand_read_version(verifier) 121 76 ··· 150 105 verifier 151 106 end 152 107 108 + defp perform(:apply_batch, %Verifier{hybrid_kv: hkv, test_kv: tkv} = verifier) do 109 + verifier = %{verifier | version: verifier.version + Enum.random(1..1000)} 110 + 111 + mutation_count = Enum.random(1..10) 112 + mutations = Enum.reduce(1..mutation_count, [], fn _i, acc -> 113 + [rand_mutation() | acc] 114 + end) 115 + 116 + version = verifier.version 117 + hkv = HybridKV.apply_batch(hkv, version, mutations) 118 + tkv = TestKV.apply_batch(tkv, version, mutations) 119 + 120 + %{verifier | hybrid_kv: hkv, test_kv: tkv} 121 + end 122 + 153 123 defp perform(:flush, %Verifier{hybrid_kv: hkv} = verifier) do 154 124 flush_version = rand(verifier.durable_version, verifier.version) 155 125 hkv = HybridKV.flush(hkv, flush_version) 156 126 157 127 %Verifier{verifier | hybrid_kv: hkv, durable_version: flush_version} 128 + end 129 + 130 + defp rand_mutation do 131 + case Enum.random(1..10) do 132 + 1 -> 133 + start_key = rand_hash() 134 + end_key = rand_hash() 135 + {start_key, end_key} = 136 + cond do 137 + start_key < end_key -> {start_key, end_key} 138 + start_key > end_key -> {end_key, start_key} 139 + start_key == end_key -> {start_key, start_key <> "\x00"} 140 + end 141 + {:clear_range, start_key, end_key} 142 + 143 + i when i in [2, 3] -> 144 + {:clear, rand_hash()} 145 + 146 + _ -> 147 + {:write, rand_hash(), rand_hash()} 148 + end 158 149 end 159 150 160 151 defp inc_version(%Verifier{version: version} = verifier) do ··· 204 195 Enum.map(1..300, fn s -> 205 196 Verifier.new({100 + s, 101, 102}) 206 197 |> Verifier.run(1000) 207 - end) 208 - end 209 - end 210 - 211 - describe "HybridKV" do 212 - setup %{kv: kv} do 213 - HybridKV.put(kv, 1, "foo", "bar") 214 - HybridKV.put(kv, 2, "foo", "bar_2") 215 - HybridKV.put(kv, 3, "foo", "bar_3") 216 - 217 - HybridKV.put(kv, 2, "foo_b", "bar_b_2") 218 - 219 - HybridKV.put(kv, 3, "foo_c", "bar_c_3") 220 - HybridKV.put(kv, 3, "foo_d", "bar_d_3") 221 - 222 - HybridKV.delete(kv, 4, "foo_d") 223 - HybridKV.put(kv, 5, "foo_e", "bar_e_5") 224 - :ok 225 - end 226 - 227 - test "get", %{kv: kv} do 228 - assert HybridKV.get(kv, 1, "foo") == "bar" 229 - assert HybridKV.get(kv, 2, "foo") == "bar_2" 230 - assert HybridKV.get(kv, 3, "foo") == "bar_3" 231 - 232 - assert kv = HybridKV.flush(kv, 2) 233 - 234 - assert HybridKV.get(kv, 1, "foo") == "bar_2" 235 - assert HybridKV.get(kv, 2, "foo") == "bar_2" 236 - assert HybridKV.get(kv, 3, "foo") == "bar_3" 237 - 238 - assert HybridKV.get(kv, 3, "foo_d") == "bar_d_3" 239 - assert HybridKV.get(kv, 4, "foo_d") == nil 240 - end 241 - 242 - test "scan", %{kv: kv} do 243 - assert %RangeResult{pairs: [ 244 - {"foo", "bar"}, 245 - ], more: false} = HybridKV.scan(kv, 1, "foo", "foo_z") 246 - 247 - assert kv = HybridKV.flush(kv, 2) 248 - 249 - assert %RangeResult{pairs: [ 250 - {"foo", "bar_2"}, 251 - {"foo_b", "bar_b_2"}, 252 - ], more: false} = HybridKV.scan(kv, 1, "foo", "foo_z") 253 - 254 - assert %RangeResult{pairs: [ 255 - {"foo", "bar_2"}, 256 - {"foo_b", "bar_b_2"}, 257 - ], more: false} = HybridKV.scan(kv, 2, "foo", "foo_z") 258 - 259 - assert %RangeResult{pairs: [ 260 - {"foo", "bar_3"}, 261 - {"foo_b", "bar_b_2"}, 262 - {"foo_c", "bar_c_3"}, 263 - {"foo_d", "bar_d_3"}, 264 - ], more: false} = HybridKV.scan(kv, 3, "foo", "foo_z") 265 - 266 - assert %RangeResult{pairs: [ 267 - {"foo", "bar_3"}, 268 - {"foo_b", "bar_b_2"}, 269 - {"foo_c", "bar_c_3"}, 270 - ], more: false} = HybridKV.scan(kv, 4, "foo", "foo_z") 271 - 272 - kv = HybridKV.flush(kv, 4) 273 - 274 - assert %RangeResult{pairs: [ 275 - {"foo", "bar_3"}, 276 - {"foo_b", "bar_b_2"}, 277 - {"foo_c", "bar_c_3"}, 278 - ], more: false} = HybridKV.scan(kv, 4, "foo", "foo_z") 279 - end 280 - 281 - test "scan limit", %{kv: kv} do 282 - assert kv = HybridKV.flush(kv, 2) 283 - 284 - assert %RangeResult{pairs: [ 285 - {"foo", "bar_3"}, 286 - {"foo_b", "bar_b_2"}, 287 - ], more: true} = HybridKV.scan(kv, 3, "foo", "foo_z", limit: 2) 288 - end 289 - 290 - test "scan deleted limit", %{kv: kv} do 291 - Enum.each(0..5, fn ver -> 292 - kv = case ver == 0 do 293 - true -> HybridKV.flush(kv, ver) 294 - false -> kv 295 - end 296 - 297 - assert %RangeResult{pairs: [ 298 - {"foo", "bar_3"}, 299 - {"foo_b", "bar_b_2"}, 300 - {"foo_c", "bar_c_3"}, 301 - {"foo_e", "bar_e_5"}, 302 - ], more: false} = HybridKV.scan(kv, 5, "foo", "foo_z", limit: 4) 303 - end) 304 - end 305 - 306 - test "scan deleted limit with more", %{kv: kv} do 307 - HybridKV.put(kv, 5, "foo_f", "bar_f_5") 308 - 309 - Enum.each(0..5, fn ver -> 310 - kv = case ver == 0 do 311 - true -> HybridKV.flush(kv, ver) 312 - false -> kv 313 - end 314 - 315 - assert %RangeResult{pairs: [ 316 - {"foo", "bar_3"}, 317 - {"foo_b", "bar_b_2"}, 318 - {"foo_c", "bar_c_3"}, 319 - {"foo_e", "bar_e_5"}, 320 - ], more: true} = HybridKV.scan(kv, 5, "foo", "foo_z", limit: 4) 321 198 end) 322 199 end 323 200 end
+48
test/kv/mutation_log_test.exs
··· 1 + defmodule Hobbes.KV.MutationLogTest do 2 + use ExUnit.Case, async: true 3 + 4 + alias Hobbes.KV.MutationLog 5 + 6 + @moduletag :mutation_log 7 + 8 + setup do 9 + %{ml: MutationLog.new()} 10 + end 11 + 12 + describe "mutation log" do 13 + test "logs mutations", %{ml: ml} do 14 + MutationLog.insert(ml, 0, [{:write, "k0", "v0"}]) 15 + MutationLog.insert(ml, 1, [{:write, "k1", "v1"}]) 16 + MutationLog.insert(ml, 3, [{:write, "k3", "v3"}]) 17 + MutationLog.insert(ml, 4, [{:write, "k4", "v4"}]) 18 + 19 + assert MutationLog.pop_up_to(ml, 1) == [ 20 + {0, [{:write, "k0", "v0"}]}, 21 + {1, [{:write, "k1", "v1"}],}, 22 + ] 23 + assert MutationLog.pop_up_to(ml, 1) == [] 24 + assert MutationLog.pop_up_to(ml, 2) == [] 25 + 26 + assert MutationLog.pop_up_to(ml, 4) == [ 27 + {3, [{:write, "k3", "v3"}]}, 28 + {4, [{:write, "k4", "v4"}],}, 29 + ] 30 + 31 + assert MutationLog.dump(ml) == [] 32 + end 33 + 34 + test "appends", %{ml: ml} do 35 + MutationLog.append(ml, 0, [{:write, "k", "1"}]) 36 + MutationLog.append(ml, 0, [{:write, "k", "2"}]) 37 + MutationLog.append(ml, 0, [{:write, "k", "3"}]) 38 + 39 + MutationLog.insert(ml, 1, [{:write, "k", "1"}]) 40 + MutationLog.append(ml, 1, [{:write, "k", "2"}]) 41 + 42 + assert MutationLog.dump(ml) == [ 43 + {0, [{:write, "k", "1"}, {:write, "k", "2"}, {:write, "k", "3"}]}, 44 + {1, [{:write, "k", "1"}, {:write, "k", "2"}]}, 45 + ] 46 + end 47 + end 48 + end