···7474- [X] Use a single clock source in Scheduler (above optimization slowed per-process clock progression)
7575- [X] Optimization: only wake a process when it receives the message it's waiting for
7676- [X] Add timeouts to SimServer calls
7777-- [ ] Optimize timeouts to avoid filling up queue (very slow)
7777+- [X] Optimize timeouts to avoid filling up queue (very slow)
787879798080### Testing
+60-36
lib/construct/scheduler.ex
···2323 defstruct @enforce_keys
2424 end
25252626- defmodule Timeout do
2727- @enforce_keys [:pid, :ref]
2828- defstruct @enforce_keys
2929- end
3030-3126 defmodule State do
3227 @type t :: %__MODULE__{
3328 clock: non_neg_integer,
3429 current: {pid, reference} | nil,
3530 queue: map,
3631 awaiting_message: %{pid => %Resume{}},
3232+ timeouts: [{non_neg_integer, pid}],
3733 log_server_pid: pid,
3834 resumes_without_send: non_neg_integer,
3935 }
···4339 current: nil,
4440 queue: %{},
4541 awaiting_message: %{},
4242+ timeouts: [],
4343+4644 log_server_pid: nil,
4745 resumes_without_send: 0,
4846 ]
···175173 end
176174177175 def handle_cast({:await_message, pid, check_fun, timeout_ms, %Resume{} = resume}, state) when is_pid(pid) and is_function(check_fun) do
178178- ref = make_ref()
179179- state = %State{state | awaiting_message: Map.put(state.awaiting_message, pid, {ref, check_fun, resume})}
180180-181181- state =
176176+ # Add new timeout if needed
177177+ timeouts =
182178 case timeout_ms do
183183- :infinity ->
184184- state
185185-186186- timeout_ms when is_integer(timeout_ms) and timeout_ms >= 0 ->
187187- expires_after = state.clock + timeout_ms
188188- queue_task(state, expires_after, %Timeout{pid: pid, ref: ref})
179179+ :infinity -> state.timeouts
180180+ timeout_ms -> state.timeouts ++ [{state.clock + timeout_ms, pid}]
189181 end
182182+183183+ # Add awaiting_message entry and update timeouts
184184+ state = %State{state |
185185+ awaiting_message: Map.put(state.awaiting_message, pid, {check_fun, resume}),
186186+ timeouts: timeouts,
187187+ }
190188191189 {:noreply, state}
192190 end
···196194 nil -> :noop
197195 {^pid, mref} -> Process.demonitor(mref, [:flush])
198196 end
199199- {:noreply, %State{state | current: nil} |> perform_next_task()}
197197+ {:noreply, %State{state | current: nil} |> do_next()}
200198 end
201199202200 def handle_info({:DOWN, mref, :process, pid, _reason} = message, %State{} = state) do
203201 case state.current do
204202 {^pid, ^mref} ->
205205- {:noreply, %State{state | current: nil} |> perform_next_task()}
203203+ state = purge_timeouts(state, pid)
204204+ {:noreply, %State{state | current: nil} |> do_next()}
206205207206 _ ->
208207 raise """
···220219 %State{state | queue: queue}
221220 end
222221222222+ defp do_next(%State{} = state) do
223223+ # Try to perform a timeout first, and then perform
224224+ # a task if there are no expired timeouts
225225+ case maybe_perform_timeout(state) do
226226+ :noop ->
227227+ perform_next_task(state)
228228+229229+ {:performed, %State{} = state} ->
230230+ state
231231+ end
232232+ end
233233+234234+ defp maybe_perform_timeout(%State{timeouts: timeouts} = state) do
235235+ case Enum.find(timeouts, fn {expires, _pid} -> expires < state.clock end) do
236236+ nil ->
237237+ :noop
238238+239239+ {_expires, timed_out_pid} ->
240240+ {_check_fun, %Resume{} = resume} = Map.fetch!(state.awaiting_message, timed_out_pid)
241241+242242+ # Resume the timed out process with a timeout message
243243+ # The process will then call exit(:timeout) (see await_resume/1)
244244+ state = monitor_current(state, resume.pid)
245245+ send resume.pid, {:timeout, resume.ref}
246246+247247+ # Clean up the timeout
248248+ state = purge_timeouts(state, timed_out_pid)
249249+ {:performed, state}
250250+ end
251251+ end
252252+223253 defp perform_next_task(%State{queue: queue} = state) when map_size(queue) == 0 do
224254 raise """
225255 Attempted to call `resume_next/1` but the queue is empty! Possible deadlock?
···281311 log(state, {:send, time, send.to_pid, send.message})
282312 state = send_message(state, send)
283313284284- perform_next_task(state)
285285- end
286286-287287- defp perform(%State{} = state, _time, %Timeout{pid: pid, ref: ref}) do
288288- case Map.get(state.awaiting_message, pid) do
289289- # The process is still waiting, so we trigger the timeout
290290- {^ref, _check_fun, %Resume{} = resume} ->
291291- state = monitor_current(state, pid)
292292- send resume.pid, {:timeout, resume.ref}
293293-294294- state
295295-296296- # The process is either not waiting anymore, or waiting for
297297- # a different Timeout
298298- _ ->
299299- perform_next_task(state)
300300- end
314314+ do_next(state)
301315 end
302316303317 defp perform(%State{} = state, _time, %Resume{} = resume) do
···322336 nil ->
323337 state
324338325325- {_ref, check_fun, %Resume{} = resume} when is_function(check_fun) ->
339339+ {check_fun, %Resume{} = resume} when is_function(check_fun) ->
340340+ # If check_fun returns true for the message then it matches what
341341+ # the process is waiting for and we can resume it
326342 case check_fun.(send.message) do
327343 false ->
328344 state
329345330346 true ->
331331- state = %State{state | awaiting_message: Map.delete(state.awaiting_message, send.to_pid)}
347347+ # Remove the awaiting_message and timeouts entries
348348+ state =
349349+ %State{state | awaiting_message: Map.delete(state.awaiting_message, send.to_pid)}
350350+ |> purge_timeouts(send.to_pid)
351351+ # Queue a resume for the process
332352 queue_task(state, state.clock, resume)
333353 end
334354 end
355355+ end
356356+357357+ defp purge_timeouts(%State{} = state, pid) when is_pid(pid) do
358358+ %State{state | timeouts: Enum.reject(state.timeouts, &(elem(&1, 1) == pid))}
335359 end
336360337361 defp log(%State{} = state, event) do