this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use distributor storage map instead of cluster to find storage pids

+25 -9
+25 -9
lib/servers/distributor.ex
··· 156 156 end) 157 157 |> Enum.chunk_every(2, 1, :discard) 158 158 |> Enum.map(fn [{start_key, from, _to}, {end_key, _, _}] -> 159 - %Server{pid: storage_pid} = Map.fetch!(state.cluster.servers, SimServer.deterministic_random(from)) 160 - {:ok, %ShardStats{} = stats} = too_new_backoff(fn -> 161 - Storage.get_shard_stats(storage_pid, start_key, end_key) 162 - end) 163 - {{start_key, end_key}, stats} 159 + from 160 + |> Enum.map(fn id -> Map.get(state.storage_servers, id).pid end) 161 + |> Enum.reject(&is_nil/1) 162 + |> case do 163 + [_ | _] = from_pids -> 164 + storage_pid = SimServer.deterministic_random(from_pids) 165 + 166 + {:ok, %ShardStats{} = stats} = too_new_backoff(fn -> 167 + Storage.get_shard_stats(storage_pid, start_key, end_key) 168 + end) 169 + {{start_key, end_key}, stats} 170 + 171 + [] -> nil 172 + end 164 173 end) 174 + |> Enum.reject(&is_nil/1) 165 175 166 176 oversize_shards = Enum.filter(shard_sizes, fn {_shard, %ShardStats{} = stats} -> 167 177 stats.size_bytes > @shard_max_size_bytes ··· 238 248 %State{state | shard_moves: shard_moves} 239 249 end 240 250 241 - defp shard_move_complete?(%ShardMove{shard_key: shard_key} = move, %State{cluster: cluster}) do 251 + defp shard_move_complete?(%ShardMove{shard_key: shard_key} = move, %State{} = state) do 242 252 move.to_servers 243 253 |> Enum.map(fn id -> 244 - %Server{pid: storage_pid} = Map.fetch!(cluster.servers, id) 245 - Storage.check_import_complete_send(storage_pid, move.start_version, shard_key) 254 + case Map.fetch(state.storage_servers, id) do 255 + {:ok, %StorageInfo{pid: pid}} -> {:ok, Storage.check_import_complete_send(pid, move.start_version, shard_key)} 256 + :error -> :error 257 + end 246 258 end) 247 - |> Enum.map(&Storage.check_import_complete_receive/1) 259 + |> Enum.map(fn 260 + {:ok, req_id} -> Storage.check_import_complete_receive(req_id) 261 + :error -> {:error, :server_unknown} 262 + end) 248 263 |> Enum.all?(fn 249 264 {:ok, completed?} -> completed? 250 265 # Storage server has not yet received the server_keys mutation 251 266 {:error, :version_too_new} -> false 267 + {:error, :server_unknown} -> false 252 268 end) 253 269 end 254 270