···11-defmodule Hobbes.Construct.Scheduler do
22- use GenServer
33-44- alias Hobbes.Construct.SimLog
55- alias Hobbes.Construct.Scheduler.{ProcStore, ProcQueue, ProcRegistry, FileStore}
66- alias Hobbes.Construct.Scheduler.ProcStore.ProcState
defmodule Spawn do
  # Queued task: start `module.function(args)` as a new simulated process.
  # `link` tells the scheduler whether to link the child to its parent and
  # `parent_resume` is the `Resume` used to hand the child pid back to the parent.
  @enforce_keys [:module, :function, :args, :link, :parent_resume]
  defstruct [:module, :function, :args, :link, :parent_resume]
end
defmodule Resume do
  # Queued task: wake process `pid` by sending it `{:resume, ref, value}`.
  @enforce_keys [:ref, :pid, :value]
  defstruct [:ref, :pid, :value]
end
defmodule Timeout do
  # Queued task: tell process `pid` that the await identified by `ref` timed out.
  @enforce_keys [:pid, :ref]
  defstruct [:pid, :ref]
end
defmodule Send do
  # Queued task: deliver `message` to `dest` (a pid, registered name,
  # `{name, node}` tuple or alias reference).
  @enforce_keys [:dest, :message]
  defstruct [:dest, :message]
end
defmodule Exit do
  # Queued task: deliver an exit signal with `reason` from `from_pid` to `to_pid`.
  @enforce_keys [:from_pid, :to_pid, :reason]
  defstruct [:from_pid, :to_pid, :reason]
end
defmodule StartNode do
  # Queued task: boot the node registered under `name`.
  @enforce_keys [:name]
  defstruct [:name]
end
defmodule Node do
  # A simulated node: the application module (and its args) started for the
  # node, plus the pid of that application while the node is running.
  @type t :: %__MODULE__{
          pid: pid | nil,
          status: :running | :stopped,
          name: atom,
          app_module: module,
          args: term
        }

  @enforce_keys [:pid, :status, :name, :app_module, :args]
  defstruct [:pid, :status, :name, :app_module, :args]
end
defmodule State do
  # Scheduler GenServer state.
  #
  #   * `clock`                - current simulated time
  #   * `current`              - pid of the process currently allowed to run
  #   * `nodes`                - keyword-style list of `{name, Node}` entries
  #   * `proc_store`, `proc_queue`, `proc_registry` - handles created in `init/1`
  #   * `file_stores`          - one `FileStore` per node name
  #   * `log_server_pid`       - where simulation events are logged
  #   * `resumes_without_send` - consecutive resumes since the last delivered
  #                              message (used for deadlock detection)
  @type t :: %__MODULE__{
          clock: non_neg_integer,
          current: pid | nil,
          nodes: [{atom, Node.t}],
          proc_store: ProcStore.t,
          proc_queue: ProcQueue.t,
          proc_registry: ProcRegistry.t,
          file_stores: %{atom => FileStore.t},
          log_server_pid: pid,
          resumes_without_send: non_neg_integer
        }

  defstruct clock: 0,
            nodes: [],
            current: nil,
            proc_store: nil,
            proc_queue: nil,
            proc_registry: nil,
            file_stores: %{},
            log_server_pid: nil,
            resumes_without_send: 0
end
@doc """
Starts a scheduler linked to the calling process.

The caller's `:rand` state is seeded with `seed`, and a second seed drawn from
it is handed to the scheduler so a run is reproducible. The calling process
becomes the scheduler's initial process, and the scheduler pid is registered
through `Hobbes.Construct.SimServer.set_scheduler_pid/1`.
"""
@spec start_link(term) :: GenServer.on_start
def start_link(seed) when is_integer(seed) do
  :rand.seed(:exsss, seed)
  init_arg = %{initial_pid: self(), seed: random_seed()}

  {:ok, pid} = GenServer.start_link(__MODULE__, init_arg)
  Hobbes.Construct.SimServer.set_scheduler_pid(pid)

  {:ok, pid}
end
@doc """
Gives up the calling process's turn and blocks until the scheduler resumes it.

A `Resume` for the caller is queued `delay` clock units after the current
simulated time (default `0`), then the scheduler is told that the caller has
yielded. Returns `:ok` once the resume message arrives.
"""
@spec yield(term) :: :ok
@spec yield(term, non_neg_integer) :: :ok
def yield(scheduler, delay \\ 0) do
  resume = %Resume{ref: make_ref(), pid: self(), value: nil}
  # Order matters: the resume must be queued before the scheduler moves on
  GenServer.cast(scheduler, {:queue_task, self(), delay, resume})
  GenServer.cast(scheduler, {:yield, self()})

  nil = await_resume(resume)
  :ok
end
@doc """
Yields until a message accepted by `check_fun` is delivered to the caller.

`timeout_ms` is either a non-negative integer or `:infinity`. When the timeout
fires first, the scheduler sends `{:timeout, ref}` and the calling process
exits with reason `:timeout` (see `await_resume/1`).
"""
@spec yield_until_message(pid, function, non_neg_integer | :infinity) :: :ok
def yield_until_message(scheduler, check_fun, timeout_ms)
    when is_function(check_fun) and
           ((is_integer(timeout_ms) and timeout_ms >= 0) or timeout_ms == :infinity) do
  me = self()
  resume = %Resume{ref: make_ref(), pid: me, value: nil}

  GenServer.cast(scheduler, {:await_message, me, check_fun, timeout_ms, resume})
  GenServer.cast(scheduler, {:yield, me})

  nil = await_resume(resume)
  :ok
end
@doc """
Asks the scheduler to spawn `module.function(args)` and yields until the child
has been started. Returns the child's pid, which the scheduler delivers as the
value of the caller's resume.
"""
@spec spawn_and_yield(term, :link | :nolink, module, atom, [term]) :: pid
def spawn_and_yield(scheduler, link, module, function, args) when link in [:link, :nolink] do
  me = self()
  resume = %Resume{ref: make_ref(), pid: me, value: nil}

  task = %Spawn{
    module: module,
    function: function,
    args: args,
    link: link == :link,
    parent_resume: resume
  }

  GenServer.cast(scheduler, {:queue_task, me, 0, task})
  GenServer.cast(scheduler, {:yield, me})

  await_resume(resume)
end
# Blocks until the scheduler answers the given resume.
# Returns the value carried by `{:resume, ref, value}`; a `{:timeout, ref}`
# message makes the calling process exit with reason `:timeout`.
defp await_resume(%Resume{ref: expected_ref}) do
  receive do
    {:timeout, ^expected_ref} -> exit(:timeout)
    {:resume, ^expected_ref, result} -> result
  end
end
@doc """
Queues `message` for delivery to `dest` through the scheduler.

`dest` may be a pid, a registered name, a `{name, node}` tuple or an alias
reference. `delay` is the requested latency in clock units; the scheduler
raises it to a minimum of 10 for every destination other than the sender.
"""
@spec send(term, pid | atom | {atom, atom} | reference, term) :: :ok
@spec send(term, pid | atom | {atom, atom} | reference, term, non_neg_integer) :: :ok
def send(scheduler, dest, message, delay \\ 0) do
  task = %Send{dest: dest, message: message}
  GenServer.cast(scheduler, {:queue_task, self(), delay, task})
  :ok
end
@doc """
Queues an exit signal with `reason` from the calling process to `to_pid`.
The signal is scheduled at the current simulated time.
"""
@spec exit(term, pid, term) :: :ok
def exit(scheduler, to_pid, reason) do
  me = self()
  signal = %Exit{from_pid: me, to_pid: to_pid, reason: reason}
  GenServer.cast(scheduler, {:queue_task, me, 0, signal})
  :ok
end
@doc """
Registers a node called `name` running `app_module` with `args` and schedules
it to be started. Always returns `:ok`.
"""
@spec start_node(pid, atom, module, term) :: :ok
def start_node(scheduler, name, app_module, args) when is_atom(name) and is_atom(app_module) do
  _reply = GenServer.call(scheduler, {:create_node, name, app_module, args})
  :ok
end
@doc """
Kills every process of the running node `name` and schedules the node to be
started again `delay` clock units from now.
"""
@spec restart_node(pid, atom, non_neg_integer) :: :ok | {:error, :node_not_found | :node_stopped}
def restart_node(scheduler, name, delay) when is_atom(name) do
  request = {:restart_node, name, delay}
  GenServer.call(scheduler, request)
end
@doc """
Returns the names of all known nodes, oldest first.
"""
@spec list_nodes(pid) :: [atom]
def list_nodes(scheduler), do: GenServer.call(scheduler, :list_nodes)
# Returns the name of the node the calling process belongs to.
def get_current_node(scheduler) do
  request = {:get_current_node, self()}
  GenServer.call(scheduler, request)
end
@doc """
Makes the calling process monitor `target_pid` and returns the monitor
reference. When the target is unknown to the scheduler, a `:DOWN` message with
reason `:noproc` is sent to the caller straight away.
"""
@spec monitor(pid, pid) :: reference
def monitor(scheduler, target_pid) do
  monitor_ref = make_ref()
  :ok = GenServer.call(scheduler, {:monitor, self(), target_pid, monitor_ref})
  monitor_ref
end
@doc """
Creates an alias for the calling process and returns its reference.
"""
@spec alias(pid) :: reference
def alias(scheduler) do
  request = {:alias, self()}
  GenServer.call(scheduler, request)
end
@doc """
Removes `alias` from the calling process. Returns `true` when the alias was
removed and `false` when the process store reported an error.
"""
@spec unalias(pid, reference) :: boolean
def unalias(scheduler, alias) when is_reference(alias) do
  request = {:unalias, self(), alias}
  GenServer.call(scheduler, request)
end
@doc """
Sets the simulated `:trap_exit` flag of the calling process to `value`.

Returns the previous value of the flag, as reported by the scheduler's
process store.
"""
@spec set_process_flag(pid, :trap_exit, boolean) :: boolean
def set_process_flag(scheduler, :trap_exit = flag, value) when is_boolean(value) do
  {:ok, old_value} = GenServer.call(scheduler, {:set_process_flag, self(), flag, value})
  old_value
end
# Registers `pid` under `name` on its node. Replies `:ok` or `:error`;
# `:error` is also returned when `pid` lives on a different node than the caller.
def register_process(scheduler, pid, name) when is_pid(pid) and is_atom(name) do
  request = {:register_process, self(), pid, name}
  GenServer.call(scheduler, request)
end
# Looks up `name` on the calling process's node.
# Returns the registered pid, or nil when the name is not registered.
def whereis(scheduler, name) when is_atom(name) do
  request = {:whereis, self(), name}
  GenServer.call(scheduler, request)
end
@doc """
Returns the `FileStore` of the calling process's node, creating it on first use.
"""
@spec get_file_store(pid) :: FileStore.t
def get_file_store(scheduler) do
  request = {:get_file_store, self()}
  GenServer.call(scheduler, request)
end
@doc """
Returns the simulated time, computed by the scheduler as its clock multiplied
by 1000. `for_pid` defaults to the calling process.
"""
@spec current_time(term, pid) :: non_neg_integer
def current_time(scheduler, for_pid \\ self()) when is_pid(for_pid) do
  request = {:current_time, for_pid}
  GenServer.call(scheduler, request)
end
@doc """
Tells the scheduler which process receives simulation log events.
"""
@spec configure_log_server(term, pid) :: :ok
def configure_log_server(scheduler, log_server_pid) do
  request = {:configure_log_server, log_server_pid}
  GenServer.call(scheduler, request)
end
@doc """
Fetches the simulation log.

The request is routed through the scheduler on purpose: the scheduler's own
log events are casts, and because messages are received in order, asking via
the scheduler guarantees those events were processed before the log is read.
"""
@spec get_log(term) :: term
def get_log(scheduler), do: GenServer.call(scheduler, :get_log)
# GenServer callback. Traps exits (spawned simulation processes are linked to
# the scheduler), seeds the scheduler's own PRNG, creates the stores and
# registers the initial process on node `:nonode` as the current process.
def init(%{initial_pid: initial_pid, seed: seed}) do
  Process.flag(:trap_exit, true)
  :rand.seed(:exsss, seed)

  proc_store = ProcStore.new()
  proc_queue = ProcQueue.new()
  proc_registry = ProcRegistry.new()

  state = %State{
    proc_store: proc_store,
    proc_queue: proc_queue,
    proc_registry: proc_registry,
    # TODO: spawn per node instead
    file_stores: %{nonode: FileStore.new()}
  }

  ProcStore.add_process(proc_store, initial_pid, :nonode)

  {:ok, set_current_process(state, initial_pid)}
end
# Creates a new alias reference for `pid` and records it in the process store.
def handle_call({:alias, pid}, _from, %State{proc_store: proc_store} = state) do
  new_alias = make_ref()
  ProcStore.add_alias(proc_store, pid, new_alias)
  {:reply, new_alias, state}
end

# Replies true when the alias was removed, false when the store returned an error.
def handle_call({:unalias, pid, alias}, _from, %State{proc_store: proc_store} = state) do
  removed? =
    case ProcStore.remove_alias(proc_store, pid, alias) do
      {:error, _err} -> false
      :ok -> true
    end

  {:reply, removed?, state}
end

# Updates a process flag; the reply carries the previous value as {:ok, old}.
def handle_call({:set_process_flag, pid, flag, value}, _from, %State{} = state) do
  {:ok, _previous} = result = ProcStore.set_flag(state.proc_store, pid, flag, value)
  {:reply, result, state}
end

# Registers `pid` under `name`, but only when caller and target share a node.
def handle_call({:register_process, from_pid, pid, name}, _from, %State{} = state) do
  # TODO: ensure name is valid? (not nil/true/false/:undefined)
  {:ok, %ProcState{node: caller_node}} = ProcStore.fetch_state(state.proc_store, from_pid)
  {:ok, %ProcState{node: target_node}} = ProcStore.fetch_state(state.proc_store, pid)

  reply =
    if target_node == caller_node do
      case ProcRegistry.register(state.proc_registry, pid, name, target_node) do
        {:error, _error} -> :error
        :ok -> :ok
      end
    else
      :error
    end

  {:reply, reply, state}
end

# Resolves `name` on the caller's node; replies with the pid or nil.
def handle_call({:whereis, from_pid, name}, _from, %State{} = state) do
  {:ok, %ProcState{node: caller_node}} = ProcStore.fetch_state(state.proc_store, from_pid)

  found =
    case ProcRegistry.whereis(state.proc_registry, name, caller_node) do
      {:error, :not_found} -> nil
      {:ok, pid} -> pid
    end

  {:reply, found, state}
end

# Replies with the file store of the caller's node, creating and remembering
# a new one the first time a node asks.
def handle_call({:get_file_store, pid}, _from, %State{} = state) when is_pid(pid) do
  {:ok, %ProcState{node: node}} = ProcStore.fetch_state(state.proc_store, pid)

  case state.file_stores do
    %{^node => existing} ->
      {:reply, existing, state}

    stores ->
      created = FileStore.new()
      {:reply, created, %{state | file_stores: Map.put(stores, node, created)}}
  end
end

def handle_call({:current_time, for_pid}, _from, %State{clock: clock} = state) when is_pid(for_pid) do
  # TODO: maybe we should store clock in microseconds?
  {:reply, clock * 1000, state}
end

def handle_call({:configure_log_server, log_server_pid}, _from, %State{} = state) do
  {:reply, :ok, %{state | log_server_pid: log_server_pid}}
end

def handle_call(:get_log, _from, %State{log_server_pid: log_server_pid} = state) do
  {:reply, SimLog.get_log(log_server_pid), state}
end

# Records a monitor on a known target; for an unknown target the monitoring
# process immediately gets a :DOWN message with reason :noproc.
def handle_call({:monitor, pid, target_pid, ref}, _from, %State{} = state)
    when is_pid(pid) and is_pid(target_pid) and is_reference(ref) do
  case ProcStore.fetch_state(state.proc_store, target_pid) do
    :error ->
      Kernel.send(pid, {:DOWN, ref, :process, target_pid, :noproc})

    {:ok, _proc} ->
      ProcStore.add_monitor(state.proc_store, pid, target_pid, ref)
  end

  {:reply, :ok, state}
end

