Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use crate::cached_futures::CachedFutures;
4use crate::metrics::counters::StaticCounter;
5use crate::tokio_utils::AbortOnDrop;
6use anyhow::Result;
7use parking_lot::RwLock;
8use tokio::{sync::mpsc, task::JoinHandle, time::interval};
9
10use crate::etcd_watcherd::{
11 KeyWatchEvents, RecursiveKeyWatchEvents, WatchEvents, WatcherInner, WatcherLoop, WatcherStats,
12 WatcherSubscription, WatcherSubscriptionId,
13};
14
15const WATCHER_SWEEP_INTERVAL: Duration = Duration::from_secs(120);
16
17#[derive(Clone, Debug, Default)]
18pub(crate) struct ActiveWatchers {
19 pub(crate) key: CachedFutures<String, WatchHandle<KeyWatchEvents>, Arc<crate::etcd::EtcdError>>,
20 pub(crate) recursive_key:
21 CachedFutures<String, WatchHandle<RecursiveKeyWatchEvents>, Arc<crate::etcd::EtcdError>>,
22}
23
24#[derive(Clone, Debug)]
25pub struct Watcher {
26 client: crate::etcd::Client,
27 _sweeper_abort_on_drop: Arc<AbortOnDrop<()>>,
28 pub(crate) active_watchers: ActiveWatchers,
29 pub(crate) stats: Arc<WatcherStats>,
30}
31
32impl Watcher {
33 /// Creates a new [`Watcher`], using the given etcd client.
34 ///
35 /// Ideally for the lifespan of your application, you should use a single watcher instance.
36 ///
37 /// This watcher instance will multiplex all watches to etcd, so it makes senes to use it as a
38 /// singleton within your application.
39 pub fn new(client: crate::etcd::Client) -> Self {
40 let active_watchers = ActiveWatchers::default();
41 let stats = Arc::new(WatcherStats::default());
42 let sweeper_abort_on_drop = Arc::new(AbortOnDrop::new(tokio::task::spawn(
43 Self::sweep_watchers(active_watchers.clone(), stats.clone()),
44 )));
45
46 Self {
47 client,
48 active_watchers,
49 _sweeper_abort_on_drop: sweeper_abort_on_drop,
50 stats,
51 }
52 }
53
54 /// This background task is responsible for cleaning up watchers that no longer are being actively used.
55 async fn sweep_watchers(active_watchers: ActiveWatchers, stats: Arc<WatcherStats>) {
56 let mut interval = interval(WATCHER_SWEEP_INTERVAL);
57 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
58
59 loop {
60 interval.tick().await;
61
62 let mut watchers_swept = 0;
63 watchers_swept += active_watchers
64 .key
65 .retain_ready(|handle| handle.num_handles() > 1);
66 watchers_swept += active_watchers
67 .recursive_key
68 .retain_ready(|handle| handle.num_handles() > 1);
69
70 stats.watchers_swept.incr_by(watchers_swept as _);
71 }
72 }
73
74 /// Watches a single key for changes.
75 ///
76 /// Returns a [`WatchHandle`] that can be used to acquire the current value of the key at any given time,
77 /// or subscribe to real time updates to that key.
78 pub async fn watch_key(&self, key: impl Into<String>) -> Result<WatchHandle<KeyWatchEvents>> {
79 let key = key.into();
80 let client = self.client.clone();
81 let stats = self.stats.clone();
82 let watcher_future =
83 self.active_watchers
84 .key
85 .get_or_cache_default(key, move |key| async move {
86 use crate::etcd_watcherd::watcher_impl::key_watcher::KeyWatcher;
87 let watcher_impl = KeyWatcher::new(client, key);
88 let handle = WatcherLoop::start(watcher_impl, stats)
89 .await
90 .map_err(Arc::new)?;
91 Ok(handle)
92 });
93
94 watcher_future
95 .await
96 .map_err(|e| anyhow::anyhow!("etcd error: {:?}", e))
97 }
98
99 /// Watches the keys within a directory for changes.
100 ///
101 /// Returns a [`WatchHandle`] that can be used to acquire the current value of the keys within the directory
102 /// at any given time, or subscribe to real time updates to that key.
103 pub async fn watch_key_recursive(
104 &self,
105 key: impl Into<String>,
106 ) -> Result<WatchHandle<RecursiveKeyWatchEvents>> {
107 let key = key.into();
108 let client = self.client.clone();
109 let stats = self.stats.clone();
110 let watcher_future = self.active_watchers.recursive_key.get_or_cache_default(
111 key,
112 move |key| async move {
113 use crate::etcd_watcherd::watcher_impl::recursive_key_watcher::RecursiveKeyWatcher;
114 let watcher_impl = RecursiveKeyWatcher::new(client, key);
115 let handle = WatcherLoop::start(watcher_impl, stats)
116 .await
117 .map_err(Arc::new)?;
118 Ok(handle)
119 },
120 );
121
122 watcher_future
123 .await
124 .map_err(|e| anyhow::anyhow!("etcd error: {:?}", e))
125 }
126}
127
128/// Returned by the [`Watcher::watch_key`] or [`Watcher::watch_key_recursive`] methods.
129///
130/// Allows you to read the key's current value with [`WatchHandle::current_value`], and also subscribe to updates
131/// to the key using [`WatchHandle::events`].
132#[derive(Clone)]
133pub struct WatchHandle<E>
134where
135 E: WatchEvents,
136{
137 /// Holds an arc to the WatcherInner that contains the state of the watched key, and the
138 /// means to establish additional subscriptions.
139 inner: Arc<RwLock<WatcherInner<E>>>,
140 /// A join handle to the background task that is keeping the watcher synced with etcd.
141 watcher_loop_abort_on_drop: Arc<AbortOnDrop<()>>,
142}
143
144impl<E> WatchHandle<E>
145where
146 E: WatchEvents,
147{
148 pub(crate) fn new(
149 inner: Arc<RwLock<WatcherInner<E>>>,
150 watcher_loop_join_handle: JoinHandle<()>,
151 ) -> Self {
152 Self {
153 inner,
154 watcher_loop_abort_on_drop: Arc::new(AbortOnDrop::new(watcher_loop_join_handle)),
155 }
156 }
157
158 /// Returns the current value of the key. This is a snapshotted value, meaning that it is immutable.
159 pub fn current_value(&self) -> E::Value {
160 self.inner.read().current_value().clone()
161 }
162
163 /// Consumes the [`WatchHandle`] and returns a [`WatcherEvents`], which allows your application to be notified of changes to watched keys in real time.
164 ///
165 /// We will try and emit the minimal set of events and eliminate duplicate events that would net in no effective state change. What this means is
166 /// that if a key is set to the same value multiple times in etcd, where as etcd might update the watcher internally, we will not emit these events
167 /// as they result in no effective change of state.
168 ///
169 /// # Example:
170 /// ```no_run
171 /// # use anyhow::Result;
172 /// # use etcd_watcherd::{Watcher, KeyWatchEvents};
173 /// # async fn example() -> Result<()> {
174 /// let watcher = Watcher::new(crate::etcd::Client::from_etcd_peers()?);
175 /// let mut events = watcher.watch_key("/hello").await?.events();
176 /// loop {
177 /// let KeyWatchEvents::FullSync { value } = events.next().await;
178 /// println!("key updated: {:?}", value);
179 /// }
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub fn events(self) -> WatcherEvents<E> {
184 let WatcherSubscription {
185 subscription_id,
186 receiver,
187 full_sync_event,
188 } = self.inner.write().create_subscription();
189
190 WatcherEvents {
191 handle: self,
192 subscription_id,
193 initial_event: Some(Box::new(full_sync_event)),
194 receiver,
195 }
196 }
197
198 /// Returns the number of active subscribers for this watch handle.
199 pub(crate) fn num_subscribers(&self) -> usize {
200 self.inner.read().num_subscribers()
201 }
202
203 /// Returns the number of copies of this watch handle that are currently active in the
204 /// program.
205 fn num_handles(&self) -> usize {
206 // We are using the strong count of the loop's abort on drop, since this will correspond 1:1
207 // with the number of copies this handle has active.
208 Arc::strong_count(&self.watcher_loop_abort_on_drop)
209 }
210}
211
212/// Returned by [`WatchHandle::events`].
213///
214/// Acts as a stream of events that can be consumed to be kept up-to-date on the changes to an etcd key
215/// in real time.
216pub struct WatcherEvents<E>
217where
218 E: WatchEvents,
219{
220 handle: WatchHandle<E>,
221 subscription_id: WatcherSubscriptionId,
222 receiver: mpsc::Receiver<E>,
223 // a box is chosen here to reduce the size of the struct, as this will be empty most of the time,
224 // as this event is consumed by the first call to [`WatcherEvents::next`], or [`WatcherEvents::initial_event`].
225 initial_event: Option<Box<E>>,
226}
227
228impl<E> Drop for WatcherEvents<E>
229where
230 E: WatchEvents,
231{
232 fn drop(&mut self) {
233 let mut inner = self.handle.inner.write();
234 inner.remove_subscription(self.subscription_id);
235 }
236}
237
238impl<E> WatcherEvents<E>
239where
240 E: WatchEvents,
241{
242 /// Returns the "initial watch event", which is either [`crate::RecursiveKeyWatchEvents::FullSync`] or [`crate::KeyWatchEvents::FullSync`]
243 /// depending on what kind of watcher you have initialized (recursive or not).
244 ///
245 /// # Panics
246 /// This method must be called before the first call to `next`, and must only be called once. If these invariants are not upheld, this method
247 /// will cause a panic.
248 pub fn initial_event(&mut self) -> E {
249 *self.initial_event.take().expect("invariant: `initial_event` can only be called once, and before the first call to `next`.")
250 }
251
252 /// Returns a new copy of the [`WatchHandle`] that this [`WatcherEvents`] is using.
253 pub fn handle(&self) -> WatchHandle<E> {
254 self.handle.clone()
255 }
256
257 /// Awaits a the next key change event.
258 ///
259 /// If `initial_event` was not called, this method will return immediately the first time called with the initial event, otherwise
260 /// will wait until an event is received.
261 ///
262 /// It is *safe* to drop this future and re-call it, (for example, wrapping this with a `timeout`, or using the `select!` macro.)
263 /// No events will be dropped if this future is cancelled, as long as the return value is not dropped.
264 pub async fn next(&mut self) -> E {
265 if let Some(event) = self.initial_event.take() {
266 return *event;
267 }
268
269 match self.receiver.recv().await {
270 Some(event) => event,
271 // The subscription has ended, which means we've fallen too far behind,so we'll automatically re-create the subscription,
272 // and emit a full sync event.
273 None => self.recreate_subscription(),
274 }
275 }
276
277 fn recreate_subscription(&mut self) -> E {
278 let WatcherSubscription {
279 subscription_id,
280 receiver,
281 full_sync_event,
282 } = self
283 .handle
284 .inner
285 .write()
286 .recreate_subscription(self.subscription_id);
287
288 self.receiver = receiver;
289 self.subscription_id = subscription_id;
290 full_sync_event
291 }
292}