Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 91 lines 2.9 kB view raw
1use crate::discovery::service::ServiceRegistration; 2use crate::discovery::watcher::ServiceWatcher; 3use crate::etcd::Client; 4use anyhow::Result; 5 6#[derive(Clone, Debug)] 7pub struct GlobalLimiter<T = Normal> { 8 /// service watcher to keep track of cluster size 9 service_watcher: ServiceWatcher, 10 /// Limiter type is either [`Normal`] or [`Member`]. 11 limiter_type: T, 12} 13 14// type used for unowned limiter, cannot determine self share 15#[derive(Clone, Debug)] 16pub struct Normal; 17 18// type used for owned limiter, can determine self share 19#[derive(Clone, Debug)] 20pub struct Member { 21 /// service id for this node 22 self_id: String, 23} 24 25impl GlobalLimiter<Normal> { 26 /// Watch a service's instances for the calculation of per-node limits. 27 /// 28 /// # Arguments 29 /// 30 /// * `name` - the service name 31 /// * `client` - etcd client 32 /// * `ring` - wether the service uses a ring config 33 pub async fn watch<N: Into<String>>(name: N, client: Client, ring: bool) -> Result<Self> { 34 let service_watcher = ServiceWatcher::watch(name, client, ring).await?; 35 36 let global_limiter = Self { 37 service_watcher, 38 limiter_type: Normal, 39 }; 40 41 Ok(global_limiter) 42 } 43} 44 45impl GlobalLimiter<Member> { 46 /// Watch this service's instances for the calculation of per-node limits, implies that a ring config is used. 47 /// 48 /// # Arguments 49 /// 50 /// * `self_registration` - this node's service registration 51 /// * `client` - etcd client 52 pub async fn watch_as_member( 53 self_registration: &ServiceRegistration, 54 client: Client, 55 ) -> Result<Self> { 56 let service_watcher = 57 ServiceWatcher::watch(self_registration.name.clone(), client, true).await?; 58 59 let global_limiter = Self { 60 service_watcher, 61 limiter_type: Member { 62 self_id: self_registration.id(), 63 }, 64 }; 65 66 Ok(global_limiter) 67 } 68 69 /// Get this node's share, based on the amount of the ring owned by this node 70 pub fn get_self_share(&self, global_limit: f64) -> Option<f64> { 71 self.get_node_share(global_limit, &self.limiter_type.self_id) 72 } 73} 74 75impl<T> GlobalLimiter<T> { 76 /// Get the equal share, based on the number of instances 77 pub fn get_equal_share(&self, global_limit: f64) -> Option<f64> { 78 match self.service_watcher.num_instances() { 79 num_instances if num_instances > 0 => Some(global_limit / (num_instances as f64)), 80 _ => None, 81 } 82 } 83 84 /// Get the given node's share, based on the amount of the ring owned by that node 85 pub fn get_node_share(&self, global_limit: f64, service_id: impl AsRef<str>) -> Option<f64> { 86 match self.service_watcher.ring_share(service_id) { 87 ring_share if ring_share > 0.0 => Some(global_limit * ring_share), 88 _ => None, 89 } 90 } 91}