Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use crate::metrics::gauges::StaticGauge;
2use crate::metrics::histograms::StaticHistogram;
3use tokio::{
4 sync::oneshot,
5 time::{interval, Duration, Instant, MissedTickBehavior},
6};
7
8use crate::{coordinator_metrics::OspreyCoordinatorMetrics, proto};
9
10use crate::tokio_utils::AbortOnDrop;
11use std::{cell::Cell, sync::Arc};
12
13#[derive(Debug)]
14pub enum AckOrNack {
15 Ack(Option<crate::proto::Verdicts>),
16 Nack,
17}
18
19impl From<proto::ack_or_nack::AckOrNack> for AckOrNack {
20 fn from(ack_or_nack: proto::ack_or_nack::AckOrNack) -> Self {
21 match ack_or_nack {
22 proto::ack_or_nack::AckOrNack::Ack(inner) => Self::Ack(inner.verdicts),
23 proto::ack_or_nack::AckOrNack::Nack(_) => Self::Nack,
24 }
25 }
26}
27
28pub struct AckableAction {
29 pub action: proto::OspreyCoordinatorAction,
30 acking_oneshot_sender: oneshot::Sender<AckOrNack>,
31 local_retry_count: Cell<u32>,
32 pub created_at: Instant,
33}
34
35impl AckableAction {
36 pub fn new(
37 action: proto::OspreyCoordinatorAction,
38 ) -> (
39 AckableAction,
40 oneshot::Receiver<crate::priority_queue::AckOrNack>,
41 ) {
42 let (acking_oneshot_sender, acking_oneshot_receiver) = oneshot::channel::<AckOrNack>();
43 let ackable_action = AckableAction {
44 action,
45 acking_oneshot_sender,
46 local_retry_count: 0.into(),
47 created_at: Instant::now(),
48 };
49 (ackable_action, acking_oneshot_receiver)
50 }
51
52 pub fn into_action(self) -> (proto::OspreyCoordinatorAction, ActionAcker) {
53 (
54 self.action,
55 ActionAcker {
56 acking_oneshot_sender: self.acking_oneshot_sender,
57 },
58 )
59 }
60
61 fn increment_retry_count(&self) {
62 let count = self.local_retry_count.get();
63 self.local_retry_count.set(count + 1);
64 }
65
66 #[allow(unused)]
67 pub fn retry_count(&self) -> u32 {
68 self.local_retry_count.get()
69 }
70}
71
72#[derive(Debug)]
73pub struct ActionAcker {
74 acking_oneshot_sender: oneshot::Sender<AckOrNack>,
75}
76
77impl ActionAcker {
78 pub fn ack_or_nack<T: Into<AckOrNack>>(self, ack_or_nack: T) {
79 self.acking_oneshot_sender.send(ack_or_nack.into()).ok();
80 }
81}
82
83pub enum Priority {
84 Sync,
85 Async,
86}
87
88#[derive(Clone)]
89pub struct PriorityQueueSender {
90 sync_sender: async_channel::Sender<AckableAction>,
91 async_sender: async_channel::Sender<AckableAction>,
92}
93
94impl PriorityQueueSender {
95 fn new(
96 sync_sender: async_channel::Sender<AckableAction>,
97 async_sender: async_channel::Sender<AckableAction>,
98 ) -> PriorityQueueSender {
99 PriorityQueueSender {
100 sync_sender,
101 async_sender,
102 }
103 }
104
105 pub fn close(&self) {
106 self.sync_sender.close();
107 self.async_sender.close();
108 }
109 pub async fn send_sync(
110 &self,
111 ackable_action: AckableAction,
112 ) -> Result<(), async_channel::SendError<AckableAction>> {
113 self.send(ackable_action, Priority::Sync).await
114 }
115
116 pub async fn send_async(
117 &self,
118 ackable_action: AckableAction,
119 ) -> Result<(), async_channel::SendError<AckableAction>> {
120 self.send(ackable_action, Priority::Async).await
121 }
122
123 async fn send(
124 &self,
125 ackable_action: AckableAction,
126 priority: Priority,
127 ) -> Result<(), async_channel::SendError<AckableAction>> {
128 ackable_action.increment_retry_count();
129 match priority {
130 Priority::Sync => self.sync_sender.send(ackable_action).await,
131 Priority::Async => self.async_sender.send(ackable_action).await,
132 }
133 }
134
135 pub fn len_sync(&self) -> usize {
136 self.sync_sender.len()
137 }
138
139 pub fn len_async(&self) -> usize {
140 self.async_sender.len()
141 }
142
143 pub fn receiver_count_sync(&self) -> usize {
144 self.sync_sender.receiver_count()
145 }
146
147 pub fn receiver_count_async(&self) -> usize {
148 self.async_sender.receiver_count()
149 }
150}
151
152#[derive(Clone)]
153pub struct PriorityQueueReceiver {
154 sync_receiver: async_channel::Receiver<AckableAction>,
155 async_receiver: async_channel::Receiver<AckableAction>,
156}
157
158impl PriorityQueueReceiver {
159 fn new(
160 sync_receiver: async_channel::Receiver<AckableAction>,
161 async_receiver: async_channel::Receiver<AckableAction>,
162 ) -> PriorityQueueReceiver {
163 PriorityQueueReceiver {
164 sync_receiver,
165 async_receiver,
166 }
167 }
168 pub async fn recv(
169 &self,
170 metrics: Arc<OspreyCoordinatorMetrics>,
171 ) -> Result<AckableAction, async_channel::RecvError> {
172 loop {
173 let result = tokio::select! {
174 biased;
175 result = self.sync_receiver.recv() => result,
176 result = self.async_receiver.recv() => match result {
177 Ok(ackable_action) => {
178 metrics.action_time_in_async_queue.record(Instant::now().duration_since(ackable_action.created_at));
179 Ok(ackable_action)
180 }
181 Err(_) => self.sync_receiver.recv().await
182 },
183 };
184 match result {
185 Ok(ackable_action) => {
186 // If the acking oneshot receiver is closed then there is no reason to process this action
187 // This can happen if the client sending a sync classification request times out
188 if ackable_action.acking_oneshot_sender.is_closed() {
189 continue;
190 } else {
191 return Ok(ackable_action);
192 }
193 }
194 Err(err) => return Err(err),
195 }
196 }
197 }
198
199 pub fn nack_all_async(&self) {
200 loop {
201 match self.async_receiver.try_recv() {
202 Ok(action) => match action.acking_oneshot_sender.send(AckOrNack::Nack) {
203 Ok(_) => (),
204 Err(_) => println!(
205 "tried to nack {:?} and the nacking receiver was dropped",
206 action.action
207 ),
208 },
209 Err(_) => return,
210 }
211 }
212 }
213}
214
215pub fn create_ackable_action_priority_queue() -> (PriorityQueueSender, PriorityQueueReceiver) {
216 let (sync_sender, sync_receiver) = async_channel::unbounded();
217 let (async_sender, async_receiver) = async_channel::unbounded();
218 (
219 PriorityQueueSender::new(sync_sender, async_sender),
220 PriorityQueueReceiver::new(sync_receiver, async_receiver),
221 )
222}
223
224pub fn spawn_priority_queue_metrics_worker(
225 queue_sender: PriorityQueueSender,
226 metrics: Arc<OspreyCoordinatorMetrics>,
227) -> AbortOnDrop<()> {
228 let mut interval = interval(Duration::from_millis(100));
229 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
230
231 let join_handle = tokio::task::spawn(async move {
232 loop {
233 interval.tick().await;
234 metrics
235 .priority_queue_size_sync
236 .set(queue_sender.len_sync() as u64);
237 metrics
238 .priority_queue_size_async
239 .set(queue_sender.len_async() as u64);
240 metrics
241 .priority_queue_receiver_count_async
242 .set(queue_sender.receiver_count_async() as u64);
243 metrics
244 .priority_queue_receiver_count_sync
245 .set(queue_sender.receiver_count_sync() as u64);
246 }
247 });
248
249 AbortOnDrop::new(join_handle)
250}