this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use new API for ReadWrite

garrison f558f23c c898a365

+53 -47
+53 -47
lib/workloads/read_write.ex
··· 26 26 27 27 @behaviour Hobbes.Workloads.Workload 28 28 29 - alias Hobbes.Transaction 30 - alias Hobbes.Structs.{Cluster, Server} 31 - 32 29 alias Trinity.{SimProcess, SimServer} 30 + alias Hobbes.Structs.{Cluster, Server} 33 31 34 32 import Hobbes.Utils 35 33 import Hobbes.Workloads ··· 40 38 41 39 defmodule State do 42 40 @enforce_keys [ 43 - :cluster, 41 + :cluster_name, 44 42 :id, 45 43 :tick_ms, 46 44 ] ··· 55 53 SimServer.start_link(__MODULE__, opts) 56 54 end 57 55 58 - def init(%{cluster: %Cluster{} = cluster, id: id, tick_ms: tick_ms}) do 56 + def init(%{cluster_name: cluster_name, id: id, tick_ms: tick_ms}) do 59 57 SimProcess.send_after(self(), :tick, tick_ms) 60 58 61 59 {:ok, %State{ 62 - cluster: cluster, 60 + cluster_name: cluster_name, 63 61 id: id, 64 62 tick_ms: tick_ms, 65 63 }} ··· 78 76 end 79 77 80 78 defp work(state) do 81 - with {:ok, txn} <- Transaction.new(state.cluster), 82 - {:ok, {txn, state}} <- check_reads(txn, state), 83 - {txn, state} = write_keys(txn, state), 84 - {:ok, _txn} <- Transaction.commit(txn) 85 - do 79 + try do 80 + state = 81 + Hobbes.transaction(state.cluster_name, fn -> 82 + state 83 + |> check_reads() 84 + |> write_keys() 85 + end) 86 86 %{state | stats: inc_stat(state.stats, :transactions)} 87 - else 88 - {:error, _err} -> 87 + rescue 88 + _e in [Hobbes.BeginError, Hobbes.CommitError, Hobbes.ReadError] -> 89 89 %{state | stats: inc_stat(state.stats, :failed_transactions)} 90 90 end 91 91 end 92 92 93 - defp check_reads(txn, %State{write_count: 0} = state), do: {:ok, {txn, state}} 93 + defp check_reads(%State{write_count: 0} = state), do: state 94 94 95 - defp check_reads(%Transaction.TxnState{} = txn, %State{} = state) do 95 + defp check_reads(%State{} = state) do 96 96 num_reads = Enum.random(1..10) 97 97 98 98 read_keys = Enum.map(1..num_reads, fn _i -> ··· 100 100 {write_i, key_for(state.id, write_i)} 101 101 end) 102 102 103 - case Transaction.read(txn, Enum.map(read_keys, &elem(&1, 1))) do 104 - {:ok, {results, txn}} -> 105 - Enum.each(read_keys, fn {write_i, key} -> 106 - expected_value = value_for(state.id, write_i) 107 - received_value = Map.fetch!(results, key) 103 + results = Hobbes.read(Enum.map(read_keys, &elem(&1, 1))) 108 104 109 - if expected_value != received_value do 110 - raise """ 111 - Received unexpected value! 112 - Key: #{inspect(key)} 113 - Expected: #{inspect(expected_value)} 114 - Received: #{inspect(received_value)} 115 - """ 116 - end 117 - end) 105 + Enum.each(read_keys, fn {write_i, key} -> 106 + expected_value = value_for(state.id, write_i) 107 + received_value = Map.fetch!(results, key) 118 108 119 - {:ok, {txn, %{state | stats: inc_stat(state.stats, :reads, num_reads)}}} 109 + if expected_value != received_value do 110 + raise """ 111 + Received unexpected value! 112 + Key: #{inspect(key)} 113 + Expected: #{inspect(expected_value)} 114 + Received: #{inspect(received_value)} 115 + """ 116 + end 117 + end) 120 118 121 - {:error, _err} = error -> error 122 - end 119 + %{state | stats: inc_stat(state.stats, :reads, num_reads)} 123 120 end 124 121 125 - defp write_keys(%Transaction.TxnState{} = txn, %State{} = state) do 122 + defp write_keys(%State{} = state) do 126 123 writes = Enum.random(1..10) 127 124 write_range = state.write_count..(state.write_count + writes - 1) 128 125 129 - txn = Enum.reduce(write_range, txn, fn i, txn -> 130 - Transaction.write(txn, key_for(state.id, i), value_for(state.id, i)) 126 + Enum.each(write_range, fn i -> 127 + Hobbes.write(key_for(state.id, i), value_for(state.id, i)) 131 128 end) 132 - {txn, %{state | write_count: state.write_count + writes, stats: inc_stat(state.stats, :writes, writes)}} 129 + 130 + %{state | 131 + write_count: state.write_count + writes, 132 + stats: inc_stat(state.stats, :writes, writes), 133 + } 133 134 end 134 135 135 136 defp key_for(id, i), do: hash("key:#{id}:#{i}") ··· 141 142 end 142 143 end 143 144 144 - def run(%{cluster: %Cluster{} = cluster}, opts) do 145 + def run(%{cluster_name: cluster_name}, opts) do 145 146 num_clients = Keyword.get(opts, :clients, 10) 146 147 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 147 148 duration_ms = Keyword.get(opts, :duration_ms, 10_000) ··· 151 152 start_time = current_time() 152 153 153 154 clients = Enum.map(1..num_clients, fn id -> 154 - {:ok, pid} = Client.start_link(%{cluster: cluster, id: id, tick_ms: client_tick_ms}) 155 + {:ok, pid} = Client.start_link(%{cluster_name: cluster_name, id: id, tick_ms: client_tick_ms}) 155 156 pid 156 157 end) 157 158 ··· 194 195 [[:transactions, :reads, :writes] |> Enum.map(&Map.get(per_second, &1, 0)) |> Enum.map(&pretty_number/1)], 195 196 cols: ["Txns / s", "Reads / s", "Writes / s"] 196 197 ) 198 + 199 + {:ok, %{cluster: %Cluster{} = cluster}} = Hobbes.Cache.lookup_cluster(cluster_name) 197 200 198 201 profiling_stats = 199 202 case profiling do ··· 227 230 end 228 231 229 232 230 - shards = list_shards(cluster) 233 + shards = list_shards(cluster_name) 231 234 shards_table = 232 235 shards 233 236 |> Enum.map(fn {shard_key, {from_ids, to_ids}, size_bytes} -> ··· 261 264 ) 262 265 263 266 if check do 264 - check_database(cluster, Map.fetch!(total_stats, :writes)) 267 + check_database(cluster_name, Map.fetch!(total_stats, :writes)) 265 268 end 266 269 267 270 { ··· 288 291 } 289 292 end 290 293 291 - defp check_database(%Cluster{} = cluster, num_writes) do 292 - {:ok, txn} = Transaction.new(cluster) 293 - {:ok, {pairs, _txn}} = Transaction.read_range(txn, "", "\xFF") 294 + defp check_database(cluster_name, num_writes) do 295 + pairs = Hobbes.transaction(cluster_name, fn -> 296 + Hobbes.read_range(normal_prefix(), normal_end()) 297 + end) 294 298 295 299 if length(pairs) != num_writes do 296 300 raise """ ··· 301 305 :ok 302 306 end 303 307 304 - def list_shards(%Cluster{} = cluster) do 305 - {:ok, txn} = Transaction.new(cluster) 306 - {:ok, {result, _txn}} = Transaction.read_range(txn, "\xFF/key_servers/", "\xFF/key_servers/\xFF\xFF") 308 + def list_shards(cluster_name) do 309 + pairs = Hobbes.transaction(cluster_name, fn -> 310 + Hobbes.read_range(key_servers_prefix(), key_servers_end()) 311 + end) 307 312 308 - result 313 + {:ok, %{cluster: %Cluster{} = cluster}} = Hobbes.Cache.lookup_cluster(cluster_name) 314 + pairs 309 315 |> Enum.map(fn {"\xFF/key_servers/" <> shard_key, servers} -> 310 316 {shard_key, servers} 311 317 end)