this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor recruitment to reduce duplication

garrison ab22b72b 33b60d8c

+48 -55
+48 -55
lib/servers/manager.ex
··· 305 305 306 306 # Create seed metadata needed by servers 307 307 meta_pairs = build_seed_meta(config, ids.storage) 308 - key_servers_pairs = Enum.filter(meta_pairs, fn 309 - {key_servers_prefix() <> _, _value} -> true 310 - _ -> false 311 - end) 312 - next_shard_move_id = 0 313 308 314 309 cluster = state.cluster 315 310 gen = cluster.generation ··· 318 313 state = recruit(state, gen, Hobbes.Servers.Storage, ids.storage, %{cluster: cluster}) 319 314 320 315 # Recruit TLogs 321 - state = 322 - Enum.map(ids.tlog, fn id -> {Hobbes.Servers.TLog, id, %{cluster: cluster, prev_version: 0, meta_pairs: meta_pairs}} end) 323 - |> recruit_servers(supervisors_tlogs, gen) 324 - |> Enum.reduce(state, fn %Server{} = server, state -> 325 - put_in(state.cluster.servers[server.id], server) 326 - end) 316 + state = recruit_tlogs(state, ids.tlog, supervisors_tlogs, 0, meta_pairs) 327 317 328 - state = 329 - [ 330 - [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: 1}}], 331 - [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: 1}}], 332 - [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: [], next_shard_move_id: next_shard_move_id}}], 333 - Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 334 - Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 335 - ] 336 - |> Enum.concat() 337 - |> recruit_servers(supervisors_stateless, gen) 338 - |> Enum.reduce(state, fn %Server{} = server, state -> 339 - put_in(state.cluster.servers[server.id], server) 340 - end) 318 + # Recruit stateless servers 319 + meta_kv = FlatKV.new() 320 + FlatKV.load(meta_kv, meta_pairs) 321 + prev_version = 1 322 + 323 + state = recruit_stateless(state, ids, supervisors_stateless, prev_version, meta_kv) 341 324 342 - seed_meta_store(state, first_tlog_generation, meta_pairs) 325 + # Seed the meta pairs into the database 326 + # They are already stored by TLogs but must be committed into the actual KV space as well 327 + :ok = commit_seed_transaction(state, first_tlog_generation, meta_pairs) 343 328 344 329 # Write the first generation into the Coordinators 345 330 # Once complete, any future recoveries will have to start from these TLogs ··· 406 391 state = update_in(state.cluster.tlog_generations, &[new_tlog_generation | &1]) 407 392 408 393 # Recruit new TLogs 409 - cluster = state.cluster 410 - gen = state.cluster.generation 411 - 412 - state = 413 - Enum.map(ids.tlog, fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: last_generation_end_version, meta_pairs: meta_pairs}} end) 414 - |> recruit_servers(supervisors_tlogs, gen) 415 - |> Enum.reduce(state, fn %Server{} = server, state -> 416 - put_in(state.cluster.servers[server.id], server) 417 - end) 394 + state = recruit_tlogs(state, ids.tlog, supervisors_tlogs, last_generation_end_version, meta_pairs) 418 395 419 396 # Copy mutations in range [max_kcv + 1, min_dv] to the new TLogs 420 397 shard_map = ShardTagMap.new(new_tlog_generation) ··· 443 420 # 444 421 # TODO: get an actual Sequencer version to respect real time (not important for now) 445 422 recovery_commit_version = recovery_commit_prev_version + 1 446 - state = write_recovery_commit(state, new_tlog_generation, recovery_commit_prev_version, recovery_commit_version) 447 - 448 - # Load metadata needed by servers from the meta_kv 449 - key_servers_pairs = FlatKV.scan(meta_kv, key_servers_prefix(), key_servers_end()).pairs 450 - shard_moves_pairs = FlatKV.scan(meta_kv, shard_moves_prefix(), shard_moves_end()).pairs 451 - [next_shard_move_id] = FlatKV.get(meta_kv, next_shard_move_id_key()) |> Keyset.unpack() 423 + state = commit_recovery_transaction(state, new_tlog_generation, recovery_commit_prev_version, recovery_commit_version) 452 424 453 425 # Recruit stateless servers 454 - state = 455 - [ 456 - [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: recovery_commit_version}}], 457 - [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: recovery_commit_version}}], 458 - [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 459 - Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 460 - Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 461 - ] 462 - |> Enum.concat() 463 - |> recruit_servers(supervisors_stateless, gen) 464 - |> Enum.reduce(state, fn %Server{} = server, state -> 465 - put_in(state.cluster.servers[server.id], server) 466 - end) 426 + state = recruit_stateless(state, ids, supervisors_stateless, recovery_commit_version, meta_kv) 467 427 468 428 # Write the new TLog generation to the Coordinators 469 429 # Once complete, the effect of this recovery is permanent, and any future recoveries will have to start from these TLogs ··· 564 524 end) 565 525 end 566 526 567 - defp write_recovery_commit(%State{} = state, %TLogGeneration{} = tlog_gen, prev_commit_version, commit_version) 527 + defp commit_recovery_transaction(%State{} = state, %TLogGeneration{} = tlog_gen, prev_commit_version, commit_version) 568 528 when is_integer(prev_commit_version) and is_integer(commit_version) do 569 529 log_batch = %LogBatch{ 570 530 commit_buffer_id: nil, ··· 588 548 state 589 549 end 590 550 551 + defp recruit_tlogs(%State{} = state, tlog_ids, supervisor_slots, prev_version, meta_pairs) 552 + when is_list(tlog_ids) and is_list(supervisor_slots) and is_integer(prev_version) and is_list(meta_pairs) do 553 + Enum.map(tlog_ids, fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: prev_version, meta_pairs: meta_pairs}} end) 554 + |> recruit_servers(supervisor_slots, state.cluster.generation) 555 + |> Enum.reduce(state, fn %Server{} = server, state -> 556 + put_in(state.cluster.servers[server.id], server) 557 + end) 558 + end 559 + 560 + defp recruit_stateless(%State{} = state, ids, supervisor_slots, prev_version, meta_kv) 561 + when is_map(ids) and is_list(supervisor_slots) and is_integer(prev_version) do 562 + %Cluster{} = cluster = state.cluster 563 + 564 + # Load meta pairs from meta_kv 565 + key_servers_pairs = FlatKV.scan(meta_kv, key_servers_prefix(), key_servers_end()).pairs 566 + shard_moves_pairs = FlatKV.scan(meta_kv, shard_moves_prefix(), shard_moves_end()).pairs 567 + [next_shard_move_id] = FlatKV.get(meta_kv, next_shard_move_id_key()) |> Keyset.unpack() 568 + 569 + # Recruit stateless servers 570 + [ 571 + [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: prev_version}}], 572 + [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: prev_version}}], 573 + [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: shard_moves_pairs, next_shard_move_id: next_shard_move_id}}], 574 + Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 575 + Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 576 + ] 577 + |> Enum.concat() 578 + |> recruit_servers(supervisor_slots, cluster.generation) 579 + |> Enum.reduce(state, fn %Server{} = server, state -> 580 + put_in(state.cluster.servers[server.id], server) 581 + end) 582 + end 583 + 591 584 @spec recruit_servers([tuple], [{pid, non_neg_integer}], non_neg_integer, non_neg_integer, list) :: list 592 585 defp recruit_servers(servers, supervisors, generation, i \\ 0, acc \\ []) 593 586 ··· 627 620 end) 628 621 end 629 622 630 - defp seed_meta_store(%State{} = state, %TLogGeneration{} = tlog_gen, meta_pairs) when is_list(meta_pairs) do 623 + defp commit_seed_transaction(%State{} = state, %TLogGeneration{} = tlog_gen, meta_pairs) when is_list(meta_pairs) do 631 624 shard_map = ShardTagMap.new(tlog_gen) 632 625 ShardTagMap.load_meta_pairs(shard_map, meta_pairs) 633 626