this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Recruit storage servers in slots

garrison 72875cd8 ab22b72b

+28 -27
+2 -2
ROADMAP.md
··· 5 5 ### Functionality 6 6 7 7 - [X] BeginBuffer server (batch get_read_version requests) 8 - - [ ] Use slots to spawn servers 8 + - [X] Use slots to spawn servers 9 9 - [ ] Storage server handling 10 - - [ ] Spawn servers using slots in Manager 10 + - [X] Spawn servers using slots in Manager 11 11 - [ ] Add unknown servers to meta keyspace in Distributor 12 12 - [ ] Build teams from servers in Distributor 13 13 - [ ] Logs
+26 -25
lib/servers/manager.ex
··· 281 281 282 282 supervisors_stateless = Enum.map(state.supervisors, fn {pid, %{stateless: slots}} -> {pid, slots} end) 283 283 supervisors_tlogs = Enum.map(state.supervisors, fn {pid, %{tlog: slots}} -> {pid, slots} end) 284 + supervisors_storage = Enum.map(state.supervisors, fn {pid, %{storage: slots}} -> {pid, slots} end) 284 285 285 286 open_stateless_slots = Enum.sum_by(supervisors_stateless, fn {_pid, slots} -> slots end) 286 287 open_tlog_slots = Enum.sum_by(supervisors_tlogs, fn {_pid, slots} -> slots end) 288 + open_storage_slots = Enum.sum_by(supervisors_storage, fn {_pid, slots} -> slots end) 287 289 288 290 assert open_stateless_slots >= 5 289 291 assert open_tlog_slots >= state.config.num_replicas 292 + # TODO: this is not guaranteed, we need a maybe_bootstrap() 293 + assert open_storage_slots >= state.config.num_replicas 290 294 291 295 # 3 -> Sequencer, Resolver, Distributor 292 296 {num_begin_buffers, num_commit_buffers} = compute_buffer_counts(open_stateless_slots - 3) ··· 294 298 %Config{} = config = state.config 295 299 296 300 ids = allocate_server_ids(state, 297 - storage: config.num_storage, 301 + storage: open_storage_slots, 298 302 tlog: min(open_tlog_slots, config.num_tlogs), 299 303 begin_buffer: min(num_begin_buffers, config.num_begin_buffers), 300 304 commit_buffer: min(num_commit_buffers, config.num_commit_buffers), ··· 306 310 # Create seed metadata needed by servers 307 311 meta_pairs = build_seed_meta(config, ids.storage) 308 312 309 - cluster = state.cluster 310 - gen = cluster.generation 311 - 312 313 # Recruit storage 313 - state = recruit(state, gen, Hobbes.Servers.Storage, ids.storage, %{cluster: cluster}) 314 + state = recruit_storage(state, ids.storage, supervisors_storage) 314 315 315 316 # Recruit TLogs 316 317 state = recruit_tlogs(state, ids.tlog, supervisors_tlogs, 0, meta_pairs) ··· 548 549 state 549 550 end 550 551 552 + defp recruit_storage(%State{} = state, storage_ids, supervisor_slots) 553 + when is_list(storage_ids) and is_list(supervisor_slots) do 554 + storage_ids 555 + |> Enum.map(fn id -> {Hobbes.Servers.Storage, id, %{cluster: state.cluster}} end) 556 + |> recruit_servers(supervisor_slots, state.cluster.generation) 557 + |> put_servers_in_cluster(state) 558 + end 559 + 551 560 defp recruit_tlogs(%State{} = state, tlog_ids, supervisor_slots, prev_version, meta_pairs) 552 561 when is_list(tlog_ids) and is_list(supervisor_slots) and is_integer(prev_version) and is_list(meta_pairs) do 553 - Enum.map(tlog_ids, fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: prev_version, meta_pairs: meta_pairs}} end) 562 + tlog_ids 563 + |> Enum.map(fn id -> {Hobbes.Servers.TLog, id, %{cluster: state.cluster, prev_version: prev_version, meta_pairs: meta_pairs}} end) 554 564 |> recruit_servers(supervisor_slots, state.cluster.generation) 555 - |> Enum.reduce(state, fn %Server{} = server, state -> 556 - put_in(state.cluster.servers[server.id], server) 557 - end) 565 + |> put_servers_in_cluster(state) 558 566 end 559 567 560 568 defp recruit_stateless(%State{} = state, ids, supervisor_slots, prev_version, meta_kv) ··· 576 584 ] 577 585 |> Enum.concat() 578 586 |> recruit_servers(supervisor_slots, cluster.generation) 579 - |> Enum.reduce(state, fn %Server{} = server, state -> 580 - put_in(state.cluster.servers[server.id], server) 587 + |> put_servers_in_cluster(state) 588 + end 589 + 590 + defp put_servers_in_cluster(server_list, %State{} = state) do 591 + Enum.reduce(server_list, state.cluster.servers, fn %Server{} = server, servers -> 592 + Map.put(servers, server.id, server) 593 + end) 594 + |> then(fn servers -> 595 + put_in(state.cluster.servers, servers) 581 596 end) 582 597 end 583 598 ··· 604 619 {_pid, 0} -> 605 620 recruit_servers(servers, supervisors, generation, i + 1, acc) 606 621 end 607 - end 608 - 609 - defp recruit(%State{} = state, generation, module, ids, args) when is_integer(generation) and is_atom(module) and is_list(ids) and is_map(args) do 610 - ids 611 - |> Enum.with_index() 612 - |> Enum.reduce(state, fn {id, index}, state -> 613 - {sup_pid, _slots} = Enum.at(state.supervisors, rem(index, length(state.supervisors))) 614 - 615 - init_arg = Map.put(args, :id, id) 616 - {:ok, pid} = ServerSupervisor.start_child_server(sup_pid, generation, module, init_arg) 617 - 618 - server = %Server{type: module, id: id, pid: pid} 619 - put_in(state.cluster.servers[id], server) 620 - end) 621 622 end 622 623 623 624 defp commit_seed_transaction(%State{} = state, %TLogGeneration{} = tlog_gen, meta_pairs) when is_list(meta_pairs) do