this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Convert Cycle to use cluster name

garrison b57a5e8d 87206979

+49 -31
+8 -3
lib/cache.ex
··· 18 18 end 19 19 20 20 defmodule CachedCluster do 21 + @type t :: %__MODULE__{ 22 + coordinators: list, 23 + cluster: Cluster.t | nil, 24 + counter: non_neg_integer, 25 + } 21 26 @enforce_keys [ 22 27 :coordinators, 23 28 :cluster, ··· 38 43 SimServer.start_link(__MODULE__, nil, name: __MODULE__) 39 44 end 40 45 41 - @spec lookup_cluster(cluster_name) :: {:ok, Cluster.t} | {:error, atom} 46 + @spec lookup_cluster(cluster_name) :: {:ok, CachedCluster.t} | {:error, atom} 42 47 def lookup_cluster(name) do 43 48 clusters_table = get_clusters_table() 44 49 45 50 case :ets.lookup(clusters_table, name) do 46 - [{^name, %CachedCluster{cluster: %Cluster{} = cluster}}] -> 47 - {:ok, cluster} 51 + [{^name, %CachedCluster{cluster: %Cluster{}} = cached_cluster}] -> 52 + {:ok, cached_cluster} 48 53 [{^name, %CachedCluster{counter: counter}}] -> 49 54 refresh_cluster(name, counter) 50 55 {:error, :cluster_not_loaded}
+19 -2
lib/transaction.ex
··· 1 1 defmodule Hobbes.Transaction do 2 + alias Hobbes.Cache 3 + alias Hobbes.Cache.CachedCluster 2 4 alias Hobbes.Structs.{Cluster, Server, CommitTxn, RangeResult} 3 5 alias Hobbes.Servers.{BeginBuffer, CommitBuffer, Storage} 4 6 ··· 35 37 ] ++ @enforce_keys 36 38 end 37 39 38 - @spec new(%Cluster{}) :: {:ok, TxnState.t} | {:error, :timeout} 39 - def new(%Cluster{} = cluster, opts \\ []) do 40 + @spec new(binary | %Cluster{}) :: {:ok, TxnState.t} | {:error, :timeout} 41 + def new(cluster, opts \\ []) 42 + 43 + def new(cluster_name, opts) when is_binary(cluster_name) do 44 + case Hobbes.Cache.lookup_cluster(cluster_name) do 45 + {:ok, %CachedCluster{} = cached_cluster} -> 46 + case new(cached_cluster.cluster, opts) do 47 + {:ok, _txn} = result -> 48 + result 49 + {:error, _err} = error -> 50 + Cache.refresh_cluster(cluster_name, cached_cluster.counter) 51 + error 52 + end 53 + end 54 + end 55 + 56 + def new(%Cluster{} = cluster, opts) do 40 57 generation = cluster.generation 41 58 assert is_integer(generation) 42 59
+4 -3
lib/workloads.ex
··· 1 1 defmodule Hobbes.Workloads do 2 2 require Logger 3 + alias Trinity.{Sim, SimServer, SimLogger} 3 4 4 - alias Trinity.{Sim, SimServer, SimLogger} 5 + alias Hobbes.Cache.CachedCluster 5 6 alias Hobbes.Structs.Cluster 6 7 7 8 import Hobbes.Utils ··· 154 155 {:ok, _coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts) 155 156 156 157 # Keep retrying get_cluster until coordinators are connected 157 - {:ok, %Cluster{} = cluster} = 158 + {:ok, %CachedCluster{cluster: %Cluster{} = cluster}} = 158 159 retry_acc(nil, fn nil -> 159 160 case Hobbes.Cache.lookup_cluster(cluster_name) do 160 161 {:ok, _cluster} = result -> {:halt, result} ··· 162 163 end 163 164 end, 20) 164 165 165 - context = %{cluster: cluster} 166 + context = %{cluster_name: cluster_name, cluster: cluster} 166 167 167 168 runners = Enum.map(workloads, fn {module, opts} when is_atom(module) and is_list(opts) -> 168 169 {:ok, pid} = Runner.start_link(module, opts, context)
+18 -23
lib/workloads/cycle.ex
··· 17 17 to fail its checks. 18 18 """ 19 19 20 + alias Trinity.{SimProcess, SimServer} 21 + 20 22 alias Hobbes.Transaction 21 - alias Hobbes.Structs.Cluster 22 - 23 - alias Trinity.{SimProcess, SimServer} 24 23 import Hobbes.Workloads 25 24 26 25 @behaviour Hobbes.Workloads.Workload ··· 32 31 import Hobbes.Utils, only: [current_time: 0] 33 32 34 33 defmodule State do 35 - @enforce_keys [:cluster, :count, :tick_ms, :stopped, :stats, :commit_latencies] 34 + @enforce_keys [:cluster_name, :count, :tick_ms, :stopped, :stats, :commit_latencies] 36 35 defstruct @enforce_keys 37 36 end 38 37 ··· 40 39 SimServer.start_link(__MODULE__, opts) 41 40 end 42 41 43 - def init(%{cluster: %Cluster{} = cluster, count: count, tick_ms: tick_ms}) when is_integer(count) do 42 + def init(%{cluster_name: cluster_name, count: count, tick_ms: tick_ms}) when is_integer(count) do 44 43 SimProcess.send_after(self(), :tick, 0) 45 44 46 45 {:ok, %State{ 47 - cluster: cluster, 46 + cluster_name: cluster_name, 48 47 count: count, 49 48 tick_ms: tick_ms, 50 49 stopped: false, ··· 75 74 start_time = current_time() 76 75 77 76 k1 = "key" <> pad(Enum.random(0..(state.count - 1))) 78 - with {:ok, txn} <- Transaction.new(state.cluster), 77 + with {:ok, txn} <- Transaction.new(state.cluster_name), 79 78 {:ok, {v2, txn}} <- Transaction.read(txn, k1), k2 = "key" <> v2, 80 79 {:ok, {v3, txn}} <- Transaction.read(txn, k2), k3 = "key" <> v3, 81 80 {:ok, {v4, txn}} <- Transaction.read(txn, k3), ··· 91 90 do 92 91 duration = current_time() - start_time 93 92 94 - case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster, txn.commit_version) do 93 + case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster_name, txn.commit_version) do 95 94 {:ok, _pairs} -> :noop 96 95 {:error, _error} -> :noop 97 96 ··· 105 104 state = update_in(state.commit_latencies, fn list -> [duration | list] end) 106 105 inc_stat(state, :success) 107 106 else 108 - {:error, :timeout} -> 109 - cluster = Hobbes.refresh_cluster(state.cluster) 110 - state = %{state | cluster: cluster} 111 - inc_stat(state, :timeout) 107 + {:error, :timeout} -> inc_stat(state, :timeout) 112 108 113 109 {:error, :read_version_too_old} -> inc_stat(state, :rv_old) 114 110 {:error, :read_version_too_new} -> inc_stat(state, :rv_new) ··· 135 131 duration_ms: non_neg_integer, 136 132 ] 137 133 138 - def run(%{cluster: %Cluster{} = cluster}, opts) do 134 + def run(%{cluster_name: cluster_name}, opts) do 139 135 key_count = Keyword.get(opts, :keys, 20) 140 136 client_count = Keyword.get(opts, :clients, 10) 141 137 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 142 138 duration_ms = Keyword.get(opts, :duration_ms, 10_000) 143 139 144 - build_cycle(cluster, key_count) 140 + build_cycle(cluster_name, key_count) 145 141 146 142 clients = Enum.map(1..client_count, fn _i -> 147 - {:ok, pid} = Client.start_link(%{cluster: cluster, count: key_count, tick_ms: client_tick_ms}) 143 + {:ok, pid} = Client.start_link(%{cluster_name: cluster_name, count: key_count, tick_ms: client_tick_ms}) 148 144 pid 149 145 end) 150 146 ··· 187 183 cols: Enum.map(total_cols, &to_string/1) 188 184 ) 189 185 190 - cluster = Hobbes.refresh_cluster(cluster) 191 - pairs = check_cycle(cluster, key_count) 186 + pairs = check_cycle(cluster_name, key_count) 192 187 193 188 pretty_pairs = 194 189 pairs ··· 209 204 } 210 205 end 211 206 212 - defp build_cycle(%Cluster{} = cluster, count) do 213 - {:ok, txn} = Transaction.new(cluster) 207 + defp build_cycle(cluster_name, count) do 208 + {:ok, txn} = Transaction.new(cluster_name) 214 209 215 210 txn = Enum.reduce(0..(count - 2), txn, fn i, txn -> 216 211 Transaction.write(txn, "key" <> Client.pad(i), Client.pad(i + 1)) ··· 219 214 Transaction.commit(txn) 220 215 end 221 216 222 - defp check_cycle(%Cluster{} = cluster, count) do 223 - {:ok, txn} = Transaction.new(cluster) 217 + defp check_cycle(cluster_name, count) do 218 + {:ok, txn} = Transaction.new(cluster_name) 224 219 225 220 all_keys = Enum.map(0..(count - 1), fn i -> "key" <> Client.pad(i) end) 226 221 {:ok, {key_values, txn}} = Transaction.read(txn, all_keys) ··· 253 248 end 254 249 end 255 250 256 - def check_cycle_at_version(%Cluster{} = cluster, version) when is_integer(version) do 257 - with {:ok, txn} <- Transaction.new(cluster), 251 + def check_cycle_at_version(cluster_name, version) when is_integer(version) do 252 + with {:ok, txn} <- Transaction.new(cluster_name), 258 253 txn = Map.put(txn, :read_version, version), 259 254 {:ok, {pairs, _txn}} <- Transaction.read_range(txn, "key", "key\xFF") 260 255 do