Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
//! Wrappers around making an authenticated gRPC connection to a Google Cloud service.
2
3use std::time::Duration;
4
5use thiserror::Error;
6use tonic::transport::ClientTlsConfig;
7
8use crate::gcloud::{
9 auth::{AuthorizationHeaderInterceptor, Scope, TokenRefresher},
10 gcp_metadata::GCPMetadataClient,
11 root_ca_certificate,
12};
13
14#[derive(Debug, Error)]
15pub enum ConnectionError {
16 #[error("AccessToken error: {0}")]
17 AccessTokenError(anyhow::Error),
18
19 #[error("Certificate error: {0}")]
20 CertificateError(String),
21
22 #[error("I/O Error: {0}")]
23 IoError(#[from] std::io::Error),
24
25 #[error("Invalid URI {0}")]
26 InvalidUri(#[from] http::uri::InvalidUri),
27
28 #[error("Transport error: {0}")]
29 TransportError(#[from] tonic::transport::Error),
30}
31
32pub type Result<T> = std::result::Result<T, ConnectionError>;
33
34#[derive(Clone)]
35pub struct Connection {
36 pub(crate) authorization_header_interceptor: AuthorizationHeaderInterceptor,
37 pub(crate) channel: tonic::transport::Channel,
38}
39
40impl Connection {
41 fn create_google_endpoint(domain: &str) -> Result<tonic::transport::Endpoint> {
42 let endpoint = tonic::transport::Channel::from_shared(format!("https://{domain}"))?;
43 let tls_config = ClientTlsConfig::new()
44 .ca_certificate(root_ca_certificate::load().map_err(ConnectionError::CertificateError)?)
45 .domain_name(domain);
46 endpoint.tls_config(tls_config).map_err(|e| e.into())
47 }
48
49 pub async fn from_metadata_client(
50 client: GCPMetadataClient,
51 timeout: impl Into<Option<Duration>>,
52 token_refresh_period: Duration,
53 domain: &str,
54 ) -> Result<Self> {
55 let access_token_handle = TokenRefresher::with_metadata_client(client)
56 .await
57 .map_err(ConnectionError::AccessTokenError)?
58 .spawn_refresher(token_refresh_period);
59
60 let connection = Self {
61 authorization_header_interceptor:
62 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle),
63 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()),
64 };
65 Ok(connection)
66 }
67
68 pub async fn from_json(
69 timeout: impl Into<Option<Duration>>,
70 token_refresh_period: Duration,
71 domain: &str,
72 ac_json: &[u8],
73 scope: Scope,
74 ) -> Result<Self> {
75 let access_token_handle = TokenRefresher::with_json_credentials(scope, ac_json)
76 .await
77 .map_err(ConnectionError::AccessTokenError)?
78 .spawn_refresher(token_refresh_period);
79
80 let connection = Self {
81 authorization_header_interceptor:
82 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle),
83 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()),
84 };
85 Ok(connection)
86 }
87
88 /// Create a new connection using gcloud cli authentication
89 pub async fn new_with_gcloud(
90 timeout: impl Into<Option<Duration>>,
91 token_refresh_period: Duration,
92 domain: &str,
93 ) -> Result<Self> {
94 let access_token_handle = TokenRefresher::with_gcloud_auth()
95 .await
96 .map_err(ConnectionError::AccessTokenError)?
97 .spawn_refresher(token_refresh_period);
98
99 let connection = Self {
100 authorization_header_interceptor:
101 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle),
102 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()),
103 };
104 Ok(connection)
105 }
106
107 pub async fn new(
108 timeout: impl Into<Option<Duration>>,
109 token_refresh_period: Duration,
110 domain: &str,
111 scope: Scope,
112 ) -> Result<Self> {
113 let access_token_handle = TokenRefresher::with_local_credentials(scope)
114 .await
115 .map_err(ConnectionError::AccessTokenError)?
116 .spawn_refresher(token_refresh_period);
117
118 let connection = Self {
119 authorization_header_interceptor:
120 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle),
121 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()),
122 };
123 Ok(connection)
124 }
125
126 pub fn new_no_auth(
127 endpoint: tonic::transport::Endpoint,
128 timeout: impl Into<Option<Duration>>,
129 ) -> Self {
130 Self {
131 authorization_header_interceptor:
132 AuthorizationHeaderInterceptor::without_access_token_provider(),
133 channel: apply_timeout(endpoint, timeout.into()),
134 }
135 }
136}
137
138fn apply_timeout(
139 endpoint: tonic::transport::Endpoint,
140 timeout: Option<Duration>,
141) -> tonic::transport::Channel {
142 match timeout {
143 Some(timeout) => endpoint.timeout(timeout),
144 None => endpoint,
145 }
146 .connect_lazy()
147}