Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use crate::backoff_utils::{AsyncBackoff, Config};
2use crate::etcd::{Client, EtcdError};
3use etcd::{kv::KeyValueInfo, Response};
4use futures::Stream;
5use log::error;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10use tokio::time::sleep;
11
12type ResponseFuture =
13 Pin<Box<dyn Future<Output = Result<Response<KeyValueInfo>, EtcdError>> + Send>>;
14type BackoffFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
15
16/// A single event returned from watch stream.
17pub enum WatchEvent {
18 /// The initial information when a watch is started.
19 /// `KeyValueInfo` is `None` if the key doesn't exist.
20 Get(Option<KeyValueInfo>),
21 /// A node update.
22 Update(KeyValueInfo),
23}
24
25/// The next future that should be polled in the `Stream::poll_next` method.
26enum PendingFuture {
27 Get(ResponseFuture),
28 Watch(ResponseFuture),
29 Backoff(BackoffFuture),
30}
31
32/// A watcher that will stream watch events until it is dropped. It handles backoffs if etcd
33/// goes offline or returns errors.
34pub struct Watcher {
35 /// Etcd client.
36 client: Client,
37 /// Key to watch.
38 key: String,
39 /// Backoff for when errors happen.
40 backoff: AsyncBackoff,
41 /// The future that should be polled next during `Stream::poll_next`.
42 pending_future: Option<PendingFuture>,
43}
44
45impl Watcher {
46 pub fn new(client: Client, key: String) -> Self {
47 let config = Config {
48 min_delay: Duration::from_secs(1),
49 max_delay: Duration::from_secs(15),
50 ..Default::default()
51 };
52 let backoff = AsyncBackoff::new(config);
53 Self {
54 client,
55 key,
56 backoff,
57 pending_future: None,
58 }
59 }
60}
61
62impl Stream for Watcher {
63 type Item = WatchEvent;
64
65 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
66 let watcher = Pin::get_mut(self);
67
68 // Order of operations.
69 // 1. Do a get to retrieve the initial index.
70 // 2. Start a watch from the index.
71 // 3. Loop on the watch until there is an error.
72 //
73 // If there is ever an error, start a backoff. Once the backoff is complete, go to step 1.
74 let (poll, next_future) = match watcher.pending_future.take() {
75 // Poll the current pending future if there isn't one.
76 Some(pending_future) => watcher.poll_pending(pending_future, context),
77 // There is no pending future. We are just starting the stream. Start by doing a get.
78 None => watcher.start_get(context),
79 };
80
81 watcher.pending_future = Some(next_future);
82 poll
83 }
84}
85
86impl Watcher {
87 /// Unwrap the current `pending_future` and poll it.
88 fn poll_pending(
89 &mut self,
90 pending_future: PendingFuture,
91 context: &mut Context,
92 ) -> (Poll<Option<WatchEvent>>, PendingFuture) {
93 match pending_future {
94 PendingFuture::Backoff(backoff_future) => self.poll_backoff(backoff_future, context),
95 PendingFuture::Watch(watch_future) => self.poll_watch(watch_future, context),
96 PendingFuture::Get(get_future) => self.poll_get(get_future, context),
97 }
98 }
99
100 /// Poll the backoff future. If the backoff is ready then it returns Ready.
101 fn poll_backoff(
102 &mut self,
103 mut backoff_future: BackoffFuture,
104 context: &mut Context,
105 ) -> (Poll<Option<WatchEvent>>, PendingFuture) {
106 match backoff_future.as_mut().poll(context) {
107 Poll::Ready(()) => self.start_get(context),
108 Poll::Pending => (Poll::Pending, PendingFuture::Backoff(backoff_future)),
109 }
110 }
111
112 /// Poll the watch future.
113 fn poll_watch(
114 &mut self,
115 mut watch_future: ResponseFuture,
116 context: &mut Context,
117 ) -> (Poll<Option<WatchEvent>>, PendingFuture) {
118 match watch_future.as_mut().poll(context) {
119 Poll::Ready(Err(error)) => self.start_backoff(context, error.into()),
120 Poll::Ready(Ok(response)) => {
121 let after_index = match response.data.node.modified_index {
122 Some(after_index) => after_index,
123 None => return self.start_backoff(context, EtcdError::NoIndex.into()),
124 };
125 (
126 Poll::Ready(Some(WatchEvent::Update(response.data))),
127 self.make_watch_future(after_index),
128 )
129 }
130 Poll::Pending => (Poll::Pending, PendingFuture::Watch(watch_future)),
131 }
132 }
133
134 /// Poll the get future.
135 fn poll_get(
136 &mut self,
137 mut get_future: ResponseFuture,
138 context: &mut Context,
139 ) -> (Poll<Option<WatchEvent>>, PendingFuture) {
140 match get_future.as_mut().poll(context) {
141 Poll::Ready(Ok(response)) => {
142 let after_index = match response.cluster_info.etcd_index {
143 Some(after_index) => after_index,
144 None => return self.start_backoff(context, EtcdError::NoIndex.into()),
145 };
146
147 self.backoff.succeed();
148 (
149 Poll::Ready(Some(WatchEvent::Get(Some(response.data)))),
150 self.make_watch_future(after_index),
151 )
152 }
153 Poll::Ready(Err(error)) => match error {
154 EtcdError::KeyNotFound {
155 index: after_index,
156 error: _error,
157 } => (
158 Poll::Ready(Some(WatchEvent::Get(None))),
159 self.make_watch_future(after_index),
160 ),
161 other => self.start_backoff(context, other.into()),
162 },
163 Poll::Pending => (Poll::Pending, PendingFuture::Get(get_future)),
164 }
165 }
166
167 /// Start the backoff future.
168 fn start_backoff(
169 &mut self,
170 context: &mut Context,
171 error: anyhow::Error,
172 ) -> (Poll<Option<WatchEvent>>, PendingFuture) {
173 let backoff_duration = self.backoff.fail();
174 error!(
175 "Watch error. error={:?}. Starting backoff {:?}",
176 error, backoff_duration
177 );
178
179 let backoff_future = Box::pin(async move { sleep(backoff_duration).await });
180 self.poll_backoff(backoff_future, context)
181 }
182
183 /// Start the get future.
184 fn start_get(&mut self, context: &mut Context) -> (Poll<Option<WatchEvent>>, PendingFuture) {
185 let client = self.client.clone();
186 let key = self.key.clone();
187 let get_future = Box::pin(async move { client.get(&*key).await });
188 self.poll_get(get_future, context)
189 }
190
191 /// Make a watch future.
192 fn make_watch_future(&mut self, after_index: u64) -> PendingFuture {
193 let client = self.client.clone();
194 let key = self.key.clone();
195 PendingFuture::Watch(Box::pin(async move {
196 client.watch_recursive(&key, after_index).await
197 }))
198 }
199}