this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Validate transaction history in model workload

garrison bd132dbc 2190784d

+139 -35
+1 -1
ROADMAP.md
··· 31 31 - [ ] Fault injection 32 32 - [ ] Kill/Restart nodes repeatedly 33 33 - [ ] Silently and randomly drop messages between nodes 34 - - [ ] Add a full model-checker workload to verify strict-serializability of all operations 34 + - [X] Add a full model-checker workload to verify strict-serializability of all operations 35 35 - [ ] Add liveness tests (check cluster still responds after nodes are restored) 36 36 37 37 ### Documentation
+14 -34
lib/workloads/model.ex
··· 2 2 alias Hobbes.Transaction 3 3 alias Hobbes.Transaction.TxnState 4 4 alias Hobbes.Structs.Cluster 5 - alias Hobbes.Construct.SimServer 6 - 7 - @behaviour Hobbes.Workloads.Workload 8 - 9 - defmodule HistoryTxn do 10 - @enforce_keys [ 11 - :status, 12 - 13 - :read_version, 14 - :commit_version, 15 - :batch_index, 16 - 17 - :read_results, 18 - :mutations, 19 - ] 20 - defstruct @enforce_keys 21 5 22 - def new(status, %TxnState{} = txn, read_results, mutations) 23 - when is_atom(status) and is_list(read_results) and is_list(mutations) do 24 - %HistoryTxn{ 25 - status: status, 6 + alias Hobbes.Construct.SimServer 26 7 27 - read_version: txn.read_version, 28 - commit_version: txn.commit_version, 29 - batch_index: txn.batch_index, 8 + alias Hobbes.Workloads.Model.DatabaseModel 9 + alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn 30 10 31 - read_results: read_results, 32 - mutations: mutations, 33 - } 34 - end 35 - end 11 + @behaviour Hobbes.Workloads.Workload 36 12 37 13 defmodule Client do 38 14 use Hobbes.Construct.SimServer ··· 91 67 {:error, :timeout} -> raise "Timeout not supported" 92 68 end 93 69 else 94 - # TODO: new() errors, read() errors 95 - error -> raise "Error: #{inspect(error)}" 70 + # Transaction.new() timed out 71 + {:error, :timeout} -> state 72 + 73 + # Read errors 74 + # TODO: we still want to check these reads 75 + {:error, {%TxnState{} = _txn, _results}} -> state 96 76 end 97 77 end 98 78 ··· 195 175 results = 196 176 clients 197 177 |> Enum.map(&SimServer.send_request(&1, :stop)) 198 - |> Enum.map(&SimServer.receive_response/1) 178 + |> Enum.map(&SimServer.receive_response(&1, 300_000)) 199 179 |> Enum.map(fn {:reply, reply} -> reply end) 200 180 201 181 history = 202 182 results 203 183 |> Enum.map(fn %{history: history} -> history end) 204 184 |> Enum.concat() 205 - |> Enum.sort_by(fn %HistoryTxn{} = ht -> 206 - {ht.commit_version, ht.batch_index} 207 - end) 185 + 186 + db_model = DatabaseModel.new() 187 + DatabaseModel.validate_history(db_model, history) 208 188 209 189 { 210 190 :ok,
+124
lib/workloads/model/database_model.ex
··· 1 + defmodule Hobbes.Workloads.Model.DatabaseModel do 2 + alias Hobbes.Transaction.TxnState 3 + alias Hobbes.Structs.RangeResult 4 + alias Hobbes.KV.TestKV 5 + 6 + alias Hobbes.Workloads.Model.DatabaseModel 7 + 8 + import ExUnit.Assertions, only: [assert: 1] 9 + 10 + defmodule HistoryTxn do 11 + @enforce_keys [ 12 + :status, 13 + 14 + :read_version, 15 + :commit_version, 16 + :batch_index, 17 + 18 + :read_results, 19 + :mutations, 20 + ] 21 + defstruct @enforce_keys 22 + 23 + def new(status, %TxnState{} = txn, read_results, mutations) 24 + when is_atom(status) and is_list(read_results) and is_list(mutations) do 25 + %HistoryTxn{ 26 + status: status, 27 + 28 + read_version: txn.read_version, 29 + commit_version: txn.commit_version, 30 + batch_index: txn.batch_index, 31 + 32 + read_results: read_results, 33 + mutations: mutations, 34 + } 35 + end 36 + end 37 + 38 + @type t :: %__MODULE__{ 39 + kv: TestKV.t, 40 + } 41 + @enforce_keys [:kv] 42 + defstruct @enforce_keys 43 + 44 + def new do 45 + %DatabaseModel{ 46 + kv: TestKV.new(), 47 + } 48 + end 49 + 50 + def validate_history(%DatabaseModel{kv: kv} = _dbm, history) when is_list(history) do 51 + history = 52 + Enum.sort_by(history, fn %HistoryTxn{} = ht -> 53 + {ht.commit_version, ht.batch_index} 54 + end) 55 + 56 + Enum.reduce(history, kv, fn %HistoryTxn{} = txn, kv -> 57 + validate_transaction(kv, txn) 58 + end) 59 + end 60 + 61 + defp validate_transaction(%TestKV{} = kv, %HistoryTxn{} = txn) do 62 + :ok = validate_reads(kv, txn) 63 + 64 + kv = 65 + case txn.status do 66 + :committed -> 67 + apply_mutations(kv, txn) 68 + 69 + # TODO: validate :read_conflict and :transaction_too_old are correct 70 + _ -> kv 71 + end 72 + 73 + kv 74 + end 75 + 76 + defp apply_mutations(%TestKV{} = kv, %HistoryTxn{} = txn) do 77 + assert txn.status == :committed 78 + assert txn.commit_version 79 + 80 + commit_version = txn.commit_version 81 + 82 + Enum.reduce(txn.mutations, kv, fn 83 + {:write, k, v}, kv -> TestKV.put(kv, commit_version, k, v) 84 + end) 85 + end 86 + 87 + defp validate_reads(%TestKV{} = kv, %HistoryTxn{} = txn) do 88 + read_version = txn.read_version 89 + 90 + Enum.each(txn.read_results, fn 91 + {{:read, keys}, results} -> 92 + Enum.each(keys, fn key -> 93 + expected = Map.fetch!(results, key) 94 + received = TestKV.get(kv, read_version, key) 95 + 96 + if expected != received do 97 + read_error(kv, txn, key, expected, received) 98 + end 99 + end) 100 + 101 + {{:read_range, {sk, ek} = range}, expected} -> 102 + %RangeResult{pairs: received} = TestKV.scan(kv, read_version, sk, ek) 103 + 104 + if expected != received do 105 + read_error(kv, txn, range, expected, received) 106 + end 107 + end) 108 + 109 + :ok 110 + end 111 + 112 + defp read_error(%TestKV{} = _kv, %HistoryTxn{} = txn, read, expected, received) do 113 + raise """ 114 + Error while validating transaction: read results do not match! 115 + 116 + Read version: #{inspect(txn.read_version)} 117 + 118 + Key/Range: #{inspect(read)} 119 + 120 + Expected: #{inspect(expected, pretty: true)} 121 + Received: #{inspect(received, pretty: true)} 122 + """ 123 + end 124 + end