this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add model checker workload

garrison c5c52568 2032b485

+229
+215
lib/workloads/model.ex
··· 1 + defmodule Hobbes.Workloads.Model do 2 + alias Hobbes.Transaction 3 + alias Hobbes.Transaction.TxnState 4 + alias Hobbes.Structs.Cluster 5 + alias Hobbes.Construct.SimServer 6 + 7 + @behaviour Hobbes.Workloads.Workload 8 + 9 + defmodule HistoryTxn do 10 + @enforce_keys [ 11 + :status, 12 + 13 + :read_version, 14 + :commit_version, 15 + :batch_index, 16 + 17 + :read_results, 18 + :mutations, 19 + ] 20 + defstruct @enforce_keys 21 + 22 + def new(status, %TxnState{} = txn, read_results, mutations) 23 + when is_atom(status) and is_list(read_results) and is_list(mutations) do 24 + %HistoryTxn{ 25 + status: status, 26 + 27 + read_version: txn.read_version, 28 + commit_version: txn.commit_version, 29 + # TODO 30 + batch_index: 0, 31 + 32 + read_results: read_results, 33 + mutations: mutations, 34 + } 35 + end 36 + end 37 + 38 + defmodule Client do 39 + use Hobbes.Construct.SimServer 40 + 41 + import Hobbes.Utils 42 + 43 + defmodule State do 44 + @enforce_keys [ 45 + :cluster, 46 + :tick_ms, 47 + 48 + :history, 49 + ] 50 + defstruct @enforce_keys 51 + end 52 + 53 + def start_link(opts), do: SimServer.start_link(__MODULE__, opts) 54 + 55 + def init(%{cluster: %Cluster{} = cluster, tick_ms: tick_ms}) do 56 + state = %State{ 57 + cluster: cluster, 58 + tick_ms: tick_ms, 59 + history: [], 60 + } 61 + SimServer.send self(), :tick 62 + {:ok, state} 63 + end 64 + 65 + def handle_call(:stop, _from, %State{} = state) do 66 + {:reply, %{history: state.history}, state} 67 + end 68 + 69 + def handle_info(:tick, %State{} = state) do 70 + state = tick(state) 71 + SimServer.send_after self(), :tick, state.tick_ms 72 + {:noreply, state} 73 + end 74 + 75 + defp tick(%State{} = state) do 76 + reads = random_reads(state) 77 + mutations = random_mutations(state) 78 + 79 + with {:ok, %TxnState{} = txn} <- Transaction.new(state.cluster), 80 + {:ok, {%TxnState{} = txn, read_results}} <- do_reads(txn, reads) 81 + do 82 + txn = add_mutations(txn, mutations) 83 + 84 + case Transaction.commit(txn) do 85 + {:ok, %TxnState{} = txn} -> 86 + append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations)) 87 + 88 + {:error, :read_conflict} -> 89 + append_history(state, HistoryTxn.new(:read_conflict, txn, read_results, mutations)) 90 + # TODO: more errors 91 + end 92 + else 93 + # TODO: new() errors, read() errors 94 + error -> raise "Error: #{inspect(error)}" 95 + end 96 + end 97 + 98 + defp append_history(%State{} = state, %HistoryTxn{} = ht) do 99 + %{state | history: [ht | state.history]} 100 + end 101 + 102 + @spec add_mutations(TxnState.t, list) :: TxnState.t 103 + defp add_mutations(%TxnState{} = txn, mutations) do 104 + Enum.reduce(mutations, txn, fn 105 + {:write, key, value}, txn -> Transaction.write(txn, key, value) 106 + end) 107 + end 108 + 109 + @spec do_reads(TxnState.t, [{binary, binary} | binary]) :: {:ok, {TxnState.t, list}} | {:error, {TxnState.t, list}} 110 + defp do_reads(%TxnState{} = txn, reads) when is_list(reads) do 111 + Enum.reduce_while(reads, {txn, []}, fn 112 + {:read_range, {sk, ek}} = read, {txn, acc} -> 113 + case Transaction.read_range(txn, sk, ek) do 114 + {:ok, {pairs, txn}} -> 115 + acc = [{read, pairs} | acc] 116 + {:cont, {txn, acc}} 117 + {:error, _err} -> 118 + {:halt, {:error, {txn, acc}}} 119 + end 120 + 121 + {:read, keys} = read, {txn, acc} -> 122 + case Transaction.read(txn, keys) do 123 + {:ok, {result, txn}} -> 124 + acc = [{read, result} | acc] 125 + {:cont, {txn, acc}} 126 + {:error, _err} -> 127 + {:halt, {:error, {txn, acc}}} 128 + end 129 + end) 130 + |> case do 131 + {:error, {_txn, _acc}} = error -> 132 + error 133 + {%TxnState{} = txn, results} when is_list(results) -> 134 + {:ok, {txn, results}} 135 + end 136 + end 137 + 138 + defp random_mutations(%State{} = state) do 139 + count = Enum.random(1..10) 140 + Enum.map(1..count, fn _i -> 141 + case Enum.random(1..4) do 142 + # TODO: range clears 143 + _ -> 144 + {:write, random_key(state), random_key(state)} 145 + end 146 + end) 147 + end 148 + 149 + defp random_reads(%State{} = state) do 150 + count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) 151 + Enum.map(1..count, fn _i -> 152 + case Enum.random(1..4) do 153 + 1 -> 154 + {:read_range, random_range(state)} 155 + _ -> 156 + key_count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) 157 + {:read, Enum.map(1..key_count, fn _i -> random_key(state) end)} 158 + end 159 + end) 160 + end 161 + 162 + defp random_range(%State{} = state) do 163 + k1 = random_key(state) 164 + k2 = random_key(state) 165 + cond do 166 + k1 < k2 -> {k1, k2} 167 + k1 > k2 -> {k2, k1} 168 + k1 == k2 -> {k1, next_key(k1)} 169 + end 170 + end 171 + 172 + defp random_key(%State{} = _state) do 173 + case Enum.random(1..10) do 174 + 1 -> "" 175 + 2 -> "\xFF" 176 + _ -> Enum.random(1..100) |> Integer.to_string() |> String.pad_leading(3, "0") 177 + end 178 + end 179 + end 180 + 181 + def run(%{cluster: %Cluster{} = cluster}, opts) do 182 + client_count = Keyword.get(opts, :clients, 10) 183 + duration_ms = Keyword.get(opts, :duration_ms, 10_000) 184 + client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 185 + 186 + clients = 187 + Enum.map(1..client_count, fn _i -> 188 + {:ok, pid} = Client.start_link(%{cluster: cluster, tick_ms: client_tick_ms}) 189 + pid 190 + end) 191 + 192 + SimServer.sleep(duration_ms) 193 + 194 + results = 195 + clients 196 + |> Enum.map(&SimServer.send_request(&1, :stop)) 197 + |> Enum.map(&SimServer.receive_response/1) 198 + |> Enum.map(fn {:reply, reply} -> reply end) 199 + 200 + history = 201 + results 202 + |> Enum.map(fn %{history: history} -> history end) 203 + |> Enum.concat() 204 + |> Enum.sort_by(fn %HistoryTxn{} = ht -> 205 + {ht.commit_version, ht.batch_index} 206 + end) 207 + 208 + { 209 + :ok, 210 + """ 211 + Checked #{length(history)} transactions from #{client_count} clients. 212 + """ 213 + } 214 + end 215 + end
+14
test/hobbes_test.exs
··· 42 42 end 43 43 end 44 44 45 + defmodule ModelWorkloadTest do 46 + use ExUnit.Case, async: true 47 + @tag :model 48 + test "Model", %{test: test} do 49 + Workloads.run([ 50 + {Workloads.Model, [ 51 + clients: 10, 52 + duration_ms: 10_000, 53 + client_tick_ms: 100, 54 + ]}, 55 + ], HobbesTest.SimOpts.sim_opts(name: test, preset: :cycle)) 56 + end 57 + end 58 + 45 59 defmodule CycleWorkloadTest do 46 60 use ExUnit.Case, async: true 47 61 @tag :cycle