Mirror of https://github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

1//! This crate provides and implementation of consistent hashing consistent with our go 2//! implementation to allow members in both to work compatibly. 3 4use serde::{Deserialize, Serialize}; 5use std::collections::BTreeMap; 6use std::sync::Arc; 7use thiserror::Error; 8 9#[derive(Debug, Error)] 10#[error("Node already in ring.")] 11pub struct NodeAlreadyExists; 12 13#[derive(Debug, Error)] 14#[error("Node with the given node name does not exist within the ring")] 15pub struct NodeDoesNotExist; 16 17#[derive(Debug, Error)] 18#[error("The ring contains no nodes.")] 19pub struct RingEmpty; 20 21#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, PartialOrd, Ord)] 22pub struct Node { 23 /// The name of the node. 24 name: Arc<str>, 25 26 /// The number of replicas for this node. 27 num_replicas: u32, 28} 29 30impl Node { 31 pub fn new(name: impl Into<Arc<str>>, num_replicas: u32) -> Node { 32 Node { 33 name: name.into(), 34 num_replicas, 35 } 36 } 37 38 pub fn name(&self) -> &str { 39 &self.name 40 } 41 42 pub fn name_arc(&self) -> Arc<str> { 43 self.name.clone() 44 } 45 46 pub fn num_replicas(&self) -> u32 { 47 self.num_replicas 48 } 49} 50 51#[derive(Clone, Debug)] 52struct Item { 53 // Number is derived from a hash of the node's key 54 ring_index: u64, 55 // The node name that this item belongs to. 56 node_name: Arc<str>, 57} 58 59/// Provides consistent hashing for keys to nodes. 60#[derive(Debug, Default)] 61pub struct HashRing { 62 nodes: RingNodes, 63 items: RingItems, 64} 65 66#[derive(Default, Debug, Clone)] 67pub struct RingNodes(BTreeMap<Arc<str>, Node>); 68 69impl RingNodes { 70 pub fn with_nodes(nodes: impl IntoIterator<Item = Node>) -> Self { 71 Self(nodes.into_iter().map(|n| (n.name.clone(), n)).collect()) 72 } 73 74 /// Collects the nodes into a vec ordered by node name. 
75 pub fn collect_nodes(&self) -> Vec<Node> { 76 self.0.values().cloned().collect() 77 } 78 79 /// Adds a node to the collection, failing if a node with that name already exists. 80 pub fn add(&mut self, node: Node) -> Result<(), NodeAlreadyExists> { 81 if self.0.contains_key(&node.name) { 82 return Err(NodeAlreadyExists); 83 } 84 85 self.0.insert(node.name.clone(), node); 86 Ok(()) 87 } 88 89 /// Removes a node from the collection by its name, failing if a node with that 90 /// name does not exist. 91 pub fn remove(&mut self, node_name: &str) -> Result<(), NodeDoesNotExist> { 92 self.0.remove(node_name).ok_or(NodeDoesNotExist)?; 93 Ok(()) 94 } 95 96 fn build_ring_items(&self) -> RingItems { 97 let mut items = Vec::with_capacity(self.num_replicas()); 98 for node in self.0.values() { 99 for idx in 0..node.num_replicas { 100 items.push(Item { 101 node_name: node.name.clone(), 102 ring_index: hash(format!("{}{}", node.name, idx)), 103 }); 104 } 105 } 106 items.sort_by_key(|node| node.ring_index); 107 108 RingItems(items.into_boxed_slice()) 109 } 110 111 /// Computes the sum of the replicas of all nodes in the collection. 112 fn num_replicas(&self) -> usize { 113 self.0.values().map(|node| node.num_replicas as usize).sum() 114 } 115 116 /// Returns the number of nodes in the collection. 117 fn len(&self) -> usize { 118 self.0.len() 119 } 120} 121 122#[derive(Default, Debug)] 123struct RingItems(Box<[Item]>); 124 125impl RingItems { 126 /// Finds the node index for the corresponding key. 127 /// 128 /// The returned index is guaranteed to be a valid index that can be safely used to index into the contained items array. 129 /// 130 /// Returns an error if the ring is empty - as an empty ring cannot have a valid index returned. 
131 #[inline(always)] 132 fn find_index(&self, key: u64) -> Result<usize, RingEmpty> { 133 let len = self.len(); 134 if len == 0 { 135 return Err(RingEmpty); 136 } 137 138 let index = match self.0.binary_search_by_key(&key, |item| item.ring_index) { 139 // We landed right on the node! What are the odds!? 140 // Since we are looking for the *next* node, advance the index by 1. 141 Ok(index) => index + 1, 142 Err(index) => index, 143 }; 144 145 // Bounds checking on idx, check if it overflows and wrap around. 146 Ok(if index >= len { 0 } else { index }) 147 } 148 149 /// Finds the corresponding [`Item`] for the given key. 150 /// 151 /// Returns an error if the ring is empty. 152 #[inline(always)] 153 fn find_next(&self, key: u64) -> Result<&Item, RingEmpty> { 154 let index = self.find_index(key)?; 155 // safety: index bounds are checked in `find_index`. 156 Ok(unsafe { self.0.get_unchecked(index) }) 157 } 158 159 /// Returns the length of items within the hash ring items array. 160 #[inline(always)] 161 fn len(&self) -> usize { 162 self.0.len() 163 } 164 165 /// Computes the next index by adding 1, or wrapping around if the index overflows. 166 #[inline(always)] 167 fn next_index(&self, mut index: usize) -> usize { 168 index += 1; 169 if index >= self.len() { 170 0 171 } else { 172 index 173 } 174 } 175} 176 177impl HashRing { 178 /// Creates an empty hash ring with no nodes. 179 pub fn new() -> Self { 180 Default::default() 181 } 182 183 /// Constructs a HashRing given an iterator of nodes. If a node is duplicated within the iterator, the 184 /// last value will be used. 185 pub fn with_nodes(nodes: impl IntoIterator<Item = Node>) -> Self { 186 let nodes = RingNodes::with_nodes(nodes); 187 let items = nodes.build_ring_items(); 188 Self { items, nodes } 189 } 190 191 /// Returns an immutable reference to the current nodes in the hash ring. 192 pub fn nodes(&self) -> &RingNodes { 193 &self.nodes 194 } 195 196 /// Replaces the nodes in the hash ring. 
197 pub fn set_nodes(&mut self, nodes: impl IntoIterator<Item = Node>) { 198 *self = Self::with_nodes(nodes); 199 } 200 201 /// Adds a node to the hash ring. 202 pub fn add(&mut self, node: Node) -> Result<(), NodeAlreadyExists> { 203 self.nodes.add(node)?; 204 self.items = self.nodes.build_ring_items(); 205 Ok(()) 206 } 207 208 /// Removes a node from the hash ring. 209 pub fn remove(&mut self, node_name: impl AsRef<str>) -> Result<(), NodeDoesNotExist> { 210 self.nodes.remove(node_name.as_ref())?; 211 self.items = self.nodes.build_ring_items(); 212 Ok(()) 213 } 214 215 /// Finds a node for a given key. 216 #[inline(always)] 217 pub fn find(&self, key: impl AsRef<str>) -> Result<&Arc<str>, RingEmpty> { 218 self.items.find_next(hash(key)).map(|item| &item.node_name) 219 } 220 221 /// Finds `num` number of nodes for a given key. 222 pub fn find_n(&self, key: impl AsRef<str>, num: usize) -> Result<Vec<Arc<str>>, RingEmpty> { 223 let num = self.nodes.len().min(num); 224 let mut index = self.items.find_index(hash(key.as_ref()))?; 225 226 let mut nodes = Vec::with_capacity(num); 227 while nodes.len() < num { 228 // safety: bounds checking is done in `find_index` and `next_index`. 229 let item = unsafe { self.items.0.get_unchecked(index) }; 230 index = self.items.next_index(index); 231 232 // Skip Nodes we have already seen. 
233 if !nodes.contains(&item.node_name) { 234 nodes.push(item.node_name.clone()); 235 } 236 } 237 238 Ok(nodes) 239 } 240} 241 242fn hash(key: impl AsRef<str>) -> u64 { 243 let digest = md5::compute(key.as_ref()); 244 245 // This may seem sub-optimal, but actually, this is more fast than say doing 246 // u64::from_le_bytes((&digest[8..].try_into().unwrap())); 247 // or u64::from_ne_bytes([digest[8], digest[9], ..]) 248 let low = u32::from(digest[11]) << 24 249 | u32::from(digest[10]) << 16 250 | u32::from(digest[9]) << 8 251 | u32::from(digest[8]); 252 let high = u32::from(digest[15]) << 24 253 | u32::from(digest[14]) << 16 254 | u32::from(digest[13]) << 8 255 | u32::from(digest[12]); 256 let mut key_int = u64::from(high); 257 key_int <<= 32; 258 key_int &= 0xffff_ffff_0000_0000; 259 key_int |= u64::from(low); 260 key_int 261}