Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1mod backoff_utils;
2mod cached_futures;
3mod consumer;
4mod coordinator_metrics;
5mod discovery;
6mod etcd;
7mod etcd_config;
8mod etcd_watcherd;
9mod future_utils;
10mod gcloud;
11mod hashring;
12mod metrics;
13mod osprey_bidirectional_stream;
14mod pigeon;
15mod priority_queue;
16mod proto;
17mod pub_sub_streaming_pull;
18mod shutdown_handler;
19mod signals;
20mod snowflake_client;
21mod sync_action_rpc;
22mod tokio_utils;
23#[cfg(test)]
24mod tonic_mock;
25use anyhow::Result;
26use clap::Parser;
27use proto::osprey_coordinator_sync_action::osprey_coordinator_sync_action_service_server::OspreyCoordinatorSyncActionServiceServer;
28use std::sync::Arc;
29use std::time::Duration;
30
31use crate::coordinator_metrics::OspreyCoordinatorMetrics;
32use crate::snowflake_client::SnowflakeClient;
33
34use crate::metrics::emit_worker::SpawnEmitWorker;
35use crate::metrics::new_client;
36
37use consumer::{start_kafka_consumer, start_pubsub_subscriber};
38use priority_queue::{create_ackable_action_priority_queue, spawn_priority_queue_metrics_worker};
39use tokio::join;
40
41use crate::osprey_bidirectional_stream::OspreyCoordinatorServer;
42use crate::proto::osprey_coordinator_service_server::OspreyCoordinatorServiceServer;
43
44#[derive(Debug, Parser)]
45struct CliOptions {
46 #[arg(
47 short,
48 long,
49 default_value = "19950",
50 env = "OSPREY_COORDINATOR_BIDI_STREAM_PORT"
51 )]
52 bidi_stream_port: u16,
53 #[arg(
54 long,
55 default_value = "19951",
56 env = "OSPREY_COORDINATOR_SYNC_ACTION_PORT"
57 )]
58 sync_action_port: u16,
59 #[arg(
60 long,
61 default_value = "http://localhost:19952",
62 env = "SNOWFLAKE_API_ENDPOINT"
63 )]
64 snowflake_api_endpoint: String,
65}
66
67#[tokio::main]
68async fn main() -> Result<()> {
69 tracing_subscriber::fmt::init();
70 let opts = CliOptions::parse();
71
72 tracing::info!("starting Osprey Coordinator");
73
74 tracing::info!("creating osprey-snowflake client");
75 let snowflake_client = Arc::new(SnowflakeClient::new(opts.snowflake_api_endpoint));
76
77 let (priority_queue_sender, priority_queue_receiver) = create_ackable_action_priority_queue();
78 let metrics = OspreyCoordinatorMetrics::new();
79 tracing::info!("starting grpc metrics worker");
80 let _worker_guard = metrics
81 .clone()
82 .spawn_emit_worker(new_client("osprey_coordinator").unwrap());
83
84 let osprey_coordinator_grpc_bidi_stream_service =
85 OspreyCoordinatorServiceServer::new(OspreyCoordinatorServer::new(
86 priority_queue_sender.clone(),
87 priority_queue_receiver.clone(),
88 metrics.clone(),
89 ));
90
91 let osprey_coordinator_sync_action_service =
92 OspreyCoordinatorSyncActionServiceServer::new(sync_action_rpc::SyncActionServer::new(
93 snowflake_client.clone(),
94 priority_queue_sender.clone(),
95 metrics.clone(),
96 ));
97
98 let consumer_type = std::env::var("OSPREY_COORDINATOR_CONSUMER_TYPE").ok();
99
100 let consumer_fut = match consumer_type.as_deref() {
101 Some("kafka") => {
102 tracing::info!("starting Kafka consumer");
103 Box::pin(start_kafka_consumer(
104 snowflake_client.clone(),
105 priority_queue_sender.clone(),
106 metrics.clone(),
107 ))
108 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
109 }
110 Some("pubsub") => {
111 tracing::info!("starting PubSub subscriber");
112 Box::pin(start_pubsub_subscriber(
113 snowflake_client.clone(),
114 priority_queue_sender.clone(),
115 metrics.clone(),
116 ))
117 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
118 }
119 Some(invalid) => {
120 anyhow::bail!(
121 "invalid OSPREY_COORDINATOR_CONSUMER_TYPE '{}', must be 'kafka' or 'pubsub'",
122 invalid
123 );
124 }
125 None => {
126 tracing::info!(
127 "OSPREY_COORDINATOR_CONSUMER_TYPE not set, defaulting to Kafka consumer"
128 );
129 Box::pin(start_kafka_consumer(
130 snowflake_client.clone(),
131 priority_queue_sender.clone(),
132 metrics.clone(),
133 ))
134 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
135 }
136 };
137
138 let grpc_bidi_stream_service_fut = pigeon::serve(
139 osprey_coordinator_grpc_bidi_stream_service,
140 "osprey_coordinator",
141 opts.bidi_stream_port,
142 Duration::from_secs(30),
143 );
144 let sync_action_service_fut = pigeon::serve(
145 osprey_coordinator_sync_action_service,
146 "osprey_coordinator_sync_action",
147 opts.sync_action_port,
148 Duration::from_secs(60),
149 );
150
151 tracing::info!("starting priority queue metrics worker");
152 let _drop_guard =
153 spawn_priority_queue_metrics_worker(priority_queue_sender.clone(), metrics.clone());
154
155 shutdown_handler::spawn_shutdown_handler(
156 priority_queue_sender.clone(),
157 priority_queue_receiver.clone(),
158 );
159
160 tracing::info!("starting consumer/bidi stream/sync classification rpc");
161 let (consumer_result, grpc_bidi_stream_service_result, sync_action_service_result) = join!(
162 consumer_fut,
163 grpc_bidi_stream_service_fut,
164 sync_action_service_fut
165 );
166 tracing::info!({
167 consumer_result=?consumer_result,
168 bidi_stream_result=?grpc_bidi_stream_service_result,
169 sync_action_result=?sync_action_service_result},
170 "osprey coordinator terminated");
171
172 Ok(())
173}