Ramjet is a relay consumer that supports configurable forward and track collections, as well as record reconciliation.
event-stream
relay
firehose
riblt
atprotocol
1//! Ramjet service entry point.
2//!
3//! Initializes configuration, storage, metrics, and the HTTP server,
4//! then spawns the async pipeline tasks.
5
6use std::sync::Arc;
7
8use atproto_identity::resolve::{
9 HickoryDnsResolver, InnerIdentityResolver, SharedIdentityResolver,
10};
11use atproto_identity::traits::IdentityResolver;
12use clap::Parser;
13use tokio::net::TcpListener;
14use tokio::sync::{Semaphore, mpsc};
15use tokio_util::sync::CancellationToken;
16
17use ramjet::config::{CliArgs, ServiceConfig};
18use ramjet::pipeline::backfill::run_backfill_worker;
19use ramjet::pipeline::fanout::FanOutChannels;
20use ramjet::pipeline::identity::run_identity_worker;
21use ramjet::pipeline::ingester::run_ingester;
22use ramjet::pipeline::writer::run_writer;
23use ramjet::server::metrics::Metrics;
24use ramjet::server::{AppState, build_router};
25use ramjet::storage::FjallDb;
26
27#[tokio::main]
28async fn main() -> anyhow::Result<()> {
29 // Parse CLI arguments and build config
30 let args = CliArgs::parse();
31 let config = ServiceConfig::from(args);
32
33 // Initialize tracing
34 tracing_subscriber::fmt()
35 .with_env_filter(
36 tracing_subscriber::EnvFilter::try_from_default_env()
37 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
38 )
39 .init();
40
41 tracing::info!(
42 db_path = %config.db_path.display(),
43 listen_addr = %config.listen_addr,
44 relay_host = %config.relay_host,
45 "starting ramjet"
46 );
47
48 // Open storage
49 let db = Arc::new(FjallDb::open(
50 &config.db_path,
51 config.zstd_dict_path.as_deref(),
52 )?);
53 tracing::info!("fjall database opened");
54
55 if let Ok(Some(cursor)) = db.get_cursor() {
56 tracing::info!(cursor, "resuming from stored cursor");
57 } else {
58 tracing::info!("no stored cursor, will start from live head");
59 }
60
61 // Queue any DIDs specified via --backfill that haven't been backfilled yet.
62 for did in &config.backfill_dids {
63 let already_backfilled = db
64 .get_repo_state(did)
65 .ok()
66 .flatten()
67 .is_some_and(|rs| rs.backfilled);
68 if already_backfilled {
69 tracing::info!(%did, "skipping --backfill DID, already backfilled");
70 } else {
71 let queue_key = format!("backfill_queue\x00{did}");
72 db.meta.insert(queue_key.as_bytes(), b"")?;
73 tracing::info!(%did, "queued --backfill DID for backfill");
74 }
75 }
76
77 // Create shared state
78 let config = Arc::new(config);
79 let fanout = Arc::new(FanOutChannels::with_consumer_groups(
80 8192,
81 &config.consumer_groups,
82 ));
83 for group in &config.consumer_groups {
84 tracing::info!(
85 group = %group.name,
86 partitions = group.partition_count,
87 "registered consumer group"
88 );
89 }
90 let metrics = Arc::new(Metrics::new());
91 let cancel = CancellationToken::new();
92
93 // Build identity resolver
94 let dns_resolver = HickoryDnsResolver::create_resolver(&[]);
95 let http_client = reqwest::Client::new();
96 let identity_resolver: Arc<dyn IdentityResolver> =
97 Arc::new(SharedIdentityResolver(Arc::new(InnerIdentityResolver {
98 dns_resolver: Arc::new(dns_resolver),
99 http_client,
100 plc_hostname: "plc.directory".to_string(),
101 })));
102
103 let state = AppState {
104 db: db.clone(),
105 config: config.clone(),
106 metrics: metrics.clone(),
107 fanout: fanout.clone(),
108 resolver: identity_resolver.clone(),
109 };
110
111 // Ingester → Writer channel
112 let (ingest_tx, ingest_rx) = mpsc::channel(81920);
113
114 // Identity event channel
115 let (identity_tx, identity_rx) = mpsc::channel(40960);
116
117 // Task monitor for tokio-metrics instrumentation
118 let task_monitor = metrics.tokio_metrics.monitor().clone();
119
120 // Spawn tokio-metrics collector (updates prometheus gauges every 10s)
121 let tokio_metrics_handle = tokio::spawn({
122 let tokio_metrics = metrics.tokio_metrics.clone();
123 let cancel = cancel.clone();
124 async move {
125 tokio_metrics
126 .run_collector(std::time::Duration::from_secs(10), cancel)
127 .await;
128 }
129 });
130
131 // Spawn pipeline tasks (all instrumented with tokio-metrics)
132 let ingester_handle = tokio::spawn(task_monitor.instrument({
133 let config = config.clone();
134 let db = db.clone();
135 let identity_tx = identity_tx.clone();
136 let metrics = metrics.clone();
137 let cancel = cancel.clone();
138 async move {
139 if let Err(e) = run_ingester(config, db, ingest_tx, identity_tx, metrics, cancel).await
140 {
141 tracing::error!(error = %e, "ingester failed");
142 }
143 }
144 }));
145
146 let writer_handle = tokio::spawn(task_monitor.instrument({
147 let config = config.clone();
148 let db = db.clone();
149 let fanout = fanout.clone();
150 let metrics = metrics.clone();
151 let cancel = cancel.clone();
152 async move {
153 if let Err(e) = run_writer(config, db, ingest_rx, fanout, metrics, cancel).await {
154 tracing::error!(error = %e, "writer failed");
155 }
156 }
157 }));
158
159 let identity_handle = tokio::spawn(task_monitor.instrument({
160 let db = db.clone();
161 let cancel = cancel.clone();
162 let semaphore = Arc::new(Semaphore::new(10));
163 let resolver = identity_resolver.clone();
164 let metrics = metrics.clone();
165 async move {
166 if let Err(e) =
167 run_identity_worker(db, resolver, semaphore, metrics, identity_rx, cancel).await
168 {
169 tracing::error!(error = %e, "identity worker failed");
170 }
171 }
172 }));
173
174 let backfill_handle = tokio::spawn(task_monitor.instrument({
175 let db = db.clone();
176 let config = config.clone();
177 let fanout = fanout.clone();
178 let resolver = identity_resolver.clone();
179 let metrics = metrics.clone();
180 let cancel = cancel.clone();
181 async move {
182 if let Err(e) = run_backfill_worker(db, config, fanout, resolver, metrics, cancel).await
183 {
184 tracing::error!(error = %e, "backfill worker failed");
185 }
186 }
187 }));
188
189 // Spawn QUIC reconciliation server (if configured)
190 let quic_handle = config.quic_listen_addr.map(|addr| {
191 tokio::spawn(task_monitor.instrument({
192 let db = db.clone();
193 let config = config.clone();
194 let cancel = cancel.clone();
195 async move {
196 if let Err(e) =
197 ramjet::server::quic::run_quic_server(addr, db, config, cancel).await
198 {
199 tracing::error!(error = %e, "QUIC reconciliation server failed");
200 }
201 }
202 }))
203 });
204
205 // Build router and bind
206 let router = build_router(state);
207 let listener = TcpListener::bind(config.listen_addr).await?;
208 tracing::info!(addr = %config.listen_addr, "HTTP server listening");
209
210 // Serve with graceful shutdown
211 axum::serve(listener, router)
212 .with_graceful_shutdown(shutdown_signal(cancel.clone()))
213 .await?;
214
215 // Cancel all pipeline tasks
216 cancel.cancel();
217 let _ = tokio::join!(
218 ingester_handle,
219 writer_handle,
220 identity_handle,
221 backfill_handle,
222 tokio_metrics_handle,
223 );
224 if let Some(handle) = quic_handle {
225 let _ = handle.await;
226 }
227
228 tracing::info!("ramjet shut down");
229 Ok(())
230}
231
232async fn shutdown_signal(cancel: CancellationToken) {
233 let ctrl_c = async {
234 tokio::signal::ctrl_c()
235 .await
236 .expect("failed to install Ctrl+C handler");
237 };
238
239 #[cfg(unix)]
240 let terminate = async {
241 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
242 .expect("failed to install SIGTERM handler")
243 .recv()
244 .await;
245 };
246
247 #[cfg(not(unix))]
248 let terminate = std::future::pending::<()>();
249
250 tokio::select! {
251 () = ctrl_c => tracing::info!("received Ctrl+C"),
252 () = terminate => tracing::info!("received SIGTERM"),
253 }
254
255 cancel.cancel();
256}