this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use mutation log to remove keys from MemKV instead of scanning

garrison 48147269 c014f0d6

+59 -6
+2
ROADMAP.md
··· 94 94 - [X] Optimize CommitBuffer to use unversioned shard map with pre-computed tlogs and tags 95 95 - CommitBuffer reductions on bench_rw went from 85 million down to 25 million! 96 96 - Txn/s went from about 24.8k to about 25.8k! (obviously CommitBuffer is not the bottleneck) 97 + - [X] Clear keys from MemKV using mutation log instead of full scan 98 + - Txn/s went from 26k to 29k! Apparently storage flush was slowing things down. 97 99 98 100 99 101 ### Construct
+14 -6
lib/hybrid_kv.ex
··· 292 292 end 293 293 294 294 @doc """ 295 - Flushes keys/values with a version <= `version` to unversioned storage. 295 + Flushes mutations with a version <= `version` to unversioned storage. 296 296 """ 297 297 @spec flush(%HybridKV{}, non_neg_integer) :: %HybridKV{} 298 298 def flush(%HybridKV{storage_module: s_mod, storage_kv: s_kv} = kv, version) when is_integer(version) and version >= 0 do 299 299 batches = MutationLog.pop_up_to(kv.mutation_log, version) 300 300 301 - Enum.each(batches, fn {_v, mutations} -> 301 + mem_kv = kv.mem_kv 302 + Enum.each(batches, fn {ver, mutations} -> 302 303 Enum.each(mutations, fn 303 - {:write, k, v} -> s_mod.put(s_kv, k, v) 304 - {:clear, k} -> s_mod.delete(s_kv, k) 305 - {:clear_range, sk, ek} -> s_mod.delete_range(s_kv, sk, ek) 304 + {:write, k, v} -> 305 + MemKV.remove_key_at_version(mem_kv, ver, k) 306 + s_mod.put(s_kv, k, v) 307 + 308 + {:clear, k} -> 309 + MemKV.remove_key_at_version(mem_kv, ver, k) 310 + s_mod.delete(s_kv, k) 311 + 312 + {:clear_range, sk, ek} -> 313 + MemKV.remove_range_at_version(mem_kv, ver, sk, ek) 314 + s_mod.delete_range(s_kv, sk, ek) 306 315 end) 307 316 end) 308 317 309 318 {_ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1) 310 - MemKV.flush(kv.mem_kv, version) 311 319 312 320 %{kv | deleted_forest: deleted_forest, flushed_version: version} 313 321 end
+43
lib/mem_kv.ex
··· 6 6 7 7 alias Hobbes.Structs.RangeResult 8 8 9 + @type t :: :ets.table 10 + 9 11 @spec new :: :ets.table 10 12 def new do 11 13 :ets.new(:mem_kv, [:ordered_set, :private]) ··· 236 238 237 239 _ -> 238 240 acc 241 + end 242 + end 243 + 244 + @spec remove_key_at_version(t, non_neg_integer, binary) :: :ok 245 + def remove_key_at_version(table, version, key) do 246 + :ets.delete(table, {key, version}) 247 + :ok 248 + end 249 + 250 + @spec remove_range_at_version(t, non_neg_integer, binary, binary) :: :ok 251 + def remove_range_at_version(table, version, start_key, end_key) do 252 + :ets.delete(table, {start_key, version}) 253 + do_scan_remove(table, version, end_key, {start_key, :infinity}) 254 + :ok 255 + end 256 + 257 + defp do_scan_remove(table, version, end_key, prev) do 258 + case :ets.next(table, prev) do 259 + {k, v} = full_key -> 260 + case k < end_key do 261 + true -> 262 + cond do 263 + v < version -> 264 + # This is a new key, try to skip to version 265 + # TODO: if we always flush in order is this unreachable? 266 + do_scan_remove(table, version, end_key, {k, version - 1}) 267 + v == version -> 268 + # This is the version, delete it 269 + :ets.delete(table, full_key) 270 + # Skip straight to the next key 271 + do_scan_remove(table, version, end_key, {k, :infinity}) 272 + true -> 273 + # v > version 274 + # Skip straight to the next key 275 + do_scan_remove(table, version, end_key, {k, :infinity}) 276 + end 277 + 278 + false -> :ok 279 + end 280 + 281 + :"$end_of_table" -> :ok 239 282 end 240 283 end 241 284