···138138 end
139139140140 defp maybe_complete_flush(%State{} = state) do
141141- # We use the replication_factor for the current generation to do the liveness check
142142- %TLogGeneration{replication_factor: replication_factor} = hd(state.cluster.tlog_generations)
141141+ %{
142142+ cluster: cluster,
143143+ check_locked_reply_ids: check_locked_reply_ids,
144144+ } = state
143145144144- # TODO: handle teams
145145- case length(state.check_locked_reply_ids) >= replication_factor do
146146- true ->
147147- read_version = state.batch_read_version
146146+ # Retrieve TLog teams for the current generation
147147+ %TLogGeneration{} = gen = hd(cluster.tlog_generations)
148148+ tlog_teams = Enum.chunk_every(gen.tlog_ids, gen.replication_factor, gen.replication_factor, :discard)
148149149149- Enum.each(state.batch_buffer, fn from ->
150150- SimServer.reply(from, {:ok, read_version})
151151- end)
150150+ # Liveness is confirmed when all TLogs from any team confirm they are not locked,
151151+ # as a recovery must lock at least one TLog from every team to proceed
152152+ liveness_confirmed? = Enum.any?(tlog_teams, fn ids ->
153153+ Enum.all?(ids, fn id -> id in check_locked_reply_ids end)
154154+ end)
152155153153- %{state |
154154- batch_buffer: nil,
155155- batch_read_version: nil,
156156- check_locked_nonce: nil,
157157- check_locked_reply_ids: nil,
158158- }
159159-156156+ case liveness_confirmed? do
157157+ true -> do_complete_flush(state)
160158 false -> state
161159 end
160160+ end
161161+162162+ defp do_complete_flush(%State{} = state) do
163163+ read_version = state.batch_read_version
164164+165165+ Enum.each(state.batch_buffer, fn from ->
166166+ SimServer.reply(from, {:ok, read_version})
167167+ end)
168168+169169+ %{state |
170170+ batch_buffer: nil,
171171+ batch_read_version: nil,
172172+ check_locked_nonce: nil,
173173+ check_locked_reply_ids: nil,
174174+ }
162175 end
163176end