this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Use slots during bootstrap

garrison 33b60d8c c9c010c7

+84 -38
-3
lib/hobbes.ex
··· 38 38 stateless: 6, 39 39 tlog: [ 40 40 "/tlog_1", 41 - "/tlog_2", 42 - #"/tlog_3", 43 - #"/tlog_4", 44 41 ], 45 42 storage: [ 46 43 "/storage_1",
+84 -35
lib/servers/manager.ex
··· 206 206 end 207 207 end 208 208 209 + defp add_supervisor(%State{} = state, _supervisor_pid, nil = _slots) do 210 + # Don't add supervisor until it has loaded its slots (once it receives a cluster) 211 + state 212 + end 213 + 209 214 defp add_supervisor(%State{} = state, supervisor_pid, slots) do 210 215 case List.keymember?(state.supervisors, supervisor_pid, 0) do 211 216 false -> %{state | supervisors: [{supervisor_pid, slots} | state.supervisors]} ··· 217 222 # Do nothing if we have already recovered 218 223 defp maybe_start_generation(%State{} = state) when state.cluster.status == :normal, do: state 219 224 220 - # Database initialization 221 - defp maybe_start_generation(%State{} = state) when state.cluster.tlog_generations == [] do 222 - assert state.cluster.status == :recovering 223 - 224 - # TODO: length(supervisors) >= config.num_tlogs (or similar) 225 - case length(state.supervisors) >= 3 do 226 - true -> bootstrap(state) 227 - false -> state 228 - end 229 - end 230 - 231 225 @min_recovery_duration_us 2_000_000 232 226 233 227 defp maybe_start_generation(%State{} = state) do 234 228 assert state.cluster.status == :recovering 235 - assert length(state.cluster.tlog_generations) > 0 236 229 237 230 # 1 of (Sequencer, BeginBuffer, CommitBuffer, Resolver, Distributor) 238 231 min_stateless_slots = 5 ··· 252 245 (collected_stateless_slots >= min_stateless_slots) and 253 246 (collected_tlog_slots >= min_tlog_slots) 254 247 248 + case has_enough_slots? do 249 + true -> 250 + case state.cluster.tlog_generations do 251 + [] -> bootstrap(state) 252 + [_ | _] -> maybe_recover(state) 253 + end 254 + 255 + false -> state 256 + end 257 + end 258 + 259 + defp maybe_recover(%State{} = state) do 260 + assert state.cluster.status == :recovering 261 + assert length(state.cluster.tlog_generations) > 0 262 + 255 263 prev_generation = hd(state.cluster.tlog_generations) 256 264 # TODO: 2 -> (prev_generation.replication_factor - 1) 257 265 min_prev_tlogs = length(prev_generation.tlog_ids) - 2 ··· 262 270 elapsed_us = SimServer.current_time() - state.recovery_started_timestamp 263 271 enough_elapsed? = elapsed_us > @min_recovery_duration_us 264 272 265 - case has_enough_slots? and has_enough_prev_tlogs? and enough_elapsed? do 273 + case has_enough_prev_tlogs? and enough_elapsed? do 266 274 true -> recover(state) 267 275 false -> state 268 276 end ··· 271 279 defp bootstrap(%State{} = state) do 272 280 assert state.cluster.tlog_generations == [] 273 281 282 + supervisors_stateless = Enum.map(state.supervisors, fn {pid, %{stateless: slots}} -> {pid, slots} end) 283 + supervisors_tlogs = Enum.map(state.supervisors, fn {pid, %{tlog: slots}} -> {pid, slots} end) 284 + 285 + open_stateless_slots = Enum.sum_by(supervisors_stateless, fn {_pid, slots} -> slots end) 286 + open_tlog_slots = Enum.sum_by(supervisors_tlogs, fn {_pid, slots} -> slots end) 287 + 288 + assert open_stateless_slots >= 5 289 + assert open_tlog_slots >= state.config.num_replicas 290 + 291 + # 3 -> Sequencer, Resolver, Distributor 292 + {num_begin_buffers, num_commit_buffers} = compute_buffer_counts(open_stateless_slots - 3) 293 + 274 294 %Config{} = config = state.config 275 295 276 296 ids = allocate_server_ids(state, 277 297 storage: config.num_storage, 278 - tlog: config.num_tlogs, 279 - begin_buffer: config.num_begin_buffers, 280 - commit_buffer: config.num_commit_buffers, 298 + tlog: min(open_tlog_slots, config.num_tlogs), 299 + begin_buffer: min(num_begin_buffers, config.num_begin_buffers), 300 + commit_buffer: min(num_commit_buffers, config.num_commit_buffers), 281 301 sequencer: 1, resolver: 1, distributor: 1) 282 302 283 - storage_ids = ids.storage 284 - tlog_ids = ids.tlog 285 - begin_buffer_ids = ids.begin_buffer 286 - commit_buffer_ids = ids.commit_buffer 287 - [sequencer_id] = ids.sequencer 288 - [resolver_id] = ids.resolver 289 - [distributor_id] = ids.distributor 290 - 291 - first_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: 0, tlog_ids: tlog_ids} 303 + first_tlog_generation = %TLogGeneration{generation: state.cluster.generation, start_version: 0, tlog_ids: ids.tlog} 292 304 state = put_in(state.cluster.tlog_generations, [first_tlog_generation]) 293 305 294 306 # Create seed metadata needed by servers 295 - meta_pairs = build_seed_meta(config, storage_ids) 307 + meta_pairs = build_seed_meta(config, ids.storage) 296 308 key_servers_pairs = Enum.filter(meta_pairs, fn 297 309 {key_servers_prefix() <> _, _value} -> true 298 310 _ -> false ··· 301 313 302 314 cluster = state.cluster 303 315 gen = cluster.generation 316 + 317 + # Recruit storage 318 + state = recruit(state, gen, Hobbes.Servers.Storage, ids.storage, %{cluster: cluster}) 319 + 320 + # Recruit TLogs 304 321 state = 305 - state 306 - |> recruit(gen, Hobbes.Servers.Storage, storage_ids, %{cluster: cluster}) 307 - |> recruit(gen, Hobbes.Servers.TLog, tlog_ids, %{cluster: cluster, prev_version: 0, meta_pairs: meta_pairs}) 308 - |> recruit(gen, Hobbes.Servers.BeginBuffer, begin_buffer_ids, %{cluster: cluster}) 309 - |> recruit(gen, Hobbes.Servers.CommitBuffer, commit_buffer_ids, %{cluster: cluster, key_servers_pairs: key_servers_pairs}) 310 - |> recruit(gen, Hobbes.Servers.Sequencer, [sequencer_id], %{cluster: cluster, prev_version: 1}) 311 - |> recruit(gen, Hobbes.Servers.Resolver, [resolver_id], %{cluster: cluster, prev_version: 1}) 312 - |> recruit(gen, Hobbes.Servers.Distributor, [distributor_id], %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: [], next_shard_move_id: next_shard_move_id}) 322 + Enum.map(ids.tlog, fn id -> {Hobbes.Servers.TLog, id, %{cluster: cluster, prev_version: 0, meta_pairs: meta_pairs}} end) 323 + |> recruit_servers(supervisors_tlogs, gen) 324 + |> Enum.reduce(state, fn %Server{} = server, state -> 325 + put_in(state.cluster.servers[server.id], server) 326 + end) 327 + 328 + state = 329 + [ 330 + [{Hobbes.Servers.Sequencer, hd(ids.sequencer), %{cluster: cluster, prev_version: 1}}], 331 + [{Hobbes.Servers.Resolver, hd(ids.resolver), %{cluster: cluster, prev_version: 1}}], 332 + [{Hobbes.Servers.Distributor, hd(ids.distributor), %{cluster: cluster, key_servers_pairs: key_servers_pairs, shard_moves_pairs: [], next_shard_move_id: next_shard_move_id}}], 333 + Enum.map(ids.begin_buffer, fn id -> {Hobbes.Servers.BeginBuffer, id, %{cluster: cluster}} end), 334 + Enum.map(ids.commit_buffer, fn id -> {Hobbes.Servers.CommitBuffer, id, %{cluster: cluster, key_servers_pairs: key_servers_pairs}} end), 335 + ] 336 + |> Enum.concat() 337 + |> recruit_servers(supervisors_stateless, gen) 338 + |> Enum.reduce(state, fn %Server{} = server, state -> 339 + put_in(state.cluster.servers[server.id], server) 340 + end) 313 341 314 342 seed_meta_store(state, first_tlog_generation, meta_pairs) 315 343 ··· 358 386 assert open_stateless_slots >= 5 359 387 assert open_tlog_slots >= state.config.num_replicas 360 388 361 - ids = allocate_server_ids(state, tlog: open_tlog_slots, begin_buffer: 1, commit_buffer: 1, sequencer: 1, resolver: 1, distributor: 1) 389 + # 3 -> Sequencer, Resolver, Distributor 390 + {num_begin_buffers, num_commit_buffers} = compute_buffer_counts(open_stateless_slots - 3) 391 + 392 + %Config{} = config = state.config 393 + 394 + ids = allocate_server_ids(state, 395 + tlog: min(open_tlog_slots, config.num_tlogs), 396 + begin_buffer: min(num_begin_buffers, config.num_begin_buffers), 397 + commit_buffer: min(num_commit_buffers, config.num_commit_buffers), 398 + sequencer: 1, resolver: 1, distributor: 1) 362 399 363 400 # Create new TLog generation 364 401 last_generation_end_version = max_kcv ··· 626 663 end) 627 664 628 665 :ok 666 + end 667 + 668 + @spec compute_buffer_counts(pos_integer) :: {pos_integer, pos_integer} 669 + defp compute_buffer_counts(open_buffer_slots) do 670 + case open_buffer_slots do 671 + 2 -> 672 + {1, 1} 673 + n when n > 2 -> 674 + begin_count = div(n, 3) 675 + commit_count = n - begin_count 676 + {begin_count, commit_count} 677 + end 629 678 end 630 679 631 680 @spec allocate_server_ids(State.t, [{atom, pos_integer}]) :: %{atom => pos_integer}