this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use new API for model workload

garrison 40ac0e0a 2aa8fc4e

+31 -71
+9 -3
lib/hobbes.ex
··· 24 24 end 25 25 26 26 try do 27 - fun.() 28 - commit(get_txn()) 27 + result = fun.() 28 + committed_txn = commit(get_txn()) 29 + 30 + if Keyword.get(opts, :return_transaction) do 31 + {result, committed_txn} 32 + else 33 + result 34 + end 29 35 after 30 36 delete_txn() 31 37 end ··· 40 46 41 47 defp commit(%TxnState{} = txn) do 42 48 case Transaction.commit(txn) do 43 - {:ok, %TxnState{} = _txn} -> :ok 49 + {:ok, %TxnState{} = txn} -> txn 44 50 {:error, _err} = error -> raise "Failed to commit transaction: #{inspect(error)}" 45 51 end 46 52 end
+22 -68
lib/workloads/model.ex
··· 1 1 defmodule Hobbes.Workloads.Model do 2 - alias Hobbes.Transaction 3 2 alias Hobbes.Transaction.TxnState 4 - alias Hobbes.Structs.Cluster 5 3 6 4 alias Hobbes.Workloads.Model.DatabaseModel 7 5 alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn ··· 18 16 19 17 defmodule State do 20 18 @enforce_keys [ 21 - :cluster, 19 + :cluster_name, 22 20 :tick_ms, 23 21 24 22 :history, ··· 29 27 30 28 def start_link(opts), do: SimServer.start_link(__MODULE__, opts) 31 29 32 - def init(%{cluster: %Cluster{} = cluster, tick_ms: tick_ms}) do 30 + def init(%{cluster_name: cluster_name, tick_ms: tick_ms}) do 33 31 state = %State{ 34 - cluster: cluster, 32 + cluster_name: cluster_name, 35 33 tick_ms: tick_ms, 36 34 history: [], 37 35 stopped: false, ··· 56 54 reads = random_reads(state) 57 55 mutations = random_mutations(state) 58 56 59 - with {:ok, %TxnState{} = txn} <- Transaction.new(state.cluster), 60 - {:ok, {%TxnState{} = txn, read_results}} <- do_reads(txn, reads) 61 - do 62 - txn = add_mutations(txn, mutations) 57 + try do 58 + {read_results, %TxnState{} = committed_txn} = 59 + Hobbes.transaction(state.cluster_name, fn -> 60 + read_results = Enum.map(reads, fn 61 + {:read, keys} = read -> {read, Hobbes.read(keys)} 62 + {:read_range, {sk, ek}} = read -> {read, Hobbes.read_range(sk, ek)} 63 + end) 63 64 64 - # Ensure some transactions receive :transaction_too_old 65 - if Enum.random(1..10) == 1 do 66 - SimProcess.sleep(6_000) 67 - end 68 - 69 - case Transaction.commit(txn) do 70 - {:ok, %TxnState{} = txn} -> 71 - append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations)) 72 - 73 - {:error, {err, %TxnState{} = txn}} when err in [:transaction_too_old, :read_conflict] -> 74 - append_history(state, HistoryTxn.new(err, txn, read_results, mutations)) 65 + Enum.each(mutations, fn 66 + {:write, key, value} -> Hobbes.write(key, value) 67 + {:clear_range, sk, ek} -> Hobbes.clear_range(sk, ek) 68 + end) 75 69 76 - # TODO 77 - {:error, :timeout} -> raise "Timeout not supported" 78 - end 79 - else 80 - # Transaction.new() timed out 81 - {:error, :timeout} -> state 70 + read_results 71 + end, return_transaction: true) 82 72 83 - # Read errors 84 - # TODO: we still want to check these reads 85 - {:error, {%TxnState{} = _txn, _results}} -> state 73 + append_history(state, HistoryTxn.new(:committed, committed_txn, read_results, mutations)) 74 + rescue RuntimeError -> 75 + # TODO: track read_conflict transactions 76 + state 86 77 end 87 78 end 88 79 ··· 90 81 %{state | history: [ht | state.history]} 91 82 end 92 83 93 - @spec add_mutations(TxnState.t, list) :: TxnState.t 94 - defp add_mutations(%TxnState{} = txn, mutations) do 95 - Enum.reduce(mutations, txn, fn 96 - {:write, key, value}, txn -> Transaction.write(txn, key, value) 97 - {:clear_range, sk, ek}, txn -> Transaction.clear_range(txn, sk, ek) 98 - end) 99 - end 100 - 101 - @spec do_reads(TxnState.t, [{binary, binary} | binary]) :: {:ok, {TxnState.t, list}} | {:error, {TxnState.t, list}} 102 - defp do_reads(%TxnState{} = txn, reads) when is_list(reads) do 103 - Enum.reduce_while(reads, {txn, []}, fn 104 - {:read_range, {sk, ek}} = read, {txn, acc} -> 105 - case Transaction.read_range(txn, sk, ek) do 106 - {:ok, {pairs, txn}} -> 107 - acc = [{read, pairs} | acc] 108 - {:cont, {txn, acc}} 109 - {:error, _err} -> 110 - {:halt, {:error, {txn, acc}}} 111 - end 112 - 113 - {:read, keys} = read, {txn, acc} -> 114 - case Transaction.read(txn, keys) do 115 - {:ok, {result, txn}} -> 116 - acc = [{read, result} | acc] 117 - {:cont, {txn, acc}} 118 - {:error, _err} -> 119 - {:halt, {:error, {txn, acc}}} 120 - end 121 - end) 122 - |> case do 123 - {:error, {_txn, _acc}} = error -> 124 - error 125 - {%TxnState{} = txn, results} when is_list(results) -> 126 - {:ok, {txn, results}} 127 - end 128 - end 129 - 130 84 defp random_mutations(%State{} = state) do 131 85 count = Enum.random(1..10) 132 86 Enum.map(1..count, fn _i -> ··· 174 128 end 175 129 end 176 130 177 - def run(%{cluster: %Cluster{} = cluster}, opts) do 131 + def run(%{cluster_name: cluster_name}, opts) do 178 132 client_count = Keyword.get(opts, :clients, 10) 179 133 duration_ms = Keyword.get(opts, :duration_ms, 10_000) 180 134 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 181 135 182 136 clients = 183 137 Enum.map(1..client_count, fn _i -> 184 - {:ok, pid} = Client.start_link(%{cluster: cluster, tick_ms: client_tick_ms}) 138 + {:ok, pid} = Client.start_link(%{cluster_name: cluster_name, tick_ms: client_tick_ms}) 185 139 pid 186 140 end) 187 141