// Mirror of https://github.com/roostorg/osprey
//! This crate provides an implementation of consistent hashing that matches our Go
//! implementation, so that members written in either language can work together compatibly.
3
4use serde::{Deserialize, Serialize};
5use std::collections::BTreeMap;
6use std::sync::Arc;
7use thiserror::Error;
8
9#[derive(Debug, Error)]
10#[error("Node already in ring.")]
11pub struct NodeAlreadyExists;
12
13#[derive(Debug, Error)]
14#[error("Node with the given node name does not exist within the ring")]
15pub struct NodeDoesNotExist;
16
17#[derive(Debug, Error)]
18#[error("The ring contains no nodes.")]
19pub struct RingEmpty;
20
21#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, PartialOrd, Ord)]
22pub struct Node {
23 /// The name of the node.
24 name: Arc<str>,
25
26 /// The number of replicas for this node.
27 num_replicas: u32,
28}
29
30impl Node {
31 pub fn new(name: impl Into<Arc<str>>, num_replicas: u32) -> Node {
32 Node {
33 name: name.into(),
34 num_replicas,
35 }
36 }
37
38 pub fn name(&self) -> &str {
39 &self.name
40 }
41
42 pub fn name_arc(&self) -> Arc<str> {
43 self.name.clone()
44 }
45
46 pub fn num_replicas(&self) -> u32 {
47 self.num_replicas
48 }
49}
50
/// A single point on the ring: one replica of one node.
#[derive(Clone, Debug)]
struct Item {
    /// Position on the ring, computed by hashing the node name followed by the replica number.
    ring_index: u64,
    /// Name of the node that owns this point.
    node_name: Arc<str>,
}
58
59/// Provides consistent hashing for keys to nodes.
60#[derive(Debug, Default)]
61pub struct HashRing {
62 nodes: RingNodes,
63 items: RingItems,
64}
65
66#[derive(Default, Debug, Clone)]
67pub struct RingNodes(BTreeMap<Arc<str>, Node>);
68
69impl RingNodes {
70 pub fn with_nodes(nodes: impl IntoIterator<Item = Node>) -> Self {
71 Self(nodes.into_iter().map(|n| (n.name.clone(), n)).collect())
72 }
73
74 /// Collects the nodes into a vec ordered by node name.
75 pub fn collect_nodes(&self) -> Vec<Node> {
76 self.0.values().cloned().collect()
77 }
78
79 /// Adds a node to the collection, failing if a node with that name already exists.
80 pub fn add(&mut self, node: Node) -> Result<(), NodeAlreadyExists> {
81 if self.0.contains_key(&node.name) {
82 return Err(NodeAlreadyExists);
83 }
84
85 self.0.insert(node.name.clone(), node);
86 Ok(())
87 }
88
89 /// Removes a node from the collection by its name, failing if a node with that
90 /// name does not exist.
91 pub fn remove(&mut self, node_name: &str) -> Result<(), NodeDoesNotExist> {
92 self.0.remove(node_name).ok_or(NodeDoesNotExist)?;
93 Ok(())
94 }
95
96 fn build_ring_items(&self) -> RingItems {
97 let mut items = Vec::with_capacity(self.num_replicas());
98 for node in self.0.values() {
99 for idx in 0..node.num_replicas {
100 items.push(Item {
101 node_name: node.name.clone(),
102 ring_index: hash(format!("{}{}", node.name, idx)),
103 });
104 }
105 }
106 items.sort_by_key(|node| node.ring_index);
107
108 RingItems(items.into_boxed_slice())
109 }
110
111 /// Computes the sum of the replicas of all nodes in the collection.
112 fn num_replicas(&self) -> usize {
113 self.0.values().map(|node| node.num_replicas as usize).sum()
114 }
115
116 /// Returns the number of nodes in the collection.
117 fn len(&self) -> usize {
118 self.0.len()
119 }
120}
121
122#[derive(Default, Debug)]
123struct RingItems(Box<[Item]>);
124
125impl RingItems {
126 /// Finds the node index for the corresponding key.
127 ///
128 /// The returned index is guaranteed to be a valid index that can be safely used to index into the contained items array.
129 ///
130 /// Returns an error if the ring is empty - as an empty ring cannot have a valid index returned.
131 #[inline(always)]
132 fn find_index(&self, key: u64) -> Result<usize, RingEmpty> {
133 let len = self.len();
134 if len == 0 {
135 return Err(RingEmpty);
136 }
137
138 let index = match self.0.binary_search_by_key(&key, |item| item.ring_index) {
139 // We landed right on the node! What are the odds!?
140 // Since we are looking for the *next* node, advance the index by 1.
141 Ok(index) => index + 1,
142 Err(index) => index,
143 };
144
145 // Bounds checking on idx, check if it overflows and wrap around.
146 Ok(if index >= len { 0 } else { index })
147 }
148
149 /// Finds the corresponding [`Item`] for the given key.
150 ///
151 /// Returns an error if the ring is empty.
152 #[inline(always)]
153 fn find_next(&self, key: u64) -> Result<&Item, RingEmpty> {
154 let index = self.find_index(key)?;
155 // safety: index bounds are checked in `find_index`.
156 Ok(unsafe { self.0.get_unchecked(index) })
157 }
158
159 /// Returns the length of items within the hash ring items array.
160 #[inline(always)]
161 fn len(&self) -> usize {
162 self.0.len()
163 }
164
165 /// Computes the next index by adding 1, or wrapping around if the index overflows.
166 #[inline(always)]
167 fn next_index(&self, mut index: usize) -> usize {
168 index += 1;
169 if index >= self.len() {
170 0
171 } else {
172 index
173 }
174 }
175}
176
177impl HashRing {
178 /// Creates an empty hash ring with no nodes.
179 pub fn new() -> Self {
180 Default::default()
181 }
182
183 /// Constructs a HashRing given an iterator of nodes. If a node is duplicated within the iterator, the
184 /// last value will be used.
185 pub fn with_nodes(nodes: impl IntoIterator<Item = Node>) -> Self {
186 let nodes = RingNodes::with_nodes(nodes);
187 let items = nodes.build_ring_items();
188 Self { items, nodes }
189 }
190
191 /// Returns an immutable reference to the current nodes in the hash ring.
192 pub fn nodes(&self) -> &RingNodes {
193 &self.nodes
194 }
195
196 /// Replaces the nodes in the hash ring.
197 pub fn set_nodes(&mut self, nodes: impl IntoIterator<Item = Node>) {
198 *self = Self::with_nodes(nodes);
199 }
200
201 /// Adds a node to the hash ring.
202 pub fn add(&mut self, node: Node) -> Result<(), NodeAlreadyExists> {
203 self.nodes.add(node)?;
204 self.items = self.nodes.build_ring_items();
205 Ok(())
206 }
207
208 /// Removes a node from the hash ring.
209 pub fn remove(&mut self, node_name: impl AsRef<str>) -> Result<(), NodeDoesNotExist> {
210 self.nodes.remove(node_name.as_ref())?;
211 self.items = self.nodes.build_ring_items();
212 Ok(())
213 }
214
215 /// Finds a node for a given key.
216 #[inline(always)]
217 pub fn find(&self, key: impl AsRef<str>) -> Result<&Arc<str>, RingEmpty> {
218 self.items.find_next(hash(key)).map(|item| &item.node_name)
219 }
220
221 /// Finds `num` number of nodes for a given key.
222 pub fn find_n(&self, key: impl AsRef<str>, num: usize) -> Result<Vec<Arc<str>>, RingEmpty> {
223 let num = self.nodes.len().min(num);
224 let mut index = self.items.find_index(hash(key.as_ref()))?;
225
226 let mut nodes = Vec::with_capacity(num);
227 while nodes.len() < num {
228 // safety: bounds checking is done in `find_index` and `next_index`.
229 let item = unsafe { self.items.0.get_unchecked(index) };
230 index = self.items.next_index(index);
231
232 // Skip Nodes we have already seen.
233 if !nodes.contains(&item.node_name) {
234 nodes.push(item.node_name.clone());
235 }
236 }
237
238 Ok(nodes)
239 }
240}
241
242fn hash(key: impl AsRef<str>) -> u64 {
243 let digest = md5::compute(key.as_ref());
244
245 // This may seem sub-optimal, but actually, this is more fast than say doing
246 // u64::from_le_bytes((&digest[8..].try_into().unwrap()));
247 // or u64::from_ne_bytes([digest[8], digest[9], ..])
248 let low = u32::from(digest[11]) << 24
249 | u32::from(digest[10]) << 16
250 | u32::from(digest[9]) << 8
251 | u32::from(digest[8]);
252 let high = u32::from(digest[15]) << 24
253 | u32::from(digest[14]) << 16
254 | u32::from(digest[13]) << 8
255 | u32::from(digest[12]);
256 let mut key_int = u64::from(high);
257 key_int <<= 32;
258 key_int &= 0xffff_ffff_0000_0000;
259 key_int |= u64::from(low);
260 key_int
261}