···85858686 def handle_cast({:pop, tag, up_to_version}, state)
8787 when is_integer(tag) and is_integer(up_to_version) and up_to_version >= 0 do
8888- tq = TaggedQueue.pop(state.tagged_queue, tag, up_to_version)
8989- {:noreply, %State{state | tagged_queue: tq}}
8888+ :ok = TaggedQueue.pop(state.tagged_queue, tag, up_to_version)
8989+ {:noreply, state}
9090 end
91919292 @spec maybe_write_next(%State{}) :: %State{}
···106106107107 @spec do_write_batch(%State{}, term, %LogBatch{}) :: %State{}
108108 defp do_write_batch(%State{} = state, from, %LogBatch{} = batch) do
109109- tagged_queue =
110110- batch.tagged_mutations
111111- |> Enum.reduce(%{}, fn {tags, mutation}, acc ->
112112- Enum.reduce(tags, acc, fn tag, acc ->
113113- case acc[tag] do
114114- nil -> Map.put(acc, tag, [mutation])
115115- # Note: this reverses mutation order, which would violate correctness
116116- # if we did not restore the order below
117117- existing -> Map.put(acc, tag, [mutation | existing])
118118- end
119119- end)
109109+ batch.tagged_mutations
110110+ |> Enum.reduce(%{}, fn {tags, mutation}, acc ->
111111+ Enum.reduce(tags, acc, fn tag, acc ->
112112+ case acc[tag] do
113113+ nil -> Map.put(acc, tag, [mutation])
114114+ # Note: this reverses mutation order, which would violate correctness
115115+ # if we did not restore the order below
116116+ existing -> Map.put(acc, tag, [mutation | existing])
117117+ end
120118 end)
121121- |> Enum.sort_by(&elem(&1, 0))
122122- |> Enum.reduce(state.tagged_queue, fn {tag, mutations}, tq ->
123123- # Note: we restore the mutation order with Enum.reverse here
124124- TaggedQueue.append_batch(tq, tag, batch.commit_version, Enum.reverse(mutations))
125125- end)
119119+ end)
120120+ # TODO: this sort is not strictly needed for determinism because
121121+ # TaggedQueue ets insert order should not affect anything
122122+ |> Enum.sort_by(&elem(&1, 0))
123123+ |> Enum.each(fn {tag, mutations} ->
124124+ # Note: we restore the mutation order with Enum.reverse here
125125+ :ok = TaggedQueue.append_batch(state.tagged_queue, tag, batch.commit_version, Enum.reverse(mutations))
126126+ end)
126127127128 # Ack to CommitBuffer
128129 SimServer.reply(from, :ok)
129130130130- %State{state | version: batch.commit_version, tagged_queue: tagged_queue}
131131+ %State{state | version: batch.commit_version}
131132 end
132133end
+27-49
lib/tagged_queue.ex
···77 alias Hobbes.TaggedQueue
8899 @type t :: %__MODULE__{
1010- tagged_mutations: %{integer => :queue.queue({integer, [{binary, binary}]})},
1010+ table: :ets.table,
1111 }
1212- @enforce_keys [:tagged_mutations]
1212+ @enforce_keys [:table]
1313 defstruct @enforce_keys
14141515 @doc """
···1717 """
1818 @spec new :: t
1919 def new do
2020- %TaggedQueue{tagged_mutations: %{}}
2020+ table = :ets.new(:tagged_queue, [:ordered_set, :private])
2121+ %TaggedQueue{table: table}
2122 end
22232323- @spec append_batch(%TaggedQueue{}, integer, non_neg_integer, [{binary, binary}]) :: %TaggedQueue{}
2424+ @spec append_batch(%TaggedQueue{}, integer, non_neg_integer, [{binary, binary}]) :: :ok
2425 def append_batch(%TaggedQueue{} = tq, tag, version, mutations) when is_integer(tag) and is_integer(version) and is_list(mutations) do
2525- queue = get_in(tq.tagged_mutations[tag]) || :queue.new()
2626- queue = :queue.in({version, mutations}, queue)
2727-2828- put_in(tq.tagged_mutations[tag], queue)
2626+ true = :ets.insert(tq.table, {{tag, version}, mutations})
2727+ :ok
2928 end
30293130 @doc """
···3332 """
3433 @spec peek(%TaggedQueue{}, integer, non_neg_integer, non_neg_integer) :: []
3534 def peek(%TaggedQueue{} = tq, tag, start_version, end_version) when is_integer(tag) and is_integer(start_version) and is_integer(end_version) do
3636- case tq.tagged_mutations[tag] do
3737- nil ->
3838- []
3939-4040- queue ->
4141- peek_queue(queue, start_version, end_version, [])
4242- |> Enum.reverse()
4343- end
3535+ do_scan(tq.table, tag, end_version, start_version - 1, [])
3636+ |> Enum.reverse()
4437 end
45384646- defp peek_queue(queue, start_version, end_version, acc) do
4747- case :queue.out(queue) do
4848- {:empty, _} ->
3939+ defp do_scan(table, tag, end_version, prev_version, acc) do
4040+ case :ets.next_lookup(table, {tag, prev_version}) do
4141+ {_key, [{{^tag, ver}, mutations}]} when ver <= end_version ->
4242+ do_scan(table, tag, end_version, ver, [{ver, mutations} | acc])
4343+ _ ->
4944 acc
5050-5151- {{:value, {ver, _mut} = batch}, queue} ->
5252- nil
5353- cond do
5454- ver > end_version ->
5555- acc
5656- ver < start_version ->
5757- peek_queue(queue, start_version, end_version, acc)
5858- true ->
5959- peek_queue(queue, start_version, end_version, [batch | acc])
6060- end
6145 end
6246 end
63476448 @doc """
6549 Pops (removes) all mutations for `tag` with a version up to `end_version` (inclusive).
6650 """
6767- @spec pop(%TaggedQueue{}, integer, non_neg_integer) :: %TaggedQueue{}
5151+ @spec pop(%TaggedQueue{}, integer, non_neg_integer) :: :ok
6852 def pop(%TaggedQueue{} = tq, tag, end_version) when is_integer(tag) and is_integer(end_version) do
6969- case tq.tagged_mutations[tag] do
7070- nil ->
7171- tq
7272- queue ->
7373- queue = pop_queue(queue, end_version)
7474- put_in(tq.tagged_mutations[tag], queue)
7575- end
5353+ :ets.select_delete(tq.table, [{
5454+ {{:"$1", :"$2"}, :"$3"},
5555+ [
5656+ {:"==", :"$1", tag},
5757+ {:"=<", :"$2", end_version},
5858+ ],
5959+ [true],
6060+ }])
6161+ :ok
7662 end
77637878- defp pop_queue(queue, end_version) do
7979- case :queue.out(queue) do
8080- {:empty, _} ->
8181- queue
8282-8383- {{:value, {ver, _mut}}, popped_queue} ->
8484- case ver > end_version do
8585- true -> queue
8686- false -> pop_queue(popped_queue, end_version)
8787- end
8888- end
6464+ @doc false
6565+ def dump(%TaggedQueue{table: table}) do
6666+ :ets.tab2list(table)
8967 end
9068end
+17
lib/utils.ex
···183183 end)
184184 end)
185185 end
186186+187187+ @doc """
188188+ Times a function and logs how long it takes.
189189+190190+ ## Examples
191191+192192+ iex> time_fn("hello world", fn -> :foo end)
193193+ :foo
194194+195195+ $ "hello world took 0 ms"
196196+197197+ """
198198+ def time_fn(label, func) when is_binary(label) and is_function(func) do
199199+ {time, result} = :timer.tc(func)
200200+ IO.inspect "#{label} took #{time / 1000} ms"
201201+ result
202202+ end
186203end