this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add merge iterator and multiversion scan to btree

garrison c5551041 52e19601

+177 -6
+34 -1
lib/btree/fuzz/model_fuzz.ex
··· 74 74 case Enum.random(1..20) do 75 75 1 -> :commit 76 76 _ -> 77 - case Enum.random(1..2) do 77 + case Enum.random(1..3) do 78 78 1 -> :apply_batch 79 79 2 -> :get 80 + 3 -> :scan 80 81 end 81 82 end 82 83 end ··· 138 139 state 139 140 end 140 141 142 + defp execute(:scan, %State{} = state) do 143 + %{ 144 + simple_kv: simple_kv, 145 + btree: btree, 146 + version: version, 147 + opts: %{ 148 + key_bits: opt_key_bits, 149 + }, 150 + } = state 151 + 152 + read_version = Enum.random(max(version - 5000, 0)..version) 153 + {start_key, end_key} = make_range(opt_key_bits) 154 + reverse? = false 155 + limit = Enum.random(1..10) 156 + 157 + btree_result = BTree.Iterator.scan(btree, read_version, start_key, end_key, reverse?, limit) 158 + skv_result = SimpleKV.scan(simple_kv, read_version, start_key, end_key, reverse?, limit) 159 + assert btree_result == skv_result 160 + 161 + state 162 + end 163 + 141 164 defp make_batch(opts) do 142 165 opt_key_bits = opts.key_bits 143 166 batch_size = Enum.random(opts.batch_size_range) ··· 158 181 key = make_key(opt_key_bits) 159 182 value = String.duplicate(String.reverse(key), 2) 160 183 {:write, key, value} 184 + end 185 + end 186 + 187 + defp make_range(bits) do 188 + key1 = make_key(bits) 189 + key2 = make_key(bits) 190 + cond do 191 + key1 < key2 -> {key1, key2} 192 + key1 > key2 -> {key2, key1} 193 + key1 == key2 -> {key1, key2 <> "\x00"} 161 194 end 162 195 end 163 196
+17
lib/btree/fuzz/simple_kv.ex
··· 16 16 Map.get(kv, key) 17 17 end 18 18 19 + @spec scan(t, non_neg_integer, binary, binary, boolean, non_neg_integer | :infinity) :: [{binary, binary}] 20 + def scan(simple_kv, version, start_key, end_key, reverse?, limit) do 21 + {versions, kvs_by_version} = simple_kv 22 + kv = Map.fetch!(kvs_by_version, find_version(versions, version)) 23 + 24 + kv 25 + |> Enum.filter(fn {k, _v} -> k >= start_key and k < end_key end) 26 + |> Enum.sort_by(fn {k, _v} -> k end, sort_direction(reverse?)) 27 + |> take_limit(limit) 28 + end 29 + 30 + defp sort_direction(true), do: :desc 31 + defp sort_direction(false), do: :asc 32 + 33 + defp take_limit(pairs, :infinity), do: pairs 34 + defp take_limit(pairs, limit), do: Enum.take(pairs, limit) 35 + 19 36 # Finds the largest version <= `target_version` 20 37 # Invariants: 21 38 # - versions list is always in descending order
+126 -5
lib/btree/iterator.ex
··· 35 35 _ -> 36 36 it = new(btree, key, false) 37 37 case it.current_pair do 38 - {^key, value} -> value 38 + {[^key | _ver], value} -> value 39 39 _ -> nil 40 40 end 41 41 end 42 42 end 43 43 44 + @spec scan(BTree.t, non_neg_integer, binary, binary, boolean, non_neg_integer) :: [{binary, binary}] 45 + def scan(%BTree{} = btree, version, start_key, end_key, reverse?, limit) do 46 + merge_it = new_merge_iterator(btree, start_key, reverse?) 47 + 48 + case reverse? do 49 + false -> start_scan_forward(merge_it, version, end_key, limit) 50 + end 51 + |> Enum.reverse() 52 + end 53 + 54 + defp start_scan_forward(:empty = _merge_it, _version, _end_key, _limit), do: [] 55 + defp start_scan_forward(merge_it, version, end_key, limit) do 56 + {_, _, {[key | ver], val}} = merge_it 57 + merge_it = merge_forward(merge_it) 58 + 59 + cond do 60 + key >= end_key -> 61 + # Reached the end of the requested range without ever finding a visible pair 62 + [] 63 + 64 + ver > version -> 65 + # This pair is not visible because its version is too new, skip it 66 + start_scan_forward(merge_it, version, end_key, limit) 67 + 68 + true -> 69 + # This pair is visible, start scanning with it 70 + do_scan_forward(merge_it, version, end_key, limit, key, ver, val, [], 0) 71 + end 72 + end 73 + 74 + defp finish_scan_forward(_prev_key, :tombstone = _prev_val, acc), do: acc 75 + defp finish_scan_forward(prev_key, prev_val, acc), do: [{prev_key, prev_val} | acc] 76 + 77 + defp do_scan_forward(:empty = _merge_it, _version, _end_key, _limit, prev_key, _prev_ver, prev_val, acc, _count) do 78 + finish_scan_forward(prev_key, prev_val, acc) 79 + end 80 + 81 + defp do_scan_forward(merge_it, version, end_key, limit, prev_key, prev_ver, prev_val, acc, count) do 82 + {_, _, {[key | ver], val}} = merge_it 83 + merge_it = merge_forward(merge_it) 84 + 85 + cond do 86 + key >= end_key -> 87 + # Reached the end of the requested range 88 + finish_scan_forward(prev_key, prev_val, acc) 89 + 90 + ver > version -> 91 + # This pair is not visible because its version is too new, skip it 92 + do_scan_forward(merge_it, version, end_key, limit, prev_key, prev_ver, prev_val, acc, count) 93 + 94 + key == prev_key -> 95 + # This is a newer (but visible) version of the same key, overwrite 96 + do_scan_forward(merge_it, version, end_key, limit, key, ver, val, acc, count) 97 + 98 + prev_val == :tombstone -> 99 + # This is a new key but the previous value was a tombstone, overwrite 100 + do_scan_forward(merge_it, version, end_key, limit, key, ver, val, acc, count) 101 + 102 + true -> 103 + # This pair has a new key, so it is now safe to accumulate the previous key 104 + # and then check the limit 105 + # 106 + # Note: *both* sides of this branch accumulate `prev_key`/`prev_val` 107 + count = count + 1 108 + case count < limit do 109 + true -> 110 + # We are under the limit, accumulate and keep going 111 + acc = [{prev_key, prev_val} | acc] 112 + do_scan_forward(merge_it, version, end_key, limit, key, ver, val, acc, count) 113 + false -> 114 + # We have reached the limit, accumulate the last pair and finish 115 + finish_scan_forward(prev_key, prev_val, acc) 116 + end 117 + end 118 + end 119 + 120 + defp new_merge_iterator(%BTree{} = btree, start_key, reverse?) do 121 + %{ 122 + versioned_tree: versioned_tree, 123 + } = btree 124 + 125 + storage_it = new(btree, start_key, reverse?) 126 + versioned_it = new_versioned_iterator(versioned_tree, start_key, reverse?) 127 + merge_it = {storage_it, versioned_it, nil} 128 + 129 + case reverse? do 130 + false -> merge_forward(merge_it) 131 + end 132 + end 133 + 134 + defp merge_forward({storage_it, versioned_it, _current}) do 135 + {[storage_key | _ver], _value} = storage_pair = storage_it.current_pair 136 + {_vt, _rv?, {[versioned_key | _ver], _value} = versioned_pair} = versioned_it 137 + 138 + cond do 139 + storage_key == :infinity and versioned_key == :infinity -> 140 + :empty 141 + 142 + versioned_key == :infinity -> {next_forward(storage_it), versioned_it, storage_pair} 143 + storage_key == :infinity -> {storage_it, versioned_forward(versioned_it), versioned_pair} 144 + storage_key <= versioned_key -> {next_forward(storage_it), versioned_it, storage_pair} 145 + true -> {storage_it, versioned_forward(versioned_it), versioned_pair} 146 + end 147 + end 148 + 149 + defp new_versioned_iterator(versioned_tree, start_key, reverse?) do 150 + case reverse? do 151 + true -> raise "todo" 152 + false -> versioned_forward({versioned_tree, false, {[start_key | -1], nil}}) 153 + end 154 + end 155 + 156 + defp versioned_forward({versioned_tree, reverse?, {current_key, _value}}) do 157 + case :ets.next_lookup(versioned_tree, current_key) do 158 + {_key, [pair]} -> 159 + {versioned_tree, reverse?, pair} 160 + :"$end_of_table" -> 161 + {versioned_tree, reverse?, {[:infinity | 0], :infinity}} 162 + end 163 + end 164 + 44 165 @spec new(BTree.t, binary, boolean) :: t 45 166 def new(%BTree{} = btree, start_key, reverse?) do 46 167 %{ ··· 203 324 >> = page_data 204 325 205 326 case (key >= search_key) or (i == (pair_count - 1)) do 206 - true -> {i, {key, value}} 327 + true -> {i, {[key | 0], value}} 207 328 false -> do_seek_leaf(page_data, slots_start, pair_count, search_key, i + 1) 208 329 end 209 330 end ··· 253 374 case it.page_stack do 254 375 [] -> 255 376 %{it | 256 - current_pair: :infinity, 377 + current_pair: {[:infinity | 0], :infinity}, 257 378 } 258 379 259 380 [{page, i} | stack_rest] -> ··· 307 428 case it.page_stack do 308 429 [] -> 309 430 %{it | 310 - current_pair: :infinity, 431 + current_pair: {[:infinity | 0], :infinity}, 311 432 } 312 433 313 434 [{page, i} | stack_rest] -> ··· 356 477 _rest::binary, 357 478 >> = page_data 358 479 359 - {key, value} 480 + {[key | 0], value} 360 481 end 361 482 end