this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor complete_shard_move to use ShardInfoMap

garrison 8d7da004 64f7c39a

+23 -29
+23 -29
lib/servers/distributor.ex
··· 7 7 alias Hobbes.ShardInfoMap.Shard 8 8 alias Hobbes.Structs.{Cluster, Server, ShardStats} 9 9 alias Hobbes.Servers.Storage 10 - alias Hobbes.Transaction.TxnState 11 10 12 11 import Hobbes.Utils 13 12 ··· 80 79 {:ok, state} 81 80 end 82 81 83 - def handle_call({:move_shard, shard_key, to_servers}, from, state) do 82 + def handle_call({:move_shard, shard_key, to_servers}, _from, state) do 84 83 {result, %State{} = state} = begin_move_shard(state, shard_key, to_servers) 85 84 {:reply, result, state} 86 85 end ··· 238 237 |> Enum.map(fn %ShardMove{} = move -> 239 238 case shard_move_complete?(move, state) do 240 239 true -> 241 - case complete_shard_move(move, state) do 242 - :ok -> nil 243 - # TODO: handle error 244 - end 245 - 240 + :ok = complete_shard_move(state, move) 241 + nil 246 242 false -> move 247 243 end 248 244 end) ··· 329 325 end 330 326 end 331 327 332 - defp complete_shard_move(%ShardMove{start_key: shard_key, end_key: end_key}, %State{} = state) do 333 - with {:ok, txn} = Transaction.new(state.cluster), 334 - {:ok, {ks_value, txn}} <- Transaction.read(txn, key_servers_prefix() <> shard_key), 335 - {from_servers, to_servers} <- MetaStore.decode_key_servers(ks_value), 336 - txn <- write_complete_move_keys(txn, shard_key, end_key, from_servers, to_servers), 337 - {:ok, _txn} <- Transaction.commit(txn) 338 - do 339 - :ok 340 - else 341 - {:error, _error} = error -> error 342 - end 343 - end 328 + defp complete_shard_move(%State{} = state, %ShardMove{} = move) do 329 + {:ok, %Shard{} = shard} = ShardInfoMap.fetch(state.shard_map, move.start_key) 330 + assert shard.end_key == move.end_key 344 331 345 - defp write_complete_move_keys(%TxnState{} = txn, start_key, end_key, from_servers, to_servers) 346 - when is_binary(start_key) and is_binary(end_key) and is_list(from_servers) and is_list(to_servers) do 347 - assert length(from_servers) > 0 348 - assert length(to_servers) > 0 332 + old_server_ids = shard.from_server_ids 333 + new_server_ids = shard.to_server_ids 334 + 335 + shard = %{shard | from_server_ids: new_server_ids, to_server_ids: []} 336 + ShardInfoMap.put(state.shard_map, shard.start_key, shard) 349 337 350 - txn = Enum.reduce(from_servers, txn, fn id, txn -> 351 - Transaction.clear(txn, server_keys_key(id, start_key)) 338 + sk_pairs = Enum.map(new_server_ids, fn id -> 339 + {server_keys_key(id, shard.start_key), "complete/" <> shard.end_key} 352 340 end) 341 + pairs = [to_ks_pair(shard) | sk_pairs] 353 342 354 - txn = Enum.reduce(to_servers, txn, fn id, txn -> 355 - Transaction.write(txn, server_keys_key(id, start_key), "complete/" <> end_key) 343 + {:ok, txn} = Transaction.new(state.cluster, write_only: true) 344 + txn = Enum.reduce(old_server_ids, txn, fn id, txn -> 345 + Transaction.clear(txn, server_keys_key(id, shard.start_key)) 356 346 end) 347 + txn = Transaction.write(txn, pairs) 357 348 358 - Transaction.write(txn, key_servers_prefix() <> start_key, MetaStore.encode_key_servers(to_servers)) 349 + case Transaction.commit(txn) do 350 + {:ok, _txn} -> :ok 351 + {:error, _err} -> exit(:shutdown) 352 + end 359 353 end 360 354 361 355 defp server_keys_key(server_id, shard_key) when is_integer(server_id) and is_binary(shard_key) do 362 356 server_keys_prefix() <> Integer.to_string(server_id) <> "/" <> shard_key 363 357 end 364 358 365 - def to_ks_pair(%Shard{} = shard) do 359 + defp to_ks_pair(%Shard{} = shard) do 366 360 to_server_ids = case shard.to_server_ids do 367 361 [] -> nil 368 362 list -> list