this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add range clears

garrison baa3a4a2 50e63357

+73 -34
+4 -4
ROADMAP.md
··· 6 6 7 7 - [X] BeginBuffer server (batch get_read_version requests) 8 8 - [X] Use slots to spawn servers 9 - - [ ] Range clears 9 + - [X] Range clears 10 10 - [X] Support write conflict ranges in VersionMap (decreased bench_rw from 28k to 24k TPS) 11 - - [ ] Add clear_range() to Transaction 12 - - [ ] Slice range clear mutations in CommitBuffer 13 - - [ ] Apply range clears on Storage servers 11 + - [X] Add clear_range() to Transaction 12 + - [X] Slice range clear mutations in ShardTagMap 13 + - [X] Test range clears in Model workload 14 14 - [ ] Storage server handling 15 15 - [X] Spawn servers using slots in Manager 16 16 - [ ] Add unknown servers to meta keyspace in Distributor
+41 -25
lib/shard_tag_map.ex
··· 59 59 %{shard_map: shard_map, tlog_ids: all_tlog_ids} = stm 60 60 tlog_mutations = Map.new(stm.tlog_ids, fn id -> {id, []} end) 61 61 62 - Enum.reduce(mutations, {tlog_mutations, 0}, fn mut, {acc, i} -> 63 - {tlogs, tags} = 64 - case mutation_key(mut) do 65 - special_server_keys_prefix() <> rest -> 66 - # Special server_keys mutations are sent only to that particular Storage server 67 - [id, _sk] = unpack_server_keys_key(rest) 68 - tags = [id] 69 - # Send to all TLogs for simplicity (shard moves should never be a bottleneck) 70 - {all_tlog_ids, tags} 62 + Enum.reduce(mutations, {tlog_mutations, 0}, fn 63 + {:clear_range, sk, ek}, {acc, i} -> 64 + shards = DenseShardMap.shards_for_range(shard_map, sk, ek) 71 65 72 - meta_prefix() <> _ = key -> 73 - {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 74 - # Additionally tag meta mutations with meta tag (-1) 75 - tags = [meta_tag() | tags] 76 - # Send meta mutations to all tlogs 77 - {all_tlog_ids, tags} 66 + # Split the range across shard boundaries and create a mutation for each 67 + intersect_ranges({sk, ek}, shards) 68 + |> Enum.reduce({acc, i}, fn {sk, ek, {tlogs, tags, _from}}, {acc, i} -> 69 + tagged_mut = {tags, {i, {:clear_range, sk, ek}}} 78 70 79 - key -> 80 - {_sk, _ek, {tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 81 - # Send to pre-computed tlogs/tags for this shard 82 - {tlogs, tags} 83 - end 71 + acc = Enum.reduce(tlogs, acc, fn tlog_id, acc -> 72 + Map.update!(acc, tlog_id, &[tagged_mut | &1]) 73 + end) 74 + {acc, i + 1} 75 + end) 84 76 85 - tagged_mut = {tags, {i, mut}} 86 - acc = Enum.reduce(tlogs, acc, fn tlog_id, acc -> 87 - Map.update!(acc, tlog_id, &[tagged_mut | &1]) 88 - end) 89 - {acc, i + 1} 77 + # Single-key mutations (:write and :clear) 78 + mut, {acc, i} -> 79 + {tlogs, tags} = 80 + case mutation_key(mut) do 81 + special_server_keys_prefix() <> rest -> 82 + # Special server_keys mutations are sent only to that particular Storage server 83 + [id, _sk] = unpack_server_keys_key(rest) 84 + tags = [id] 85 + # Send to all TLogs for simplicity (shard moves should never be a bottleneck) 86 + {all_tlog_ids, tags} 87 + 88 + meta_prefix() <> _ = key -> 89 + {_sk, _ek, {_tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 90 + # Additionally tag meta mutations with meta tag (-1) 91 + tags = [meta_tag() | tags] 92 + # Send meta mutations to all tlogs 93 + {all_tlog_ids, tags} 94 + 95 + key -> 96 + {_sk, _ek, {tlogs, tags, _from_tags}} = DenseShardMap.shard_for_key(shard_map, key) 97 + # Send to pre-computed tlogs/tags for this shard 98 + {tlogs, tags} 99 + end 100 + 101 + tagged_mut = {tags, {i, mut}} 102 + acc = Enum.reduce(tlogs, acc, fn tlog_id, acc -> 103 + Map.update!(acc, tlog_id, &[tagged_mut | &1]) 104 + end) 105 + {acc, i + 1} 90 106 end) 91 107 |> then(fn {tlog_mutations, _i} -> 92 108 Map.new(tlog_mutations, fn {tlog_id, mutations_reversed} -> {tlog_id, Enum.reverse(mutations_reversed)} end)
+8
lib/transaction.ex
··· 241 241 |> add_write_confict(key) 242 242 end 243 243 244 + @spec clear_range(TxnState.t, binary, binary) :: TxnState.t 245 + def clear_range(%TxnState{} = txn, start_key, end_key) 246 + when is_binary(start_key) and is_binary(end_key) and start_key < end_key do 247 + txn 248 + |> add_mutation({:clear_range, start_key, end_key}) 249 + |> add_write_confict({start_key, end_key}) 250 + end 251 + 244 252 @spec add_mutation(TxnState.t, Utils.mutation) :: TxnState.t 245 253 defp add_mutation(%TxnState{} = txn, mutation) do 246 254 %TxnState{txn | mutations: [mutation | txn.mutations]}
+6 -1
lib/utils.ex
··· 73 73 74 74 def meta_mutation?({:clear, meta_prefix() <> _}), do: true 75 75 def meta_mutation?({:clear, _key}), do: false 76 - # TODO: range clears 76 + 77 + def meta_mutation?({:clear_range, _sk, meta_prefix() <> _}), do: true 78 + def meta_mutation?({:clear_range, _sk, _ek}), do: false 77 79 78 80 @spec special_mutation?(mutation) :: boolean 79 81 def special_mutation?({:write, special_server_keys_prefix() <> _key, _value}), do: true ··· 81 83 82 84 def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true 83 85 def special_mutation?({:clear, _key}), do: false 86 + 87 + # Special range clears are not currently supported 88 + def special_mutation?({:clear_range, _sk, _ek}), do: false 84 89 85 90 @spec pack_key_servers([integer], [integer]) :: binary 86 91 def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do
+6 -1
lib/workloads/model.ex
··· 93 93 defp add_mutations(%TxnState{} = txn, mutations) do 94 94 Enum.reduce(mutations, txn, fn 95 95 {:write, key, value}, txn -> Transaction.write(txn, key, value) 96 + {:clear_range, sk, ek}, txn -> Transaction.clear_range(txn, sk, ek) 96 97 end) 97 98 end 98 99 ··· 129 130 count = Enum.random(1..10) 130 131 Enum.map(1..count, fn _i -> 131 132 case Enum.random(1..4) do 132 - # TODO: range clears 133 + 1 -> 134 + {sk, ek} = random_range(state) 135 + {:clear_range, sk, ek} 136 + 137 + # TODO: :clear 133 138 _ -> 134 139 {:write, random_key(state), random_key(state)} 135 140 end
+4 -2
lib/workloads/model/database_model.ex
··· 77 77 kv = 78 78 Enum.reduce(txn.mutations, dm.kv, fn 79 79 {:write, k, v}, kv -> TestKV.put(kv, commit_version, k, v) 80 - # TODO: :clear, :clear_range 80 + # TODO: clear 81 + {:clear_range, sk, ek}, kv -> TestKV.delete_range(kv, commit_version, sk, ek) 81 82 end) 82 83 83 84 write_conflicts = 84 85 Enum.map(txn.mutations, fn 85 86 {:write, k, _v} -> k 86 - # TODO: :clear, :clear_range 87 + # TODO: :clear 88 + {:clear_range, sk, ek} -> {sk, ek} 87 89 end) 88 90 vm = TestVersionMap.add_writes(dm.vm, commit_version, write_conflicts) 89 91
+4 -1
test/hobbes_test.exs
··· 21 21 22 22 @cluster_presets %{ 23 23 default: [], 24 + model: [ 25 + initial_shards: ["", "025", "050", "075"], 26 + ], 24 27 cycle: [ 25 28 initial_shards: ["", "key10"], 26 29 ], ··· 52 55 duration_ms: 10_000, 53 56 client_tick_ms: 100, 54 57 ]}, 55 - ], HobbesTest.SimOpts.sim_opts(name: test, preset: :cycle)) 58 + ], HobbesTest.SimOpts.sim_opts(name: test, preset: :model)) 56 59 end 57 60 end 58 61