Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 250 lines 7.6 kB view raw
1use crate::metrics::gauges::StaticGauge; 2use crate::metrics::histograms::StaticHistogram; 3use tokio::{ 4 sync::oneshot, 5 time::{interval, Duration, Instant, MissedTickBehavior}, 6}; 7 8use crate::{coordinator_metrics::OspreyCoordinatorMetrics, proto}; 9 10use crate::tokio_utils::AbortOnDrop; 11use std::{cell::Cell, sync::Arc}; 12 13#[derive(Debug)] 14pub enum AckOrNack { 15 Ack(Option<crate::proto::Verdicts>), 16 Nack, 17} 18 19impl From<proto::ack_or_nack::AckOrNack> for AckOrNack { 20 fn from(ack_or_nack: proto::ack_or_nack::AckOrNack) -> Self { 21 match ack_or_nack { 22 proto::ack_or_nack::AckOrNack::Ack(inner) => Self::Ack(inner.verdicts), 23 proto::ack_or_nack::AckOrNack::Nack(_) => Self::Nack, 24 } 25 } 26} 27 28pub struct AckableAction { 29 pub action: proto::OspreyCoordinatorAction, 30 acking_oneshot_sender: oneshot::Sender<AckOrNack>, 31 local_retry_count: Cell<u32>, 32 pub created_at: Instant, 33} 34 35impl AckableAction { 36 pub fn new( 37 action: proto::OspreyCoordinatorAction, 38 ) -> ( 39 AckableAction, 40 oneshot::Receiver<crate::priority_queue::AckOrNack>, 41 ) { 42 let (acking_oneshot_sender, acking_oneshot_receiver) = oneshot::channel::<AckOrNack>(); 43 let ackable_action = AckableAction { 44 action, 45 acking_oneshot_sender, 46 local_retry_count: 0.into(), 47 created_at: Instant::now(), 48 }; 49 (ackable_action, acking_oneshot_receiver) 50 } 51 52 pub fn into_action(self) -> (proto::OspreyCoordinatorAction, ActionAcker) { 53 ( 54 self.action, 55 ActionAcker { 56 acking_oneshot_sender: self.acking_oneshot_sender, 57 }, 58 ) 59 } 60 61 fn increment_retry_count(&self) { 62 let count = self.local_retry_count.get(); 63 self.local_retry_count.set(count + 1); 64 } 65 66 #[allow(unused)] 67 pub fn retry_count(&self) -> u32 { 68 self.local_retry_count.get() 69 } 70} 71 72#[derive(Debug)] 73pub struct ActionAcker { 74 acking_oneshot_sender: oneshot::Sender<AckOrNack>, 75} 76 77impl ActionAcker { 78 pub fn ack_or_nack<T: Into<AckOrNack>>(self, ack_or_nack: T) { 79 self.acking_oneshot_sender.send(ack_or_nack.into()).ok(); 80 } 81} 82 83pub enum Priority { 84 Sync, 85 Async, 86} 87 88#[derive(Clone)] 89pub struct PriorityQueueSender { 90 sync_sender: async_channel::Sender<AckableAction>, 91 async_sender: async_channel::Sender<AckableAction>, 92} 93 94impl PriorityQueueSender { 95 fn new( 96 sync_sender: async_channel::Sender<AckableAction>, 97 async_sender: async_channel::Sender<AckableAction>, 98 ) -> PriorityQueueSender { 99 PriorityQueueSender { 100 sync_sender, 101 async_sender, 102 } 103 } 104 105 pub fn close(&self) { 106 self.sync_sender.close(); 107 self.async_sender.close(); 108 } 109 pub async fn send_sync( 110 &self, 111 ackable_action: AckableAction, 112 ) -> Result<(), async_channel::SendError<AckableAction>> { 113 self.send(ackable_action, Priority::Sync).await 114 } 115 116 pub async fn send_async( 117 &self, 118 ackable_action: AckableAction, 119 ) -> Result<(), async_channel::SendError<AckableAction>> { 120 self.send(ackable_action, Priority::Async).await 121 } 122 123 async fn send( 124 &self, 125 ackable_action: AckableAction, 126 priority: Priority, 127 ) -> Result<(), async_channel::SendError<AckableAction>> { 128 ackable_action.increment_retry_count(); 129 match priority { 130 Priority::Sync => self.sync_sender.send(ackable_action).await, 131 Priority::Async => self.async_sender.send(ackable_action).await, 132 } 133 } 134 135 pub fn len_sync(&self) -> usize { 136 self.sync_sender.len() 137 } 138 139 pub fn len_async(&self) -> usize { 140 self.async_sender.len() 141 } 142 143 pub fn receiver_count_sync(&self) -> usize { 144 self.sync_sender.receiver_count() 145 } 146 147 pub fn receiver_count_async(&self) -> usize { 148 self.async_sender.receiver_count() 149 } 150} 151 152#[derive(Clone)] 153pub struct PriorityQueueReceiver { 154 sync_receiver: async_channel::Receiver<AckableAction>, 155 async_receiver: async_channel::Receiver<AckableAction>, 156} 157 158impl PriorityQueueReceiver { 159 fn new( 160 sync_receiver: async_channel::Receiver<AckableAction>, 161 async_receiver: async_channel::Receiver<AckableAction>, 162 ) -> PriorityQueueReceiver { 163 PriorityQueueReceiver { 164 sync_receiver, 165 async_receiver, 166 } 167 } 168 pub async fn recv( 169 &self, 170 metrics: Arc<OspreyCoordinatorMetrics>, 171 ) -> Result<AckableAction, async_channel::RecvError> { 172 loop { 173 let result = tokio::select! { 174 biased; 175 result = self.sync_receiver.recv() => result, 176 result = self.async_receiver.recv() => match result { 177 Ok(ackable_action) => { 178 metrics.action_time_in_async_queue.record(Instant::now().duration_since(ackable_action.created_at)); 179 Ok(ackable_action) 180 } 181 Err(_) => self.sync_receiver.recv().await 182 }, 183 }; 184 match result { 185 Ok(ackable_action) => { 186 // If the acking oneshot receiver is closed then there is no reason to process this action 187 // This can happen if the client sending a sync classification request times out 188 if ackable_action.acking_oneshot_sender.is_closed() { 189 continue; 190 } else { 191 return Ok(ackable_action); 192 } 193 } 194 Err(err) => return Err(err), 195 } 196 } 197 } 198 199 pub fn nack_all_async(&self) { 200 loop { 201 match self.async_receiver.try_recv() { 202 Ok(action) => match action.acking_oneshot_sender.send(AckOrNack::Nack) { 203 Ok(_) => (), 204 Err(_) => println!( 205 "tried to nack {:?} and the nacking receiver was dropped", 206 action.action 207 ), 208 }, 209 Err(_) => return, 210 } 211 } 212 } 213} 214 215pub fn create_ackable_action_priority_queue() -> (PriorityQueueSender, PriorityQueueReceiver) { 216 let (sync_sender, sync_receiver) = async_channel::unbounded(); 217 let (async_sender, async_receiver) = async_channel::unbounded(); 218 ( 219 PriorityQueueSender::new(sync_sender, async_sender), 220 PriorityQueueReceiver::new(sync_receiver, async_receiver), 221 ) 222} 223 224pub fn spawn_priority_queue_metrics_worker( 225 queue_sender: PriorityQueueSender, 226 metrics: Arc<OspreyCoordinatorMetrics>, 227) -> AbortOnDrop<()> { 228 let mut interval = interval(Duration::from_millis(100)); 229 interval.set_missed_tick_behavior(MissedTickBehavior::Skip); 230 231 let join_handle = tokio::task::spawn(async move { 232 loop { 233 interval.tick().await; 234 metrics 235 .priority_queue_size_sync 236 .set(queue_sender.len_sync() as u64); 237 metrics 238 .priority_queue_size_async 239 .set(queue_sender.len_async() as u64); 240 metrics 241 .priority_queue_receiver_count_async 242 .set(queue_sender.receiver_count_async() as u64); 243 metrics 244 .priority_queue_receiver_count_sync 245 .set(queue_sender.receiver_count_sync() as u64); 246 } 247 }); 248 249 AbortOnDrop::new(join_handle) 250}