this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Apply replayed log as partitioned batches in tlog

garrison ef178b8c c415a4ba

+62 -43
+62 -43
lib/servers/tlog.ex
··· 692 692 } 693 693 end 694 694 695 - defp write_queue(%XKS{} = xks, tag, commit_version, extent, position, entry_size, checksum) do 696 - XKS.apply_batch(xks, @queue_partition, commit_version, [pack_queue_mutation(tag, commit_version, extent, position, entry_size, checksum)]) 697 - :ok 698 - end 699 - 700 - defp maybe_write_meta({meta_tag() = _tag, mutations} = _group, commit_version, %XKS{} = xks) do 701 - XKS.apply_batch(xks, @meta_partition, commit_version, mutations) 702 - :ok 703 - end 704 - 705 - defp maybe_write_meta(_group, _commit_version, _xks), do: :ok 706 - 707 695 @spec replay_log(XKS.t, map, non_neg_integer, non_neg_integer) :: {non_neg_integer, non_neg_integer} 708 696 defp replay_log(%XKS{} = xks, current_extent, startup_version, startup_kcv) do 709 697 %{extent: extent_index, nonce: extent_nonce} = current_extent ··· 716 704 entries_reversed = do_read_log(extent_data, startup_version, prev_checksum, [], 0) 717 705 entries = Enum.reverse(entries_reversed) 718 706 719 - # Replay the log against the queue and meta partitions 720 - {restored_version, restored_kcv} = 721 - Enum.reduce(entries, {startup_version, startup_kcv}, fn entry, {version_acc, kcv_acc} -> 722 - %{ 723 - commit_version: commit_version, 724 - known_committed_version: entry_kcv, 725 - entry_checksum: checksum, 726 - group_data: group_data, 727 - position: position, 728 - size: size, 729 - } = entry 730 - assert commit_version >= version_acc 731 - assert commit_version > startup_version 732 - assert entry_kcv >= kcv_acc 733 - assert entry_kcv >= startup_kcv 707 + {restored_version, restored_kcv} = do_replay_log(entries, xks, extent_index, [], [], startup_version, startup_kcv) 708 + 709 + assert restored_version >= startup_version 710 + assert restored_kcv >= startup_kcv 711 + {restored_version, restored_kcv} 712 + end 713 + 714 + defp do_replay_log([], %XKS{} = xks, _extent_index, queue_acc, meta_acc, last_version, last_kcv) do 715 + :ok = replay_apply_accs(xks, last_version, queue_acc, meta_acc) 716 + {last_version, last_kcv} 717 + end 718 + 719 + defp do_replay_log([entry | entries_rest], %XKS{} = xks, extent_index, queue_acc, meta_acc, last_version, _last_kcv) do 720 + %{ 721 + commit_version: commit_version, 722 + known_committed_version: known_committed_version, 723 + position: position, 724 + size: entry_size, 725 + entry_checksum: entry_checksum, 726 + group_data: group_data, 727 + } = entry 728 + {tag, group_mutations} = :erlang.binary_to_term(group_data, [:safe]) 729 + 730 + {queue_acc, meta_acc} = 731 + case (commit_version > last_version) do 732 + true -> 733 + # Note: this will trigger on the first iteration (when last_version is startup_version), 734 + # but it will noop because both accs are [] 735 + # If it did not noop XKS would crash because `last_version` (`startup_version`) is already the XKS version 736 + :ok = replay_apply_accs(xks, last_version, queue_acc, meta_acc) 737 + {[], []} 738 + 739 + false -> 740 + assert commit_version == last_version 741 + {queue_acc, meta_acc} 742 + end 743 + 744 + {queue_acc, meta_acc} = 745 + case tag do 746 + -2 -> 747 + {queue_acc, meta_acc} 734 748 735 - # TODO: mutations are needed for meta tags, but it would be more efficient to 736 - # store the tag explicitly and only decode the group if the tag is meta_tag() 737 - {tag, _mutations} = group = :erlang.binary_to_term(group_data, [:safe]) 749 + meta_tag() -> 750 + { 751 + [pack_queue_mutation(tag, commit_version, extent_index, position, entry_size, entry_checksum) | queue_acc], 752 + Enum.reverse(group_mutations, meta_acc), 753 + } 738 754 739 - :ok = XKS.set_max_persist_version(xks, commit_version) 740 - case tag do 741 - -2 -> 742 - :noop 743 - _ -> 744 - :ok = write_queue(xks, tag, commit_version, extent_index, position, size, checksum) 745 - :ok = maybe_write_meta(group, commit_version, xks) 746 - end 755 + _ -> 756 + { 757 + [pack_queue_mutation(tag, commit_version, extent_index, position, entry_size, entry_checksum) | queue_acc], 758 + meta_acc, 759 + } 760 + end 747 761 748 - {commit_version, entry_kcv} 749 - end) 762 + do_replay_log(entries_rest, xks, extent_index, meta_acc, queue_acc, commit_version, known_committed_version) 763 + end 750 764 751 - assert restored_version >= startup_version 752 - assert restored_kcv >= startup_kcv 753 - {restored_version, restored_kcv} 765 + defp replay_apply_accs(_xks, _commit_version, [], []), do: :ok 766 + defp replay_apply_accs(%XKS{} = xks, commit_version, queue_acc, meta_acc) do 767 + partitioned_batches = [ 768 + {@queue_partition, Enum.reverse(queue_acc)}, 769 + {@meta_partition, Enum.reverse(meta_acc)}, 770 + ] 771 + XKS.set_max_persist_version(xks, commit_version) 772 + XKS.apply_partitioned_batches(xks, commit_version, partitioned_batches) 754 773 end 755 774 756 775 @min_entry_bytes 8