this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Optimize queue

garrison 23299e9e e79ce2bd

+18 -21
+17 -20
lib/trinity/scheduler.ex
··· 64 64 @spec now :: integer 65 65 def now do 66 66 %{now: now} = get_sim() 67 - read_now(now) 67 + :atomics.get(now, 1) 68 68 end 69 69 70 70 @spec yield(non_neg_integer) :: :ok ··· 163 163 end 164 164 165 165 start = Trinity.Scheduler.now() 166 + # Yield to ensure some progress is made 167 + # (otherwise it's too easy to write processes that block forever) 168 + # TODO: we could instead yield conditionally *after* the receive has completed, 169 + # if no time has passed 170 + Trinity.Scheduler.yield(1) 166 171 # This will always perform one receive() check synchronously before yielding, which is 167 172 # needed to avoid a race where the message is already in the queue so we never unsuspend 168 - # 169 - # TODO: we could perform a yield(0) here to ensure all calls to `receive_yield()` actually yield 170 - Trinity.Scheduler.yield(1) 171 173 case Trinity.Scheduler.receive_loop(receive_fun, ref, unquote(timeout), start) do 172 174 # We re-use the same ref to detect a timeout 173 175 ^ref -> unquote(timeout_value) ··· 326 328 case pop_next(queue) do 327 329 {time, {:timeout, pid, ref}} -> 328 330 :ets.delete(proc_queue_keys, pid) 329 - set_now(now, time) 331 + :atomics.put(now, 1, time) 330 332 send pid, ref 331 333 {time, {:resume, pid, ref}} -> 332 334 :ets.delete(proc_queue_keys, pid) 333 - set_now(now, time) 335 + :atomics.put(now, 1, time) 334 336 send pid, ref 335 337 {time, {:send_after, dest, message}} -> 336 - set_now(now, time) 338 + :atomics.put(now, 1, time) 337 339 SimProcess.perform_send(dest, message) 338 340 perform_next(queue, proc_queue_keys, now) 339 341 end ··· 341 343 342 344 defp pop_next(queue) do 343 345 case :ets.next_lookup(queue, {-1, 0}) do 344 - {_, [{{time, _i} = key, entry}]} -> 346 + {_, [{[time | _i] = key, entry}]} -> 345 347 :ets.delete(queue, key) 346 348 {time, entry} 347 349 _ -> ··· 375 377 ref 376 378 end 377 379 380 + @dialyzer {:no_improper_lists, enqueue_event: 4} 378 381 defp enqueue_event(queue, now, delay, event) do 379 - time = read_now(now) + delay 380 - 381 - i = case :ets.prev(queue, {time, :infinity}) do 382 - {^time, prev} -> prev + 1 383 - _ -> 0 384 - end 385 - queue_key = {time, i} 386 - :ets.insert(queue, {{time, i}, event}) 382 + time = :atomics.get(now, 1) + delay 383 + i = :atomics.add_get(now, 2, 1) 384 + # Improper list is smaller than a tuple 385 + queue_key = [time | i] 386 + :ets.insert(queue, {queue_key, event}) 387 387 388 388 queue_key 389 389 end ··· 454 454 :ets.insert(node_procs, {node, List.delete(existing_procs, pid)}) 455 455 end 456 456 457 - defp read_now(now), do: :atomics.get(now, 1) 458 - defp set_now(now, time), do: :atomics.put(now, 1, time) 459 - 460 457 @doc false 461 458 def dump do 462 459 sim = get_sim() ··· 468 465 proc_nodes: :ets.tab2list(sim.proc_nodes), 469 466 node_procs: :ets.tab2list(sim.node_procs), 470 467 471 - now: read_now(sim.now), 468 + now: :atomics.get(sim.now, 1), 472 469 supervisor_pid: sim.supervisor_pid, 473 470 } 474 471 end
+1 -1
lib/trinity/scheduler/simulation_supervisor.ex
··· 58 58 log: :ets.new(__MODULE__, [:ordered_set, :public]), 59 59 log_atomic: :atomics.new(2, signed: false), 60 60 61 - now: :atomics.new(1, signed: false), 61 + now: :atomics.new(2, signed: false), 62 62 supervisor_pid: self(), 63 63 } 64 64 Process.put(simulation_key(), sim)