this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Handle import timeouts (deadlock) in storage server

+22 -5
+22 -5
lib/servers/storage.ex
··· 76 76 77 77 @spec read_range(pid, non_neg_integer, binary, binary, Keyword.t) :: {:ok, RangeResult.t} | {:error, :read_version_too_old} 78 78 def read_range(server, read_version, start_key, end_key, opts \\ []) do 79 + {timeout, opts} = Keyword.pop(opts, :timeout, 5000) 80 + 79 81 limit = Keyword.get(opts, :limit, :infinity) 80 82 reverse = Keyword.get(opts, :reverse, false) 81 - SimServer.call(server, {:read_range, read_version, start_key, end_key, limit, reverse}) 83 + SimServer.call(server, {:read_range, read_version, start_key, end_key, limit, reverse}, timeout) 82 84 end 83 85 84 86 @spec get_estimated_size(pid, non_neg_integer, binary, binary) :: {:ok, non_neg_integer} | {:error, :read_version_too_old | :read_version_too_new} ··· 187 189 def handle_info({:tick_import, ref}, %State{} = state) when is_reference(ref) do 188 190 {:noreply, tick_import(ref, state)} 189 191 end 192 + 193 + # If calls we make time out, we may receive their responses later 194 + def handle_info(_message, state), do: {:noreply, state} 190 195 191 196 defp up_to_date?(%State{} = state, version) do 192 197 cond do ··· 429 434 430 435 # TODO: assert invariants 431 436 432 - # TODO: this can deadlock, it needs to be pipelined 433 - too_new_backoff(fn -> 434 - Storage.read_range(from_pid, read_version, si.current_key, si.end_key, limit: 4) 435 - end) 437 + # TODO: it would be more efficient to pipeline these requests and receive 438 + # the response in a handle_info so that we can serve reads while waiting 439 + # (this would also prevent import deadlocks which can currently cause timeouts) 440 + try do 441 + too_new_backoff(fn -> 442 + Storage.read_range(from_pid, read_version, si.current_key, si.end_key, limit: 4, timeout: 1000) 443 + end) 444 + catch 445 + :exit, {{:timeout, _}, _} -> 446 + {:error, :timeout} 447 + end 436 448 |> case do 437 449 {:ok, %RangeResult{pairs: pairs, more: more}} -> 438 450 # Write imported KV pairs at `read_version` ··· 460 472 {:error, :read_version_too_old} -> 461 473 # This server is lagging by a lot, retry 462 474 # TODO: longer interval here? 475 + SimServer.send_after(self(), {:tick_import, si.ref}, @import_interval_ms) 476 + state 477 + 478 + {:error, :timeout} -> 479 + # If the request times out (possibly due to import deadlock), retry 463 480 SimServer.send_after(self(), {:tick_import, si.ref}, @import_interval_ms) 464 481 state 465 482 end