this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add level iterator

garrison 65a7d804 3d9515ac

+125 -35
+51 -31
lib/xks/manifest.ex
··· 3 3 4 4 alias Hobbes.XKS.{Blocks, Memtable} 5 5 6 - import ExUnit.Assertions, only: [assert: 1] 6 + @memtable_level -1 7 7 8 - @memtable_level -1 8 + defmodule Table do 9 + @type t :: %__MODULE__{ 10 + last_block_address: {non_neg_integer, Blocks.checksum}, 11 + start_key: {binary, non_neg_integer}, 12 + end_key: {binary, non_neg_integer}, 13 + manifest_key: tuple, 14 + } 15 + @enforce_keys [ 16 + :last_block_address, 17 + :start_key, 18 + :end_key, 19 + :manifest_key, 20 + ] 21 + defstruct @enforce_keys 22 + end 9 23 10 24 @spec new :: t 11 25 def new do ··· 70 84 :ok 71 85 end 72 86 73 - @spec seek_table(t, non_neg_integer, non_neg_integer, binary, non_neg_integer) :: {:ok, tuple} | :error 74 - def seek_table(manifest, epoch, level, key, version) do 75 - seek_key = {key, version} 87 + def level_start_sentinel(level), do: {level, "", 0, -1} 88 + # TODO: standardize last key of keyspace 89 + def level_end_sentinel(level), do: {level, "\xFF\xFF\xFF\xFF", :infinity, :infinity} 90 + 91 + @spec seek_prev_table(t, non_neg_integer, non_neg_integer, binary, non_neg_integer) :: {:ok, Table.t} | :error 92 + def seek_prev_table(manifest, epoch, level, key, version) do 76 93 # version + 1 because prev() is exclusive and we need a table with start_key <= seek_key 77 94 # epoch = 0 because we are trying to seek the previous key in the manifest keyspace 78 95 # E.g. if we passed epoch=10 we could get {level, key, version + 1, 9} when we want to get {level, key, version, some_epoch} 79 96 prev_key = {level, key, version + 1, 0} 80 97 81 - do_seek_table(manifest, epoch, level, seek_key, prev_key, false) 98 + do_seek_prev_table(manifest, epoch, level, prev_key, false) 82 99 end 83 100 84 - defp do_seek_table(manifest, epoch, level, seek_key, prev_key, prev_tombstone?) do 101 + defp do_seek_prev_table(manifest, epoch, level, prev_key, prev_tombstone?) do 85 102 case :ets.prev_lookup(manifest, prev_key) do 86 103 {{^level, _k, _ver, _ep} = key, _obj} when prev_tombstone? -> 87 104 # The previous entry we saw (which is *next* in the keyspace) was a tombstone, 88 105 # so this entry has been deleted at `epoch` 89 - do_seek_table(manifest, epoch, level, seek_key, key, false) 106 + do_seek_prev_table(manifest, epoch, level, key, false) 90 107 91 108 {{^level, _k, _ver, ep} = key, _obj} when ep > epoch -> 92 109 # This entry is not visible at `epoch` 93 - do_seek_table(manifest, epoch, level, seek_key, key, false) 110 + do_seek_prev_table(manifest, epoch, level, key, false) 94 111 95 112 {{^level, _k, _ver, _ep} = key, [{_key, :tombstone}]} -> 96 113 # This entry is visible and is a tombstone, 97 - # so we set the prev_tombstone? flag to cancel out the next entry 98 - do_seek_table(manifest, epoch, level, seek_key, key, true) 99 - 100 - {{^level, sk_key, sk_ver, _ep} = key, [{_key, value}]} -> 101 - # This entry is visible and may contain `seek_key` 102 - {ek_key, ek_ver, _lb_index, _lb_checksum, _id} = value 114 + # so we set the prev_tombstone? flag to cancel out the next (previous in the keyspace) entry 115 + do_seek_prev_table(manifest, epoch, level, key, true) 103 116 104 - # This invariant is guaranteed because we are calling prev(seek_key + 1) to get here 105 - assert {sk_key, sk_ver} <= seek_key 106 - case seek_key < {ek_key, ek_ver} do 107 - true -> 108 - # If seek_key < end_key, this table contains `seek_key` 109 - {:ok, table_from_kv(key, value)} 110 - 111 - false -> 112 - # If seek_key >= end_key, that means there is a "hole" in the keyspace 113 - # and there is no table which contains `seek_key` 114 - :error 115 - end 117 + {{^level, _sk_key, _sk_ver, _ep} = key, [{_key, value}]} -> 118 + # This entry is visible 119 + {:ok, table_from_kv(key, value)} 116 120 117 121 _ -> 118 122 # Hit the start of the level without finding a table ··· 120 124 end 121 125 end 122 126 123 - @spec next_table(t, non_neg_integer, non_neg_integer, tuple) :: {:ok, tuple} | :error 124 - def next_table(manifest, epoch, level, {:table, _i, _ck, _start_key, _end_key, table_key} = _table) do 125 - do_next_table(manifest, epoch, level, table_key) 127 + @spec seek_next_table(t, non_neg_integer, non_neg_integer, binary, non_neg_integer) :: {:ok, Table.t} | :error 128 + def seek_next_table(manifest, epoch, level, key, version) do 129 + prev_key = 130 + case seek_prev_table(manifest, epoch, level, key, version) do 131 + {:ok, %{manifest_key: key}} -> key 132 + :error -> level_start_sentinel(level) 133 + end 134 + 135 + next_table(manifest, epoch, level, %{manifest_key: prev_key}) 136 + end 137 + 138 + @spec next_table(t, non_neg_integer, non_neg_integer, map) :: {:ok, Table.t} | :error 139 + def next_table(manifest, epoch, level, %{manifest_key: manifest_key} = _table) do 140 + do_next_table(manifest, epoch, level, manifest_key) 126 141 end 127 142 128 143 defp do_next_table(manifest, epoch, level, prev_key) do ··· 152 167 {_level, sk_key, sk_ver, _epoch} = manifest_key 153 168 {ek_key, ek_ver, lb_index, lb_checksum, _id} = manifest_value 154 169 155 - {:table, lb_index, lb_checksum, {sk_key, sk_ver}, {ek_key, ek_ver}, manifest_key} 170 + %Table{ 171 + last_block_address: {lb_index, lb_checksum}, 172 + start_key: {sk_key, sk_ver}, 173 + end_key: {ek_key, ek_ver}, 174 + manifest_key: manifest_key, 175 + } 156 176 end 157 177 158 178 @spec list_overlapping_tables(t, non_neg_integer, non_neg_integer, {binary, non_neg_integer}, {binary, non_neg_integer}) :: [tuple]
+63 -1
lib/xks/merge.ex
··· 1 1 defmodule Hobbes.XKS.Merge do 2 - alias Hobbes.XKS.Memtable 2 + alias Hobbes.XKS.{Manifest, Memtable, Blocks} 3 3 4 + defmodule LevelIterator do 5 + @enforce_keys [ 6 + :block_store, 7 + :manifest, 8 + :epoch, 9 + :level, 10 + :table, 11 + :table_block_addresses, 12 + :current_pair, 13 + ] 14 + defstruct @enforce_keys 15 + end 16 + 17 + # {:memtable_iterator, memtable, key, version, value} 4 18 @type memtable_iterator :: {:memtable_iterator, Memtable.t, binary | :infinity, non_neg_integer | :infinity, binary | nil} 5 19 @type iterator :: memtable_iterator 6 20 ··· 45 59 advance({:memtable_iterator, memtable, start_key, -1, nil}) 46 60 end 47 61 62 + def iterator_for_level(block_store, manifest, epoch, level, start_key) do 63 + case Manifest.seek_next_table(manifest, epoch, level, start_key, 0) do 64 + {:ok, table} -> 65 + %LevelIterator{ 66 + block_store: block_store, 67 + manifest: manifest, 68 + epoch: epoch, 69 + level: level, 70 + table: table, 71 + table_block_addresses: {}, 72 + current_pair: nil, 73 + } 74 + |> load_table() 75 + 76 + :error -> 77 + # TODO 78 + raise "" 79 + end 80 + end 81 + 48 82 defp current_key({:memtable_iterator, _memtable, key, version, _value}) do 49 83 {key, version} 50 84 end ··· 60 94 :"$end_of_table" -> 61 95 {:memtable_iterator, memtable, :infinity, :infinity, nil} 62 96 end 97 + end 98 + 99 + defp load_table(%LevelIterator{} = iterator) do 100 + {lb_index, lb_checksum} = iterator.table.last_block_address 101 + block_data = Blocks.read(iterator.block_store, lb_index, lb_checksum) 102 + block_addresses = decode_block_addresses(block_data) 103 + 104 + %{iterator | 105 + table_block_addresses: block_addresses, 106 + } 107 + end 108 + 109 + # TODO: use macros for these constants, they come from Compaction 110 + @table_metadata_size (4 + 4 + 4) 111 + @block_address_size (8 + 16) 112 + 113 + defp decode_block_addresses(block_data) do 114 + <<block_count::integer-32, _::integer-32, _::integer-32, _rest::binary>> = block_data 115 + 116 + 0..(block_count - 1) 117 + |> Enum.reduce([], fn i, acc -> 118 + offset = @table_metadata_size + (i * @block_address_size) 119 + <<_::binary-size(offset), block_index::integer-64, block_checksum::binary-16, _rest::binary>> = block_data 120 + # These are in the wrong order because the acc will be reversed 121 + [block_checksum, block_index | acc] 122 + end) 123 + |> Enum.reverse() 124 + |> List.to_tuple() 63 125 end 64 126 end
+11 -3
lib/xks/xks.ex
··· 63 63 manifest = xks.manifest 64 64 epoch = :atomics.get(xks.epoch_atomic, 1) 65 65 66 - Manifest.seek_table(manifest, epoch, 1, key, version) 66 + Manifest.seek_prev_table(manifest, epoch, 1, key, version) 67 67 #|> dbg() 68 68 #|> tap(fn result -> is_tuple(result) && dbg(Manifest.next_table(manifest, epoch, 1, elem(result, 1))) end) 69 69 ··· 83 83 84 84 @spec scan(t, non_neg_integer, binary, binary) :: [{binary, binary}] 85 85 def scan(%XKS{} = xks, version, start_key, end_key) do 86 - manifest = xks.manifest 86 + %XKS{manifest: manifest, block_store: block_store} = xks 87 87 epoch = :atomics.get(xks.epoch_atomic, 1) 88 88 89 89 # start_key has version=0 because we have no way of knowing its latest version in the DB 90 90 # end_key has version=0 because the end key is exclusive so we won't actually read it 91 91 _tables = Manifest.list_overlapping_tables(manifest, epoch, 1, {start_key, 0}, {end_key, 0}) 92 92 93 - iterators = 93 + memtable_iterators = 94 94 Manifest.list_memtables(xks.manifest, epoch) 95 95 |> Enum.map(fn {_id, memtable} -> 96 96 Merge.iterator_for_memtable(memtable, start_key) 97 97 end) 98 + 99 + _level_iterators = 100 + Enum.map(1..1, fn level -> 101 + Merge.iterator_for_level(block_store, manifest, epoch, level, start_key) 102 + end) 103 + #|> dbg() 104 + 105 + iterators = memtable_iterators 98 106 99 107 Merge.new(iterators) 100 108 |> do_scan(version, end_key, [])