# Adds a stopped node to the front of the node list and queues its start at
# the current clock value.
def handle_call({:create_node, name, app_module, args}, _from, %State{} = state)
    when is_atom(name) and is_atom(app_module) do
  entry = {name, %Node{pid: nil, status: :stopped, name: name, app_module: app_module, args: args}}
  state = %{state | nodes: [entry | state.nodes]}

  _ = queue_task(state, state.clock, %StartNode{name: name})

  {:reply, :ok, state}
end
# Restarts a running node: kills all of its processes, marks it stopped and
# queues a StartNode task `delay` clock units from now.
#
# Replies {:error, :node_stopped} for a node that is not running and
# {:error, :node_not_found} for an unknown name. List.keyfind/3 returns nil
# (not []) on a miss, so nil is the value that must be matched here.
def handle_call({:restart_node, name, delay}, _from, %State{} = state)
    when is_atom(name) and is_integer(delay) do
  case List.keyfind(state.nodes, name, 0) do
    {^name, %Node{status: :running} = node} ->
      :ok = kill_node(state, node)

      node = %{node | pid: nil, status: :stopped}
      state = %{state | nodes: List.keyreplace(state.nodes, name, 0, {name, node})}
      state = queue_task(state, state.clock + delay, %StartNode{name: node.name})

      {:reply, :ok, state}

    {^name, %Node{status: :stopped}} ->
      {:reply, {:error, :node_stopped}, state}

    nil ->
      {:reply, {:error, :node_not_found}, state}
  end
end
# Nodes are stored newest-first; prepending while folding yields the names
# oldest-first.
def handle_call(:list_nodes, _from, %State{nodes: nodes} = state) do
  names =
    Enum.reduce(nodes, [], fn {_name, %Node{name: node_name}}, acc ->
      [node_name | acc]
    end)

  {:reply, names, state}
end

# Replies with the node name recorded for `pid` in the process store.
def handle_call({:get_current_node, pid}, _from, %State{proc_store: proc_store} = state) do
  {:ok, %ProcState{node: node_name}} = ProcStore.fetch_state(proc_store, pid)
  {:reply, node_name, state}
end
# Message sends. A send to self() keeps the requested delay; every other
# destination gets at least 10 clock units of latency. A resulting delay of 0
# is delivered immediately, anything greater is queued.
def handle_cast({:queue_task, pid, delay, %Send{} = task}, %State{} = state)
    when is_pid(pid) and is_integer(delay) do
  # TODO: vary the minimum latency based on whether processes are on the same node
  effective_delay = if task.dest == pid, do: delay, else: max(delay, 10)

  case effective_delay do
    later when later > 0 ->
      {:noreply, queue_task(state, state.clock + later, task)}

    0 ->
      log(state, {:send_now, state.clock, task.dest, task.message})
      {:noreply, send_message(state, task)}
  end
end

# Every other task type is simply queued `delay` clock units from now.
def handle_cast({:queue_task, pid, delay, task}, %State{clock: clock} = state)
    when is_pid(pid) and is_integer(delay) do
  {:noreply, queue_task(state, clock + delay, task)}
end

# Records that `pid` waits for a message accepted by `check_fun`. Unless the
# timeout is :infinity, a Timeout task is queued and its key stored with the await.
def handle_cast({:await_message, pid, check_fun, timeout_ms, %Resume{} = resume}, %State{} = state)
    when is_pid(pid) and is_function(check_fun) do
  timeout_key =
    if timeout_ms == :infinity do
      nil
    else
      timeout = %Timeout{pid: pid, ref: resume.ref}
      {:ok, key} = ProcQueue.enqueue(state.proc_queue, state.clock + timeout_ms, timeout)
      key
    end

  ProcStore.track_await(state.proc_store, pid, check_fun, resume, timeout_key)
  {:noreply, state}
end

# Only the current process may yield (anything else crashes the scheduler);
# afterwards the next queued task is performed.
def handle_cast({:yield, pid}, %State{} = state) when is_pid(pid) do
  case state.current do
    ^pid -> :noop
  end

  {:noreply, perform_next_task(%{state | current: nil})}
end
# EXIT signals from linked simulation processes. Only the current process is
# expected to die on its own; its links and monitors are dispatched and the
# next task is performed. An EXIT from any other pid is treated as a bug.
def handle_info({:EXIT, pid, reason} = message, %State{current: current} = state) do
  if current == pid do
    cleaned = clean_up_dead_process(state, pid, reason)
    {:noreply, perform_next_task(%{cleaned | current: nil})}
  else
    raise """
    Received invalid EXIT message: #{inspect(message)}

    Scheduler state:

    #{inspect(state, pretty: true)}
    """
  end
end
# Enqueues `task` at simulated time `time` and returns the state unchanged.
# For a Resume, the queue key is also recorded against the process so the
# entry can be removed if the process dies first.
defp queue_task(%State{proc_queue: queue, proc_store: store} = state, time, %Resume{pid: pid} = resume)
     when is_integer(time) do
  {:ok, queue_key} = ProcQueue.enqueue(queue, time, resume)
  ProcStore.track_queued(store, pid, queue_key)
  state
end

defp queue_task(%State{proc_queue: queue} = state, time, task) when is_integer(time) do
  ProcQueue.enqueue(queue, time, task)
  state
end
# Pops the earliest queued task, advances the clock to its time and performs it.
#
# Raises when 1000 or more resumes happened without any message being
# delivered (a primitive deadlock detector), when the queue is empty, or when
# the popped task lies before the current clock.
defp perform_next_task(%State{resumes_without_send: resumes_without_send} = state)
     when resumes_without_send >= 1000 do
  raise """
  Resumed #{inspect(resumes_without_send)} times without sending a message! Possible deadlock?

  Scheduler state:

  #{inspect(state, pretty: true)}
  """
end

defp perform_next_task(%State{clock: clock} = state) do
  case ProcQueue.pop_next(state.proc_queue) do
    {:error, :empty} ->
      raise """
      Attempted to call `perform_next_task/1` but the queue is empty! Possible deadlock?

      Scheduler state:

      #{inspect(state, pretty: true)}
      """

    {:ok, {time, _task}} when time < clock ->
      raise """
      Time ran backwards!

      Scheduler state:

      #{inspect(state, pretty: true)}
      """

    {:ok, {time, task}} ->
      perform(%{state | clock: time}, task)
  end
end
# Spawn: starts the child on its parent's node, optionally links the two,
# makes the child the current process and queues the parent's resume (carrying
# the child pid) at the current clock value.
defp perform(%State{} = state, %Spawn{module: module, function: function, args: args} = spawn) do
  parent_pid = spawn.parent_resume.pid
  {:ok, %ProcState{node: node}} = ProcStore.fetch_state(state.proc_store, parent_pid)

  child_pid = do_spawn_apply(module, function, args)
  ProcStore.add_process(state.proc_store, child_pid, node)

  if spawn.link do
    ProcStore.add_link(state.proc_store, parent_pid, child_pid)
  end

  log(state, {:spawn, child_pid, {module, function, args}})

  %Resume{} = parent_resume = spawn.parent_resume
  answered_resume = %{parent_resume | value: child_pid}

  running = set_current_process(state, child_pid)
  queue_task(running, state.clock, answered_resume)
end

# StartNode: runs `app_module.start(:temporary, args)` in a fresh process,
# marks the node as running and makes that process current.
defp perform(%State{} = state, %StartNode{name: node_name}) do
  {_name, %Node{} = node} = List.keyfind(state.nodes, node_name, 0)
  app_pid = do_spawn_apply(node.app_module, :start, [:temporary, node.args])

  ProcStore.add_process(state.proc_store, app_pid, node.name)
  log(state, {:start_node, app_pid, {node.name, node.app_module, node.args}})

  started = %{node | pid: app_pid, status: :running}
  nodes = List.keyreplace(state.nodes, started.name, 0, {started.name, started})

  set_current_process(%{state | nodes: nodes}, app_pid)
end

# Send: delivers the message, then moves straight on to the next task because
# a delivery does not hand control to any process.
defp perform(%State{} = state, %Send{dest: dest, message: message} = task) do
  log(state, {:send, state.clock, dest, message})

  delivered = send_message(state, task)
  perform_next_task(delivered)
end
# Exit: delivers a queued exit signal to `to_pid`.
#
#   * target traps exits and reason is not :kill -> converted to an
#     {:EXIT, from_pid, reason} message, then the next task runs
#   * target does not trap exits and reason is :normal -> the signal is
#     dropped. Process.exit/2 with :normal sent from another process does not
#     terminate a non-trapping process, so no EXIT would ever reach the
#     scheduler; making the target current and waiting would stall forever.
#     NOTE(review): a real process that sends itself a :normal exit does
#     terminate; that case is not emulated here - confirm whether it is needed.
#   * otherwise the target becomes current and is killed; the resulting EXIT
#     is handled in handle_info/2
#   * target already gone -> the next task runs
defp perform(%State{} = state, %Exit{from_pid: from_pid, to_pid: to_pid, reason: reason}) do
  log(state, {:exit, state.clock, to_pid, reason})

  case ProcStore.fetch_state(state.proc_store, to_pid) do
    {:ok, %ProcState{trap_exit: true} = proc} when reason != :kill ->
      state
      |> send_message(%Send{dest: proc.pid, message: {:EXIT, from_pid, reason}})
      |> perform_next_task()

    {:ok, %ProcState{}} when reason == :normal ->
      perform_next_task(state)

    {:ok, %ProcState{} = proc} ->
      state = set_current_process(state, proc.pid)
      Process.exit(proc.pid, reason)

      state

    # Process already died
    :error ->
      perform_next_task(state)
  end
end
# Resume: hands control back to a waiting process by sending it
# {:resume, ref, value}, and counts the resume for deadlock detection.
defp perform(%State{} = state, %Resume{pid: pid, ref: ref, value: value}) do
  case ProcStore.fetch_state(state.proc_store, pid) do
    :error ->
      # TODO: this should be unreachable because we remove queue entries proactively
      raise "Tried to resume dead process"

    {:ok, _proc} ->
      ProcStore.clear_queued(state.proc_store, pid)

      running = set_current_process(state, pid)
      Kernel.send(pid, {:resume, ref, value})

      %{running | resumes_without_send: running.resumes_without_send + 1}
  end
end

# Timeout: the await expired. Clears it and tells the process, which becomes current.
defp perform(%State{} = state, %Timeout{pid: pid, ref: ref}) do
  ProcStore.clear_await(state.proc_store, pid)

  running = set_current_process(state, pid)
  Kernel.send(pid, {:timeout, ref})

  running
end
# Marks `pid` as the process that is currently allowed to run.
defp set_current_process(%State{} = state, pid) when is_pid(pid) do
  Map.replace!(state, :current, pid)
end
# Resolves the destination of a Send to a pid (or nil when a name or alias is
# unknown) and delivers the message.
defp send_message(%State{} = state, %Send{dest: dest, message: message}) do
  lookup_name = fn name, node ->
    case ProcRegistry.whereis(state.proc_registry, name, node) do
      {:error, :not_found} -> nil
      {:ok, pid} -> pid
    end
  end

  target =
    case dest do
      # TODO: for local names, send/2 actually raises if the name is not registered
      # TODO: use node from sender
      name when is_atom(name) ->
        lookup_name.(name, :nonode)

      {name, node} when is_atom(name) and is_atom(node) ->
        lookup_name.(name, node)

      alias when is_reference(alias) ->
        case ProcStore.resolve_alias(state.proc_store, alias) do
          :error -> nil
          {:ok, pid} -> pid
        end

      pid when is_pid(pid) ->
        pid
    end

  send_message_to_pid(state, target, message)
end
# Messages whose destination could not be resolved are dropped.
# TODO: technically we should raise if name is unregistered and local
# (but only for immediate send)
defp send_message_to_pid(%State{} = state, nil, _message), do: state

# Delivers `message` and resets the deadlock counter. If the receiver is
# awaiting a message and its check function accepts this one, the pending
# timeout (if any) is cancelled, the await cleared and the receiver's resume
# queued at the current clock value.
defp send_message_to_pid(%State{} = state, dest_pid, message) when is_pid(dest_pid) do
  Kernel.send(dest_pid, message)
  state = %{state | resumes_without_send: 0}

  case ProcStore.fetch_state(state.proc_store, dest_pid) do
    {:ok, %ProcState{await: {check_fun, resume}, queue_key: timeout_key}} ->
      case check_fun.(message) do
        false ->
          state

        true ->
          if timeout_key do
            ProcQueue.remove(state.proc_queue, timeout_key)
          end

          ProcStore.clear_await(state.proc_store, dest_pid)
          queue_task(state, state.clock, resume)
      end

    _not_awaiting ->
      state
  end
end
# Called after `pid` has exited with `reason`: propagates the exit over its
# links, notifies its monitors and finally forgets the process.
defp clean_up_dead_process(%State{} = state, pid, reason) when is_pid(pid) do
  after_links = dispatch_links(state, pid, reason)
  after_monitors = dispatch_monitors(after_links, pid, reason)

  :ok = remove_process(after_monitors, pid)
  after_monitors
end
# Propagates the exit of `target_pid` over its links.
#
# Reason :normal kills nobody: linked processes that trap exits receive an
# {:EXIT, pid, :normal} message and every link of the exiting process is removed.
defp dispatch_links(%State{} = state, target_pid, :normal) do
  {:ok, %ProcState{} = exiting} = ProcStore.fetch_state(state.proc_store, target_pid)

  for linked_pid <- exiting.linked_to, reduce: state do
    acc ->
      {:ok, %ProcState{} = linked} = ProcStore.fetch_state(acc.proc_store, linked_pid)

      acc =
        case linked.trap_exit do
          false -> acc
          true -> send_message(acc, %Send{dest: linked.pid, message: {:EXIT, exiting.pid, :normal}})
        end

      ProcStore.remove_link(acc.proc_store, target_pid, linked_pid)
      acc
  end
end

# Any other reason: collects every transitively linked process that does not
# trap exits, kills them all, dispatches their monitors and only then removes
# them from the queue and the stores.
defp dispatch_links(%State{} = state, target_pid, reason) when is_pid(target_pid) do
  {:ok, %ProcState{} = exiting} = ProcStore.fetch_state(state.proc_store, target_pid)

  {state, collected} = traverse_linked(state, exiting, reason)
  # Cycles are prevented, but there could still be duplicates
  victims = Enum.uniq(collected)

  for victim <- victims do
    kill_process(victim, reason)
  end

  state = Enum.reduce(victims, state, &dispatch_monitors(&2, &1, reason))

  # The victims still hold links to each other, but that does not matter:
  # both sides of those links are deleted along with the processes.
  for victim <- victims do
    remove_process(state, victim)
  end

  state
end
# Walks the link graph starting at `target_proc`, skipping pids in `parents`
# (the path walked so far) to avoid cycles.
#
# Linked processes that trap exits survive: their link to the dying process is
# removed and they get an {:EXIT, pid, reason} message. All others are added
# to the returned kill list, followed by whatever is reachable through them.
# Returns {state, pids_to_kill}; the list may contain duplicates.
defp traverse_linked(%State{} = state, %ProcState{} = target_proc, reason, parents \\ MapSet.new()) do
  for linked_pid <- target_proc.linked_to,
      not MapSet.member?(parents, linked_pid),
      reduce: {state, []} do
    {state, acc} ->
      {:ok, %ProcState{} = linked_proc} = ProcStore.fetch_state(state.proc_store, linked_pid)

      case linked_proc.trap_exit do
        false ->
          path = MapSet.put(parents, target_proc.pid)
          {state, to_kill} = traverse_linked(state, linked_proc, reason, path)
          {state, acc ++ [linked_proc.pid | to_kill]}

        true ->
          # Since one side of this link will survive, we must clean up the link
          ProcStore.remove_link(state.proc_store, target_proc.pid, linked_pid)

          exit_message = {:EXIT, target_proc.pid, reason}
          {send_message(state, %Send{dest: linked_pid, message: exit_message}), acc}
      end
  end
end
# Sends {:DOWN, ref, :process, target_pid, reason} to every process monitoring
# `target_pid`, removing each monitor as it is notified.
defp dispatch_monitors(%State{} = state, target_pid, reason) when is_pid(target_pid) do
  {:ok, %ProcState{monitored_by: watchers}} = ProcStore.fetch_state(state.proc_store, target_pid)

  for {ref, watcher_pid} <- watchers, reduce: state do
    acc ->
      ProcStore.remove_monitor(acc.proc_store, watcher_pid, ref)

      down = {:DOWN, ref, :process, target_pid, reason}
      send_message(acc, %Send{dest: watcher_pid, message: down})
  end
