this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Persist storage server KV data to simulated file storage

+112 -32
+30 -21
lib/hybrid_kv.ex
··· 1 1 defmodule Hobbes.HybridKV do 2 2 alias Hobbes.{HybridKV, MemKV, RangeForest, Utils} 3 3 alias Hobbes.RangeForest.RangeTree 4 - alias Hobbes.KV.FlatKV 4 + alias Hobbes.KV.{FlatKV, FlatStorageKV} 5 5 alias Hobbes.Structs.RangeResult 6 6 7 7 @type t :: %__MODULE__{ 8 8 mem_kv: term, 9 - storage_kv: term, 9 + storage_module: module, 10 + storage_kv: FlatKV.t | FlatStorageKV.t, 10 11 deleted_forest: term, 11 12 flushed_version: non_neg_integer, 12 13 } 13 14 14 - @enforce_keys [:mem_kv, :storage_kv, :deleted_forest, :flushed_version] 15 + @enforce_keys [:mem_kv, :storage_module, :storage_kv, :deleted_forest, :flushed_version] 15 16 defstruct @enforce_keys 16 17 17 - def new do 18 + def new(opts \\ []) do 19 + {storage_module, storage_kv} = 20 + case Keyword.get(opts, :path) do 21 + nil -> {FlatKV, FlatKV.new()} 22 + path when is_binary(path) -> {FlatStorageKV, FlatStorageKV.new(path)} 23 + end 24 + 18 25 %HybridKV{ 19 26 mem_kv: MemKV.new(), 20 - storage_kv: FlatKV.new(), 27 + storage_module: storage_module, 28 + storage_kv: storage_kv, 21 29 deleted_forest: RangeForest.new(), 22 30 flushed_version: 0, 23 31 } ··· 54 62 nil -> 55 63 # There are no overlapping range deletes 56 64 case MemKV.get(kv.mem_kv, version, 0, key) do 57 - nil -> FlatKV.get(kv.storage_kv, key) 65 + nil -> kv.storage_module.get(kv.storage_kv, key) 58 66 :deleted -> nil 59 67 value -> value 60 68 end ··· 129 137 [] -> end_key 130 138 end 131 139 132 - storage_result = FlatKV.scan(kv.storage_kv, start_key, stop_key, limit: limit) 140 + storage_result = kv.storage_module.scan(kv.storage_kv, start_key, stop_key, limit: limit) 133 141 {_start, storage_end_key} = storage_result.range 134 142 135 143 # Read mem only up to the end of the range scanned by storage ··· 196 204 [] -> start_key 197 205 end 198 206 199 - storage_result = FlatKV.scan(kv.storage_kv, stop_key, end_key, limit: limit, reverse: true) 207 + storage_result = kv.storage_module.scan(kv.storage_kv, stop_key, end_key, limit: limit, reverse: true) 200 208 {storage_start_key, _end_key} = storage_result.range 201 209 202 210 mem_result = MemKV.scan(kv.mem_kv, version, 0, storage_start_key, end_key, limit: limit, reverse: true) ··· 270 278 Flushes keys/values with a version <= `version` to unversioned storage. 271 279 """ 272 280 @spec flush(%HybridKV{}, non_neg_integer) :: %HybridKV{} 273 - def flush(%HybridKV{} = kv, version) when is_integer(version) and version >= 0 do 281 + def flush(%HybridKV{storage_module: s_mod, storage_kv: s_kv} = kv, version) when is_integer(version) and version >= 0 do 274 282 {ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1) 275 283 276 284 # Gather all mutations (writes, deletes, and range deletes) and batch them together ··· 286 294 287 295 batches = Utils.interleave_batches(pairs_per_version, ranges_per_version) 288 296 289 - storage_kv = kv.storage_kv 290 297 # Iterate through all batches and apply point writes first, 291 298 # then range deletes 292 299 # ··· 298 305 # to reduce complexity (performance would probably be better anyway) 299 306 Enum.each(batches, fn {_v, pairs, ranges} -> 300 307 Enum.each(pairs, fn 301 - {key, :deleted, _v} -> FlatKV.delete(storage_kv, key) 302 - {key, value, _v} -> FlatKV.put(storage_kv, key, value) 308 + {key, :deleted, _v} -> s_mod.delete(s_kv, key) 309 + {key, value, _v} -> s_mod.put(s_kv, key, value) 303 310 end) 304 311 305 312 Enum.each(ranges, fn {sk, ek, _v} -> 306 - FlatKV.clear_range(storage_kv, sk, ek) 313 + s_mod.delete_range(s_kv, sk, ek) 307 314 end) 308 315 end) 309 316 310 317 %HybridKV{kv | deleted_forest: deleted_forest, flushed_version: version} 311 318 end 312 319 313 - @spec put_storage(%HybridKV{}, binary, binary) :: :ok 314 - def put_storage(%HybridKV{} = kv, key, value) when is_binary(key) and is_binary(value) do 315 - FlatKV.put(kv.storage_kv, key, value) 320 + @spec commit(t) :: :ok 321 + def commit(%HybridKV{} = kv) do 322 + kv.storage_module.commit(kv.storage_kv) 316 323 end 317 324 318 325 @spec load_storage(%HybridKV{}, [{binary, binary}]) :: :ok 319 - def load_storage(%HybridKV{} = kv, pairs) when is_list(pairs) do 320 - FlatKV.load(kv.storage_kv, pairs) 326 + def load_storage(%HybridKV{storage_module: s_mod, storage_kv: s_kv}, pairs) when is_list(pairs) do 327 + Enum.each(pairs, fn {k, v} when is_binary(k) and is_binary(v) -> 328 + s_mod.put(s_kv, k, v) 329 + end) 321 330 end 322 331 323 332 @spec delete_range_storage(%HybridKV{}, binary, binary) :: :ok 324 - def delete_range_storage(%HybridKV{} = kv, start_key, end_key) do 325 - FlatKV.clear_range(kv.storage_kv, start_key, end_key) 333 + def delete_range_storage(%HybridKV{storage_module: s_mod, storage_kv: s_kv}, start_key, end_key) do 334 + s_mod.delete_range(s_kv, start_key, end_key) 326 335 end 327 336 328 337 @doc """ ··· 339 348 def dump(%HybridKV{} = kv) do 340 349 %{ 341 350 mem_kv: MemKV.dump(kv.mem_kv), 342 - storage_kv: FlatKV.dump(kv.storage_kv), 351 + storage_kv: kv.storage_module.dump(kv.storage_kv), 343 352 deleted_forest: RangeForest.dump(kv.deleted_forest), 344 353 } 345 354 end
+14
lib/kv.ex
··· 1 + defmodule Hobbes.KV do 2 + defmodule Storage do 3 + alias Hobbes.Structs.RangeResult 4 + 5 + @callback commit(term) :: :ok 6 + 7 + @callback put(term, binary, binary) :: :ok 8 + @callback delete(term, binary) :: :ok 9 + @callback delete_range(term, binary, binary) :: :ok 10 + 11 + @callback get(term, binary) :: binary | nil 12 + @callback scan(term, binary, binary, keyword) :: RangeResult.t 13 + end 14 + end
+7 -3
lib/kv/flat_kv.ex
··· 5 5 6 6 alias Hobbes.Structs.RangeResult 7 7 8 + @behaviour Hobbes.KV.Storage 9 + 8 10 @type t :: :ets.table 9 11 10 12 @spec new :: t ··· 17 19 true = :ets.insert(kv, pairs) 18 20 :ok 19 21 end 22 + 23 + def commit(_kv), do: :ok 20 24 21 25 @spec put(:ets.table, binary, binary) :: :ok 22 26 def put(kv, key, value) when is_binary(key) and is_binary(value) do ··· 30 34 :ok 31 35 end 32 36 33 - @spec clear_range(:ets.table, binary, binary) :: :ok 34 - def clear_range(kv, start_key, end_key) when is_binary(start_key) and is_binary(end_key) do 37 + @spec delete_range(:ets.table, binary, binary) :: :ok 38 + def delete_range(kv, start_key, end_key) when is_binary(start_key) and is_binary(end_key) do 35 39 :ets.select_delete(kv, [{ 36 40 {:"$1", :"$2"}, 37 41 [ ··· 140 144 :ets.delete_all_objects(kv) 141 145 end 142 146 143 - @doc false 147 + @spec dump(t) :: [{binary, binary}] 144 148 def dump(kv) do 145 149 :ets.tab2list(kv) 146 150 end
+53
lib/kv/flat_storage_kv.ex
··· 1 + defmodule Hobbes.KV.FlatStorageKV do 2 + alias Hobbes.Construct.SimFile 3 + 4 + alias Hobbes.KV.{FlatKV, FlatStorageKV} 5 + 6 + @behaviour Hobbes.KV.Storage 7 + 8 + @type t :: %__MODULE__{ 9 + path: String.t, 10 + kv: FlatKV.t, 11 + } 12 + @enforce_keys [:path, :kv] 13 + defstruct @enforce_keys 14 + 15 + @spec new(String.t) :: t 16 + def new(path) do 17 + kv = %FlatStorageKV{path: path, kv: FlatKV.new()} 18 + 19 + case SimFile.read(path) do 20 + {:ok, contents} -> 21 + pairs = decode(contents) 22 + FlatKV.load(kv.kv, pairs) 23 + 24 + {:error, :enoent} -> :noop 25 + end 26 + 27 + kv 28 + end 29 + 30 + def commit(%FlatStorageKV{kv: kv, path: path}) do 31 + contents = FlatKV.dump(kv) |> encode() 32 + :ok = SimFile.write(path, contents) 33 + :ok 34 + end 35 + 36 + def put(%{kv: kv}, key, value), do: FlatKV.put(kv, key, value) 37 + def delete(%{kv: kv}, key), do: FlatKV.delete(kv, key) 38 + def delete_range(%{kv: kv}, start_key, end_key), do: FlatKV.delete_range(kv, start_key, end_key) 39 + 40 + def get(%{kv: kv}, key), do: FlatKV.get(kv, key) 41 + def scan(%{kv: kv}, start_key, end_key, opts \\ []), do: FlatKV.scan(kv, start_key, end_key, opts) 42 + 43 + defp encode(pairs) when is_list(pairs), do: :erlang.term_to_binary(pairs, [:deterministic]) 44 + defp decode(contents) when is_binary(contents), do: :erlang.binary_to_term(contents, [:safe]) 45 + 46 + @doc false 47 + def dump_file(%{path: path}) do 48 + case SimFile.read(path) do 49 + {:ok, contents} -> decode(contents) 50 + {:error, _} -> nil 51 + end 52 + end 53 + end
+3 -3
lib/servers/storage.ex
··· 103 103 end 104 104 105 105 def init(%{id: id, cluster: cluster}) do 106 - kv = HybridKV.new() 106 + kv = HybridKV.new(path: "/#{Integer.to_string(id)}.storage_kv") 107 + 107 108 state = %State{ 108 109 id: id, 109 110 cluster: cluster, ··· 274 275 275 276 # Now that all shard clears are complete, we can flush the remaining versions 276 277 HybridKV.flush(kv, flush_version) 277 - 278 - # TODO: commit the storage changes here 278 + HybridKV.commit(kv) 279 279 280 280 # Send pops to tlogs 281 281 # Note: pop is a cast (async) because the result doesn't matter for correctness
+5 -5
test/kv/flat_kv_test.exs
··· 30 30 end 31 31 end 32 32 33 - describe "clear_range/3" do 33 + describe "delete_range/3" do 34 34 setup %{kv: kv} do 35 35 Enum.each(1..6, fn i -> 36 36 :ok = FlatKV.put(kv, "foo#{i}", "bar#{i}") ··· 39 39 end 40 40 41 41 test "clears at start", %{kv: kv} do 42 - assert :ok = FlatKV.clear_range(kv, "", "foo4") 42 + assert :ok = FlatKV.delete_range(kv, "", "foo4") 43 43 assert FlatKV.scan(kv, "", "zoo").pairs == [ 44 44 {"foo4", "bar4"}, 45 45 {"foo5", "bar5"}, ··· 48 48 end 49 49 50 50 test "clears at the end", %{kv: kv} do 51 - assert :ok = FlatKV.clear_range(kv, "foo4", "zoo") 51 + assert :ok = FlatKV.delete_range(kv, "foo4", "zoo") 52 52 assert FlatKV.scan(kv, "", "zoo").pairs == [ 53 53 {"foo1", "bar1"}, 54 54 {"foo2", "bar2"}, ··· 57 57 end 58 58 59 59 test "clears in the middle", %{kv: kv} do 60 - assert :ok = FlatKV.clear_range(kv, "foo3", "foo5") 60 + assert :ok = FlatKV.delete_range(kv, "foo3", "foo5") 61 61 assert FlatKV.scan(kv, "", "zoo").pairs == [ 62 62 {"foo1", "bar1"}, 63 63 {"foo2", "bar2"}, ··· 67 67 end 68 68 69 69 test "clears all", %{kv: kv} do 70 - assert :ok = FlatKV.clear_range(kv, "", "zoo") 70 + assert :ok = FlatKV.delete_range(kv, "", "zoo") 71 71 assert FlatKV.scan(kv, "", "zoo").pairs == [] 72 72 end 73 73 end