this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Group mutations by tag when slicing

garrison 5e97ec46 7ff1fd0a

+53 -58
+1
lib/servers/manager.ex
··· 591 591 %{prev_version: prev_version, commit_version: commit_version} = batch 592 592 593 593 # Unwrap mutations 594 + # TODO: no, don't 594 595 mutations = Enum.map(batch.mutations, fn {_i, mut} -> mut end) 595 596 596 597 meta_mutations = Enum.filter(mutations, &meta_mutation?/1)
+1 -2
lib/servers/storage.ex
··· 518 518 end 519 519 520 520 defp apply_batches(%State{} = state, batches) when is_list(batches) do 521 - Enum.reduce(batches, state, fn {commit_version, numbered_mutations}, %State{} = state -> 522 - mutations = Enum.map(numbered_mutations, fn {_i, mut} -> mut end) 521 + Enum.reduce(batches, state, fn {commit_version, mutations}, %State{} = state -> 523 522 # TODO: update state.data_version after each batch? 524 523 apply_batch(state, commit_version, mutations) 525 524 end)
+2 -15
lib/servers/tlog.ex
··· 450 450 end 451 451 452 452 defp append_batch(%State{} = state, %LogBatch{} = batch) do 453 - # TODO: mutations should come from CommitBuffer grouped this way 454 - groups = group_mutations(batch.tagged_mutations) 453 + groups = batch.tagged_mutations 455 454 state = do_append_groups(groups, batch.commit_version, state) 456 455 457 456 XKS.maybe_rotate_memtable(state.xks, @meta_partition) ··· 501 500 502 501 defp maybe_write_meta({meta_tag(), mutations}, commit_version, %State{} = state) do 503 502 %State{xks: xks} = state 504 - 505 - unwrapped = Enum.map(mutations, fn {_i, mut} -> mut end) 506 - XKS.apply_batch(xks, @meta_partition, commit_version, unwrapped) 503 + XKS.apply_batch(xks, @meta_partition, commit_version, mutations) 507 504 508 505 state 509 506 end 510 507 511 508 defp maybe_write_meta(_group, _commit_version, state), do: state 512 - 513 - defp group_mutations(tagged_mutations) do 514 - Enum.reduce(tagged_mutations, %{}, fn {tags, mut}, acc -> 515 - Enum.reduce(tags, acc, fn t, acc -> 516 - Map.update(acc, t, [mut], &[mut | &1]) 517 - end) 518 - end) 519 - |> Enum.sort_by(&elem(&1, 0)) 520 - |> Enum.map(fn {k, v} -> {k, Enum.reverse(v)} end) 521 - end 522 509 523 510 defp pop_tag(%State{} = state, tag) do 524 511 %State{
+49 -41
lib/shard_tag_map.ex
··· 91 91 92 92 @spec tag_and_slice_mutations(t, [Utils.mutation]) :: %{non_neg_integer => [Utils.tagged_mutation]} 93 93 def tag_and_slice_mutations(%ShardTagMap{} = stm, mutations) when is_list(mutations) do 94 - %{shard_map: shard_map, tlog_ids: all_tlog_ids} = stm 95 - tlog_mutations = Map.new(stm.tlog_ids, fn id -> {id, []} end) 94 + %{shard_map: shard_map, tlog_ids: tlog_ids, replication_factor: rf} = stm 95 + tlog_teams = Enum.chunk_every(tlog_ids, rf, rf, :discard) 96 + tlog_team_count = length(tlog_teams) 97 + 98 + tag_mutations = group_mutations_by_tag(shard_map, mutations) 99 + 100 + tlog_batches = Map.new(tlog_ids, fn id -> {id, []} end) 101 + Enum.reduce(tag_mutations, tlog_batches, fn {tag, _mutations} = tup, acc -> 102 + tlog_ids = 103 + case tag do 104 + # Send meta mutations to all TLogs 105 + meta_tag() -> tlog_ids 106 + tag -> Enum.at(tlog_teams, rem(tag, tlog_team_count)) 107 + end 108 + 109 + Enum.reduce(tlog_ids, acc, fn tid, acc -> 110 + Map.update!(acc, tid, &[tup | &1]) 111 + end) 112 + end) 113 + end 96 114 97 - Enum.reduce(mutations, {tlog_mutations, 0}, fn 98 - {:clear_range, sk, ek}, {acc, i} -> 99 - shards = DenseShardMap.shards_for_range(shard_map, sk, ek) 115 + defp group_mutations_by_tag(shard_map, mutations) do 116 + Enum.reduce(mutations, %{}, fn 117 + {:clear_range, start_key, end_key}, acc -> 118 + shards = DenseShardMap.shards_for_range(shard_map, start_key, end_key) 100 119 101 - # Split the range across shard boundaries and create a mutation for each 102 - intersect_ranges({sk, ek}, shards) 103 - |> Enum.reduce({acc, i}, fn {sk, ek, {tlogs, tags, _from}}, {acc, i} -> 104 - tagged_mut = {tags, {i, {:clear_range, sk, ek}}} 120 + intersected = intersect_ranges({start_key, end_key}, shards) 105 121 106 - acc = Enum.reduce(tlogs, acc, fn tlog_id, acc -> 107 - Map.update!(acc, tlog_id, &[tagged_mut | &1]) 122 + Enum.reduce(intersected, acc, fn sh, acc -> 123 + {sk, ek, {_tlogs, tags, _from_tags}} = sh 124 + mut = {:clear_range, sk, ek} 125 + Enum.reduce(tags, acc, fn t, acc -> 126 + Map.update(acc, t, [mut], &[mut | &1]) 108 127 end) 109 - {acc, i + 1} 110 128 end) 111 129 112 - # Single-key mutations (:write and :clear) 113 - mut, {acc, i} -> 114 - {tlogs, tags} = 115 - case mutation_key(mut) do 116 - special_storage_keys_prefix() <> rest -> 117 - # Special storage_keys mutations are sent only to that particular Storage team 118 - [team_id, _sk] = unpack_server_keys_key(rest) 119 - tags = [team_id] 120 - # Send to all TLogs for simplicity (shard moves should never be a bottleneck) 121 - {all_tlog_ids, tags} 130 + mut, acc -> 131 + tags = case mutation_key(mut) do 132 + special_storage_keys_prefix() <> rest -> 133 + [team_id, _sk] = unpack_server_keys_key(rest) 134 + [team_id] 122 135 123 - meta_prefix() <> _ = key -> 124 - {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 125 - # Additionally tag meta mutations with meta tag (-1) 126 - tags = [meta_tag() | tags] 127 - # Send meta mutations to all tlogs 128 - {all_tlog_ids, tags} 136 + meta_prefix() <> _ = key -> 137 + {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 138 + [meta_tag() | tags] 129 139 130 - key -> 131 - {_sk, _ek, {tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 132 - # Send to pre-computed tlogs/tags for this shard 133 - {tlogs, tags} 134 - end 135 - 136 - tagged_mut = {tags, {i, mut}} 137 - acc = Enum.reduce(tlogs, acc, fn tlog_id, acc -> 138 - Map.update!(acc, tlog_id, &[tagged_mut | &1]) 140 + key -> 141 + {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 142 + tags 143 + end 144 + Enum.reduce(tags, acc, fn t, acc -> 145 + Map.update(acc, t, [mut], &[mut | &1]) 139 146 end) 140 - {acc, i + 1} 141 147 end) 142 - |> then(fn {tlog_mutations, _i} -> 143 - Map.new(tlog_mutations, fn {tlog_id, mutations_reversed} -> {tlog_id, Enum.reverse(mutations_reversed)} end) 144 - end) 148 + |> Map.new(fn {tag, mutations} -> {tag, Enum.reverse(mutations)} end) 149 + # Groups are sorted desc because they will be reversed in the reduce above 150 + # The group tag order does not matter for correctness but it's easier to read debug logs 151 + # when things are the right way around 152 + |> Enum.sort_by(&elem(&1, 0), :desc) 145 153 end 146 154 147 155 @spec shards_for_key_or_range(t, binary | {binary, binary}) :: [{binary, binary, {list, Utils.tag_list, Utils.tag_list}}]