end
# Forwards a simulation event to the configured log server.
defp log(%State{log_server_pid: log_server_pid}, event) do
  SimLog.log(log_server_pid, event)
end
# Spawns a process linked to the scheduler that runs `apply(m, f, a)`.
# The child's seed is drawn in the scheduler process before spawning; the
# child then registers the scheduler pid and seeds its own PRNG with it.
@spec do_spawn_apply(module, atom, list) :: pid
defp do_spawn_apply(m, f, a) do
  scheduler = self()
  child_seed = random_seed()

  child = fn ->
    Hobbes.Construct.SimServer.set_scheduler_pid(scheduler)
    :rand.seed(:exsss, child_seed)
    apply(m, f, a)
  end

  spawn_link(child)
end
# Kills (with reason :shutdown) and forgets every process that belongs to `node`.
defp kill_node(%State{} = state, %Node{name: node_name}) do
  procs = ProcStore.list_processes(state.proc_store)

  for proc <- procs, proc.node == node_name do
    %ProcState{pid: pid} = proc
    # TODO: the order here is not deterministic, but it probably doesn't matter?
    :ok = kill_process(pid, :shutdown)
    :ok = remove_process(state, pid)
  end

  :ok
end
# Kills `pid` with `reason` and waits for the matching EXIT message, so it
# does not reach handle_info/2 later.
#
# A process killed with :kill exits with reason :killed, so that is the
# reason to wait for in that case.
#
# Note: remember to call remove_process() afterwards to clear the process from the ProcStore
defp kill_process(pid, reason) do
  Process.exit(pid, reason)
  expected_reason = if reason == :kill, do: :killed, else: reason

  receive do
    {:EXIT, ^pid, ^expected_reason} -> :ok
  after
    # Sanity to stop the scheduler hanging, this should never happen
    1000 -> raise "Process #{inspect(pid)} failed to exit"
  end
end
# Forgets `pid`: drops its pending queue entry (if any), then removes it from
# the process store and the name registry.
defp remove_process(%State{proc_store: store, proc_queue: queue, proc_registry: registry}, pid)
     when is_pid(pid) do
  {:ok, %ProcState{queue_key: pending_key}} = ProcStore.fetch_state(store, pid)

  if pending_key do
    ProcQueue.remove(queue, pending_key)
  end

  ProcStore.remove_process(store, pid)
  ProcRegistry.remove_process(registry, pid)
  :ok
end
# Draws a seed from the calling process's PRNG.
defp random_seed do
  Enum.random(1..1_000_000)
end
772772-end
-126
lib/construct/scheduler/file_store.ex
defmodule Hobbes.Construct.Scheduler.FileStore do
  # In-memory file system kept in a FlatKV.
  #
  # Every entry is keyed by its path: directories hold the value "directory"
  # and files hold "file:" followed by their contents. `new/0` creates the
  # root directory "/".

  alias Hobbes.KV.FlatKV

  @type t :: FlatKV.t
  @type posix :: :enoent | :eexist | :enotdir | :eisdir

  # Creates a store containing only the root directory.
  @spec new :: t
  def new do
    kv = FlatKV.new(public: true)
    FlatKV.put(kv, "/", "directory")

    kv
  end

  # Creates a single directory. Every parent must already be a directory
  # ({:error, :enoent} otherwise) and the path itself must not exist yet
  # ({:error, :eexist} otherwise).
  @spec mkdir(t, binary) :: :ok | {:error, posix}
  def mkdir(kv, path) do
    path = normalize_directory(path)

    cond do
      not all_components_dir?(kv, path) ->
        {:error, :enoent}

      exists?(kv, path) ->
        {:error, :eexist}

      true ->
        FlatKV.put(kv, path, "directory")
        :ok
    end
  end

  # Creates a directory and all missing parents. Stops with
  # {:error, :enotdir} at the first component that exists but is not a directory.
  @spec mkdir_p(t, binary) :: :ok | {:error, posix}
  def mkdir_p(kv, path) do
    path
    |> components()
    |> Enum.reduce_while(:ok, fn component, :ok ->
      cond do
        not exists?(kv, component) ->
          :ok = mkdir(kv, component)
          {:cont, :ok}

        dir?(kv, component) ->
          {:cont, :ok}

        true ->
          {:halt, {:error, :enotdir}}
      end
    end)
  end

  # Writes (or overwrites) a file. Every parent must be a directory
  # ({:error, :enoent} otherwise). Writing to a path that is itself a
  # directory is refused with {:error, :eisdir} instead of silently replacing
  # the directory entry.
  @spec write(t, binary, binary) :: :ok | {:error, posix}
  def write(kv, path, contents) do
    cond do
      not all_components_dir?(kv, path) ->
        {:error, :enoent}

      dir?(kv, path) ->
        {:error, :eisdir}

      true ->
        FlatKV.put(kv, path, "file:" <> contents)
        :ok
    end
  end

  # Reads a file's contents.
  @spec read(t, binary) :: {:ok, binary} | {:error, posix}
  def read(kv, path) do
    case FlatKV.get(kv, path) do
      "file:" <> contents -> {:ok, contents}
      "directory" -> {:error, :eisdir}
      nil -> {:error, :enoent}
    end
  end

  # Lists the names of the direct children of a directory.
  #
  # Children of "/a" are stored under keys starting with "/a/", but children
  # of the root are stored under keys starting with "/" (e.g. "/a"), so the
  # root needs its own prefix. "0" is the byte that follows "/", which makes
  # `prefix .. upper` cover exactly the keys that start with the prefix.
  @spec ls(t, binary) :: {:ok, [binary]}
  def ls(kv, path) do
    # TODO: error handling (enoent, enotdir)
    dir = normalize_directory(path)
    prefix = if dir == "/", do: "/", else: dir <> "/"
    upper = binary_part(prefix, 0, byte_size(prefix) - 1) <> "0"
    prefix_size = byte_size(prefix)

    # TODO: would be more efficient to only scan keys
    %{pairs: pairs} = FlatKV.scan(kv, prefix, upper)

    child_names =
      for {key, _contents} <- pairs,
          String.starts_with?(key, prefix),
          name = binary_part(key, prefix_size, byte_size(key) - prefix_size),
          # "" is the directory's own entry (only possible for the root);
          # names containing "/" belong to deeper levels
          name != "" and not String.contains?(name, "/") do
        name
      end

    {:ok, child_names}
  end

  # True when `path` (ignoring trailing slashes) is a directory entry.
  @spec dir?(t, binary) :: boolean
  def dir?(kv, path) do
    FlatKV.get(kv, normalize_directory(path)) == "directory"
  end

  # True when an entry is stored under exactly `path`.
  @spec exists?(t, binary) :: boolean
  def exists?(kv, path) do
    FlatKV.get(kv, path) != nil
  end

  # Every prefix of the path, shortest first: "/a/b" -> ["/", "/a", "/a/b"].
  # The explicit step keeps the range empty for an empty path; a plain 1..0
  # would iterate downwards and crash in Path.join/1.
  defp components(path) do
    parts = Path.split(path)

    for count <- 1..length(parts)//1 do
      parts |> Enum.take(count) |> Path.join()
    end
  end

  # True when every parent component of `path` is a directory.
  defp all_components_dir?(kv, path) do
    path
    |> components()
    |> Enum.drop(-1)
    |> Enum.all?(&dir?(kv, &1))
  end

  defp normalize_directory("/"), do: "/"
  defp normalize_directory(path), do: String.trim_trailing(path, "/")

  @doc false
  def dump(kv), do: FlatKV.dump(kv)
end
-45
lib/construct/scheduler/proc_queue.ex
defmodule Hobbes.Construct.Scheduler.ProcQueue do
  # Time-ordered task queue backed by a private ordered_set ETS table.
  # Keys are {time, i}, where `i` increases per enqueue at the same time so
  # that tasks scheduled for one instant come out in FIFO order.

  @type t :: :ets.table
  @type key :: {non_neg_integer, non_neg_integer}

  # Creates an empty queue owned by the calling process.
  @spec new :: t
  def new do
    :ets.new(:proc_queue, [:private, :ordered_set])
  end

  # Adds `task` at `time` and returns the key it was stored under.
  @spec enqueue(t, non_neg_integer, term) :: {:ok, key}
  def enqueue(table, time, task) when is_integer(time) do
    # Atoms sort after integers, so {time, :infinity} is greater than every
    # {time, i}; its predecessor is the last entry for `time`, if there is one.
    index =
      case :ets.prev(table, {time, :infinity}) do
        {^time, last_index} -> last_index + 1
        _ -> 0
      end

    key = {time, index}
    :ets.insert(table, {key, task})
    {:ok, key}
  end

  # Deletes the entry stored under `key`. Raises when there is no such entry.
  @spec remove(t, key) :: :ok
  def remove(table, key) do
    if :ets.member(table, key) do
      :ets.delete(table, key)
      :ok
    else
      raise "Tried to remove key that does not exist: #{inspect(key)}"
    end
  end

  # Removes and returns the earliest task, or {:error, :empty}.
  # Uses first/1 + lookup/2 rather than first_lookup/1 so the module also
  # works on OTP releases older than 27.
  @spec pop_next(t) :: {:ok, {non_neg_integer, term}} | {:error, :empty}
  def pop_next(table) do
    case :ets.first(table) do
      :"$end_of_table" ->
        {:error, :empty}

      {time, _index} = key ->
        [{^key, task}] = :ets.lookup(table, key)
        :ets.delete(table, key)
        {:ok, {time, task}}
    end
  end
end
-58
lib/construct/scheduler/proc_registry.ex
defmodule Hobbes.Construct.Scheduler.ProcRegistry do
  # Per-node process name registry backed by a private ETS set.
  # Each registration is stored twice, as {{name, node}, pid} and as
  # {pid, {name, node}}, so lookups work in both directions. A pid can hold
  # at most one name.

  @type t :: :ets.table

  @spec new :: t
  def new do
    :ets.new(:proc_registry, [:set, :private])
  end

  # Registers `pid` under `name` on `node`. Fails when the name is taken on
  # that node or when the pid already has a name.
  @spec register(t, pid, atom, atom) :: :ok | {:error, :name_already_registered | :pid_already_registered}
  def register(table, pid, name, node) when is_pid(pid) and is_atom(name) and is_atom(node) do
    cond do
      :ets.member(table, {name, node}) ->
        {:error, :name_already_registered}

      :ets.member(table, pid) ->
        {:error, :pid_already_registered}

      true ->
        # Both directions are written in a single insert
        :ets.insert(table, [{{name, node}, pid}, {pid, {name, node}}])
        :ok
    end
  end

  # Looks up the pid registered under `name` on `node`.
  @spec whereis(t, atom, atom) :: {:ok, pid} | {:error, :not_found}
  def whereis(table, name, node) when is_atom(name) and is_atom(node) do
    case :ets.lookup(table, {name, node}) do
      [{{^name, ^node}, pid}] -> {:ok, pid}
      [] -> {:error, :not_found}
    end
  end

  # Removes the registration of `name` on `node`; :error when there is none.
  @spec unregister(t, atom, atom) :: :ok | :error
  def unregister(table, name, node) when is_atom(name) and is_atom(node) do
    case whereis(table, name, node) do
      {:ok, pid} -> delete_registration(table, pid, {name, node})
      {:error, :not_found} -> :error
    end
  end

  # Removes whatever name `pid` is registered under; :error when it has none.
  @spec remove_process(t, pid) :: :ok | :error
  def remove_process(table, pid) when is_pid(pid) do
    case :ets.lookup(table, pid) do
      # Pinned so the row is checked against the pid that was asked for
      [{^pid, {_name, _node} = name_key}] -> delete_registration(table, pid, name_key)
      [] -> :error
    end
  end

  # Deletes both rows of one registration.
  defp delete_registration(table, pid, name_key) do
    :ets.delete(table, name_key)
    :ets.delete(table, pid)
    :ok
  end
