this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use new API for cycle

garrison c898a365 ac85ad8a

+40 -29
+2
lib/hobbes.ex
··· 48 48 case Transaction.commit(txn) do 49 49 {:ok, %TxnState{} = txn} -> 50 50 txn 51 + {:error, :timeout} = error -> 52 + raise(Hobbes.CommitError, error: error, read_version: txn.read_version, commit_version: nil, batch_index: nil) 51 53 {:error, {err, %TxnState{} = txn}} -> 52 54 raise(Hobbes.CommitError, error: {:error, err}, read_version: txn.read_version, commit_version: txn.commit_version, batch_index: txn.batch_index) 53 55 end
+38 -29
lib/workloads/cycle.ex
··· 72 72 defp work(%State{} = state) do 73 73 state = inc_stat(state, :swaps) 74 74 75 - start_time = current_time() 75 + try do 76 + swap(state) 77 + rescue 78 + e in [Hobbes.BeginError, Hobbes.CommitError, Hobbes.ReadError] -> 79 + case e.error do 80 + :timeout -> inc_stat(state, :timeout) 76 81 77 - k1 = "key" <> pad(Enum.random(0..(state.count - 1))) 78 - with {:ok, txn} <- Transaction.new(state.cluster_name), 79 - {:ok, {v2, txn}} <- Transaction.read(txn, k1), k2 = "key" <> v2, 80 - {:ok, {v3, txn}} <- Transaction.read(txn, k2), k3 = "key" <> v3, 81 - {:ok, {v4, txn}} <- Transaction.read(txn, k3), 82 + :read_version_too_old -> inc_stat(state, :rv_old) 83 + :read_version_too_new -> inc_stat(state, :rv_new) 84 + :wrong_generation -> inc_stat(state, :wrng_gen) 82 85 83 - # Swaps (1 -> 2 -> 3 -> 4) to (1 -> 3 -> 2 -> 4) 84 - # clear(k1) is meant to catch bugs in clear implementation or mutation ordering 85 - txn = txn |> Transaction.clear(k1) |> Transaction.write([{k1, v3}, {k3, v2}, {k2, v4}]), 86 - #txn = Transaction.write(txn, [{"\xFF" <> k1, v3}, {"\xFF" <> k3, v2}, {"\xFF" <> k2, v4}]), 86 + :transaction_too_old -> inc_stat(state, :txn_old) 87 + :read_conflict -> inc_stat(state, :rd_cflt) 88 + :database_locked -> inc_stat(state, :db_lk) 89 + end 90 + end 91 + end 92 + 93 + defp swap(%State{} = state) do 94 + start = current_time() 95 + 96 + Hobbes.transaction(state.cluster_name, fn -> 97 + k1 = "key" <> pad(Enum.random(0..(state.count - 1))) 98 + v2 = Hobbes.read(k1) 99 + 100 + k2 = "key" <> v2 101 + v3 = Hobbes.read(k2) 102 + 103 + k3 = "key" <> v3 104 + v4 = Hobbes.read(k3) 87 105 88 - # Random delay between reads/commit for additional concurrency 89 - SimProcess.sleep(Enum.random(100..300)), 90 - {:ok, txn} <- Transaction.commit(txn) 91 - do 92 - duration = current_time() - start_time 106 + Hobbes.write([{k1, v3}, {k3, v2}, {k2, v4}]) 107 + SimProcess.sleep(Enum.random(100..300)) 108 + :ok 109 + end, return_transaction: true) 110 + |> then(fn {:ok, %Transaction.TxnState{commit_version: commit_version}} -> 111 + duration = current_time() - start 93 112 94 - case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster_name, txn.commit_version) do 113 + case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster_name, commit_version) do 95 114 {:ok, _pairs} -> :noop 96 115 {:error, _error} -> :noop 97 116 98 117 {:error, error, pairs} -> 99 118 raise """ 100 - Cycle check error at version #{txn.commit_version}: #{error} 119 + Cycle check error at version #{commit_version}: #{error} 101 120 Pairs: #{inspect(pairs, pretty: true, limit: :infinity)}\ 102 121 """ 103 122 end 104 123 105 - state = update_in(state.commit_latencies, fn list -> [duration | list] end) 124 + state = update_in(state.commit_latencies, &[duration | &1]) 106 125 inc_stat(state, :success) 107 - else 108 - {:error, :timeout} -> inc_stat(state, :timeout) 109 - 110 - {:error, :read_version_too_old} -> inc_stat(state, :rv_old) 111 - {:error, :read_version_too_new} -> inc_stat(state, :rv_new) 112 - {:error, :wrong_generation} -> inc_stat(state, :wrng_gen) 113 - 114 - {:error, {:transaction_too_old, _txn}} -> inc_stat(state, :txn_old) 115 - {:error, {:read_conflict, _txn}} -> inc_stat(state, :rd_cflt) 116 - {:error, {:database_locked, _txn}} -> inc_stat(state, :db_lk) 117 - end 126 + end) 118 127 end 119 128 120 129 defp inc_stat(%State{} = state, key) do