this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Perform storage compaction asynchronously

garrison 46ca3317 609d7d52

+72 -19
+43 -4
lib/servers/storage.ex
··· 50 50 durable_version: integer, 51 51 known_committed_version: integer, 52 52 53 - peek_nonce: reference, 53 + work_nonce: reference | nil, 54 + 55 + peek_nonce: reference | nil, 54 56 peek_tlog_i: non_neg_integer, 55 57 56 58 xks: XKS.t, ··· 65 67 :data_version, 66 68 :durable_version, 67 69 :known_committed_version, 70 + 71 + :work_nonce, 68 72 69 73 :peek_nonce, 70 74 :peek_tlog_i, ··· 195 199 durable_version: startup_version, 196 200 known_committed_version: startup_version, 197 201 202 + work_nonce: nil, 203 + 198 204 peek_nonce: nil, 199 205 # TODO: would be good to randomize this but we don't know the replication factor here 200 206 peek_tlog_i: 0, ··· 226 232 end) 227 233 228 234 SimProcess.send_after(self(), :tick_ping, 0) 235 + SimProcess.send_after(self(), :work, 0) 229 236 SimProcess.send_after(self(), :flush, @flush_interval_ms) 230 237 SimProcess.send_after(self(), :peek_retry, 0) 231 238 {:ok, state} ··· 342 349 {:noreply, state} 343 350 end 344 351 352 + def handle_info(:work, %State{} = state) do 353 + state = work(state) 354 + {:noreply, state} 355 + end 356 + 357 + def handle_info({:work_result, nonce, result}, %State{} = state) do 358 + assert nonce == state.work_nonce 359 + state = on_work_result(state, result) 360 + {:noreply, state} 361 + end 362 + 345 363 def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) when cluster.generation >= state.cluster.generation do 346 364 old_gen = state.cluster.tlog_generations |> Enum.map(&(&1.generation)) |> Enum.max(&>=/2, fn -> 1 end) 347 365 new_gen = cluster.tlog_generations |> Enum.map(&(&1.generation)) |> Enum.max(&>=/2, fn -> 1 end) ··· 387 405 Distributor.storage_ping(distributor_pid, state.id, storage_stats, shard_stats) 388 406 end 389 407 390 - defp flush(%State{} = state) do 408 + defp work(%State{} = state) do 409 + %{xks: xks} = state 410 + parent_pid = self() 411 + work_nonce = make_ref() 412 + 413 + SimProcess.spawn_link(fn -> 414 + result = XKS.perform_compaction(xks) 415 + SimProcess.send parent_pid, {:work_result, work_nonce, result} 416 + end) 417 + 418 + %{state | work_nonce: work_nonce} 419 + end 420 + 421 + defp on_work_result(%State{} = state, result) do 391 422 %{xks: xks} = state 392 - case XKS.perform_compaction(xks) do 393 - :ok -> XKS.garbage_collect_tables(xks) 423 + 424 + case result do 425 + :ok -> XKS.commit(xks) 394 426 :error -> :noop 395 427 end 428 + XKS.maybe_rotate_memtable(xks, @special_partiton) 429 + XKS.maybe_rotate_memtable(xks, @data_partition) 396 430 431 + SimProcess.send_after(self(), :work, 1000) 432 + %{state | work_nonce: nil} 433 + end 434 + 435 + defp flush(%State{} = state) do 397 436 # TODO: compute durable_version from XKS 398 437 durable_version = max(state.data_version - mvcc_window(), 0) 399 438
+5
lib/servers/tlog.ex
··· 418 418 defp load_meta_pairs(%XKS{} = xks, pairs) do 419 419 mutations = Enum.map(pairs, fn {k, v} -> {:write, k, v} end) 420 420 XKS.apply_batch(xks, @meta_partition, 1, mutations) 421 + XKS.maybe_rotate_memtable(xks, @meta_partition) 421 422 end 422 423 423 424 defp compact_and_commit(%XKS{} = xks) do ··· 451 452 # TODO: mutations should come from CommitBuffer grouped this way 452 453 groups = group_mutations(batch.tagged_mutations) 453 454 state = do_append_groups(groups, batch.commit_version, state) 455 + 456 + XKS.maybe_rotate_memtable(state.xks, @meta_partition) 457 + XKS.maybe_rotate_memtable(state.xks, @queue_partition) 458 + 454 459 state 455 460 end 456 461
+1 -1
lib/xks/free_list.ex
··· 20 20 assert is_integer(extent_block_count) 21 21 assert extent_block_count >= 1 22 22 23 - table = :ets.new(__MODULE__, [:ordered_set, :private]) 23 + table = :ets.new(__MODULE__, [:ordered_set, :public]) 24 24 :ets.insert(table, {:max_block, 0}) 25 25 :ets.insert(table, {:extent_block_count, extent_block_count}) 26 26
+1
lib/xks/fuzz/model_fuzz.ex
··· 93 93 batch = make_batch(opts.key_bits, Enum.random(opts.batch_size_range), []) 94 94 95 95 XKS.apply_batch(xks, partition, version, batch) 96 + XKS.maybe_rotate_memtable(xks, partition) 96 97 simple_kv = SimpleKV.apply_batch(simple_kv, version, batch) 97 98 98 99 max_pv = Enum.random(:atomics.get(xks.max_persist_version_atomic, 1)..version)
+1 -1
lib/xks/manifest.ex
··· 69 69 70 70 @spec new :: t 71 71 def new do 72 - table = :ets.new(__MODULE__, [:ordered_set, :protected]) 72 + table = :ets.new(__MODULE__, [:ordered_set, :public]) 73 73 74 74 table 75 75 end
+1 -1
lib/xks/manifest_log.ex
··· 11 11 defmacrop delete_entry_type, do: 1 12 12 13 13 def new do 14 - log = :ets.new(__MODULE__, [:ordered_set, :private]) 14 + log = :ets.new(__MODULE__, [:ordered_set, :public]) 15 15 :ets.insert(log, {:current_index, 0}) 16 16 17 17 log
+1 -1
lib/xks/memory_store.ex
··· 3 3 4 4 @type new :: t 5 5 def new(block_size) when is_integer(block_size) do 6 - store = :ets.new(__MODULE__, [:set, :protected]) 6 + store = :ets.new(__MODULE__, [:set, :public]) 7 7 :ets.insert(store, {:block_size, block_size}) 8 8 9 9 store
+1 -1
lib/xks/memtable.ex
··· 5 5 6 6 @spec new :: t 7 7 def new do 8 - :ets.new(__MODULE__, [:ordered_set, :protected]) 8 + :ets.new(__MODULE__, [:ordered_set, :public]) 9 9 end 10 10 11 11 @spec apply_mutations(XKS.t, t, integer, non_neg_integer, list) :: non_neg_integer
+1 -1
lib/xks/work_queue.ex
··· 12 12 13 13 @spec new :: t 14 14 def new do 15 - queue = :ets.new(__MODULE__, [:ordered_set, :private]) 15 + queue = :ets.new(__MODULE__, [:ordered_set, :public]) 16 16 :ets.insert(queue, {:next_i, 0}) 17 17 queue 18 18 end
+17 -9
lib/xks/xks.ex
··· 252 252 assert active_info.last_version == nil 253 253 254 254 batch_size_bytes = Memtable.apply_mutations(xks, active_info.memtable, partition, version, mutations) 255 - active_info = Manifest.inc_memtable_size(manifest, active_info, batch_size_bytes) 255 + Manifest.inc_memtable_size(manifest, active_info, batch_size_bytes) 256 256 257 257 :atomics.put(xks.version_atomic, 1, version) 258 - 259 - # TODO: configurable 260 - memtable_size_max = 32 * 1024 261 - case active_info.size_bytes > memtable_size_max do 262 - true -> rotate_memtable(xks, partition) 263 - false -> :noop 264 - end 265 - 266 258 :ok 267 259 end 268 260 ··· 295 287 ManifestLog.delete_table(manifest_log, table_info) 296 288 end) 297 289 :ok 290 + end 291 + 292 + @spec maybe_rotate_memtable(t, integer) :: boolean 293 + def maybe_rotate_memtable(%XKS{} = xks, partition) when is_integer(partition) do 294 + %{manifest: manifest, epoch_atomic: epoch_atomic} = xks 295 + epoch = :atomics.get(epoch_atomic, 1) 296 + 297 + [%MemtableInfo{} = active_info | _] = Manifest.list_memtables(manifest, partition, epoch) 298 + # TODO: configurable 299 + memtable_size_max = 32 * 1024 300 + case active_info.size_bytes > memtable_size_max do 301 + true -> 302 + rotate_memtable(xks, partition) 303 + true 304 + false -> false 305 + end 298 306 end 299 307 300 308 @spec rotate_memtable(t, integer) :: :ok