Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 199 lines 7.0 kB view raw
1use crate::backoff_utils::{AsyncBackoff, Config}; 2use crate::etcd::{Client, EtcdError}; 3use etcd::{kv::KeyValueInfo, Response}; 4use futures::Stream; 5use log::error; 6use std::future::Future; 7use std::pin::Pin; 8use std::task::{Context, Poll}; 9use std::time::Duration; 10use tokio::time::sleep; 11 12type ResponseFuture = 13 Pin<Box<dyn Future<Output = Result<Response<KeyValueInfo>, EtcdError>> + Send>>; 14type BackoffFuture = Pin<Box<dyn Future<Output = ()> + Send>>; 15 16/// A single event returned from watch stream. 17pub enum WatchEvent { 18 /// The initial information when a watch is started. 19 /// `KeyValueInfo` is `None` if the key doesn't exist. 20 Get(Option<KeyValueInfo>), 21 /// A node update. 22 Update(KeyValueInfo), 23} 24 25/// The next future that should be polled in the `Stream::poll_next` method. 26enum PendingFuture { 27 Get(ResponseFuture), 28 Watch(ResponseFuture), 29 Backoff(BackoffFuture), 30} 31 32/// A watcher that will stream watch events until it is dropped. It handles backoffs if etcd 33/// goes offline or returns errors. 34pub struct Watcher { 35 /// Etcd client. 36 client: Client, 37 /// Key to watch. 38 key: String, 39 /// Backoff for when errors happen. 40 backoff: AsyncBackoff, 41 /// The future that should be polled next during `Stream::poll_next`. 42 pending_future: Option<PendingFuture>, 43} 44 45impl Watcher { 46 pub fn new(client: Client, key: String) -> Self { 47 let config = Config { 48 min_delay: Duration::from_secs(1), 49 max_delay: Duration::from_secs(15), 50 ..Default::default() 51 }; 52 let backoff = AsyncBackoff::new(config); 53 Self { 54 client, 55 key, 56 backoff, 57 pending_future: None, 58 } 59 } 60} 61 62impl Stream for Watcher { 63 type Item = WatchEvent; 64 65 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> { 66 let watcher = Pin::get_mut(self); 67 68 // Order of operations. 69 // 1. Do a get to retrieve the initial index. 70 // 2. Start a watch from the index. 71 // 3. Loop on the watch until there is an error. 72 // 73 // If there is ever an error, start a backoff. Once the backoff is complete, go to step 1. 74 let (poll, next_future) = match watcher.pending_future.take() { 75 // Poll the current pending future if there isn't one. 76 Some(pending_future) => watcher.poll_pending(pending_future, context), 77 // There is no pending future. We are just starting the stream. Start by doing a get. 78 None => watcher.start_get(context), 79 }; 80 81 watcher.pending_future = Some(next_future); 82 poll 83 } 84} 85 86impl Watcher { 87 /// Unwrap the current `pending_future` and poll it. 88 fn poll_pending( 89 &mut self, 90 pending_future: PendingFuture, 91 context: &mut Context, 92 ) -> (Poll<Option<WatchEvent>>, PendingFuture) { 93 match pending_future { 94 PendingFuture::Backoff(backoff_future) => self.poll_backoff(backoff_future, context), 95 PendingFuture::Watch(watch_future) => self.poll_watch(watch_future, context), 96 PendingFuture::Get(get_future) => self.poll_get(get_future, context), 97 } 98 } 99 100 /// Poll the backoff future. If the backoff is ready then it returns Ready. 101 fn poll_backoff( 102 &mut self, 103 mut backoff_future: BackoffFuture, 104 context: &mut Context, 105 ) -> (Poll<Option<WatchEvent>>, PendingFuture) { 106 match backoff_future.as_mut().poll(context) { 107 Poll::Ready(()) => self.start_get(context), 108 Poll::Pending => (Poll::Pending, PendingFuture::Backoff(backoff_future)), 109 } 110 } 111 112 /// Poll the watch future. 113 fn poll_watch( 114 &mut self, 115 mut watch_future: ResponseFuture, 116 context: &mut Context, 117 ) -> (Poll<Option<WatchEvent>>, PendingFuture) { 118 match watch_future.as_mut().poll(context) { 119 Poll::Ready(Err(error)) => self.start_backoff(context, error.into()), 120 Poll::Ready(Ok(response)) => { 121 let after_index = match response.data.node.modified_index { 122 Some(after_index) => after_index, 123 None => return self.start_backoff(context, EtcdError::NoIndex.into()), 124 }; 125 ( 126 Poll::Ready(Some(WatchEvent::Update(response.data))), 127 self.make_watch_future(after_index), 128 ) 129 } 130 Poll::Pending => (Poll::Pending, PendingFuture::Watch(watch_future)), 131 } 132 } 133 134 /// Poll the get future. 135 fn poll_get( 136 &mut self, 137 mut get_future: ResponseFuture, 138 context: &mut Context, 139 ) -> (Poll<Option<WatchEvent>>, PendingFuture) { 140 match get_future.as_mut().poll(context) { 141 Poll::Ready(Ok(response)) => { 142 let after_index = match response.cluster_info.etcd_index { 143 Some(after_index) => after_index, 144 None => return self.start_backoff(context, EtcdError::NoIndex.into()), 145 }; 146 147 self.backoff.succeed(); 148 ( 149 Poll::Ready(Some(WatchEvent::Get(Some(response.data)))), 150 self.make_watch_future(after_index), 151 ) 152 } 153 Poll::Ready(Err(error)) => match error { 154 EtcdError::KeyNotFound { 155 index: after_index, 156 error: _error, 157 } => ( 158 Poll::Ready(Some(WatchEvent::Get(None))), 159 self.make_watch_future(after_index), 160 ), 161 other => self.start_backoff(context, other.into()), 162 }, 163 Poll::Pending => (Poll::Pending, PendingFuture::Get(get_future)), 164 } 165 } 166 167 /// Start the backoff future. 168 fn start_backoff( 169 &mut self, 170 context: &mut Context, 171 error: anyhow::Error, 172 ) -> (Poll<Option<WatchEvent>>, PendingFuture) { 173 let backoff_duration = self.backoff.fail(); 174 error!( 175 "Watch error. error={:?}. Starting backoff {:?}", 176 error, backoff_duration 177 ); 178 179 let backoff_future = Box::pin(async move { sleep(backoff_duration).await }); 180 self.poll_backoff(backoff_future, context) 181 } 182 183 /// Start the get future. 184 fn start_get(&mut self, context: &mut Context) -> (Poll<Option<WatchEvent>>, PendingFuture) { 185 let client = self.client.clone(); 186 let key = self.key.clone(); 187 let get_future = Box::pin(async move { client.get(&*key).await }); 188 self.poll_get(get_future, context) 189 } 190 191 /// Make a watch future. 192 fn make_watch_future(&mut self, after_index: u64) -> PendingFuture { 193 let client = self.client.clone(); 194 let key = self.key.clone(); 195 PendingFuture::Watch(Box::pin(async move { 196 client.watch_recursive(&key, after_index).await 197 })) 198 } 199}