end
-220
lib/construct/scheduler/proc_store.ex
defmodule Hobbes.Construct.Scheduler.ProcStore do
  @moduledoc """
  ETS-backed bookkeeping for processes tracked by the scheduler.

  Two private `:set` tables are kept:

    * `proc_table` maps a pid to its `ProcState` (node, links, monitors,
      `trap_exit` flag, queue key and pending await).
    * `alias_table` maps an alias reference to the pid it was added for.

  Most functions assert that the processes they touch are present in
  `proc_table` and raise (usually a `MatchError`) when they are not.
  """

  alias Hobbes.Construct.Scheduler.{ProcStore, ProcQueue}

  @type check_fun :: (-> boolean)

  defmodule ProcState do
    @type t :: %__MODULE__{
      pid: pid,
      node: atom,
      monitors: %{reference => pid},
      monitored_by: [{reference, pid}],
      linked_to: [pid],
      trap_exit: boolean,
      queue_key: ProcQueue.key | nil,
      await: {ProcStore.check_fun, term} | nil,
    }
    @enforce_keys [:pid, :node]
    defstruct [
      monitors: %{},
      monitored_by: [],
      linked_to: [],
      trap_exit: false,
      queue_key: nil,
      await: nil,
    ] ++ @enforce_keys
  end

  @type t :: %__MODULE__{
    proc_table: :ets.table,
    alias_table: :ets.table,
  }

  @enforce_keys [:proc_table, :alias_table]
  defstruct @enforce_keys

  @doc """
  Creates a store with two fresh private ETS tables owned by the caller.
  """
  def new do
    %ProcStore{
      proc_table: :ets.new(:proc_table, [:set, :private]),
      alias_table: :ets.new(:alias_table, [:set, :private]),
    }
  end

  @doc """
  Starts tracking `pid` on `node` with a default `ProcState`.

  An existing entry for `pid` is overwritten.
  """
  @spec add_process(t, pid, atom) :: :ok
  def add_process(%ProcStore{} = ps, pid, node) when is_pid(pid) and is_atom(node) do
    put_state(ps, pid, %ProcState{pid: pid, node: node})
  end

  @doc """
  Stops tracking `pid`. Raises when `pid` is not tracked.

  Only the `proc_table` entry is deleted; rows in `alias_table` and references
  held in other processes' states are left as they are.
  """
  @spec remove_process(t, pid) :: :ok
  def remove_process(%ProcStore{} = ps, pid) when is_pid(pid) do
    case fetch_state(ps, pid) do
      {:ok, _proc} ->
        :ets.delete(ps.proc_table, pid)
        :ok

      :error ->
        raise "Process #{inspect(pid)} cannot be removed because it does not exist"
    end
  end

  @doc """
  Returns the tracked state of `pid`, or `:error` when it is not tracked.
  """
  @spec fetch_state(t, pid) :: {:ok, ProcState.t} | :error
  def fetch_state(%ProcStore{} = ps, pid) when is_pid(pid) do
    case :ets.lookup(ps.proc_table, pid) do
      [{^pid, %ProcState{} = state}] -> {:ok, state}
      _ -> :error
    end
  end

  @doc """
  Records the queue key under which `pid` is currently enqueued.

  Raises when the process already has a queue key.
  """
  @spec track_queued(t, pid, ProcQueue.key) :: :ok
  def track_queued(%ProcStore{} = ps, pid, {_, _} = queue_key) when is_pid(pid) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)
    if state.queue_key != nil, do: raise "Process is already in queue: #{inspect(state)}"

    put_state(ps, pid, %{state | queue_key: queue_key})
  end

  @doc """
  Clears the queue key of `pid`.
  """
  @spec clear_queued(t, pid) :: :ok
  def clear_queued(%ProcStore{} = ps, pid) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)

    put_state(ps, pid, %{state | queue_key: nil})
  end

  @doc """
  Marks `pid` as awaiting: stores `{check_fun, resume}` and, optionally, the
  queue key of the associated timeout task.

  Raises when the process already has a queue key.
  """
  @spec track_await(t, pid, check_fun, term, ProcQueue.key | nil) :: :ok
  def track_await(%ProcStore{} = ps, pid, check_fun, resume, timeout_queue_key)
      when is_pid(pid) and is_function(check_fun) and is_struct(resume) and (is_nil(timeout_queue_key) or is_tuple(timeout_queue_key)) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)
    if state.queue_key != nil, do: raise "Process is in queue: #{inspect(state)}"

    put_state(ps, pid, %{state | await: {check_fun, resume}, queue_key: timeout_queue_key})
  end

  @doc """
  Clears the pending await of `pid` together with its queue key.

  Raises when the process is not awaiting.
  """
  @spec clear_await(t, pid) :: :ok
  def clear_await(%ProcStore{} = ps, pid) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)
    if !state.await, do: raise "Process is not awaiting: #{inspect(state)}"

    put_state(ps, pid, %{state | await: nil, queue_key: nil})
  end

  @doc """
  Links `pid1` and `pid2` in both directions.

  Returns `:error` when `pid2` is already in `pid1`'s link list.
  """
  @spec add_link(t, pid, pid) :: :ok | :error
  def add_link(%ProcStore{} = ps, pid1, pid2) do
    {:ok, %ProcState{} = state1} = fetch_state(ps, pid1)
    {:ok, %ProcState{} = state2} = fetch_state(ps, pid2)

    if pid2 in state1.linked_to do
      :error
    else
      put_state(ps, pid1, %{state1 | linked_to: [pid2 | state1.linked_to]})
      put_state(ps, pid2, %{state2 | linked_to: [pid1 | state2.linked_to]})
    end
  end

  @doc """
  Removes the link between `pid1` and `pid2` in both directions.

  Returns `:error` when `pid2` is not in `pid1`'s link list.
  """
  @spec remove_link(t, pid, pid) :: :ok | :error
  def remove_link(%ProcStore{} = ps, pid1, pid2) do
    {:ok, %ProcState{} = state1} = fetch_state(ps, pid1)
    {:ok, %ProcState{} = state2} = fetch_state(ps, pid2)

    if pid2 in state1.linked_to do
      put_state(ps, pid1, %{state1 | linked_to: List.delete(state1.linked_to, pid2)})
      put_state(ps, pid2, %{state2 | linked_to: List.delete(state2.linked_to, pid1)})
    else
      :error
    end
  end

  @doc """
  Records that `pid` monitors `target_pid` under `ref`.

  `ref => target_pid` is added to `pid`'s `monitors` and `{ref, pid}` is
  prepended to `target_pid`'s `monitored_by`. Both processes must be tracked;
  nothing is written otherwise.
  """
  @spec add_monitor(t, pid, pid, reference) :: :ok
  def add_monitor(%ProcStore{} = ps, pid, target_pid, ref) when is_pid(pid) and is_pid(target_pid) and is_reference(ref) do
    # Assert both processes exist before writing anything.
    {:ok, %ProcState{} = proc_state} = fetch_state(ps, pid)
    {:ok, %ProcState{}} = fetch_state(ps, target_pid)

    put_state(ps, pid, %{proc_state | monitors: Map.put(proc_state.monitors, ref, target_pid)})

    # The target state is re-read *after* the write above. When a process
    # monitors itself both updates hit the same row, and writing back a
    # snapshot taken earlier would silently discard the `monitors` change.
    update_state!(ps, target_pid, fn %ProcState{} = target_state ->
      %{target_state | monitored_by: [{ref, pid} | target_state.monitored_by]}
    end)
  end

  @doc """
  Removes the monitor `ref` held by `pid`.

  Raises `KeyError` when `pid` has no monitor under `ref`, and a `MatchError`
  when either `pid` or the monitored process is not tracked.
  """
  @spec remove_monitor(t, pid, reference) :: :ok
  def remove_monitor(%ProcStore{} = ps, pid, ref) do
    {:ok, %ProcState{} = proc_state} = fetch_state(ps, pid)

    {target_pid, monitors} = Map.pop!(proc_state.monitors, ref)

    # Assert the target exists before writing anything.
    {:ok, %ProcState{}} = fetch_state(ps, target_pid)

    put_state(ps, pid, %{proc_state | monitors: monitors})

    # Re-read the target after the write above so that a self-monitor
    # (pid == target_pid) does not resurrect the entry just removed.
    update_state!(ps, target_pid, fn %ProcState{} = target_state ->
      monitored_by = Enum.reject(target_state.monitored_by, &match?({^ref, _pid}, &1))
      %{target_state | monitored_by: monitored_by}
    end)
  end

  @doc """
  Associates the alias reference `alias` with `pid`.
  """
  @spec add_alias(t, pid, reference) :: :ok
  def add_alias(%ProcStore{} = ps, pid, alias) when is_pid(pid) and is_reference(alias) do
    :ets.insert(ps.alias_table, {alias, pid})
    :ok
  end

  @doc """
  Removes `alias`, provided it currently resolves to `pid`.
  """
  @spec remove_alias(t, pid, reference) :: :ok | {:error, :wrong_process | :not_found}
  def remove_alias(%ProcStore{} = ps, pid, alias) when is_pid(pid) and is_reference(alias) do
    case resolve_alias(ps, alias) do
      {:ok, ^pid} ->
        :ets.delete(ps.alias_table, alias)
        :ok

      {:ok, _other} ->
        {:error, :wrong_process}

      :error ->
        {:error, :not_found}
    end
  end

  @doc """
  Returns the pid that `alias` was added for, or `:error` when it is unknown.
  """
  @spec resolve_alias(t, reference) :: {:ok, pid} | :error
  def resolve_alias(%ProcStore{} = ps, alias) when is_reference(alias) do
    case :ets.lookup(ps.alias_table, alias) do
      [{^alias, pid}] when is_pid(pid) -> {:ok, pid}
      [] -> :error
    end
  end

  @doc """
  Sets the `:trap_exit` flag of `pid` and returns the previous value.
  """
  @spec set_flag(t, pid, :trap_exit, boolean) :: {:ok, boolean}
  def set_flag(%ProcStore{} = ps, pid, :trap_exit, value) when is_boolean(value) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)

    put_state(ps, pid, %{state | trap_exit: value})
    {:ok, state.trap_exit}
  end

  @doc """
  Returns the states of all tracked processes (in ETS table order).
  """
  @spec list_processes(t) :: [ProcState.t]
  def list_processes(%ProcStore{} = ps) do
    ps.proc_table
    |> :ets.tab2list()
    |> Enum.map(fn {_pid, %ProcState{} = proc} -> proc end)
  end

  # Writes `state` as the row for `pid`, replacing any previous row.
  defp put_state(%ProcStore{} = ps, pid, %ProcState{} = state) do
    :ets.insert(ps.proc_table, {pid, state})
    :ok
  end

  # Reads the current row for `pid`, applies `fun` and writes the result back.
  # Raises a MatchError when `pid` is not tracked.
  defp update_state!(%ProcStore{} = ps, pid, fun) when is_function(fun, 1) do
    {:ok, %ProcState{} = state} = fetch_state(ps, pid)
    put_state(ps, pid, fun.(state))
  end
end
-74
lib/construct/sim_file.ex
defmodule Hobbes.Construct.SimFile do
  @moduledoc """
  File operations routed to the `FileStore` obtained from the scheduler.

  Every function reads the scheduler pid via
  `Hobbes.Construct.SimServer.get_scheduler_pid/0`. Apart from `dump_paths/0`,
  which raises a descriptive error, the functions here have no branch for a
  missing scheduler pid and fail with a `CaseClauseError` in that situation.
  """

  alias Hobbes.Construct.Scheduler
  alias Hobbes.Construct.Scheduler.FileStore

  import Hobbes.Construct.SimServer, only: [get_scheduler_pid: 0]

  @doc "Delegates to `FileStore.mkdir/2` on the scheduler's file store."
  @spec mkdir(binary) :: :ok | {:error, File.posix}
  def mkdir(path) when is_binary(path), do: FileStore.mkdir(file_store(), path)

  @doc "Delegates to `FileStore.mkdir_p/2` on the scheduler's file store."
  @spec mkdir_p(binary) :: :ok | {:error, File.posix}
  def mkdir_p(path) when is_binary(path), do: FileStore.mkdir_p(file_store(), path)

  @doc "Delegates to `FileStore.write/3` on the scheduler's file store."
  @spec write(binary, binary) :: :ok | {:error, File.posix}
  def write(path, contents) when is_binary(path) and is_binary(contents) do
    FileStore.write(file_store(), path, contents)
  end

  @doc "Delegates to `FileStore.read/2` on the scheduler's file store."
  @spec read(binary) :: {:ok, binary} | {:error, File.posix}
  def read(path) when is_binary(path), do: FileStore.read(file_store(), path)

  @doc "Delegates to `FileStore.ls/2` on the scheduler's file store."
  @spec ls(binary) :: {:ok, [binary]}
  def ls(path) when is_binary(path), do: FileStore.ls(file_store(), path)

  @doc "Delegates to `FileStore.exists?/2` on the scheduler's file store."
  @spec exists?(binary) :: boolean
  def exists?(path) when is_binary(path), do: FileStore.exists?(file_store(), path)

  # Debug helper: lists the first element of every pair returned by
  # `FileStore.dump/1`.
  @doc false
  def dump_paths do
    case get_scheduler_pid() do
      nil ->
        raise "SimFile.dump_paths/0 is a debug function which can only be called in simulation"

      spid when is_pid(spid) ->
        spid
        |> Scheduler.get_file_store()
        |> FileStore.dump()
        |> Enum.map(fn {path, _contents} -> path end)
    end
  end

  # Fetches the file store from the scheduler. There is deliberately no clause
  # for a missing scheduler pid, matching the behaviour of each public function.
  defp file_store do
    case get_scheduler_pid() do
      spid when is_pid(spid) -> Scheduler.get_file_store(spid)
    end
  end
end
-106
lib/construct/sim_internal.ex
defmodule Hobbes.Construct.SimInternal do
  @moduledoc """
  Internal functions for running a `SimServer` (registration, receive loop, etc).

  Similar to erlang's `:gen` but for `SimServer`.
  """

  require Logger
  alias Hobbes.Construct.{SimServer, Scheduler}
  import Hobbes.Construct.SimServer, only: [yield_receive: 2, yield_receive: 3, fetch_scheduler_pid!: 0]

  @type name :: {:global, term}

  @doc """
  Spawns a linked server process through the scheduler and waits for its ack.

  The spawned process runs `server_init/4`. When `opts[:name]` is set, the new
  pid is registered under that name via `SimServer.register/2` once the ack
  has arrived.
  """
  @spec start(module, term, Keyword.t) :: {:ok, pid}
  def start(module, init_arg, opts) do
    scheduler = fetch_scheduler_pid!()
    init_args = [module, init_arg, opts, self()]
    pid = Scheduler.spawn_and_yield(scheduler, :link, __MODULE__, :server_init, init_args)

    # Like :proc_lib.start(), block until the spawned server acknowledges.
    # The timeout is only a sanity bound; init should never take this long.
    yield_receive(scheduler, 60_000) do
      {:ack, ^pid} ->
        registered_name = opts[:name]
        if registered_name, do: SimServer.register(pid, registered_name)
        {:ok, pid}

      # TODO: error handling?
    end
  end

  # Everything below runs inside the spawned SimServer process.

  @doc """
  Entry point of the spawned server: runs `module.init/1`, acks `parent`, then
  enters the receive loop. Only an `{:ok, state}` result from `init/1` is
  accepted.
  """
  def server_init(module, arg, _options, parent) when is_atom(module) and is_pid(parent) do
    {:ok, initial_state} = module.init(arg)

    SimServer.send(parent, {:ack, self()})
    sim_loop(module, initial_state)
  end

  # Receives one message at a time (any message matches) and threads the
  # callback's resulting state into the next iteration.
  defp sim_loop(module, state) do
    next_state =
      yield_receive(fetch_scheduler_pid!()) do
        message -> dispatch(message, module, state)
      end

    sim_loop(module, next_state)
  end

  # Call: invoke handle_call/3 and, for a :reply result, send the response to
  # the caller's alias tagged with the original `[:alias | ref]` tag.
  defp dispatch({:"$sim_call", {_pid, [:alias | alias_ref] = tag} = from, request}, module, state) do
    result =
      try do
        module.handle_call(request, from, state)
      catch
        :exit, reason ->
          log_exit(reason, __STACKTRACE__, state)
          exit(reason)
      end

    case result do
      {:reply, response, new_state} ->
        SimServer.send(alias_ref, {tag, response})
        new_state

      {:noreply, new_state} ->
        new_state

      # TODO: error message for invalid reply
    end
  end

  # Cast: invoke handle_cast/2.
  defp dispatch({:"$sim_cast", request}, module, state) do
    result =
      try do
        module.handle_cast(request, state)
      catch
        :exit, reason ->
          log_exit(reason, __STACKTRACE__, state)
          exit(reason)
      end

    case result do
      {:noreply, new_state} -> new_state
      # TODO: error message for invalid reply
    end
  end

  # Anything else is handed to handle_info/2.
  defp dispatch(message, module, state) do
    result =
      try do
        module.handle_info(message, state)
      catch
        :exit, reason ->
          log_exit(reason, __STACKTRACE__, state)
          exit(reason)
      end

    case result do
      {:noreply, new_state} -> new_state
      # TODO: error message for invalid reply
    end
  end

  # Logs an error for abnormal exits; :normal and :shutdown are not logged.
  defp log_exit(reason, _stacktrace, _state) when reason in [:normal, :shutdown], do: :noop

  defp log_exit(reason, stacktrace, state) do
    require Logger
    Logger.error """
    SimServer #{inspect(self())} terminating
    ** (stop) #{Exception.format_exit(reason)}
    #{Exception.format_stacktrace(stacktrace)}\
    State: #{inspect(state)}\
    """
  end
end
-124
lib/construct/sim_log.ex
defmodule Hobbes.Construct.SimLog do
  @moduledoc """
  GenServer that folds logged events into a rolling hash.

  Before hashing, every pid and reference inside an event is replaced by a
  `Homogenized` placeholder numbered in order of first appearance, so the hash
  does not depend on the concrete pid/reference values. The hash is
  `:erlang.phash2({previous_hash, homogenized_event})`, starting from `nil`.

  Storing the events themselves is currently disabled (see the TODO in
  `handle_cast/2`), so the `log` list stays empty.
  """

  use GenServer

  defmodule Known do
    @enforce_keys [:pids, :refs]
    defstruct @enforce_keys
  end

  defmodule State do
    @type t :: %__MODULE__{
      num_events: non_neg_integer,
      rolling_hash: term,
      known: %Known{},
      log: list,
    }
    @enforce_keys [:known, :log, :rolling_hash, :num_events]
    defstruct @enforce_keys
  end

  defmodule Homogenized do
    @enforce_keys [:type, :i]
    defstruct @enforce_keys
  end

  defimpl Inspect, for: Homogenized do
    def inspect(%Homogenized{type: :pid, i: i}, _opts), do: "#sPID<#{i}>"
    def inspect(%Homogenized{type: :ref, i: i}, _opts), do: "#sRef<#{i}>"
  end

  @doc "Starts the log server; `args` is passed to `init/1`, which ignores it."
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @doc """
  Returns a map with `:num_events`, `:rolling_hash`, `:known` and `:log`
  (stored newest-first internally, returned oldest-first).
  """
  @spec get_log(term) :: list
  def get_log(server), do: GenServer.call(server, :get_log)

  @doc "Asynchronously records `event`."
  @spec log(term, term) :: :ok
  def log(server, event), do: GenServer.cast(server, {:log, event})

  @impl true
  def init(_) do
    initial = %State{
      num_events: 0,
      rolling_hash: nil,
      known: %Known{pids: %{}, refs: %{}},
      log: [],
    }

    {:ok, initial}
  end

  @impl true
  def handle_call(:get_log, _from, %State{} = state) do
    snapshot =
      state
      |> Map.take([:num_events, :rolling_hash, :known])
      |> Map.put(:log, Enum.reverse(state.log))

    {:reply, snapshot, state}
  end

  @impl true
  def handle_cast({:log, event}, %State{} = state) do
    {known, h_event} = homogenize(state.known, event)
    next_hash = :erlang.phash2({state.rolling_hash, h_event})

    # TODO: configurable
    # Appending to the log is disabled: log: [h_event | state.log]
    {:noreply, %{state | num_events: state.num_events + 1, rolling_hash: next_hash, known: known}}
  end

  # Recursively replaces pids and references inside `value` with placeholders,
  # returning the (possibly extended) `Known` tables and the rewritten value.
  # Clause order matters: the `[:alias | ref]` improper list must be matched
  # before the general list clause.
  defp homogenize(%Known{} = known, value) when is_pid(value) do
    placeholder_for(known, :pids, :pid, value)
  end

  defp homogenize(%Known{} = known, value) when is_reference(value) do
    placeholder_for(known, :refs, :ref, value)
  end

  defp homogenize(%Known{} = known, [:alias | ref]) when is_reference(ref) do
    {known, h_ref} = homogenize(known, ref)
    {known, [:alias | h_ref]}
  end

  defp homogenize(%Known{} = known, list) when is_list(list) do
    {h_list, known} =
      Enum.map_reduce(list, known, fn element, acc ->
        {acc, h_element} = homogenize(acc, element)
        {h_element, acc}
      end)

    {known, h_list}
  end

  defp homogenize(%Known{} = known, tuple) when is_tuple(tuple) do
    {known, h_elements} = homogenize(known, Tuple.to_list(tuple))
    {known, List.to_tuple(h_elements)}
  end

  # This one also handles structs
  defp homogenize(%Known{} = known, map) when is_map(map) do
    {known, h_pairs} = homogenize(known, Map.to_list(map))
    {known, Map.new(h_pairs)}
  end

  defp homogenize(%Known{} = known, value), do: {known, value}

  # Looks `value` up in the `field` table of `known` (:pids or :refs). Unknown
  # values get the next index, which is the current size of that table.
  defp placeholder_for(%Known{} = known, field, type, value) do
    table = Map.fetch!(known, field)

    case table do
      %{^value => %Homogenized{} = existing} ->
        {known, existing}

      _ ->
        fresh = %Homogenized{type: type, i: map_size(table)}
        {%{known | field => Map.put(table, value, fresh)}, fresh}
    end
  end
