this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add process aliases to simulation and use for SimServer calls

+91 -20
+30
lib/construct/scheduler.ex
··· 129 129 ref 130 130 end 131 131 132 + @spec alias(pid) :: reference 133 + def alias(scheduler) do 134 + GenServer.call(scheduler, {:alias, self()}) 135 + end 136 + 137 + @spec unalias(pid, reference) :: boolean 138 + def unalias(scheduler, alias) when is_reference(alias) do 139 + GenServer.call(scheduler, {:unalias, self(), alias}) 140 + end 141 + 132 142 @spec set_process_flag(pid, :trap_exit, boolean) :: :ok 133 143 def set_process_flag(scheduler, :trap_exit = flag, value) when is_boolean(value) do 134 144 {:ok, old_value} = GenServer.call(scheduler, {:set_process_flag, self(), flag, value}) ··· 176 186 177 187 state = set_current_process(state, initial_pid) 178 188 {:ok, state} 189 + end 190 + 191 + def handle_call({:alias, pid}, _from, %State{} = state) do 192 + alias = make_ref() 193 + ProcStore.add_alias(state.proc_store, pid, alias) 194 + {:reply, alias, state} 195 + end 196 + 197 + def handle_call({:unalias, pid, alias}, _from, %State{} = state) do 198 + case ProcStore.remove_alias(state.proc_store, pid, alias) do 199 + :ok -> {:reply, true, state} 200 + {:error, _err} -> {:reply, false, state} 201 + end 179 202 end 180 203 181 204 def handle_call({:set_process_flag, pid, flag, value}, _from, %State{} = state) do ··· 440 463 {:ok, pid} -> pid 441 464 {:error, :not_found} -> nil 442 465 end 466 + 467 + alias when is_reference(alias) -> 468 + case ProcStore.resolve_alias(state.proc_store, alias) do 469 + {:ok, pid} -> pid 470 + :error -> nil 471 + end 472 + 443 473 pid when is_pid(pid) -> pid 444 474 end 445 475
+30 -1
lib/construct/scheduler/proc_store.ex
··· 28 28 proc_table: :ets.table, 29 29 } 30 30 31 - @enforce_keys [:proc_table] 31 + @enforce_keys [:proc_table, :alias_table] 32 32 defstruct @enforce_keys 33 33 34 34 def new do 35 35 %ProcStore{ 36 36 proc_table: :ets.new(:proc_table, [:set, :private]), 37 + alias_table: :ets.new(:alias_table, [:set, :private]), 37 38 } 38 39 end 39 40 ··· 171 172 :ets.insert(ps.proc_table, {pid, %{proc_state | monitors: monitors}}) 172 173 :ets.insert(ps.proc_table, {target_pid, %{target_state | monitored_by: monitored_by}}) 173 174 :ok 175 + end 176 + 177 + @spec add_alias(t, pid, reference) :: :ok 178 + def add_alias(%ProcStore{} = ps, pid, alias) when is_pid(pid) and is_reference(alias) do 179 + :ets.insert(ps.alias_table, {alias, pid}) 180 + :ok 181 + end 182 + 183 + @spec remove_alias(t, pid, reference) :: :ok | {:error, :wrong_process | :not_found} 184 + def remove_alias(%ProcStore{} = ps, pid, alias) when is_pid(pid) and is_reference(alias) do 185 + case resolve_alias(ps, alias) do 186 + {:ok, ^pid} -> 187 + :ets.delete(ps.alias_table, alias) 188 + :ok 189 + 190 + {:ok, _other} -> {:error, :wrong_process} 191 + :error -> {:error, :not_found} 192 + end 193 + end 194 + 195 + @spec resolve_alias(t, reference) :: {:ok, pid} | :error 196 + def resolve_alias(%ProcStore{} = ps, alias) when is_reference(alias) do 197 + case :ets.lookup(ps.alias_table, alias) do 198 + [{^alias, pid}] when is_pid(pid) -> 199 + {:ok, pid} 200 + [] -> 201 + :error 202 + end 174 203 end 175 204 176 205 @spec set_flag(t, pid, :trap_exit, boolean) :: {:ok, boolean}
+2 -2
lib/construct/sim_internal.ex
··· 45 45 sim_loop(module, state) 46 46 end 47 47 48 - defp dispatch({:"$sim_call", {client_pid, ref} = from, request}, module, state) do 48 + defp dispatch({:"$sim_call", {_pid, [:alias | alias_ref] = tag} = from, request}, module, state) do 49 49 try do 50 50 module.handle_call(request, from, state) 51 51 catch ··· 55 55 end 56 56 |> case do 57 57 {:reply, response, state} -> 58 - SimServer.send client_pid, {ref, response} 58 + SimServer.send alias_ref, {tag, response} 59 59 state 60 60 {:noreply, state} -> 61 61 state
+5
lib/construct/sim_log.ex
··· 96 96 end 97 97 end 98 98 99 + defp homogenize(%Known{} = known, [:alias | ref]) when is_reference(ref) do 100 + {known, h_ref} = homogenize(known, ref) 101 + {known, [:alias | h_ref]} 102 + end 103 + 99 104 defp homogenize(%Known{} = known, list) when is_list(list) do 100 105 {known, h_list} = Enum.reduce(list, {known, []}, fn element, {known, acc_list} -> 101 106 {k, h_el} = homogenize(known, element)
+24 -11
lib/construct/sim_server.ex
··· 196 196 end 197 197 198 198 defp sim_call(scheduler_pid, server, request, timeout) do 199 - request_id = sim_send_request(scheduler_pid, server, request) 199 + alias_ref = sim_send_request(scheduler_pid, server, request) 200 + 200 201 try do 201 202 yield_receive(scheduler_pid, timeout) do 202 - {^request_id, reply} -> reply 203 + {[:alias | ^alias_ref], reply} -> reply 203 204 end 204 205 catch 205 206 :exit, reason -> 206 - exit({reason, {__MODULE__, :call, [server, request, timeout]}}) 207 + true = Scheduler.unalias(scheduler_pid, alias_ref) 208 + receive do 209 + {[:alias | ^alias_ref], reply} -> reply 210 + after 211 + 0 -> exit({reason, {__MODULE__, :call, [server, request, timeout]}}) 212 + end 207 213 end 208 214 end 209 215 216 + @dialyzer {:no_improper_lists, sim_send_request: 3} 210 217 defp sim_send_request(scheduler_pid, server, request) do 211 - ref = make_ref() 212 - Scheduler.send(scheduler_pid, server, {:"$sim_call", {self(), ref}, request}) 218 + alias_ref = Scheduler.alias(scheduler_pid) 219 + Scheduler.send(scheduler_pid, server, {:"$sim_call", {self(), [:alias | alias_ref]}, request}) 213 220 214 - ref 221 + alias_ref 215 222 end 216 223 217 224 defp sim_receive_response(scheduler_pid, request_id, timeout) when is_reference(request_id) do 225 + alias_ref = request_id 218 226 # TODO: catch timeout exit 219 227 try do 220 228 yield_receive(scheduler_pid, timeout) do 221 - {^request_id, reply} -> {:reply, reply} 229 + {[:alias | ^alias_ref], reply} -> {:reply, reply} 222 230 end 223 231 catch 224 - :exit, _reason -> 225 - :timeout 232 + :exit, :timeout -> 233 + true = Scheduler.unalias(scheduler_pid, alias_ref) 234 + receive do 235 + {[:alias | ^alias_ref], reply} -> {:reply, reply} 236 + after 237 + 0 -> :timeout 238 + end 226 239 end 227 240 end 228 241 ··· 257 270 GenServer.reply(client, response) 258 271 259 272 scheduler_pid when is_pid(scheduler_pid) -> 260 - {client_pid, ref} = client 261 - Scheduler.send(scheduler_pid, client_pid, {ref, response}) 273 + {_client_pid, [:alias | alias_ref] = tag} = client 274 + Scheduler.send(scheduler_pid, alias_ref, {tag, response}) 262 275 :ok 263 276 end 264 277 end
-2
lib/servers/distributor.ex
··· 128 128 end 129 129 end 130 130 131 - def handle_info(_message, %State{} = state), do: {:noreply, state} 132 - 133 131 defp scan_shards(%State{} = state) when state.cluster.status != :normal do 134 132 state 135 133 end
-2
lib/workloads/cycle.ex
··· 66 66 {:noreply, state} 67 67 end 68 68 69 - def handle_info(_message, %State{} = state), do: {:noreply, state} 70 - 71 69 defp work(%State{} = state) do 72 70 state = inc_stat(state, :swaps) 73 71
-2
lib/workloads/shard_move.ex
··· 46 46 {:noreply, state} 47 47 end 48 48 49 - def handle_info(_message, %State{} = state), do: {:noreply, state} 50 - 51 49 defp tick(%State{cluster: cluster} = state) do 52 50 storage_servers = get_servers(cluster, Hobbes.Servers.Storage) 53 51