this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Prevent compacting memtables with versions past a max_persist_version

garrison a2dfff6e b27f571b

+57 -13
+11 -3
lib/xks/fuzz/model_fuzz.ex
··· 82 82 end 83 83 84 84 defp execute(:apply_batch, %State{} = state) do 85 - %{version: version, opts: opts} = state 85 + %{ 86 + xks: xks, 87 + simple_kv: simple_kv, 88 + version: version, 89 + opts: opts, 90 + } = state 86 91 partition = 0 87 92 88 93 batch = make_batch(opts.key_bits, Enum.random(opts.batch_size_range), []) 89 94 90 - XKS.apply_batch(state.xks, partition, version, batch) 91 - simple_kv = SimpleKV.apply_batch(state.simple_kv, version, batch) 95 + XKS.apply_batch(xks, partition, version, batch) 96 + simple_kv = SimpleKV.apply_batch(simple_kv, version, batch) 97 + 98 + max_pv = Enum.random(:atomics.get(xks.max_persist_version_atomic, 1)..version) 99 + XKS.set_max_persist_version(xks, max_pv) 92 100 93 101 %{state | simple_kv: simple_kv} 94 102 end
+27 -7
lib/xks/work_queue.ex
··· 1 1 defmodule Hobbes.XKS.WorkQueue do 2 2 alias Hobbes.XKS.Manifest.MemtableInfo 3 3 4 + import ExUnit.Assertions, only: [assert: 1] 5 + 4 6 @type t :: :ets.table 5 7 6 - @type job :: {:compact_memtable, integer, non_neg_integer} | {:compact_level, integer, non_neg_integer} 8 + @type job :: {:compact_memtable, integer, non_neg_integer, non_neg_integer} | {:compact_level, integer, non_neg_integer} 7 9 8 10 @queue_space :queue 9 11 @index_space :index ··· 21 23 :ok 22 24 end 23 25 24 - @spec pop_job(t) :: {:ok, job} | :error 25 - def pop_job(queue) do 26 - case :ets.next_lookup(queue, {@queue_space, -1, -1}) do 27 - {{:queue, _priority, _i} = key, [{_key, job}]} -> 26 + @spec pop_job(t, non_neg_integer) :: {:ok, job} | :error 27 + def pop_job(queue, max_persist_version) do 28 + case first_allowed_job(queue, max_persist_version, {@queue_space, -1, -1}) do 29 + {:ok, key, job} -> 28 30 :ets.delete(queue, key) 29 31 :ets.delete(queue, {@index_space, job}) 30 32 {:ok, job} 33 + :error -> 34 + :error 35 + end 36 + end 37 + 38 + defp first_allowed_job(queue, max_persist_version, prev_key) do 39 + case :ets.next_lookup(queue, prev_key) do 40 + {{@queue_space, _priority, _i} = key, [{_key, job}]} -> 41 + case allowed?(job, max_persist_version) do 42 + true -> {:ok, key, job} 43 + false -> first_allowed_job(queue, max_persist_version, key) 44 + end 31 45 _ -> 32 46 :error 33 47 end 34 48 end 35 49 50 + defp allowed?({:compact_memtable, _partition, _epoch_min, last_version}, max_persist_version), do: last_version <= max_persist_version 51 + defp allowed?({:compact_level, _partition, _level}, _max_persist_version), do: true 52 + 36 53 @spec enqueue_compact_memtable(t, MemtableInfo.t) :: :ok 37 54 def enqueue_compact_memtable(queue, %MemtableInfo{} = info) do 38 - job = {:compact_memtable, info.partition, info.epoch_min} 55 + assert info.mutable == false 56 + assert info.last_version != nil 57 + 58 + job = {:compact_memtable, info.partition, info.epoch_min, info.last_version} 39 59 40 60 # TODO: not necessary to check for a memtable 41 61 case enqueued?(queue, job) do ··· 61 81 :ets.insert(queue, {{@index_space, job}, i}) 62 82 end 63 83 64 - defp priority({:compact_memtable, _partition, _epoch_min}), do: 1 84 + defp priority({:compact_memtable, _partition, _epoch_min, _last_version}), do: 1 65 85 defp priority({:compact_level, _partition, _level}), do: 2 66 86 67 87 @spec next_queue_i(t) :: non_neg_integer
+18 -2
lib/xks/xks.ex
··· 24 24 25 25 epoch_atomic: :atomics.atomics_ref, 26 26 version_atomic: :atomics.atomics_ref, 27 + max_persist_version_atomic: :atomics.atomics_ref, 27 28 } 28 29 @enforce_keys [ 29 30 :opts, ··· 41 42 42 43 :epoch_atomic, 43 44 :version_atomic, 45 + :max_persist_version_atomic, 44 46 ] 45 47 defstruct @enforce_keys 46 48 ··· 112 114 manifest_log_tail_address: nil, 113 115 epoch_atomic: :atomics.new(1, signed: false), 114 116 version_atomic: :atomics.new(1, signed: false), 117 + max_persist_version_atomic: :atomics.new(1, signed: false), 115 118 } 116 119 117 120 # Init default partition ··· 152 155 :atomics.put(epoch_atomic, 1, epoch) 153 156 version_atomic = :atomics.new(1, signed: false) 154 157 :atomics.put(version_atomic, 1, version) 158 + max_persist_version_atomic = :atomics.new(1, signed: false) 159 + :atomics.put(max_persist_version_atomic, 1, version) 155 160 156 161 xks = %XKS{ 157 162 opts: opts, ··· 170 175 171 176 epoch_atomic: epoch_atomic, 172 177 version_atomic: version_atomic, 178 + max_persist_version_atomic: max_persist_version_atomic, 173 179 } 174 180 xks = struct!(xks, xks_pairs) 175 181 ··· 260 266 :ok 261 267 end 262 268 269 + @spec set_max_persist_version(t, non_neg_integer) :: :ok 270 + def set_max_persist_version(%XKS{} = xks, version) do 271 + %{max_persist_version_atomic: max_persist_version_atomic} = xks 272 + assert version >= :atomics.get(max_persist_version_atomic, 1) 273 + :atomics.put(max_persist_version_atomic, 1, version) 274 + end 275 + 276 + @spec garbage_collect_tables(t) :: :ok 263 277 def garbage_collect_tables(%XKS{} = xks) do 264 278 %XKS{ 265 279 block_store: block_store, ··· 280 294 end 281 295 ManifestLog.delete_table(manifest_log, table_info) 282 296 end) 297 + :ok 283 298 end 284 299 285 300 @spec rotate_memtable(t, integer) :: :ok ··· 321 336 322 337 @spec perform_compaction(t) :: :ok | :error 323 338 def perform_compaction(%XKS{} = xks) do 324 - case WorkQueue.pop_job(xks.work_queue) do 325 - {:ok, {:compact_memtable, partition, min_epoch}} -> 339 + max_pv = :atomics.get(xks.max_persist_version_atomic, 1) 340 + case WorkQueue.pop_job(xks.work_queue, max_pv) do 341 + {:ok, {:compact_memtable, partition, min_epoch, _last_version}} -> 326 342 info = Manifest.get_memtable_from_min_epoch(xks.manifest, partition, min_epoch) 327 343 compact_memtable(xks, info) 328 344 :ok
+1 -1
test/xks_test.exs
··· 126 126 for {sub_kib, block_kib} <- @sizes do 127 127 Task.async(fn -> 128 128 Hobbes.XKS.Fuzz.ModelFuzz.run(100, [ 129 - iterations: 500, 129 + iterations: 1000, 130 130 #key_bits: 32, 131 131 xks_opts: [ 132 132 block_size: block_kib * 1024,