Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::backoff_utils::{AsyncBackoff, Config as AsyncBackoffConfig};
4use crate::metrics::counters::StaticCounter;
5use parking_lot::RwLock;
6use tokio::{sync::mpsc, time::sleep};
7use tracing::{error, info};
8
9use crate::etcd_watcherd::{
10 WatchEvents, WatchHandle, WatchResult, WatchState, WatcherImpl, WatcherStats,
11};
12
13pub(crate) struct WatcherInner<E>
14where
15 E: WatchEvents,
16{
17 #[allow(dead_code)]
18 key: String,
19 current_value: E::Value,
20 next_subscription_id: usize,
21 watchers: HashMap<WatcherSubscriptionId, mpsc::Sender<E>>,
22 stats: Arc<WatcherStats>,
23}
24
25pub(crate) struct WatcherSubscription<E>
26where
27 E: WatchEvents,
28{
29 pub subscription_id: WatcherSubscriptionId,
30 pub receiver: mpsc::Receiver<E>,
31 pub full_sync_event: E,
32}
33
/// Opaque handle naming a single subscription. Ids are minted by
/// [`WatcherInner::create_subscription`] (and by `recreate_subscription`) and serve as the key
/// into the subscriber map.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct WatcherSubscriptionId(usize);
38
39impl<E> WatcherInner<E>
40where
41 E: WatchEvents,
42{
43 /// Get a reference to the watcher inner's current value.
44 pub(crate) fn current_value(&self) -> &E::Value {
45 &self.current_value
46 }
47
48 /// Creates a watcher subscription, which holds a snapshot of the current value, and a receiver which
49 /// will receive a stream of events as the watched key is updated.
50 pub(crate) fn create_subscription(&mut self) -> WatcherSubscription<E> {
51 self.stats.subscriptions_created.incr();
52 self.create_subscription_impl()
53 }
54
55 fn create_subscription_impl(&mut self) -> WatcherSubscription<E> {
56 let subscription_id = WatcherSubscriptionId(self.next_subscription_id);
57 self.next_subscription_id += 1;
58 let (sender, receiver) = mpsc::channel(E::WATCHER_MAX_BACKLOG_SIZE);
59 self.watchers.insert(subscription_id, sender);
60 WatcherSubscription {
61 subscription_id,
62 receiver,
63 full_sync_event: E::full_sync_event(self.current_value.clone()),
64 }
65 }
66
67 fn update_subscribers(&mut self, events: Vec<E>, current_value: &<E as WatchEvents>::Value) {
68 self.current_value = current_value.clone();
69
70 // No watchers or events, so no further dispatching to do.
71 if self.watchers.is_empty() || events.is_empty() {
72 return;
73 }
74
75 // We have events to dispatch to their respective watchers.
76 for event in events {
77 let prev_watchers_len = self.watchers.len();
78
79 // Try to write to all senders, retaining only the ones that have not overflowed. The ones that have overflowed will need to be
80 // restarted, as they missed events. This will trigger a full-sync. This is to make sure if a receiver goes pathologically
81 // behind, we'll terminate it, and it must restart watching.
82 self.watchers
83 .retain(|_, sender| sender.try_send(event.clone()).is_ok());
84
85 let watchers_len = self.watchers.len();
86 let watchers_removed = prev_watchers_len - watchers_len;
87 self.stats
88 .subscriptions_reset
89 .incr_by(watchers_removed as _);
90 self.stats.subscriptions_notified.incr_by(watchers_len as _);
91 }
92 }
93
94 pub(crate) fn remove_subscription(&mut self, watcher_id: WatcherSubscriptionId) {
95 if self.watchers.remove(&watcher_id).is_some() {
96 self.stats.subscriptions_removed.incr();
97 }
98 }
99
100 pub(crate) fn recreate_subscription(
101 &mut self,
102 watcher_id: WatcherSubscriptionId,
103 ) -> WatcherSubscription<E> {
104 self.stats.subscriptions_recreated.incr();
105 self.watchers.remove(&watcher_id);
106 self.create_subscription_impl()
107 }
108
109 pub(crate) fn num_subscribers(&self) -> usize {
110 self.watchers.len()
111 }
112}
113
114pub(crate) struct WatcherLoop<I>
115where
116 I: WatcherImpl,
117{
118 inner: Arc<RwLock<WatcherInner<I::Events>>>,
119 watcher_impl: I,
120 backoff: AsyncBackoff,
121 stats: Arc<WatcherStats>,
122}
123
124impl<I> WatcherLoop<I>
125where
126 I: WatcherImpl,
127{
128 /// Starts the watcher loop, resolving once the loop has started, and an initial value has been read from etcd.
129 pub(crate) async fn start(
130 watcher_impl: I,
131 stats: Arc<WatcherStats>,
132 ) -> Result<WatchHandle<I::Events>, crate::etcd::EtcdError> {
133 // Begin watching by synchronizing with etcd.
134 let initial_watch_state = match watcher_impl.begin_watching().await {
135 Ok(initial_watch_state) => initial_watch_state,
136 Err(err) => {
137 error!(
138 "watcher {:?} failed to start (error: {:?})",
139 watcher_impl, err
140 );
141 stats.watcher_start_fails.incr();
142 return Err(err);
143 }
144 };
145
146 let key = watcher_impl.key().to_owned();
147 let backoff = AsyncBackoff::new(AsyncBackoffConfig {
148 min_delay: Duration::from_secs(5),
149 max_delay: Duration::from_secs(60),
150 multiplier: 2.0,
151 });
152
153 let watcher = Self {
154 watcher_impl,
155 backoff,
156 stats: stats.clone(),
157 inner: Arc::new(RwLock::new(WatcherInner {
158 current_value: initial_watch_state.current_value.clone(),
159 next_subscription_id: 0,
160 watchers: HashMap::new(),
161 stats,
162 key,
163 })),
164 };
165
166 let inner = watcher.inner.clone();
167 let join_handle = tokio::task::spawn(watcher.run_loop(initial_watch_state));
168 let watch_handle = WatchHandle::new(inner, join_handle);
169
170 Ok(watch_handle)
171 }
172
173 /// The background task that runs the watcher loop fsm.
174 async fn run_loop(
175 mut self,
176 initial_watch_state: WatchState<<I::Events as WatchEvents>::Value>,
177 ) {
178 let mut watch_loop_state = WatchLoopState::ContinueWatching(initial_watch_state);
179 self.stats.watcher_start_success.incr();
180 info!("Watcher (impl={:?} has started)", &self.watcher_impl);
181
182 loop {
183 let (next_watch_loop_state, events) = self.run_loop_once(watch_loop_state).await;
184 watch_loop_state = next_watch_loop_state;
185 self.inner
186 .write()
187 .update_subscribers(events, watch_loop_state.current_value());
188 }
189 }
190
191 /// Runs one iteration of the watcher loop, returning the events to be emitted, and the next watch loop state that this function will be called with.
192 async fn run_loop_once(
193 &mut self,
194 watch_loop_state: WatchLoopState<<I::Events as WatchEvents>::Value>,
195 ) -> (
196 WatchLoopState<<I::Events as WatchEvents>::Value>,
197 Vec<I::Events>,
198 ) {
199 let (watch_result, prev_watch_state) = match watch_loop_state {
200 WatchLoopState::ContinueWatching(prev_watch_state) => {
201 let watch_result = self.watcher_impl.continue_watching(&prev_watch_state).await;
202
203 match watch_result {
204 // The watch was a success!
205 Ok(result) => (result, prev_watch_state),
206 // The watch failed, we'll backoff and try again.
207 Err(err) => {
208 self.log_and_sleep_after_error(err).await;
209 return (WatchLoopState::ContinueWatching(prev_watch_state), vec![]);
210 }
211 }
212 }
213 WatchLoopState::ResetWatcher(prev_watch_state) => {
214 match self
215 .watcher_impl
216 .reset_watcher(&prev_watch_state.current_value)
217 .await
218 {
219 // The watch reset was successful, which means we can continue the watcher now.
220 Ok(watch_result) => (watch_result, prev_watch_state),
221 // Watch reset was unsuccessful, we'll need to retry resetting.
222 Err(err) => {
223 self.log_and_sleep_after_error(err).await;
224 return (WatchLoopState::ResetWatcher(prev_watch_state), vec![]);
225 }
226 }
227 }
228 };
229
230 self.backoff.succeed();
231 match watch_result {
232 // The watch was successful, and state will be updated to the next state.
233 WatchResult::Success { events, state } => {
234 self.stats.watch_success.incr();
235 let next_state = WatchLoopState::ContinueWatching(state);
236 (next_state, events)
237 }
238 // The watch has timed out, so we'll retry the watch with the same state as before.
239 WatchResult::Timeout => {
240 self.stats.watch_timeout.incr();
241 (WatchLoopState::ContinueWatching(prev_watch_state), vec![])
242 }
243 // The watch needs to be reset. Inner watcher state has not changed.
244 WatchResult::ResetWatcher => {
245 self.stats.watch_reset.incr();
246 (WatchLoopState::ResetWatcher(prev_watch_state), vec![])
247 }
248 }
249 }
250
251 async fn log_and_sleep_after_error(&mut self, error: crate::etcd::EtcdError) {
252 let delay = self.backoff.fail();
253 error!(
254 "Watcher for key {} encountered an etcd error ({:?}), will retry after {:?}",
255 self.watcher_impl.key(),
256 error,
257 delay
258 );
259 self.stats.watch_etcd_errors.incr();
260 sleep(delay).await;
261 }
262}
263
264pub(crate) enum WatchLoopState<V> {
265 ContinueWatching(WatchState<V>),
266 ResetWatcher(WatchState<V>),
267}
268
269impl<V> WatchLoopState<V> {
270 fn current_value(&self) -> &V {
271 match self {
272 WatchLoopState::ResetWatcher(watch_state) => &watch_state.current_value,
273 WatchLoopState::ContinueWatching(watch_state) => &watch_state.current_value,
274 }
275 }
276}