end
-458
lib/construct/sim_server.ex
···11-defmodule Hobbes.Construct.SimServer do
22- alias Hobbes.Construct.{SimInternal, Scheduler, SimLog}
33-44- # TODO: support more GenServer init/1 return types here
55- @callback init(init_arg :: term) :: {:ok, term}
66-77- defmacro __using__(_opts) do
88- quote do
99- alias Hobbes.Construct.SimServer
1010- use GenServer
1111- end
1212- end
1313-1414- @doc """
1515- Yields until a matching message is received.
1616-1717- ## Examples
1818-1919- yield_receive(get_scheduler_pid()) do
2020- :good_message -> :ok
2121- :bad_message -> :error
2222- end
2323-2424- """
2525- defmacro yield_receive(scheduler_pid, timeout \\ :infinity, [do: do_block]) do
2626- # Here, we use `do_block` to build a function which
2727- # checks if a value matches the intended receive block
2828- #
2929- # e.g.
3030- #
3131- # yield_receive do
3232- # {:ok, :hello} -> :ok
3333- # {:error, error} -> error
3434- # end
3535- #
3636- # becomes...
3737- #
3838- # fn
3939- # {:ok, :hello} -> true
4040- # {:error, error} -> true
4141- # _ -> false
4242- # end
4343- check_block =
4444- Enum.map(do_block, fn {:->, _meta, [clause, _result]} ->
4545- {:->, [], [clause, true]}
4646- end)
4747- final_clause =
4848- quote do
4949- _ -> false
5050- end
5151- check_block = check_block ++ final_clause
5252- check_func_ast = {:fn, [], check_block}
5353- # Walk the ast and mark as generated to prevent warnings about unused variables
5454- # and redundant clauses
5555- # (also remove line numbers just because)
5656- check_func_ast = Macro.prewalk(check_func_ast, fn
5757- {a, meta, b} -> {a, meta |> Keyword.drop([:line, :column]) |> Keyword.put(:generated, true), b}
5858- other -> other
5959- end)
6060-6161- quote do
6262- check_func = unquote(check_func_ast)
6363-6464- # We use a ref to communicate whether a value was actually received
6565- # (since it is impossible for receive to return our unique ref)
6666- ref = make_ref()
6767- receive_func = fn ->
6868- receive do
6969- unquote(do_block)
7070- after
7171- 0 -> ref
7272- end
7373- end
7474-7575- Hobbes.Construct.Scheduler.yield(unquote(scheduler_pid))
7676- # We first check if the message queue already has a matching message
7777- # If so, we don't want to yield_until_message because the Scheduler would
7878- # have no way of knowing that a matching message is already in our queue
7979- case receive_func.() do
8080- ^ref ->
8181- # If there is no matching message, we yield until one is sent to us
8282- Hobbes.Construct.Scheduler.yield_until_message(unquote(scheduler_pid), check_func, unquote(timeout))
8383-8484- # Now that a message has arrived, we extract it with the receive
8585- # The case is a sanity check
8686- case receive_func.() do
8787- ^ref -> raise "Resumed for message but no message received!"
8888- value -> value
8989- end
9090-9191- # A matching message was already in our queue, so we simply return it
9292- value ->
9393- value
9494- end
9595- end
9696- end
9797-9898- @type request_id :: :gen_server.request_id | reference
9999-100100- @scheduler_key :scheduler_key
101101- def set_scheduler_pid(pid), do: nil = Process.put(@scheduler_key, pid)
102102- def get_scheduler_pid, do: Process.get(@scheduler_key)
103103- def fetch_scheduler_pid!, do: get_scheduler_pid() || raise "No scheduler pid!"
104104-105105- @spec simulated? :: boolean
106106- def simulated?, do: get_scheduler_pid() != nil
107107-108108- @spec start_scheduler() :: {:ok, pid}
109109- def start_scheduler(seed \\ 100) do
110110- {:ok, scheduler_pid} = Scheduler.start_link(seed)
111111-112112- {:ok, log_pid} = SimLog.start_link(nil)
113113- Scheduler.configure_log_server(scheduler_pid, log_pid)
114114-115115- {:ok, scheduler_pid}
116116- end
117117-118118- @doc """
119119- Starts a `SimServer`.
120120-121121- ## Examples
122122-123123- iex> SimServer.start(__MODULE__, nil, name: {:global, :my_server})
124124- {:ok, #PID{0.0.0}}
125125-126126- """
127127- @spec start_link(module, term, Keyword.t) :: GenServer.on_start
128128- def start_link(module, init_arg, options \\ []) do
129129- case get_scheduler_pid() do
130130- nil ->
131131- GenServer.start_link(module, init_arg, options)
132132- scheduler_pid when is_pid(scheduler_pid) ->
133133- SimInternal.start(module, init_arg, options)
134134- end
135135- end
136136-137137- @doc """
138138- Performs a `SimServer` call.
139139-140140- ## Examples
141141-142142- iex> SimServer.call({:global, :my_server}, :foo)
143143- :bar
144144-145145- """
146146- @spec call(term, term, timeout) :: term
147147- def call(server, request, timeout \\ 5000) do
148148- case get_scheduler_pid() do
149149- nil ->
150150- GenServer.call(server, request, timeout)
151151- scheduler_pid when is_pid(scheduler_pid) ->
152152- sim_call(scheduler_pid, server, request, timeout)
153153- end
154154- end
155155-156156- @doc """
157157- Sends a request to a SimServer.
158158-159159- See `:gen_server.send_request/2` and `SimServer.receive_response/2`.
160160-161161- ## Examples
162162-163163- iex> req_id = SimServer.send_request(pid, :foo)
164164- iex> SimServer.receive_response(req_id)
165165-166166- """
167167- @spec send_request(term, term) :: request_id
168168- def send_request(server, request) do
169169- case get_scheduler_pid() do
170170- nil ->
171171- :gen_server.send_request(server, request)
172172- scheduler_pid when is_pid(scheduler_pid) ->
173173- sim_send_request(scheduler_pid, server, request)
174174- end
175175- end
176176-177177- @doc """
178178- Receives a response from a SimServer.
179179-180180- See `:gen_server.receive_response/2` and `SimServer.send_request/2`.
181181- """
182182- @spec receive_response(request_id, timeout) :: {:reply, term} | :timeout
183183- def receive_response(request_id, timeout \\ 5000) do
184184- case get_scheduler_pid() do
185185- nil ->
186186- :gen_server.receive_response(request_id, timeout)
187187- scheduler_pid when is_pid(scheduler_pid) ->
188188- sim_receive_response(scheduler_pid, request_id, timeout)
189189- end
190190- |> case do
191191- {:reply, _reply} = reply -> reply
192192- # Coalesce errors into timeouts for now
193193- {:error, _err} -> :timeout
194194- :timeout -> :timeout
195195- end
196196- end
197197-198198- defp sim_call(scheduler_pid, server, request, timeout) do
199199- alias_ref = sim_send_request(scheduler_pid, server, request)
200200-201201- try do
202202- yield_receive(scheduler_pid, timeout) do
203203- {[:alias | ^alias_ref], reply} -> reply
204204- end
205205- catch
206206- :exit, reason ->
207207- true = Scheduler.unalias(scheduler_pid, alias_ref)
208208- receive do
209209- {[:alias | ^alias_ref], reply} -> reply
210210- after
211211- 0 -> exit({reason, {__MODULE__, :call, [server, request, timeout]}})
212212- end
213213- end
214214- end
215215-216216- @dialyzer {:no_improper_lists, sim_send_request: 3}
217217- defp sim_send_request(scheduler_pid, server, request) do
218218- alias_ref = Scheduler.alias(scheduler_pid)
219219- Scheduler.send(scheduler_pid, server, {:"$sim_call", {self(), [:alias | alias_ref]}, request})
220220-221221- alias_ref
222222- end
223223-224224- defp sim_receive_response(scheduler_pid, request_id, timeout) when is_reference(request_id) do
225225- alias_ref = request_id
226226- # TODO: catch timeout exit
227227- try do
228228- yield_receive(scheduler_pid, timeout) do
229229- {[:alias | ^alias_ref], reply} -> {:reply, reply}
230230- end
231231- catch
232232- :exit, :timeout ->
233233- true = Scheduler.unalias(scheduler_pid, alias_ref)
234234- receive do
235235- {[:alias | ^alias_ref], reply} -> {:reply, reply}
236236- after
237237- 0 -> :timeout
238238- end
239239- end
240240- end
241241-242242- @doc """
243243- Performs a `SimServer` cast.
244244-245245- ## Examples
246246-247247- iex> SimServer.cast({:global, :my_server}, :foo}
248248- :ok
249249-250250- """
251251- @spec cast(term, term) :: :ok
252252- def cast(server, request) do
253253- case get_scheduler_pid() do
254254- nil ->
255255- GenServer.cast(server, request)
256256- scheduler_pid when is_pid(scheduler_pid) ->
257257- sim_cast(scheduler_pid, server, request)
258258- end
259259- end
260260-261261- defp sim_cast(scheduler_pid, server, request) do
262262- Scheduler.send(scheduler_pid, server, {:"$sim_cast", request})
263263- :ok
264264- end
265265-266266- @spec reply(GenServer.from, term) :: :ok
267267- def reply(client, response) do
268268- case get_scheduler_pid() do
269269- nil ->
270270- GenServer.reply(client, response)
271271-272272- scheduler_pid when is_pid(scheduler_pid) ->
273273- {_client_pid, [:alias | alias_ref] = tag} = client
274274- Scheduler.send(scheduler_pid, alias_ref, {tag, response})
275275- :ok
276276- end
277277- end
278278-279279- @spec spawn(function) :: pid
280280- def spawn(fun) when is_function(fun) do
281281- case get_scheduler_pid() do
282282- scheduler_pid when is_pid(scheduler_pid) ->
283283- Scheduler.spawn_and_yield(scheduler_pid, :nolink, Kernel, :apply, [fun, []])
284284- nil ->
285285- Kernel.spawn(fun)
286286- end
287287- end
288288-289289- @spec spawn_link(function) :: pid
290290- def spawn_link(fun) when is_function(fun) do
291291- case get_scheduler_pid() do
292292- scheduler_pid when is_pid(scheduler_pid) ->
293293- Scheduler.spawn_and_yield(scheduler_pid, :link, Kernel, :apply, [fun, []])
294294- nil ->
295295- Kernel.spawn_link(fun)
296296- end
297297- end
298298-299299- @spec sleep(non_neg_integer) :: term
300300- def sleep(time) do
301301- case get_scheduler_pid() do
302302- scheduler_pid when is_pid(scheduler_pid) ->
303303- Scheduler.yield(scheduler_pid, time)
304304- nil ->
305305- :timer.sleep(time)
306306- end
307307- end
308308-309309- @spec send(pid, term) :: term
310310- def send(dest, message) do
311311- case get_scheduler_pid() do
312312- scheduler_pid when is_pid(scheduler_pid) ->
313313- # TODO: time=nil
314314- Scheduler.send(scheduler_pid, dest, message, 0)
315315- nil ->
316316- Kernel.send(dest, message)
317317- end
318318- end
319319-320320- @spec send_after(pid, term, non_neg_integer) :: term
321321- def send_after(dest, message, time) do
322322- case get_scheduler_pid() do
323323- scheduler_pid when is_pid(scheduler_pid) ->
324324- Scheduler.send(scheduler_pid, dest, message, time)
325325- nil ->
326326- Process.send_after(dest, message, time)
327327- end
328328- end
329329-330330- @spec exit(pid, term) :: term
331331- def exit(pid, reason) do
332332- case get_scheduler_pid() do
333333- scheduler_pid when is_pid(scheduler_pid) ->
334334- Scheduler.exit(scheduler_pid, pid, reason)
335335- nil ->
336336- Process.exit(pid, reason)
337337- end
338338- end
339339-340340- @spec node :: atom
341341- def node do
342342- case get_scheduler_pid() do
343343- scheduler_pid when is_pid(scheduler_pid) ->
344344- Scheduler.get_current_node(scheduler_pid)
345345- nil ->
346346- # TODO
347347- raise "Not supported"
348348- end
349349- end
350350-351351- @spec list_nodes :: [atom]
352352- def list_nodes do
353353- if not simulated?(), do: raise "list_nodes/0 can only be called in simulation"
354354- scheduler_pid = fetch_scheduler_pid!()
355355-356356- Scheduler.list_nodes(scheduler_pid)
357357- end
358358-359359- @spec start_node(atom, module, term) :: :ok
360360- def start_node(name, app_module, args) when is_atom(name) and is_atom(app_module) do
361361- if not simulated?(), do: raise "start_node/3 can only be called in simulation"
362362- scheduler_pid = fetch_scheduler_pid!()
363363-364364- Scheduler.start_node(scheduler_pid, name, app_module, args)
365365- end
366366-367367- @spec restart_node(atom, non_neg_integer) :: :ok | {:error, :node_not_found | :node_stopped}
368368- def restart_node(name, delay_ms \\ 0) when is_atom(name) and is_integer(delay_ms) do
369369- if not simulated?(), do: raise "restart_node/1 can only be called in simulation"
370370- scheduler_pid = fetch_scheduler_pid!()
371371-372372- Scheduler.restart_node(scheduler_pid, name, delay_ms)
373373- end
374374-375375- @spec monitor(pid) :: reference
376376- def monitor(monitor_pid) do
377377- case get_scheduler_pid() do
378378- scheduler_pid when is_pid(scheduler_pid) ->
379379- Scheduler.monitor(scheduler_pid, monitor_pid)
380380- nil ->
381381- Process.monitor(monitor_pid)
382382- end
383383- end
384384-385385- @spec flag(:trap_exit, boolean) :: boolean
386386- def flag(flag, value) when flag in [:trap_exit] and is_boolean(value) do
387387- case get_scheduler_pid() do
388388- scheduler_pid when is_pid(scheduler_pid) ->
389389- Scheduler.set_process_flag(scheduler_pid, flag, value)
390390- nil ->
391391- Process.flag(flag, value)
392392- end
393393- end
394394-395395- @spec register(pid, atom) :: true
396396- def register(pid, name) when is_pid(pid) and is_atom(name) do
397397- case get_scheduler_pid() do
398398- scheduler_pid when is_pid(scheduler_pid) ->
399399- case Scheduler.register_process(scheduler_pid, pid, name) do
400400- :ok -> true
401401- :error -> raise ArgumentError
402402- end
403403- nil ->
404404- Process.register(pid, name)
405405- end
406406- end
407407-408408- @spec whereis(atom) :: pid | nil
409409- def whereis(name) do
410410- case get_scheduler_pid() do
411411- scheduler_pid when is_pid(scheduler_pid) ->
412412- Scheduler.whereis(scheduler_pid, name)
413413- nil ->
414414- Process.whereis(name)
415415- end
416416- end
@doc """
Returns a monotonic microsecond timestamp.

Inside the simulation the timestamp is obtained from the scheduler
(`Scheduler.current_time/1`) and starts at `0`.

Outside of simulation this is `System.monotonic_time(:microsecond)`. That
value is *not* UNIX (wall-clock) time: it starts at an unspecified point and
may be negative, so it is only meaningful when compared with other values
returned by this function on the same node.

## Examples

The concrete values below are illustrative only.

    iex> current_time()
    1000

    iex> current_time()
    1100

"""
@spec current_time :: integer
def current_time do
  case get_scheduler_pid() do
    scheduler_pid when is_pid(scheduler_pid) ->
      Scheduler.current_time(scheduler_pid)

    nil ->
      # Monotonic (not wall-clock) time; can be negative, hence `integer` in the spec.
      System.monotonic_time(:microsecond)
  end
