···331331 # Once complete, any future recoveries will have to start from these TLogs
332332 :ok = Coordinator.write_generation(state.primary_coordinator, first_tlog_generation.generation, first_tlog_generation.start_version, first_tlog_generation.tlog_ids)
333333334334- state = put_in(state.cluster.tlog_generations, [first_tlog_generation])
335335-336334 put_in(state.cluster.status, :normal)
337335 end
338336···389387 new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: last_generation_end_version + 1, tlog_ids: ids.tlog}
390388 assert new_tlog_generation.start_version > prev_tlog_generation.start_version
391389392392- state = update_in(state.cluster.tlog_generations, &[new_tlog_generation | &1])
390390+ state = update_in(state.cluster.tlog_generations, fn [%TLogGeneration{} = last_gen | rest] ->
391391+ last_gen = %{last_gen | end_version: last_generation_end_version}
392392+ [new_tlog_generation, last_gen | rest]
393393+ end)
393394394395 # Recruit new TLogs
395396 state = recruit_tlogs(state, ids.tlog, supervisors_tlogs, last_generation_end_version, meta_pairs)
+11-19
lib/servers/storage.ex
···462462 defp peek_logs(%State{} = state) do
463463 assert state.cluster.status == :normal
464464465465- # Get the preferred tlog for this storage server
466466- %TLogGeneration{} = generation = current_tlog_generation(state)
467467-468465 nonce = make_ref()
469466 tag = state.id
470467 start_version = state.data_version + 1
471468469469+ %TLogGeneration{} = generation = tlog_generation_for_version(state.cluster.tlog_generations, start_version)
470470+ end_version = generation.end_version
471471+472472+ if end_version != nil do
473473+ assert is_integer(end_version)
474474+ assert end_version >= start_version
475475+ end
476476+472477 buddy_server = Map.get(state.cluster.servers, buddy_tlog_id(generation.tlog_ids, state.id))
473478474479 case state.buddy_tlog_failed or is_nil(buddy_server) do
···484489 [pid]
485490 end
486491 |> Enum.each(fn pid ->
487487- TLog.peek(pid, nonce, tag, start_version)
492492+ TLog.peek(pid, nonce, tag, start_version, end_version)
488493 end)
489494490495 SimServer.send_after self(), {:peek_timeout, nonce}, 300
···494499 defp on_peek_result(%State{} = state, %PeekResult{} = result) do
495500 assert result.nonce == state.peek_nonce
496501497497- %TLogGeneration{} = generation = current_tlog_generation(state)
502502+ # TODO: maybe better to store start_version on peek instead of getting it back?
503503+ %TLogGeneration{} = generation = tlog_generation_for_version(state.cluster.tlog_generations, result.start_version)
498504 assert result.tlog_id in generation.tlog_ids
499505500506 case result.tlog_id == buddy_tlog_id(generation.tlog_ids, state.id) do
···524530 defp apply_merged_peek_results(%State{} = state) do
525531 results = state.peek_results |> Map.values() |> Enum.sort_by(&(&1.tlog_id))
526532527527- assert length(results) >= (length(current_tlog_generation(state).tlog_ids) - 2)
528528-529533 Enum.each(results, fn %PeekResult{} = result ->
530534 assert result.start_version == (state.data_version + 1)
531535 assert result.end_version >= state.data_version
···542546 state = apply_batches(state, batches)
543547 SimServer.send_after self(), :peek_retry, 1
544548 %{state | data_version: min_end_version, peek_nonce: nil}
545545- end
546546-547547- defp current_tlog_generation(%State{} = state) do
548548- %TLogGeneration{} = generation =
549549- Enum.find(state.cluster.tlog_generations, fn %TLogGeneration{} = tlg ->
550550- tlg.start_version <= state.data_version
551551- end)
552552-553553- assert generation.start_version <= state.data_version
554554- assert generation.end_version >= state.data_version
555555-556556- generation
557549 end
558550559551 defp apply_batches(%State{} = state, batches) when is_list(batches) do
+11-1
lib/utils.ex
···33 Hobbes utils.
44 """
5566- alias Hobbes.Structs.{Cluster, Server}
66+ alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
77 alias Hobbes.Construct.SimServer
88 alias Hobbes.Encoding.Keyset
99···165165 def buddy_tlog_id(tlog_ids, server_id) when is_list(tlog_ids) and is_integer(server_id) do
166166 index = rem(server_id, length(tlog_ids))
167167 Enum.at(tlog_ids, index)
168168+ end
169169+170170+ @spec tlog_generation_for_version([TLogGeneration.t], non_neg_integer) :: TLogGeneration.t
171171+ def tlog_generation_for_version(generations, version) when is_list(generations) and is_integer(version) do
172172+ case Enum.find(generations, fn %TLogGeneration{} = tlg -> tlg.start_version <= version end) do
173173+ %TLogGeneration{} = generation ->
174174+ generation
175175+176176+ nil -> raise "Could not find a generation for version #{inspect(version)} in #{inspect(generations)}"
177177+ end
168178 end
169179170180 @doc """