···208208 read_version_floor = commit_version - mvcc_window()
209209 {transactions_reversed, old_transactions} =
210210 Enum.split_with(transactions_reversed, fn %CommitTxn{} = txn ->
211211+ # TODO: maybe validate instead of asserting?
212212+ assert is_integer(txn.read_version) or txn.read_version == :write_only
213213+211214 # TODO: make sure this is not off by one anywhere (e.g. WRT resolvers, storage)
215215+ # Note: (:write_only > integer, so :write_only transactions always pass)
212216 txn.read_version > read_version_floor
213217 end)
214218
+29-18
lib/servers/distributor.ex
···179179 shards = ShardInfoMap.list_shards(state.shard_map)
180180181181 {shards_to_split, _shards} = Enum.split_with(shards, &should_split_shard?/1)
182182-183183- Enum.each(shards_to_split, fn shard ->
184184- split_shard(state, shard)
185185- end)
182182+ :ok = split_shards(state, shards_to_split)
186183187184 state
188185 end
# Decides whether a shard should be split.
#
# A shard with an empty `to_server_ids` list is split once its recorded size
# exceeds @shard_max_size_bytes; a shard with any other `to_server_ids` value
# is never split.
defp should_split_shard?(%Shard{to_server_ids: [], stats: stats}) do
  stats.size_bytes > @shard_max_size_bytes
end

defp should_split_shard?(%Shard{}), do: false
193190194194- defp split_shard(%State{} = state, %Shard{} = shard) do
# Splits the given shards using a single write-only transaction.
#
# Each shard is passed to do_split_shard/2, which updates the in-memory shard
# map and returns the key/value pair to persist for the new shard; all pairs
# are then written and committed together.
#
# Returns :ok when the commit succeeds or when there is nothing to split.
# Exits with :shutdown if the commit fails (see the note on the error branch).
defp split_shards(%State{}, []) do
  # Nothing to split: do not build and commit an empty transaction, and do not
  # risk exiting because a no-op commit failed.
  :ok
end

defp split_shards(%State{} = state, shards) do
  # If too many shards are split at once the transaction could be too large
  # This seems very unlikely, but the failure would be metastable and block all splits,
  # so we have a limit to be safe
  # TODO: buggify this to a lower number for testing?
  shards = Enum.take(shards, 100)
  ks_pairs = Enum.map(shards, &do_split_shard(state, &1))

  {:ok, txn} = Transaction.new(state.cluster, write_only: true)
  txn = Transaction.write(txn, ks_pairs)

  case Transaction.commit(txn) do
    {:ok, _txn} ->
      :ok

    # Note that retryable errors (:transaction_too_old, :read_conflict) are impossible
    # for this transaction as it is write-only
    #
    # If we get a non-retryable error (e.g. :timeout) then we must exit (triggering a recovery)
    # to preserve the consistency of the in-memory shard map
    # (In practice, if commits are timing out a recovery is likely going to happen anyway)
    {:error, _other} ->
      exit(:shutdown)
  end
end
213213+214214+ defp do_split_shard(%State{} = state, %Shard{} = shard) do
195215 assert shard.to_server_ids == []
196216 %ShardStats{midpoint_key: midpoint} = shard.stats
197217···204224 }
205225 shard = %{shard | end_key: midpoint, stats: nil}
206226207207- # TODO: open a transaction with no read version and only handle commit error
208208- with {:ok, txn} <- Transaction.new(state.cluster),
209209- txn = Transaction.write(txn, [to_ks_pair(new_shard)]),
210210- {:ok, _txn} <- Transaction.commit(txn)
211211- do
212212- ShardInfoMap.put(state.shard_map, shard.start_key, shard)
213213- ShardInfoMap.put(state.shard_map, new_shard.start_key, new_shard)
214214- :ok
215215- else
216216- # TODO: we only need to exit for errors which are not retryable (e.g. commit timed out)
217217- # This is necessary to maintain consistency of the in-memory shard map
218218- _ -> exit(:shutdown)
219219- end
227227+ ShardInfoMap.put(state.shard_map, shard.start_key, shard)
228228+ ShardInfoMap.put(state.shard_map, new_shard.start_key, new_shard)
229229+230230+ to_ks_pair(new_shard)
220231 end
221232222233 defp tick_shard_moves(%State{} = state) when state.cluster.status != :normal do
+24-9
lib/transaction.ex
@doc """
Creates a new transaction state for `cluster`.

Options (checked in this order):

  * `:write_only` - when truthy, the transaction is created with
    `read_version: :write_only` and no read version is fetched. This takes
    precedence over `:read_version`.
  * `:read_version` - when given, this read version is used as-is instead of
    fetching one.

With neither option set, a read version is obtained via `get_read_version/1`
and any `{:error, _}` it returns is passed through unchanged.
"""
@spec new(%Cluster{}) :: {:ok, TxnState.t} | {:error, :timeout}
@spec new(%Cluster{}, keyword) :: {:ok, TxnState.t} | {:error, :timeout}
def new(%Cluster{} = cluster, opts \\ []) do
  cond do
    Keyword.get(opts, :write_only) ->
      {:ok, %TxnState{cluster: cluster, read_version: :write_only}}

    read_version = Keyword.get(opts, :read_version) ->
      {:ok, %TxnState{cluster: cluster, read_version: read_version}}

    true ->
      case get_read_version(cluster) do
        {:ok, read_version} -> {:ok, %TxnState{cluster: cluster, read_version: read_version}}
        {:error, _err} = error -> error
      end
  end
end
3641···6671 end
67726873 def read(%TxnState{} = txn, keys) when is_list(keys) do
7474+ ensure_can_read!(txn)
7575+6976 get_shards = fn ->
7077 buf = random_commit_buffer(txn.cluster)
7178 CommitBuffer.get_shards_multi(buf, keys)
···148155 @spec read_range(%TxnState{}, binary, binary) :: {:ok, {[{binary, binary}], %TxnState{}}} | {:error, :read_version_too_old}
149156 def read_range(%TxnState{} = txn, start_key, end_key)
150157 when is_binary(start_key) and is_binary(end_key) and start_key >= "" and end_key <= "\xFF\xFF" do
158158+ ensure_can_read!(txn)
159159+151160 get_ranges = fn ->
152161 buf = random_commit_buffer(txn.cluster)
153162 case CommitBuffer.get_shards_multi(buf, [{start_key, end_key}]) do
···247256 %TxnState{txn | write_conflicts: [range | txn.write_conflicts]}
248257 end
249258250250- @spec commit(TxnState.t) :: {:ok, %TxnState{}} | {:error, term}
259259+ @spec commit(TxnState.t) :: {:ok, %TxnState{}} | {:error, :transaction_too_old | :read_conflict | :database_locked | :timeout}
251260 def commit(%TxnState{} = txn) do
252261 commit_txn = %CommitTxn{
253262 read_version: txn.read_version,
···263272 {:ok, %{commit_version: commit_version}} ->
264273 {:ok, %{txn | commit_version: commit_version}}
265274266266- {:error, _error} = error -> error
275275+ {:error, _err} = error -> error
267276 end
268277 end
269278···278287 i = SimServer.deterministic_random(0..(length(commit_buffers) - 1))
279288 Enum.at(commit_buffers, i).pid
280289 end
# Guard used before reads: raises when the transaction was created as
# write-only (read_version is :write_only); otherwise returns :noop.
defp ensure_can_read!(txn) do
  case txn do
    %TxnState{read_version: :write_only} ->
      raise "Cannot perform reads because transaction is write-only!"

    _readable ->
      :noop
  end
end
281296end