Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 185 lines 7.1 kB view raw
1use crate::metrics::counters::StaticCounter; 2use crate::metrics::histograms::StaticHistogram; 3use crate::snowflake_client::SnowflakeClient; 4use crate::{ 5 coordinator_metrics::OspreyCoordinatorMetrics, 6 priority_queue::AckableAction, 7 priority_queue::{AckOrNack, PriorityQueueSender}, 8 proto::{self, osprey_coordinator_sync_action}, 9}; 10use anyhow::{anyhow, Context, Result}; 11use std::sync::Arc; 12use tokio::time::Instant; 13 14use osprey_coordinator_sync_action::osprey_coordinator_sync_action_service_server::OspreyCoordinatorSyncActionService; 15use osprey_coordinator_sync_action::ProcessActionRequest; 16use rand::Rng; 17 18pub(crate) struct SyncActionServer { 19 snowflake_client: Arc<SnowflakeClient>, 20 priority_queue_sender: PriorityQueueSender, 21 metrics: Arc<OspreyCoordinatorMetrics>, 22} 23 24impl SyncActionServer { 25 pub fn new( 26 snowflake_client: Arc<SnowflakeClient>, 27 priority_queue_sender: PriorityQueueSender, 28 metrics: Arc<OspreyCoordinatorMetrics>, 29 ) -> SyncActionServer { 30 SyncActionServer { 31 snowflake_client, 32 priority_queue_sender, 33 metrics, 34 } 35 } 36} 37 38async fn create_osprey_coordinator_action( 39 ack_id: u64, 40 action_request: &osprey_coordinator_sync_action::ProcessActionRequest, 41 snowflake_client: &SnowflakeClient, 42) -> Result<proto::OspreyCoordinatorAction> { 43 // generate snowflake if one is not provided, to match the behaviour in pubsub.rs 44 let action_id = match action_request.action_id { 45 Some(id) => match id { 46 // handle 0 as none-type, since protos default u64 to 0 47 0 => snowflake_client.generate_id().await?, 48 _ => id, 49 }, 50 None => snowflake_client.generate_id().await?, 51 }; 52 if action_request.action_name.is_empty() { 53 return Err(anyhow!("`action_name` must not be empty")); 54 } 55 let osprey_coordinator_action = proto::OspreyCoordinatorAction { 56 ack_id, 57 action_id, 58 action_name: action_request.action_name.clone(), 59 action_data: Some( 60 proto::osprey_coordinator_action::ActionData::JsonActionData( 61 action_request.action_data_json.clone().into(), 62 ), 63 ), 64 secret_data: None, 65 timestamp: Some( 66 action_request 67 .timestamp 68 .as_ref() 69 .context("`timestamp` not found")? 70 .clone(), 71 ), 72 }; 73 74 Ok(osprey_coordinator_action) 75} 76 77impl SyncActionServer { 78 async fn try_process_action( 79 &self, 80 ack_id: u64, 81 action_request: &ProcessActionRequest, 82 ) -> Result<tonic::Response<osprey_coordinator_sync_action::ProcessActionResponse>, tonic::Status> 83 { 84 let unvalidated_action_id = action_request.action_id; 85 86 let osprey_coordinator_action = match create_osprey_coordinator_action( 87 ack_id, 88 action_request, 89 self.snowflake_client.as_ref(), 90 ) 91 .await 92 { 93 Ok(result) => result, 94 Err(error) => { 95 tracing::error!({error=%error, ack_id=ack_id, action_id=unvalidated_action_id},"[rpc] deserialization error"); 96 self.metrics 97 .sync_classification_failure_deserialization 98 .incr(); 99 return Err(tonic::Status::new(tonic::Code::Aborted, error.to_string())); 100 } 101 }; 102 103 let action_id = osprey_coordinator_action.action_id; 104 105 let (ackable_action, acking_receiver) = AckableAction::new(osprey_coordinator_action); 106 107 let send_start_time = Instant::now(); 108 match self.priority_queue_sender.send_sync(ackable_action).await { 109 Ok(_) => { 110 tracing::debug!({action_id=%action_id, ack_id=ack_id}, "[rpc] sent message to priority queue") 111 } 112 Err(e) => { 113 self.metrics.sync_classification_failure_pq_send.incr(); 114 tracing::error!({error=%e, action_id=%action_id, ack_id=ack_id},"[rpc] tried to send action to closed channel"); 115 return Err(tonic::Status::new(tonic::Code::Unavailable, e.to_string())); 116 } 117 }; 118 self.metrics 119 .priority_queue_send_time_sync 120 .record(send_start_time.elapsed()); 121 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] waiting on ack or nack"); 122 123 let receive_start_time = Instant::now(); 124 match acking_receiver.await { 125 Ok(ack_or_nack) => match ack_or_nack { 126 AckOrNack::Ack(verdicts) => { 127 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] acking message"); 128 129 let response = 130 osprey_coordinator_sync_action::ProcessActionResponse { verdicts }; 131 132 self.metrics.sync_classification_result_ack.incr(); 133 self.metrics 134 .receiver_ack_time_sync 135 .record(receive_start_time.elapsed()); 136 Ok(tonic::Response::new(response)) 137 } 138 AckOrNack::Nack => { 139 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] nacking message"); 140 self.metrics.sync_classification_result_nack.incr(); 141 self.metrics 142 .receiver_ack_time_sync 143 .record(receive_start_time.elapsed()); 144 Err(tonic::Status::aborted("action nacked")) 145 } 146 }, 147 Err(recv_error) => { 148 tracing::error!({action_id=%action_id, recv_error=%recv_error, ack_id=ack_id},"[rpc] acking sender dropped"); 149 self.metrics 150 .sync_classification_failure_oneshot_dropped 151 .incr(); 152 self.metrics 153 .receiver_ack_time_sync 154 .record(receive_start_time.elapsed()); 155 Err(tonic::Status::internal("acking onshot dropped")) 156 } 157 } 158 } 159} 160 161#[tonic::async_trait] 162impl OspreyCoordinatorSyncActionService for SyncActionServer { 163 async fn process_action( 164 &self, 165 request: tonic::Request<osprey_coordinator_sync_action::ProcessActionRequest>, 166 ) -> Result<tonic::Response<osprey_coordinator_sync_action::ProcessActionResponse>, tonic::Status> 167 { 168 self.metrics.sync_classification_action_received.incr(); 169 let action_request = request.into_inner(); 170 tracing::debug!({action_request=?action_request}, "[rpc] action request received"); 171 172 let ack_id: u64 = { 173 let mut rng = rand::thread_rng(); 174 rng.gen() 175 }; 176 177 match self.try_process_action(ack_id, &action_request).await { 178 response @ Ok(_) => response, 179 Err(e) => { 180 tracing::error!("initial process_action attempt failed, retrying: {}", e); 181 self.try_process_action(ack_id, &action_request).await 182 } 183 } 184 } 185}