···2727 :ok
2828 end
@doc """
Deletes every entry whose key lies in the half-open range `[start_key, end_key)`.

Both bounds must be binaries. An empty or inverted range
(`start_key >= end_key`) deletes nothing. Always returns `:ok`.
"""
@spec delete_range(t, binary, binary) :: :ok
def delete_range(table, start_key, end_key)
    when is_binary(start_key) and is_binary(end_key) and start_key < end_key do
  # The traversal below only visits keys strictly after `start_key`, so the
  # lower bound itself is removed up front.
  :ets.delete(table, start_key)
  do_scan_delete(table, end_key, start_key)
end

def delete_range(_table, start_key, end_key)
    when is_binary(start_key) and is_binary(end_key) do
  # Nothing falls inside an empty or inverted range; in particular `start_key`
  # is outside it and must be left untouched.
  :ok
end

# Walks the table in key order starting after `prev_key`, deleting each binary
# key that sorts before `end_key`. Stops at the first key that does not qualify,
# which includes the end-of-table marker returned by `:ets.next/2`.
#
# NOTE(review): `:ets.next/2` is called with a key that has just been deleted,
# which presumably relies on the table being an `:ordered_set` - confirm.
defp do_scan_delete(table, end_key, prev_key) do
  case :ets.next(table, prev_key) do
    key when is_binary(key) and key < end_key ->
      :ets.delete(table, key)
      do_scan_delete(table, end_key, key)

    _ ->
      :ok
  end
end
4545+3046 @spec scan(t, binary, binary) :: [{binary, float}]
3147 def scan(table, start_key, end_key) do
3248 acc =
···8298 false -> acc
8399 end
841008585- {:clear_range, _k, _v}, _acc ->
8686- raise "Not implemented"
101101+ {:clear_range, sk, ek}, acc ->
102102+ delete_range(table, sk, ek)
103103+ # TODO: if the range was empty in the byte sample we don't need this mutation
104104+ mut = {:clear_range, special_byte_sample_prefix() <> sk, special_byte_sample_prefix() <> ek}
105105+ [mut | acc]
87106 end)
88107 |> Enum.reverse()
89108 end
+48-61
lib/servers/storage.ex
···2828 defstruct @enforce_keys
2929 end
30303131- defmodule ShardClear do
3232- @type t :: %__MODULE__{}
3333- @enforce_keys [:at_durable_version, :start_key, :end_key]
3434- defstruct @enforce_keys
3535- end
3636-3731 defmodule State do
3832 @type t :: %__MODULE__{
3933 id: non_neg_integer,
···5347 byte_sample: ByteSample.t,
54485549 imports: [ShardImport.t],
5656- shard_clears: [ShardClear.t],
5750 }
5851 @enforce_keys [
5952 :id,
···7366 :byte_sample,
74677568 :imports,
7676- :shard_clears,
7769 ]
7870 defstruct @enforce_keys
7971 end
···183175 byte_sample: ByteSample.new(),
184176185177 imports: %{},
186186- shard_clears: [],
187178 }
188179189180 byte_sample_pairs = HybridKV.scan(kv, 1, special_byte_sample_prefix(), special_byte_sample_end()).pairs
···381372 end
382373383374 defp flush(%State{kv: kv} = state) do
384384- flush_version = max(state.data_version - mvcc_window(), 0)
385385-386386- {shards_to_clear, remaining_clears} =
387387- Enum.split_with(state.shard_clears, fn %ShardClear{at_durable_version: adv} ->
388388- adv <= flush_version
389389- end)
390390-391391- # We process shard clears and flushes in total order
392392- # Each shard clear is executed on storage at the exact moment when all of the
393393- # relevant versions are in storage and out of memory
375375+ # We must maintain the invariant that
376376+ # state.durable_version < shard_import.current_read_version
377377+ # for all in-flight shard import reads
394378 #
395395- # Note: the vast majority of the time, there are no clears and this entire
396396- # pipeline is a noop
397397- shards_to_clear
398398- |> Enum.sort_by(fn %ShardClear{at_durable_version: adv} -> adv end)
399399- |> Enum.each(fn %ShardClear{at_durable_version: adv, start_key: start_key, end_key: end_key} ->
400400- # If this invariant were to be violated, we would clear data written after the shard was removed
401401- # from this server. Any mutations past that point would still be valid (if the shard was added back),
402402- # so that is not acceptable
403403- #
404404- # We avoid violating this invariant by processing clears and flushes in total order
405405- # The only way to violate this invariant is to queue a clear at a version < durable_version,
406406- # which should not happen
407407- if state.durable_version > adv, do: raise "Shard cleared too late"
379379+ # If this invariant is violated we could clobber newer versions with older reads in storage
380380+ #
381381+ # This could only happen if a read took longer than the mvcc window to complete, which
382382+ # is extremely unlikely in practice because the import read timeout window is considerably
383383+ # shorter than the mvcc window
384384+ min_import_version =
385385+ state.imports
386386+ |> Map.values()
387387+ |> Enum.reject(fn %ShardImport{} = si -> si.status == :complete end)
388388+ |> Enum.map(fn %ShardImport{} = si -> si.current_read_version end)
389389+ |> Enum.min(&<=/2, fn -> state.data_version end)
408390409409- HybridKV.flush(kv, adv)
391391+ flush_version =
392392+ state.data_version - mvcc_window()
393393+ |> min(min_import_version - 1)
394394+ |> max(0)
410395411411- HybridKV.delete_range_storage(kv, start_key, end_key)
412412- # TODO: clear byte sample
413413- end)
414414- state = %State{state | shard_clears: remaining_clears}
415415-416416- # Now that all shard clears are complete, we can flush the remaining versions
417396 HybridKV.flush(kv, flush_version)
418397 HybridKV.put_storage(kv, special_prefix() <> "durable_version", Integer.to_string(flush_version))
419398 HybridKV.commit(kv)
···594573 # Handle shard imports
595574 case {old_value, new_value} do
596575 {nil, "fetching/" <> end_key} ->
597597- begin_import_shard(version, start_key, end_key, state)
576576+ begin_import(version, start_key, end_key, state)
598577599578 {"fetching/" <> _, "complete/" <> _} ->
600600- remove_import(start_key, false, state)
579579+ remove_import(start_key, state)
601580602602- {"fetching/" <> _end_key, nil} ->
603603- remove_import(start_key, true, state)
581581+ {"fetching/" <> end_key, nil} ->
582582+ clear_shard(version, start_key, end_key, state)
583583+ remove_import(start_key, state)
604584605585 {"complete/" <> end_key, nil} ->
606606- queue_clear_shard(version, start_key, end_key, state)
586586+ clear_shard(version, start_key, end_key, state)
587587+ state
607588608589 # Shard split, we do nothing
609590 {"complete/" <> _, "complete/" <> _} -> state
···612593 end
613594 end
# Records the clearing of the shard bounded by `start_key` and `end_key` at
# `version`.
#
# Two clear-range mutations are appended to `state.kv.mutation_log`: one for the
# shard's own keys and one for the same bounds under the byte-sample prefix.
# The range is then deleted from `state.byte_sample`. Returns `:ok`; `state`
# itself is not modified.
defp clear_shard(version, start_key, end_key, %State{} = state)
     when is_binary(start_key) and is_binary(end_key) do
  data_clear = {:clear_range, start_key, end_key}

  sample_clear =
    {:clear_range, special_byte_sample_prefix() <> start_key,
     special_byte_sample_prefix() <> end_key}

  MutationLog.append(state.kv.mutation_log, version, [data_clear, sample_clear])
  ByteSample.delete_range(state.byte_sample, start_key, end_key)

  :ok
