this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Destroy old TLogs once they are no longer needed

garrison 9922ea77 36cbb32e

+168 -26
+16 -6
lib/servers/coordinator.ex
··· 188 188 request(server, {Commands, :write_generation, [gen]}) 189 189 end 190 190 191 + @spec delete_generations(server, non_neg_integer, [non_neg_integer]) :: :ok | {:error, :wrong_generation} | request_error 192 + def delete_generations(server, generation, delete_generations) 193 + when is_integer(generation) and is_list(delete_generations) do 194 + request(server, {Commands, :delete_generations, [generation, delete_generations]}) 195 + end 196 + 191 197 @spec allocate_ids(server, non_neg_integer, pos_integer) :: {:ok, {non_neg_integer, non_neg_integer}} | request_error 192 198 def allocate_ids(server, generation, count) when is_integer(generation) and is_integer(count) do 193 199 request(server, {Commands, :allocate_ids, [generation, count]}) ··· 665 671 scan = fn sk, ek -> scan(state, sk, ek) end 666 672 667 673 {writes, result} = apply(m, f, [get, scan, a]) 668 - Enum.each(writes, fn w -> 669 - assert {k, v} = w 670 - assert is_binary(k) 671 - assert is_binary(v) 672 - end) 674 + #Enum.each(writes, fn w -> 675 + # assert {k, v} = w 676 + # assert is_binary(k) 677 + # assert is_binary(v) 678 + #end) 673 679 674 680 {writes, result} 675 681 end 676 682 677 683 @spec apply_writes(State.t, list) :: State.t 678 684 defp apply_writes(%State{} = state, writes) when is_list(writes) do 679 - mutations = Enum.map(writes, fn {k, v} -> {:write, k, v} end) 685 + # TODO: get rid of this 686 + mutations = Enum.map(writes, fn 687 + {:clear, k} -> {:clear, k} 688 + {k, v} -> {:write, k, v} 689 + end) 680 690 apply_batch_to_storage(state, mutations) 681 691 end 682 692
+20
lib/servers/coordinator/commands.ex
··· 3 3 alias Hobbes.Structs.TLogGeneration 4 4 5 5 import Hobbes.Utils 6 + import ExUnit.Assertions, only: [assert: 1] 6 7 7 8 def write(_get, _scan, [pairs]) when is_list(pairs) do 8 9 {pairs, :ok} ··· 65 66 end 66 67 67 68 _other -> {[], {:error, :wrong_generation}} 69 + end 70 + end 71 + 72 + def delete_generations(get, _scan, [generation, delete_generations]) do 73 + gen_str = Integer.to_string(generation) 74 + 75 + case get.("generation") do 76 + ^gen_str -> 77 + mutations = Enum.map(delete_generations, fn gen -> 78 + assert is_integer(gen) 79 + {:clear, pack_tlog_generation_key(gen)} 80 + end) 81 + 82 + { 83 + mutations, 84 + :ok 85 + } 86 + 87 + _ -> {[], {:error, :wrong_generation}} 68 88 end 69 89 end 70 90
+52
lib/servers/manager.ex
··· 25 25 generation: non_neg_integer | nil, 26 26 supervisors: [pid], 27 27 recovered_tlogs: [TLogStatus.t], 28 + tlog_statuses: %{non_neg_integer => TLogStatus.t}, 28 29 config: ClusterConfig.t, 29 30 cluster: Cluster.t, 30 31 server_monitors: %{reference => non_neg_integer}, ··· 37 38 generation: nil, 38 39 supervisors: [], 39 40 recovered_tlogs: [], 41 + tlog_statuses: %{}, 40 42 config: nil, 41 43 cluster: nil, 42 44 server_monitors: %{}, ··· 214 216 # We keep adding old TLogs back to the cluster as they come back, even if recovery is already complete 215 217 # Otherwise the cluster could get stuck as Storage servers might never learn their pids 216 218 state = add_tlog_to_cluster(state, tlog_status) 219 + state = gc_tlog_generations(state) 220 + 217 221 state 218 222 end 219 223 ··· 250 254 end 251 255 252 256 defp add_tlog_to_cluster(%State{} = state, %TLogStatus{} = status) do 257 + state = put_in(state.tlog_statuses[status.id], status) 253 258 case Map.has_key?(state.cluster.servers, status.id) do 254 259 false -> 255 260 server = %Server{type: Hobbes.Servers.TLog, id: status.id, pid: status.pid} ··· 257 262 258 263 true -> state 259 264 end 265 + end 266 + 267 + defp gc_tlog_generations(%State{} = state) do 268 + assert state.cluster.status == :normal 269 + 270 + tlog_statuses = state.tlog_statuses 271 + tlog_generations = state.cluster.tlog_generations 272 + 273 + Enum.reduce(tlog_generations, [], fn %TLogGeneration{} = gen, acc -> 274 + case generation_complete?(gen, tlog_statuses) do 275 + true -> [gen.generation | acc] 276 + false -> acc 277 + end 278 + end) 279 + |> case do 280 + [_ | _] = delete_generations -> 281 + case Coordinator.delete_generations(state.primary_coordinator, state.cluster.generation, delete_generations) do 282 + :ok -> :noop 283 + {:error, _err} = error -> 284 + SimLogger.debug "Failed to delete generations due to #{inspect(error)}, shutting down" 285 + exit(:shutdown) 286 + end 287 + 288 + tlog_generations = Enum.reject(tlog_generations, fn gen -> gen.generation in delete_generations end) 289 + assert length(tlog_generations) > 0 290 + 291 + put_in(state.cluster.tlog_generations, tlog_generations) 292 + 293 + [] -> 294 + state 295 + end 296 + end 297 + 298 + defp generation_complete?(%TLogGeneration{end_version: nil} = _gen, _tlog_statuses), do: false 299 + 300 + defp generation_complete?(%TLogGeneration{} = gen, tlog_statuses) when is_map(tlog_statuses) do 301 + assert is_integer(gen.end_version) 302 + teams = Enum.chunk_every(gen.tlog_ids, gen.replication_factor, gen.replication_factor, :discard) 303 + 304 + Enum.all?(teams, fn ids -> 305 + Enum.any?(ids, fn id -> 306 + case Map.get(tlog_statuses, id) do 307 + %TLogStatus{min_popped_version: min_pv} -> min_pv >= gen.end_version 308 + nil -> false 309 + end 310 + end) 311 + end) 260 312 end 261 313 262 314 defp add_supervisor(%State{} = state, %SupervisorStatus{slots: nil} = _status) do
+72 -19
lib/servers/tlog.ex
··· 6 6 import ExUnit.Assertions, only: [assert: 1] 7 7 8 8 alias Hobbes.Servers.Manager 9 - alias Hobbes.Structs.{Cluster, TLogStatus, LogBatch, PeekResult} 9 + alias Hobbes.Structs.{Cluster, TLogGeneration, TLogStatus, LogBatch, PeekResult} 10 10 alias Hobbes.Encoding.Keyset 11 11 12 12 alias Hobbes.XKS ··· 61 61 62 62 @type tlog_state :: %{ 63 63 id: non_neg_integer, 64 + generation: non_neg_integer, 64 65 version: non_neg_integer, 65 66 known_committed_version: non_neg_integer, 66 67 locked?: boolean, ··· 158 159 assert is_integer(prev_version) 159 160 assert is_list(meta_pairs) 160 161 162 + generation = cluster.generation 163 + assert is_integer(generation) 164 + assert generation >= 0 165 + 161 166 xks = XKS.new([ 162 167 path: path, 163 168 block_size: c_block_size(), ··· 187 192 # Put and commit the initial state of the TLog 188 193 state_fields = %{ 189 194 id: id, 195 + generation: cluster.generation, 190 196 version: prev_version, 191 197 known_committed_version: prev_version, 192 198 locked?: false, ··· 267 273 state = %State{ 268 274 id: state_fields.id, 269 275 cluster: cluster, 270 - generation: cluster.generation, 276 + generation: state_fields.generation, 271 277 272 278 locked?: state_fields.locked?, 273 279 version: state_fields.version, ··· 290 296 # Sanity check persistent state 291 297 assert is_integer(state.id) 292 298 assert state.id >= 0 299 + assert is_integer(state.generation) 300 + assert state.generation >= 0 293 301 assert is_integer(state.version) 294 302 assert state.version >= 0 295 303 assert is_integer(state.known_committed_version) ··· 408 416 {:noreply, tick(state)} 409 417 end 410 418 411 - def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do 412 - case cluster.generation >= state.cluster.generation do 413 - true -> {:noreply, %{state | cluster: cluster}} 414 - false -> {:noreply, state} 419 + def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) 420 + when cluster.generation < state.cluster.generation do 421 + # Ignore older cluster 422 + {:noreply, state} 423 + end 424 + 425 + def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) when cluster.generation >= state.cluster.generation do 426 + assert cluster.generation >= state.cluster.generation 427 + 428 + %TLogGeneration{generation: max_gen} = Enum.max_by(cluster.tlog_generations, &(&1.generation)) 429 + newer_gen? = max_gen > state.generation 430 + 431 + id = state.id 432 + missing? = Enum.all?(cluster.tlog_generations, fn %TLogGeneration{} = gen -> 433 + id not in gen.tlog_ids 434 + end) 435 + 436 + case newer_gen? and missing? do 437 + true -> 438 + SimLogger.debug "TLog (id=#{state.id}, gen=#{state.generation}) is missing from tlog_generations (max_gen=#{max_gen}), deleting and shutting down" 439 + :ok = XKS.delete(state.xks) 440 + exit(:shutdown) 441 + 442 + false -> 443 + {:noreply, %{state | cluster: cluster}} 415 444 end 416 445 end 417 446 ··· 421 450 end 422 451 423 452 defp tick(%State{} = state) do 453 + %{ 454 + id: id, 455 + locked?: locked?, 456 + known_committed_version: known_committed_version, 457 + version: version, 458 + } = state 459 + min_popped_version = compute_min_popped_version(state) 460 + assert min_popped_version >= -1 461 + 424 462 status = %TLogStatus{ 425 - id: state.id, 463 + id: id, 426 464 pid: self(), 427 - locked?: state.locked?, 428 - known_committed_version: state.known_committed_version, 429 - durable_version: state.version, 465 + locked?: locked?, 466 + known_committed_version: known_committed_version, 467 + durable_version: version, 468 + min_popped_version: min_popped_version, 430 469 } 431 470 Manager.tlog_ping(state.cluster.manager_pid, status) 432 471 ··· 488 527 defp get_tlog_state(%XKS{} = xks) do 489 528 << 490 529 id::integer-64, 530 + generation::integer-64, 491 531 version::integer-64, 492 532 known_committed_version::integer-64, 493 533 locked_byte::integer-8, ··· 498 538 0x00 -> false 499 539 0x01 -> true 500 540 end 541 + 542 + assert id >= 0 501 543 assert version >= 0 544 + assert generation >= 0 502 545 assert known_committed_version >= 0 503 546 504 547 %{ 505 548 id: id, 549 + generation: generation, 506 550 version: version, 507 551 known_committed_version: known_committed_version, 508 552 locked?: locked?, ··· 513 557 defp put_tlog_state(%XKS{} = xks, fields) do 514 558 %{ 515 559 id: id, 560 + generation: generation, 516 561 version: version, 517 562 known_committed_version: known_committed_version, 518 563 locked?: locked?, 519 564 } = fields 520 565 assert id >= 0 566 + assert generation >= 0 521 567 assert version >= 0 522 568 assert known_committed_version >= 0 523 569 ··· 529 575 530 576 state_bytes = << 531 577 id::integer-64, 578 + generation::integer-64, 532 579 version::integer-64, 533 580 known_committed_version::integer-64, 534 581 locked_byte::integer-8, ··· 544 591 545 592 xks = 546 593 xks 547 - |> put_tlog_state(Map.take(state, [:id, :version, :known_committed_version, :locked?])) 594 + |> put_tlog_state(Map.take(state, [:id, :generation, :version, :known_committed_version, :locked?])) 548 595 |> XKS.commit() 549 596 550 597 %{state | xks: xks} ··· 673 720 xks: xks, 674 721 version: version, 675 722 current_extent: %{i: current_extent_i}, 676 - virtual_to_real_tags: virtual_to_real_tags, 677 - vtag_popped_versions: vtag_popped_versions, 678 723 } = state 679 724 680 - min_popped_version = 681 - virtual_to_real_tags 682 - # Map order violates determinism but order won't affect min() 683 - |> Map.keys() 684 - |> Enum.map(&Map.get(vtag_popped_versions, &1, -1)) 685 - |> Enum.min() 725 + min_popped_version = compute_min_popped_version(state) 686 726 assert min_popped_version >= -1 687 727 688 728 start_key = pack_extent_key(%{i: 0}) ··· 699 739 XKS.FreeList.release_extent(xks.free_list, extent_index) 700 740 {:clear, pack_extent_key(extent)} 701 741 end) 742 + end 743 + 744 + defp compute_min_popped_version(%State{} = state) do 745 + %{ 746 + virtual_to_real_tags: virtual_to_real_tags, 747 + vtag_popped_versions: vtag_popped_versions, 748 + } = state 749 + 750 + virtual_to_real_tags 751 + # Map order violates determinism but order won't affect min() 752 + |> Map.keys() 753 + |> Enum.map(&Map.get(vtag_popped_versions, &1, -1)) 754 + |> Enum.min() 702 755 end 703 756 704 757 # mutations_size + tag + commit_version + known_committed_version + prev_checksum + entry_checksum
+2
lib/structs.ex
··· 73 73 locked?: boolean, 74 74 known_committed_version: non_neg_integer, 75 75 durable_version: non_neg_integer, 76 + min_popped_version: non_neg_integer | -1, 76 77 } 77 78 @enforce_keys [ 78 79 :id, ··· 80 81 :locked?, 81 82 :known_committed_version, 82 83 :durable_version, 84 + :min_popped_version, 83 85 ] 84 86 defstruct @enforce_keys 85 87 end
+6 -1
lib/utils.ex
··· 173 173 @spec pack_tlog_generation_pair(TLogGeneration.t) :: {binary, binary} 174 174 def pack_tlog_generation_pair(%TLogGeneration{} = gen) do 175 175 { 176 - "gen/" <> Keyset.pack([gen.generation]), 176 + pack_tlog_generation_key(gen.generation), 177 177 Keyset.pack([gen.start_version, gen.replication_factor, gen.tlog_ids]), 178 178 } 179 + end 180 + 181 + @spec pack_tlog_generation_key(non_neg_integer) :: binary 182 + def pack_tlog_generation_key(generation) when is_integer(generation) do 183 + "gen/" <> Keyset.pack([generation]) 179 184 end 180 185 181 186 @spec unpack_tlog_generation_pair({binary, binary}) :: TLogGeneration.t