end
# Picks one element of `enumerable` via `Enum.random/1`.
# NOTE(review): determinism presumably relies on the caller's :rand state
# having been seeded - confirm against the scheduler setup.
@spec deterministic_random(Enum.t) :: Enum.element
def deterministic_random(enumerable) do
  Enum.random(enumerable)
end
# Yields to the scheduler for the given delay (or for a randomly chosen
# element of a non-empty list of delays). Does nothing when no scheduler is
# available. Always returns :ok.
@spec simulate_work(integer | [integer]) :: :ok
def simulate_work(delay_or_delays) do
  scheduler = get_scheduler_pid()

  if not is_nil(scheduler) do
    do_work_delay(delay_or_delays, scheduler)
  end

  :ok
end

defp do_work_delay(delay, pid) when is_integer(delay) do
  Scheduler.yield(pid, delay)
end

defp do_work_delay([_ | _] = delays, pid) do
  chosen = Enum.random(delays)
  Scheduler.yield(pid, chosen)
end
458458-end
-60
lib/construct/sim_utils.ex
defmodule Hobbes.Construct.SimUtils do
  require Logger

  @doc """
  Renders a scheduler log state as printable text.

  The result always ends with a summary line holding the number of events and
  the rolling hash, followed by a short Base32 digest of that hash which is
  easier to compare by eye.

  Options:
  - `:full` - also include the known PIDs and the logged events. Logs with
    more than 100 events are shortened to their first and last 50 entries.
  """
  @spec build_log_message(%{log: list, known: map}, keyword) :: String.t
  def build_log_message(%{log: log, known: known} = log_state, opts \\ []) do
    pid_text = render_pids(known.pids)
    log_text = render_log(log, length(log))

    hash = log_state.rolling_hash

    # For readability (difficult to remember integers)
    # sha256 results in better distribution, the bytes from phash2 are too visually similar
    digest = :crypto.hash(:sha256, <<hash::integer-size(32)>>)
    hash_alpha = digest |> Base.encode32(padding: false) |> String.slice(0, 8)

    prefix = if opts[:full], do: full_section(pid_text, log_text), else: ""

    prefix <> """
    Logged #{log_state.num_events} events with hash #{inspect(hash)}
    SHA256: #{hash_alpha}\
    """
  end

  # One "key -> value" line per known pid entry, ordered by value.
  defp render_pids(pids) do
    pids
    |> Enum.sort_by(&elem(&1, 1))
    |> Enum.map_join("\n", fn {key, value} -> "#{inspect(key)} -> #{inspect(value)}" end)
  end

  # Long logs keep only their first and last 50 events, with a marker in between.
  defp render_log(log, count) when count > 100 do
    head = log |> Enum.take(50) |> format_log()
    tail = log |> Enum.take(-50) |> format_log()

    head <> "\n\n... #{count - 100} more ...\n\n" <> tail
  end

  defp render_log(log, _count), do: format_log(log)

  # Header, PID table and event list used by the `:full` option.
  defp full_section(pid_text, log_text) do
    """
    Scheduler Log:

    PIDs:
    #{pid_text}

    Log:
    #{log_text}

    """
  end

  defp format_log(events) when is_list(events) do
    Enum.map_join(events, "\n", &inspect/1)
  end
end
-368
lib/hybrid_kv.ex
defmodule Hobbes.HybridKV do
  @moduledoc """
  Key/value store that layers a versioned in-memory store (`MemKV`) on top of
  an unversioned storage backend (`FlatKV`, or `FlatStorageKV` when a path is
  given).

  `apply_batch/3` applies mutations to memory at a version and records the
  batch in a `MutationLog`; `flush/2` later moves logged batches into storage.
  Range deletes are tracked in a `RangeForest` (`deleted_forest`), which
  `get/3` and `scan/5` consult to decide whether storage may be read.
  """

  alias Hobbes.{HybridKV, MemKV, RangeForest, Utils}
  alias Hobbes.RangeForest.RangeTree
  alias Hobbes.KV.{FlatKV, FlatStorageKV, MutationLog}
  alias Hobbes.Structs.RangeResult

  @type t :: %__MODULE__{
    mem_kv: term,
    storage_module: module,
    storage_kv: FlatKV.t | FlatStorageKV.t,
    deleted_forest: term,
    mutation_log: MutationLog.t,
    flushed_version: non_neg_integer,
  }

  @enforce_keys [:mem_kv, :storage_module, :storage_kv, :deleted_forest, :mutation_log, :flushed_version]
  defstruct @enforce_keys

  @doc """
  Creates an empty store.

  Options:
  - `:path` - when given (a binary), storage is a `FlatStorageKV` created for
    that path; otherwise a `FlatKV` is used.
  """
  @spec new() :: t
  @spec new(keyword) :: t
  def new(opts \\ []) do
    {storage_module, storage_kv} =
      case Keyword.get(opts, :path) do
        nil -> {FlatKV, FlatKV.new()}
        path when is_binary(path) -> {FlatStorageKV, FlatStorageKV.new(path)}
      end

    new(storage_module, storage_kv)
  end

  @doc """
  Creates an empty store on top of an existing storage backend.
  """
  @spec new(module, term) :: t
  def new(storage_module, storage_kv) when is_atom(storage_module) do
    %HybridKV{
      mem_kv: MemKV.new(),
      storage_module: storage_module,
      storage_kv: storage_kv,
      deleted_forest: RangeForest.new(),
      mutation_log: MutationLog.new(),
      flushed_version: 0,
    }
  end

  @doc """
  Writes `key` => `value` to memory at `version` and returns the store.

  The deleted-range forest is split at `key` when `RangeForest.split_at/3`
  reports an update, so the returned struct must be used by the caller.
  """
  @spec put(%HybridKV{}, non_neg_integer, binary, binary) :: %HybridKV{}
  def put(%HybridKV{} = kv, version, key, value)
      when is_integer(version) and version >= 0 and is_binary(key) and is_binary(value) do
    :ok = MemKV.put(kv.mem_kv, version, key, value)

    case RangeForest.split_at(kv.deleted_forest, version, key) do
      {:updated, deleted_forest} -> %HybridKV{kv | deleted_forest: deleted_forest}
      :noop -> kv
    end
  end

  @doc """
  Deletes `key` in memory at `version`.

  Unlike `put/4` this returns `:ok`, not the store.
  """
  @spec delete(%HybridKV{}, non_neg_integer, binary) :: :ok
  def delete(%HybridKV{} = kv, version, key) when is_integer(version) and version >= 0 and is_binary(key) do
    :ok = MemKV.delete(kv.mem_kv, version, key)
  end

  @doc """
  Records a range delete of `start_key`..`end_key` at `version` in the
  deleted-range forest and returns the updated store.
  """
  @spec delete_range(%HybridKV{}, non_neg_integer, binary, binary) :: %HybridKV{}
  def delete_range(%HybridKV{} = kv, version, start_key, end_key) do
    %HybridKV{kv | deleted_forest: RangeForest.add_range(kv.deleted_forest, version, start_key, end_key)}
  end

  @doc """
  Reads `key` as of `version`, returning its value or `nil`.
  """
  @spec get(%HybridKV{}, non_neg_integer, binary) :: binary | nil
  def get(%HybridKV{} = kv, version, key)
      when is_integer(version) and version >= 0 and is_binary(key) do
    # NOTE(review): this passes `flushed_version + 1` while scan/5 passes
    # `flushed_version` to RangeTree.intersect_range/4 - confirm which bound
    # the RangeTree API expects.
    kv.deleted_forest
    |> RangeForest.tree_at(version)
    |> RangeTree.intersect_key(kv.flushed_version + 1, key)
    |> case do
      nil ->
        # There are no overlapping range deletes
        case MemKV.get(kv.mem_kv, version, 0, key) do
          nil -> kv.storage_module.get(kv.storage_kv, key)
          :deleted -> nil
          value -> value
        end

      {_sk, _ek, range_deleted_at} ->
        # There is an overlapping range delete, so we return nil if
        # a newer key is not found
        case MemKV.get(kv.mem_kv, version, range_deleted_at + 1, key) do
          nil -> nil
          :deleted -> nil
          value -> value
        end
    end
  end

  @doc """
  Scans `start_key`..`end_key` as of `version`.

  Options:
  - `:limit` - maximum number of pairs to return (default `:infinity`).
  - `:reverse` - scan backwards from `end_key` when `true` (default `false`).

  The returned `RangeResult` has `more: true` when the limit cut the scan short.
  """
  @spec scan(%HybridKV{}, non_neg_integer, binary, binary, keyword) :: RangeResult.t
  def scan(%HybridKV{} = kv, version, start_key, end_key, opts \\ []) do
    limit = Keyword.get(opts, :limit, :infinity)
    reverse = Keyword.get(opts, :reverse, false)

    deleted_ranges =
      kv.deleted_forest
      |> RangeForest.tree_at(version)
      |> RangeTree.intersect_range(kv.flushed_version, start_key, end_key)

    read_limit = case limit do
      :infinity -> :infinity
      limit -> limit + 1
    end

    {pairs, count} =
      case reverse do
        false -> do_scan(:forward, kv, deleted_ranges, version, start_key, end_key, read_limit, 0)
        true -> do_scan(:backward, kv, Enum.reverse(deleted_ranges), version, start_key, end_key, read_limit, 0)
      end

    # We over-read by 1 and then use the extra read
    # to check if there are more
    # (with an :infinity limit the first branch always wins, because any
    # integer compares lower than an atom in Erlang term order)
    cond do
      count < read_limit ->
        %RangeResult{pairs: pairs, count: count, more: false}
      count == read_limit ->
        %RangeResult{pairs: Enum.take(pairs, limit), count: limit, more: true}
    end
  end

  # This exists purely as a sanity check and should be unreachable except for bugs
  defp do_scan(_direction, _kv, _deleted_ranges, _version, _start_key, _end_key, _limit, 1000), do: raise "Scan caught in loop!"

  # Abandon hope all ye who enter here
  #
  # Returns `{pairs, count}`. Each call scans one segment (either inside a
  # range delete, memory only, or up to the next range delete, storage merged
  # with memory) and recurses for the remainder while under the limit.
  defp do_scan(:forward, %HybridKV{} = kv, deleted_ranges, version, start_key, end_key, limit, scan_count) do
    {merged_pairs, scanned_end_key, deleted_ranges} =
      case deleted_ranges do
        [{sk, ek, del_v} | rest] when sk <= start_key ->
          # We are "inside" a range clear, so we scan mem only to the end of min(ek, end_key)
          mem_result = MemKV.scan(kv.mem_kv, version, del_v + 1, start_key, min(ek, end_key), limit: limit)
          {_start, mem_end_key} = mem_result.range
          # Merge just to clear :deleted
          pairs = merge([], mem_result.pairs, false)

          # Note: it would be wrong to return "rest" here if we are still inside
          # the deleted range (due to hitting the limit), however if a mem scan
          # hits the limit then we are done scanning because there is nothing
          # for the tombstones to clear out, so we don't care
          {pairs, mem_end_key, rest}

        _ ->
          # If there is a range delete ahead, we want to stop this hybrid scan before that
          # delete's start_key so that the next iteration will scan it mem only (the previous clause)
          stop_key = case deleted_ranges do
            [{sk, _ek, _del_v} | _] -> sk
            [] -> end_key
          end

          storage_result = kv.storage_module.scan(kv.storage_kv, start_key, stop_key, limit: limit)
          {_start, storage_end_key} = storage_result.range

          # Read mem only up to the end of the range scanned by storage
          mem_result = MemKV.scan(kv.mem_kv, version, 0, start_key, storage_end_key, limit: limit)
          {_start, mem_end_key} = mem_result.range

          # Both KVs were scanned up to this key
          # Anything past this key was only scanned by storage and must be discarded
          scanned_end_key = min(storage_end_key, mem_end_key)

          merged_pairs =
            merge(storage_result.pairs, mem_result.pairs, false)
            # TODO: more efficient to do this in merge/3
            |> Enum.take_while(fn {k, _v} -> k < scanned_end_key end)

          {merged_pairs, scanned_end_key, deleted_ranges}
      end

    # TODO: compute count in merge/3
    count = length(merged_pairs)

    cond do
      count > limit ->
        # We got more pairs than we asked for, which can happen if the
        # storage and mem keys are disjoint (which is fairly likely in practice)
        {Enum.take(merged_pairs, limit), limit}

      count == limit ->
        # We got exactly what we wanted!
        {merged_pairs, count}

      count < limit ->
        case scanned_end_key == end_key do
          true ->
            # We got <limit pairs but we really did scan the full range
            {merged_pairs, count}
          false ->
            # We got <limit pairs and we have not yet scanned the full range,
            # so we need to keep scanning
            #
            # This could be because we hit the limit and then the :deleted
            # tombstones cleared out enough pairs to bring us back under,
            # or because we ran into a range delete and had to skip over it
            {next_pairs, next_count} = do_scan(:forward, kv, deleted_ranges, version, scanned_end_key, end_key, subtract_limit(limit, count), scan_count + 1)
            {merged_pairs ++ next_pairs, count + next_count}
        end
    end
  end

  # See :forward for comments, the :backward version is the same except key logic
  # is inverted (deals with start_key instead of end_key)
  defp do_scan(:backward, %HybridKV{} = kv, deleted_ranges, version, start_key, end_key, limit, scan_count) do
    {merged_pairs, scanned_start_key, deleted_ranges} =
      case deleted_ranges do
        [{sk, ek, del_v} | rest] when ek >= end_key ->
          mem_result = MemKV.scan(kv.mem_kv, version, del_v + 1, max(sk, start_key), end_key, limit: limit, reverse: true)
          {mem_start_key, _end_key} = mem_result.range
          pairs = merge([], mem_result.pairs, true)
          {pairs, mem_start_key, rest}

        _ ->
          stop_key = case deleted_ranges do
            [{_sk, ek, _del_v} | _] -> ek
            [] -> start_key
          end

          storage_result = kv.storage_module.scan(kv.storage_kv, stop_key, end_key, limit: limit, reverse: true)
          {storage_start_key, _end_key} = storage_result.range

          mem_result = MemKV.scan(kv.mem_kv, version, 0, storage_start_key, end_key, limit: limit, reverse: true)
          {mem_start_key, _end_key} = mem_result.range

          scanned_start_key = max(storage_start_key, mem_start_key)

          merged_pairs =
            merge(storage_result.pairs, mem_result.pairs, true)
            |> Enum.take_while(fn {k, _v} -> k >= scanned_start_key end)

          {merged_pairs, scanned_start_key, deleted_ranges}
      end

    count = length(merged_pairs)

    cond do
      count > limit ->
        {Enum.take(merged_pairs, limit), limit}

      count == limit ->
        {merged_pairs, count}

      count < limit ->
        case scanned_start_key == start_key do
          true ->
            {merged_pairs, count}
          false ->
            {next_pairs, next_count} = do_scan(:backward, kv, deleted_ranges, version, start_key, scanned_start_key, subtract_limit(limit, count), scan_count + 1)
            {merged_pairs ++ next_pairs, count + next_count}
        end
    end
  end

  # Remaining limit after `n` pairs were collected; :infinity stays :infinity.
  defp subtract_limit(:infinity, _n), do: :infinity
  defp subtract_limit(limit, n), do: limit - n

  # Merges two key-ordered pair lists (ascending, or descending when `reverse`
  # is true). On equal keys the pair from `list2` wins, and pairs whose value
  # is :deleted are dropped from the result.
  @spec merge([{binary, binary}], [{binary, binary}], boolean) :: [{binary, binary}]
  defp merge(list1, list2, reverse), do: do_merge(reverse, list1, list2, []) |> Enum.reverse()

  defp do_merge(_reverse, [], [], acc), do: acc
  defp do_merge(reverse, [p1 | rest1], [], acc), do: do_merge(reverse, rest1, [], acc_pair(p1, acc))
  defp do_merge(reverse, [], [p2 | rest2], acc), do: do_merge(reverse, [], rest2, acc_pair(p2, acc))

  defp do_merge(false, [{k1, _} = p1 | rest1] = list1, [{k2, _} = p2 | rest2] = list2, acc) do
    cond do
      k1 < k2 ->
        do_merge(false, rest1, list2, acc_pair(p1, acc))
      k1 > k2 ->
        do_merge(false, list1, rest2, acc_pair(p2, acc))
      k1 == k2 ->
        do_merge(false, rest1, rest2, acc_pair(p2, acc))
    end
  end

  defp do_merge(true, [{k1, _} = p1 | rest1] = list1, [{k2, _} = p2 | rest2] = list2, acc) do
    cond do
      k1 > k2 ->
        do_merge(true, rest1, list2, acc_pair(p1, acc))
      k1 < k2 ->
        do_merge(true, list1, rest2, acc_pair(p2, acc))
      k1 == k2 ->
        do_merge(true, rest1, rest2, acc_pair(p2, acc))
    end
  end

  # Tombstones are consumed by the merge and never reach the caller.
  defp acc_pair({_key, :deleted}, acc), do: acc
  defp acc_pair({_key, _value} = pair, acc), do: [pair | acc]

  @doc """
  Records `mutations` in the mutation log under `version` and applies them to
  the store in order.

  Returns the updated store (writes and range clears can change the
  deleted-range forest), so callers must keep the result.
  """
  @spec apply_batch(t, non_neg_integer, [Utils.mutation]) :: t
  def apply_batch(kv, version, mutations) when is_list(mutations) do
    MutationLog.insert(kv.mutation_log, version, mutations)

    Enum.reduce(mutations, kv, fn
      {:write, k, v}, kv ->
        put(kv, version, k, v)
      {:clear, k}, kv ->
        # delete/3 returns :ok rather than the store, so thread kv through explicitly
        delete(kv, version, k)
        kv
      {:clear_range, sk, ek}, kv ->
        delete_range(kv, version, sk, ek)
    end)
  end

  @doc """
  Flushes mutations with a version <= `version` to unversioned storage.
  """
  @spec flush(%HybridKV{}, non_neg_integer) :: %HybridKV{}
  def flush(%HybridKV{storage_module: s_mod, storage_kv: s_kv} = kv, version) when is_integer(version) and version >= 0 do
    batches = MutationLog.pop_up_to(kv.mutation_log, version)

    mem_kv = kv.mem_kv
    Enum.each(batches, fn {ver, mutations} ->
      Enum.each(mutations, fn
        {:write, k, v} ->
          MemKV.remove_key_at_version(mem_kv, ver, k)
          s_mod.put(s_kv, k, v)

        {:clear, k} ->
          MemKV.remove_key_at_version(mem_kv, ver, k)
          s_mod.delete(s_kv, k)

        {:clear_range, sk, ek} ->
          MemKV.remove_range_at_version(mem_kv, ver, sk, ek)
          s_mod.delete_range(s_kv, sk, ek)
      end)
    end)

    {_ranges, deleted_forest} = RangeForest.flush(kv.deleted_forest, version, kv.flushed_version + 1)

    %{kv | deleted_forest: deleted_forest, flushed_version: version}
  end

  @doc """
  Writes `key` => `value` directly to storage, bypassing memory and the mutation log.
  """
  @spec put_storage(t, binary, binary) :: :ok
  def put_storage(%HybridKV{} = kv, key, value) when is_binary(key) and is_binary(value) do
    kv.storage_module.put(kv.storage_kv, key, value)
  end

  @doc """
  Calls `commit` on the storage backend.
  """
  @spec commit(t) :: :ok
  def commit(%HybridKV{} = kv) do
    kv.storage_module.commit(kv.storage_kv)
  end

  @doc """
  Writes every `{key, value}` pair in `pairs` directly to storage.
  """
  @spec load_storage(%HybridKV{}, [{binary, binary}]) :: :ok
  def load_storage(%HybridKV{storage_module: s_mod, storage_kv: s_kv}, pairs) when is_list(pairs) do
    Enum.each(pairs, fn {k, v} when is_binary(k) and is_binary(v) ->
      s_mod.put(s_kv, k, v)
    end)
  end

  @doc """
  Deletes `start_key`..`end_key` directly from storage, leaving memory untouched.
  """
  @spec delete_range_storage(%HybridKV{}, binary, binary) :: :ok
  def delete_range_storage(%HybridKV{storage_module: s_mod, storage_kv: s_kv}, start_key, end_key) do
    s_mod.delete_range(s_kv, start_key, end_key)
  end

  @doc """
  Clears a range from both memory and storage **at all versions**.
  """
  @spec nuke_range(%HybridKV{}, binary, binary) :: :ok
  def nuke_range(%HybridKV{} = kv, start_key, end_key) do
    MemKV.nuke_range(kv.mem_kv, start_key, end_key)
    delete_range_storage(kv, start_key, end_key)
    :ok
  end

  @doc false
  def dump(%HybridKV{} = kv) do
    %{
      mem_kv: MemKV.dump(kv.mem_kv),
      storage_kv: kv.storage_module.dump(kv.storage_kv),
      deleted_forest: RangeForest.dump(kv.deleted_forest),
      mutation_log: MutationLog.dump(kv.mutation_log),
    }
  end
