···1919 :ets.new(__MODULE__, [:ordered_set, :private])
2020 end
21212222+ @spec destroy(t) :: :ok
2323+ def destroy(table) do
2424+ :ets.delete(table)
2525+ :ok
2626+ end
2727+2228 @spec put(t, binary, term) :: :ok
2329 def put(table, start_key, value) when is_database_key(start_key) do
2430 :ets.insert(table, {start_key, value})
+39-48
lib/servers/manager.ex
···3344 import ExUnit.Assertions, only: [assert: 1]
5566- alias Hobbes.MetaStore
66+ alias Hobbes.{MetaStore, ShardTagMap}
77 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, TLogStatus, LogBatch}
88 alias Hobbes.Servers.{Coordinator, ServerSupervisor, TLog}
99···282282 |> recruit(gen, Hobbes.Servers.Resolver, [resolver_id], %{cluster: cluster, prev_version: 1})
283283 |> recruit(gen, Hobbes.Servers.Distributor, [distributor_id], %{cluster: cluster})
284284285285- seed_meta_store(state, meta_pairs)
285285+ seed_meta_store(state, first_tlog_generation, meta_pairs)
286286287287 # Write the first generation into the Coordinators
288288 # Once complete, any future recoveries will have to start from these TLogs
···334334 state = recruit(state, gen, Hobbes.Servers.TLog, tlog_ids, %{cluster: state.cluster, meta_pairs: meta_pairs})
335335336336 # Copy mutations in range [max_kcv + 1, min_dv] to the new TLogs
337337- {state, last_batch_version} = copy_mutations_to_new_generation(state, prev_tlog_generation, new_tlog_generation, meta_pairs, new_tlog_generation.start_version, min_dv)
337337+ shard_map = ShardTagMap.new(new_tlog_generation)
338338+ ShardTagMap.load_meta_pairs(shard_map, meta_pairs)
339339+340340+ {state, last_batch_version} = copy_mutations_to_new_generation(state, prev_tlog_generation, new_tlog_generation, meta_pairs, shard_map, new_tlog_generation.start_version, min_dv)
338341 if last_batch_version != nil, do: assert last_batch_version > max_kcv and last_batch_version <= min_dv
339342340343 # If there are no batches on the TLogs in the range (max_kcv, min_dv],
···352355 recovery_commit_version = max(new_tlog_generation.start_version, last_batch_version + 1)
353356 state = write_recovery_commit(state, new_tlog_generation, last_batch_version, recovery_commit_version)
354357 #dbg {last_batch_version, recovery_commit_version}
358358+359359+ # TODO: CommitBuffers do not currently receive the meta mutations which were copied, which will break correctness
355360356361 # Recruit the rest of the new transaction system
357362 state =
···379384 |> Enum.sort()
380385 end
381386382382- defp copy_mutations_to_new_generation(%State{} = state, %TLogGeneration{} = old_gen, %TLogGeneration{} = new_gen, meta_pairs, start_version, end_version) do
387387+ defp copy_mutations_to_new_generation(%State{} = state, %TLogGeneration{} = old_gen, %TLogGeneration{} = new_gen, meta_pairs, %ShardTagMap{} = shard_map, start_version, end_version) do
383388 tags = extract_tags(meta_pairs)
384389385390 recovered_old_tlog_pids =
···409414 end)
410415 |> merge_batches()
411416 |> then(fn batches ->
417417+ # We lie about the first prev_version being 0 since that's what new TLogs expect
412418 [{0, []} | batches]
413419 end)
414420 |> Enum.chunk_every(2, 1, :discard)
···420426 }
421427 end)
422428423423- meta_store = MetaStore.new()
424424- MetaStore.load(meta_store, meta_pairs)
425425-426429 Enum.each(batches, fn batch ->
427427- write_batch_to_tlogs(state.cluster, new_gen, batch, meta_store)
430430+ write_batch_to_tlogs(state.cluster, new_gen, batch, shard_map)
428431 end)
429432430433 last_batch_version =
···436439 {state, last_batch_version}
437440 end
438441439439- defp write_batch_to_tlogs(%Cluster{} = cluster, %TLogGeneration{} = tlog_gen, batch, meta_store) when is_map(batch) do
440440- %{prev_version: prev_version, commit_version: commit_version, mutations: mutations} = batch
442442+ defp write_batch_to_tlogs(%Cluster{} = cluster, %TLogGeneration{} = tlog_gen, batch, %ShardTagMap{} = shard_map) when is_map(batch) do
443443+ %{prev_version: prev_version, commit_version: commit_version} = batch
441444442442- # Apply meta mutations from the batch first
443443- meta_mutations =
444444- mutations
445445- |> Enum.filter(fn {_i, mut} -> meta_mutation?(mut) end)
446446- |> Enum.map(fn {_i, mut} -> mut end)
447447- MetaStore.apply_meta_mutations(meta_store, commit_version, meta_mutations)
445445+ # Unwrap mutations
446446+ mutations = Enum.map(batch.mutations, fn {_i, mut} -> mut end)
448447449449- tagged_mutations =
450450- mutations
451451- |> Enum.map(fn {_i, mut} = num_mut ->
452452- tags = MetaStore.get_key_server_mutation_tags(meta_store, commit_version, mutation_key(mut))
453453- {tags, num_mut}
454454- end)
455455-456456- sliced_mutations = slice_mutations_for_tlogs(tagged_mutations, tlog_gen.tlog_ids, 3)
448448+ ShardTagMap.apply_metadata_mutations(shard_map, Enum.filter(mutations, &meta_mutation?/1))
449449+ sliced_mutations = ShardTagMap.tag_and_slice_mutations(shard_map, mutations)
457450458451 tlog_gen.tlog_ids
459452 |> Enum.map(fn tlog_id ->
···513506 end)
514507 end
515508516516- defp seed_meta_store(%State{} = state, meta_pairs) when is_list(meta_pairs) do
517517- # Create temporary MetaStore
518518- # TODO: destroy after
519519- meta_store = MetaStore.new()
520520- MetaStore.apply_meta_mutations(meta_store, 0, Enum.map(meta_pairs, fn {k, v} -> {:write, k, v} end))
509509+ defp seed_meta_store(%State{} = state, %TLogGeneration{} = tlog_gen, meta_pairs) when is_list(meta_pairs) do
510510+ shard_map = ShardTagMap.new(tlog_gen)
511511+ ShardTagMap.load_meta_pairs(shard_map, meta_pairs)
521512522522- tagged_mutations =
513513+ sliced_mutations =
523514 meta_pairs
524515 |> Enum.map(fn {k, v} -> {:write, k, v} end)
525516 |> then(fn mutations ->
526517 mutations ++ compute_special_mutations(mutations)
527518 end)
528528- |> Enum.with_index()
529529- |> Enum.map(fn {{:write, k, v}, i} ->
530530- # Note: we remove the meta tag because TLogs are already directly seeded with these pairs
531531- tags = MetaStore.get_key_server_mutation_tags(meta_store, 0, k) |> List.delete(meta_tag())
532532- {tags, {i, {:write, k, v}}}
533533- end)
519519+ |> then(&ShardTagMap.tag_and_slice_mutations(shard_map, &1))
534520535535- batch = %LogBatch{
536536- commit_buffer_id: nil,
537537- commit_version: 1,
538538- prev_commit_version: 0,
539539- tagged_mutations: tagged_mutations,
540540- last_committed_version: 0,
541541- }
521521+ ShardTagMap.destroy(shard_map)
542522543523 # Seed the meta mutations directly into the TLogs with a manual "transaction" at version=1
544524 #
545545- # Send to all tlogs for simplicity (instead of choosing the correct ones)
546546- # Storage servers pop from all tlogs anyway
547547- get_servers(state.cluster, Hobbes.Servers.TLog)
548548- |> Enum.map(fn %Server{pid: pid} ->
549549- TLog.write_batch_send(pid, batch)
525525+ # Note: Meta pairs will be sent to TLogs again even though they were already seeded at init
526526+ # (this should be fine due to idempotence)
527527+ tlog_gen.tlog_ids
528528+ |> Enum.map(fn tlog_id ->
529529+ batch = %LogBatch{
530530+ commit_buffer_id: nil,
531531+ commit_version: 1,
532532+ prev_commit_version: 0,
533533+ tagged_mutations: Map.fetch!(sliced_mutations, tlog_id),
534534+ last_committed_version: 0,
535535+ }
536536+537537+ %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
538538+ TLog.write_batch_send(tlog_pid, batch)
550539 end)
551540 |> Enum.each(fn req_id ->
552541 :ok = TLog.write_batch_receive(req_id)
553542 end)
543543+544544+ :ok
554545 end
555546556547 @spec allocate_server_ids(State.t, non_neg_integer) :: [non_neg_integer]
+18
lib/shard_tag_map.ex
···2323 }
2424 end
25252626+ @spec destroy(t) :: :ok
2727+ def destroy(%ShardTagMap{} = stm) do
2828+ DenseShardMap.destroy(stm.shard_map)
2929+ :ok
3030+ end
3131+3232+ @spec load_meta_pairs(t, [{binary, binary}]) :: :ok
3333+ def load_meta_pairs(%ShardTagMap{} = stm, pairs) when is_list(pairs) do
3434+ # We could skip wrapping the mutations and load them directly but this is simpler
3535+ # (this function is only called on the recovery path)
3636+ apply_metadata_mutations(stm, Enum.map(pairs, fn {k, v} -> {:write, k, v} end))
3737+ end
3838+2639 @spec apply_metadata_mutations(t, [Utils.mutation]) :: :ok
2740 def apply_metadata_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do
2841 %{shard_map: shard_map, tlog_ids: tlog_ids, replication_factor: rf} = stm
···116129117130 false -> ids
118131 end
132132+ end
133133+134134+ @doc false
135135+ def dump(%ShardTagMap{} = stm) do
136136+ DenseShardMap.dump(stm.shard_map)
119137 end
120138end