this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Handle range deletes in HybridKV.scan

+127 -67
+2 -2
ROADMAP.md
··· 61 61 - [X] Implement clears in KVs 62 62 - [ ] Implement clears in the database 63 63 - [ ] Range clears 64 - - [ ] Implement range clears in KVs 64 + - [X] Implement range deletes in HybridKV 65 65 - [X] Add persistent tree (RangeForest) to hold deleted ranges 66 66 - [X] Handle deleted ranges in HybridKV.get/put 67 67 - [X] Handle deleted ranges in HybridKV.flush 68 - - [ ] Handle deleted ranges in HybridKV.scan 68 + - [X] Handle deleted ranges in HybridKV.scan 69 69 - [ ] Implement range clears in the database 70 70 - [ ] Recovery 71 71 - [ ] Monitor all transaction processes in Manager to detect failures
+84 -31
lib/hybrid_kv.ex
··· 68 68 limit = Keyword.get(opts, :limit, :infinity) 69 69 reverse = Keyword.get(opts, :reverse, false) 70 70 71 + deleted_ranges = 72 + kv.deleted_forest 73 + |> RangeForest.tree_at(version) 74 + |> RangeTree.intersect_range(kv.flushed_version, start_key, end_key) 75 + 71 76 read_limit = case limit do 72 77 :infinity -> :infinity 73 78 limit -> limit + 1 ··· 75 80 76 81 {pairs, count} = 77 82 case reverse do 78 - false -> do_scan(:forward, kv, version, start_key, end_key, read_limit, 0) 79 - true -> do_scan(:backward, kv, version, start_key, end_key, read_limit, 0) 83 + false -> do_scan(:forward, kv, deleted_ranges, version, start_key, end_key, read_limit, 0) 84 + true -> do_scan(:backward, kv, Enum.reverse(deleted_ranges), version, start_key, end_key, read_limit, 0) 80 85 end 81 86 82 87 # We over-read by 1 and then use the extra read ··· 90 95 end 91 96 92 97 # This exists purely as a sanity check and should be unreachable except for bugs 93 - defp do_scan(_direction, _kv, _version, _start_key, _end_key, _limit, 1000), do: raise "Scan caught in loop!" 98 + defp do_scan(_direction, _kv, _deleted_ranges, _version, _start_key, _end_key, _limit, 1000), do: raise "Scan caught in loop!" 94 99 95 100 # Abandon hope all ye who enter here 96 - defp do_scan(:forward, %HybridKV{} = kv, version, start_key, end_key, limit, scan_count) do 97 - storage_result = FlatKV.scan(kv.storage_kv, start_key, end_key, limit: limit) 98 - {_start, storage_end_key} = storage_result.range 101 + defp do_scan(:forward, %HybridKV{} = kv, deleted_ranges, version, start_key, end_key, limit, scan_count) do 102 + {merged_pairs, scanned_end_key, deleted_ranges} = 103 + case deleted_ranges do 104 + [{sk, ek, del_v} | rest] when sk <= start_key -> 105 + # We are "inside" a range clear, so we scan mem only to the end of min(ek, end_key) 106 + mem_result = MemKV.scan(kv.mem_kv, version, del_v + 1, start_key, min(ek, end_key), limit: limit) 107 + {_start, mem_end_key} = mem_result.range 108 + # Merge just to clear :deleted 109 + pairs = merge([], mem_result.pairs, false) 99 110 100 - # Read mem only up to the end of the range scanned by storage 101 - mem_result = MemKV.scan(kv.mem_kv, version, start_key, storage_end_key, limit: limit) 102 - {_start, mem_end_key} = mem_result.range 111 + # Note: it would be wrong to return "rest" here if we are still inside 112 + # the deleted range (due to hitting the limit), however if a mem scan 113 + # hits the limit then we are done scanning because there is nothing 114 + # for the tombstones to clear out, so we don't care 115 + {pairs, mem_end_key, rest} 103 116 104 - # Both KVs were scanned up to this key 105 - # Anything past this key was only scanned by storage and must be discarded 106 - scanned_end_key = min(storage_end_key, mem_end_key) 117 + _ -> 118 + # If there is a range delete ahead, we want to stop this hybrid scan before that 119 + # delete's start_key so that the next iteration will scan it mem only (the previous clause) 120 + stop_key = case deleted_ranges do 121 + [{sk, _ek, _del_v} | _] -> sk 122 + [] -> end_key 123 + end 124 + 125 + storage_result = FlatKV.scan(kv.storage_kv, start_key, stop_key, limit: limit) 126 + {_start, storage_end_key} = storage_result.range 127 + 128 + # Read mem only up to the end of the range scanned by storage 129 + mem_result = MemKV.scan(kv.mem_kv, version, 0, start_key, storage_end_key, limit: limit) 130 + {_start, mem_end_key} = mem_result.range 107 131 108 - merged_pairs = 109 - merge(storage_result.pairs, mem_result.pairs, false) 110 - # TODO: more efficient to do this in merge/3 111 - |> Enum.take_while(fn {k, _v} -> k < scanned_end_key end) 132 + # Both KVs were scanned up to this key 133 + # Anything past this key was only scanned by storage and must be discarded 134 + scanned_end_key = min(storage_end_key, mem_end_key) 135 + 136 + merged_pairs = 137 + merge(storage_result.pairs, mem_result.pairs, false) 138 + # TODO: more efficient to do this in merge/3 139 + |> Enum.take_while(fn {k, _v} -> k < scanned_end_key end) 140 + 141 + {merged_pairs, scanned_end_key, deleted_ranges} 142 + end 112 143 113 144 # TODO: compute count in merge/3 114 145 count = length(merged_pairs) ··· 129 160 # We got <limit pairs but we really did scan the full range 130 161 {merged_pairs, count} 131 162 false -> 132 - # We got <limit pairs and we did not scan the full range, meaning 133 - # the KVs hit the limit but then the :deleted tombstones cleared 134 - # out enough keys to bring us back under the limit, so we must 135 - # keep scanning 136 - {next_pairs, next_count} = do_scan(:forward, kv, version, scanned_end_key, end_key, limit - count, scan_count + 1) 163 + # We got <limit pairs and we have not yet scanned the full range, 164 + # so we need to keep scanning 165 + # 166 + # This could be because we hit the limit and then the :deleted 167 + # tombstones cleared out enough pairs to bring us back under, 168 + # or because we ran into a range delete and had to skip over it 169 + {next_pairs, next_count} = do_scan(:forward, kv, deleted_ranges, version, scanned_end_key, end_key, subtract_limit(limit, count), scan_count + 1) 137 170 {merged_pairs ++ next_pairs, count + next_count} 138 171 end 139 172 end ··· 141 174 142 175 # See :forward for comments, the :backward version is the same except key logic 143 176 # is inverted (deals with start_key instead of end_key) 144 - defp do_scan(:backward, %HybridKV{} = kv, version, start_key, end_key, limit, scan_count) do 145 - storage_result = FlatKV.scan(kv.storage_kv, start_key, end_key, limit: limit, reverse: true) 146 - {storage_start_key, _end_key} = storage_result.range 177 + defp do_scan(:backward, %HybridKV{} = kv, deleted_ranges, version, start_key, end_key, limit, scan_count) do 178 + {merged_pairs, scanned_start_key, deleted_ranges} = 179 + case deleted_ranges do 180 + [{sk, ek, del_v} | rest] when ek >= end_key -> 181 + mem_result = MemKV.scan(kv.mem_kv, version, del_v + 1, max(sk, start_key), end_key, limit: limit, reverse: true) 182 + {mem_start_key, _end_key} = mem_result.range 183 + pairs = merge([], mem_result.pairs, true) 184 + {pairs, mem_start_key, rest} 185 + 186 + _ -> 187 + stop_key = case deleted_ranges do 188 + [{_sk, ek, _del_v} | _] -> ek 189 + [] -> start_key 190 + end 147 191 148 - mem_result = MemKV.scan(kv.mem_kv, version, storage_start_key, end_key, limit: limit, reverse: true) 149 - {mem_start_key, _end_key} = mem_result.range 192 + storage_result = FlatKV.scan(kv.storage_kv, stop_key, end_key, limit: limit, reverse: true) 193 + {storage_start_key, _end_key} = storage_result.range 150 194 151 - scanned_start_key = max(storage_start_key, mem_start_key) 195 + mem_result = MemKV.scan(kv.mem_kv, version, 0, storage_start_key, end_key, limit: limit, reverse: true) 196 + {mem_start_key, _end_key} = mem_result.range 152 197 153 - merged_pairs = 154 - merge(storage_result.pairs, mem_result.pairs, true) 155 - |> Enum.take_while(fn {k, _v} -> k >= scanned_start_key end) 198 + scanned_start_key = max(storage_start_key, mem_start_key) 199 + 200 + merged_pairs = 201 + merge(storage_result.pairs, mem_result.pairs, true) 202 + |> Enum.take_while(fn {k, _v} -> k >= scanned_start_key end) 203 + 204 + {merged_pairs, scanned_start_key, deleted_ranges} 205 + end 156 206 157 207 count = length(merged_pairs) 158 208 ··· 168 218 true -> 169 219 {merged_pairs, count} 170 220 false -> 171 - {next_pairs, next_count} = do_scan(:backward, kv, version, start_key, scanned_start_key, limit - count, scan_count + 1) 221 + {next_pairs, next_count} = do_scan(:backward, kv, deleted_ranges, version, start_key, scanned_start_key, subtract_limit(limit, count), scan_count + 1) 172 222 {merged_pairs ++ next_pairs, count + next_count} 173 223 end 174 224 end 175 225 end 226 + 227 + defp subtract_limit(:infinity, _n), do: :infinity 228 + defp subtract_limit(limit, n), do: limit - n 176 229 177 230 @spec merge([{binary, binary}], [{binary, binary}], boolean) :: [{binary, binary}] 178 231 defp merge(list1, list2, reverse), do: do_merge(reverse, list1, list2, []) |> Enum.reverse()
+19 -18
lib/mem_kv.ex
··· 39 39 end 40 40 end 41 41 42 - @spec scan(:ets.table, non_neg_integer, binary, binary, keyword) :: RangeResult.t 43 - def scan(table, version, start_key, end_key, opts \\ []) do 42 + @spec scan(:ets.table, non_neg_integer, non_neg_integer, binary, binary, keyword) :: RangeResult.t 43 + def scan(table, version, floor_version, start_key, end_key, opts \\ []) 44 + when is_integer(version) and is_integer(floor_version) and is_binary(start_key) and is_binary(end_key) do 44 45 limit = Keyword.get(opts, :limit, :infinity) 45 46 reverse = Keyword.get(opts, :reverse, false) 46 47 ··· 51 52 52 53 # Note: atoms (e.g. :infinity) always sort larger than numbers 53 54 {pairs, count} = case reverse do 54 - false -> do_scan(:forward, table, version, end_key, read_limit, {start_key, -1}, [], 0) 55 - true -> do_scan(:backward, table, version, start_key, read_limit, {end_key, -1}, [], 0) 55 + false -> do_scan(:forward, table, version, floor_version, end_key, read_limit, {start_key, -1}, [], 0) 56 + true -> do_scan(:backward, table, version, floor_version, start_key, read_limit, {end_key, -1}, [], 0) 56 57 end 57 58 58 59 # We over-read by 1 (read_limit) and then use the extra key ··· 92 93 end 93 94 end 94 95 95 - defp do_scan(:forward, table, version, end_key, limit, {_, _} = prev, acc, count) do 96 + defp do_scan(:forward, table, version, floor_version, end_key, limit, {_, _} = prev, acc, count) do 96 97 case :ets.next(table, prev) do 97 98 {key, ver} = full_key when key < end_key -> 98 - case ver <= version do 99 + case ver >= floor_version and ver <= version do 99 100 true -> 100 101 [{^full_key, value}] = :ets.lookup(table, full_key) 101 102 102 103 case acc do 103 104 [] -> 104 105 acc = [{key, value} | acc] 105 - do_scan(:forward, table, version, end_key, limit, full_key, acc, count) 106 + do_scan(:forward, table, version, floor_version, end_key, limit, full_key, acc, count) 106 107 107 108 [{^key, _value} | rest] -> 108 109 # Same key, overwrite lower version 109 110 acc = [{key, value} | rest] 110 - do_scan(:forward, table, version, end_key, limit, full_key, acc, count) 111 + do_scan(:forward, table, version, floor_version, end_key, limit, full_key, acc, count) 111 112 112 113 [{_key, :deleted} | _rest] -> 113 114 # Key boundary, last pair was :deleted 114 115 acc = [{key, value} | acc] 115 - do_scan(:forward, table, version, end_key, limit, full_key, acc, count) 116 + do_scan(:forward, table, version, floor_version, end_key, limit, full_key, acc, count) 116 117 117 118 [{_key, _value} | _rest] -> 118 119 # Key boundary, last pair was *not* :deleted ··· 120 121 case count < limit do 121 122 true -> 122 123 acc = [{key, value} | acc] 123 - do_scan(:forward, table, version, end_key, limit, full_key, acc, count) 124 + do_scan(:forward, table, version, floor_version, end_key, limit, full_key, acc, count) 124 125 false -> 125 126 {acc, count} 126 127 end ··· 128 129 129 130 false -> 130 131 # Version is unreadable, skip 131 - do_scan(:forward, table, version, end_key, limit, full_key, acc, count) 132 + do_scan(:forward, table, version, floor_version, end_key, limit, full_key, acc, count) 132 133 end 133 134 134 135 _ -> ··· 143 144 end 144 145 end 145 146 146 - defp do_scan(:backward, table, version, start_key, limit, {_, _} = prev, acc, count) do 147 + defp do_scan(:backward, table, version, floor_version, start_key, limit, {_, _} = prev, acc, count) do 147 148 case :ets.prev(table, prev) do 148 149 {key, ver} = full_key when key >= start_key -> 149 - case ver <= version do 150 + case ver >= floor_version and ver <= version do 150 151 true -> 151 152 [{^full_key, value}] = :ets.lookup(table, full_key) 152 153 153 154 case acc do 154 155 [] -> 155 156 acc = [{key, value}] 156 - do_scan(:backward, table, version, start_key, limit, full_key, acc, count) 157 + do_scan(:backward, table, version, floor_version, start_key, limit, full_key, acc, count) 157 158 158 159 [{^key, _value} | _rest] -> 159 160 # Same key, keep higher version 160 - do_scan(:backward, table, version, start_key, limit, full_key, acc, count) 161 + do_scan(:backward, table, version, floor_version, start_key, limit, full_key, acc, count) 161 162 162 163 [{_key, :deleted} | _rest] -> 163 164 # Key boundary, last pair was :deleted 164 165 acc = [{key, value} | acc] 165 - do_scan(:backward, table, version, start_key, limit, full_key, acc, count) 166 + do_scan(:backward, table, version, floor_version, start_key, limit, full_key, acc, count) 166 167 167 168 [{_key, _value} | _rest] -> 168 169 # Key boundary, last pair was *not* :deleted ··· 170 171 case count < limit do 171 172 true -> 172 173 acc = [{key, value} | acc] 173 - do_scan(:backward, table, version, start_key, limit, full_key, acc, count) 174 + do_scan(:backward, table, version, floor_version, start_key, limit, full_key, acc, count) 174 175 false -> 175 176 {acc, count} 176 177 end ··· 178 179 179 180 false -> 180 181 # Version is unreadable, skip 181 - do_scan(:backward, table, version, start_key, limit, full_key, acc, count) 182 + do_scan(:backward, table, version, floor_version, start_key, limit, full_key, acc, count) 182 183 end 183 184 184 185 _ ->
+1 -1
lib/meta_store.ex
··· 84 84 end 85 85 86 86 defp scan(kv, version, start_key, end_key, opts \\ []) do 87 - MemKV.scan(kv, version, @special_prefix <> start_key, @special_prefix <> end_key, opts) 87 + MemKV.scan(kv, version, 0, @special_prefix <> start_key, @special_prefix <> end_key, opts) 88 88 # TODO: remove this and return the real result 89 89 |> case do 90 90 %RangeResult{pairs: pairs} -> pairs
+8 -4
lib/range_forest.ex
··· 113 113 114 114 @spec insert_range(RangeTree.t, non_neg_integer, binary, binary) :: [{binary, binary, non_neg_integer}] 115 115 def intersect_range(tree, min_version, start_key, end_key) do 116 - case :gb_trees.smaller(start_key, tree) do 117 - {sk, {_ek, _v}} -> :gb_trees.iterator_from(sk, tree) 118 - :none -> :gb_trees.iterator_from(start_key, tree) 116 + acc = case :gb_trees.smaller(start_key, tree) do 117 + {sk, {ek, v}} when start_key < ek and v >= min_version -> 118 + [{sk, ek, v}] 119 + _ -> 120 + [] 119 121 end 120 - |> scan_intersect_range(end_key, min_version, []) 122 + 123 + :gb_trees.iterator_from(start_key, tree) 124 + |> scan_intersect_range(end_key, min_version, acc) 121 125 |> Enum.reverse() 122 126 end 123 127
+1 -1
test/hybrid_kv_test.exs
··· 64 64 ], limit: :infinity 65 65 end 66 66 67 - @ops [:put, :delete, :delete_range, :get] 67 + @ops [:put, :delete, :delete_range, :get, :scan] 68 68 defp random_op do 69 69 case Enum.random(1..100) do 70 70 1 -> :flush
+10 -10
test/mem_kv_test.exs
··· 120 120 {"foo_b", "bar_b"}, 121 121 {"foo_c", "bar_c"}, 122 122 {"foo_d", "bar_d"}, 123 - ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 1, "foo_a", "foo_z") 123 + ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 1, 0, "foo_a", "foo_z") 124 124 125 125 assert %RangeResult{pairs: [ 126 126 {"foo_a", "bar_a_2"}, 127 127 {"foo_b", "bar_b"}, 128 128 {"foo_c", "bar_c"}, 129 129 {"foo_d", "bar_d"}, 130 - ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 2, "foo_a", "foo_z") 130 + ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 2, 0, "foo_a", "foo_z") 131 131 132 132 assert %RangeResult{pairs: [ 133 133 {"foo_a", "bar_a_2"}, ··· 135 135 {"foo_c", "bar_c"}, 136 136 {"foo_d", :deleted}, 137 137 {"foo_e", "bar_e_3"}, 138 - ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 3, "foo_a", "foo_z") 138 + ], count: 4, range: {"foo_a", "foo_z"}, more: false} = MemKV.scan(kv, 3, 0, "foo_a", "foo_z") 139 139 end 140 140 141 141 test "limits forward", %{kv: kv} do ··· 143 143 {"foo_b", "bar_b_3"}, 144 144 {"foo_c", "bar_c"}, 145 145 {"foo_d", :deleted}, 146 - ], count: 2, range: {"foo_b", "foo_e"}, more: true} = MemKV.scan(kv, 3, "foo_b", "foo_z", limit: 2) 146 + ], count: 2, range: {"foo_b", "foo_e"}, more: true} = MemKV.scan(kv, 3, 0, "foo_b", "foo_z", limit: 2) 147 147 148 148 assert %RangeResult{pairs: [ 149 149 {"foo_b", "bar_b_3"}, 150 150 {"foo_c", "bar_c"}, 151 151 {"foo_d", :deleted}, 152 - ], count: 2, range: {"foo_b", "foo_e"}, more: false} = MemKV.scan(kv, 3, "foo_b", "foo_e", limit: 3) 152 + ], count: 2, range: {"foo_b", "foo_e"}, more: false} = MemKV.scan(kv, 3, 0, "foo_b", "foo_e", limit: 3) 153 153 end 154 154 155 155 test "scans backward", %{kv: kv} do ··· 158 158 {"foo_c", "bar_c"}, 159 159 {"foo_b", "bar_b"}, 160 160 {"foo_a", "bar_a"}, 161 - ], count: 4, range: {"foo_", "foo_e"}, more: false} = MemKV.scan(kv, 1, "foo_", "foo_e", reverse: true) 161 + ], count: 4, range: {"foo_", "foo_e"}, more: false} = MemKV.scan(kv, 1, 0, "foo_", "foo_e", reverse: true) 162 162 163 163 assert %RangeResult{pairs: [ 164 164 {"foo_d", "bar_d"}, 165 165 {"foo_c", "bar_c"}, 166 166 {"foo_b", "bar_b"}, 167 167 {"foo_a", "bar_a_2"}, 168 - ], count: 4, range: {"foo_", "foo_e"}, more: false} = MemKV.scan(kv, 2, "foo_", "foo_e", reverse: true) 168 + ], count: 4, range: {"foo_", "foo_e"}, more: false} = MemKV.scan(kv, 2, 0, "foo_", "foo_e", reverse: true) 169 169 170 170 assert %RangeResult{pairs: [ 171 171 {"foo_e", "bar_e_3"}, ··· 173 173 {"foo_c", "bar_c"}, 174 174 {"foo_b", "bar_b_3"}, 175 175 {"foo_a", "bar_a_2"}, 176 - ], count: 4, range: {"foo_", "foo_f"}, more: false} = MemKV.scan(kv, 3, "foo_", "foo_f", reverse: true) 176 + ], count: 4, range: {"foo_", "foo_f"}, more: false} = MemKV.scan(kv, 3, 0, "foo_", "foo_f", reverse: true) 177 177 end 178 178 179 179 test "limits backward", %{kv: kv} do ··· 181 181 {"foo_d", :deleted}, 182 182 {"foo_c", "bar_c"}, 183 183 {"foo_b", "bar_b_3"}, 184 - ], count: 2, range: {"foo_b", "foo_e"}, more: true} = MemKV.scan(kv, 3, "foo_", "foo_e", reverse: true, limit: 2) 184 + ], count: 2, range: {"foo_b", "foo_e"}, more: true} = MemKV.scan(kv, 3, 0, "foo_", "foo_e", reverse: true, limit: 2) 185 185 186 186 assert %RangeResult{pairs: [ 187 187 {"foo_d", :deleted}, 188 188 {"foo_c", "bar_c"}, 189 189 {"foo_b", "bar_b_3"}, 190 - ], count: 2, range: {"foo_b", "foo_e"}, more: false} = MemKV.scan(kv, 3, "foo_b", "foo_e", reverse: true, limit: 3) 190 + ], count: 2, range: {"foo_b", "foo_e"}, more: false} = MemKV.scan(kv, 3, 0, "foo_b", "foo_e", reverse: true, limit: 3) 191 191 end 192 192 end 193 193
+2
test/range_forest_test.exs
··· 147 147 {"foo_e", "foo_g", 2}, 148 148 {"foo_g", "foo_z", 1}, 149 149 ] 150 + 151 + assert RangeTree.intersect_range(rt, 1, "\xFF", "\xFF\x00") == [] 150 152 end 151 153 end 152 154