end
-138
lib/kv/byte_sample.ex
defmodule Hobbes.KV.ByteSample do
  @moduledoc """
  Sampled index of key/value sizes, kept in a private ordered ETS table.

  `apply_batch/2` mirrors data mutations into the sample and returns the
  mutations (under `special_byte_sample_prefix()`) that persist those changes,
  which `load/2` can later read back. Ranges are half-open: `start_key` is
  included and `end_key` is excluded.
  """

  alias Hobbes.Utils
  import Hobbes.Utils

  # Byte sample should be 1/250th the size of k/v data
  @byte_sample_factor 250
  # TODO: 250 is too rare for tests, buggify
  #@byte_sample_factor 2

  # Approximate overhead per sample (other than key size)
  # (currently just 8 bytes to store the size value as a string)
  @byte_sample_overhead_bytes 8

  @type t :: :ets.table

  @doc """
  Creates an empty sample table.
  """
  @spec new :: t
  def new do
    :ets.new(__MODULE__, [:ordered_set, :private])
  end

  @doc """
  Loads persisted `{key, encoded_size}` pairs into the table.
  """
  @spec load(t, [{binary, binary}]) :: :ok
  def load(table, pairs) when is_list(pairs) do
    Enum.each(pairs, fn {k, v} when is_binary(k) and is_binary(v) ->
      bytes = decode_float(v)
      :ets.insert(table, {k, bytes})
    end)
    :ok
  end

  @doc """
  Removes every sample with `start_key <= key < end_key`.

  An empty range (`start_key >= end_key`) removes nothing.
  """
  @spec delete_range(t, binary, binary) :: :ok
  def delete_range(table, start_key, end_key) when is_binary(start_key) and is_binary(end_key) do
    case start_key < end_key do
      true ->
        # :ets.next/2 only yields keys *after* start_key, so handle it separately
        :ets.delete(table, start_key)
        do_scan_delete(table, end_key, start_key)

      false ->
        # Empty range: start_key itself is outside [start_key, end_key)
        :ok
    end
  end

  defp do_scan_delete(table, end_key, prev_key) do
    case :ets.next(table, prev_key) do
      key when is_binary(key) and key < end_key ->
        :ets.delete(table, key)
        do_scan_delete(table, end_key, key)

      _ -> :ok
    end
  end

  @doc """
  Returns the samples with `start_key <= key < end_key` in key order.

  An empty range (`start_key >= end_key`) returns `[]`.
  """
  @spec scan(t, binary, binary) :: [{binary, float}]
  def scan(table, start_key, end_key) do
    case start_key < end_key do
      true ->
        acc =
          case :ets.lookup(table, start_key) do
            [{^start_key, _size}] = result -> result
            [] -> []
          end

        do_scan(table, end_key, start_key, acc)
        |> Enum.reverse()

      false ->
        []
    end
  end

  defp do_scan(table, end_key, prev_key, acc) do
    case :ets.next_lookup(table, prev_key) do
      {_key, [{key, _size} = pair]} ->
        case key < end_key do
          true -> do_scan(table, end_key, key, [pair | acc])
          false -> acc
        end

      :"$end_of_table" -> acc
    end
  end

  @doc """
  Applies data `mutations` to the sample and returns, in order, the mutations
  needed to persist the resulting sample changes.
  """
  @spec apply_batch(t, [Utils.mutation]) :: [Utils.mutation]
  def apply_batch(table, mutations) when is_list(mutations) do
    mutations
    |> Enum.reduce([], fn
      {:write, k, v}, acc ->
        key_size = byte_size(k)
        pair_size = key_size + byte_size(v)
        probability = byte_sample_probability(key_size, pair_size)

        case (:erlang.phash2(k, 1000) / 1000) < probability do
          true ->
            # Correct for sampling probability (see comments in byte_sample_probability/2)
            sampled_size = pair_size / min(probability, 1)

            :ets.insert(table, {k, sampled_size})
            mut = {:write, special_byte_sample_prefix() <> k, encode_float(sampled_size)}
            [mut | acc]

          false ->
            # The new pair is not sampled. If an earlier (larger) value of this
            # key was sampled, that entry now describes data which no longer
            # exists, so drop it instead of leaving a stale size behind.
            clear_sample(table, k, acc)
        end

      {:clear, k}, acc ->
        clear_sample(table, k, acc)

      {:clear_range, sk, ek}, acc ->
        delete_range(table, sk, ek)
        # TODO: if the range was empty in the byte sample we don't need this mutation
        mut = {:clear_range, special_byte_sample_prefix() <> sk, special_byte_sample_prefix() <> ek}
        [mut | acc]
    end)
    |> Enum.reverse()
  end

  # Removes the sample for `key` if present, prepending the matching :clear
  # mutation to `acc`; returns `acc` unchanged when the key was not sampled.
  defp clear_sample(table, key, acc) do
    case :ets.member(table, key) do
      true ->
        :ets.delete(table, key)
        mut = {:clear, special_byte_sample_prefix() <> key}
        [mut | acc]

      false -> acc
    end
  end

  defp byte_sample_probability(key_size, pair_size) do
    # Probability that a key/value pair of this size belongs in the byte sample
    # This is a function of the size rather than a percentage of keys so that
    # we can maintain the byte sample as a fraction of *total KV size*
    #
    # Intuitively: the byte sample only stores keys, so if values are larger than
    # the overhead we can afford to store more samples while staying under the limit
    # Therefore, if the value is large, probability should increase
    #
    # We then correct out the probability factor by dividing size to get sampled_size
    # at the end, so that the sample is not biased by the larger pairs
    #
    # This algorithm is borrowed directly from FDB (storageserver isKeyValueInSample)
    (pair_size / (key_size + @byte_sample_overhead_bytes)) / @byte_sample_factor
  end

  # Sizes are persisted as integer strings in thousandths.
  defp encode_float(float) when is_number(float) do
    Integer.to_string(round(float * 1000))
  end

  defp decode_float(string) when is_binary(string) do
    String.to_integer(string) / 1000
  end

  @doc false
  def dump(table) do
    :ets.tab2list(table)
  end
end
defmodule Hobbes.KV.MutationLog do
  @moduledoc """
  Version-ordered log of mutation batches, kept in a private ordered ETS table
  keyed by version.
  """

  alias Hobbes.Utils

  @type t :: :ets.table

  @spec new :: t
  def new, do: :ets.new(__MODULE__, [:ordered_set, :private])

  @doc """
  Stores `mutations` as the batch for `version`, replacing any batch already
  stored under that version.
  """
  @spec insert(t, non_neg_integer, [Utils.mutation]) :: :ok
  def insert(table, version, mutations) when is_integer(version) and is_list(mutations) do
    :ets.insert(table, {version, mutations})
    :ok
  end

  @doc """
  Adds `mutations` to the end of the batch stored for `version`, creating the
  batch when there is none yet.
  """
  @spec append(t, non_neg_integer, [Utils.mutation]) :: :ok
  def append(table, version, mutations) when is_integer(version) and is_list(mutations) do
    batch =
      case :ets.lookup(table, version) do
        [] -> mutations
        [{^version, existing}] -> existing ++ mutations
      end

    :ets.insert(table, {version, batch})
    :ok
  end

  @doc """
  Removes and returns, in ascending version order, every batch whose version
  is `<= end_version`.
  """
  @spec pop_up_to(t, non_neg_integer) :: [{non_neg_integer, [Utils.mutation]}]
  def pop_up_to(table, end_version) when is_integer(end_version) do
    # Start below every valid (non-negative) version
    first = :ets.next(table, -1)

    table
    |> drain(end_version, first, [])
    |> Enum.reverse()
  end

  # Walks the table from `ver` upwards, taking entries out until a version
  # above `end_version` or the end of the table is reached.
  defp drain(table, end_version, ver, acc) when is_integer(ver) and ver <= end_version do
    [{^ver, mutations}] = :ets.take(table, ver)
    drain(table, end_version, :ets.next(table, ver), [{ver, mutations} | acc])
  end

  defp drain(_table, _end_version, ver, acc) when is_integer(ver), do: acc
  defp drain(_table, _end_version, :"$end_of_table", acc), do: acc

  @doc false
  def dump(table), do: :ets.tab2list(table)
end
-106
lib/storage_queue.ex
defmodule Hobbes.StorageQueue do
  @moduledoc """
  Queue of versioned mutation batches persisted in a storage KV, together with
  a small set of state fields stored under special-prefixed keys.
  """

  alias Hobbes.{StorageQueue, Utils}
  alias Hobbes.KV.{FlatKV, FlatStorageKV}

  import Hobbes.Utils

  @type t :: %__MODULE__{
    storage_module: module,
    storage_kv: FlatKV.t | FlatStorageKV.t,
  }

  @enforce_keys [:storage_module, :storage_kv]
  defstruct @enforce_keys

  @spec new(module, term) :: t
  def new(storage_module, storage_kv) when is_atom(storage_module) do
    %StorageQueue{storage_module: storage_module, storage_kv: storage_kv}
  end

  @locked_key special_prefix() <> "/locked?"
  @version_key special_prefix() <> "/version"
  @kcv_key special_prefix() <> "/known_committed_version"

  @doc """
  Persists the `:locked?`, `:version` and `:known_committed_version` fields.

  All three keys must be present in `fields`.
  """
  @spec put_state(t, map) :: :ok
  def put_state(%StorageQueue{storage_module: mod, storage_kv: store} = _sq, fields) when is_map(fields) do
    %{locked?: locked?, version: version, known_committed_version: kcv} = fields

    mod.put(store, @locked_key, encode_boolean(locked?))
    mod.put(store, @version_key, Integer.to_string(version))
    mod.put(store, @kcv_key, Integer.to_string(kcv))
    :ok
  end

  # Reads back the fields written by put_state/2.
  def get_state(%StorageQueue{storage_module: mod, storage_kv: store} = _sq) do
    locked? = decode_boolean(mod.get(store, @locked_key))
    version = String.to_integer(mod.get(store, @version_key))
    kcv = String.to_integer(mod.get(store, @kcv_key))

    %{locked?: locked?, version: version, known_committed_version: kcv}
  end

  @doc """
  Returns the queued batches as `{version, mutations}` tuples without
  removing them.
  """
  @spec peek_batches(t) :: [{non_neg_integer, [Utils.tagged_mutation]}]
  def peek_batches(%StorageQueue{} = sq) do
    first_key = encode_version(0)
    # TODO: fix after switching to binary encoding
    last_key = String.duplicate("9", 20)

    result = sq.storage_module.scan(sq.storage_kv, first_key, last_key)
    pairs = Map.fetch!(result, :pairs)

    Enum.map(pairs, fn {k, v} -> {decode_version(k), decode_mutations(v)} end)
  end

  @doc """
  Stores `tagged_mutations` as the batch for `version`.
  """
  @spec append_batch(t, non_neg_integer, [Utils.tagged_mutation]) :: :ok
  def append_batch(%StorageQueue{} = sq, version, tagged_mutations) when is_integer(version) and is_list(tagged_mutations) do
    key = encode_version(version)
    value = encode_mutations(tagged_mutations)

    sq.storage_module.put(sq.storage_kv, key, value)
  end

  @doc """
  Deletes every batch with a version up to and including `up_to_version`.
  """
  @spec pop_batches(t, non_neg_integer) :: :ok
  def pop_batches(%StorageQueue{} = sq, up_to_version) when is_integer(up_to_version) do
    first_key = encode_version(0)
    # Note: we need next_key/1 because up_to_version is inclusive
    last_key = next_key(encode_version(up_to_version))

    sq.storage_module.delete_range(sq.storage_kv, first_key, last_key)
    :ok
  end

  @spec commit(t) :: :ok
  def commit(%StorageQueue{storage_module: mod, storage_kv: store}), do: mod.commit(store)

  # TODO: binary encoding
  # Versions become zero-padded, 20-character decimal strings.
  defp encode_version(version) when is_integer(version) do
    version
    |> Integer.to_string()
    |> String.pad_leading(20, "0")
  end

  defp decode_version(string), do: String.to_integer(string)

  defp encode_mutations(mutations) when is_list(mutations) do
    :erlang.term_to_binary(mutations, [:deterministic])
  end

  defp decode_mutations(binary) when is_binary(binary) do
    # Anything other than a list fails the match below
    case :erlang.binary_to_term(binary, [:safe]) do
      mutations when is_list(mutations) -> mutations
    end
  end

  defp encode_boolean(true), do: "true"
  defp encode_boolean(false), do: "false"

  defp decode_boolean("true"), do: true
  defp decode_boolean("false"), do: false

  @doc false
  def dump(%StorageQueue{storage_module: mod, storage_kv: store}), do: mod.dump(store)
