Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 147 lines 4.9 kB view raw
1//! Wrappers around making an authenticated GRPC connection to a Google Cloud service. 2 3use std::time::Duration; 4 5use thiserror::Error; 6use tonic::transport::ClientTlsConfig; 7 8use crate::gcloud::{ 9 auth::{AuthorizationHeaderInterceptor, Scope, TokenRefresher}, 10 gcp_metadata::GCPMetadataClient, 11 root_ca_certificate, 12}; 13 14#[derive(Debug, Error)] 15pub enum ConnectionError { 16 #[error("AccessToken error: {0}")] 17 AccessTokenError(anyhow::Error), 18 19 #[error("Certificate error: {0}")] 20 CertificateError(String), 21 22 #[error("I/O Error: {0}")] 23 IoError(#[from] std::io::Error), 24 25 #[error("Invalid URI {0}")] 26 InvalidUri(#[from] http::uri::InvalidUri), 27 28 #[error("Transport error: {0}")] 29 TransportError(#[from] tonic::transport::Error), 30} 31 32pub type Result<T> = std::result::Result<T, ConnectionError>; 33 34#[derive(Clone)] 35pub struct Connection { 36 pub(crate) authorization_header_interceptor: AuthorizationHeaderInterceptor, 37 pub(crate) channel: tonic::transport::Channel, 38} 39 40impl Connection { 41 fn create_google_endpoint(domain: &str) -> Result<tonic::transport::Endpoint> { 42 let endpoint = tonic::transport::Channel::from_shared(format!("https://{domain}"))?; 43 let tls_config = ClientTlsConfig::new() 44 .ca_certificate(root_ca_certificate::load().map_err(ConnectionError::CertificateError)?) 45 .domain_name(domain); 46 endpoint.tls_config(tls_config).map_err(|e| e.into()) 47 } 48 49 pub async fn from_metadata_client( 50 client: GCPMetadataClient, 51 timeout: impl Into<Option<Duration>>, 52 token_refresh_period: Duration, 53 domain: &str, 54 ) -> Result<Self> { 55 let access_token_handle = TokenRefresher::with_metadata_client(client) 56 .await 57 .map_err(ConnectionError::AccessTokenError)? 58 .spawn_refresher(token_refresh_period); 59 60 let connection = Self { 61 authorization_header_interceptor: 62 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle), 63 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()), 64 }; 65 Ok(connection) 66 } 67 68 pub async fn from_json( 69 timeout: impl Into<Option<Duration>>, 70 token_refresh_period: Duration, 71 domain: &str, 72 ac_json: &[u8], 73 scope: Scope, 74 ) -> Result<Self> { 75 let access_token_handle = TokenRefresher::with_json_credentials(scope, ac_json) 76 .await 77 .map_err(ConnectionError::AccessTokenError)? 78 .spawn_refresher(token_refresh_period); 79 80 let connection = Self { 81 authorization_header_interceptor: 82 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle), 83 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()), 84 }; 85 Ok(connection) 86 } 87 88 /// Create a new connection using gcloud cli authentication 89 pub async fn new_with_gcloud( 90 timeout: impl Into<Option<Duration>>, 91 token_refresh_period: Duration, 92 domain: &str, 93 ) -> Result<Self> { 94 let access_token_handle = TokenRefresher::with_gcloud_auth() 95 .await 96 .map_err(ConnectionError::AccessTokenError)? 97 .spawn_refresher(token_refresh_period); 98 99 let connection = Self { 100 authorization_header_interceptor: 101 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle), 102 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()), 103 }; 104 Ok(connection) 105 } 106 107 pub async fn new( 108 timeout: impl Into<Option<Duration>>, 109 token_refresh_period: Duration, 110 domain: &str, 111 scope: Scope, 112 ) -> Result<Self> { 113 let access_token_handle = TokenRefresher::with_local_credentials(scope) 114 .await 115 .map_err(ConnectionError::AccessTokenError)? 116 .spawn_refresher(token_refresh_period); 117 118 let connection = Self { 119 authorization_header_interceptor: 120 AuthorizationHeaderInterceptor::with_access_token_provider(access_token_handle), 121 channel: apply_timeout(Self::create_google_endpoint(domain)?, timeout.into()), 122 }; 123 Ok(connection) 124 } 125 126 pub fn new_no_auth( 127 endpoint: tonic::transport::Endpoint, 128 timeout: impl Into<Option<Duration>>, 129 ) -> Self { 130 Self { 131 authorization_header_interceptor: 132 AuthorizationHeaderInterceptor::without_access_token_provider(), 133 channel: apply_timeout(endpoint, timeout.into()), 134 } 135 } 136} 137 138fn apply_timeout( 139 endpoint: tonic::transport::Endpoint, 140 timeout: Option<Duration>, 141) -> tonic::transport::Channel { 142 match timeout { 143 Some(timeout) => endpoint.timeout(timeout), 144 None => endpoint, 145 } 146 .connect_lazy() 147}