Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1use anyhow::{anyhow, Result};
2use convert_case::{Case, Casing};
3use prost::Message as ProstMessage;
4use prost_types::Timestamp;
5use serde::Deserialize;
6use serde_json::Value;
7
8use crate::{
9 coordinator_metrics::OspreyCoordinatorMetrics,
10 metrics::counters::StaticCounter,
11 proto::{
12 self, osprey_coordinator_action::ActionData, osprey_coordinator_action::SecretData,
13 Action as OspreyProtoAction,
14 },
15 snowflake_client::SnowflakeClient,
16};
17
18pub async fn decode_proto_message(
19 message_data: &[u8],
20 ack_id: u64,
21 message_timestamp: Timestamp,
22 snowflake_client: &SnowflakeClient,
23 metrics: &OspreyCoordinatorMetrics,
24) -> Result<proto::OspreyCoordinatorAction> {
25 let osprey_proto_action = OspreyProtoAction::decode(message_data)?;
26 let action_id = if osprey_proto_action.id == 0 {
27 metrics.action_id_snowflake_generation_proto.incr();
28 snowflake_client.generate_id().await?
29 } else {
30 osprey_proto_action.id
31 };
32 let action_name = osprey_proto_action
33 .data
34 .ok_or_else(|| anyhow!("missing action data"))?
35 .to_string()
36 .to_case(Case::Snake);
37 Ok(proto::OspreyCoordinatorAction {
38 ack_id,
39 action_id,
40 action_name,
41 action_data: Some(ActionData::ProtoActionData(message_data.into())),
42 secret_data: None,
43 timestamp: Some(message_timestamp),
44 })
45}
46
47pub async fn decode_msgpack_json_message(
48 message_data: &[u8],
49 ack_id: u64,
50 message_timestamp: Timestamp,
51 snowflake_client: &SnowflakeClient,
52 metrics: &OspreyCoordinatorMetrics,
53) -> Result<proto::OspreyCoordinatorAction> {
54 use msgpack_simple::MsgPack;
55
56 #[derive(Deserialize, Debug)]
57 struct MsgpackAction {
58 id: Option<String>,
59 name: String,
60 data: Value,
61 secret_data: Option<Value>,
62 }
63
64 let decoded = MsgPack::parse(message_data)?;
65 let decoded = decoded.as_string()?;
66 let action: MsgpackAction = serde_json::from_str(decoded.as_str())?;
67
68 let serde_json_vec = serde_json::to_vec(&action.data)?;
69 let optional_secret_data = match &action.secret_data {
70 Some(secret_data) => Some(SecretData::JsonSecretData(serde_json::to_vec(secret_data)?)),
71 _ => None,
72 };
73
74 let action_id = match action.id {
75 Some(id) => id.parse::<u64>()?,
76 None => {
77 metrics.action_id_snowflake_generation_json.incr();
78 snowflake_client.generate_id().await?
79 }
80 };
81
82 Ok(proto::OspreyCoordinatorAction {
83 ack_id,
84 action_id,
85 action_name: action.name,
86 action_data: Some(ActionData::JsonActionData(serde_json_vec)),
87 secret_data: optional_secret_data,
88 timestamp: Some(message_timestamp),
89 })
90}