end
-99
test/construct/scheduler/file_store_test.exs
defmodule Hobbes.Construct.Scheduler.FileStoreTest do
  use ExUnit.Case, async: true

  alias Hobbes.Construct.Scheduler.FileStore

  @moduletag :file_store

  # Every test gets its own fresh store
  setup do
    fs = FileStore.new()
    %{fs: fs}
  end

  describe "mkdir/2" do
    test "returns error if file in path", %{fs: fs} do
      :ok = FileStore.mkdir(fs, "/foo")
      :ok = FileStore.write(fs, "/foo/bar", "hello")

      assert FileStore.mkdir(fs, "/foo/bar") == {:error, :eexist}
      assert FileStore.mkdir(fs, "/foo/bar/baz") == {:error, :enoent}
    end

    test "returns error if directory exists", %{fs: fs} do
      :ok = FileStore.mkdir(fs, "/foo")

      assert FileStore.mkdir(fs, "/foo") == {:error, :eexist}
    end
  end

  describe "mkdir_p/2" do
    test "returns error if file in path", %{fs: fs} do
      assert FileStore.mkdir(fs, "/foo") == :ok
      assert FileStore.write(fs, "/foo/file", "hello world") == :ok

      assert FileStore.mkdir_p(fs, "/foo/file/bar") == {:error, :enotdir}
    end

    test "makes directories", %{fs: fs} do
      assert FileStore.mkdir_p(fs, "/foo/bar/baz") == :ok
      assert FileStore.mkdir_p(fs, "/foo/bar/baz/qux") == :ok

      # Every ancestor must exist, with or without a trailing slash
      expected_dirs = ["/", "/foo", "/foo/bar", "/foo/bar/baz", "/foo/bar/baz/qux", "/foo/bar/baz/qux/"]

      Enum.each(expected_dirs, fn path ->
        assert FileStore.dir?(fs, path)
      end)
    end
  end

  describe "write/3" do
    test "returns error if no directory", %{fs: fs} do
      assert FileStore.write(fs, "/foo/bar.txt", "hello") == {:error, :enoent}
    end

    test "returns error if file in path", %{fs: fs} do
      :ok = FileStore.write(fs, "/foo", "hello")

      assert FileStore.write(fs, "/foo/bar.txt", "hello") == {:error, :enoent}
    end

    test "writes file", %{fs: fs} do
      :ok = FileStore.mkdir_p(fs, "/foo/bar")

      assert FileStore.write(fs, "/foo/bar/baz.txt", "hello world!") == :ok
      assert FileStore.read(fs, "/foo/bar/baz.txt") == {:ok, "hello world!"}
    end
  end

  describe "read/2" do
    test "returns error if file does not exist", %{fs: fs} do
      assert FileStore.read(fs, "/foo/bar.txt") == {:error, :enoent}
    end

    test "returns error if file is a directory", %{fs: fs} do
      :ok = FileStore.mkdir_p(fs, "/foo/bar")

      assert FileStore.read(fs, "/foo/bar") == {:error, :eisdir}
    end

    test "reads file", %{fs: fs} do
      :ok = FileStore.mkdir(fs, "/foo")
      :ok = FileStore.write(fs, "/foo/bar.txt", "hello world!")

      assert FileStore.read(fs, "/foo/bar.txt") == {:ok, "hello world!"}
    end
  end

  describe "ls/2" do
    test "returns paths", %{fs: fs} do
      # Note: these entries are created as directories, despite the file-like names
      for path <- ["/foo/bar/a.txt", "/foo/bar/b.txt", "/foo/bar/c.txt", "/foo/bar/baz/d.txt"] do
        FileStore.mkdir_p(fs, path)
      end

      entries = ["a.txt", "b.txt", "baz", "c.txt"]

      assert FileStore.ls(fs, "/foo/bar/") == {:ok, entries}
      assert FileStore.ls(fs, "/foo/bar") == {:ok, entries}

      assert FileStore.ls(fs, "/foo") == {:ok, ["bar"]}
      assert FileStore.ls(fs, "/foo/bar/baz") == {:ok, ["d.txt"]}
    end
  end
end
-68
test/construct_test.exs
defmodule Hobbes.ConstructTest do
  use ExUnit.Case, async: false

  alias Hobbes.Construct.{SimServer, SimFile}

  @moduletag :construct

  # Every test runs under its own scheduler, started from the test process
  setup do
    SimServer.start_scheduler()
    :ok
  end

  describe "process exits" do
    test "handles child process exiting normally" do
      SimServer.spawn(fn -> 1 + 2 end)

      assert true
    end

    # These tests use SimServer.sleep(0) to hand off to each other,
    # which ensures the monitors/links connect *before* the error is raised
    #
    # Note: the Process.sleep(1) at the end is needed to prevent error logs from being
    # printed even though :capture_log is true (test process otherwise seems to die too early)
    @tag :capture_log
    test "handles unlinked child process raising error" do
      pid = SimServer.spawn(&yield_then_raise/0)

      mref = Process.monitor(pid)
      SimServer.sleep(0)

      assert_receive {:DOWN, ^mref, :process, ^pid, {%RuntimeError{message: "Some error"}, _}}
      Process.sleep(1)
    end

    @tag :capture_log
    test "handles linked process raising error" do
      SimServer.flag(:trap_exit, true)

      pid = SimServer.spawn_link(&yield_then_raise/0)
      SimServer.sleep(0)

      assert_receive {:EXIT, ^pid, {%RuntimeError{message: "Some error"}, _}}
      Process.sleep(1)
    end
  end

  describe "SimFile" do
    test "reads and writes files" do
      assert SimFile.mkdir_p("/foo/bar/baz") == :ok
      assert SimFile.write("/foo/bar/baz/qux.txt", "hello world!") == :ok

      assert SimFile.read("/foo/bar/baz/qux.txt") == {:ok, "hello world!"}
      assert SimFile.read("/foo/qux.txt") == {:error, :enoent}
    end
  end

  # Child body shared by the crash tests: hand control back once, then fail.
  defp yield_then_raise do
    SimServer.sleep(0)
    raise "Some error"
  end
end
-197
test/hybrid_kv_test.exs
defmodule Hobbes.HybridKVTest do
  use ExUnit.Case, async: true

  alias Hobbes.HybridKV
  alias Hobbes.KV.TestKV

  @moduletag :hybrid_kv

  setup do
    %{kv: HybridKV.new()}
  end

  defmodule Verifier do
    @moduledoc """
    This module is used to verify the correctness of HybridKV by checking
    all of its operations against a much simpler implementation (TestKV).

    We generate random (deterministic by seed) operations (get, put, etc)
    and run them through both KVs, and then check that the results match.

    The order in which random numbers are drawn is part of the test's
    determinism: reordering the `rand_*` calls changes which operations a
    given seed produces.
    """

    require Logger

    @enforce_keys [:hybrid_kv, :test_kv, :version, :durable_version, :seed]
    defstruct @enforce_keys

    @doc """
    Seeds the calling process's `:rand` state with `seed` and returns a fresh
    verifier holding an empty HybridKV and an empty TestKV.
    """
    def new(seed) do
      :rand.seed(:exsss, seed)

      %__MODULE__{
        hybrid_kv: HybridKV.new(),
        test_kv: TestKV.new(),
        version: 1,
        durable_version: 0,
        seed: seed
      }
    end

    @doc """
    Performs `op_count` random operations against both KVs and returns the
    resulting verifier.

    Any failure is re-raised annotated with the operation index and the seed,
    so a failing run can be reproduced.
    """
    def run(%__MODULE__{} = verifier, op_count) do
      # The explicit `//1` step makes `op_count == 0` a no-op; a plain
      # `1..0` is a descending range and would run two operations.
      Enum.reduce(1..op_count//1, verifier, fn i, verifier ->
        # if i == 0, do: dump(verifier)

        try do
          perform(random_op(), verifier)
        rescue
          e in [ExUnit.AssertionError] ->
            context = " (at op #{i}, seed=#{inspect(verifier.seed)})"
            reraise %{e | message: e.message <> context}, __STACKTRACE__

          e ->
            Logger.error("Error #{inspect(e)} at op=#{i}, seed=#{inspect(verifier.seed)}")
            reraise e, __STACKTRACE__
        end
      end)
    end

    @doc false
    def dump(%__MODULE__{} = verifier) do
      dbg(
        [
          hybrid_kv: HybridKV.dump(verifier.hybrid_kv),
          test_kv: TestKV.dump(verifier.test_kv),
          durable_version: verifier.durable_version
        ],
        limit: :infinity
      )
    end

    # Picks the next operation: a flush 1% of the time, otherwise one of @ops.
    @ops [:apply_batch, :get, :scan]
    defp random_op do
      case Enum.random(1..100) do
        1 -> :flush
        _ -> Enum.random(@ops)
      end
    end

    # Point read: both KVs must return the same value for a random key at a
    # random readable version.
    defp perform(:get, %__MODULE__{hybrid_kv: hkv, test_kv: tkv} = verifier) do
      read_version = rand_read_version(verifier)
      key = rand_hash()

      hkv_result = HybridKV.get(hkv, read_version, key)
      tkv_result = TestKV.get(tkv, read_version, key)

      assert hkv_result == tkv_result

      verifier
    end

    # Range read: both KVs must return the same result for a random range,
    # limit and direction.
    defp perform(:scan, %__MODULE__{hybrid_kv: hkv, test_kv: tkv} = verifier) do
      read_version = rand_read_version(verifier)

      first_key = rand_range_key()
      second_key = rand_range_key()

      # Normalise to start < end. Keys drawn in descending order are swapped
      # and the scan is run in reverse; equal keys become a one-key range.
      {start_key, end_key, reverse} =
        cond do
          first_key < second_key -> {first_key, second_key, false}
          first_key > second_key -> {second_key, first_key, true}
          first_key == second_key -> {first_key, first_key <> "\x00", false}
        end

      opts = [limit: rand_limit(), reverse: reverse]

      hkv_result = HybridKV.scan(hkv, read_version, start_key, end_key, opts)
      tkv_result = TestKV.scan(tkv, read_version, start_key, end_key, opts)

      assert hkv_result == tkv_result
      verifier
    end

    # Write: advances the version by a random step and applies the same batch
    # of 1..10 random mutations to both KVs.
    defp perform(:apply_batch, %__MODULE__{hybrid_kv: hkv, test_kv: tkv} = verifier) do
      version = verifier.version + Enum.random(1..1000)

      mutation_count = Enum.random(1..10)

      # Built by prepending, so the batch is in reverse generation order.
      # Kept that way so a given seed yields the same batches as before.
      mutations =
        Enum.reduce(1..mutation_count, [], fn _i, acc ->
          [rand_mutation() | acc]
        end)

      %{
        verifier
        | version: version,
          hybrid_kv: HybridKV.apply_batch(hkv, version, mutations),
          test_kv: TestKV.apply_batch(tkv, version, mutations)
      }
    end

    # Flush: flushes HybridKV at a random version between the current durable
    # version and the latest version, then records it as the durable version.
    # TestKV is not touched by this operation.
    defp perform(:flush, %__MODULE__{hybrid_kv: hkv} = verifier) do
      flush_version = rand(verifier.durable_version, verifier.version)

      %{verifier | hybrid_kv: HybridKV.flush(hkv, flush_version), durable_version: flush_version}
    end

    # Random mutation: 10% clear_range, 20% clear, 70% write.
    defp rand_mutation do
      case Enum.random(1..10) do
        1 ->
          first_key = rand_hash()
          second_key = rand_hash()

          {start_key, end_key} =
            cond do
              first_key < second_key -> {first_key, second_key}
              first_key > second_key -> {second_key, first_key}
              first_key == second_key -> {first_key, first_key <> "\x00"}
            end

          {:clear_range, start_key, end_key}

        i when i in [2, 3] ->
          {:clear, rand_hash()}

        _ ->
          {:write, rand_hash(), rand_hash()}
      end
    end

    # Uniform random integer in s..e (s defaults to 0 when called as rand(e)).
    defp rand(s \\ 0, e), do: Enum.random(s..e)

    # Random version between the durable version and the latest version.
    defp rand_read_version(%__MODULE__{} = verifier) do
      rand(verifier.durable_version, verifier.version)
    end

    # Random scan limit: a small number, a power of ten, or no limit.
    defp rand_limit do
      case Enum.random(1..3) do
        1 -> Enum.random(2..9)
        2 -> Enum.random([1, 10, 100, 1000])
        3 -> :infinity
      end
    end

    # Random range boundary: occasionally the extreme keys "" and "\xFF",
    # otherwise a regular random key.
    defp rand_range_key do
      case Enum.random(1..10) do
        1 -> ""
        2 -> "\xFF"
        _ -> rand_hash()
      end
    end

    # Returns the first 8 hex characters of the SHA-256 of a random integer in
    # 0..bound, i.e. one of at most bound + 1 distinct keys, so that operations
    # frequently hit the same keys.
    defp rand_hash(bound \\ 99) do
      bound
      |> rand()
      |> Integer.to_string()
      |> then(&:crypto.hash(:sha256, &1))
      # Base.encode16/2 only supports the :case option, so none is passed.
      |> Base.encode16()
      |> String.slice(0, 8)
    end
  end

  describe "verify HybridKV" do
    @tag :hkv_verify
    test "operations" do
      {100, 101, 102}
      |> Verifier.new()
      |> Verifier.run(1000)
    end

    @tag :hkv_verify_multi
    @tag :disable
    test "operations multi (slow)" do
      # 300 verification tests (takes about 1.3s)
      Enum.each(1..300, fn s ->
        {100 + s, 101, 102}
        |> Verifier.new()
        |> Verifier.run(1000)
      end)
    end
  end
end
-22
test/kv/byte_sample_test.exs
defmodule Hobbes.KV.ByteSampleTest do
  use ExUnit.Case, async: true

  alias Hobbes.KV.ByteSample

  @moduletag :byte_sample

  # Every test receives a fresh ByteSample as `bs`.
  setup do
    %{bs: ByteSample.new()}
  end

  describe "ByteSample" do
    test "samples", %{bs: bs} do
      # 4000 writes with zero-padded keys ("k0001" .. "k4000"), so the keys
      # sort lexicographically in numeric order.
      mutations =
        for i <- 1..4000 do
          padded_index = String.pad_leading(to_string(i), 4, "0")
          {:write, "k#{padded_index}", "v#{i}"}
        end

      # NOTE(review): the return value is discarded and the original `bs` is
      # scanned below; presumably ByteSample mutates shared state in place —
      # confirm against ByteSample.apply_batch/2.
      ByteSample.apply_batch(bs, mutations)

      sampled_pairs = ByteSample.scan(bs, "k1000", "k2000")
      # Anything else will flake if we change the parameters
      assert is_list(sampled_pairs)
    end
  end
end