Mirror of https://github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 173 lines 5.6 kB view raw
1mod backoff_utils; 2mod cached_futures; 3mod consumer; 4mod coordinator_metrics; 5mod discovery; 6mod etcd; 7mod etcd_config; 8mod etcd_watcherd; 9mod future_utils; 10mod gcloud; 11mod hashring; 12mod metrics; 13mod osprey_bidirectional_stream; 14mod pigeon; 15mod priority_queue; 16mod proto; 17mod pub_sub_streaming_pull; 18mod shutdown_handler; 19mod signals; 20mod snowflake_client; 21mod sync_action_rpc; 22mod tokio_utils; 23#[cfg(test)] 24mod tonic_mock; 25use anyhow::Result; 26use clap::Parser; 27use proto::osprey_coordinator_sync_action::osprey_coordinator_sync_action_service_server::OspreyCoordinatorSyncActionServiceServer; 28use std::sync::Arc; 29use std::time::Duration; 30 31use crate::coordinator_metrics::OspreyCoordinatorMetrics; 32use crate::snowflake_client::SnowflakeClient; 33 34use crate::metrics::emit_worker::SpawnEmitWorker; 35use crate::metrics::new_client; 36 37use consumer::{start_kafka_consumer, start_pubsub_subscriber}; 38use priority_queue::{create_ackable_action_priority_queue, spawn_priority_queue_metrics_worker}; 39use tokio::join; 40 41use crate::osprey_bidirectional_stream::OspreyCoordinatorServer; 42use crate::proto::osprey_coordinator_service_server::OspreyCoordinatorServiceServer; 43 44#[derive(Debug, Parser)] 45struct CliOptions { 46 #[arg( 47 short, 48 long, 49 default_value = "19950", 50 env = "OSPREY_COORDINATOR_BIDI_STREAM_PORT" 51 )] 52 bidi_stream_port: u16, 53 #[arg( 54 long, 55 default_value = "19951", 56 env = "OSPREY_COORDINATOR_SYNC_ACTION_PORT" 57 )] 58 sync_action_port: u16, 59 #[arg( 60 long, 61 default_value = "http://localhost:19952", 62 env = "SNOWFLAKE_API_ENDPOINT" 63 )] 64 snowflake_api_endpoint: String, 65} 66 67#[tokio::main] 68async fn main() -> Result<()> { 69 tracing_subscriber::fmt::init(); 70 let opts = CliOptions::parse(); 71 72 tracing::info!("starting Osprey Coordinator"); 73 74 tracing::info!("creating osprey-snowflake client"); 75 let snowflake_client = 
Arc::new(SnowflakeClient::new(opts.snowflake_api_endpoint)); 76 77 let (priority_queue_sender, priority_queue_receiver) = create_ackable_action_priority_queue(); 78 let metrics = OspreyCoordinatorMetrics::new(); 79 tracing::info!("starting grpc metrics worker"); 80 let _worker_guard = metrics 81 .clone() 82 .spawn_emit_worker(new_client("osprey_coordinator").unwrap()); 83 84 let osprey_coordinator_grpc_bidi_stream_service = 85 OspreyCoordinatorServiceServer::new(OspreyCoordinatorServer::new( 86 priority_queue_sender.clone(), 87 priority_queue_receiver.clone(), 88 metrics.clone(), 89 )); 90 91 let osprey_coordinator_sync_action_service = 92 OspreyCoordinatorSyncActionServiceServer::new(sync_action_rpc::SyncActionServer::new( 93 snowflake_client.clone(), 94 priority_queue_sender.clone(), 95 metrics.clone(), 96 )); 97 98 let consumer_type = std::env::var("OSPREY_COORDINATOR_CONSUMER_TYPE").ok(); 99 100 let consumer_fut = match consumer_type.as_deref() { 101 Some("kafka") => { 102 tracing::info!("starting Kafka consumer"); 103 Box::pin(start_kafka_consumer( 104 snowflake_client.clone(), 105 priority_queue_sender.clone(), 106 metrics.clone(), 107 )) 108 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> 109 } 110 Some("pubsub") => { 111 tracing::info!("starting PubSub subscriber"); 112 Box::pin(start_pubsub_subscriber( 113 snowflake_client.clone(), 114 priority_queue_sender.clone(), 115 metrics.clone(), 116 )) 117 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> 118 } 119 Some(invalid) => { 120 anyhow::bail!( 121 "invalid OSPREY_COORDINATOR_CONSUMER_TYPE '{}', must be 'kafka' or 'pubsub'", 122 invalid 123 ); 124 } 125 None => { 126 tracing::info!( 127 "OSPREY_COORDINATOR_CONSUMER_TYPE not set, defaulting to Kafka consumer" 128 ); 129 Box::pin(start_kafka_consumer( 130 snowflake_client.clone(), 131 priority_queue_sender.clone(), 132 metrics.clone(), 133 )) 134 as std::pin::Pin<Box<dyn std::future::Future<Output = 
Result<()>> + Send>> 135 } 136 }; 137 138 let grpc_bidi_stream_service_fut = pigeon::serve( 139 osprey_coordinator_grpc_bidi_stream_service, 140 "osprey_coordinator", 141 opts.bidi_stream_port, 142 Duration::from_secs(30), 143 ); 144 let sync_action_service_fut = pigeon::serve( 145 osprey_coordinator_sync_action_service, 146 "osprey_coordinator_sync_action", 147 opts.sync_action_port, 148 Duration::from_secs(60), 149 ); 150 151 tracing::info!("starting priority queue metrics worker"); 152 let _drop_guard = 153 spawn_priority_queue_metrics_worker(priority_queue_sender.clone(), metrics.clone()); 154 155 shutdown_handler::spawn_shutdown_handler( 156 priority_queue_sender.clone(), 157 priority_queue_receiver.clone(), 158 ); 159 160 tracing::info!("starting consumer/bidi stream/sync classification rpc"); 161 let (consumer_result, grpc_bidi_stream_service_result, sync_action_service_result) = join!( 162 consumer_fut, 163 grpc_bidi_stream_service_fut, 164 sync_action_service_fut 165 ); 166 tracing::info!({ 167 consumer_result=?consumer_result, 168 bidi_stream_result=?grpc_bidi_stream_service_result, 169 sync_action_result=?sync_action_service_result}, 170 "osprey coordinator terminated"); 171 172 Ok(()) 173}