this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Encode tag into entry directly and refactor

garrison e4c8fddd 0f64ba00

+46 -36
+46 -36
lib/servers/tlog.ex
··· 564 564 XKS.Reader.scan(xks, @queue_partition, version, start_key, end_key) 565 565 |> Enum.map(fn {key, value} -> 566 566 [^tag, ver] = Keyset.unpack(key) 567 - [extent, position, entry_size, checksum] = Keyset.unpack(value) 567 + [extent_index, position, entry_size, checksum] = Keyset.unpack(value) 568 + assert ver >= 0 569 + assert extent_index >= 0 570 + assert entry_size >= 0 568 571 569 - entry_data = XKS.Blocks.read_extent(block_store, extent, position, entry_size) 572 + entry_data = XKS.Blocks.read_extent(block_store, extent_index, position, entry_size) 570 573 checksummed_size = byte_size(entry_data) - 16 571 574 << 572 575 checksummed_entry_data::binary-size(checksummed_size), ··· 575 578 assert XKS.Blocks.checksum(checksummed_entry_data) == checksum 576 579 577 580 << 578 - group_size::integer-64, 579 - group_commit_version::integer-64, 581 + mutations_size::integer-64, 582 + entry_tag::signed-integer-64, 583 + entry_commit_version::integer-64, 580 584 _known_committed_version::integer-64, 581 585 _prev_checksum::binary-16, 582 - group_data::binary-size(group_size), 586 + mutations_data::binary-size(mutations_size), 583 587 _checksum::binary-16, 584 588 >> = entry_data 585 - assert group_commit_version == ver 589 + assert entry_tag == tag 590 + assert entry_commit_version == ver 586 591 587 592 # :safe is not really necessary here, but we may as well 588 593 # We're going to switch to a custom encoding anyway 589 - {_tag, mutations} = :erlang.binary_to_term(group_data, [:safe]) 590 - {ver, mutations} 594 + entry_mutations = :erlang.binary_to_term(mutations_data, [:safe]) 595 + assert is_list(entry_mutations) 596 + 597 + {ver, entry_mutations} 591 598 end) 592 599 end 593 600 ··· 679 686 end) 680 687 end 681 688 682 - # group_size + commit_version + known_committed_version + prev_checksum + entry_checksum 683 - defmacrop c_entry_overhead_bytes, do: 8 + 8 + 8 + 16 + 16 689 + # mutations_size + tag + commit_version + known_committed_version + prev_checksum + entry_checksum 690 + defmacrop c_entry_overhead_bytes, do: 8 + 8 + 8 + 8 + 16 + 16 684 691 685 692 defp do_append_groups([], %State{} = state, _commit_version, _known_committed_version, state_acc, queue_acc, meta_acc) do 686 693 {state, {state_acc, queue_acc, meta_acc}} ··· 690 697 %{xks: xks} = state 691 698 opt_extent_size = xks.opts.extent_block_count * xks.opts.block_size 692 699 693 - {tag, group_mutations} = group 694 - group_data = :erlang.term_to_binary(group) 695 - group_size = byte_size(group_data) 696 - entry_size = c_entry_overhead_bytes() + group_size 700 + {tag, mutations} = group 701 + mutations_data = :erlang.term_to_binary(mutations) 702 + mutations_size = byte_size(mutations_data) 703 + entry_size = mutations_size + c_entry_overhead_bytes() 697 704 698 705 # Rotate extent if needed 699 706 {state, state_acc} = ··· 706 713 end 707 714 708 715 # Encode log entry 709 - {entry_data, entry_checksum} = encode_entry(group_data, state.prev_checksum, commit_version, known_committed_version) 716 + {entry_data, entry_checksum} = encode_entry(tag, mutations_data, commit_version, known_committed_version, state.prev_checksum) 710 717 assert byte_size(entry_data) == entry_size 711 718 assert entry_size <= opt_extent_size 712 719 ··· 719 726 {queue_acc, meta_acc} = 720 727 case tag do 721 728 blank_tag() -> 722 - # Blank group, don't write to queue 729 + # Blank log entry to delimit the end of the batch, don't write to queue 723 730 {queue_acc, meta_acc} 724 731 725 732 meta_tag() -> 726 733 { 727 734 [pack_queue_mutation(tag, commit_version, extent_index, position, entry_size, entry_checksum) | queue_acc], 728 - Enum.reverse(group_mutations, meta_acc), 735 + Enum.reverse(mutations, meta_acc), 729 736 } 730 737 731 738 _ -> ··· 743 750 do_append_groups(groups_rest, state, commit_version, known_committed_version, state_acc, queue_acc, meta_acc) 744 751 end 745 752 746 - defp encode_entry(group_data, prev_checksum, commit_version, known_committed_version) do 747 - group_size = byte_size(group_data) 753 + defp encode_entry(tag, mutations_data, commit_version, known_committed_version, prev_checksum) do 754 + mutations_size = byte_size(mutations_data) 748 755 entry_data = << 749 - group_size::integer-64, 756 + mutations_size::integer-64, 757 + tag::signed-integer-64, 750 758 commit_version::integer-64, 751 759 known_committed_version::integer-64, 752 760 prev_checksum::binary-16, 753 - group_data::binary, 761 + mutations_data::binary, 754 762 >> 755 763 entry_checksum = XKS.Blocks.checksum(entry_data) 756 764 ··· 816 824 position: position, 817 825 size: entry_size, 818 826 entry_checksum: entry_checksum, 819 - group_data: group_data, 827 + mutations_data: mutations_data, 820 828 } = entry 821 829 822 830 case queue_acc do ··· 837 845 meta_tag() -> 838 846 # This entry contains meta mutations, so in addition to the queue we write the mutations 839 847 # to the meta partition 840 - {^tag, group_mutations} = :erlang.binary_to_term(group_data) 848 + entry_mutations = :erlang.binary_to_term(mutations_data, [:safe]) 841 849 { 842 850 [pack_queue_mutation(tag, commit_version, extent_index, position, entry_size, entry_checksum) | queue_acc], 843 - Enum.reverse(group_mutations, meta_acc), 851 + Enum.reverse(entry_mutations, meta_acc), 844 852 last_written_versions, 845 853 } 846 854 ··· 875 883 876 884 defp do_read_log(data, above_version, prev_checksum, acc, position) do 877 885 << 878 - group_size::integer-64, 886 + mutations_size::integer-64, 879 887 _rest::binary, 880 888 >> = data 881 889 890 + entry_size = mutations_size + c_entry_overhead_bytes() 882 891 cond do 883 - # First, sanity-check the group size 892 + # First, sanity-check the entry size 884 893 # If the value is unreasonable we have reached the end of the log 885 - group_size < @min_entry_bytes -> 894 + entry_size < @min_entry_bytes -> 886 895 acc 887 - group_size > @max_entry_bytes -> 896 + entry_size > @max_entry_bytes -> 888 897 acc 889 898 true -> 890 - entry_size = group_size + c_entry_overhead_bytes() 891 899 << 892 900 entry_data::binary-size(entry_size), 893 901 rest::binary, 894 902 >> = data 895 903 896 904 # The group size was reasonable, so we try to decode the entry 897 - case decode_entry(entry_data, group_size, prev_checksum, position) do 905 + case decode_entry(entry_data, mutations_size, prev_checksum, position) do 898 906 false -> 899 907 # If decoding failed (wrong checksum), we have reached the end of the log 900 908 acc 909 + 901 910 entry when is_map(entry) -> 902 911 # The entry is valid 903 912 acc = case entry.commit_version > above_version do ··· 910 919 end 911 920 end 912 921 913 - defp decode_entry(entry_data, group_size, prev_checksum, position) do 922 + defp decode_entry(entry_data, mutations_size, prev_checksum, position) do 914 923 checksummed_size = byte_size(entry_data) - 16 915 924 << 916 925 checksummed_data::binary-size(checksummed_size), ··· 922 931 true -> 923 932 # The entry is (internally) consistent, decode it 924 933 << 925 - _group_size::integer-64, 934 + entry_mutations_size::integer-64, 935 + tag::signed-integer-64, 926 936 commit_version::integer-64, 927 937 known_committed_version::integer-64, 928 938 entry_prev_checksum::binary-16, 929 - group_data::binary-size(group_size), 939 + mutations_data::binary-size(mutations_size), 930 940 _entry_checksum::binary-16, 931 941 >> = entry_data 942 + assert entry_mutations_size == mutations_size 932 943 933 944 case entry_prev_checksum == prev_checksum do 934 945 true -> 935 - # TODO: read from entry_data 936 - {tag, _mutations} = :erlang.binary_to_term(group_data, [:safe]) 937 946 # This entry is valid because it maintains the hash chain 947 + # TODO: no need to include mutations_data unless `tag == meta_tag()` 938 948 %{ 939 949 tag: tag, 940 950 commit_version: commit_version, ··· 942 952 entry_checksum: entry_checksum, 943 953 position: position, 944 954 size: byte_size(entry_data), 945 - group_data: group_data, 955 + mutations_data: mutations_data, 946 956 } 947 957 948 958 false ->