end
# Looks up the import in `state` whose `nonce` equals the given reference.
# Delegates to `find_import_by/2` and returns its result unchanged.
defp fetch_import_by_nonce(%State{} = state, nonce) when is_reference(nonce) do
  find_import_by(state, fn shard_import -> shard_import.nonce == nonce end)
end
# Looks up the import in `state` whose `start_key` equals the given binary.
# Delegates to `find_import_by/2` and returns its result unchanged.
defp fetch_import_by_start_key(%State{} = state, start_key) when is_binary(start_key) do
  find_import_by(state, fn shard_import -> shard_import.start_key == start_key end)
end
# Returns `{:ok, import}` for the single value of `state.imports` that satisfies
# `fun`, or `:error` when no value does.
#
# Only the zero-match and one-match cases are handled; any other result (for
# example several matches) falls through the `case` and raises.
defp find_import_by(%State{} = state, fun) when is_function(fun, 1) do
  matches =
    state.imports
    |> Map.values()
    |> Enum.filter(fun)

  case matches do
    [] -> :error
    [%ShardImport{} = shard_import] -> {:ok, shard_import}
  end
end
627621628628- defp begin_import_shard(version, start_key, end_key, %State{} = state)
622622+ defp begin_import(version, start_key, end_key, %State{} = state)
629623 when is_integer(version) and is_binary(start_key) do
630624 shard_import = %ShardImport{
631625 id: make_ref(),
···643637 put_in(state.imports[shard_import.id], shard_import)
644638 end
# Drops the import that begins at `start_key` from `state.imports` and returns
# the updated state.
#
# The lookup is asserted with a match, so a missing import raises `MatchError`.
defp remove_import(start_key, %State{} = state) when is_binary(start_key) do
  {:ok, %ShardImport{id: import_id}} = fetch_import_by_start_key(state, start_key)
  remaining_imports = Map.delete(state.imports, import_id)
  %State{state | imports: remaining_imports}
end
657644658645 defp tick_import(%ShardImport{} = shard_import, %State{} = state) do