this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor id allocation

garrison c9c010c7 ca6ad70f

+42 -24
+42 -24
lib/servers/manager.ex
··· 273 273 274 274 %Config{} = config = state.config 275 275 276 - ids = allocate_server_ids(state, config.num_storage + config.num_tlogs + config.num_begin_buffers + config.num_commit_buffers + 3) 277 - {storage_ids, ids} = Enum.split(ids, config.num_storage) 278 - {tlog_ids, ids} = Enum.split(ids, config.num_tlogs) 279 - {begin_buffer_ids, ids} = Enum.split(ids, config.num_begin_buffers) 280 - {commit_buffer_ids, ids} = Enum.split(ids, config.num_commit_buffers) 281 - [sequencer_id, resolver_id, distributor_id] = ids 276 + ids = allocate_server_ids(state, 277 + storage: config.num_storage, 278 + tlog: config.num_tlogs, 279 + begin_buffer: config.num_begin_buffers, 280 + commit_buffer: config.num_commit_buffers, 281 + sequencer: 1, resolver: 1, distributor: 1) 282 + 283 + storage_ids = ids.storage 284 + tlog_ids = ids.tlog 285 + begin_buffer_ids = ids.begin_buffer 286 + commit_buffer_ids = ids.commit_buffer 287 + [sequencer_id] = ids.sequencer 288 + [resolver_id] = ids.resolver 289 + [distributor_id] = ids.distributor 282 290 283 291 first_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: 0, tlog_ids: tlog_ids} 284 292 state = put_in(state.cluster.tlog_generations, [first_tlog_generation]) ··· 339 347 dbg {"Recovery", state.cluster.generation, max_kcv, min_dv} 340 348 #dbg {meta_pairs, state.cluster.tlog_generations} 341 349 350 + # Count open slots and allocate ids 342 351 supervisors_stateless = Enum.map(state.supervisors, fn {pid, %{stateless: stateless_slots}} -> {pid, stateless_slots} end) 343 352 supervisors_tlogs = Enum.map(state.supervisors, fn {pid, %{tlog: tlog_slots}} -> {pid, tlog_slots} end) 344 353 345 354 open_stateless_slots = Enum.sum_by(supervisors_stateless, fn {_pid, stateless_slots} -> stateless_slots end) 346 355 open_tlog_slots = Enum.sum_by(supervisors_tlogs, fn {_pid, tlog_slots} -> tlog_slots end) 347 356 348 - # Sequencer, BeginBuffer, CommitBuffer, Resolver, Distributor 357 + # 5 stateless (Sequencer, BeginBuffer, CommitBuffer, Resolver, Distributor) 349 358 assert open_stateless_slots >= 5 350 359 assert open_tlog_slots >= state.config.num_replicas 351 360 352 - ids = allocate_server_ids(state, open_tlog_slots + 5) 353 - {tlog_ids, ids} = Enum.split(ids, open_tlog_slots) 354 - {begin_buffer_ids, ids} = Enum.split(ids, 1) 355 - {commit_buffer_ids, ids} = Enum.split(ids, 1) 356 - [sequencer_id, resolver_id, distributor_id] = ids 361 + ids = allocate_server_ids(state, tlog: open_tlog_slots, begin_buffer: 1, commit_buffer: 1, sequencer: 1, resolver: 1, distributor: 1) 357 362 358 363 # Create new TLog generation 359 364 last_generation_end_version = max_kcv 360 365 prev_tlog_generation = hd(state.cluster.tlog_generations) 361 - new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: last_generation_end_version + 1, tlog_ids: tlog_ids} 366 + new_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: last_generation_end_version + 1, tlog_ids: ids.tlog} 362 367 assert new_tlog_generation.start_version > prev_tlog_generation.start_version 363 368 364 369 state = update_in(state.cluster.tlog_generations, &[new_tlog_generation | &1]) ··· 368 373 gen = state.cluster.generation 369 374 370 375 state = 371 - Enum.map(tlog_ids, fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: last_generation_end_version, meta_pairs: meta_pairs}} end) 376 + Enum.map(ids.tlog, fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: last_generation_end_version, meta_pairs: meta_pairs}} end) 372 377 |> recruit_servers(supervisors_tlogs, gen) 373 378 |> Enum.reduce(state, fn %Server{} = server, state -> 374 379 put_in(state.cluster.servers[server.id], server) ··· 411 416 # Recruit stateless servers 412 417 state = 413 418 [ 414 - [{Hobbes.Servers.Sequencer, sequencer_id, %{cluster: cluster, prev_version: recovery_commit_version}}], 415 - [{Hobbes.Servers.Resolver, resolver_id, %{cluster: cluster, prev_version: recovery_commit_version}}], 416 - [{Hobbes.Servers.Distributor, distributor_id, %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 417 - Enum.map(begin_buffer_ids, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 418 - Enum.map(commit_buffer_ids, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 419 + [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: recovery_commit_version}}], 420 + [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: recovery_commit_version}}], 421 + [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 422 + Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 423 + Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 419 424 ] 420 425 |> Enum.concat() 421 426 |> recruit_servers(supervisors_stateless, gen) ··· 623 628 :ok 624 629 end 625 630 626 - @spec allocate_server_ids(State.t, non_neg_integer) :: [non_neg_integer] 627 - defp allocate_server_ids(%State{} = state, count) do 628 - assert state.cluster != nil 631 + @spec allocate_server_ids(State.t, [{atom, pos_integer}]) :: %{atom => pos_integer} 632 + defp allocate_server_ids(%State{} = state, server_ids) when is_list(server_ids) do 633 + total = Enum.sum_by(server_ids, fn {_key, count} -> count end) 629 634 630 - case Coordinator.allocate_ids(state.primary_coordinator, state.cluster.generation, count) do 635 + case Coordinator.allocate_ids(state.primary_coordinator, state.cluster.generation, total) do 631 636 {:ok, {start_id, end_id}} -> 632 - Enum.to_list(start_id..(end_id - 1)) 637 + ids = Enum.to_list(start_id..(end_id - 1)) 638 + 639 + server_ids 640 + |> Enum.reduce({ids, []}, fn {key, count}, {ids, acc} -> 641 + {k_ids, ids} = Enum.split(ids, count) 642 + { 643 + ids, 644 + [{key, k_ids} | acc], 645 + } 646 + end) 647 + |> then(fn {[], acc} -> acc end) 648 + |> Map.new() 649 + 650 + {:error, _err} -> exit(:shutdown) 633 651 end 634 652 end 635 653