this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Suspend processes awaiting a message

garrison 3a1f4fdc 44610f45

+127 -24
+117 -21
lib/trinity/scheduler.ex
··· 6 6 queue: :ets.table, 7 7 proc_queue_keys: :ets.table, 8 8 proc_links: :ets.table, 9 + proc_aliases: :ets.table, 10 + 9 11 now: :atomics.atomics_ref, 10 12 supervisor_pid: pid, 11 13 } ··· 14 16 :proc_queue_keys, 15 17 :proc_links, 16 18 :proc_aliases, 19 + 17 20 :now, 18 21 :supervisor_pid, 19 22 ] ··· 22 25 23 26 defmacro simulation_key, do: :_trinity_simulation 24 27 28 + @spec get_sim :: Simulation.t 25 29 defp get_sim, do: Process.get(simulation_key()) 26 30 27 31 def run_simulation(fun) do 28 32 SimulationSupervisor.run_simulation(fun) 29 33 end 30 34 35 + @spec now :: integer 36 + def now do 37 + %{now: now} = get_sim() 38 + read_now(now) 39 + end 40 + 31 41 @spec yield(non_neg_integer) :: :ok 32 42 def yield(delay \\ 0) do 33 43 %Simulation{queue: queue, proc_queue_keys: proc_queue_keys, now: now} = get_sim() 34 - yield_ref = enqueue_self(queue, proc_queue_keys, now, delay) 44 + yield_ref = enqueue_resume(queue, proc_queue_keys, now, delay) 45 + 46 + perform_next(queue, proc_queue_keys, now) 47 + 48 + receive do 49 + ^yield_ref -> :ok 50 + end 51 + end 35 52 36 - perform_next(queue, now) 53 + @spec yield_until_message(non_neg_integer) :: :ok 54 + def yield_until_message(timeout) do 55 + %Simulation{queue: queue, proc_queue_keys: proc_queue_keys, now: now} = get_sim() 56 + yield_ref = enqueue_timeout(queue, proc_queue_keys, now, timeout) 57 + 58 + perform_next(queue, proc_queue_keys, now) 37 59 38 60 receive do 39 61 ^yield_ref -> :ok ··· 50 72 # Enqueue the new process and unsuspend the parent 51 73 # so that it can register any links before yielding back 52 74 # to us (eventually, if there are others in queue at `time=now`) 53 - yield_ref = enqueue_self(sim.queue, sim.proc_queue_keys, sim.now, 0) 75 + yield_ref = enqueue_resume(sim.queue, sim.proc_queue_keys, sim.now, 0) 54 76 send parent_pid, spawn_ref 55 77 receive do 56 78 ^yield_ref -> :noop ··· 94 116 end 95 117 end 96 118 97 - case Trinity.Scheduler.receive_loop(receive_fun, ref, unquote(timeout)) do 119 + start = Trinity.Scheduler.now() 120 + # This will always perform one receive() check synchronously before yielding, which is 121 + # needed to avoid a race where the message is already in the queue so we never unsuspend 122 + # 123 + # TODO: we could perform a yield(0) here to ensure all calls to `receive_yield()` actually yield 124 + case Trinity.Scheduler.receive_loop(receive_fun, ref, unquote(timeout), start) do 98 125 # We re-use the same ref to detect a timeout 99 126 ^ref -> unquote(timeout_value) 100 127 value -> value ··· 103 130 end 104 131 105 132 @doc false 106 - def receive_loop(receive_fun, ref, timeout) do 133 + def receive_loop(receive_fun, ref, timeout, start) do 107 134 case receive_fun.() do 108 135 ^ref -> 109 136 # TODO: timeout 110 - yield(10) 111 - receive_loop(receive_fun, ref, timeout) 137 + now = Trinity.Scheduler.now() 138 + case (now - start) >= timeout do 139 + true -> 140 + ref 141 + 142 + false -> 143 + remaining = timeout_remaining(timeout, now - start) 144 + yield_until_message(remaining) 145 + receive_loop(receive_fun, ref, timeout, start) 146 + end 147 + 112 148 value -> 113 149 value 114 150 end 115 151 end 116 152 153 + defp timeout_remaining(:infinity, _elapsed), do: :infinity 154 + defp timeout_remaining(timeout, elapsed), do: timeout - elapsed 155 + 117 156 @spec add_link(pid) :: :ok 118 157 def add_link(to_pid) do 119 158 # TODO: noproc if `to_pid` has exited ··· 143 182 end 144 183 145 184 def handle_down(%Simulation{} = sim, _pid, :normal) do 146 - perform_next(sim.queue, sim.now) 185 + %{queue: queue, proc_queue_keys: proc_queue_keys} = sim 186 + perform_next(queue, proc_queue_keys, sim.now) 147 187 [] 148 188 end 149 189 150 190 def handle_down(%Simulation{} = sim, pid, reason) do 151 - %Simulation{proc_links: proc_links} = sim 191 + %{queue: queue, proc_queue_keys: proc_queue_keys, proc_links: proc_links} = sim 152 192 linked = gather_linked(pid, proc_links) 153 193 154 194 Enum.each(linked, fn pid -> ··· 156 196 Process.exit(pid, reason) 157 197 end) 158 198 159 - perform_next(sim.queue, sim.now) 199 + perform_next(queue, proc_queue_keys, sim.now) 160 200 linked 161 201 end 162 202 163 - defp perform_next(queue, now) do 164 - case pop_next(queue) do 165 - {time, {:resume, pid, ref}} -> 203 + def handle_sent(dest_ref) when is_reference(dest_ref) do 204 + %{proc_aliases: proc_aliases} = get_sim() 205 + # Look up the alias ref in the aliases table to get its pid 206 + case :ets.lookup(proc_aliases, dest_ref) do 207 + [{_key, dest_pid}] -> handle_sent(dest_pid) 208 + # This alias has no known pid (alias was unaliased), do nothing 209 + [] -> :noop 210 + end 211 + end 212 + 213 + def handle_sent(dest_pid) when is_pid(dest_pid) do 214 + %{queue: queue, proc_queue_keys: proc_queue_keys, now: now} = get_sim() 215 + 216 + case :ets.lookup(proc_queue_keys, dest_pid) do 217 + [{_key, {:timeout_infinity, ref}}] -> 218 + # This is an infinite timeout so we add the dest process to the queue 219 + queue_key = enqueue_event(queue, now, 0, {:resume, dest_pid, ref}) 220 + :ets.insert(proc_queue_keys, {dest_pid, {:resume, queue_key}}) 221 + 222 + [{_key, {:timeout, ref, existing_queue_key}}] -> 223 + # This is a finite timeout so we remove the existing (:timeout) queue entry 224 + :ets.delete(queue, existing_queue_key) 225 + # Then add the dest process back to the queue at now() 226 + queue_key = enqueue_event(queue, now, 0, {:resume, dest_pid, ref}) 227 + :ets.insert(proc_queue_keys, {dest_pid, {:resume, queue_key}}) 228 + 229 + _ -> 230 + # Process is already queued for :resume or unknown (dead) 231 + :noop 232 + end 233 + end 234 + 235 + defp perform_next(queue, proc_queue_keys, now) do 236 + case pop_next(queue, proc_queue_keys) do 237 + {time, {_event, pid, ref}} -> 166 238 set_now(now, time) 167 239 send pid, ref 168 240 end 169 241 end 170 242 171 - defp pop_next(queue) do 243 + defp pop_next(queue, proc_queue_keys) do 172 244 case :ets.next_lookup(queue, {-1, 0}) do 173 245 {_, [{{time, _i} = key, entry}]} -> 174 246 :ets.delete(queue, key) 247 + {_event, pid, _ref} = entry 248 + :ets.delete(proc_queue_keys, pid) 249 + 175 250 {time, entry} 176 251 _ -> 177 252 raise "Queue empty!" 178 253 end 179 254 end 180 255 181 - defp enqueue_self(queue, proc_queue_keys, now, delay) do 256 + defp enqueue_timeout(_queue, proc_queue_keys, _now, :infinity) do 182 257 ref = make_ref() 183 - time = read_now(now) + delay 258 + pid = self() 259 + :ets.insert(proc_queue_keys, {pid, {:timeout_infinity, ref}}) 260 + 261 + ref 262 + end 184 263 264 + defp enqueue_timeout(queue, proc_queue_keys, now, timeout) do 265 + ref = make_ref() 185 266 pid = self() 186 - entry = {:resume, pid, ref} 267 + queue_key = enqueue_event(queue, now, timeout, {:timeout, pid, ref}) 268 + :ets.insert(proc_queue_keys, {pid, {:timeout, ref, queue_key}}) 269 + 270 + ref 271 + end 272 + 273 + defp enqueue_resume(queue, proc_queue_keys, now, delay) do 274 + ref = make_ref() 275 + pid = self() 276 + queue_key = enqueue_event(queue, now, delay, {:resume, pid, ref}) 277 + :ets.insert(proc_queue_keys, {pid, {:resume, queue_key}}) 278 + 279 + ref 280 + end 281 + 282 + defp enqueue_event(queue, now, delay, event) do 283 + time = read_now(now) + delay 187 284 188 285 i = case :ets.prev(queue, {time, :infinity}) do 189 286 {^time, prev} -> prev + 1 190 287 _ -> 0 191 288 end 192 289 queue_key = {time, i} 193 - :ets.insert(queue, {{time, i}, entry}) 194 - :ets.insert(proc_queue_keys, {pid, queue_key}) 290 + :ets.insert(queue, {{time, i}, event}) 195 291 196 - ref 292 + queue_key 197 293 end 198 294 199 295 defp destroy_process(%Simulation{} = sim, pid) do ··· 206 302 destroy_links(proc_links, pid) 207 303 208 304 case :ets.lookup(proc_queue_keys, pid) do 209 - [{^pid, qk}] -> 305 + [{_event, ^pid, qk}] -> 210 306 :ets.delete(queue, qk) 211 307 :ets.delete(proc_queue_keys, pid) 212 308 _ -> :noop
+8 -3
lib/trinity/scheduler/simulation_supervisor.ex
··· 15 15 defstruct @enforce_keys 16 16 end 17 17 18 + @spec run_simulation(function) :: :ok 18 19 def run_simulation(fun) do 19 20 ref = make_ref() 20 21 GenServer.start_link(__MODULE__, %{ ··· 24 25 }) 25 26 26 27 receive do 27 - ^ref -> :ok 28 + {^ref, :normal} -> :ok 29 + {^ref, :shutdown} -> :ok 30 + # Propagate root exit to parent process so tests fail 31 + {^ref, reason} -> exit(reason) 28 32 end 29 33 end 30 34 ··· 41 45 proc_queue_keys: :ets.new(__MODULE__, [:set, :public]), 42 46 proc_links: :ets.new(__MODULE__, [:set, :public]), 43 47 proc_aliases: :ets.new(__MODULE__, [:set, :public]), 48 + 44 49 now: :atomics.new(1, signed: false), 45 50 supervisor_pid: self(), 46 51 } ··· 78 83 79 84 ^root_pid -> 80 85 # The root has died, end the simulation 81 - send parent_pid, parent_ref 86 + send parent_pid, {parent_ref, reason} 82 87 {:noreply, state} 83 88 84 89 _ -> ··· 92 97 93 98 # If the root was killed, end the simulation 94 99 case pid == root_pid do 95 - true -> send parent_pid, parent_ref 100 + true -> send parent_pid, {parent_ref, reason} 96 101 _ -> :noop 97 102 end 98 103 end)
+2
lib/trinity/sim_process.ex
··· 1 1 defmodule Trinity.SimProcess do 2 + alias Trinity.Scheduler 2 3 alias Trinity.Scheduler.Simulation 3 4 4 5 import Trinity.Scheduler, only: [simulation_key: 0] ··· 7 8 defp get_sim, do: Process.get(simulation_key()) 8 9 9 10 def send(dest, message) do 11 + Scheduler.handle_sent(dest) 10 12 Kernel.send(dest, message) 11 13 end 12 14