this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Check for new clusters directly in cache

garrison 0e420334 f558f23c

+93 -62
+54 -47
lib/cache.ex
··· 1 1 defmodule Hobbes.Cache do 2 2 use GenServer 3 - alias Trinity.{SimServer, SimPersistentTerm} 3 + alias Trinity.{SimProcess, SimServer, SimPersistentTerm} 4 4 5 5 alias Hobbes.Servers.{Coordinator, Manager} 6 6 alias Hobbes.Structs.Cluster ··· 20 20 defmodule CachedCluster do 21 21 @type t :: %__MODULE__{ 22 22 coordinators: list, 23 + generation: non_neg_integer | -1, 24 + manager_pid: pid | nil, 23 25 cluster: Cluster.t | nil, 24 - counter: non_neg_integer, 25 26 } 26 27 @enforce_keys [ 27 28 :coordinators, 29 + :generation, 30 + :manager_pid, 28 31 :cluster, 29 - :counter, 30 32 ] 31 33 defstruct @enforce_keys 32 34 end ··· 43 45 SimServer.start_link(__MODULE__, nil, name: __MODULE__) 44 46 end 45 47 46 - @spec lookup_cluster(cluster_name) :: {:ok, CachedCluster.t} | {:error, atom} 48 + @spec lookup_cluster(cluster_name) :: {:ok, Cluster.t} | {:error, atom} 47 49 def lookup_cluster(name) do 48 50 clusters_table = get_clusters_table() 49 51 50 52 case :ets.lookup(clusters_table, name) do 51 - [{^name, %CachedCluster{cluster: %Cluster{}} = cached_cluster}] -> 52 - {:ok, cached_cluster} 53 - [{^name, %CachedCluster{counter: counter}}] -> 54 - refresh_cluster(name, counter) 55 - {:error, :cluster_not_loaded} 56 - [] -> 57 - {:error, :unknown_cluster} 53 + [{^name, %CachedCluster{cluster: %Cluster{} = cluster}}] -> {:ok, cluster} 54 + [{^name, %CachedCluster{}}] -> {:error, :cluster_not_loaded} 55 + [] -> {:error, :unknown_cluster} 58 56 end 59 - end 60 - 61 - @spec refresh_cluster(cluster_name, non_neg_integer) :: :ok 62 - def refresh_cluster(name, counter) do 63 - SimServer.cast(__MODULE__, {:refresh_cluster, name, counter}) 64 57 end 65 58 66 59 @spec write_coordinators(cluster_name, list) :: :ok ··· 79 72 state = %State{ 80 73 clusters_table: clusters_table, 81 74 } 75 + 76 + SimProcess.send_after(self(), :tick, 0) 82 77 {:ok, state} 83 78 end 84 79 ··· 87 82 88 83 case :ets.member(clusters_table, name) do 89 84 false -> 90 - cached_cluster = %CachedCluster{coordinators: coordinators, cluster: nil, counter: 0} 85 + cached_cluster = %CachedCluster{ 86 + coordinators: coordinators, 87 + generation: -1, 88 + manager_pid: nil, 89 + cluster: nil, 90 + } 91 91 :ets.insert(clusters_table, {name, cached_cluster}) 92 92 {:reply, :ok, state} 93 93 ··· 96 96 end 97 97 end 98 98 99 - def handle_cast({:refresh_cluster, name, counter}, %State{} = state) do 99 + def handle_info({:response_cluster_manager, response}, %State{} = state) do 100 100 %{clusters_table: clusters_table} = state 101 + %{cluster_name: cluster_name, generation: generation, manager_pid: manager_pid} = response 101 102 102 - case :ets.lookup(clusters_table, name) do 103 - [{^name, %CachedCluster{} = cached_cluster}] when cached_cluster.counter <= counter -> 104 - cached_cluster = 105 - case get_cluster(cached_cluster.coordinators) do 106 - {:ok, %Cluster{} = cluster} -> %{cached_cluster | cluster: cluster} 107 - {:error, _err} -> cached_cluster 108 - end 103 + [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name) 104 + if generation > cached_cluster.generation do 105 + cached_cluster = %{cached_cluster | generation: generation, manager_pid: manager_pid, cluster: nil} 106 + :ets.insert(clusters_table, {cluster_name, cached_cluster}) 107 + end 108 + 109 + {:noreply, state} 110 + end 109 111 110 - cached_cluster = %{cached_cluster | counter: cached_cluster.counter + 1} 111 - :ets.insert(clusters_table, {name, cached_cluster}) 112 + def handle_info({:response_cluster, response}, %State{} = state) do 113 + %{clusters_table: clusters_table} = state 114 + %{cluster_name: cluster_name, cluster: cluster} = response 112 115 113 - _ -> :noop 116 + [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name) 117 + if cached_cluster.cluster == nil and cached_cluster.generation == cluster.generation do 118 + cached_cluster = %{cached_cluster | cluster: cluster} 119 + :ets.insert(clusters_table, {cluster_name, cached_cluster}) 114 120 end 115 121 116 122 {:noreply, state} 117 123 end 118 124 119 - @spec get_cluster([atom] | [{atom, atom}]) :: {:ok, Cluster.t} | {:error, :coordinator_timeout | :manager_timeout | :manager_recovering} 120 - defp get_cluster(coordinators) when is_list(coordinators) do 121 - coordinators 122 - |> Enum.map(&Coordinator.get_manager_send/1) 123 - |> Enum.map(&Coordinator.get_manager_receive/1) 124 - |> Enum.reduce([], fn 125 - {:ok, result}, acc -> [result | acc] 126 - {:error, _err}, acc -> acc 127 - end) 128 - |> case do 129 - [_ | _] = results -> 130 - {manager_pid, _generation} = Enum.max_by(results, fn {_pid, generation} -> generation end) 131 - case Manager.get_cluster(manager_pid) do 132 - {:ok, %Cluster{} = cluster} -> {:ok, cluster} 133 - {:error, :recovering} -> {:error, :manager_recovering} 134 - {:error, :timeout} -> {:error, :manager_timeout} 125 + def handle_info(:tick, %State{} = state) do 126 + state = tick(state) 127 + SimProcess.send_after(self(), :tick, 100) 128 + {:noreply, state} 129 + end 130 + 131 + defp tick(%State{} = state) do 132 + %{clusters_table: clusters_table} = state 133 + 134 + :ets.tab2list(clusters_table) 135 + |> Enum.each(fn 136 + {cluster_name, %CachedCluster{} = cached_cluster} -> 137 + Enum.each(cached_cluster.coordinators, fn coordinator -> 138 + Coordinator.request_cluster_manager(coordinator, cluster_name) 139 + end) 140 + 141 + if !cached_cluster.cluster && cached_cluster.manager_pid do 142 + Manager.request_cluster(cached_cluster.manager_pid, cluster_name) 135 143 end 144 + end) 136 145 137 - [] -> 138 - {:error, :coordinator_timeout} 139 - end 146 + state 140 147 end 141 148 end
+16
lib/servers/coordinator.ex
··· 227 227 SimServer.cast(server, {:request_current_manager, self()}) 228 228 end 229 229 230 + def request_cluster_manager(server, cluster_name) do 231 + SimServer.cast(server, {:request_cluster_manager, cluster_name, self()}) 232 + end 233 + 230 234 # Private (VSR) API 231 235 232 236 defmodule Prepare do ··· 422 426 generation: state.manager_generation, 423 427 manager_pid: state.manager_pid, 424 428 }) 429 + end 430 + {:noreply, state} 431 + end 432 + 433 + def handle_cast({:request_cluster_manager, cluster_name, from_pid}, %State{} = state) do 434 + if state.manager_pid && state.manager_generation do 435 + SimProcess.send(from_pid, {:response_cluster_manager, %{ 436 + # TODO: store cluster_name in state 437 + cluster_name: cluster_name, 438 + generation: state.manager_generation, 439 + manager_pid: state.manager_pid, 440 + }}) 425 441 end 426 442 {:noreply, state} 427 443 end
+16
lib/servers/manager.ex
··· 68 68 end 69 69 end 70 70 71 + def request_cluster(server, cluster_name) do 72 + SimServer.cast(server, {:request_cluster, cluster_name, self()}) 73 + end 74 + 71 75 def init(%{coordinators: coordinators, primary_coordinator: primary_coordinator}) do 72 76 state = %State{ 73 77 coordinators: coordinators, ··· 109 113 110 114 def handle_cast({:tlog_ping, %TLogStatus{} = status}, %State{} = state) do 111 115 {:noreply, on_tlog_ping(state, status)} 116 + end 117 + 118 + def handle_cast({:request_cluster, cluster_name, from_pid}, %State{} = state) do 119 + case state.cluster do 120 + %Cluster{status: :normal} = cluster -> 121 + SimProcess.send(from_pid, {:response_cluster, %{ 122 + cluster_name: cluster_name, 123 + cluster: cluster, 124 + }}) 125 + _ -> :noop 126 + end 127 + {:noreply, state} 112 128 end 113 129 114 130 def handle_info(:begin, %State{} = state) do
+3 -10
lib/transaction.ex
··· 1 1 defmodule Hobbes.Transaction do 2 - alias Hobbes.Cache 3 - alias Hobbes.Cache.CachedCluster 4 2 alias Hobbes.Structs.{Cluster, Server, CommitTxn, RangeResult} 5 3 alias Hobbes.Servers.{BeginBuffer, CommitBuffer, Storage} 6 4 ··· 42 40 43 41 def new(cluster_name, opts) when is_binary(cluster_name) do 44 42 case Hobbes.Cache.lookup_cluster(cluster_name) do 45 - {:ok, %CachedCluster{} = cached_cluster} -> 46 - case new(cached_cluster.cluster, opts) do 47 - {:ok, _txn} = result -> 48 - result 49 - {:error, _err} = error -> 50 - Cache.refresh_cluster(cluster_name, cached_cluster.counter) 51 - error 52 - end 43 + {:ok, %Cluster{} = cluster} -> new(cluster, opts) 44 + # TODO: pass error through 45 + {:error, :cluster_not_loaded} -> {:error, :timeout} 53 46 end 54 47 end 55 48
+1 -2
lib/workloads.ex
··· 2 2 require Logger 3 3 alias Trinity.{Sim, SimServer, SimLogger} 4 4 5 - alias Hobbes.Cache.CachedCluster 6 5 alias Hobbes.Structs.Cluster 7 6 8 7 import Hobbes.Utils ··· 155 154 {:ok, _coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts) 156 155 157 156 # Keep retrying get_cluster until coordinators are connected 158 - {:ok, %CachedCluster{cluster: %Cluster{} = cluster}} = 157 + {:ok, %Cluster{} = cluster} = 159 158 retry_acc(nil, fn nil -> 160 159 case Hobbes.Cache.lookup_cluster(cluster_name) do 161 160 {:ok, _cluster} = result -> {:halt, result}
+2 -2
lib/workloads/read_write.ex
··· 196 196 cols: ["Txns / s", "Reads / s", "Writes / s"] 197 197 ) 198 198 199 - {:ok, %{cluster: %Cluster{} = cluster}} = Hobbes.Cache.lookup_cluster(cluster_name) 199 + {:ok, %Cluster{} = cluster} = Hobbes.Cache.lookup_cluster(cluster_name) 200 200 201 201 profiling_stats = 202 202 case profiling do ··· 310 310 Hobbes.read_range(key_servers_prefix(), key_servers_end()) 311 311 end) 312 312 313 - {:ok, %{cluster: %Cluster{} = cluster}} = Hobbes.Cache.lookup_cluster(cluster_name) 313 + {:ok, %Cluster{} = cluster} = Hobbes.Cache.lookup_cluster(cluster_name) 314 314 pairs 315 315 |> Enum.map(fn {"\xFF/key_servers/" <> shard_key, servers} -> 316 316 {shard_key, servers}
+1 -1
test/hobbes_test.exs
··· 158 158 keys: 80, 159 159 clients: 20, 160 160 client_tick_ms: 100, 161 - duration_ms: 20_000, 161 + duration_ms: 30_000, 162 162 ]}, 163 163 {Workloads.RestartNodes, [ 164 164 delay_ms: 7000,