this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add scheduler

garrison 8af68975 9f7af7bf

+148
+134
lib/trinity/scheduler.ex
··· 1 + defmodule Trinity.Scheduler do 2 + defmodule Simulation do 3 + @enforce_keys [:queue, :now, :supervisor_pid] 4 + defstruct @enforce_keys 5 + end 6 + 7 + defmodule SimulationSupervisor do 8 + use GenServer 9 + 10 + def start_link(%Simulation{} = sim) do 11 + GenServer.start_link(__MODULE__, sim) 12 + end 13 + 14 + def register_self(server) do 15 + GenServer.cast(server, {:register, self()}) 16 + end 17 + 18 + def init(%Simulation{} = sim) do 19 + {:ok, sim} 20 + end 21 + 22 + def handle_cast({:register, pid}, state) do 23 + Process.monitor(pid) 24 + {:noreply, state} 25 + end 26 + 27 + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do 28 + Trinity.Scheduler.perform_next_task(state) 29 + {:noreply, state} 30 + end 31 + end 32 + 33 + @simulation_key :_trinity_simulation 34 + 35 + defp get_sim, do: Process.get(@simulation_key) 36 + defp put_sim(%Simulation{} = sim), do: Process.put(@simulation_key, sim) 37 + 38 + @spec start :: :ok 39 + def start do 40 + sim = %Simulation{ 41 + queue: :ets.new(__MODULE__, [:ordered_set, :public]), 42 + now: :atomics.new(1, signed: false), 43 + supervisor_pid: nil, 44 + } 45 + 46 + {:ok, supervisor_pid} = SimulationSupervisor.start_link(sim) 47 + SimulationSupervisor.register_self(supervisor_pid) 48 + 49 + sim = %{sim | supervisor_pid: supervisor_pid} 50 + put_sim(sim) 51 + 52 + :ok 53 + end 54 + 55 + @spec yield(non_neg_integer) :: :ok 56 + def yield(delay \\ 0) do 57 + %Simulation{queue: queue, now: now} = get_sim() 58 + yield_ref = enqueue_self(queue, now, delay) 59 + 60 + perform_next(queue, now) 61 + 62 + receive do 63 + ^yield_ref -> :ok 64 + end 65 + end 66 + 67 + @spec spawn_and_yield(function) :: pid 68 + def spawn_and_yield(fun) do 69 + %Simulation{queue: queue, now: now} = sim = get_sim() 70 + yield_ref = enqueue_self(queue, now, 0) 71 + 72 + spawned_pid = spawn(fn -> 73 + SimulationSupervisor.register_self(sim.supervisor_pid) 74 + put_sim(sim) 75 + 76 + fun.() 77 + end) 78 + 79 + receive do 80 + ^yield_ref -> spawned_pid 81 + end 82 + end 83 + 84 + defp perform_next(queue, now) do 85 + case pop_next(queue) do 86 + {time, {:resume, pid, ref}} -> 87 + set_now(now, time) 88 + send pid, ref 89 + end 90 + end 91 + 92 + defp pop_next(queue) do 93 + case :ets.next_lookup(queue, {-1, 0}) do 94 + {_, [{{time, _i} = key, entry}]} -> 95 + :ets.delete(queue, key) 96 + {time, entry} 97 + _ -> 98 + raise "Queue empty!" 99 + end 100 + end 101 + 102 + defp enqueue_self(queue, now, delay) do 103 + ref = make_ref() 104 + time = read_now(now) + delay 105 + 106 + entry = {:resume, self(), ref} 107 + 108 + i = case :ets.prev(queue, {time, :infinity}) do 109 + [{^time, prev}] -> prev + 1 110 + _ -> 0 111 + end 112 + :ets.insert(queue, {{time, i}, entry}) 113 + 114 + ref 115 + end 116 + 117 + defp read_now(now), do: :atomics.get(now, 1) 118 + defp set_now(now, time), do: :atomics.put(now, 1, time) 119 + 120 + # Used by SimulationSupervisor 121 + @doc false 122 + def perform_next_task(%Simulation{queue: queue, now: now}) do 123 + perform_next(queue, now) 124 + end 125 + 126 + @doc false 127 + def dump(%Simulation{} = sim) do 128 + %{ 129 + queue: :ets.tab2list(sim.queue), 130 + now: read_now(sim.now), 131 + supervisor_pid: sim.supervisor_pid, 132 + } 133 + end 134 + end
+14
test/trinity_test.exs
··· 1 1 defmodule TrinityTest do 2 2 use ExUnit.Case 3 + 4 + alias Trinity.Scheduler 5 + 6 + test "scheduler" do 7 + Scheduler.start() 8 + 9 + pid1 = Scheduler.spawn_and_yield(fn -> dbg "hello 1" end) 10 + pid2 = Scheduler.spawn_and_yield(fn -> dbg "hello 2" end) 11 + pid3 = Scheduler.spawn_and_yield(fn -> dbg "hello 3" end) 12 + dbg {pid1, pid2, pid3} 13 + 14 + Scheduler.yield(100) 15 + dbg "foo" 16 + end 3 17 end