this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor begin_move_shard to use ShardInfoMap

garrison 64f7c39a 8a0eff31

+63 -81
+53 -54
lib/servers/distributor.ex
··· 25 25 @type t :: %__MODULE__{ 26 26 } 27 27 @enforce_keys [ 28 - :initiated_from, 29 28 :start_version, 30 - :shard_key, 29 + :start_key, 31 30 :end_key, 32 31 :to_servers, 33 32 ] ··· 82 81 end 83 82 84 83 def handle_call({:move_shard, shard_key, to_servers}, from, state) do 85 - {result, %State{} = state} = begin_move_shard(shard_key, to_servers, from, state) 86 - case result do 87 - :ok -> {:noreply, state} 88 - {:error, _} = error -> {:reply, error, state} 89 - end 84 + {result, %State{} = state} = begin_move_shard(state, shard_key, to_servers) 85 + {:reply, result, state} 90 86 end 91 87 92 88 def handle_cast({:storage_ping, storage_pid, storage_id, shard_stats}, %State{} = state) do ··· 255 251 %State{state | shard_moves: shard_moves} 256 252 end 257 253 258 - defp shard_move_complete?(%ShardMove{shard_key: shard_key} = move, %State{} = state) do 254 + defp shard_move_complete?(%ShardMove{start_key: shard_key} = move, %State{} = state) do 259 255 move.to_servers 260 256 |> Enum.map(fn id -> 261 257 case Map.fetch(state.storage_servers, id) do ··· 275 271 end) 276 272 end 277 273 278 - defp begin_move_shard(shard_key, to_servers, initiated_from, %State{} = state) when is_binary(shard_key) and is_list(to_servers) do 279 - with {:ok, txn} <- Transaction.new(state.cluster), 280 - {:ok, {shard_value, txn}} <- Transaction.read(txn, key_servers_prefix() <> shard_key), 281 - {:ok, from_servers} <- from_servers_for_move(shard_value, to_servers), 282 - # TODO: limit 1 283 - {:ok, {next_pairs, txn}} <- Transaction.read_range(txn, key_servers_prefix() <> shard_key <> "0", key_servers_end()), 284 - end_key <- end_key_from_next_pairs(next_pairs), 285 - txn <- write_move_shard_pairs(txn, shard_key, end_key, from_servers, to_servers), 286 - {:ok, %TxnState{} = txn} <- Transaction.commit(txn) 274 + defp begin_move_shard(%State{} = state, start_key, to_server_ids) when is_binary(start_key) and is_list(to_server_ids) do 275 + Enum.each(to_server_ids, fn id -> assert is_integer(id) end) 276 + 277 + with {:ok, %Shard{} = shard} <- fetch_shard(state, start_key), 278 + :ok <- validate_shard_not_moving(shard), 279 + :ok <- validate_no_overlap(shard, to_server_ids) 287 280 do 288 - shard_move = %ShardMove{ 289 - initiated_from: initiated_from, 290 - start_version: txn.commit_version, 291 - shard_key: shard_key, 292 - end_key: end_key, 293 - to_servers: to_servers 294 - } 281 + shard = %{shard | to_server_ids: to_server_ids} 282 + ShardInfoMap.put(state.shard_map, shard.start_key, shard) 283 + 284 + sk_pairs = Enum.map(to_server_ids, fn id -> 285 + {server_keys_key(id, shard.start_key), "fetching/" <> shard.end_key} 286 + end) 287 + pairs = [to_ks_pair(shard) | sk_pairs] 288 + 289 + {:ok, txn} = Transaction.new(state.cluster, write_only: true) 290 + txn = Transaction.write(txn, pairs) 291 + 292 + case Transaction.commit(txn) do 293 + {:ok, txn} -> 294 + shard_move = %ShardMove{ 295 + start_version: txn.commit_version, 296 + start_key: shard.start_key, 297 + end_key: shard.end_key, 298 + to_servers: shard.to_server_ids, 299 + } 300 + state = %{state | shard_moves: [shard_move | state.shard_moves]} 301 + {:ok, state} 295 302 296 - state = %{state | shard_moves: [shard_move | state.shard_moves]} 297 - {:ok, state} 303 + # Exit for non-retryable errors to preserve consistency of the ShardInfoMap 304 + {:error, _err} -> exit(:shutdown) 305 + end 298 306 else 299 - {:error, _error} = error -> {error, state} 307 + {:error, _err} = error -> {error, state} 300 308 end 301 309 end 302 310 303 - defp from_servers_for_move(shard_value, to_servers) when is_binary(shard_value) and is_list(to_servers) do 304 - case MetaStore.decode_key_servers(shard_value) do 305 - {from_servers, nil} -> 306 - case Enum.any?(from_servers, fn id -> id in to_servers end) do 307 - false -> {:ok, from_servers} 308 - true -> {:error, :servers_overlap} 309 - end 311 + defp fetch_shard(%State{} = state, start_key) do 312 + case ShardInfoMap.fetch(state.shard_map, start_key) do 313 + {:ok, %Shard{}} = result -> result 314 + :error -> {:error, :shard_not_found} 315 + end 316 + end 310 317 311 - {_from_servers, [_ | _] = _to_servers} -> {:error, :shard_already_moving} 318 + defp validate_shard_not_moving(%Shard{} = shard) do 319 + case shard.to_server_ids do 320 + [] -> :ok 321 + [_ | _] -> {:error, :shard_already_moving} 312 322 end 313 323 end 314 324 315 - defp end_key_from_next_pairs([]), do: all_keys_end() 316 - defp end_key_from_next_pairs([{key_servers_prefix() <> ek, _v} | _]), do: ek 317 - 318 - defp write_move_shard_pairs(%TxnState{} = txn, start_key, end_key, from_servers, to_servers) do 319 - new_value = MetaStore.encode_key_servers(from_servers, to_servers) 320 - pairs = 321 - Enum.map(to_servers, fn id -> 322 - {server_keys_key(id, start_key), "fetching/" <> end_key} 323 - end) 324 - 325 - pairs = [{key_servers_prefix() <> start_key, new_value} | pairs] 326 - Transaction.write(txn, pairs) 325 + defp validate_no_overlap(%Shard{} = shard, to_server_ids) do 326 + case Enum.any?(to_server_ids, fn id -> id in shard.from_server_ids end) do 327 + false -> :ok 328 + true -> {:error, :servers_overlap} 329 + end 327 330 end 328 331 329 - defp complete_shard_move(%ShardMove{shard_key: shard_key, end_key: end_key} = move, %State{} = state) do 332 + defp complete_shard_move(%ShardMove{start_key: shard_key, end_key: end_key}, %State{} = state) do 330 333 with {:ok, txn} = Transaction.new(state.cluster), 331 334 {:ok, {ks_value, txn}} <- Transaction.read(txn, key_servers_prefix() <> shard_key), 332 335 {from_servers, to_servers} <- MetaStore.decode_key_servers(ks_value), 333 336 txn <- write_complete_move_keys(txn, shard_key, end_key, from_servers, to_servers), 334 - {:ok, txn} <- Transaction.commit(txn) 337 + {:ok, _txn} <- Transaction.commit(txn) 335 338 do 336 - if move.initiated_from do 337 - SimServer.reply(move.initiated_from, {:ok, %{from: from_servers, to: to_servers, commit_version: txn.commit_version}}) 338 - end 339 - 340 339 :ok 341 340 else 342 341 {:error, _error} = error -> error ··· 363 362 server_keys_prefix() <> Integer.to_string(server_id) <> "/" <> shard_key 364 363 end 365 364 366 - defp to_ks_pair(%Shard{} = shard) do 365 + def to_ks_pair(%Shard{} = shard) do 367 366 to_server_ids = case shard.to_server_ids do 368 367 [] -> nil 369 368 list -> list
+9
lib/shard_info_map.ex
··· 36 36 } 37 37 end 38 38 39 + @spec put(t, binary, Shard.t) :: :ok 39 40 def put(%ShardInfoMap{shard_map: dsm}, start_key, %Shard{} = shard) do 40 41 DenseShardMap.put(dsm, start_key, shard) 42 + end 43 + 44 + @spec fetch(t, binary) :: {:ok, Shard.t} | :error 45 + def fetch(%ShardInfoMap{shard_map: dsm}, start_key) do 46 + case DenseShardMap.get(dsm, start_key) do 47 + %Shard{} = shard -> {:ok, shard} 48 + nil -> :error 49 + end 41 50 end 42 51 43 52 @spec list_shards(t) :: [Shard.t]
+1 -27
lib/workloads/shard_move.ex
··· 66 66 Enum.at(storage_servers, (random_team * 3) + 2).id, 67 67 ] 68 68 69 - time_before_move = SimServer.current_time() 70 - 71 69 [%Server{pid: distributor_pid}] = get_servers(cluster, Hobbes.Servers.Distributor) 72 70 case Distributor.move_shard(distributor_pid, shard_to_move, to_servers) do 73 - {:ok, %{from: from_servers, to: to_servers, commit_version: commit_version}} -> 74 - after_servers = get_key_servers(cluster, shard_to_move) 75 - if after_servers != to_servers do 76 - raise """ 77 - Shard has not moved! 78 - 79 - from_servers: #{inspect(from_servers)} 80 - to_servers: #{inspect(to_servers)} 81 - after_servers: #{inspect(after_servers)} 82 - """ 83 - end 84 - 85 - took_ms = (SimServer.current_time() - time_before_move) / 1000 86 - moves = [%{shard: shard_to_move, from: from_servers, to: to_servers, took_ms: took_ms, commit_version: commit_version} | state.moves] 87 - %State{state | moves: moves} 88 - 71 + :ok -> state 89 72 {:error, _error} -> state 90 73 end 91 - end 92 - 93 - defp get_key_servers(%Cluster{} = cluster, key) when is_binary(key) do 94 - [commit_buffer | _] = get_servers(cluster, Hobbes.Servers.CommitBuffer) 95 - #read_version = Transaction.new(cluster).read_version 96 - 97 - # TODO: this is no longer guaranteed to be correct, we must read from the database 98 - {:ok, [{_start_key, _end_key, {ids, _pids}}]} = Transaction.get_shards_backoff(commit_buffer.pid, key) 99 - ids 100 74 end 101 75 end 102 76