this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Check commit errors in model workload

garrison dbb7284f b5b536cb

+81 -20
+5
lib/workloads/model.ex
··· 56 56 do 57 57 txn = add_mutations(txn, mutations) 58 58 59 + # Ensure some transactions receive :transaction_too_old 60 + if Enum.random(1..10) == 1 do 61 + SimServer.sleep(6_000) 62 + end 63 + 59 64 case Transaction.commit(txn) do 60 65 {:ok, %TxnState{} = txn} -> 61 66 append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations))
+76 -20
lib/workloads/model/database_model.ex
··· 2 2 alias Hobbes.Transaction.TxnState 3 3 alias Hobbes.Structs.RangeResult 4 4 alias Hobbes.KV.TestKV 5 + alias Hobbes.TestVersionMap 6 + 7 + import Hobbes.Utils 5 8 6 9 alias Hobbes.Workloads.Model.DatabaseModel 7 10 ··· 37 40 38 41 @type t :: %__MODULE__{ 39 42 kv: TestKV.t, 43 + vm: TestVersionMap.t, 40 44 } 41 - @enforce_keys [:kv] 45 + @enforce_keys [:kv, :vm] 42 46 defstruct @enforce_keys 43 47 44 48 def new do 45 49 %DatabaseModel{ 46 50 kv: TestKV.new(), 51 + vm: TestVersionMap.new(), 47 52 } 48 53 end 49 54 50 - def validate_history(%DatabaseModel{kv: kv} = _dbm, history) when is_list(history) do 55 + def validate_history(%DatabaseModel{} = dm, history) when is_list(history) do 51 56 history = 52 57 Enum.sort_by(history, fn %HistoryTxn{} = ht -> 53 58 {ht.commit_version, ht.batch_index} 54 59 end) 55 60 56 - Enum.reduce(history, kv, fn %HistoryTxn{} = txn, kv -> 57 - validate_transaction(kv, txn) 61 + Enum.reduce(history, dm, fn %HistoryTxn{} = txn, dm -> 62 + validate_transaction(dm, txn) 58 63 end) 59 64 end 60 65 61 - defp validate_transaction(%TestKV{} = kv, %HistoryTxn{} = txn) do 62 - :ok = validate_reads(kv, txn) 66 + defp validate_transaction(%DatabaseModel{} = dm, %HistoryTxn{} = txn) do 67 + :ok = validate_reads(dm.kv, txn) 68 + 69 + validate_status(txn.status, dm, txn) 70 + end 71 + 72 + defp validate_status(:committed, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do 73 + commit_version = txn.commit_version 74 + assert commit_version 75 + # TODO: check for read conflict 63 76 64 77 kv = 65 - case txn.status do 66 - :committed -> 67 - apply_mutations(kv, txn) 78 + Enum.reduce(txn.mutations, dm.kv, fn 79 + {:write, k, v}, kv -> TestKV.put(kv, commit_version, k, v) 80 + # TODO: :clear, :clear_range 81 + end) 68 82 69 - # TODO: validate :read_conflict and :transaction_too_old are correct 70 - _ -> kv 71 - end 83 + write_conflicts = 84 + Enum.map(txn.mutations, fn 85 + {:write, k, _v} -> k 86 + # TODO: :clear, :clear_range 87 + end) 88 + vm = TestVersionMap.add_writes(dm.vm, commit_version, write_conflicts) 72 89 73 - kv 90 + %{dm | kv: kv, vm: vm} 74 91 end 75 92 76 - defp apply_mutations(%TestKV{} = kv, %HistoryTxn{} = txn) do 77 - assert txn.status == :committed 78 - assert txn.commit_version 93 + defp validate_status(:read_conflict, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do 94 + reads = 95 + Enum.map(txn.read_results, fn 96 + {{:read, keys}, _results} -> keys 97 + {{:read_range, {_sk, _ek} = range}, _result} -> [range] 98 + end) 99 + |> Enum.concat() 79 100 80 - commit_version = txn.commit_version 101 + read_version = txn.read_version 102 + vm = dm.vm 81 103 82 - Enum.reduce(txn.mutations, kv, fn 83 - {:write, k, v}, kv -> TestKV.put(kv, commit_version, k, v) 84 - end) 104 + read_conflict? = 105 + Enum.any?(reads, fn key_or_range -> 106 + TestVersionMap.written_after?(vm, read_version, key_or_range) 107 + end) 108 + 109 + # TODO: this check will probably have to be disabled for sharded resolvers 110 + if read_conflict? != true do 111 + raise """ 112 + Transaction was rejected for :read_conflict but there was no read conflict! 113 + 114 + Read version: #{inspect(txn.read_version)} 115 + Commit version: #{inspect(txn.commit_version)} 116 + 117 + Reads: 118 + #{inspect(reads, pretty: true)} 119 + """ 120 + end 121 + 122 + dm 123 + end 124 + 125 + defp validate_status(:transaction_too_old, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do 126 + read_version_floor = txn.commit_version - mvcc_window() 127 + allow? = txn.read_version > read_version_floor 128 + 129 + if allow? != false do 130 + raise """ 131 + Transaction was rejected for :transaction_too_old but the read version was not too old! 132 + 133 + Read version: #{inspect(txn.read_version)} 134 + RV floor: #{inspect(read_version_floor)} 135 + 136 + Commit version: #{inspect(txn.commit_version)} 137 + """ 138 + end 139 + 140 + dm 85 141 end 86 142 87 143 defp validate_reads(%TestKV{} = kv, %HistoryTxn{} = txn) do