this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use file XKS in sim

garrison 1b002bcb 64ba25c9

+63 -17
+12 -3
lib/servers/coordinator.ex
··· 1 1 defmodule Hobbes.Servers.Coordinator do 2 2 use GenServer 3 - alias Trinity.{SimProcess, SimServer, SimLogger} 3 + alias Trinity.{Sim, SimProcess, SimServer, SimLogger} 4 4 require SimLogger 5 5 6 6 import ExUnit.Assertions, only: [assert: 1] ··· 300 300 replica_ids = Enum.to_list(0..(length(replicas) - 1)) 301 301 quorum_size = div(length(replica_ids), 2) + 1 302 302 303 - # TODO: larger blocks outside tests 304 - xks = XKS.new(block_size: 1024, lsm_subtable_size: 1024) 303 + path = 304 + case Sim.simulated?() do 305 + true -> "/coordinator_#{id}.xks" 306 + false -> :memory 307 + end 308 + 309 + xks = XKS.new([ 310 + path: path, 311 + block_size: 1024, 312 + lsm_subtable_size: 1024, 313 + ]) 305 314 # TODO: init @state_partition once required 306 315 307 316 # TODO: load version from XKS
+14 -9
lib/servers/server_supervisor.ex
··· 1 1 defmodule Hobbes.Servers.ServerSupervisor do 2 2 use GenServer 3 - alias Trinity.{Sim, SimProcess, SimServer} 3 + alias Trinity.{Sim, SimProcess, SimServer, SimFile} 4 4 5 - alias Hobbes.Construct.SimFile 6 5 import ExUnit.Assertions, only: [assert: 1] 7 6 8 7 alias Hobbes.Structs.Cluster ··· 208 207 209 208 defp load_stateful_slots(%State{} = state) do 210 209 # TODO: check if cluster is in-memory instead 211 - case false && Sim.simulated?() do 210 + case Sim.simulated?() do 212 211 true -> 213 212 state 214 213 |> load_tlog_slots() ··· 301 300 [slot_path | rest] -> 302 301 state = %{state | open_tlog: rest} 303 302 304 - kv_path = slot_path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 305 - # TODO 306 - #if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 303 + # TODO: check for in-memory cluster instead 304 + kv_path = case Sim.simulated?() do 305 + true -> slot_path <> "/tlog_gen_#{Integer.to_string(generation)}.kv" 306 + false -> :memory 307 + end 308 + if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 307 309 308 310 arg = Map.put(arg, :path, kv_path) 309 311 restart_arg = %{path: kv_path} ··· 320 322 [slot_path | rest] -> 321 323 state = %{state | open_storage: rest} 322 324 323 - kv_path = slot_path <> "/storage.kv" 324 - # TODO 325 - #if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 325 + # TODO: check for in-memory cluster instead 326 + kv_path = case Sim.simulated?() do 327 + true -> slot_path <> "/storage.kv" 328 + false -> :memory 329 + end 330 + if Sim.simulated?(), do: assert not SimFile.exists?(kv_path) 326 331 327 332 arg = Map.put(arg, :path, kv_path) 328 333 restart_arg = %{path: kv_path}
+8 -2
lib/servers/storage.ex
··· 149 149 @data_partition 1 150 150 defmacro durable_version_key, do: "durable_version" 151 151 152 - def init(%{cluster: %Cluster{} = cluster, path: _path, id: id, storage_team_id: storage_team_id}) do 152 + def init(%{cluster: %Cluster{} = cluster, path: path, id: id, storage_team_id: storage_team_id}) do 153 153 # Init new storage server 154 154 assert is_integer(id) 155 + assert is_binary(path) or path == :memory 156 + 155 157 # TODO: larger outside sim 156 - xks = XKS.new(block_size: 1024, lsm_subtable_size: 1024) 158 + xks = XKS.new([ 159 + path: path, 160 + block_size: 1024, 161 + lsm_subtable_size: 1024, 162 + ]) 157 163 158 164 # TODO: init state_partition (currently initialized by new()) 159 165 XKS.init_partition(xks, @special_partiton)
+3 -2
lib/servers/tlog.ex
··· 128 128 SimServer.call(server, {:read_meta_store, version}) 129 129 end 130 130 131 - def init(%{cluster: %Cluster{} = cluster, path: _path, id: id, prev_version: prev_version, meta_pairs: meta_pairs}) do 131 + def init(%{cluster: %Cluster{} = cluster, path: path, id: id, prev_version: prev_version, meta_pairs: meta_pairs}) do 132 132 # Init new TLog 133 + assert is_binary(path) or path == :memory 133 134 assert is_integer(id) 134 135 assert is_integer(prev_version) 135 136 assert is_list(meta_pairs) 136 137 137 - xks = XKS.new() 138 + xks = XKS.new(path: path) 138 139 # TODO: init state partition once XKS does not init partition 0 during new() 139 140 XKS.init_partition(xks, @meta_partition) 140 141 XKS.init_partition(xks, @queue_partition)
-1
lib/workloads.ex
··· 68 68 results = 69 69 Enum.map(seeds, fn seed -> 70 70 Task.Supervisor.async_nolink(supervisor, fn -> 71 - # TODO: only run in sim if opts[:simulated] 72 71 if opts[:simulated] do 73 72 Sim.run_simulation(fn -> 74 73 run_one(seed, workloads, opts)
+8
lib/xks/blocks.ex
··· 119 119 MemoryStore.put(memory_store, extent_index, existing <> data) 120 120 end 121 121 122 + defp do_write_extent({:file, file_store}, extent_index, position, data) do 123 + FileStore.put_extent(file_store, extent_index, position, data) 124 + end 125 + 122 126 @spec read_extent(XKS.block_store, non_neg_integer, non_neg_integer, non_neg_integer) :: binary 123 127 def read_extent(block_store, extent_index, position, size) do 124 128 do_read_extent(block_store, extent_index, position, size) ··· 137 141 >> = data 138 142 139 143 read_data 144 + end 145 + 146 + defp do_read_extent({:file, file_store}, extent_index, position, size) do 147 + FileStore.get_extent(file_store, extent_index, position, size) 140 148 end 141 149 end
+18
lib/xks/file_store.ex
··· 79 79 {:ok, binary} -> binary 80 80 end 81 81 end 82 + 83 + @spec put_extent(t, non_neg_integer, non_neg_integer, binary) :: :ok 84 + def put_extent(%FileStore{} = file_store, extent_index, position, binary) do 85 + %{fd: fd, block_size: block_size} = file_store 86 + 87 + loc = (extent_index * block_size) + position 88 + :ok = SimFile.pwrite(fd, loc, binary) 89 + end 90 + 91 + @spec get_extent(t, non_neg_integer, non_neg_integer, non_neg_integer) :: binary 92 + def get_extent(%FileStore{} = file_store, extent_index, position, bytes) do 93 + %{fd: fd, block_size: block_size} = file_store 94 + 95 + loc = (extent_index * block_size) + position 96 + case SimFile.pread(fd, loc, bytes) do 97 + {:ok, binary} -> binary 98 + end 99 + end 82 100 end