use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use onis_common::config::OnisConfig; use tracing_subscriber::EnvFilter; mod api; mod materializer; mod tap; use materializer::AppState; use tap::TapConsumer; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .json() .init(); let metrics_handle = onis_common::metrics::init() .map_err(|e| anyhow::anyhow!("failed to install metrics recorder: {e}"))?; metrics::describe_counter!( "appview_firehose_events_total", "Total firehose events processed" ); metrics::describe_counter!( "appview_records_skipped_no_zone_total", "DNS records skipped due to missing zone declaration" ); metrics::describe_histogram!( "appview_sqlite_write_duration_seconds", "SQLite write duration" ); metrics::describe_histogram!( "appview_api_request_duration_seconds", "API request duration" ); metrics::describe_gauge!("appview_zones_total", "Total declared zones"); metrics::describe_gauge!("appview_users_total", "Total unique DIDs with zones"); let config = OnisConfig::load()?; let cfg = &config.appview; tracing::info!( bind = %cfg.bind, tap_url = %cfg.tap_url, "onis-appview starting" ); let state = Arc::new( AppState::new( &PathBuf::from(&cfg.index_path), PathBuf::from(&cfg.db_dir), cfg.database.clone(), metrics_handle, ) .await?, ); let api_bind = cfg.bind.clone(); let api_state = state.clone(); let api_handle = tokio::spawn(async move { let app = api::router(api_state); let listener = tokio::net::TcpListener::bind(&api_bind).await.unwrap(); tracing::info!("API listening on {api_bind}"); axum::serve(listener, app).await.unwrap(); }); let consumer = TapConsumer::new( cfg.tap_url.clone(), cfg.tap_acks, cfg.tap_reconnect_delay, ); let tap_state = state.clone(); let tap_handle = tokio::spawn(async move { consumer .run(|event| { let s = tap_state.clone(); async move { materializer::handle_event(s, event).await } }) .await .unwrap(); }); let gauge_state = state.clone(); let gauge_handle = tokio::spawn(async move { update_gauges(gauge_state).await; }); tokio::select! { _ = api_handle => tracing::error!("API server exited unexpectedly"), _ = tap_handle => tracing::error!("tap consumer exited unexpectedly"), _ = gauge_handle => tracing::error!("gauge updater exited unexpectedly"), } Ok(()) } /// Periodically query zone/user counts and update Prometheus gauges. async fn update_gauges(state: Arc) { loop { tokio::time::sleep(std::time::Duration::from_secs(30)).await; let zones: Result<(i64,), _> = sqlx::query_as("SELECT COUNT(*) FROM zone_index") .fetch_one(&state.index) .await; let users: Result<(i64,), _> = sqlx::query_as("SELECT COUNT(DISTINCT did) FROM zone_index") .fetch_one(&state.index) .await; if let Ok((count,)) = zones { metrics::gauge!("appview_zones_total").set(count as f64); } if let Ok((count,)) = users { metrics::gauge!("appview_users_total").set(count as f64); } } }