Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use crate::metrics::counters::StaticCounter;
2use crate::metrics::histograms::StaticHistogram;
3use crate::snowflake_client::SnowflakeClient;
4use crate::{
5 coordinator_metrics::OspreyCoordinatorMetrics,
6 priority_queue::AckableAction,
7 priority_queue::{AckOrNack, PriorityQueueSender},
8 proto::{self, osprey_coordinator_sync_action},
9};
10use anyhow::{anyhow, Context, Result};
11use std::sync::Arc;
12use tokio::time::Instant;
13
14use osprey_coordinator_sync_action::osprey_coordinator_sync_action_service_server::OspreyCoordinatorSyncActionService;
15use osprey_coordinator_sync_action::ProcessActionRequest;
16use rand::Rng;
17
18pub(crate) struct SyncActionServer {
19 snowflake_client: Arc<SnowflakeClient>,
20 priority_queue_sender: PriorityQueueSender,
21 metrics: Arc<OspreyCoordinatorMetrics>,
22}
23
24impl SyncActionServer {
25 pub fn new(
26 snowflake_client: Arc<SnowflakeClient>,
27 priority_queue_sender: PriorityQueueSender,
28 metrics: Arc<OspreyCoordinatorMetrics>,
29 ) -> SyncActionServer {
30 SyncActionServer {
31 snowflake_client,
32 priority_queue_sender,
33 metrics,
34 }
35 }
36}
37
38async fn create_osprey_coordinator_action(
39 ack_id: u64,
40 action_request: &osprey_coordinator_sync_action::ProcessActionRequest,
41 snowflake_client: &SnowflakeClient,
42) -> Result<proto::OspreyCoordinatorAction> {
43 // generate snowflake if one is not provided, to match the behaviour in pubsub.rs
44 let action_id = match action_request.action_id {
45 Some(id) => match id {
46 // handle 0 as none-type, since protos default u64 to 0
47 0 => snowflake_client.generate_id().await?,
48 _ => id,
49 },
50 None => snowflake_client.generate_id().await?,
51 };
52 if action_request.action_name.is_empty() {
53 return Err(anyhow!("`action_name` must not be empty"));
54 }
55 let osprey_coordinator_action = proto::OspreyCoordinatorAction {
56 ack_id,
57 action_id,
58 action_name: action_request.action_name.clone(),
59 action_data: Some(
60 proto::osprey_coordinator_action::ActionData::JsonActionData(
61 action_request.action_data_json.clone().into(),
62 ),
63 ),
64 secret_data: None,
65 timestamp: Some(
66 action_request
67 .timestamp
68 .as_ref()
69 .context("`timestamp` not found")?
70 .clone(),
71 ),
72 };
73
74 Ok(osprey_coordinator_action)
75}
76
77impl SyncActionServer {
78 async fn try_process_action(
79 &self,
80 ack_id: u64,
81 action_request: &ProcessActionRequest,
82 ) -> Result<tonic::Response<osprey_coordinator_sync_action::ProcessActionResponse>, tonic::Status>
83 {
84 let unvalidated_action_id = action_request.action_id;
85
86 let osprey_coordinator_action = match create_osprey_coordinator_action(
87 ack_id,
88 action_request,
89 self.snowflake_client.as_ref(),
90 )
91 .await
92 {
93 Ok(result) => result,
94 Err(error) => {
95 tracing::error!({error=%error, ack_id=ack_id, action_id=unvalidated_action_id},"[rpc] deserialization error");
96 self.metrics
97 .sync_classification_failure_deserialization
98 .incr();
99 return Err(tonic::Status::new(tonic::Code::Aborted, error.to_string()));
100 }
101 };
102
103 let action_id = osprey_coordinator_action.action_id;
104
105 let (ackable_action, acking_receiver) = AckableAction::new(osprey_coordinator_action);
106
107 let send_start_time = Instant::now();
108 match self.priority_queue_sender.send_sync(ackable_action).await {
109 Ok(_) => {
110 tracing::debug!({action_id=%action_id, ack_id=ack_id}, "[rpc] sent message to priority queue")
111 }
112 Err(e) => {
113 self.metrics.sync_classification_failure_pq_send.incr();
114 tracing::error!({error=%e, action_id=%action_id, ack_id=ack_id},"[rpc] tried to send action to closed channel");
115 return Err(tonic::Status::new(tonic::Code::Unavailable, e.to_string()));
116 }
117 };
118 self.metrics
119 .priority_queue_send_time_sync
120 .record(send_start_time.elapsed());
121 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] waiting on ack or nack");
122
123 let receive_start_time = Instant::now();
124 match acking_receiver.await {
125 Ok(ack_or_nack) => match ack_or_nack {
126 AckOrNack::Ack(verdicts) => {
127 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] acking message");
128
129 let response =
130 osprey_coordinator_sync_action::ProcessActionResponse { verdicts };
131
132 self.metrics.sync_classification_result_ack.incr();
133 self.metrics
134 .receiver_ack_time_sync
135 .record(receive_start_time.elapsed());
136 Ok(tonic::Response::new(response))
137 }
138 AckOrNack::Nack => {
139 tracing::debug!({action_id=%action_id, ack_id=ack_id},"[rpc] nacking message");
140 self.metrics.sync_classification_result_nack.incr();
141 self.metrics
142 .receiver_ack_time_sync
143 .record(receive_start_time.elapsed());
144 Err(tonic::Status::aborted("action nacked"))
145 }
146 },
147 Err(recv_error) => {
148 tracing::error!({action_id=%action_id, recv_error=%recv_error, ack_id=ack_id},"[rpc] acking sender dropped");
149 self.metrics
150 .sync_classification_failure_oneshot_dropped
151 .incr();
152 self.metrics
153 .receiver_ack_time_sync
154 .record(receive_start_time.elapsed());
155 Err(tonic::Status::internal("acking onshot dropped"))
156 }
157 }
158 }
159}
160
161#[tonic::async_trait]
162impl OspreyCoordinatorSyncActionService for SyncActionServer {
163 async fn process_action(
164 &self,
165 request: tonic::Request<osprey_coordinator_sync_action::ProcessActionRequest>,
166 ) -> Result<tonic::Response<osprey_coordinator_sync_action::ProcessActionResponse>, tonic::Status>
167 {
168 self.metrics.sync_classification_action_received.incr();
169 let action_request = request.into_inner();
170 tracing::debug!({action_request=?action_request}, "[rpc] action request received");
171
172 let ack_id: u64 = {
173 let mut rng = rand::thread_rng();
174 rng.gen()
175 };
176
177 match self.try_process_action(ack_id, &action_request).await {
178 response @ Ok(_) => response,
179 Err(e) => {
180 tracing::error!("initial process_action attempt failed, retrying: {}", e);
181 self.try_process_action(ack_id, &action_request).await
182 }
183 }
184 }
185}