this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Tag shards by team instead of by server

garrison 35a9b12a febfe410

+139 -115
+5 -4
lib/servers/commit_buffer.ex
··· 102 102 end 103 103 end 104 104 105 - def init(%{id: id, cluster: %Cluster{} = cluster, key_storage_pairs: key_storage_pairs}) do 105 + def init(%{id: id, cluster: %Cluster{} = cluster, storage_teams_pairs: storage_teams_pairs, key_storage_pairs: key_storage_pairs}) do 106 106 %TLogGeneration{} = current_generation = hd(cluster.tlog_generations) 107 107 assert current_generation.generation == cluster.generation 108 108 ··· 119 119 storage_servers: %{}, 120 120 } 121 121 122 + ShardTagMap.load_meta_pairs(state.shard_map, storage_teams_pairs) 122 123 ShardTagMap.load_meta_pairs(state.shard_map, key_storage_pairs) 123 124 124 125 SimServer.send_after(self(), :flush, @flush_interval_ms) ··· 132 133 shard_lists = 133 134 Enum.map(keys_or_ranges, fn key_or_range -> 134 135 ShardTagMap.shards_for_key_or_range(stm, key_or_range) 135 - |> Enum.map(fn {sk, ek, {_tlogs, _all_tags, from_tags}} -> 136 - pids = Enum.map(from_tags, &Map.get(storage_servers, &1)) 137 - {sk, ek, {from_tags, pids}} 136 + |> Enum.map(fn {sk, ek, storage_server_ids} -> 137 + storage_pids = Enum.map(storage_server_ids, &Map.get(storage_servers, &1)) 138 + {sk, ek, {storage_server_ids, storage_pids}} 138 139 end) 139 140 end) 140 141
+42 -24
lib/servers/manager.ex
··· 405 405 # Create seed metadata needed by servers 406 406 meta_pairs = build_seed_meta(config, ids.storage) 407 407 408 + meta_kv = FlatKV.new() 409 + FlatKV.load(meta_kv, meta_pairs) 410 + 408 411 # Recruit storage 409 - state = recruit_storage(state, ids.storage, supervisors_storage) 412 + state = recruit_storage(state, ids.storage, supervisors_storage, meta_kv) 410 413 411 414 # Recruit TLogs 412 415 state = recruit_tlogs(state, ids.tlog, supervisors_tlogs, 0, meta_pairs) 413 416 414 417 # Recruit stateless servers 415 - meta_kv = FlatKV.new() 416 - FlatKV.load(meta_kv, meta_pairs) 417 418 prev_version = 1 418 - 419 419 state = recruit_stateless(state, ids, supervisors_stateless, prev_version, meta_kv) 420 420 421 421 # Seed the meta pairs into the database ··· 649 649 state 650 650 end 651 651 652 - defp recruit_storage(%State{} = state, storage_ids, supervisor_slots) 652 + defp recruit_storage(%State{} = state, storage_ids, supervisor_slots, meta_kv) 653 653 when is_list(storage_ids) and is_list(supervisor_slots) do 654 + storage_server_team_lookup = 655 + FlatKV.scan(meta_kv, storage_teams_prefix(), storage_teams_end()).pairs 656 + |> Enum.reduce(%{}, fn {storage_teams_prefix() <> k, v}, acc -> 657 + # TODO: utils 658 + [team_id] = Keyset.unpack(k) 659 + server_ids = Keyset.unpack(v) 660 + Enum.reduce(server_ids, acc, fn id, acc -> 661 + Map.put(acc, id, team_id) 662 + end) 663 + end) 664 + 654 665 storage_ids 655 - |> Enum.map(fn id -> {Hobbes.Servers.Storage, id, %{cluster: state.cluster}} end) 666 + |> Enum.map(fn id -> {Hobbes.Servers.Storage, id, %{cluster: state.cluster, storage_team_id: Map.fetch!(storage_server_team_lookup, id)}} end) 656 667 |> recruit_servers(supervisor_slots, state.cluster.generation) 657 668 |> put_servers_in_cluster(state) 658 669 end ··· 683 694 [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: prev_version}}], 684 695 [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, storage_teams_pairs: storage_teams_pairs, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 685 696 Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 686 - Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_storage_pairs: key_storage_pairs}} end), 697 + Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, storage_teams_pairs: storage_teams_pairs, key_storage_pairs: key_storage_pairs}} end), 687 698 ] 688 699 |> Enum.concat() 689 700 |> recruit_servers(supervisor_slots, cluster.generation) ··· 833 844 |> Enum.reverse() 834 845 end 835 846 836 - defp build_seed_meta(%Config{} = config, storage_server_ids) do 837 - num_storage_teams = div(length(storage_server_ids), config.num_replicas) 847 + defp build_seed_meta(%Config{num_replicas: num_replicas} = config, storage_server_ids) do 848 + num_storage_teams = div(length(storage_server_ids), num_replicas) 838 849 storage_team_ids = Enum.to_list(0..(num_storage_teams - 1)) 839 850 840 - storage_pairs = build_storage_meta(config, storage_team_ids, storage_server_ids) 841 - shard_pairs = build_shard_meta(config, storage_team_ids) 851 + storage_server_chunks = Enum.chunk_every(storage_server_ids, num_replicas, num_replicas, :discard) 852 + assert length(storage_server_chunks) == length(storage_team_ids) 853 + 854 + storage_teams = Enum.zip(storage_team_ids, storage_server_chunks) 855 + 856 + storage_pairs = build_storage_meta(config, storage_teams, storage_server_ids) 857 + shard_pairs = build_shard_meta(config, storage_teams) 842 858 843 859 other_pairs = [ 844 860 {next_shard_move_id_key(), Keyset.pack([0])}, ··· 847 863 Enum.sort(storage_pairs ++ shard_pairs ++ other_pairs) 848 864 end 849 865 850 - defp build_storage_meta(%Config{num_replicas: num_replicas}, storage_team_ids, storage_server_ids) do 866 + defp build_storage_meta(%Config{num_replicas: num_replicas}, storage_teams, storage_server_ids) do 851 867 server_tag_pairs = 852 868 Enum.map(storage_server_ids, fn id -> 853 869 { ··· 857 873 } 858 874 end) 859 875 860 - storage_server_chunks = Enum.chunk_every(storage_server_ids, num_replicas, num_replicas, :discard) 861 - assert length(storage_server_chunks) == length(storage_team_ids) 862 - 863 876 storage_teams_pairs = 864 - Enum.zip(storage_team_ids, storage_server_chunks) 865 - |> Enum.map(fn {team_id, server_ids} -> 877 + Enum.map(storage_teams, fn {team_id, server_ids} -> 866 878 assert length(server_ids) == num_replicas 867 879 pack_storage_team_pair(team_id, server_ids) 868 880 end) 869 881 870 882 assert length(storage_teams_pairs) > 0 871 - 872 883 server_tag_pairs ++ storage_teams_pairs 873 884 end 874 885 875 - defp build_shard_meta(%Config{initial_shards: shards}, storage_team_ids) do 886 + defp build_shard_meta(%Config{initial_shards: shards}, storage_teams) do 887 + storage_team_count = length(storage_teams) 888 + 876 889 shards 877 890 |> Enum.with_index() 878 - |> Enum.map(fn {shard_key, i} -> 879 - team_id = Enum.at(storage_team_ids, rem(i, length(storage_team_ids))) 880 - _end_key = Enum.at(shards, i + 1, all_keys_end()) 891 + |> Enum.map(fn {start_key, i} -> 892 + {team_id, _server_ids} = Enum.at(storage_teams, rem(i, storage_team_count)) 893 + end_key = Enum.at(shards, i + 1, all_keys_end()) 881 894 882 - key_storage_pair = pack_key_storage_pair(shard_key, team_id, nil) 883 - [key_storage_pair] 895 + key_storage_pair = pack_key_storage_pair(start_key, team_id, nil) 896 + storage_keys_pair = { 897 + storage_keys_prefix() <> pack_server_keys_key(team_id, start_key), 898 + pack_server_keys_value(nil, end_key, "complete"), 899 + } 900 + 901 + [key_storage_pair, storage_keys_pair] 884 902 end) 885 903 |> Enum.concat() 886 904 end
+40 -76
lib/servers/storage.ex
··· 40 40 defmodule State do 41 41 @type t :: %__MODULE__{ 42 42 id: non_neg_integer, 43 + storage_team_id: non_neg_integer, 43 44 cluster: Cluster.t, 44 45 45 46 started: boolean, ··· 47 48 durable_version: integer, 48 49 49 50 peek_nonce: reference, 50 - peek_results: %{non_neg_integer => nil}, 51 - buddy_tlog_failed: boolean, 52 51 53 52 kv: term, 54 53 shard_map: SparseShardMap.t, ··· 58 57 } 59 58 @enforce_keys [ 60 59 :id, 60 + :storage_team_id, 61 61 :cluster, 62 62 63 63 :started, ··· 65 65 :durable_version, 66 66 67 67 :peek_nonce, 68 - :peek_results, 69 - :buddy_tlog_failed, 68 + :peek_tlog_i, 70 69 71 70 :kv, 72 71 :shard_map, ··· 141 140 end 142 141 end 143 142 144 - def init(%{cluster: %Cluster{} = cluster, path: path, id: id}) do 143 + def init(%{cluster: %Cluster{} = cluster, path: path, id: id, storage_team_id: storage_team_id}) do 145 144 # Init new storage server 146 145 assert is_integer(id) 147 146 path = case SimServer.simulated?() do ··· 152 151 153 152 # Write id to storage and commit 154 153 HybridKV.put_storage(kv, special_id_key(), Keyset.pack([id])) 154 + HybridKV.put_storage(kv, special_storage_team_id_key(), Keyset.pack([storage_team_id])) 155 155 HybridKV.commit(kv) 156 156 157 157 do_init(cluster, kv) ··· 167 167 168 168 def do_init(%Cluster{} = cluster, %HybridKV{} = kv) do 169 169 [id] = HybridKV.get(kv, 1, special_id_key()) |> Keyset.unpack() 170 + [storage_team_id] = HybridKV.get(kv, 1, special_storage_team_id_key()) |> Keyset.unpack() 170 171 assert is_integer(id) 172 + assert is_integer(storage_team_id) 171 173 172 174 # We are reading from storage here no matter what so the read at version=1 is fine 173 175 startup_version = String.to_integer(HybridKV.get(kv, 1, special_prefix() <> "durable_version") || "0") 174 176 175 177 state = %State{ 176 178 id: id, 179 + storage_team_id: storage_team_id, 177 180 cluster: cluster, 178 181 179 182 started: false, ··· 181 184 durable_version: startup_version, 182 185 183 186 peek_nonce: nil, 184 - peek_results: %{}, 185 - buddy_tlog_failed: false, 187 + # TODO: would be good to randomize this but we don't know the replication factor here 188 + peek_tlog_i: 0, 186 189 187 190 kv: kv, 188 191 shard_map: SparseShardMap.new(), ··· 193 196 194 197 # Load the shard map and imports from the special keyspace 195 198 shard_map = state.shard_map 196 - %RangeResult{pairs: sk_pairs} = HybridKV.scan(kv, 1, special_server_keys_prefix(), special_server_keys_end()) 199 + %RangeResult{pairs: sk_pairs} = HybridKV.scan(kv, 1, special_storage_keys_prefix(), special_storage_keys_end()) 197 200 198 201 state = 199 202 Enum.reduce(sk_pairs, state, fn {k, v}, state -> 200 - special_server_keys_prefix() <> k_rest = k 201 - [_server_id, start_key] = unpack_server_keys_key(k_rest) 203 + special_storage_keys_prefix() <> k_rest = k 204 + [team_id, start_key] = unpack_server_keys_key(k_rest) 205 + assert team_id == state.storage_team_id 202 206 203 207 case unpack_server_keys_value(v) do 204 208 [_sm, end_key, "complete"] -> ··· 328 332 end 329 333 330 334 def handle_info(:peek_retry, %State{} = state) do 331 - {:noreply, peek_logs(state)} 335 + {:noreply, peek_logs(state, false)} 332 336 end 333 337 334 338 def handle_info({:peek_timeout, nonce}, %State{} = state) when nonce == state.peek_nonce do 335 - state = %{state | buddy_tlog_failed: true} 336 - {:noreply, peek_logs(state)} 339 + {:noreply, peek_logs(state, true)} 337 340 end 338 341 339 342 def handle_info({:peek_timeout, _nonce}, %State{} = state) do ··· 465 468 %State{state | durable_version: flush_version} 466 469 end 467 470 468 - defp peek_logs(%State{} = state) when state.cluster.status != :normal do 471 + defp peek_logs(%State{} = state, _rotate_tlog?) when state.cluster.status != :normal do 469 472 # If we are waiting for recovery, just keep trying 470 473 # TODO: can we peek *during* a recovery? 471 474 SimServer.send_after(self(), :peek_retry, 100) 472 475 state 473 476 end 474 477 475 - defp peek_logs(%State{} = state) do 478 + defp peek_logs(%State{} = state, rotate_tlog?) do 476 479 assert state.cluster.status == :normal 477 480 478 - nonce = make_ref() 479 - tag = state.id 480 481 start_version = state.data_version + 1 481 482 482 483 %TLogGeneration{} = generation = tlog_generation_for_version(state.cluster.tlog_generations, start_version) ··· 487 488 assert end_version >= start_version 488 489 end 489 490 490 - buddy_server = Map.get(state.cluster.servers, buddy_tlog_id(generation.tlog_ids, state.id)) 491 + state = case rotate_tlog? do 492 + true -> %{state | peek_tlog_i: rem(state.peek_tlog_i + 1, generation.replication_factor)} 493 + false -> state 494 + end 495 + # TODO: this will be tripped if we allow decreasing the replication_factor between generations 496 + assert state.peek_tlog_i < generation.replication_factor 491 497 492 - case state.buddy_tlog_failed or is_nil(buddy_server) do 493 - true -> 494 - # Gather all known tlog pids to peek 495 - generation.tlog_ids 496 - |> Enum.reduce([], fn id, acc -> 497 - case Map.fetch(state.cluster.servers, id) do 498 - {:ok, %Server{pid: pid}} -> [pid | acc] 499 - :error -> acc 500 - end 501 - end) 502 - false -> 503 - # Peek buddy tlog 504 - %Server{pid: pid} = buddy_server 505 - [pid] 498 + tlog_team_count = div(length(generation.tlog_ids), generation.replication_factor) 499 + tlog_team_i = rem(state.storage_team_id, tlog_team_count) 500 + tlog_id = Enum.at(generation.tlog_ids, (tlog_team_i * generation.replication_factor) + state.peek_tlog_i) 501 + 502 + tlog_server = Map.get(state.cluster.servers, tlog_id) 503 + 504 + nonce = make_ref() 505 + tag = state.storage_team_id 506 + assert is_integer(tag) 507 + 508 + if tlog_server do 509 + TLog.peek(tlog_server.pid, nonce, tag, start_version, end_version) 506 510 end 507 - |> Enum.each(fn pid -> 508 - TLog.peek(pid, nonce, tag, start_version, end_version) 509 - end) 510 511 511 512 SimServer.send_after self(), {:peek_timeout, nonce}, 300 512 - %{state | peek_nonce: nonce, peek_results: %{}} 513 + %{state | peek_nonce: nonce} 513 514 end 514 515 515 516 defp on_peek_result(%State{} = state, %PeekResult{} = result) do 516 517 assert result.nonce == state.peek_nonce 517 - 518 518 # TODO: maybe better to store start_version in state on peek instead of getting it back in tlog response? 519 519 %TLogGeneration{} = generation = tlog_generation_for_version(state.cluster.tlog_generations, result.start_version) 520 520 assert result.tlog_id in generation.tlog_ids 521 521 522 - case result.tlog_id == buddy_tlog_id(generation.tlog_ids, state.id) do 523 - true -> 524 - state = %{state | buddy_tlog_failed: false} 525 - apply_buddy_peek_result(state, result) 526 - 527 - false -> 528 - state = put_in(state.peek_results[result.tlog_id], result) 529 - case map_size(state.peek_results) > (length(generation.tlog_ids) - (generation.replication_factor - 1)) do 530 - true -> apply_merged_peek_results(state) 531 - false -> state 532 - end 533 - end 534 - end 535 - 536 - defp apply_buddy_peek_result(%State{} = state, %PeekResult{} = result) do 537 522 assert result.start_version == (state.data_version + 1) 538 523 assert result.end_version >= state.data_version 539 524 540 525 state = apply_batches(state, result.batches) 541 526 SimServer.send_after self(), :peek_retry, 1 542 527 %{state | data_version: result.end_version, peek_nonce: nil} 543 - end 544 - 545 - defp apply_merged_peek_results(%State{} = state) do 546 - results = state.peek_results |> Map.values() |> Enum.sort_by(&(&1.tlog_id)) 547 - 548 - Enum.each(results, fn %PeekResult{} = result -> 549 - assert result.start_version == (state.data_version + 1) 550 - assert result.end_version >= state.data_version 551 - end) 552 - 553 - batches = 554 - results 555 - |> Enum.map(fn %PeekResult{} = result -> result.batches end) 556 - |> merge_batches() 557 - 558 - min_end_version = results |> Enum.map(fn %PeekResult{} = result -> result.end_version end) |> Enum.min() 559 - batches = Enum.filter(batches, fn {version, _mutations} -> version <= min_end_version end) 560 - 561 - state = apply_batches(state, batches) 562 - SimServer.send_after self(), :peek_retry, 1 563 - %{state | data_version: min_end_version, peek_nonce: nil} 564 528 end 565 529 566 530 defp apply_batches(%State{} = state, batches) when is_list(batches) do ··· 609 573 end) 610 574 end 611 575 612 - defp handle_special_diff(special_server_keys_prefix() <> rest, old_value, new_value, version, %State{} = state) do 613 - [id, start_key] = unpack_server_keys_key(rest) 614 - assert id == state.id 576 + defp handle_special_diff(special_storage_keys_prefix() <> rest, old_value, new_value, version, %State{} = state) do 577 + [team_id, start_key] = unpack_server_keys_key(rest) 578 + assert team_id == state.storage_team_id 615 579 616 580 old_unpacked = old_value && unpack_server_keys_value(old_value) 617 581 new_unpacked = new_value && unpack_server_keys_value(new_value)
+33 -6
lib/shard_tag_map.ex
··· 1 1 defmodule Hobbes.ShardTagMap do 2 2 alias Hobbes.{DenseShardMap, ShardTagMap, Utils} 3 3 alias Hobbes.Structs.TLogGeneration 4 + alias Hobbes.Encoding.Keyset 4 5 5 6 import Hobbes.Utils 6 7 ··· 8 9 tlog_ids: [non_neg_integer], 9 10 replication_factor: non_neg_integer, 10 11 shard_map: DenseShardMap.t, 12 + storage_lookup: :ets.table, 11 13 } 12 - @enforce_keys [:tlog_ids, :replication_factor, :shard_map] 14 + @enforce_keys [:tlog_ids, :replication_factor, :shard_map, :storage_lookup] 13 15 defstruct @enforce_keys 14 16 15 17 @spec new(TLogGeneration.t) :: t ··· 18 20 tlog_ids: generation.tlog_ids, 19 21 replication_factor: generation.replication_factor, 20 22 shard_map: DenseShardMap.new(), 23 + storage_lookup: :ets.new(__MODULE__, [:set, :private]), 21 24 } 22 25 end 23 26 ··· 36 39 37 40 @spec apply_metadata_mutations(t, [Utils.mutation]) :: :ok 38 41 def apply_metadata_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do 39 - %{shard_map: shard_map, tlog_ids: tlog_ids, replication_factor: rf} = stm 42 + %ShardTagMap{ 43 + tlog_ids: tlog_ids, 44 + replication_factor: rf, 45 + shard_map: shard_map, 46 + storage_lookup: storage_lookup, 47 + } = stm 40 48 tlog_teams = Enum.chunk_every(tlog_ids, rf, rf, :discard) 41 49 tlog_team_count = length(tlog_teams) 42 50 ··· 65 73 {:clear, key_storage_prefix() <> start_key} -> 66 74 DenseShardMap.delete(shard_map, start_key) 67 75 76 + {:write, storage_teams_prefix() <> team_enc, value} -> 77 + # TODO: utils 78 + [storage_team_id] = Keyset.unpack(team_enc) 79 + storage_server_ids = Keyset.unpack(value) 80 + :ets.insert(storage_lookup, {storage_team_id, storage_server_ids}) 81 + 82 + {:clear, storage_teams_prefix() <> team_enc} -> 83 + [storage_team_id] = Keyset.unpack(team_enc) 84 + :ets.delete(storage_lookup, storage_team_id) 85 + 68 86 _mut -> :noop 69 87 end) 70 88 ··· 95 113 mut, {acc, i} -> 96 114 {tlogs, tags} = 97 115 case mutation_key(mut) do 98 - special_server_keys_prefix() <> rest -> 99 - # Special server_keys mutations are sent only to that particular Storage server 100 - [id, _sk] = unpack_server_keys_key(rest) 101 - tags = [id] 116 + special_storage_keys_prefix() <> rest -> 117 + # Special storage_keys mutations are sent only to that particular Storage team 118 + [team_id, _sk] = unpack_server_keys_key(rest) 119 + tags = [team_id] 102 120 # Send to all TLogs for simplicity (shard moves should never be a bottleneck) 103 121 {all_tlog_ids, tags} 104 122 ··· 129 147 @spec shards_for_key_or_range(t, binary | {binary, binary}) :: [{binary, binary, {list, Utils.tag_list, Utils.tag_list}}] 130 148 def shards_for_key_or_range(%ShardTagMap{} = stm, key) when is_binary(key) do 131 149 [DenseShardMap.shard_for_key(stm.shard_map, key)] 150 + |> lookup_servers_for_shards(stm.storage_lookup) 132 151 end 133 152 134 153 def shards_for_key_or_range(%ShardTagMap{} = stm, {start_key, end_key}) do 135 154 DenseShardMap.shards_for_range(stm.shard_map, start_key, end_key) 155 + |> lookup_servers_for_shards(stm.storage_lookup) 156 + end 157 + 158 + defp lookup_servers_for_shards(shards, storage_lookup) do 159 + Enum.map(shards, fn {sk, ek, {_tlog_ids, _all_storage_team_ids, [from_storage_team_id]}} -> 160 + [{_key, storage_server_ids}] = :ets.lookup(storage_lookup, from_storage_team_id) 161 + {sk, ek, storage_server_ids} 162 + end) 136 163 end 137 164 138 165 @doc false
+17 -5
lib/utils.ex
··· 32 32 33 33 defmacro key_storage_prefix, do: "\xFF/ks/" 34 34 defmacro key_storage_end, do: "\xFF/ks0" 35 + # TODO: rename to "sk/" once server_keys has been removed 36 + defmacro storage_keys_prefix, do: "\xFF/stk/" 37 + defmacro storage_keys_end, do: "\xFF/stk0" 35 38 36 39 defmacro key_servers_prefix, do: "\xFF/key_servers/" 37 40 defmacro key_servers_end, do: "\xFF/key_servers0" ··· 44 47 defmacro special_prefix, do: "\xFF\xFF" 45 48 defmacro special_end, do: "\xFF\xFF\xFF\xFF" 46 49 50 + # TODO: rename to "sk/" once server_keys has been removed 51 + defmacro special_storage_keys_prefix, do: "\xFF\xFF/stk/" 52 + defmacro special_storage_keys_end, do: "\xFF\xFF/stk0" 53 + 47 54 defmacro special_server_keys_prefix, do: "\xFF\xFF/sk/" 48 55 defmacro special_server_keys_end, do: "\xFF\xFF/sk0" 49 56 ··· 53 60 defmacro special_byte_sample_prefix, do: "\xFF\xFF/bs/" 54 61 defmacro special_byte_sample_end, do: "\xFF\xFF/bs0" 55 62 56 - defmacro next_shard_move_id_key, do: "\xFF/next_shard_move_id" 57 63 defmacro special_id_key, do: "\xFF\xFF/id" 64 + defmacro special_storage_team_id_key, do: special_prefix() <> "/storage_team_id" 65 + 66 + defmacro next_shard_move_id_key, do: "\xFF/next_shard_move_id" 58 67 59 68 def meta_tag, do: -1 60 69 ··· 86 95 def meta_mutation?({:clear_range, _sk, _ek}), do: false 87 96 88 97 @spec special_mutation?(mutation) :: boolean 98 + def special_mutation?({:write, special_storage_keys_prefix() <> _key, _value}), do: true 89 99 def special_mutation?({:write, special_server_keys_prefix() <> _key, _value}), do: true 90 100 def special_mutation?({:write, _key, _value}), do: false 91 101 102 + def special_mutation?({:clear, special_storage_keys_prefix() <> _key}), do: true 92 103 def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true 93 104 def special_mutation?({:clear, _key}), do: false 94 105 ··· 104 115 } 105 116 end 106 117 118 + @spec pack_key_storage_pair(binary, non_neg_integer, non_neg_integer | nil) :: {binary, binary} 107 119 def pack_key_storage_pair(shard_start_key, from_storage_team_id, to_storage_team_id) 108 120 when is_binary(shard_start_key) and is_integer(from_storage_team_id) and (is_integer(to_storage_team_id) or is_nil(to_storage_team_id)) do 109 121 { ··· 189 201 190 202 def compute_special_mutations(mutations) when is_list(mutations) do 191 203 Enum.reduce(mutations, [], fn 192 - {:write, server_keys_prefix() <> k, v}, acc -> 193 - [{:write, special_server_keys_prefix() <> k, v} | acc] 204 + {:write, storage_keys_prefix() <> k, v}, acc -> 205 + [{:write, special_storage_keys_prefix() <> k, v} | acc] 194 206 195 - {:clear, server_keys_prefix() <> k}, acc -> 196 - [{:clear, special_server_keys_prefix() <> k} | acc] 207 + {:clear, storage_keys_prefix() <> k}, acc -> 208 + [{:clear, special_storage_keys_prefix() <> k} | acc] 197 209 198 210 _, acc -> acc 199 211 end)
+2
test/hobbes_test.exs
··· 95 95 defmodule CycleShardMoveWorkloadTest do 96 96 use ExUnit.Case, async: true 97 97 @tag :cycle_shard_move 98 + # TODO: re-enable after shard moves are redesigned 99 + @tag :disable 98 100 test "Cycle and ShardMove", %{test: test} do 99 101 Workloads.run([ 100 102 {Workloads.Cycle, [