this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add ClusterConfig

garrison 7da24901 0e382072

+69 -42
+54
lib/cluster_config.ex
··· 1 + defmodule Hobbes.ClusterConfig do 2 + alias Hobbes.ClusterConfig 3 + 4 + import Hobbes.Utils 5 + 6 + @type t :: %__MODULE__{ 7 + num_begin_buffers: pos_integer, 8 + num_commit_buffers: pos_integer, 9 + num_tlog_teams: pos_integer, 10 + num_storage_teams: pos_integer, 11 + } 12 + @defaults [ 13 + num_begin_buffers: 1, 14 + num_commit_buffers: 3, 15 + num_tlog_teams: 1, 16 + num_storage_teams: 2, 17 + ] 18 + @keys Enum.map(@defaults, fn {k, _v} -> k end) 19 + 20 + @enforce_keys @keys 21 + defstruct @keys 22 + 23 + @spec from_opts(keyword) :: t 24 + def from_opts(opts) when is_list(opts) do 25 + fields = Enum.map(@defaults, fn {k, v} -> 26 + {k, Keyword.get(opts, k, v)} 27 + end) 28 + struct!(ClusterConfig, fields) 29 + end 30 + 31 + @spec from_pairs([{binary, binary}]) :: t 32 + def from_pairs(pairs) when is_list(pairs) do 33 + fields = Enum.map(@defaults, fn {k, v} -> 34 + key = coordinator_config_prefix() <> Atom.to_string(k) 35 + 36 + case List.keyfind(pairs, key, 0) do 37 + {^key, value} -> {k, String.to_integer(value)} 38 + nil -> {k, v} 39 + end 40 + end) 41 + struct!(ClusterConfig, fields) 42 + end 43 + 44 + @spec to_pairs(t) :: [{binary, binary}] 45 + def to_pairs(%ClusterConfig{} = config) do 46 + Enum.map(@keys, fn k -> 47 + value = Map.fetch!(config, k) 48 + { 49 + coordinator_config_prefix() <> Atom.to_string(k), 50 + Integer.to_string(value), 51 + } 52 + end) 53 + end 54 + end
+5 -32
lib/hobbes.ex
··· 1 1 defmodule Hobbes do 2 - alias Hobbes.ClusterNode 2 + alias Hobbes.{ClusterConfig, ClusterNode} 3 3 alias Hobbes.Servers.{Coordinator, Manager} 4 4 alias Hobbes.Structs.Cluster 5 5 ··· 84 84 coordinators 85 85 end 86 86 87 - defp default_opts do 88 - [ 89 - num_coordinators: 3, 90 - num_begin_buffers: 1, 91 - num_commit_buffers: 3, 92 - num_tlog_teams: 1, 93 - num_storage_teams: 2, 94 - ] 95 - end 96 - 97 - defp config_pairs(opts) do 98 - {initial_shards, opts} = Keyword.pop(opts, :initial_shards, [""]) 99 - 100 - default_opts() 101 - |> Keyword.merge(opts) 102 - |> Enum.map(fn {k, v} -> 103 - { 104 - "config/" <> Atom.to_string(k), 105 - encode_opt_value(v), 106 - } 107 - end) 108 - |> then(fn pairs -> 109 - pairs ++ Enum.map(initial_shards, fn k -> {"config/initial_shards/" <> k, ""} end) 110 - end) 111 - end 112 - 113 - defp encode_opt_value(value) when is_integer(value), do: Integer.to_string(value) 114 - defp encode_opt_value(value) when is_binary(value), do: value 115 - 116 87 @spec start_cluster(Keyword.t) :: {:ok, [pid]} 117 88 def start_cluster(opts) do 118 89 num_coordinators = Keyword.get(opts, :num_coordinators, 3) ··· 125 96 126 97 SimProcess.sleep(1000) 127 98 128 - config = config_pairs(opts) 129 - :ok = Coordinator.write(hd(coordinators), config) 99 + config = ClusterConfig.from_opts(opts) 100 + config_pairs = ClusterConfig.to_pairs(config) 101 + initial_shards_pairs = Keyword.get(opts, :initial_shards, [""]) |> Enum.map(fn k -> {"config/initial_shards/" <> k, ""} end) 102 + :ok = Coordinator.write(hd(coordinators), config_pairs ++ initial_shards_pairs) 130 103 131 104 { 132 105 :ok,
+7 -10
lib/servers/manager.ex
··· 5 5 6 6 import ExUnit.Assertions, only: [assert: 1] 7 7 8 - alias Hobbes.ShardTagMap 8 + alias Hobbes.{ClusterConfig, ShardTagMap} 9 9 alias Hobbes.KV.FlatKV 10 10 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, SupervisorStatus, TLogStatus, LogBatch} 11 11 alias Hobbes.Servers.{Coordinator, ServerSupervisor, TLog} ··· 896 896 end) 897 897 |> Enum.reverse() 898 898 899 - config_map = Map.new(pairs) 900 - %Config{ 901 - num_begin_buffers: Map.fetch!(config_map, "config/num_begin_buffers") |> String.to_integer(), 902 - num_commit_buffers: Map.fetch!(config_map, "config/num_commit_buffers") |> String.to_integer(), 903 - num_tlog_teams: Map.fetch!(config_map, "config/num_tlog_teams") |> String.to_integer(), 904 - num_storage_teams: Map.fetch!(config_map, "config/num_storage_teams") |> String.to_integer(), 905 - num_replicas: 3, 906 - initial_shards: initial_shards, 907 - } 899 + fields = 900 + ClusterConfig.from_pairs(pairs) 901 + |> Map.from_struct() 902 + |> Map.put(:initial_shards, initial_shards) 903 + |> Map.put(:num_replicas, 3) 904 + struct!(Config, fields) 908 905 end 909 906 910 907 defp load_generations(pairs) when is_list(pairs) do
+3
lib/utils.ex
··· 70 70 71 71 def mvcc_window, do: 5_000_000 72 72 73 + defmacro coordinator_config_prefix, do: "config/" 74 + defmacro coordinator_config_end, do: "config0" 75 + 73 76 defguard is_database_key(key) when is_binary(key) and key >= database_prefix() and key < database_end() 74 77 75 78 defguard is_database_range(start_key, end_key)