Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 276 lines 10 kB view raw
1use std::{collections::HashMap, sync::Arc, time::Duration}; 2 3use crate::backoff_utils::{AsyncBackoff, Config as AsyncBackoffConfig}; 4use crate::metrics::counters::StaticCounter; 5use parking_lot::RwLock; 6use tokio::{sync::mpsc, time::sleep}; 7use tracing::{error, info}; 8 9use crate::etcd_watcherd::{ 10 WatchEvents, WatchHandle, WatchResult, WatchState, WatcherImpl, WatcherStats, 11}; 12 13pub(crate) struct WatcherInner<E> 14where 15 E: WatchEvents, 16{ 17 #[allow(dead_code)] 18 key: String, 19 current_value: E::Value, 20 next_subscription_id: usize, 21 watchers: HashMap<WatcherSubscriptionId, mpsc::Sender<E>>, 22 stats: Arc<WatcherStats>, 23} 24 25pub(crate) struct WatcherSubscription<E> 26where 27 E: WatchEvents, 28{ 29 pub subscription_id: WatcherSubscriptionId, 30 pub receiver: mpsc::Receiver<E>, 31 pub full_sync_event: E, 32} 33 34/// A subscription id is created by [`WatcherInner::create_subscription`] and is used to identify 35/// a given subscription within the system. 36#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] 37pub(crate) struct WatcherSubscriptionId(usize); 38 39impl<E> WatcherInner<E> 40where 41 E: WatchEvents, 42{ 43 /// Get a reference to the watcher inner's current value. 44 pub(crate) fn current_value(&self) -> &E::Value { 45 &self.current_value 46 } 47 48 /// Creates a watcher subscription, which holds a snapshot of the current value, and a receiver which 49 /// will receive a stream of events as the watched key is updated. 50 pub(crate) fn create_subscription(&mut self) -> WatcherSubscription<E> { 51 self.stats.subscriptions_created.incr(); 52 self.create_subscription_impl() 53 } 54 55 fn create_subscription_impl(&mut self) -> WatcherSubscription<E> { 56 let subscription_id = WatcherSubscriptionId(self.next_subscription_id); 57 self.next_subscription_id += 1; 58 let (sender, receiver) = mpsc::channel(E::WATCHER_MAX_BACKLOG_SIZE); 59 self.watchers.insert(subscription_id, sender); 60 WatcherSubscription { 61 subscription_id, 62 receiver, 63 full_sync_event: E::full_sync_event(self.current_value.clone()), 64 } 65 } 66 67 fn update_subscribers(&mut self, events: Vec<E>, current_value: &<E as WatchEvents>::Value) { 68 self.current_value = current_value.clone(); 69 70 // No watchers or events, so no further dispatching to do. 71 if self.watchers.is_empty() || events.is_empty() { 72 return; 73 } 74 75 // We have events to dispatch to their respective watchers. 76 for event in events { 77 let prev_watchers_len = self.watchers.len(); 78 79 // Try to write to all senders, retaining only the ones that have not overflowed. The ones that have overflowed will need to be 80 // restarted, as they missed events. This will trigger a full-sync. This is to make sure if a receiver goes pathologically 81 // behind, we'll terminate it, and it must restart watching. 82 self.watchers 83 .retain(|_, sender| sender.try_send(event.clone()).is_ok()); 84 85 let watchers_len = self.watchers.len(); 86 let watchers_removed = prev_watchers_len - watchers_len; 87 self.stats 88 .subscriptions_reset 89 .incr_by(watchers_removed as _); 90 self.stats.subscriptions_notified.incr_by(watchers_len as _); 91 } 92 } 93 94 pub(crate) fn remove_subscription(&mut self, watcher_id: WatcherSubscriptionId) { 95 if self.watchers.remove(&watcher_id).is_some() { 96 self.stats.subscriptions_removed.incr(); 97 } 98 } 99 100 pub(crate) fn recreate_subscription( 101 &mut self, 102 watcher_id: WatcherSubscriptionId, 103 ) -> WatcherSubscription<E> { 104 self.stats.subscriptions_recreated.incr(); 105 self.watchers.remove(&watcher_id); 106 self.create_subscription_impl() 107 } 108 109 pub(crate) fn num_subscribers(&self) -> usize { 110 self.watchers.len() 111 } 112} 113 114pub(crate) struct WatcherLoop<I> 115where 116 I: WatcherImpl, 117{ 118 inner: Arc<RwLock<WatcherInner<I::Events>>>, 119 watcher_impl: I, 120 backoff: AsyncBackoff, 121 stats: Arc<WatcherStats>, 122} 123 124impl<I> WatcherLoop<I> 125where 126 I: WatcherImpl, 127{ 128 /// Starts the watcher loop, resolving once the loop has started, and an initial value has been read from etcd. 129 pub(crate) async fn start( 130 watcher_impl: I, 131 stats: Arc<WatcherStats>, 132 ) -> Result<WatchHandle<I::Events>, crate::etcd::EtcdError> { 133 // Begin watching by synchronizing with etcd. 134 let initial_watch_state = match watcher_impl.begin_watching().await { 135 Ok(initial_watch_state) => initial_watch_state, 136 Err(err) => { 137 error!( 138 "watcher {:?} failed to start (error: {:?})", 139 watcher_impl, err 140 ); 141 stats.watcher_start_fails.incr(); 142 return Err(err); 143 } 144 }; 145 146 let key = watcher_impl.key().to_owned(); 147 let backoff = AsyncBackoff::new(AsyncBackoffConfig { 148 min_delay: Duration::from_secs(5), 149 max_delay: Duration::from_secs(60), 150 multiplier: 2.0, 151 }); 152 153 let watcher = Self { 154 watcher_impl, 155 backoff, 156 stats: stats.clone(), 157 inner: Arc::new(RwLock::new(WatcherInner { 158 current_value: initial_watch_state.current_value.clone(), 159 next_subscription_id: 0, 160 watchers: HashMap::new(), 161 stats, 162 key, 163 })), 164 }; 165 166 let inner = watcher.inner.clone(); 167 let join_handle = tokio::task::spawn(watcher.run_loop(initial_watch_state)); 168 let watch_handle = WatchHandle::new(inner, join_handle); 169 170 Ok(watch_handle) 171 } 172 173 /// The background task that runs the watcher loop fsm. 174 async fn run_loop( 175 mut self, 176 initial_watch_state: WatchState<<I::Events as WatchEvents>::Value>, 177 ) { 178 let mut watch_loop_state = WatchLoopState::ContinueWatching(initial_watch_state); 179 self.stats.watcher_start_success.incr(); 180 info!("Watcher (impl={:?} has started)", &self.watcher_impl); 181 182 loop { 183 let (next_watch_loop_state, events) = self.run_loop_once(watch_loop_state).await; 184 watch_loop_state = next_watch_loop_state; 185 self.inner 186 .write() 187 .update_subscribers(events, watch_loop_state.current_value()); 188 } 189 } 190 191 /// Runs one iteration of the watcher loop, returning the events to be emitted, and the next watch loop state that this function will be called with. 192 async fn run_loop_once( 193 &mut self, 194 watch_loop_state: WatchLoopState<<I::Events as WatchEvents>::Value>, 195 ) -> ( 196 WatchLoopState<<I::Events as WatchEvents>::Value>, 197 Vec<I::Events>, 198 ) { 199 let (watch_result, prev_watch_state) = match watch_loop_state { 200 WatchLoopState::ContinueWatching(prev_watch_state) => { 201 let watch_result = self.watcher_impl.continue_watching(&prev_watch_state).await; 202 203 match watch_result { 204 // The watch was a success! 205 Ok(result) => (result, prev_watch_state), 206 // The watch failed, we'll backoff and try again. 207 Err(err) => { 208 self.log_and_sleep_after_error(err).await; 209 return (WatchLoopState::ContinueWatching(prev_watch_state), vec![]); 210 } 211 } 212 } 213 WatchLoopState::ResetWatcher(prev_watch_state) => { 214 match self 215 .watcher_impl 216 .reset_watcher(&prev_watch_state.current_value) 217 .await 218 { 219 // The watch reset was successful, which means we can continue the watcher now. 220 Ok(watch_result) => (watch_result, prev_watch_state), 221 // Watch reset was unsuccessful, we'll need to retry resetting. 222 Err(err) => { 223 self.log_and_sleep_after_error(err).await; 224 return (WatchLoopState::ResetWatcher(prev_watch_state), vec![]); 225 } 226 } 227 } 228 }; 229 230 self.backoff.succeed(); 231 match watch_result { 232 // The watch was successful, and state will be updated to the next state. 233 WatchResult::Success { events, state } => { 234 self.stats.watch_success.incr(); 235 let next_state = WatchLoopState::ContinueWatching(state); 236 (next_state, events) 237 } 238 // The watch has timed out, so we'll retry the watch with the same state as before. 239 WatchResult::Timeout => { 240 self.stats.watch_timeout.incr(); 241 (WatchLoopState::ContinueWatching(prev_watch_state), vec![]) 242 } 243 // The watch needs to be reset. Inner watcher state has not changed. 244 WatchResult::ResetWatcher => { 245 self.stats.watch_reset.incr(); 246 (WatchLoopState::ResetWatcher(prev_watch_state), vec![]) 247 } 248 } 249 } 250 251 async fn log_and_sleep_after_error(&mut self, error: crate::etcd::EtcdError) { 252 let delay = self.backoff.fail(); 253 error!( 254 "Watcher for key {} encountered an etcd error ({:?}), will retry after {:?}", 255 self.watcher_impl.key(), 256 error, 257 delay 258 ); 259 self.stats.watch_etcd_errors.incr(); 260 sleep(delay).await; 261 } 262} 263 264pub(crate) enum WatchLoopState<V> { 265 ContinueWatching(WatchState<V>), 266 ResetWatcher(WatchState<V>), 267} 268 269impl<V> WatchLoopState<V> { 270 fn current_value(&self) -> &V { 271 match self { 272 WatchLoopState::ResetWatcher(watch_state) => &watch_state.current_value, 273 WatchLoopState::ContinueWatching(watch_state) => &watch_state.current_value, 274 } 275 } 276}