this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Decode table index

garrison be1a1f75 a11a02eb

+73 -25
+13 -15
lib/xks/compaction.ex
··· 93 93 {index, checksum} 94 94 end) 95 95 96 - {index_blocks_data, index_slots_data, index_slot_count} = encode_index(data_blocks) 96 + data_block_count = length(data_blocks) 97 + {index_blocks_data, index_slots_data, index_slot_count} = encode_index(data_blocks, data_block_count) 97 98 98 99 # TODO: most of the time we should be able to allocate these blocks up-front to avoid fragmentation 99 100 index_block_indexes = FreeList.reserve(free_list, length(index_blocks_data)) ··· 162 163 encode_block_addresses(rest, acc, size_acc + @block_address_size) 163 164 end 164 165 165 - defp encode_index(data_blocks) do 166 - {blocks, slots, count} = do_encode_index(0, [], data_blocks, [], [], 0, [], 0) 166 + defp encode_index(data_blocks, data_block_count) do 167 + # Note: index_block_i is started at data_block_count so that it represents the i of the index block within the table 168 + {blocks, slots, count} = do_encode_index(0, [], data_blocks, [], [], 0, [], 0, data_block_count) 167 169 assert length(slots) == count 168 170 { 169 171 Enum.reduce(blocks, [], fn bl, acc -> [pad_index_block(bl) | acc] end), ··· 198 200 # Where `index_block_i` is the index of the block *within the table* containing the entry, 199 201 # `offset` is the byte offset of the entry within that block, 200 202 # and `key_size` is the size of the entry's key in bytes. 201 - defp do_encode_index(_i, [], [], cur_acc, blocks_acc, size_acc, slots_acc, count) do 203 + defp do_encode_index(_i, [], [], cur_acc, blocks_acc, size_acc, slots_acc, count, _index_block_i) do 202 204 { 203 205 [{cur_acc, size_acc} | blocks_acc], 204 206 slots_acc, ··· 206 208 } 207 209 end 208 210 209 - defp do_encode_index(_i, [], [data_block | data_blocks_rest], cur_acc, blocks_acc, size_acc, slots_acc, count) do 211 + defp do_encode_index(_i, [], [data_block | data_blocks_rest], cur_acc, blocks_acc, size_acc, slots_acc, count, index_block_i) do 210 212 {:data_block, i, _index, _checksum, _fk, _lk, entries} = data_block 211 - do_encode_index(i, entries, data_blocks_rest, cur_acc, blocks_acc, size_acc, slots_acc, count) 213 + do_encode_index(i, entries, data_blocks_rest, cur_acc, blocks_acc, size_acc, slots_acc, count, index_block_i) 212 214 end 213 215 214 - defp do_encode_index(block_i, [entry | rest], data_blocks, cur_acc, blocks_acc, size_acc, slots_acc, count) do 216 + defp do_encode_index(block_i, [entry | rest], data_blocks, cur_acc, blocks_acc, size_acc, slots_acc, count, index_block_i) do 215 217 {{key, version}, offset} = entry 216 218 key_size = byte_size(key) 217 219 entry_size = key_size + @table_index_entry_overhead_bytes 218 220 219 - {cur_acc, blocks_acc, size_acc} = 221 + {cur_acc, blocks_acc, size_acc, index_block_i} = 220 222 case (size_acc + entry_size) > @block_size do 221 223 # Rotate current block 222 224 # TODO: this code is currently untested because block size is too large 223 - true -> {[], [{cur_acc, size_acc} | blocks_acc], 0} 224 - false -> {cur_acc, blocks_acc, size_acc} 225 + true -> {[], [{cur_acc, size_acc} | blocks_acc], 0, index_block_i + 1} 226 + false -> {cur_acc, blocks_acc, size_acc, index_block_i} 225 227 end 226 228 227 229 # [key, block_i, offset] 228 230 # Note iolist is reversed here 229 231 cur_acc = [<<version::integer-64, block_i::integer-8, offset::integer-24>>, key | cur_acc] 230 232 231 - # length() is not efficient but this will rarely be >0 in practice 232 - # TODO: use the index of the block within the table instead of starting from 0 233 - index_block_i = length(blocks_acc) 234 - 235 233 # [index_block, slot_offset, key_size] 236 234 slots_acc = [<<index_block_i::integer-16, size_acc::integer-24, key_size::integer-16>> | slots_acc] 237 235 size_acc = size_acc + entry_size 238 236 count = count + 1 239 237 240 - do_encode_index(block_i, rest, data_blocks, cur_acc, blocks_acc, size_acc, slots_acc, count) 238 + do_encode_index(block_i, rest, data_blocks, cur_acc, blocks_acc, size_acc, slots_acc, count, index_block_i) 241 239 end 242 240 243 241 defp do_compact_table(_xks, :empty = state, _count, _block_indexes, data_blocks_acc, initial_sub) do
+60 -10
lib/xks/merge.ex
··· 8 8 :epoch, 9 9 :level, 10 10 :table, 11 - :table_block_addresses, 11 + :table_metadata, 12 12 :current_pair, 13 + ] 14 + defstruct @enforce_keys 15 + end 16 + 17 + defmodule TableMetadata do 18 + @enforce_keys [ 19 + :last_block_data, 20 + :block_addresses, 21 + :subtable_count, 13 22 ] 14 23 defstruct @enforce_keys 15 24 end ··· 68 77 epoch: epoch, 69 78 level: level, 70 79 table: table, 71 - table_block_addresses: {}, 80 + table_metadata: nil, 72 81 current_pair: nil, 73 82 } 74 - |> load_table() 83 + |> load_table_metadata() 84 + |> tap(fn it -> dbg(dump_index(it), limit: :infinity) end) 75 85 76 86 :error -> 77 87 # TODO ··· 96 106 end 97 107 end 98 108 99 - defp load_table(%LevelIterator{} = iterator) do 109 + defp load_table_metadata(%LevelIterator{} = iterator) do 100 110 {lb_index, lb_checksum} = iterator.table.last_block_address 101 111 block_data = Blocks.read(iterator.block_store, lb_index, lb_checksum) 102 - block_addresses = decode_block_addresses(block_data) 112 + 113 + <<block_count::integer-32, _::integer-32, subtable_count::integer-32, _rest::binary>> = block_data 114 + block_addresses = decode_block_addresses(block_data, block_count) 103 115 104 - %{iterator | 105 - table_block_addresses: block_addresses, 116 + metadata = %TableMetadata{ 117 + last_block_data: block_data, 118 + block_addresses: block_addresses, 119 + subtable_count: subtable_count, 106 120 } 121 + 122 + %{iterator | table_metadata: metadata} 107 123 end 108 124 109 125 # TODO: use macros for these constants, they come from Compaction 110 126 @table_metadata_size (4 + 4 + 4) 111 127 @block_address_size (8 + 16) 112 - 113 - defp decode_block_addresses(block_data) do 114 - <<block_count::integer-32, _::integer-32, _::integer-32, _rest::binary>> = block_data 128 + @index_slot_size (2 + 3 + 2) 115 129 130 + defp decode_block_addresses(block_data, block_count) do 116 131 0..(block_count - 1) 117 132 |> Enum.reduce([], fn i, acc -> 118 133 offset = @table_metadata_size + (i * @block_address_size) ··· 122 137 end) 123 138 |> Enum.reverse() 124 139 |> List.to_tuple() 140 + end 141 + 142 + @doc false 143 + def dump_index(%LevelIterator{} = iterator) do 144 + %TableMetadata{last_block_data: lb_data, block_addresses: addresses, subtable_count: sub_count} = iterator.table_metadata 145 + 146 + block_count = div(tuple_size(addresses), 2) 147 + index_slots_offset = @table_metadata_size + (block_count * @block_address_size) 148 + 149 + Enum.map(0..(sub_count - 1), fn i -> 150 + slot_offset = index_slots_offset + (i * @index_slot_size) 151 + << 152 + _::binary-size(slot_offset), 153 + index_block_i::integer-16, 154 + entry_offset::integer-24, 155 + key_size::integer-16, 156 + _rest::binary, 157 + >> = lb_data 158 + 159 + index_block_index = elem(addresses, index_block_i * 2) 160 + index_block_checksum = elem(addresses, (index_block_i * 2) + 1) 161 + 162 + block_data = Blocks.read(iterator.block_store, index_block_index, index_block_checksum) 163 + 164 + << 165 + _::binary-size(entry_offset), 166 + sub_sk_key::binary-size(key_size), 167 + sub_sk_version::integer-64, 168 + sub_data_block_i::integer-8, 169 + sub_offset::integer-24, 170 + _rest::binary, 171 + >> = block_data 172 + 173 + {sub_sk_key, sub_sk_version, sub_data_block_i, sub_offset} 174 + end) 125 175 end 126 176 end