Ramjet is a relay consumer that supports configurable forward and track collections, as well as record reconciliation.
event-stream relay firehose riblt atprotocol
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 256 lines 8.3 kB view raw
//! Ramjet service entry point.
//!
//! Initializes configuration, storage, metrics, and the HTTP server,
//! then spawns the async pipeline tasks (ingester → writer → fan-out,
//! plus identity-resolution and backfill workers). All tasks share a
//! single `CancellationToken` for coordinated shutdown.

use std::sync::Arc;

use atproto_identity::resolve::{
    HickoryDnsResolver, InnerIdentityResolver, SharedIdentityResolver,
};
use atproto_identity::traits::IdentityResolver;
use clap::Parser;
use tokio::net::TcpListener;
use tokio::sync::{Semaphore, mpsc};
use tokio_util::sync::CancellationToken;

use ramjet::config::{CliArgs, ServiceConfig};
use ramjet::pipeline::backfill::run_backfill_worker;
use ramjet::pipeline::fanout::FanOutChannels;
use ramjet::pipeline::identity::run_identity_worker;
use ramjet::pipeline::ingester::run_ingester;
use ramjet::pipeline::writer::run_writer;
use ramjet::server::metrics::Metrics;
use ramjet::server::{AppState, build_router};
use ramjet::storage::FjallDb;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Parse CLI arguments and build config.
    let args = CliArgs::parse();
    let config = ServiceConfig::from(args);

    // Initialize tracing. RUST_LOG (via EnvFilter) wins; otherwise
    // default to "info".
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .init();

    tracing::info!(
        db_path = %config.db_path.display(),
        listen_addr = %config.listen_addr,
        relay_host = %config.relay_host,
        "starting ramjet"
    );

    // Open storage. The optional zstd dictionary path is passed through
    // for compression; `None` is accepted.
    let db = Arc::new(FjallDb::open(
        &config.db_path,
        config.zstd_dict_path.as_deref(),
    )?);
    tracing::info!("fjall database opened");

    // Purely informational: log whether we will resume from a persisted
    // relay cursor. Errors reading the cursor are treated like "no cursor".
    if let Ok(Some(cursor)) = db.get_cursor() {
        tracing::info!(cursor, "resuming from stored cursor");
    } else {
        tracing::info!("no stored cursor, will start from live head");
    }

    // Queue any DIDs specified via --backfill that haven't been backfilled yet.
    for did in &config.backfill_dids {
        // A read error (`.ok()`) or missing repo state (`.flatten()`) both
        // count as "not backfilled" — best-effort check, we just re-queue.
        let already_backfilled = db
            .get_repo_state(did)
            .ok()
            .flatten()
            .is_some_and(|rs| rs.backfilled);
        if already_backfilled {
            tracing::info!(%did, "skipping --backfill DID, already backfilled");
        } else {
            // Queue entry: key is "backfill_queue" + NUL + DID, value empty.
            // NOTE(review): presumably run_backfill_worker scans keys with
            // this prefix and splits on the NUL — confirm the format matches.
            let queue_key = format!("backfill_queue\x00{did}");
            db.meta.insert(queue_key.as_bytes(), b"")?;
            tracing::info!(%did, "queued --backfill DID for backfill");
        }
    }

    // Create shared state. `config` is re-bound as an Arc from here on so
    // each task can hold a cheap clone.
    let config = Arc::new(config);
    // 8192 is the per-channel buffer for fan-out; partitioned per
    // configured consumer group.
    let fanout = Arc::new(FanOutChannels::with_consumer_groups(
        8192,
        &config.consumer_groups,
    ));
    for group in &config.consumer_groups {
        tracing::info!(
            group = %group.name,
            partitions = group.partition_count,
            "registered consumer group"
        );
    }
    let metrics = Arc::new(Metrics::new());
    // Single token shared by every task; cancelled on shutdown signal.
    let cancel = CancellationToken::new();

    // Build identity resolver: DNS (hickory, default config) + HTTP,
    // resolving DIDs against plc.directory.
    let dns_resolver = HickoryDnsResolver::create_resolver(&[]);
    let http_client = reqwest::Client::new();
    let identity_resolver: Arc<dyn IdentityResolver> =
        Arc::new(SharedIdentityResolver(Arc::new(InnerIdentityResolver {
            dns_resolver: Arc::new(dns_resolver),
            http_client,
            plc_hostname: "plc.directory".to_string(),
        })));

    // State handed to the HTTP router; everything is Arc-shared with the
    // pipeline tasks.
    let state = AppState {
        db: db.clone(),
        config: config.clone(),
        metrics: metrics.clone(),
        fanout: fanout.clone(),
        resolver: identity_resolver.clone(),
    };

    // Ingester → Writer channel (bounded at 81920 events; backpressures
    // the ingester when the writer falls behind).
    let (ingest_tx, ingest_rx) = mpsc::channel(81920);

    // Identity event channel (ingester → identity worker, bounded 40960).
    let (identity_tx, identity_rx) = mpsc::channel(40960);

    // Task monitor for tokio-metrics instrumentation; every pipeline task
    // below is spawned through `task_monitor.instrument`.
    let task_monitor = metrics.tokio_metrics.monitor().clone();

    // Spawn tokio-metrics collector (updates prometheus gauges every 10s).
    // The collector itself observes `cancel` and exits on shutdown.
    let tokio_metrics_handle = tokio::spawn({
        let tokio_metrics = metrics.tokio_metrics.clone();
        let cancel = cancel.clone();
        async move {
            tokio_metrics
                .run_collector(std::time::Duration::from_secs(10), cancel)
                .await;
        }
    });

    // Spawn pipeline tasks (all instrumented with tokio-metrics).
    // Each task logs its own failure rather than propagating it; main only
    // exits when the HTTP server stops.
    //
    // NOTE(review): `identity_tx` is *cloned* into the ingester, so a live
    // sender also remains in `main` for the process lifetime — the identity
    // worker therefore cannot observe channel closure and must rely on the
    // cancellation token to stop. Intentional as written; moving
    // `identity_tx` instead would allow closure-based shutdown.
    let ingester_handle = tokio::spawn(task_monitor.instrument({
        let config = config.clone();
        let db = db.clone();
        let identity_tx = identity_tx.clone();
        let metrics = metrics.clone();
        let cancel = cancel.clone();
        async move {
            if let Err(e) = run_ingester(config, db, ingest_tx, identity_tx, metrics, cancel).await
            {
                tracing::error!(error = %e, "ingester failed");
            }
        }
    }));

    // Writer: drains the ingest channel, persists, and pushes into fan-out.
    let writer_handle = tokio::spawn(task_monitor.instrument({
        let config = config.clone();
        let db = db.clone();
        let fanout = fanout.clone();
        let metrics = metrics.clone();
        let cancel = cancel.clone();
        async move {
            if let Err(e) = run_writer(config, db, ingest_rx, fanout, metrics, cancel).await {
                tracing::error!(error = %e, "writer failed");
            }
        }
    }));

    // Identity worker: resolves DIDs from the identity channel, with at
    // most 10 concurrent resolutions (bounded by the semaphore).
    let identity_handle = tokio::spawn(task_monitor.instrument({
        let db = db.clone();
        let cancel = cancel.clone();
        let semaphore = Arc::new(Semaphore::new(10));
        let resolver = identity_resolver.clone();
        let metrics = metrics.clone();
        async move {
            if let Err(e) =
                run_identity_worker(db, resolver, semaphore, metrics, identity_rx, cancel).await
            {
                tracing::error!(error = %e, "identity worker failed");
            }
        }
    }));

    // Backfill worker: processes the "backfill_queue" entries seeded above.
    let backfill_handle = tokio::spawn(task_monitor.instrument({
        let db = db.clone();
        let config = config.clone();
        let fanout = fanout.clone();
        let resolver = identity_resolver.clone();
        let metrics = metrics.clone();
        let cancel = cancel.clone();
        async move {
            if let Err(e) = run_backfill_worker(db, config, fanout, resolver, metrics, cancel).await
            {
                tracing::error!(error = %e, "backfill worker failed");
            }
        }
    }));

    // Spawn QUIC reconciliation server (if configured). `None` when no
    // QUIC listen address was given, so the handle is Option-wrapped.
    let quic_handle = config.quic_listen_addr.map(|addr| {
        tokio::spawn(task_monitor.instrument({
            let db = db.clone();
            let config = config.clone();
            let cancel = cancel.clone();
            async move {
                if let Err(e) =
                    ramjet::server::quic::run_quic_server(addr, db, config, cancel).await
                {
                    tracing::error!(error = %e, "QUIC reconciliation server failed");
                }
            }
        }))
    });

    // Build router and bind.
    let router = build_router(state);
    let listener = TcpListener::bind(config.listen_addr).await?;
    tracing::info!(addr = %config.listen_addr, "HTTP server listening");

    // Serve with graceful shutdown; returns once shutdown_signal fires and
    // in-flight requests drain.
    axum::serve(listener, router)
        .with_graceful_shutdown(shutdown_signal(cancel.clone()))
        .await?;

    // Cancel all pipeline tasks. (shutdown_signal already cancelled the
    // token before serve() returned; this second cancel is a harmless
    // belt-and-braces repeat.)
    cancel.cancel();
    // Join the always-present tasks; errors (panics) are deliberately
    // ignored at this point — we are shutting down regardless.
    let _ = tokio::join!(
        ingester_handle,
        writer_handle,
        identity_handle,
        backfill_handle,
        tokio_metrics_handle,
    );
    if let Some(handle) = quic_handle {
        let _ = handle.await;
    }

    tracing::info!("ramjet shut down");
    Ok(())
}

/// Resolves when the process receives Ctrl+C (all platforms) or SIGTERM
/// (unix only), then cancels the shared token so pipeline tasks stop.
///
/// Used as axum's graceful-shutdown future; panics only if a signal
/// handler cannot be installed (a startup-time bug, not a runtime error).
async fn shutdown_signal(cancel: CancellationToken) {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // On non-unix targets there is no SIGTERM; pend forever so only
    // Ctrl+C can win the select.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => tracing::info!("received Ctrl+C"),
        () = terminate => tracing::info!("received SIGTERM"),
    }

    cancel.cancel();
}