this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Perform shard import reads asynchronously

garrison 2298a0a1 48147269

+111 -81
+1 -1
lib/servers/commit_buffer.ex
··· 11 11 import Hobbes.Utils 12 12 13 13 # TODO: centralize types? 14 - @type shard :: {binary, binary, [integer]} 14 + @type shard :: {binary, binary, {[integer], [pid]}} 15 15 16 16 @flush_interval_ms 1 17 17 @max_buffer_size 300
+110 -80
lib/servers/storage.ex
··· 14 14 15 15 defmodule ShardImport do 16 16 @type t :: %__MODULE__{ 17 - ref: reference, 17 + id: reference, 18 + nonce: reference, 18 19 initiated_version: non_neg_integer, 19 20 completed_version: non_neg_integer | nil, 20 21 start_key: binary, 21 22 end_key: binary, 22 - current_key: binary, 23 + current_end_key: binary, 24 + current_read_version: non_neg_integer, 23 25 status: :importing | :complete, 24 26 } 25 - @enforce_keys [:ref, :initiated_version, :completed_version, :start_key, :end_key, :current_key, :status] 27 + @enforce_keys [:id, :nonce, :initiated_version, :completed_version, :start_key, :end_key, :current_end_key, :current_read_version, :status] 26 28 defstruct @enforce_keys 27 29 end 28 30 ··· 77 79 end 78 80 79 81 @ping_distributor_interval_ms 1_000 80 - #@peek_interval_ms 1 81 82 @flush_interval_ms 250 82 - @import_interval_ms 10 83 83 84 84 def start_link(arg), do: SimServer.start_link(__MODULE__, arg) 85 85 ··· 115 115 catch 116 116 :exit, {:timeout, _} -> {:error, :timeout} 117 117 end 118 + end 119 + 120 + @spec read_range_async(pid, reference, non_neg_integer, binary, binary, keyword) :: :ok 121 + def read_range_async(server, nonce, read_version, start_key, end_key, opts) 122 + when is_reference(nonce) and is_integer(read_version) and is_binary(start_key) and is_binary(end_key) do 123 + limit = Keyword.fetch!(opts, :limit) 124 + SimServer.cast(server, {:read_range_async, self(), nonce, read_version, start_key, end_key, limit}) 118 125 end 119 126 120 127 @spec get_shard_stats(pid, binary, binary) :: {:ok, ShardStats.t} ··· 265 272 end 266 273 end 267 274 275 + def handle_cast({:read_range_async, from_pid, nonce, read_version, start_key, end_key, limit}, %State{} = state) do 276 + with :ok <- check_up_to_date(state, read_version), 277 + :ok <- check_contains_range(state, start_key, end_key) 278 + do 279 + %RangeResult{} = result = HybridKV.scan(state.kv, read_version, start_key, end_key, limit: limit, reverse: false) 280 + SimServer.simulate_work([1, 2, 3, 10, 30]) 281 + SimServer.send from_pid, {:read_range_result, nonce, result} 282 + else 283 + {:error, _err} = error -> 284 + SimServer.send from_pid, {:read_range_result, nonce, error} 285 + end 286 + {:noreply, state} 287 + end 288 + 268 289 def handle_info(:tick_ping, %State{} = state) do 269 290 state = ping_distributor(state) 270 291 SimServer.send_after(self(), :tick_ping, @ping_distributor_interval_ms) ··· 293 314 {:noreply, peek_logs(state)} 294 315 end 295 316 317 + def handle_info({:read_range_result, nonce, %RangeResult{} = result}, %State{} = state) do 318 + case fetch_import_by_nonce(state, nonce) do 319 + {:ok, %ShardImport{} = shard_import} -> 320 + {:noreply, on_import_result(shard_import, result, state)} 321 + 322 + # Ignore old reply 323 + :error -> {:noreply, state} 324 + end 325 + end 326 + 327 + def handle_info({:read_range_result, nonce, {:error, _error}}, %State{} = state) do 328 + case fetch_import_by_nonce(state, nonce) do 329 + {:ok, %ShardImport{} = import} -> 330 + # TODO: maybe some sort of delay or backoff here? 331 + {:noreply, tick_import(import, state)} 332 + 333 + # Ignore old reply 334 + :error -> {:noreply, state} 335 + end 336 + end 337 + 338 + def handle_info({:import_read_range_timeout, nonce}, %State{} = state) do 339 + case fetch_import_by_nonce(state, nonce) do 340 + {:ok, %ShardImport{} = import} -> 341 + {:noreply, tick_import(import, state)} 342 + 343 + # Ignore timeout for old nonce 344 + :error -> {:noreply, state} 345 + end 346 + end 347 + 296 348 def handle_info(:flush, state) do 297 349 state = flush(state) 298 350 SimServer.send_after(self(), :flush, @flush_interval_ms) 299 351 {:noreply, state} 300 - end 301 - 302 - def handle_info({:tick_import, ref}, %State{} = state) when is_reference(ref) do 303 - {:noreply, tick_import(ref, state)} 304 352 end 305 353 306 354 def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) when cluster.generation >= state.cluster.generation do ··· 570 618 %State{state | shard_clears: [sc | state.shard_clears]} 571 619 end 572 620 621 + defp fetch_import_by_nonce(%State{} = state, nonce) when is_reference(nonce) do 622 + case Map.values(state.imports) |> Enum.find(fn %ShardImport{} = import -> import.nonce == nonce end) do 623 + %ShardImport{} = import -> {:ok, import} 624 + nil -> :error 625 + end 626 + end 627 + 573 628 defp begin_import_shard(version, start_key, end_key, %State{} = state) 574 629 when is_integer(version) and is_binary(start_key) do 575 - ref = make_ref() 576 630 shard_import = %ShardImport{ 577 - ref: ref, 631 + id: make_ref(), 632 + nonce: make_ref(), 578 633 initiated_version: version, 579 634 completed_version: nil, 580 635 start_key: start_key, 581 636 end_key: end_key, 582 - current_key: start_key, 637 + current_end_key: start_key, 638 + current_read_version: 0, 583 639 status: :importing, 584 640 } 585 641 586 - SimServer.send_after(self(), {:tick_import, ref}, 0) 587 - 588 - %State{state | imports: Map.put(state.imports, ref, shard_import)} 642 + SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 0) 643 + put_in(state.imports[shard_import.id], shard_import) 589 644 end 590 645 591 646 defp remove_import(start_key, cancelled?, %State{} = state) when is_binary(start_key) and is_boolean(cancelled?) do ··· 597 652 # TODO: clear byte sample 598 653 end 599 654 600 - %State{state | imports: Map.delete(state.imports, si.ref)} 655 + %State{state | imports: Map.delete(state.imports, si.id)} 601 656 end 602 657 603 - defp tick_import(import_ref, %State{} = state) when is_reference(import_ref) do 604 - %ShardImport{} = si = Map.fetch!(state.imports, import_ref) 605 - 606 - # For correctness, the read_version must obey two invariants: 607 - # 608 - # `read_version >= si.initiated_version` 609 - # This ensures that there is no gap in versions between the pairs 610 - # we fetch and the stream of changes we receive from tlogs. 611 - # 612 - # `read_version >= state.durable_version` 613 - # This ensures that the pairs we fetch do not clobber any later versions 614 - # which have reached storage before the fetch. 615 - # 616 - # The choice of `state.data_version` should obey both invariants. 617 - read_version = state.data_version 618 - 619 - # TODO: assert invariants 658 + defp tick_import(%ShardImport{} = shard_import, %State{} = state) do 659 + # TODO: maybe better to read at a slightly older version to prevent too_new errors? 660 + shard_import = %{shard_import | nonce: make_ref(), current_read_version: state.data_version} 661 + state = put_in(state.imports[shard_import.id], shard_import) 620 662 621 - # TODO: it would be more efficient to pipeline these requests and receive 622 - # the response in a handle_info so that we can serve reads while waiting 623 - # (this would also prevent import deadlocks which can currently cause timeouts) 624 - try do 625 - %Server{pid: buf_pid} = state.cluster |> get_servers(Hobbes.Servers.CommitBuffer) |> hd() 663 + %Server{pid: buf_pid} = state.cluster |> get_servers(Hobbes.Servers.CommitBuffer) |> Enum.random() 626 664 627 - {:ok, [{_sk, _ek, {_ids, from_pids}}]} = too_new_backoff(fn -> 628 - CommitBuffer.get_shards(buf_pid, {si.current_key, si.end_key}) 629 - end) 630 - # TODO: handle all nil 631 - {:ok, from_pid} = random_not_nil(from_pids) 665 + with {:ok, [[{_sk, _ek, {_ids, from_pids}}]]} <- CommitBuffer.get_shards_multi(buf_pid, [{shard_import.start_key, shard_import.end_key}]), 666 + {:ok, from_storage_pid} <- random_not_nil(from_pids) 667 + do 668 + Storage.read_range_async(from_storage_pid, shard_import.nonce, shard_import.current_read_version, shard_import.current_end_key, shard_import.end_key, limit: 4) 632 669 633 - # TODO: handle wrong_server (probably very rare in practice) 634 - too_new_backoff(fn -> 635 - Storage.read_range(from_pid, read_version, si.current_key, si.end_key, limit: 4, timeout: 1000) 636 - end) 637 - catch 638 - :exit, {:timeout, _mfa} -> 639 - {:error, :timeout} 670 + SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 300) 671 + state 672 + else 673 + # Failure cases: 674 + # get_shards_multi times out 675 + # get_shards_multi returns multiple shards (only possible if CommitBuffer is lagging behind, very unlikely) 676 + # from_pids is all nil because CommitBuffer does not know current storage pids 677 + _ -> 678 + # Retry 679 + SimServer.send_after(self(), {:import_read_range_timeout, shard_import.nonce}, 100) 680 + state 640 681 end 641 - |> case do 642 - {:ok, %RangeResult{pairs: pairs, more: more}} -> 643 - # Write imported KV pairs into the mutation log at `read_version` 644 - mutations = Enum.map(pairs, fn {k, v} -> {:write, k, v} end) 645 - MutationLog.append(state.kv.mutation_log, read_version, mutations) 682 + end 646 683 647 - case more do 648 - true -> 649 - {last_key, _last_value} = List.last(pairs) 650 - # TODO: this should really be increment(last_key), i.e. last_key <> "\x00" 651 - si = %ShardImport{si | current_key: last_key} 684 + defp on_import_result(%ShardImport{} = shard_import, %RangeResult{} = result, %State{} = state) do 685 + %{start_key: start_key, end_key: end_key} = shard_import 686 + mutations = Enum.map(result.pairs, fn {k, v} -> 687 + assert k >= start_key 688 + assert k < end_key 689 + {:write, k, v} 690 + end) 652 691 653 - SimServer.send_after(self(), {:tick_import, si.ref}, @import_interval_ms) 654 - %State{state | imports: Map.put(state.imports, si.ref, si)} 692 + MutationLog.append(state.kv.mutation_log, shard_import.current_read_version, mutations) 655 693 656 - false -> 657 - # We have finished fetching pairs for this import 658 - # The shard data on disk is NOT in a consistent state, but the versions 659 - # in memory and/or on the tlogs will ensure that all reads see a consistent 660 - # state and the disk state will become consistent eventually 661 - # (due to idempotency of writes) 662 - si = %ShardImport{si | status: :complete, completed_version: read_version} 663 - %State{state | imports: Map.put(state.imports, si.ref, si)} 664 - end 694 + case result.more do 695 + true -> 696 + {last_key, _value} = List.last(result.pairs) 697 + shard_import = %{shard_import | current_end_key: last_key <> "\x00"} 698 + state = put_in(state.imports[shard_import.id], shard_import) 665 699 666 - {:error, :read_version_too_old} -> 667 - # This server is lagging by a lot, retry 668 - # TODO: longer interval here? 669 - SimServer.send_after(self(), {:tick_import, si.ref}, @import_interval_ms) 670 - state 700 + # TODO: maybe add a delay here to throttle? 701 + tick_import(shard_import, state) 671 702 672 - {:error, :timeout} -> 673 - # If the request times out (possibly due to import deadlock), retry 674 - SimServer.send_after(self(), {:tick_import, si.ref}, @import_interval_ms) 675 - state 703 + false -> 704 + shard_import = %{shard_import | nonce: nil, status: :complete, completed_version: shard_import.current_read_version} 705 + put_in(state.imports[shard_import.id], shard_import) 676 706 end 677 707 end 678 708