Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 90 lines 2.8 kB view raw
1use anyhow::{anyhow, Result}; 2use convert_case::{Case, Casing}; 3use prost::Message as ProstMessage; 4use prost_types::Timestamp; 5use serde::Deserialize; 6use serde_json::Value; 7 8use crate::{ 9 coordinator_metrics::OspreyCoordinatorMetrics, 10 metrics::counters::StaticCounter, 11 proto::{ 12 self, osprey_coordinator_action::ActionData, osprey_coordinator_action::SecretData, 13 Action as OspreyProtoAction, 14 }, 15 snowflake_client::SnowflakeClient, 16}; 17 18pub async fn decode_proto_message( 19 message_data: &[u8], 20 ack_id: u64, 21 message_timestamp: Timestamp, 22 snowflake_client: &SnowflakeClient, 23 metrics: &OspreyCoordinatorMetrics, 24) -> Result<proto::OspreyCoordinatorAction> { 25 let osprey_proto_action = OspreyProtoAction::decode(message_data)?; 26 let action_id = if osprey_proto_action.id == 0 { 27 metrics.action_id_snowflake_generation_proto.incr(); 28 snowflake_client.generate_id().await? 29 } else { 30 osprey_proto_action.id 31 }; 32 let action_name = osprey_proto_action 33 .data 34 .ok_or_else(|| anyhow!("missing action data"))? 35 .to_string() 36 .to_case(Case::Snake); 37 Ok(proto::OspreyCoordinatorAction { 38 ack_id, 39 action_id, 40 action_name, 41 action_data: Some(ActionData::ProtoActionData(message_data.into())), 42 secret_data: None, 43 timestamp: Some(message_timestamp), 44 }) 45} 46 47pub async fn decode_msgpack_json_message( 48 message_data: &[u8], 49 ack_id: u64, 50 message_timestamp: Timestamp, 51 snowflake_client: &SnowflakeClient, 52 metrics: &OspreyCoordinatorMetrics, 53) -> Result<proto::OspreyCoordinatorAction> { 54 use msgpack_simple::MsgPack; 55 56 #[derive(Deserialize, Debug)] 57 struct MsgpackAction { 58 id: Option<String>, 59 name: String, 60 data: Value, 61 secret_data: Option<Value>, 62 } 63 64 let decoded = MsgPack::parse(message_data)?; 65 let decoded = decoded.as_string()?; 66 let action: MsgpackAction = serde_json::from_str(decoded.as_str())?; 67 68 let serde_json_vec = serde_json::to_vec(&action.data)?; 69 let optional_secret_data = match &action.secret_data { 70 Some(secret_data) => Some(SecretData::JsonSecretData(serde_json::to_vec(secret_data)?)), 71 _ => None, 72 }; 73 74 let action_id = match action.id { 75 Some(id) => id.parse::<u64>()?, 76 None => { 77 metrics.action_id_snowflake_generation_json.incr(); 78 snowflake_client.generate_id().await? 79 } 80 }; 81 82 Ok(proto::OspreyCoordinatorAction { 83 ack_id, 84 action_id, 85 action_name: action.name, 86 action_data: Some(ActionData::JsonActionData(serde_json_vec)), 87 secret_data: optional_secret_data, 88 timestamp: Some(message_timestamp), 89 }) 90}