this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use keyset encoding for key_servers value

garrison ca20d369 67c6a3ce

+26 -30
+2 -7
lib/servers/distributor.ex
··· 3 3 4 4 import ExUnit.Assertions, only: [assert: 1] 5 5 6 - alias Hobbes.{Transaction, MetaStore, ShardInfoMap} 6 + alias Hobbes.{Transaction, ShardInfoMap} 7 7 alias Hobbes.ShardInfoMap.Shard 8 8 alias Hobbes.Structs.{Cluster, Server, ShardStats} 9 9 alias Hobbes.Servers.Storage ··· 357 357 end 358 358 359 359 defp to_ks_pair(%Shard{} = shard) do 360 - to_server_ids = case shard.to_server_ids do 361 - [] -> nil 362 - list -> list 363 - end 364 - 365 360 key = key_servers_prefix() <> shard.start_key 366 - value = MetaStore.encode_key_servers(shard.from_server_ids, to_server_ids) 361 + value = pack_key_servers(shard.from_server_ids, shard.to_server_ids) 367 362 368 363 {key, value} 369 364 end
+2 -2
lib/servers/manager.ex
··· 3 3 4 4 import ExUnit.Assertions, only: [assert: 1] 5 5 6 - alias Hobbes.{MetaStore, ShardTagMap} 6 + alias Hobbes.ShardTagMap 7 7 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, TLogStatus, LogBatch} 8 8 alias Hobbes.Servers.{Coordinator, ServerSupervisor, TLog} 9 9 ··· 617 617 618 618 key_servers_pair = { 619 619 key_servers_prefix() <> shard_key, 620 - MetaStore.encode_server_ids(shard_server_ids), 620 + pack_key_servers(shard_server_ids, []), 621 621 } 622 622 623 623 server_keys_pairs =
+3 -5
lib/shard_info_map.ex
··· 1 1 defmodule Hobbes.ShardInfoMap do 2 2 alias Hobbes.{DenseShardMap, ShardInfoMap} 3 3 alias Hobbes.Structs.ShardStats 4 - alias Hobbes.MetaStore 5 4 6 5 import Hobbes.Utils 7 6 ··· 83 82 84 83 @spec shard_from_ks_pairs({binary, binary}, {binary, binary}) :: Shard.t 85 84 defp shard_from_ks_pairs({key_servers_prefix() <> start_key, value}, {key_servers_prefix() <> end_key, _value}) do 86 - {from, to} = MetaStore.decode_key_servers(value) 87 - to = to || [] 85 + [from_ids, to_ids] = unpack_key_servers(value) 88 86 89 87 %Shard{ 90 88 start_key: start_key, 91 89 end_key: end_key, 92 - from_server_ids: from, 93 - to_server_ids: to, 90 + from_server_ids: from_ids, 91 + to_server_ids: to_ids, 94 92 stats: nil, 95 93 } 96 94 end
+2 -10
lib/shard_tag_map.ex
··· 3 3 alias Hobbes.Structs.TLogGeneration 4 4 5 5 import Hobbes.Utils 6 - import Hobbes.MetaStore, only: [decode_server_ids: 1] 7 6 8 7 @type t :: %__MODULE__{ 9 8 tlog_ids: [non_neg_integer], ··· 104 103 105 104 # Returns {all_tags, from_tags} where all_tags is (from_tags ++ to_tags) 106 105 defp server_tags_from_key_servers_value(value) do 107 - case String.split(value, "/") do 108 - [from_str, to_str] -> 109 - from_ids = decode_server_ids(from_str) 110 - {from_ids ++ decode_server_ids(to_str), from_ids} 111 - 112 - [from_str] -> 113 - from_ids = decode_server_ids(from_str) 114 - {from_ids, from_ids} 115 - end 106 + [from_ids, to_ids] = unpack_key_servers(value) 107 + {from_ids ++ to_ids, from_ids} 116 108 end 117 109 118 110 @doc false
+11
lib/utils.ex
··· 5 5 6 6 alias Hobbes.Structs.{Cluster, Server} 7 7 alias Hobbes.Construct.SimServer 8 + alias Hobbes.Encoding.Keyset 8 9 9 10 @type mutation :: {:write, binary, binary} | {:clear, binary} | {:clear_range, binary, binary} 10 11 @type numbered_mutation :: {non_neg_integer, mutation} ··· 71 72 72 73 def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true 73 74 def special_mutation?({:clear, _key}), do: false 75 + 76 + @spec pack_key_servers([integer], [integer]) :: binary 77 + def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do 78 + Keyset.pack([from_ids, to_ids]) 79 + end 80 + 81 + @spec unpack_key_servers(binary) :: [[integer]] 82 + def unpack_key_servers(value) when is_binary(value) do 83 + [_from_ids, _to_ids] = Keyset.unpack(value) 84 + end 74 85 75 86 @spec get_servers(%Cluster{}, module) :: [%Server{}] 76 87 def get_servers(%Cluster{} = cluster, type) when is_atom(type) do
+6 -6
lib/workloads/read_write.ex
··· 277 277 end) 278 278 |> then(fn shards -> shards ++ [{"\xFF\xFF", nil}] end) 279 279 |> Enum.chunk_every(2, 1, :discard) 280 - |> Enum.map(fn [{start_key, servers}, {end_key, _}] -> 281 - {[id | _], _to} = Hobbes.MetaStore.decode_key_servers(servers) 282 - %{pid: pid} = Map.fetch!(cluster.servers, id) 283 - {:ok, %{size_bytes: size_bytes}} = too_new_backoff(fn -> 284 - Hobbes.Servers.Storage.get_shard_stats(pid, start_key, end_key) 285 - end) 280 + |> Enum.map(fn [{start_key, value}, {end_key, _}] -> 281 + [from_ids, to_ids] = unpack_key_servers(value) 286 282 283 + %{pid: pid} = Map.fetch!(cluster.servers, hd(from_ids)) 284 + {:ok, %{size_bytes: size_bytes}} = Hobbes.Servers.Storage.get_shard_stats(pid, start_key, end_key) 285 + 286 + servers = "#{inspect(from_ids)} -> #{inspect(to_ids)}" 287 287 {start_key, servers, size_bytes} 288 288 end) 289 289 end