this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Store pointers in memory and read mutations from log

garrison 715111e2 fecb36a8

+34 -24
+28 -17
lib/servers/tlog.ex
··· 205 205 end 206 206 207 207 def handle_call({:peek_all, start_version, end_version}, _from, state) do 208 - tagged_queue = state.tagged_queue 209 208 # tag_popped_versions is guaranteed to contain all tags which are present in the TaggedQueue 210 209 tags = state.tag_popped_versions |> Map.keys() |> Enum.sort() 211 210 212 211 batches = 213 212 tags 214 - |> Enum.map(fn tag -> TaggedQueue.peek(tagged_queue, tag, start_version, end_version) end) 213 + |> Enum.map(fn tag -> peek_batches(state, tag, start_version, end_version) end) 215 214 |> merge_batches() 216 215 {:reply, {:ok, batches}, state} 217 216 end ··· 249 248 end_version = end_version || state.version 250 249 assert start_version <= end_version 251 250 252 - batches = TaggedQueue.peek(state.tagged_queue, tag, start_version, end_version) 251 + batches = peek_batches(state, tag, start_version, end_version) 253 252 SimServer.simulate_work([1, 2, 3, 10, 30]) 254 253 255 254 result = %PeekResult{ ··· 357 356 358 357 defp apply_batch_mutations(%State{} = state, version, tagged_mutations) 359 358 when is_integer(version) and is_list(tagged_mutations) do 359 + # TODO: remove this function once meta mutations are written elsewhere 360 360 mutations_by_tag = Enum.reduce(tagged_mutations, %{}, fn {tags, mutation}, acc -> 361 361 Enum.reduce(tags, acc, fn tag, acc -> 362 362 case acc[tag] do ··· 377 377 XKS.apply_batch(state.xks, @meta_partition, version, unwrapped) 378 378 end 379 379 380 - mutations_by_tag 381 - # TODO: this sort is not strictly needed for determinism because 382 - # TaggedQueue ets insert order should not affect anything 383 - |> Enum.sort_by(&elem(&1, 0)) 384 - |> Enum.each(fn {tag, mutations} -> 385 - # Note: we restore the mutation order with Enum.reverse here 386 - :ok = TaggedQueue.append_batch(state.tagged_queue, tag, version, Enum.reverse(mutations)) 387 - end) 388 - 389 380 tag_popped_versions = 390 381 mutations_by_tag 391 382 |> Map.keys() ··· 441 432 XKS.commit(xks) 442 433 end 443 434 435 + defp peek_batches(%State{} = state, tag, start_version, end_version) do 436 + %State{xks: xks, tagged_queue: tagged_queue} = state 437 + %XKS{block_store: block_store} = xks 438 + 439 + TaggedQueue.peek(tagged_queue, tag, start_version, end_version) 440 + |> Enum.map(fn {ver, {extent, position, group_size, checksum}} -> 441 + batch_data = XKS.Blocks.read_extent(block_store, extent, position, group_size) 442 + assert XKS.Blocks.checksum(batch_data) == checksum 443 + # :safe is not really necessary here, but we may as well 444 + # We're going to switch to a custom encoding anyway 445 + {_tag, mutations} = :erlang.binary_to_term(batch_data, [:safe]) 446 + {ver, mutations} 447 + end) 448 + end 449 + 444 450 defp append_batch(%State{} = state, %LogBatch{} = batch) do 445 451 # TODO: mutations should come from CommitBuffer grouped this way 446 452 groups = group_mutations(batch.tagged_mutations) 447 - state = do_append_groups(groups, state) 453 + state = do_append_groups(groups, batch.commit_version, state) 448 454 state 449 455 end 450 456 451 - defp do_append_groups([], %State{} = state), do: state 457 + defp do_append_groups([], _commit_version, %State{} = state), do: state 452 458 453 - defp do_append_groups([group | groups_rest], %State{} = state) do 459 + defp do_append_groups([group | groups_rest], commit_version, %State{} = state) do 454 460 %{xks: xks, current_extent: current_extent} = state 455 461 %{extent: extent, size: size} = current_extent 456 462 extent_size = xks.opts.extent_block_count * xks.opts.block_size 457 463 464 + {tag, _mutations} = group 458 465 group_data = :erlang.term_to_binary(group) 459 466 group_size = byte_size(group_data) 460 467 ··· 463 470 # Rotate extent and start this iteration over 464 471 # TODO: this is not tested 465 472 state = rotate_extent(state) 466 - do_append_groups([group | groups_rest], state) 473 + do_append_groups([group | groups_rest], commit_version, state) 467 474 468 475 false -> 469 476 position = size 470 477 XKS.Blocks.write_extent(xks.block_store, extent, position, group_data) 478 + checksum = XKS.Blocks.checksum(group_data) 479 + 480 + info = {extent, position, group_size, checksum} 481 + TaggedQueue.append_batch(state.tagged_queue, tag, commit_version, info) 471 482 472 483 state = put_in(state.current_extent.size, size + group_size) 473 - do_append_groups(groups_rest, state) 484 + do_append_groups(groups_rest, commit_version, state) 474 485 end 475 486 end 476 487
+6 -7
lib/tagged_queue.ex
··· 5 5 """ 6 6 7 7 alias Hobbes.TaggedQueue 8 - alias Hobbes.Utils 9 8 10 9 @type t :: %__MODULE__{ 11 10 table: :ets.table, ··· 22 21 %TaggedQueue{table: table} 23 22 end 24 23 25 - @spec append_batch(%TaggedQueue{}, integer, non_neg_integer, [Utils.numbered_mutation]) :: :ok 26 - def append_batch(%TaggedQueue{} = tq, tag, version, mutations) when is_integer(tag) and is_integer(version) and is_list(mutations) do 27 - true = :ets.insert(tq.table, {{tag, version}, mutations}) 24 + @spec append_batch(%TaggedQueue{}, integer, non_neg_integer, term) :: :ok 25 + def append_batch(%TaggedQueue{} = tq, tag, version, info) when is_integer(tag) and is_integer(version) do 26 + true = :ets.insert(tq.table, {{tag, version}, info}) 28 27 :ok 29 28 end 30 29 ··· 42 41 }, 43 42 ] 44 43 """ 45 - @spec peek(%TaggedQueue{}, integer, non_neg_integer, non_neg_integer) :: [{non_neg_integer, [Utils.numbered_mutation]}] 44 + @spec peek(%TaggedQueue{}, integer, non_neg_integer, non_neg_integer) :: [{non_neg_integer, term}] 46 45 def peek(%TaggedQueue{} = tq, tag, start_version, end_version) 47 46 when is_integer(tag) and is_integer(start_version) and is_integer(end_version) and start_version <= end_version do 48 47 do_scan(tq.table, tag, end_version, start_version - 1, []) ··· 51 50 52 51 defp do_scan(table, tag, end_version, prev_version, acc) do 53 52 case :ets.next_lookup(table, {tag, prev_version}) do 54 - {_key, [{{^tag, ver}, mutations}]} when ver <= end_version -> 55 - do_scan(table, tag, end_version, ver, [{ver, mutations} | acc]) 53 + {_key, [{{^tag, ver}, info}]} when ver <= end_version -> 54 + do_scan(table, tag, end_version, ver, [{ver, info} | acc]) 56 55 _ -> 57 56 acc 58 57 end