this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Return commit version and batch info on commit error

garrison 2190784d c5c52568

+50 -29
+9 -7
lib/servers/commit_buffer.ex
··· 12 12 13 13 # TODO: centralize types? 14 14 @type shard :: {binary, binary, {[integer], [pid]}} 15 + @type commit_info :: %{commit_version: non_neg_integer, batch_index: non_neg_integer} 15 16 16 17 @flush_interval_ms 1 17 18 @commit_interval_us 1000 ··· 92 93 end 93 94 end 94 95 95 - @spec commit(pid, CommitTxn.t) :: {:ok, map} | {:error, :transaction_too_old | :read_conflict | :database_locked | :timeout} 96 + @spec commit(pid, CommitTxn.t) :: {:ok, commit_info} | {:error, :timeout | {:transaction_too_old | :read_conflict | :database_locked, commit_info}} 96 97 def commit(server, %CommitTxn{} = txn) do 97 98 try do 98 99 SimServer.call(server, {:commit, txn}) ··· 141 142 end 142 143 143 144 def handle_call({:commit, %CommitTxn{} = txn}, from, state) do 144 - txn = %CommitTxn{txn | from: from} 145 - state = %State{state | buffer: [txn | state.buffer], buffer_size: state.buffer_size + 1} 145 + txn = %{txn | from: from, batch_index: state.buffer_size} 146 + state = %{state | buffer: [txn | state.buffer], buffer_size: state.buffer_size + 1} 147 + 146 148 state = maybe_commit_batch(state) 147 149 {:noreply, state} 148 150 end ··· 315 317 # Reply to clients 316 318 317 319 Enum.each(allowed_transactions, fn %CommitTxn{} = txn -> 318 - SimServer.reply(txn.from, {:ok, %{commit_version: commit_version}}) 320 + SimServer.reply(txn.from, {:ok, %{commit_version: commit_version, batch_index: txn.batch_index}}) 319 321 end) 320 322 321 323 Enum.each(locked_transactions, fn %CommitTxn{} = txn -> 322 324 # TODO: use a different error message 323 - SimServer.reply(txn.from, {:error, :database_locked}) 325 + SimServer.reply(txn.from, {:error, {:database_locked, %{commit_version: commit_version, batch_index: txn.batch_index}}}) 324 326 end) 325 327 326 328 Enum.each(old_transactions, fn %CommitTxn{} = txn -> 327 - SimServer.reply(txn.from, {:error, :transaction_too_old}) 329 + SimServer.reply(txn.from, {:error, {:transaction_too_old, %{commit_version: commit_version, batch_index: txn.batch_index}}}) 328 330 end) 329 331 330 332 Enum.each(rejected_transactions, fn %CommitTxn{} = txn -> 331 - SimServer.reply(txn.from, {:error, :read_conflict}) 333 + SimServer.reply(txn.from, {:error, {:read_conflict, %{commit_version: commit_version, batch_index: txn.batch_index}}}) 332 334 end) 333 335 334 336 %{state |
+4 -1
lib/structs.ex
··· 7 7 alias Hobbes.Utils 8 8 9 9 defmodule Cluster do 10 + # Hide :servers from inspect output because they are spammy and unhelpful 11 + @derive {Inspect, except: [:servers]} 10 12 @type t :: %__MODULE__{ 11 13 coordinators: [term], 12 14 generation: non_neg_integer, ··· 72 74 write_conflicts: [binary | {binary, binary}], 73 75 mutations: [Utils.mutation], 74 76 from: term, 77 + batch_index: non_neg_integer | nil, 75 78 } 76 79 @enforce_keys [:read_version, :read_conflicts, :write_conflicts, :mutations] 77 - defstruct @enforce_keys ++ [:from] 80 + defstruct @enforce_keys ++ [:from, :batch_index] 78 81 end 79 82 80 83 defmodule ResolveBatch do
+21 -6
lib/transaction.ex
··· 16 16 write_conflicts: [binary | {binary, binary}], 17 17 mutations: [Utils.mutation], 18 18 19 - commit_version: nil | non_neg_integer, 19 + commit_succeeded: boolean | nil, 20 + commit_version: non_neg_integer | nil, 21 + batch_index: non_neg_integer | nil, 20 22 } 21 23 @enforce_keys [:cluster, :read_version] 22 - defstruct @enforce_keys ++ [read_conflicts: [], write_conflicts: [], mutations: [], commit_version: nil] 24 + defstruct [ 25 + read_conflicts: [], 26 + write_conflicts: [], 27 + mutations: [], 28 + commit_succeeded: nil, 29 + commit_version: nil, 30 + batch_index: nil, 31 + ] ++ @enforce_keys 23 32 end 24 33 25 34 @spec new(%Cluster{}) :: {:ok, TxnState.t} | {:error, :timeout} ··· 259 268 %TxnState{txn | write_conflicts: [range | txn.write_conflicts]} 260 269 end 261 270 262 - @spec commit(TxnState.t) :: {:ok, %TxnState{}} | {:error, :transaction_too_old | :read_conflict | :database_locked | :timeout} 271 + @spec commit(TxnState.t) :: {:ok, %TxnState{}} | {:error, :timeout | {:transaction_too_old | :read_conflict | :database_locked, TxnState.t}} 263 272 def commit(%TxnState{} = txn) do 264 273 commit_txn = %CommitTxn{ 265 274 read_version: txn.read_version, ··· 272 281 273 282 CommitBuffer.commit(buf, commit_txn) 274 283 |> case do 275 - {:ok, %{commit_version: commit_version}} -> 276 - {:ok, %{txn | commit_version: commit_version}} 284 + {:ok, %{commit_version: commit_version, batch_index: batch_index}} -> 285 + {:ok, %{txn | commit_succeeded: true, commit_version: commit_version, batch_index: batch_index}} 277 286 278 - {:error, _err} = error -> error 287 + {:error, :timeout} = error -> error 288 + 289 + {:error, {err, info}} when err in [:transaction_too_old, :read_conflict, :database_locked] -> 290 + %{commit_version: commit_version, batch_index: batch_index} = info 291 + 292 + txn = %{txn | commit_succeeded: false, commit_version: commit_version, batch_index: batch_index} 293 + {:error, {err, txn}} 279 294 end 280 295 end 281 296
+7 -7
lib/workloads/cycle.ex
··· 106 106 state = %{state | cluster: cluster} 107 107 inc_stat(state, :timeout) 108 108 109 - {:error, :read_version_too_old} -> inc_stat(state, :rv_too_old) 110 - {:error, :read_version_too_new} -> inc_stat(state, :rv_too_new) 111 - {:error, :database_locked} -> inc_stat(state, :db_lock) 112 - {:error, :too_many_retries} -> inc_stat(state, :tm_retries) 113 - {:error, :transaction_too_old} -> inc_stat(state, :txn_too_old) 114 - {:error, :read_conflict} -> inc_stat(state, :read_cflt) 109 + {:error, :read_version_too_old} -> inc_stat(state, :rv_too_old) 110 + {:error, :read_version_too_new} -> inc_stat(state, :rv_too_new) 115 111 116 - {:error, error} -> inc_stat(state, error) 112 + {:error, {:transaction_too_old, _txn}} -> inc_stat(state, :txn_too_old) 113 + {:error, {:read_conflict, _txn}} -> inc_stat(state, :read_cflt) 114 + {:error, {:database_locked, _txn}} -> inc_stat(state, :db_lock) 115 + 116 + {:error, :too_many_retries} -> inc_stat(state, :tm_retries) 117 117 end 118 118 end 119 119
+1 -1
lib/workloads/lock_database.ex
··· 22 22 SimServer.sleep(delay_ms) 23 23 24 24 {:ok, _txn} = do_write(cluster, "\xFF/lock", "true") 25 - {:error, :database_locked} = do_write(cluster, "foo", "bar") 25 + {:error, {:database_locked, _txn}} = do_write(cluster, "foo", "bar") 26 26 27 27 SimServer.sleep(lock_duration_ms) 28 28
+6 -5
lib/workloads/model.ex
··· 26 26 27 27 read_version: txn.read_version, 28 28 commit_version: txn.commit_version, 29 - # TODO 30 - batch_index: 0, 29 + batch_index: txn.batch_index, 31 30 32 31 read_results: read_results, 33 32 mutations: mutations, ··· 85 84 {:ok, %TxnState{} = txn} -> 86 85 append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations)) 87 86 88 - {:error, :read_conflict} -> 89 - append_history(state, HistoryTxn.new(:read_conflict, txn, read_results, mutations)) 90 - # TODO: more errors 87 + {:error, {err, %TxnState{} = txn}} when err in [:transaction_too_old, :read_conflict] -> 88 + append_history(state, HistoryTxn.new(err, txn, read_results, mutations)) 89 + 90 + # TODO 91 + {:error, :timeout} -> raise "Timeout not supported" 91 92 end 92 93 else 93 94 # TODO: new() errors, read() errors
+2 -2
test/hobbes_test.exs
··· 368 368 369 369 assert {:ok, {[], txn1}} = Transaction.read_range(txn1, "foo", "zoo") 370 370 txn1 = Transaction.write(txn1, "foo2", "baz") 371 - assert {:error, :read_conflict} = Transaction.commit(txn1) 371 + assert {:error, {:read_conflict, _txn}} = Transaction.commit(txn1) 372 372 373 373 txn2 = Transaction.new!(cluster) 374 374 assert {:ok, {[{"foo1", "bar"}], txn2}} = Transaction.read_range(txn2, "foo", "zoo") ··· 434 434 Transaction.commit(txn) 435 435 end) 436 436 437 - assert {:error, :read_conflict} = Transaction.commit(before_txn) 437 + assert {:error, {:read_conflict, _txn}} = Transaction.commit(before_txn) 438 438 439 439 read_txn = Transaction.new!(cluster) 440 440 values = Enum.map(1..3, fn i ->