this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Convert to Trinity

garrison 8e972972 60fc267f

+233 -206
+3 -2
lib/cluster_node.ex
··· 1 1 defmodule Hobbes.ClusterNode do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.SimServer 3 4 4 5 alias Hobbes.Servers.{Coordinator, ServerSupervisor} 5 6 ··· 18 19 end 19 20 20 21 def init(config_opts) do 21 - #SimServer.flag(:trap_exit, true) 22 + #SimProcess.flag(:trap_exit, true) 22 23 23 24 # TODO: validate config with helpful error messages 24 25 config = parse_config(config_opts)
+3 -1
lib/construct/sim_supervisor.ex
··· 1 1 defmodule Hobbes.Construct.SimSupervisor do 2 - use Hobbes.Construct.SimServer 2 + # TODO: move this module out of Construct 3 + use GenServer 4 + alias Trinity.SimServer 3 5 4 6 def start_link(children, opts) do 5 7 SimServer.start_link(__MODULE__, {children, opts})
+7 -4
lib/hobbes.ex
··· 1 1 defmodule Hobbes do 2 - alias Hobbes.Construct.SimServer 3 2 alias Hobbes.ClusterNode 4 3 alias Hobbes.Servers.{Coordinator, Manager} 5 4 alias Hobbes.Structs.Cluster 5 + 6 + alias Trinity.{Sim, SimProcess} 6 7 7 8 defmodule AppShim do 8 9 use Application ··· 43 44 ], 44 45 ] 45 46 46 - SimServer.start_node(node, Hobbes.AppShim, [config]) 47 + SimProcess.spawn_node(node, fn -> 48 + Hobbes.AppShim.start(nil, [config]) 49 + end) 47 50 end) 48 51 49 52 coordinators ··· 112 115 num_coordinators = Keyword.get(opts, :num_coordinators, 3) 113 116 114 117 coordinators = 115 - case SimServer.simulated?() do 118 + case Sim.simulated?() do 116 119 true -> init_distributed_cluster(num_coordinators) 117 120 false -> init_local_cluster(num_coordinators) 118 121 end 119 122 120 - SimServer.sleep(1000) 123 + SimProcess.sleep(1000) 121 124 122 125 config = config_pairs(opts) 123 126 :ok = Coordinator.write(hd(coordinators), config)
+9 -9
lib/kv/flat_storage_kv.ex
··· 16 16 def new(path) do 17 17 kv = %FlatStorageKV{path: path, kv: FlatKV.new()} 18 18 19 - case SimFile.read(path) do 20 - {:ok, contents} -> 21 - pairs = decode(contents) 22 - FlatKV.load(kv.kv, pairs) 19 + #case SimFile.read(path) do 20 + # {:ok, contents} -> 21 + # pairs = decode(contents) 22 + # FlatKV.load(kv.kv, pairs) 23 23 24 - {:error, :enoent} -> :noop 25 - end 24 + # {:error, :enoent} -> :noop 25 + #end 26 26 27 27 kv 28 28 end 29 29 30 - def commit(%FlatStorageKV{kv: kv, path: path}) do 31 - contents = FlatKV.dump(kv) |> encode() 32 - :ok = SimFile.write(path, contents) 30 + def commit(%FlatStorageKV{kv: kv, path: _path}) do 31 + _contents = FlatKV.dump(kv) |> encode() 32 + #:ok = SimFile.write(path, contents) 33 33 :ok 34 34 end 35 35
+4 -3
lib/servers/begin_buffer.ex
··· 1 1 defmodule Hobbes.Servers.BeginBuffer do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 50 51 check_locked_reply_ids: nil, 51 52 } 52 53 53 - SimServer.send_after self(), :tick, 0 54 + SimProcess.send_after self(), :tick, 0 54 55 {:ok, state} 55 56 end 56 57 ··· 62 63 def handle_info(:tick, %State{} = state) do 63 64 state = flush(state) 64 65 65 - SimServer.send_after self(), :tick, @tick_interval_ms 66 + SimProcess.send_after self(), :tick, @tick_interval_ms 66 67 {:noreply, state} 67 68 end 68 69
+7 -6
lib/servers/commit_buffer.ex
··· 1 1 defmodule Hobbes.Servers.CommitBuffer do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 114 115 115 116 buffer: [], 116 117 buffer_size: 0, 117 - last_commit_timestamp: SimServer.current_time(), 118 + last_commit_timestamp: current_time(), 118 119 119 120 storage_servers: %{}, 120 121 } ··· 122 123 ShardTagMap.load_meta_pairs(state.shard_map, storage_teams_pairs) 123 124 ShardTagMap.load_meta_pairs(state.shard_map, key_storage_pairs) 124 125 125 - SimServer.send_after(self(), :flush, @flush_interval_ms) 126 + SimProcess.send_after(self(), :flush, @flush_interval_ms) 126 127 {:ok, state} 127 128 end 128 129 ··· 152 153 153 154 def handle_info(:flush, %State{} = state) do 154 155 state = maybe_commit_batch(state) 155 - SimServer.send_after(self(), :flush, @flush_interval_ms) 156 + SimProcess.send_after(self(), :flush, @flush_interval_ms) 156 157 {:noreply, state} 157 158 end 158 159 ··· 177 178 178 179 defp maybe_commit_batch(%State{} = state) do 179 180 buffer_size = state.buffer_size 180 - elapsed_us = SimServer.current_time() - state.last_commit_timestamp 181 + elapsed_us = current_time() - state.last_commit_timestamp 181 182 182 183 cond do 183 184 buffer_size >= @max_buffer_size -> ··· 339 340 last_committed_version: commit_version, 340 341 buffer: [], 341 342 buffer_size: 0, 342 - last_commit_timestamp: SimServer.current_time(), 343 + last_commit_timestamp: current_time(), 343 344 } 344 345 end 345 346 end
+16 -15
lib/servers/coordinator.ex
··· 1 1 defmodule Hobbes.Servers.Coordinator do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{Sim, SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 285 286 # Server 286 287 287 288 def init(%{id: id, replicas: replicas}) do 288 - SimServer.flag(:trap_exit, true) 289 + SimProcess.flag(:trap_exit, true) 289 290 290 291 replica_map = 291 292 replicas ··· 295 296 replica_ids = Enum.to_list(0..(length(replicas) - 1)) 296 297 quorum_size = div(length(replica_ids), 2) + 1 297 298 298 - {kv_mod, kv} = case SimServer.simulated?() do 299 + {kv_mod, kv} = case Sim.simulated?() do 299 300 true -> {FlatStorageKV, FlatStorageKV.new("/coordinator_#{Integer.to_string(id)}.kv")} 300 301 false -> {FlatKV, FlatKV.new()} 301 302 end ··· 336 337 # Subtracting @request_start_view_min_elapsed_ms ensures cluster bootstraps a bit faster 337 338 # if the first Prepare is missed during backup replica startup 338 339 # TODO: there are better ways to ensure this, probably 339 - last_request_start_view_timestamp: SimServer.current_time() - (@request_start_view_min_elapsed_ms * 1000), 340 + last_request_start_view_timestamp: current_time() - (@request_start_view_min_elapsed_ms * 1000), 340 341 } 341 342 342 343 assert state.status in [:normal, :view_change] ··· 350 351 351 352 state = start_manager_if_primary(state) 352 353 353 - SimServer.send_after(self(), :tick, @tick_interval_ms) 354 + SimProcess.send_after(self(), :tick, @tick_interval_ms) 354 355 355 356 {:ok, state} 356 357 end ··· 456 457 def handle_info(:tick, %State{} = state) do 457 458 state = tick_all(state) 458 459 459 - SimServer.send_after(self(), :tick, @tick_interval_ms) 460 + SimProcess.send_after(self(), :tick, @tick_interval_ms) 460 461 {:noreply, %State{} = state} 461 462 end 462 463 ··· 739 740 740 741 state = %{state | 741 742 start_view_change_replicas: svc_replicas, 742 - first_svc_received_timestamp: state.first_svc_received_timestamp || SimServer.current_time(), 743 + first_svc_received_timestamp: state.first_svc_received_timestamp || current_time(), 743 744 } 744 745 745 746 case MapSet.size(state.start_view_change_replicas) >= state.quorum_size do ··· 755 756 defp clear_start_view_change_replicas_if_timeout(%State{} = state) do 756 757 assert state.first_svc_received_timestamp != nil 757 758 758 - elapsed_us = SimServer.current_time() - state.first_svc_received_timestamp 759 + elapsed_us = current_time() - state.first_svc_received_timestamp 759 760 760 761 case elapsed_us > (@clear_svc_replicas_timeout_ms * 1000) do 761 762 true -> ··· 911 912 assert state.status == :normal 912 913 assert state.op_number > state.commit_number 913 914 914 - elapsed_us = SimServer.current_time() - state.last_send_prepare_timestamp 915 + elapsed_us = current_time() - state.last_send_prepare_timestamp 915 916 916 917 case elapsed_us > (@send_prepare_min_elapsed_ms * 1000) do 917 918 true -> send_prepare(state) ··· 940 941 msg_prepare(pid, prepare) 941 942 end) 942 943 943 - %{state | last_send_prepare_timestamp: SimServer.current_time()} 944 + %{state | last_send_prepare_timestamp: current_time()} 944 945 end 945 946 946 947 defp send_commit(%State{} = state) do ··· 961 962 end 962 963 963 964 defp send_start_view_change_if_primary_timeout(%State{} = state) do 964 - primary_elapsed_us = SimServer.current_time() - state.last_primary_message_received_timestamp 965 + primary_elapsed_us = current_time() - state.last_primary_message_received_timestamp 965 966 966 967 case primary_elapsed_us > (@timeout_for_view_change_ms * 1000) do 967 968 true -> send_start_view_change(state) ··· 1021 1022 defp send_request_start_view_if_enough_elapsed(%State{} = state, view_number) when is_integer(view_number) do 1022 1023 assert view_number >= state.view_number 1023 1024 1024 - elapsed_us = SimServer.current_time() - state.last_request_start_view_timestamp 1025 + elapsed_us = current_time() - state.last_request_start_view_timestamp 1025 1026 1026 1027 case elapsed_us > (@request_start_view_min_elapsed_ms * 1000) do 1027 1028 true -> ··· 1032 1033 from_replica_id: state.id, 1033 1034 }) 1034 1035 1035 - %{state | last_request_start_view_timestamp: SimServer.current_time()} 1036 + %{state | last_request_start_view_timestamp: current_time()} 1036 1037 1037 1038 false -> state 1038 1039 end ··· 1090 1091 1091 1092 first_svc_received_timestamp: nil, 1092 1093 # When the view is updated we always want to give the new primary time to respond 1093 - last_primary_message_received_timestamp: SimServer.current_time(), 1094 + last_primary_message_received_timestamp: current_time(), 1094 1095 } 1095 1096 end 1096 1097 ··· 1112 1113 end 1113 1114 1114 1115 defp track_primary_message(%State{} = state) do 1115 - %{state | last_primary_message_received_timestamp: SimServer.current_time()} 1116 + %{state | last_primary_message_received_timestamp: current_time()} 1116 1117 end 1117 1118 1118 1119 # Hobbes/Cluster
+10 -9
lib/servers/distributor.ex
··· 1 1 defmodule Hobbes.Servers.Distributor do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 125 126 pid: nil, 126 127 stats: nil, 127 128 # Initialize ping timestamps so that servers are not considered failed at startup 128 - last_ping_timestamp: SimServer.current_time(), 129 + last_ping_timestamp: current_time(), 129 130 } 130 131 } 131 132 end) ··· 167 168 next_shard_move_id: next_shard_move_id, 168 169 } 169 170 170 - SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms) 171 - SimServer.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms) 171 + SimProcess.send_after(self(), :tick_scan, @tick_scan_interval_ms) 172 + SimProcess.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms) 172 173 {:ok, state} 173 174 end 174 175 ··· 181 182 # TODO: for new storage servers there will be a different path (add id to meta keyspace) 182 183 prev_info = Map.fetch!(state.storage_servers, storage_id) 183 184 184 - info = %StorageInfo{id: storage_id, pid: storage_pid, stats: storage_stats, last_ping_timestamp: SimServer.current_time()} 185 + info = %StorageInfo{id: storage_id, pid: storage_pid, stats: storage_stats, last_ping_timestamp: current_time()} 185 186 state = %{state | storage_servers: Map.put(state.storage_servers, storage_id, info)} 186 187 187 188 :ok = update_shard_stats(state, shard_stats) ··· 196 197 197 198 def handle_info(:tick_scan, %State{} = state) do 198 199 state = scan(state) 199 - SimServer.send_after(self(), :tick_scan, @tick_scan_interval_ms) 200 + SimProcess.send_after(self(), :tick_scan, @tick_scan_interval_ms) 200 201 {:noreply, state} 201 202 end 202 203 203 204 def handle_info(:tick_shard_moves, %State{} = state) do 204 205 state = tick_shard_moves(state) 205 - SimServer.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms) 206 + SimProcess.send_after(self(), :tick_shard_moves, @tick_shard_moves_interval_ms) 206 207 {:noreply, state} 207 208 end 208 209 ··· 234 235 235 236 get_servers(state.cluster, Hobbes.Servers.CommitBuffer) 236 237 |> Enum.each(fn %Server{pid: buf_pid} -> 237 - SimServer.send buf_pid, {:update_storage_servers, server_map} 238 + SimProcess.send buf_pid, {:update_storage_servers, server_map} 238 239 end) 239 240 end 240 241 ··· 274 275 defp compute_team_stats(%State{storage_servers: storage_servers}, %StorageTeam{} = team) do 275 276 storage_info = Enum.map(team.storage_ids, &Map.fetch!(storage_servers, &1)) 276 277 277 - now = SimServer.current_time() 278 + now = current_time() 278 279 unhealthy_count = 279 280 Enum.count(storage_info, fn %StorageInfo{last_ping_timestamp: last_ping_timestamp} -> 280 281 (now - last_ping_timestamp) > @unhealthy_ping_elapsed_us
+13 -12
lib/servers/manager.ex
··· 1 1 defmodule Hobbes.Servers.Manager do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 97 98 coordinators: coordinators, 98 99 primary_coordinator: primary_coordinator, 99 100 } 100 - SimServer.send(self(), :begin) 101 + SimProcess.send(self(), :begin) 101 102 {:ok, state} 102 103 end 103 104 ··· 119 120 exit(:shutdown) 120 121 end 121 122 122 - SimServer.send(supervisor_pid, {:update_cluster, state.cluster}) 123 + SimProcess.send(supervisor_pid, {:update_cluster, state.cluster}) 123 124 state = add_supervisor(state, supervisor_pid, slots) 124 125 125 126 {:noreply, state} ··· 167 168 } 168 169 :ok = Coordinator.track_manager_generation(state.primary_coordinator, generation) 169 170 170 - SimServer.send self(), :tick 171 + SimProcess.send self(), :tick 171 172 {:noreply, state} 172 173 end 173 174 174 175 def handle_info(:tick, %State{} = state) do 175 176 state = tick(state) 176 177 177 - SimServer.send_after self(), :tick, @tick_interval_ms 178 + SimProcess.send_after self(), :tick, @tick_interval_ms 178 179 {:noreply, state} 179 180 end 180 181 ··· 198 199 199 200 defp check_supervisor_timeout(%State{} = state) do 200 201 Enum.each(state.supervisors, fn {_pid, %SupervisorInfo{} = info} -> 201 - elapsed_us = SimServer.current_time() - info.last_ping_timestamp 202 + elapsed_us = current_time() - info.last_ping_timestamp 202 203 203 204 if elapsed_us > @supervisor_timeout_us do 204 205 # If a supervisor fails to ping for too long we trigger a recovery ··· 253 254 case List.keyfind(state.supervisors, supervisor_pid, 0) do 254 255 nil -> 255 256 # Add new supervisor to list 256 - info = %SupervisorInfo{pid: supervisor_pid, slots: slots, last_ping_timestamp: SimServer.current_time()} 257 + info = %SupervisorInfo{pid: supervisor_pid, slots: slots, last_ping_timestamp: current_time()} 257 258 %{state | supervisors: [{supervisor_pid, info} | state.supervisors]} 258 259 {^supervisor_pid, %SupervisorInfo{} = existing} -> 259 260 # Update timestamp for existing supervisor 260 - info = %{existing | last_ping_timestamp: SimServer.current_time()} 261 + info = %{existing | last_ping_timestamp: current_time()} 261 262 %{state | supervisors: List.keyreplace(state.supervisors, supervisor_pid, 0, {supervisor_pid, info})} 262 263 end 263 264 |> maybe_start_generation() ··· 325 326 326 327 case has_enough_storage? do 327 328 true -> 328 - state = %{state | recovery_ready_timestamp: state.recovery_ready_timestamp || SimServer.current_time()} 329 - elapsed_us = SimServer.current_time() - state.recovery_ready_timestamp 329 + state = %{state | recovery_ready_timestamp: state.recovery_ready_timestamp || current_time()} 330 + elapsed_us = current_time() - state.recovery_ready_timestamp 330 331 331 332 case elapsed_us > @recovery_wait_duration_us do 332 333 true -> bootstrap(state) ··· 351 352 352 353 case has_enough_prev_tlogs? do 353 354 true -> 354 - state = %{state | recovery_ready_timestamp: state.recovery_ready_timestamp || SimServer.current_time()} 355 - elapsed_us = SimServer.current_time() - state.recovery_ready_timestamp 355 + state = %{state | recovery_ready_timestamp: state.recovery_ready_timestamp || current_time()} 356 + elapsed_us = current_time() - state.recovery_ready_timestamp 356 357 357 358 case elapsed_us > @recovery_wait_duration_us do 358 359 true -> recover(state)
+4 -3
lib/servers/resolver.ex
··· 1 1 defmodule Hobbes.Servers.Resolver do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 alias Hobbes.Structs.{Cluster, ResolveBatch} 5 6 alias Hobbes.VersionMap ··· 57 58 meta_mutations_log: [], 58 59 } 59 60 60 - SimServer.send_after(self(), :clear_old, @clear_every_ms) 61 + SimProcess.send_after(self(), :clear_old, @clear_every_ms) 61 62 62 63 {:ok, state} 63 64 end ··· 69 70 70 71 def handle_info(:clear_old, %State{} = state) do 71 72 VersionMap.clear_old(state.version_map, max(state.version - @mvcc_window, 0)) 72 - SimServer.send_after(self(), :clear_old, @clear_every_ms) 73 + SimProcess.send_after(self(), :clear_old, @clear_every_ms) 73 74 {:noreply, state} 74 75 end 75 76
+5 -3
lib/servers/sequencer.ex
··· 1 1 defmodule Hobbes.Servers.Sequencer do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.SimServer 3 4 4 5 alias Hobbes.Structs.Cluster 6 + import Hobbes.Utils 5 7 6 8 defmodule State do 7 9 @enforce_keys [ ··· 59 61 cluster: cluster, 60 62 61 63 last_version: prev_version, 62 - last_time: SimServer.current_time(), 64 + last_time: current_time(), 63 65 known_commit_version: 0, 64 66 } 65 67 ··· 67 69 end 68 70 69 71 def handle_call(:get_commit_version, _from, %State{} = state) do 70 - time = SimServer.current_time() 72 + time = current_time() 71 73 elapsed_us = time - state.last_time 72 74 73 75 # 1 us = 1 version
+18 -15
lib/servers/server_supervisor.ex
··· 1 1 defmodule Hobbes.Servers.ServerSupervisor do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{Sim, SimProcess, SimServer} 3 4 4 5 alias Hobbes.Construct.SimFile 5 6 import ExUnit.Assertions, only: [assert: 1] 6 7 7 8 alias Hobbes.Structs.Cluster 8 9 alias Hobbes.Servers.{Coordinator, Manager} 10 + 11 + import Hobbes.Utils 9 12 10 13 defmodule State do 11 14 @type t :: %__MODULE__{ ··· 52 55 end 53 56 54 57 def init(%{coordinators: coordinators, slots: config_slots}) do 55 - SimServer.flag(:trap_exit, true) 58 + SimProcess.flag(:trap_exit, true) 56 59 57 60 slots = %{ 58 61 stateless: Keyword.get(config_slots, :stateless, 0), ··· 66 69 state = %State{ 67 70 coordinators: coordinators, 68 71 slots: slots, 69 - last_manager_message_timestamp: SimServer.current_time(), 72 + last_manager_message_timestamp: current_time(), 70 73 71 74 open_stateless: slots.stateless, 72 75 open_tlog: slots.tlog, 73 76 open_storage: slots.storage, 74 77 } 75 78 76 - SimServer.send_after(self(), :tick, @tick_interval_ms) 79 + SimProcess.send_after(self(), :tick, @tick_interval_ms) 77 80 {:ok, state} 78 81 end 79 82 ··· 89 92 def handle_info(:tick, %State{} = state) do 90 93 state = tick(state) 91 94 92 - SimServer.send_after(self(), :tick, @tick_interval_ms) 95 + SimProcess.send_after(self(), :tick, @tick_interval_ms) 93 96 {:noreply, state} 94 97 end 95 98 ··· 126 129 # Tick 127 130 128 131 defp tick(%State{} = state) do 129 - timed_out? = (SimServer.current_time() - state.last_manager_message_timestamp) > (@manager_timeout_ms * 1000) 132 + timed_out? = (current_time() - state.last_manager_message_timestamp) > (@manager_timeout_ms * 1000) 130 133 case state.manager_pid == nil or timed_out? do 131 134 true -> 132 135 state ··· 154 157 %{state | 155 158 manager_pid: manager_pid, 156 159 manager_generation: generation, 157 - last_manager_message_timestamp: SimServer.current_time(), 160 + last_manager_message_timestamp: current_time(), 158 161 generation_failed: false, 159 162 } 160 163 end ··· 168 171 # Cluster received for the first time 169 172 assert cluster.generation == state.manager_generation 170 173 171 - %{state | cluster: cluster, last_manager_message_timestamp: SimServer.current_time()} 174 + %{state | cluster: cluster, last_manager_message_timestamp: current_time()} 172 175 |> load_stateful_slots() 173 176 |> broadcast_cluster_to_children() 174 177 end ··· 179 182 180 183 %{state | 181 184 cluster: cluster, 182 - last_manager_message_timestamp: SimServer.current_time(), 185 + last_manager_message_timestamp: current_time(), 183 186 184 187 # Reset stateless and tlog slots for the new generation 185 188 # (but not storage slots as storage servers are not tied to the transaction system generation) ··· 191 194 192 195 defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when cluster == state.cluster do 193 196 # Ignore identical cluster but update keepalive timestamp 194 - %{state | last_manager_message_timestamp: SimServer.current_time()} 197 + %{state | last_manager_message_timestamp: current_time()} 195 198 end 196 199 197 200 defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when state.cluster != nil do ··· 199 202 assert cluster.generation == state.cluster.generation 200 203 assert cluster.generation == state.manager_generation 201 204 202 - %{state | cluster: cluster, last_manager_message_timestamp: SimServer.current_time()} 205 + %{state | cluster: cluster, last_manager_message_timestamp: current_time()} 203 206 |> broadcast_cluster_to_children() 204 207 end 205 208 206 209 defp load_stateful_slots(%State{} = state) do 207 210 # TODO: check if cluster is in-memory instead 208 - case SimServer.simulated?() do 211 + case Sim.simulated?() do 209 212 true -> 210 213 state 211 214 |> load_tlog_slots() ··· 299 302 state = %{state | open_tlog: rest} 300 303 301 304 kv_path = slot_path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 302 - if SimServer.simulated?(), do: assert not SimFile.exists?(kv_path) 305 + if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 303 306 304 307 arg = Map.put(arg, :path, kv_path) 305 308 restart_arg = %{path: kv_path} ··· 317 320 state = %{state | open_storage: rest} 318 321 319 322 kv_path = slot_path <> "/storage.kv" 320 - if SimServer.simulated?(), do: assert not SimFile.exists?(kv_path) 323 + if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 321 324 322 325 arg = Map.put(arg, :path, kv_path) 323 326 restart_arg = %{path: kv_path} ··· 398 401 assert state.cluster != nil 399 402 400 403 Enum.each(state.children, fn {pid, _info} -> 401 - SimServer.send(pid, {:update_cluster, state.cluster}) 404 + SimProcess.send(pid, {:update_cluster, state.cluster}) 402 405 end) 403 406 404 407 state
+25 -23
lib/servers/storage.ex
··· 1 1 defmodule Hobbes.Servers.Storage do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{Sim, SimProcess, SimServer} 3 4 require Logger 4 5 5 6 import ExUnit.Assertions, only: [assert: 1] ··· 134 135 135 136 @spec check_import_complete_receive(SimServer.request_id) :: {:ok, boolean} | {:error, :unknown_shard_move | :timeout} 136 137 def check_import_complete_receive(request_id) do 137 - case SimServer.receive_response(request_id) do 138 + case SimServer.receive_response(request_id, 5000) do 138 139 {:reply, reply} -> reply 139 140 :timeout -> {:error, :timeout} 140 141 end ··· 143 144 def init(%{cluster: %Cluster{} = cluster, path: path, id: id, storage_team_id: storage_team_id}) do 144 145 # Init new storage server 145 146 assert is_integer(id) 146 - path = case SimServer.simulated?() do 147 + # TODO: simulated? 148 + path = case Sim.simulated?() do 147 149 true -> path 148 150 false -> nil 149 151 end ··· 159 161 160 162 def init(%{cluster: %Cluster{} = cluster, path: path}) do 161 163 # Init existing storage server from storage KV 162 - assert SimServer.simulated?() 164 + assert Sim.simulated?() 163 165 kv = HybridKV.new(path: path) 164 166 165 167 do_init(cluster, kv) ··· 240 242 byte_sample_pairs = HybridKV.scan(kv, 1, special_byte_sample_prefix(), special_byte_sample_end()).pairs 241 243 ByteSample.load(state.byte_sample, byte_sample_pairs) 242 244 243 - SimServer.send_after(self(), :tick_ping, 0) 244 - SimServer.send_after(self(), :flush, @flush_interval_ms) 245 - SimServer.send_after(self(), :peek_retry, 0) 245 + SimProcess.send_after(self(), :tick_ping, 0) 246 + SimProcess.send_after(self(), :flush, @flush_interval_ms) 247 + SimProcess.send_after(self(), :peek_retry, 0) 246 248 {:ok, state} 247 249 end 248 250 ··· 252 254 do 253 255 kv = state.kv 254 256 values = Enum.map(keys, &HybridKV.get(kv, read_version, &1)) 255 - SimServer.simulate_work([1, 2, 3, 10, 30]) 257 + SimProcess.sleep(Enum.random([1, 2, 3, 10, 30])) 256 258 {:reply, {:ok, values}, state} 257 259 else 258 260 {:error, _err} = error -> ··· 265 267 :ok <- check_contains_range(state, start_key, end_key) 266 268 do 267 269 %RangeResult{} = result = HybridKV.scan(state.kv, read_version, start_key, end_key, limit: limit, reverse: reverse) 268 - SimServer.simulate_work([1, 2, 3, 10, 30]) 270 + SimProcess.sleep(Enum.random([1, 2, 3, 10, 30])) 269 271 {:reply, {:ok, result}, state} 270 272 else 271 273 {:error, _err} = error -> ··· 298 300 :ok <- check_contains_range(state, start_key, end_key) 299 301 do 300 302 %RangeResult{} = result = HybridKV.scan(state.kv, read_version, start_key, end_key, limit: limit, reverse: false) 301 - SimServer.simulate_work([1, 2, 3, 10, 30]) 302 - SimServer.send from_pid, {:read_range_result, nonce, result} 303 + SimProcess.sleep(Enum.random([1, 2, 3, 10, 30])) 304 + SimProcess.send from_pid, {:read_range_result, nonce, result} 303 305 else 304 306 {:error, _err} = error -> 305 - SimServer.send from_pid, {:read_range_result, nonce, error} 307 + SimProcess.send from_pid, {:read_range_result, nonce, error} 306 308 end 307 309 {:noreply, state} 308 310 end 309 311 310 312 def handle_info(:tick_ping, %State{} = state) do 311 313 state = ping_distributor(state) 312 - SimServer.send_after(self(), :tick_ping, @ping_distributor_interval_ms) 314 + SimProcess.send_after(self(), :tick_ping, @ping_distributor_interval_ms) 313 315 {:noreply, state} 314 316 end 315 317 ··· 323 325 end 324 326 325 327 def handle_info({:peek_version_too_new, nonce}, %State{} = state) when nonce == state.peek_nonce do 326 - SimServer.send_after self(), :peek_retry, 1 328 + SimProcess.send_after self(), :peek_retry, 1 327 329 {:noreply, state} 328 330 end 329 331 ··· 376 378 377 379 def handle_info(:flush, state) do 378 380 state = flush(state) 379 - SimServer.send_after(self(), :flush, @flush_interval_ms) 381 + SimProcess.send_after(self(), :flush, @flush_interval_ms) 380 382 {:noreply, state} 381 383 end 382 384 ··· 455 457 :ok = HybridKV.commit(kv) 456 458 state = %{state | kv: kv} 457 459 458 - SimServer.simulate_work([1, 10, 20, 30, 100]) 460 + SimProcess.sleep(Enum.random([1, 10, 20, 30, 100])) 459 461 460 462 # Send pops to tlogs 461 463 # Note: pop is a cast (async) because the result doesn't matter for correctness ··· 471 473 defp peek_logs(%State{} = state, _rotate_tlog?) when state.cluster.status != :normal do 472 474 # If we are waiting for recovery, just keep trying 473 475 # TODO: can we peek *during* a recovery? 474 - SimServer.send_after(self(), :peek_retry, 100) 476 + SimProcess.send_after(self(), :peek_retry, 100) 475 477 state 476 478 end 477 479 ··· 509 511 TLog.peek(tlog_server.pid, nonce, tag, start_version, end_version) 510 512 end 511 513 512 - SimServer.send_after self(), {:peek_timeout, nonce}, 300 514 + SimProcess.send_after self(), {:peek_timeout, nonce}, 300 513 515 %{state | peek_nonce: nonce} 514 516 end 515 517 ··· 523 525 assert result.end_version >= state.data_version 524 526 525 527 state = apply_batches(state, result.batches) 526 - SimServer.send_after self(), :peek_retry, 1 528 + SimProcess.send_after self(), :peek_retry, 1 527 529 %{state | data_version: result.end_version, peek_nonce: nil} 528 530 end 529 531 ··· 655 657 656 658 assert not Map.has_key?(state.imports, shard_import.id) 657 659 658 - SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 0) 660 + SimProcess.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 0) 659 661 put_in(state.imports[shard_import.id], shard_import) 660 662 end 661 663 ··· 667 669 defp tick_import(%ShardImport{} = shard_import, %State{} = state) when state.cluster.status != :normal do 668 670 # Cluster is recovering, try again later 669 671 shard_import = %{shard_import | nonce: make_ref()} 670 - SimServer.send_after self(), {:import_read_range_timeout, shard_import.nonce}, 300 672 + SimProcess.send_after self(), {:import_read_range_timeout, shard_import.nonce}, 300 671 673 state 672 674 end 673 675 ··· 689 691 do 690 692 Storage.read_range_async(from_storage_pid, shard_import.nonce, shard_import.current_read_version, shard_import.current_end_key, shard_import.end_key, limit: @import_key_limit) 691 693 692 - SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 300) 694 + SimProcess.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 300) 693 695 state 694 696 else 695 697 # Failure cases: ··· 698 700 # from_pids is all nil because CommitBuffer does not know current storage pids 699 701 _ -> 700 702 # Retry 701 - SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 100) 703 + SimProcess.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 100) 702 704 state 703 705 end 704 706 end
+10 -9
lib/servers/tlog.ex
··· 1 1 defmodule Hobbes.Servers.TLog do 2 - use Hobbes.Construct.SimServer 2 + use GenServer 3 + alias Trinity.{SimProcess, SimServer} 3 4 4 5 import ExUnit.Assertions, only: [assert: 1] 5 6 ··· 89 90 90 91 @spec write_batch_receive(SimServer.request_id) :: :ok | {:error, :tlog_locked | :timeout} 91 92 def write_batch_receive(request_id) do 92 - case SimServer.receive_response(request_id) do 93 + case SimServer.receive_response(request_id, 5000) do 93 94 {:reply, reply} -> reply 94 95 :timeout -> {:error, :timeout} 95 96 end ··· 107 108 108 109 @spec peek_all_receive(SimServer.request_id) :: {:ok, [{non_neg_integer, [Utils.numbered_mutation]}]} | {:error, :timeout} 109 110 def peek_all_receive(request_id) do 110 - case SimServer.receive_response(request_id) do 111 + case SimServer.receive_response(request_id, 5000) do 111 112 {:reply, reply} -> reply 112 113 :timeout -> {:error, :timeout} 113 114 end ··· 219 220 220 221 # TODO: load batches from log 221 222 222 - SimServer.send_after(self(), :tick, @tick_interval) 223 + SimProcess.send_after(self(), :tick, @tick_interval) 223 224 {:ok, state} 224 225 end 225 226 ··· 256 257 end 257 258 258 259 def handle_cast({:peek, from_pid, nonce, _tag, start_version, _end_version}, %State{} = state) when start_version > state.version do 259 - SimServer.send from_pid, {:peek_version_too_new, nonce} 260 + SimProcess.send from_pid, {:peek_version_too_new, nonce} 260 261 {:noreply, state} 261 262 end 262 263 ··· 268 269 assert start_version <= end_version 269 270 270 271 batches = peek_batches(state, tag, start_version, end_version) 271 - SimServer.simulate_work([1, 2, 3, 10, 30]) 272 + SimProcess.sleep(Enum.random([1, 2, 3, 10, 30])) 272 273 273 274 result = %PeekResult{ 274 275 tlog_id: state.id, ··· 277 278 end_version: end_version, 278 279 batches: batches, 279 280 } 280 - SimServer.send from_pid, {:peek_result, result} 281 + SimProcess.send from_pid, {:peek_result, result} 281 282 282 283 {:noreply, state} 283 284 end ··· 309 310 end 310 311 311 312 def handle_cast({:check_locked, pid, nonce}, %State{} = state) do 312 - SimServer.send pid, {:check_locked_reply, nonce, state.id, state.locked?} 313 + SimProcess.send pid, {:check_locked_reply, nonce, state.id, state.locked?} 313 314 {:noreply, state} 314 315 end 315 316 316 317 def handle_info(:tick, %State{} = state) do 317 - SimServer.send_after(self(), :tick, @tick_interval) 318 + SimProcess.send_after(self(), :tick, @tick_interval) 318 319 {:noreply, tick(state)} 319 320 end 320 321
+1 -3
lib/transaction.ex
··· 2 2 alias Hobbes.Structs.{Cluster, Server, CommitTxn, RangeResult} 3 3 alias Hobbes.Servers.{BeginBuffer, CommitBuffer, Storage} 4 4 5 - alias Hobbes.Construct.SimServer 6 - 7 5 alias Hobbes.Utils 8 6 import Hobbes.Utils 9 7 ··· 310 308 311 309 defp random_commit_buffer(%Cluster{} = cluster) do 312 310 commit_buffers = get_servers(cluster, Hobbes.Servers.CommitBuffer) 313 - i = SimServer.deterministic_random(0..(length(commit_buffers) - 1)) 311 + i = Enum.random(0..(length(commit_buffers) - 1)) 314 312 Enum.at(commit_buffers, i).pid 315 313 end 316 314
+10 -4
lib/utils.ex
··· 4 4 """ 5 5 6 6 alias Hobbes.Structs.{Cluster, TLogGeneration, Server} 7 - alias Hobbes.Construct.SimServer 8 7 alias Hobbes.Encoding.Keyset 8 + 9 + alias Trinity.SimProcess 9 10 10 11 import ExUnit.Assertions, only: [assert: 1] 11 12 ··· 202 203 def random_not_nil(choices) do 203 204 case Enum.reject(choices, &is_nil/1) do 204 205 [] -> :error 205 - remaining -> {:ok, SimServer.deterministic_random(remaining)} 206 + remaining -> {:ok, Enum.random(remaining)} 206 207 end 207 208 end 208 209 ··· 394 395 case fun.() do 395 396 {:error, :read_version_too_new} -> 396 397 delay = floor(Float.pow(1.6, attempt + 1)) 397 - SimServer.sleep(delay) 398 + SimProcess.sleep(delay) 398 399 too_new_backoff(fun, attempt + 1) 399 400 result -> 400 401 result ··· 412 413 case fun.(acc) do 413 414 {:cont, acc} -> 414 415 delay = Integer.pow(2, attempt + 1) 415 - SimServer.sleep(delay) 416 + SimProcess.sleep(delay) 416 417 backoff(acc, fun, count, attempt + 1) 417 418 {:halt, acc} -> 418 419 acc ··· 431 432 inc_stat(acc, k, v) 432 433 end) 433 434 end) 435 + end 436 + 437 + @spec current_time :: integer 438 + def current_time do 439 + Trinity.SimSystem.monotonic_time(:microsecond) 434 440 end 435 441 436 442 @doc """
+12 -26
lib/workloads.ex
··· 1 1 defmodule Hobbes.Workloads do 2 2 require Logger 3 3 4 - alias Hobbes.Construct.{SimServer, SimUtils, Scheduler} 4 + alias Trinity.{Sim, SimProcess, SimServer} 5 5 alias Hobbes.Structs.Cluster 6 6 alias Hobbes.Servers.{Coordinator, Manager} 7 7 ··· 10 10 end 11 11 12 12 defmodule Runner do 13 - use Hobbes.Construct.SimServer 13 + use GenServer 14 + alias Trinity.SimServer 14 15 15 16 defmodule State do 16 17 @enforce_keys [:workload_module, :opts, :context, :results] ··· 67 68 results = 68 69 Enum.map(seeds, fn seed -> 69 70 Task.Supervisor.async_nolink(supervisor, fn -> 70 - run_one(seed, workloads, opts) 71 + # TODO: only run in sim if opts[:simulated] 72 + Sim.run_simulation(fn -> 73 + run_one(seed, workloads, opts) 74 + end, seed: seed) 71 75 end) 72 76 end) 73 77 |> Task.yield_many(:infinity) ··· 134 138 :ok 135 139 end 136 140 137 - def run_one(seed, workloads, opts \\ []) do 138 - scheduler_pid = 139 - if opts[:simulated] do 140 - {:ok, pid} = SimServer.start_scheduler(seed) 141 - pid 142 - end 143 - 144 - {cluster_opts, opts} = Keyword.pop(opts, :cluster_opts, []) 141 + def run_one(_seed, workloads, opts \\ []) do 142 + {cluster_opts, _opts} = Keyword.pop(opts, :cluster_opts, []) 145 143 146 144 {:ok, coordinator_pids} = Hobbes.start_cluster(cluster_opts) 147 - SimServer.sleep(2_000) 145 + SimProcess.sleep(2_000) 148 146 {:ok, {manager_pid, _gen}} = Coordinator.get_manager(hd(coordinator_pids)) 149 147 {:ok, %Cluster{} = cluster} = Manager.get_cluster(manager_pid) 150 148 ··· 167 165 """ 168 166 end) 169 167 |> Enum.join("\n\n") 170 - 171 - log_text = 172 - if scheduler_pid do 173 - Scheduler.get_log(scheduler_pid) |> SimUtils.build_log_message(full: opts[:full_log]) 174 - else 175 - "" 176 - end 177 - 178 - message = """ 179 - #{result_logs} 180 - 181 - #{log_text}\ 182 - """ 168 + dbg result_logs 183 169 184 - {:ok, %{message: message}} 170 + {:ok, %{message: "done"}} 185 171 end 186 172 187 173 defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do
+8 -6
lib/workloads/coordinator_read_write.ex
··· 1 1 defmodule Hobbes.Workloads.CoordinatorReadWrite do 2 - alias Hobbes.Construct.SimServer 3 2 alias Hobbes.Structs.Cluster 3 + 4 + alias Trinity.{SimProcess, SimServer} 4 5 5 6 import Hobbes.Utils 6 7 import Hobbes.Workloads ··· 8 9 @behaviour Hobbes.Workloads.Workload 9 10 10 11 defmodule Client do 11 - use Hobbes.Construct.SimServer 12 + use GenServer 13 + alias Trinity.{SimProcess, SimServer} 12 14 13 15 alias Hobbes.Servers.Coordinator 14 16 import Hobbes.Utils ··· 42 44 stopped: false, 43 45 } 44 46 45 - SimServer.send_after self(), :tick, state.tick_ms 47 + SimProcess.send_after self(), :tick, state.tick_ms 46 48 {:ok, state} 47 49 end 48 50 ··· 58 60 def handle_info(:tick, %State{} = state) do 59 61 state = tick(state) 60 62 61 - SimServer.send_after self(), :tick, state.tick_ms 63 + SimProcess.send_after self(), :tick, state.tick_ms 62 64 {:noreply, state} 63 65 end 64 66 ··· 136 138 pid 137 139 end) 138 140 139 - SimServer.sleep(duration_ms) 141 + SimProcess.sleep(duration_ms) 140 142 141 143 client_stats = 142 144 clients 143 145 |> Enum.map(&SimServer.send_request(&1, :stop)) 144 - |> Enum.map(&SimServer.receive_response/1) 146 + |> Enum.map(&SimServer.receive_response(&1, 5000)) 145 147 |> Enum.map(fn {:reply, reply} -> reply end) 146 148 147 149 cols = [:write_count, :timeout, :not_primary]
+12 -9
lib/workloads/cycle.ex
··· 20 20 alias Hobbes.Transaction 21 21 alias Hobbes.Structs.Cluster 22 22 23 - alias Hobbes.Construct.SimServer 23 + alias Trinity.{SimProcess, SimServer} 24 24 import Hobbes.Workloads 25 25 26 26 @behaviour Hobbes.Workloads.Workload 27 27 28 28 defmodule Client do 29 - use Hobbes.Construct.SimServer 29 + use GenServer 30 + alias Trinity.{SimProcess, SimServer} 31 + 32 + import Hobbes.Utils, only: [current_time: 0] 30 33 31 34 defmodule State do 32 35 @enforce_keys [:cluster, :count, :tick_ms, :stopped, :stats, :commit_latencies] ··· 38 41 end 39 42 40 43 def init(%{cluster: %Cluster{} = cluster, count: count, tick_ms: tick_ms}) when is_integer(count) do 41 - SimServer.send_after(self(), :tick, 0) 44 + SimProcess.send_after(self(), :tick, 0) 42 45 43 46 {:ok, %State{ 44 47 cluster: cluster, ··· 62 65 63 66 def handle_info(:tick, %State{} = state) do 64 67 state = work(state) 65 - SimServer.send_after(self(), :tick, state.tick_ms) 68 + SimProcess.send_after(self(), :tick, state.tick_ms) 66 69 {:noreply, state} 67 70 end 68 71 69 72 defp work(%State{} = state) do 70 73 state = inc_stat(state, :swaps) 71 74 72 - start_time = SimServer.current_time() 75 + start_time = current_time() 73 76 74 - k1 = "key" <> pad(SimServer.deterministic_random(0..(state.count - 1))) 77 + k1 = "key" <> pad(Enum.random(0..(state.count - 1))) 75 78 with {:ok, txn} <- Transaction.new(state.cluster), 76 79 {:ok, {v2, txn}} <- Transaction.read(txn, k1), k2 = "key" <> v2, 77 80 {:ok, {v3, txn}} <- Transaction.read(txn, k2), k3 = "key" <> v3, ··· 82 85 txn = txn |> Transaction.clear(k1) |> Transaction.write([{k1, v3}, {k3, v2}, {k2, v4}]), 83 86 84 87 # Random delay between reads/commit for additional concurrency 85 - SimServer.sleep(SimServer.deterministic_random(100..300)), 88 + SimProcess.sleep(Enum.random(100..300)), 86 89 {:ok, txn} <- Transaction.commit(txn) 87 90 do 88 - duration = SimServer.current_time() - start_time 91 + duration = current_time() - start_time 89 92 90 93 case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster, txn.commit_version) do 91 94 {:ok, _pairs} -> :noop ··· 145 148 pid 146 149 end) 147 150 148 - SimServer.sleep(duration_ms) 151 + SimProcess.sleep(duration_ms) 149 152 150 153 client_results = 151 154 clients
+9 -7
lib/workloads/kill_servers.ex
··· 1 1 defmodule Hobbes.Workloads.KillServers do 2 - alias Hobbes.Construct.SimServer 3 - 4 2 alias Hobbes.Structs.{Cluster, Server} 3 + 4 + alias Trinity.SimProcess 5 5 6 6 import Hobbes.Utils 7 7 ··· 12 12 duration_ms = Keyword.get(opts, :duration_ms, 0) 13 13 server_type = Keyword.get(opts, :server_type, Hobbes.Servers.TLog) 14 14 15 - SimServer.sleep(delay_ms) 15 + SimProcess.sleep(delay_ms) 16 16 17 - kill_pid = 17 + _kill_pid = 18 18 case server_type do 19 19 Hobbes.Servers.Coordinator -> 20 20 hd(cluster.coordinators) 21 21 |> case do 22 22 pid when is_pid(pid) -> pid 23 - name when is_atom(name) -> SimServer.whereis(name) 23 + # TODO 24 + #name when is_atom(name) -> SimServer.whereis(name) 24 25 end 25 26 other -> 26 27 %Server{pid: pid} = get_servers(cluster, other) |> hd() 27 28 pid 28 29 end 29 30 30 - SimServer.exit(kill_pid, :shutdown) 31 + # TODO 32 + #SimServer.exit(kill_pid, :shutdown) 31 33 32 - SimServer.sleep(duration_ms) 34 + SimProcess.sleep(duration_ms) 33 35 34 36 {:ok, ""} 35 37 end
+4 -4
lib/workloads/lock_database.ex
··· 3 3 A test workload that locks and then unlocks the database. 4 4 """ 5 5 6 - alias Hobbes.Construct.SimServer 7 - 8 6 alias Hobbes.Transaction 9 7 alias Hobbes.Structs.Cluster 8 + 9 + alias Trinity.SimProcess 10 10 11 11 @behaviour Hobbes.Workloads.Workload 12 12 ··· 19 19 delay_ms = Keyword.get(opts, :delay_ms, 10_000) 20 20 lock_duration_ms = Keyword.get(opts, :lock_duration_ms, 10_000) 21 21 22 - SimServer.sleep(delay_ms) 22 + SimProcess.sleep(delay_ms) 23 23 24 24 {:ok, _txn} = do_write(cluster, "\xFF/lock", "true") 25 25 {:error, {:database_locked, _txn}} = do_write(cluster, "foo", "bar") 26 26 27 - SimServer.sleep(lock_duration_ms) 27 + SimProcess.sleep(lock_duration_ms) 28 28 29 29 {:ok, _txn} = do_write(cluster, "\xFF/lock", "false") 30 30 {:ok, _txn} = do_write(cluster, "baz", "foo")
+8 -7
lib/workloads/model.ex
··· 3 3 alias Hobbes.Transaction.TxnState 4 4 alias Hobbes.Structs.Cluster 5 5 6 - alias Hobbes.Construct.SimServer 7 - 8 6 alias Hobbes.Workloads.Model.DatabaseModel 9 7 alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn 8 + 9 + alias Trinity.{SimProcess, SimServer} 10 10 11 11 @behaviour Hobbes.Workloads.Workload 12 12 13 13 defmodule Client do 14 - use Hobbes.Construct.SimServer 14 + use GenServer 15 + alias Trinity.{SimProcess, SimServer} 15 16 16 17 import Hobbes.Utils 17 18 ··· 35 36 history: [], 36 37 stopped: false, 37 38 } 38 - SimServer.send self(), :tick 39 + SimProcess.send self(), :tick 39 40 {:ok, state} 40 41 end 41 42 ··· 47 48 48 49 def handle_info(:tick, %State{} = state) do 49 50 state = tick(state) 50 - SimServer.send_after self(), :tick, state.tick_ms 51 + SimProcess.send_after self(), :tick, state.tick_ms 51 52 {:noreply, state} 52 53 end 53 54 ··· 62 63 63 64 # Ensure some transactions receive :transaction_too_old 64 65 if Enum.random(1..10) == 1 do 65 - SimServer.sleep(6_000) 66 + SimProcess.sleep(6_000) 66 67 end 67 68 68 69 case Transaction.commit(txn) do ··· 184 185 pid 185 186 end) 186 187 187 - SimServer.sleep(duration_ms) 188 + SimProcess.sleep(duration_ms) 188 189 189 190 results = 190 191 clients
+13 -11
lib/workloads/read_write.ex
··· 28 28 29 29 alias Hobbes.Transaction 30 30 alias Hobbes.Structs.{Cluster, Server} 31 - alias Hobbes.Construct.SimServer 31 + 32 + alias Trinity.{SimProcess, SimServer} 32 33 33 34 import Hobbes.Utils 34 35 import Hobbes.Workloads 35 36 36 37 defmodule Client do 37 - use Hobbes.Construct.SimServer 38 + use GenServer 39 + alias Trinity.{SimProcess, SimServer} 38 40 39 41 defmodule State do 40 42 @enforce_keys [ ··· 54 56 end 55 57 56 58 def init(%{cluster: %Cluster{} = cluster, id: id, tick_ms: tick_ms}) do 57 - SimServer.send_after(self(), :tick, tick_ms) 59 + SimProcess.send_after(self(), :tick, tick_ms) 58 60 59 61 {:ok, %State{ 60 62 cluster: cluster, ··· 71 73 72 74 def handle_info(:tick, state) do 73 75 state = work(state) 74 - SimServer.send_after(self(), :tick, state.tick_ms) 76 + SimProcess.send_after(self(), :tick, state.tick_ms) 75 77 {:noreply, state} 76 78 end 77 79 ··· 87 89 defp check_reads(txn, %State{write_count: 0} = state), do: {txn, state} 88 90 89 91 defp check_reads(%Transaction.TxnState{} = txn, %State{} = state) do 90 - num_reads = SimServer.deterministic_random(1..10) 92 + num_reads = Enum.random(1..10) 91 93 92 94 read_keys = Enum.map(1..num_reads, fn _i -> 93 - write_i = SimServer.deterministic_random(0..(state.write_count - 1)) 95 + write_i = Enum.random(0..(state.write_count - 1)) 94 96 {write_i, key_for(state.id, write_i)} 95 97 end) 96 98 ··· 114 116 end 115 117 116 118 defp write_keys(%Transaction.TxnState{} = txn, %State{} = state) do 117 - writes = SimServer.deterministic_random(1..10) 119 + writes = Enum.random(1..10) 118 120 write_range = state.write_count..(state.write_count + writes - 1) 119 121 120 122 txn = Enum.reduce(write_range, txn, fn i, txn -> ··· 139 141 profiling = Keyword.get(opts, :profiling, false) 140 142 check = Keyword.get(opts, :check, true) 141 143 142 - start_time = SimServer.current_time() 144 + start_time = current_time() 143 145 144 146 clients = Enum.map(1..num_clients, fn id -> 145 147 {:ok, pid} = Client.start_link(%{cluster: cluster, id: id, tick_ms: client_tick_ms}) 146 148 pid 147 149 end) 148 150 149 - SimServer.sleep(duration_ms) 151 + SimProcess.sleep(duration_ms) 150 152 151 153 client_stats = 152 154 clients 153 155 |> Enum.map(&SimServer.send_request(&1, :stop)) 154 - |> Enum.map(&SimServer.receive_response(&1)) 156 + |> Enum.map(&SimServer.receive_response(&1, 5000)) 155 157 |> Enum.map(fn {:reply, reply} -> reply end) 156 158 157 - end_time = SimServer.current_time() 159 + end_time = current_time() 158 160 duration_s = (end_time - start_time) / 1_000_000 159 161 160 162 total_stats = sum_stats(client_stats)
+12 -7
lib/workloads/restart_nodes.ex
··· 1 1 defmodule Hobbes.Workloads.RestartNodes do 2 - alias Hobbes.Construct.SimServer 2 + alias Trinity.SimProcess 3 3 4 4 import Hobbes.Workloads 5 + import Hobbes.Utils 5 6 6 7 @behaviour Hobbes.Workloads.Workload 7 8 8 - defp restart_random_node(restart_delay_ms) do 9 - nodes = SimServer.list_nodes() 9 + defp restart_random_node(_restart_delay_ms) do 10 + # TODO 11 + #nodes = SimServer.list_nodes() 12 + nodes = [] 10 13 node = Enum.random(nodes) 11 14 12 - result = SimServer.restart_node(node, restart_delay_ms) 15 + # TODO 16 + #result = SimServer.restart_node(node, restart_delay_ms) 17 + result = nil 13 18 {node, result} 14 19 end 15 20 ··· 19 24 tick_ms = Keyword.get(opts, :tick_ms, 1000) 20 25 count = Keyword.get(opts, :count, 1) 21 26 22 - SimServer.sleep(delay_ms) 27 + SimProcess.sleep(delay_ms) 23 28 24 29 results = 25 30 1..count 26 31 |> Enum.map(fn _i -> 27 32 {node, result} = restart_random_node(restart_delay_ms) 28 33 29 - time = SimServer.current_time() 30 - SimServer.sleep(tick_ms) 34 + time = current_time() 35 + SimProcess.sleep(tick_ms) 31 36 32 37 %{ 33 38 timestamp: time,
+10 -8
lib/workloads/shard_move.ex
··· 5 5 6 6 alias Hobbes.Structs.{Cluster, Server} 7 7 8 - alias Hobbes.Construct.SimServer 9 8 alias Hobbes.Servers.Distributor 10 9 alias Hobbes.Transaction 11 10 alias Hobbes.Encoding.Keyset 11 + 12 + alias Trinity.{SimProcess, SimServer} 12 13 13 14 import Hobbes.Utils 14 15 import Hobbes.Workloads ··· 16 17 @behaviour Hobbes.Workloads.Workload 17 18 18 19 defmodule Client do 19 - use Hobbes.Construct.SimServer 20 + use GenServer 21 + alias Trinity.{SimProcess, SimServer} 20 22 import ExUnit.Assertions, only: [assert: 1] 21 23 22 24 @tick_ms 100 ··· 31 33 def start_link(%Cluster{} = cluster, opts), do: SimServer.start_link(__MODULE__, {cluster, opts}) 32 34 33 35 def init({cluster, _opts}) do 34 - SimServer.send_after(self(), :tick, 0) 36 + SimProcess.send_after(self(), :tick, 0) 35 37 {:ok, %State{cluster: cluster}} 36 38 end 37 39 ··· 43 45 44 46 def handle_info(:tick, %State{} = state) do 45 47 state = tick(state) 46 - SimServer.send_after(self(), :tick, @tick_ms) 48 + SimProcess.send_after(self(), :tick, @tick_ms) 47 49 {:noreply, state} 48 50 end 49 51 ··· 55 57 tags = Enum.map(tag_pairs, fn {server_tags_prefix() <> tag, _servers} -> String.to_integer(tag) end) |> Enum.sort() 56 58 shards = Enum.map(shard_pairs, fn {key_servers_prefix() <> shard, _servers} -> shard end) 57 59 58 - shard_to_move = SimServer.deterministic_random(shards) 60 + shard_to_move = Enum.random(shards) 59 61 60 62 # TODO: use actual teams 61 - random_team = SimServer.deterministic_random(0..(div(length(tags), 3) - 1)) 63 + random_team = Enum.random(0..(div(length(tags), 3) - 1)) 62 64 to_servers = [ 63 65 Enum.at(tags, (random_team * 3) + 0), 64 66 Enum.at(tags, (random_team * 3) + 1), ··· 87 89 num_clients = Keyword.get(opts, :clients, 1) 88 90 duration_ms = Keyword.get(opts, :duration_ms, 10_000) 89 91 90 - SimServer.sleep(delay_ms) 92 + SimProcess.sleep(delay_ms) 91 93 clients = Enum.map(1..num_clients, fn _i -> 92 94 {:ok, client} = Client.start_link(cluster, []) 93 95 client 94 96 end) 95 97 96 - SimServer.sleep(duration_ms) 98 + SimProcess.sleep(duration_ms) 97 99 98 100 _client_results = 99 101 clients