Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 292 lines 11 kB view raw
1use std::{fmt::Debug, sync::Arc, time::Duration}; 2 3use crate::cached_futures::CachedFutures; 4use crate::metrics::counters::StaticCounter; 5use crate::tokio_utils::AbortOnDrop; 6use anyhow::Result; 7use parking_lot::RwLock; 8use tokio::{sync::mpsc, task::JoinHandle, time::interval}; 9 10use crate::etcd_watcherd::{ 11 KeyWatchEvents, RecursiveKeyWatchEvents, WatchEvents, WatcherInner, WatcherLoop, WatcherStats, 12 WatcherSubscription, WatcherSubscriptionId, 13}; 14 15const WATCHER_SWEEP_INTERVAL: Duration = Duration::from_secs(120); 16 17#[derive(Clone, Debug, Default)] 18pub(crate) struct ActiveWatchers { 19 pub(crate) key: CachedFutures<String, WatchHandle<KeyWatchEvents>, Arc<crate::etcd::EtcdError>>, 20 pub(crate) recursive_key: 21 CachedFutures<String, WatchHandle<RecursiveKeyWatchEvents>, Arc<crate::etcd::EtcdError>>, 22} 23 24#[derive(Clone, Debug)] 25pub struct Watcher { 26 client: crate::etcd::Client, 27 _sweeper_abort_on_drop: Arc<AbortOnDrop<()>>, 28 pub(crate) active_watchers: ActiveWatchers, 29 pub(crate) stats: Arc<WatcherStats>, 30} 31 32impl Watcher { 33 /// Creates a new [`Watcher`], using the given etcd client. 34 /// 35 /// Ideally for the lifespan of your application, you should use a single watcher instance. 36 /// 37 /// This watcher instance will multiplex all watches to etcd, so it makes senes to use it as a 38 /// singleton within your application. 39 pub fn new(client: crate::etcd::Client) -> Self { 40 let active_watchers = ActiveWatchers::default(); 41 let stats = Arc::new(WatcherStats::default()); 42 let sweeper_abort_on_drop = Arc::new(AbortOnDrop::new(tokio::task::spawn( 43 Self::sweep_watchers(active_watchers.clone(), stats.clone()), 44 ))); 45 46 Self { 47 client, 48 active_watchers, 49 _sweeper_abort_on_drop: sweeper_abort_on_drop, 50 stats, 51 } 52 } 53 54 /// This background task is responsible for cleaning up watchers that no longer are being actively used. 55 async fn sweep_watchers(active_watchers: ActiveWatchers, stats: Arc<WatcherStats>) { 56 let mut interval = interval(WATCHER_SWEEP_INTERVAL); 57 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 58 59 loop { 60 interval.tick().await; 61 62 let mut watchers_swept = 0; 63 watchers_swept += active_watchers 64 .key 65 .retain_ready(|handle| handle.num_handles() > 1); 66 watchers_swept += active_watchers 67 .recursive_key 68 .retain_ready(|handle| handle.num_handles() > 1); 69 70 stats.watchers_swept.incr_by(watchers_swept as _); 71 } 72 } 73 74 /// Watches a single key for changes. 75 /// 76 /// Returns a [`WatchHandle`] that can be used to acquire the current value of the key at any given time, 77 /// or subscribe to real time updates to that key. 78 pub async fn watch_key(&self, key: impl Into<String>) -> Result<WatchHandle<KeyWatchEvents>> { 79 let key = key.into(); 80 let client = self.client.clone(); 81 let stats = self.stats.clone(); 82 let watcher_future = 83 self.active_watchers 84 .key 85 .get_or_cache_default(key, move |key| async move { 86 use crate::etcd_watcherd::watcher_impl::key_watcher::KeyWatcher; 87 let watcher_impl = KeyWatcher::new(client, key); 88 let handle = WatcherLoop::start(watcher_impl, stats) 89 .await 90 .map_err(Arc::new)?; 91 Ok(handle) 92 }); 93 94 watcher_future 95 .await 96 .map_err(|e| anyhow::anyhow!("etcd error: {:?}", e)) 97 } 98 99 /// Watches the keys within a directory for changes. 100 /// 101 /// Returns a [`WatchHandle`] that can be used to acquire the current value of the keys within the directory 102 /// at any given time, or subscribe to real time updates to that key. 103 pub async fn watch_key_recursive( 104 &self, 105 key: impl Into<String>, 106 ) -> Result<WatchHandle<RecursiveKeyWatchEvents>> { 107 let key = key.into(); 108 let client = self.client.clone(); 109 let stats = self.stats.clone(); 110 let watcher_future = self.active_watchers.recursive_key.get_or_cache_default( 111 key, 112 move |key| async move { 113 use crate::etcd_watcherd::watcher_impl::recursive_key_watcher::RecursiveKeyWatcher; 114 let watcher_impl = RecursiveKeyWatcher::new(client, key); 115 let handle = WatcherLoop::start(watcher_impl, stats) 116 .await 117 .map_err(Arc::new)?; 118 Ok(handle) 119 }, 120 ); 121 122 watcher_future 123 .await 124 .map_err(|e| anyhow::anyhow!("etcd error: {:?}", e)) 125 } 126} 127 128/// Returned by the [`Watcher::watch_key`] or [`Watcher::watch_key_recursive`] methods. 129/// 130/// Allows you to read the key's current value with [`WatchHandle::current_value`], and also subscribe to updates 131/// to the key using [`WatchHandle::events`]. 132#[derive(Clone)] 133pub struct WatchHandle<E> 134where 135 E: WatchEvents, 136{ 137 /// Holds an arc to the WatcherInner that contains the state of the watched key, and the 138 /// means to establish additional subscriptions. 139 inner: Arc<RwLock<WatcherInner<E>>>, 140 /// A join handle to the background task that is keeping the watcher synced with etcd. 141 watcher_loop_abort_on_drop: Arc<AbortOnDrop<()>>, 142} 143 144impl<E> WatchHandle<E> 145where 146 E: WatchEvents, 147{ 148 pub(crate) fn new( 149 inner: Arc<RwLock<WatcherInner<E>>>, 150 watcher_loop_join_handle: JoinHandle<()>, 151 ) -> Self { 152 Self { 153 inner, 154 watcher_loop_abort_on_drop: Arc::new(AbortOnDrop::new(watcher_loop_join_handle)), 155 } 156 } 157 158 /// Returns the current value of the key. This is a snapshotted value, meaning that it is immutable. 159 pub fn current_value(&self) -> E::Value { 160 self.inner.read().current_value().clone() 161 } 162 163 /// Consumes the [`WatchHandle`] and returns a [`WatcherEvents`], which allows your application to be notified of changes to watched keys in real time. 164 /// 165 /// We will try and emit the minimal set of events and eliminate duplicate events that would net in no effective state change. What this means is 166 /// that if a key is set to the same value multiple times in etcd, where as etcd might update the watcher internally, we will not emit these events 167 /// as they result in no effective change of state. 168 /// 169 /// # Example: 170 /// ```no_run 171 /// # use anyhow::Result; 172 /// # use etcd_watcherd::{Watcher, KeyWatchEvents}; 173 /// # async fn example() -> Result<()> { 174 /// let watcher = Watcher::new(crate::etcd::Client::from_etcd_peers()?); 175 /// let mut events = watcher.watch_key("/hello").await?.events(); 176 /// loop { 177 /// let KeyWatchEvents::FullSync { value } = events.next().await; 178 /// println!("key updated: {:?}", value); 179 /// } 180 /// # Ok(()) 181 /// # } 182 /// ``` 183 pub fn events(self) -> WatcherEvents<E> { 184 let WatcherSubscription { 185 subscription_id, 186 receiver, 187 full_sync_event, 188 } = self.inner.write().create_subscription(); 189 190 WatcherEvents { 191 handle: self, 192 subscription_id, 193 initial_event: Some(Box::new(full_sync_event)), 194 receiver, 195 } 196 } 197 198 /// Returns the number of active subscribers for this watch handle. 199 pub(crate) fn num_subscribers(&self) -> usize { 200 self.inner.read().num_subscribers() 201 } 202 203 /// Returns the number of copies of this watch handle that are currently active in the 204 /// program. 205 fn num_handles(&self) -> usize { 206 // We are using the strong count of the loop's abort on drop, since this will correspond 1:1 207 // with the number of copies this handle has active. 208 Arc::strong_count(&self.watcher_loop_abort_on_drop) 209 } 210} 211 212/// Returned by [`WatchHandle::events`]. 213/// 214/// Acts as a stream of events that can be consumed to be kept up-to-date on the changes to an etcd key 215/// in real time. 216pub struct WatcherEvents<E> 217where 218 E: WatchEvents, 219{ 220 handle: WatchHandle<E>, 221 subscription_id: WatcherSubscriptionId, 222 receiver: mpsc::Receiver<E>, 223 // a box is chosen here to reduce the size of the struct, as this will be empty most of the time, 224 // as this event is consumed by the first call to [`WatcherEvents::next`], or [`WatcherEvents::initial_event`]. 225 initial_event: Option<Box<E>>, 226} 227 228impl<E> Drop for WatcherEvents<E> 229where 230 E: WatchEvents, 231{ 232 fn drop(&mut self) { 233 let mut inner = self.handle.inner.write(); 234 inner.remove_subscription(self.subscription_id); 235 } 236} 237 238impl<E> WatcherEvents<E> 239where 240 E: WatchEvents, 241{ 242 /// Returns the "initial watch event", which is either [`crate::RecursiveKeyWatchEvents::FullSync`] or [`crate::KeyWatchEvents::FullSync`] 243 /// depending on what kind of watcher you have initialized (recursive or not). 244 /// 245 /// # Panics 246 /// This method must be called before the first call to `next`, and must only be called once. If these invariants are not upheld, this method 247 /// will cause a panic. 248 pub fn initial_event(&mut self) -> E { 249 *self.initial_event.take().expect("invariant: `initial_event` can only be called once, and before the first call to `next`.") 250 } 251 252 /// Returns a new copy of the [`WatchHandle`] that this [`WatcherEvents`] is using. 253 pub fn handle(&self) -> WatchHandle<E> { 254 self.handle.clone() 255 } 256 257 /// Awaits a the next key change event. 258 /// 259 /// If `initial_event` was not called, this method will return immediately the first time called with the initial event, otherwise 260 /// will wait until an event is received. 261 /// 262 /// It is *safe* to drop this future and re-call it, (for example, wrapping this with a `timeout`, or using the `select!` macro.) 263 /// No events will be dropped if this future is cancelled, as long as the return value is not dropped. 264 pub async fn next(&mut self) -> E { 265 if let Some(event) = self.initial_event.take() { 266 return *event; 267 } 268 269 match self.receiver.recv().await { 270 Some(event) => event, 271 // The subscription has ended, which means we've fallen too far behind,so we'll automatically re-create the subscription, 272 // and emit a full sync event. 273 None => self.recreate_subscription(), 274 } 275 } 276 277 fn recreate_subscription(&mut self) -> E { 278 let WatcherSubscription { 279 subscription_id, 280 receiver, 281 full_sync_event, 282 } = self 283 .handle 284 .inner 285 .write() 286 .recreate_subscription(self.subscription_id); 287 288 self.receiver = receiver; 289 self.subscription_id = subscription_id; 290 full_sync_event 291 } 292}