this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor TLog to use XKS (work in progress)

garrison e003a053 e1db3db3

+147 -87
+16
lib/encoding/keyset.ex
··· 18 18 @int_pos_7 0x1C 19 19 @int_pos_8 0x1D 20 20 21 + @bool_false 0x26 22 + @bool_true 0x27 23 + 21 24 @spec pack(list) :: binary 22 25 def pack(list) when is_list(list) do 23 26 Enum.map(list, &encode(&1, 0)) ··· 41 44 42 45 defp encode(nil, 0), do: <<@nil_value>> 43 46 defp encode(nil, depth) when depth > 0, do: <<@nil_value, @escape>> 47 + 48 + defp encode(false, _depth), do: <<@bool_false>> 49 + defp encode(true, _depth), do: <<@bool_true>> 44 50 45 51 defp encode(list, depth) when is_list(list) do 46 52 values = Enum.map(list, &encode(&1, depth + 1)) ··· 104 110 # Closing a nested list 105 111 defp decode(<<@nil_value, rest::binary>>, depth) when depth > 0 do 106 112 {[], rest} 113 + end 114 + 115 + defp decode(<<@bool_false, rest::binary>>, depth) do 116 + {values, tail} = decode(rest, depth) 117 + {[false | values], tail} 118 + end 119 + 120 + defp decode(<<@bool_true, rest::binary>>, depth) do 121 + {values, tail} = decode(rest, depth) 122 + {[true | values], tail} 107 123 end 108 124 109 125 defp decode(<<@binary, rest::binary>>, depth) do
+98 -71
lib/servers/tlog.ex
··· 4 4 import ExUnit.Assertions, only: [assert: 1] 5 5 6 6 alias Hobbes.Servers.Manager 7 - alias Hobbes.{TaggedQueue, StorageQueue} 8 - alias Hobbes.HybridKV 9 - alias Hobbes.KV.{FlatKV, FlatStorageKV} 10 - alias Hobbes.Structs.{Cluster, TLogStatus, LogBatch, PeekResult, RangeResult} 7 + alias Hobbes.TaggedQueue 8 + alias Hobbes.Structs.{Cluster, TLogStatus, LogBatch, PeekResult} 11 9 alias Hobbes.Encoding.Keyset 10 + 11 + alias Hobbes.XKS 12 + alias Hobbes.XKS.Reader 12 13 13 14 alias Hobbes.Utils 14 15 import Hobbes.Utils ··· 22 23 known_committed_version: non_neg_integer, 23 24 buffer: %{non_neg_integer => {GenServer.from, %LogBatch{}}}, 24 25 tagged_queue: TaggedQueue.t, 25 - storage_queue: StorageQueue.t, 26 - meta_kv: HybridKV.t, 27 26 } 28 27 29 28 @enforce_keys [ ··· 38 37 :buffer, 39 38 :tag_popped_versions, 40 39 40 + :xks, 41 41 :tagged_queue, 42 - :storage_queue, 43 - :meta_kv, 44 42 ] 45 43 defstruct @enforce_keys 46 44 end 47 45 48 46 @tick_interval 100 47 + 48 + @state_partition 0 49 + @meta_partition 1 49 50 50 51 def start_link(arg), do: SimServer.start_link(__MODULE__, arg) 51 52 ··· 112 113 SimServer.call(server, {:read_meta_store, version}) 113 114 end 114 115 115 - def init(%{cluster: %Cluster{} = cluster, path: path, id: id, prev_version: prev_version, meta_pairs: meta_pairs}) do 116 + def init(%{cluster: %Cluster{} = cluster, path: _path, id: id, prev_version: prev_version, meta_pairs: meta_pairs}) do 116 117 # Init new TLog 117 118 assert is_integer(id) 118 119 assert is_integer(prev_version) 119 120 assert is_list(meta_pairs) 120 121 121 - {storage_module, storage_kv} = 122 - case SimServer.simulated?() do 123 - true -> {FlatStorageKV, FlatStorageKV.new(path)} 124 - false -> {FlatKV, FlatKV.new()} 125 - end 126 - meta_kv = HybridKV.new(storage_module, storage_kv) 127 - storage_queue = StorageQueue.new(storage_module, storage_kv) 122 + xks = XKS.new() 123 + # TODO: init state partition once XKS does not init partition 0 during new() 124 + XKS.init_partition(xks, @meta_partition) 128 125 129 126 # Write id to storage 130 - storage_module.put(storage_kv, special_id_key(), Keyset.pack([id])) 127 + XKS.apply_batch(xks, @state_partition, 1, [{:write, special_id_key(), Keyset.pack([id])}]) 128 + 131 129 # Write initial state to storage 132 - StorageQueue.put_state(storage_queue, %{locked?: false, version: prev_version, known_committed_version: prev_version}) 133 - # Write initial meta pairs to storage 134 - HybridKV.load_storage(meta_kv, meta_pairs) 135 - # Commit the storage KV (shared between meta_kv and storage_queue) 136 - HybridKV.commit(meta_kv) 130 + put_tlog_state(xks, 1, %{ 131 + locked?: false, 132 + version: prev_version, 133 + known_committed_version: prev_version, 134 + }) 137 135 138 - do_init(cluster, meta_kv, storage_queue) 139 - end 136 + # Write initial meta pairs 137 + load_meta_pairs(xks, meta_pairs) 140 138 141 - def init(%{cluster: %Cluster{} = cluster, path: path}) do 142 - # Init existing TLog from storage 143 - {storage_module, storage_kv} = 144 - case SimServer.simulated?() do 145 - true -> {FlatStorageKV, FlatStorageKV.new(path)} 146 - false -> {FlatKV, FlatKV.new()} 147 - end 139 + # Commit metadata 140 + compact_and_commit(xks) 148 141 149 - meta_kv = HybridKV.new(storage_module, storage_kv) 150 - storage_queue = StorageQueue.new(storage_module, storage_kv) 142 + do_init(cluster, xks) 143 + end 151 144 152 - do_init(cluster, meta_kv, storage_queue) 145 + def init(%{cluster: %Cluster{} = _cluster, path: _path}) do 146 + # TODO: load XKS from storage 147 + raise "not implemented" 153 148 end 154 149 155 - def do_init(%Cluster{} = cluster, %HybridKV{} = meta_kv, %StorageQueue{} = storage_queue) do 150 + def do_init(%Cluster{} = cluster, %XKS{} = xks) do 151 + # TODO: version could be >1 if loaded from disk, use XKS version 152 + load_read_version = 1 153 + 156 154 # Load id from storage 157 - [id] = meta_kv.storage_module.get(meta_kv.storage_kv, special_id_key()) |> Keyset.unpack() 155 + [id] = Reader.get(xks, @state_partition, load_read_version, special_id_key()) |> Keyset.unpack() 158 156 assert is_integer(id) 159 157 160 158 # Load persistent state from storage 161 - state_fields = StorageQueue.get_state(storage_queue) 159 + state_fields = get_tlog_state(xks, load_read_version) 162 160 163 161 state = %State{ 164 162 id: id, ··· 172 170 buffer: %{}, 173 171 tag_popped_versions: %{}, 174 172 173 + xks: xks, 175 174 tagged_queue: TaggedQueue.new(), 176 - storage_queue: storage_queue, 177 - meta_kv: meta_kv, 178 175 } 179 176 # Sanity check persistent state 180 177 assert is_boolean(state.locked?) 181 178 assert is_integer(state.version) and state.version >= 0 182 179 assert is_integer(state.known_committed_version) and state.known_committed_version >= 0 183 180 184 - state = 185 - StorageQueue.peek_batches(state.storage_queue) 186 - |> Enum.reduce(state, fn {version, tagged_mutations}, state -> 187 - apply_batch_mutations(state, version, tagged_mutations) 188 - end) 181 + # TODO: load batches from log 189 182 190 183 SimServer.send_after(self(), :tick, @tick_interval) 191 184 {:ok, state} ··· 219 212 assert version >= state.known_committed_version 220 213 assert version <= state.version 221 214 222 - %RangeResult{pairs: pairs, more: false} = HybridKV.scan(state.meta_kv, version, all_keys_prefix(), all_keys_end()) 215 + raise "untested, look at output before removing this" 216 + pairs = Reader.scan(state.xks, @meta_partition, version, all_keys_prefix(), all_keys_end()) 223 217 {:reply, {:ok, pairs}, state} 224 218 end 225 219 ··· 258 252 end 259 253 260 254 def handle_cast(:lock, %State{} = state) do 255 + %State{xks: xks, version: version} = state 256 + 261 257 state = %{state | locked?: true} 262 - StorageQueue.put_state(state.storage_queue, Map.take(state, [:locked?, :version, :known_committed_version])) 263 - StorageQueue.commit(state.storage_queue) 258 + put_tlog_state(xks, version, Map.take(state, [:locked?, :version, :known_committed_version])) 259 + compact_and_commit(xks) 264 260 265 261 {:noreply, state} 266 262 end ··· 312 308 313 309 @spec do_write_batch(%State{}, term, %LogBatch{}) :: %State{} 314 310 defp do_write_batch(%State{} = state, from, %LogBatch{} = batch) do 311 + %State{xks: xks} = state 312 + 315 313 # Apply mutations in-memory 316 314 state = apply_batch_mutations(state, batch.commit_version, batch.tagged_mutations) 317 315 318 316 # Write mutations to storage 319 317 # TODO: do we need to write empty batches? 320 - StorageQueue.append_batch(state.storage_queue, batch.commit_version, batch.tagged_mutations) 321 - # Pop batches which are popped by all tags 322 - min_popped_version = state.tag_popped_versions |> Map.values() |> Enum.min(fn -> 0 end) 323 - StorageQueue.pop_batches(state.storage_queue, min_popped_version) 318 + append_batch(state, batch) 324 319 325 320 # Update versions 326 321 state = %{state | ··· 330 325 331 326 # Write persistent state to storage 332 327 # TODO: group commit storage? 333 - StorageQueue.put_state(state.storage_queue, Map.take(state, [:locked?, :version, :known_committed_version])) 334 - 335 - # Flush meta mutations which are known to be committed to storage and pop them from the queue 336 - # (once mutations are known to be committed we do not need them in multiversion storage for recovery because versions < KCV cannot be chosen) 337 - meta_kv = HybridKV.flush(state.meta_kv, state.known_committed_version) 338 - state = %{state | meta_kv: meta_kv} 339 - 340 - # Commit both storage_queue *and* meta_kv (they share the same underlying storage) 341 - :ok = HybridKV.commit(state.meta_kv) 328 + put_tlog_state(xks, state.version, Map.take(state, [:locked?, :version, :known_committed_version])) 342 329 343 - # Pop the meta tag for mutations which are now in storage 344 - TaggedQueue.pop(state.tagged_queue, meta_tag(), state.known_committed_version) 330 + # TODO: fsync logs but do not compact LSM or commit Superblock 345 331 346 332 # Ack to CommitBuffer 347 333 SimServer.reply(from, :ok) ··· 365 351 # Apply meta mutations to meta store if present 366 352 case mutations_by_tag[-1] do 367 353 nil -> :noop 368 - # Note Enum.reverse 369 - meta_mutations -> apply_meta(state.meta_kv, version, Enum.reverse(meta_mutations)) 354 + meta_mutations -> 355 + # Note that this reverses `meta_mutations` 356 + unwrapped = Enum.reduce(meta_mutations, [], fn {_i, mut}, acc -> [mut | acc] end) 357 + XKS.apply_batch(state.xks, @meta_partition, version, unwrapped) 370 358 end 371 359 372 360 mutations_by_tag ··· 389 377 %{state | tag_popped_versions: tag_popped_versions} 390 378 end 391 379 392 - defp apply_meta(%HybridKV{} = kv, version, meta_mutations) do 393 - Enum.each(meta_mutations, fn 394 - {_i, {:write, meta_prefix() <> _ = k, v}} -> 395 - HybridKV.put(kv, version, k, v) 396 - {_i, {:clear, meta_prefix() <> _ = k}} -> 397 - HybridKV.delete(kv, version, k) 398 - end) 380 + @locked_key "/locked?" 381 + @version_key "/version" 382 + @kcv_key "/known_committed_version" 383 + 384 + defp get_tlog_state(%XKS{} = xks, at_version) do 385 + [locked?] = Reader.get(xks, @state_partition, at_version, @locked_key) |> Keyset.unpack() 386 + [version] = Reader.get(xks, @state_partition, at_version, @version_key) |> Keyset.unpack() 387 + [known_committed_version] = Reader.get(xks, @state_partition, at_version, @kcv_key) |> Keyset.unpack() 388 + 389 + assert is_boolean(locked?) 390 + assert is_integer(version) 391 + assert is_integer(known_committed_version) 392 + 393 + %{ 394 + locked?: locked?, 395 + version: version, 396 + known_committed_version: known_committed_version, 397 + } 398 + end 399 + 400 + def put_tlog_state(%XKS{} = xks, at_version, fields) do 401 + %{ 402 + locked?: locked?, 403 + version: version, 404 + known_committed_version: known_committed_version, 405 + } = fields 406 + 407 + XKS.apply_batch(xks, @state_partition, at_version, [ 408 + {:write, @locked_key, Keyset.pack([locked?])}, 409 + {:write, @version_key, Keyset.pack([version])}, 410 + {:write, @kcv_key, Keyset.pack([known_committed_version])}, 411 + ]) 412 + end 413 + 414 + defp load_meta_pairs(%XKS{} = xks, pairs) do 415 + mutations = Enum.map(pairs, fn {k, v} -> {:write, k, v} end) 416 + XKS.apply_batch(xks, @meta_partition, 1, mutations) 417 + end 418 + 419 + defp compact_and_commit(%XKS{} = xks) do 420 + # TODO: force a compaction of all partitions 421 + XKS.commit(xks) 422 + end 423 + 424 + defp append_batch(%State{} = _state, %LogBatch{} = _batch) do 425 + # TODO 399 426 end 400 427 end
+3 -3
lib/xks/fuzz/iterator_fuzz.ex
··· 79 79 end 80 80 end 81 81 82 - defp construct_iterators(xks, start_key, reverse?) do 82 + defp construct_iterators(%XKS{} = xks, start_key, reverse?) do 83 83 partition = 0 84 - [%{memtable: mt}] = Manifest.list_memtables(xks.manifest, partition, 0) 84 + [%{memtable: mt}] = Manifest.list_memtables(xks.manifest, partition, 1) 85 85 memtable_iterator = Merge.iterator_for_memtable(mt, start_key, reverse?) 86 86 87 87 # TODO: get real epoch? 88 - epoch = 1 88 + epoch = 2 89 89 level = 1 90 90 level_iterator = Merge.iterator_for_level(xks.block_store, xks.manifest, epoch, partition, level, start_key, reverse?) 91 91
+28 -11
lib/xks/xks.ex
··· 114 114 version_atomic: :atomics.new(1, signed: false), 115 115 } 116 116 117 - epoch = :atomics.get(xks.epoch_atomic, 1) 118 - assert epoch == 0 119 - default_partition = 0 120 - memtable_info = %MemtableInfo{ 121 - partition: default_partition, 122 - epoch_min: epoch, 123 - memtable: Memtable.new(), 124 - } 125 - Manifest.insert_memtable(xks.manifest, memtable_info) 117 + # Init default partition 118 + # TODO: remove and force users to init themselves 119 + init_partition(xks, 0) 126 120 127 121 xks 128 122 end ··· 211 205 xks 212 206 end 213 207 208 + @spec init_partition(t, integer) :: :ok 209 + def init_partition(%XKS{} = xks, partition) do 210 + %XKS{ 211 + manifest: manifest, 212 + epoch_atomic: epoch_atomic, 213 + } = xks 214 + 215 + prev_epoch = :atomics.get(epoch_atomic, 1) 216 + assert Manifest.list_memtables(manifest, partition, prev_epoch) == [] 217 + 218 + new_epoch = prev_epoch + 1 219 + memtable_info = %MemtableInfo{ 220 + partition: partition, 221 + epoch_min: new_epoch, 222 + memtable: Memtable.new(), 223 + } 224 + Manifest.insert_memtable(manifest, memtable_info) 225 + 226 + :atomics.put(epoch_atomic, 1, new_epoch) 227 + :ok 228 + end 229 + 214 230 @spec apply_batch(t, non_neg_integer, non_neg_integer, list) :: :ok 215 231 def apply_batch(%XKS{} = xks, partition, version, mutations) do 216 232 %XKS{ ··· 220 236 } = xks 221 237 epoch = :atomics.get(epoch_atomic, 1) 222 238 prev_version = :atomics.get(version_atomic, 1) 223 - # Version must be monotonic 224 - assert version > prev_version 239 + # Version must be monotonic, but multiple `apply_batch` calls 240 + # at a single version are okay with a single writer 241 + assert version >= prev_version 225 242 226 243 [%MemtableInfo{} = active_info | _] = Manifest.list_memtables(manifest, partition, epoch) 227 244 assert active_info.epoch_max == :infinity
+2 -2
test/encoding_test.exs
··· 40 40 41 41 test "encodes and decodes nested" do 42 42 keys = [ 43 - [0, [1, 2, [3, 4]]], 43 + [0, true, [1, false, 2, [3, 4]]], 44 44 [0, "foo", [1, "bar", 2, [3]]], 45 - [0, "fo\x00o", [1, "ba\x00\x00r\x00", 2, [3]]], 45 + [0, "fo\x00o", [1, "ba\x00\x00r\x00", 2, true, false, [3]]], 46 46 ] 47 47 48 48 for k <- keys do