this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add SimFile

garrison 54af29cf 4c751b00

+200 -13
+6
lib/trinity/scheduler.ex
··· 11 11 proc_nodes: :ets.table, 12 12 node_procs: :ets.table, 13 13 14 + file_paths: :ets.table, 15 + file_data: :ets.table, 16 + 14 17 now: :atomics.atomics_ref, 15 18 supervisor_pid: pid, 16 19 } ··· 22 25 23 26 :proc_nodes, 24 27 :node_procs, 28 + 29 + :file_paths, 30 + :file_data, 25 31 26 32 :now, 27 33 :supervisor_pid,
+5
lib/trinity/scheduler/simulation_supervisor.ex
··· 49 49 proc_nodes: :ets.new(__MODULE__, [:set, :public]), 50 50 node_procs: :ets.new(__MODULE__, [:set, :public]), 51 51 52 + file_paths: :ets.new(__MODULE__, [:ordered_set, :public]), 53 + file_data: :ets.new(__MODULE__, [:ordered_set, :public]), 54 + 52 55 now: :atomics.new(1, signed: false), 53 56 supervisor_pid: self(), 54 57 } 58 + 59 + :ets.insert(sim.file_paths, {:next_fd, 0}) 55 60 56 61 # TODO: configurable? 57 62 root_node = :nonode
+151
lib/trinity/sim_file.ex
··· 1 + defmodule Trinity.SimFile do 2 + alias Trinity.Scheduler.Simulation 3 + import Trinity.Scheduler, only: [simulation_key: 0, sim_node_key: 0] 4 + 5 + @spec get_sim :: Simulation.t | nil 6 + defp get_sim, do: Process.get(simulation_key()) 7 + 8 + @spec get_proc_node :: atom 9 + defp get_proc_node, do: Process.get(sim_node_key()) 10 + 11 + @spec open(String.t, [atom]) :: term 12 + def open(path, modes) do 13 + case get_sim() do 14 + nil -> raise "not supported" 15 + _sim -> sim_open(path, modes) 16 + end 17 + end 18 + 19 + def pread(fd, loc, bytes) do 20 + case get_sim() do 21 + nil -> raise "not supported" 22 + _sim -> sim_pread(fd, loc, bytes) 23 + end 24 + end 25 + 26 + @spec pwrite(term, non_neg_integer, binary) :: :ok | {:error, term} 27 + def pwrite(fd, loc, bytes) do 28 + case get_sim() do 29 + nil -> raise "not supported" 30 + _sim -> sim_pwrite(fd, loc, bytes) 31 + end 32 + end 33 + 34 + defp next_fd(file_paths), do: :ets.update_counter(file_paths, :next_fd, 1) 35 + 36 + defp sim_open(path, _modes) do 37 + %{file_paths: file_paths} = get_sim() 38 + node = get_proc_node() 39 + 40 + key = {node, path} 41 + case :ets.lookup(file_paths, key) do 42 + [{_path, fd}] -> 43 + {:ok, fd} 44 + [] -> 45 + # TODO: check directory 46 + fd = next_fd(file_paths) 47 + :ets.insert(file_paths, {key, fd}) 48 + {:ok, fd} 49 + end 50 + end 51 + 52 + # TODO: much larger 53 + @block_size 4 54 + 55 + defp sim_pread(fd, loc, bytes) do 56 + %{file_data: file_data} = get_sim() 57 + 58 + start_block_index = div(loc, @block_size) 59 + start_pos = rem(loc, @block_size) 60 + 61 + << 62 + _prefix::binary-size(start_pos), 63 + bin_acc::binary, 64 + >> = read_block(file_data, fd, start_block_index) 65 + 66 + bytes_remaining = bytes - byte_size(bin_acc) 67 + do_read_blocks(file_data, fd, start_block_index + 1, bin_acc, bytes_remaining) 68 + end 69 + 70 + defp do_read_blocks(_file_data, _fd, _block_index, bin_acc, 0) do 71 + bin_acc 72 + end 73 + 74 + defp do_read_blocks(file_data, fd, block_index, bin_acc, bytes_remaining) when bytes_remaining < @block_size do 75 + << 76 + bin::binary-size(bytes_remaining), 77 + _rest::binary, 78 + >> = read_block(file_data, fd, block_index) 79 + <<bin_acc::binary, bin::binary>> 80 + end 81 + 82 + defp do_read_blocks(file_data, fd, block_index, bin_acc, bytes_remaining) do 83 + bin = read_block(file_data, fd, block_index) 84 + bin_acc = <<bin_acc::binary, bin::binary>> 85 + do_read_blocks(file_data, fd, block_index + 1, bin_acc, bytes_remaining - @block_size) 86 + end 87 + 88 + defp read_block(file_data, fd, block_index) do 89 + case :ets.lookup(file_data, {fd, block_index}) do 90 + [{_key, existing}] -> existing 91 + [] -> <<0::integer-unit(8)-size(@block_size)>> 92 + end 93 + end 94 + 95 + defp sim_pwrite(fd, loc, bin) do 96 + %{file_data: file_data} = get_sim() 97 + 98 + bin_size = byte_size(bin) 99 + start_block_index = div(loc, @block_size) 100 + start_pos = rem(loc, @block_size) 101 + 102 + case (loc + bin_size) <= @block_size do 103 + true -> 104 + write_block(file_data, fd, start_block_index, start_pos, bin) 105 + 106 + false -> 107 + << 108 + start_bin::binary-size(@block_size - start_pos), 109 + rest::binary, 110 + >> = bin 111 + write_block(file_data, fd, start_block_index, start_pos, start_bin) 112 + do_write_blocks(rest, file_data, fd, start_block_index + 1) 113 + end 114 + :ok 115 + end 116 + 117 + defp do_write_blocks("", _file_data, _fd, _block_index) do 118 + :ok 119 + end 120 + 121 + defp do_write_blocks(<<bin::binary-size(@block_size), rest::binary>>, file_data, fd, block_index) do 122 + write_block(file_data, fd, block_index, 0, bin) 123 + do_write_blocks(rest, file_data, fd, block_index + 1) 124 + end 125 + 126 + defp do_write_blocks(<<bin::binary>>, file_data, fd, block_index) do 127 + # Last block may be smaller than block_size 128 + write_block(file_data, fd, block_index, 0, bin) 129 + :ok 130 + end 131 + 132 + # TODO: handle case where (pos == 0 and byte_size(bin) == block_size) more efficiently 133 + defp write_block(file_data, fd, block_index, pos, bin) do 134 + key = {fd, block_index} 135 + existing_data = case :ets.lookup(file_data, {fd, block_index}) do 136 + [{_key, existing}] -> existing 137 + [] -> <<0::integer-unit(8)-size(@block_size)>> 138 + end 139 + 140 + << 141 + prefix::binary-size(pos), 142 + _::binary-size(byte_size(bin)), 143 + suffix::binary, 144 + >> = existing_data 145 + 146 + # Copy is not needed here due to construction with prefix/suffix 147 + # TODO: confirm the above 148 + new_data = <<prefix::binary, bin::binary, suffix::binary>> 149 + :ets.insert(file_data, {key, new_data}) 150 + end 151 + end
+38 -13
test/trinity_test.exs
··· 6 6 7 7 defmodule Counter do 8 8 use GenServer 9 - alias Trinity.SimServer 9 + alias Trinity.{SimServer, SimFile} 10 + 11 + def start_link(id, initial_count) do 12 + SimServer.start_link(__MODULE__, %{id: id, initial_count: initial_count}) 13 + end 14 + 15 + def add(server, amount) do 16 + SimServer.call(server, {:add, amount}) 17 + end 10 18 11 - def start_link(initial_count), do: SimServer.start_link(__MODULE__, initial_count) 12 - def add(server, amount), do: SimServer.call(server, {:add, amount}) 19 + def init(%{id: id, initial_count: initial_count}) do 20 + state = %{ 21 + fd: SimFile.open("/counters/#{id}.count", [:read, :write]), 22 + size: nil, 23 + } 24 + state = write_value(state, initial_count) 25 + {:ok, state} 26 + end 27 + 28 + def handle_call({:add, amount}, _from, state) do 29 + value = read_value(state) 30 + value = value + amount 31 + state = write_value(state, value) 32 + {:reply, value, state} 33 + end 13 34 14 - def init(initial_count) do 15 - {:ok, initial_count} 35 + defp read_value(%{fd: fd, size: size}) do 36 + data = SimFile.pread(fd, 3, size) 37 + "The current value of the counter is: " <> value_str = data 38 + String.to_integer(value_str) 16 39 end 17 40 18 - def handle_call({:add, amount}, _from, count) do 19 - count = count + amount 20 - {:reply, count, count} 41 + defp write_value(%{fd: fd} = state, value) do 42 + data = "The current value of the counter is: " <> Integer.to_string(value) 43 + SimFile.pwrite(fd, 3, data) 44 + %{state | size: byte_size(data)} 21 45 end 22 46 end 23 47 ··· 38 62 end 39 63 40 64 pids = Enum.map(1..10, fn i -> 41 - {:ok, pid} = Counter.start_link(i) 42 - pid 65 + {:ok, pid} = Counter.start_link(i, i) 66 + {pid, i} 43 67 end) 44 68 45 69 SimProcess.send parent, {ref, pids} ··· 55 79 56 80 dbg pids 57 81 58 - Enum.each(pids, fn pid -> 59 - dbg Counter.add(pid, 10) 82 + Enum.each(pids, fn {pid, id} -> 83 + result = Counter.add(pid, 10) 84 + assert result == (id + 10) 60 85 end) 61 86 62 87 Scheduler.yield(1000) 63 - dbg Scheduler.dump(), limit: :infinity 88 + #dbg Scheduler.dump(), limit: :infinity 64 89 end) 65 90 end 66 91 end