Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use crate::discovery::service::ServiceRegistration;
2use crate::discovery::watcher::ServiceWatcher;
3use crate::etcd::Client;
4use anyhow::Result;
5
6#[derive(Clone, Debug)]
7pub struct GlobalLimiter<T = Normal> {
8 /// service watcher to keep track of cluster size
9 service_watcher: ServiceWatcher,
10 /// Limiter type is either [`Normal`] or [`Member`].
11 limiter_type: T,
12}
13
14// type used for unowned limiter, cannot determine self share
15#[derive(Clone, Debug)]
16pub struct Normal;
17
18// type used for owned limiter, can determine self share
19#[derive(Clone, Debug)]
20pub struct Member {
21 /// service id for this node
22 self_id: String,
23}
24
25impl GlobalLimiter<Normal> {
26 /// Watch a service's instances for the calculation of per-node limits.
27 ///
28 /// # Arguments
29 ///
30 /// * `name` - the service name
31 /// * `client` - etcd client
32 /// * `ring` - wether the service uses a ring config
33 pub async fn watch<N: Into<String>>(name: N, client: Client, ring: bool) -> Result<Self> {
34 let service_watcher = ServiceWatcher::watch(name, client, ring).await?;
35
36 let global_limiter = Self {
37 service_watcher,
38 limiter_type: Normal,
39 };
40
41 Ok(global_limiter)
42 }
43}
44
45impl GlobalLimiter<Member> {
46 /// Watch this service's instances for the calculation of per-node limits, implies that a ring config is used.
47 ///
48 /// # Arguments
49 ///
50 /// * `self_registration` - this node's service registration
51 /// * `client` - etcd client
52 pub async fn watch_as_member(
53 self_registration: &ServiceRegistration,
54 client: Client,
55 ) -> Result<Self> {
56 let service_watcher =
57 ServiceWatcher::watch(self_registration.name.clone(), client, true).await?;
58
59 let global_limiter = Self {
60 service_watcher,
61 limiter_type: Member {
62 self_id: self_registration.id(),
63 },
64 };
65
66 Ok(global_limiter)
67 }
68
69 /// Get this node's share, based on the amount of the ring owned by this node
70 pub fn get_self_share(&self, global_limit: f64) -> Option<f64> {
71 self.get_node_share(global_limit, &self.limiter_type.self_id)
72 }
73}
74
75impl<T> GlobalLimiter<T> {
76 /// Get the equal share, based on the number of instances
77 pub fn get_equal_share(&self, global_limit: f64) -> Option<f64> {
78 match self.service_watcher.num_instances() {
79 num_instances if num_instances > 0 => Some(global_limit / (num_instances as f64)),
80 _ => None,
81 }
82 }
83
84 /// Get the given node's share, based on the amount of the ring owned by that node
85 pub fn get_node_share(&self, global_limit: f64, service_id: impl AsRef<str>) -> Option<f64> {
86 match self.service_watcher.ring_share(service_id) {
87 ring_share if ring_share > 0.0 => Some(global_limit * ring_share),
88 _ => None,
89 }
90 }
91}