this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix coordinator not persisting writes

garrison 34f9225c 1a54abce

+22 -56
+16 -56
lib/servers/coordinator.ex
··· 324 324 last_normal_view_number: 0, 325 325 op_number: -1, 326 326 commit_number: -1, 327 - version: 1, 327 + version: 0, 328 328 }) 329 329 |> XKS.commit() 330 330 end ··· 510 510 end 511 511 512 512 defp load_checkpoint(%State{} = state, checkpoint) when is_list(checkpoint) do 513 - %{xks: xks, version: prev_version} = state 514 - 515 513 mutations = Enum.map(checkpoint, fn {k, v} -> 516 514 {:write, k, v} 517 515 end) 518 516 mutations = [{:clear_range, "", special_prefix()} | mutations] 519 517 520 - version = prev_version + 1 521 - XKS.apply_batch(xks, @state_partition, version, mutations) 522 - %{state | version: version} 518 + apply_batch_to_storage(state, mutations) 523 519 end 524 520 525 521 @spec get_coordinator_state(XKS.t) :: map ··· 592 588 defp decode_status(1), do: :view_change 593 589 594 590 @spec commit_storage(State.t) :: State.t 595 - defp commit_storage(%State{xks: xks} = state) do 591 + defp commit_storage(%State{} = state) do 592 + assert XKS.get_max_persist_version(state.xks) == state.version 593 + 596 594 xks = 597 - xks 595 + state.xks 598 596 |> put_coordinator_state(Map.take(state, [:id, :status, :view_number, :last_normal_view_number, :op_number, :commit_number, :version])) 599 597 |> XKS.commit() 598 + 600 599 %{state | xks: xks} 601 600 end 602 601 603 - @persist_keys [:id, :status, :view_number, :last_normal_view_number, :op_number, :commit_number] 604 - 605 - # TODO: remove 606 - defp save_state_and_commit(%State{} = state), do: commit_storage(state) 607 - 608 - defp save_state_and_commit(%State{} = state) do 602 + @spec apply_batch_to_storage(State.t, list) :: State.t 603 + defp apply_batch_to_storage(%State{} = state, mutations) do 609 604 %{xks: xks, version: prev_version} = state 610 - 611 - mutations = Enum.map(@persist_keys, fn key -> 612 - value = Map.fetch!(state, key) 613 - { 614 - :write, 615 - special_prefix() <> "/" <> Atom.to_string(key), 616 - serialize(value), 617 - } 618 - end) 619 - 620 605 version = prev_version + 1 621 - XKS.apply_batch(xks, @state_partition, version, mutations) 622 606 623 - state = %{state | version: version} 624 - commit_storage(state) 625 - end 607 + :ok = XKS.set_max_persist_version(xks, version) 608 + :ok = XKS.apply_batch(xks, @state_partition, version, mutations) 626 609 627 - @spec load_state(XKS.t, non_neg_integer) :: {:ok, map} | :error 628 - defp load_state(%XKS{} = xks, version) do 629 - case XKS.Reader.get(xks, @state_partition, version, special_prefix() <> "/id") do 630 - nil -> 631 - :error 632 - 633 - _value -> 634 - map = Map.new(@persist_keys, fn key -> 635 - value = XKS.Reader.get(xks, @state_partition, version, special_prefix() <> "/" <> Atom.to_string(key)) 636 - assert is_binary(value) 637 - {key, deserialize(key, value)} 638 - end) 639 - {:ok, map} 640 - end 610 + %{state | version: version} 641 611 end 642 612 643 - defp serialize(:normal), do: "normal" 644 - defp serialize(:view_change), do: "view_change" 645 - defp serialize(integer) when is_integer(integer), do: Integer.to_string(integer) 646 - 647 - defp deserialize(:status, "normal"), do: :normal 648 - defp deserialize(:status, "view_change"), do: :view_change 649 - defp deserialize(key, int_str) when key in [:id, :view_number, :last_normal_view_number, :op_number, :commit_number] do 650 - String.to_integer(int_str) 651 - end 613 + # TODO: remove 614 + defp save_state_and_commit(%State{} = state), do: commit_storage(state) 652 615 653 616 # Commands 654 617 ··· 666 629 {writes, result} 667 630 end 668 631 632 + @spec apply_writes(State.t, list) :: State.t 669 633 defp apply_writes(%State{} = state, writes) when is_list(writes) do 670 - %{xks: xks} = state 671 - version = state.version + 1 672 634 mutations = Enum.map(writes, fn {k, v} -> {:write, k, v} end) 673 - XKS.apply_batch(xks, @state_partition, version, mutations) 674 - 675 - %{state | version: version} 635 + apply_batch_to_storage(state, mutations) 676 636 end 677 637 678 638 # Request
+6
lib/xks/xks.ex
··· 337 337 :atomics.put(discard_below_version_atomic, 1, version) 338 338 end 339 339 340 + @spec get_max_persist_version(t) :: non_neg_integer 341 + def get_max_persist_version(%XKS{} = xks) do 342 + %{max_persist_version_atomic: max_persist_version_atomic} = xks 343 + :atomics.get(max_persist_version_atomic, 1) 344 + end 345 + 340 346 @spec set_max_persist_version(t, non_neg_integer) :: :ok 341 347 def set_max_persist_version(%XKS{} = xks, version) do 342 348 %{max_persist_version_atomic: max_persist_version_atomic} = xks