auth dns over atproto
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 123 lines 3.6 kB view raw
1use std::path::PathBuf; 2use std::sync::Arc; 3 4use anyhow::Result; 5use onis_common::config::OnisConfig; 6use tracing_subscriber::EnvFilter; 7 8mod api; 9mod materializer; 10mod tap; 11 12use materializer::AppState; 13use tap::TapConsumer; 14 15#[tokio::main] 16async fn main() -> Result<()> { 17 tracing_subscriber::fmt() 18 .with_env_filter(EnvFilter::from_default_env()) 19 .json() 20 .init(); 21 22 let metrics_handle = onis_common::metrics::init() 23 .map_err(|e| anyhow::anyhow!("failed to install metrics recorder: {e}"))?; 24 25 metrics::describe_counter!( 26 "appview_firehose_events_total", 27 "Total firehose events processed" 28 ); 29 metrics::describe_counter!( 30 "appview_records_skipped_no_zone_total", 31 "DNS records skipped due to missing zone declaration" 32 ); 33 metrics::describe_histogram!( 34 "appview_sqlite_write_duration_seconds", 35 "SQLite write duration" 36 ); 37 metrics::describe_histogram!( 38 "appview_api_request_duration_seconds", 39 "API request duration" 40 ); 41 metrics::describe_gauge!("appview_zones_total", "Total declared zones"); 42 metrics::describe_gauge!("appview_users_total", "Total unique DIDs with zones"); 43 44 let config = OnisConfig::load()?; 45 let cfg = &config.appview; 46 47 tracing::info!( 48 bind = %cfg.bind, 49 tap_url = %cfg.tap_url, 50 "onis-appview starting" 51 ); 52 53 let state = Arc::new( 54 AppState::new( 55 &PathBuf::from(&cfg.index_path), 56 PathBuf::from(&cfg.db_dir), 57 cfg.database.clone(), 58 metrics_handle, 59 ) 60 .await?, 61 ); 62 63 let api_bind = cfg.bind.clone(); 64 let api_state = state.clone(); 65 let api_handle = tokio::spawn(async move { 66 let app = api::router(api_state); 67 let listener = tokio::net::TcpListener::bind(&api_bind).await.unwrap(); 68 tracing::info!("API listening on {api_bind}"); 69 axum::serve(listener, app).await.unwrap(); 70 }); 71 72 let consumer = TapConsumer::new( 73 cfg.tap_url.clone(), 74 cfg.tap_acks, 75 cfg.tap_reconnect_delay, 76 ); 77 let tap_state = state.clone(); 78 let tap_handle = tokio::spawn(async move { 79 consumer 80 .run(|event| { 81 let s = tap_state.clone(); 82 async move { materializer::handle_event(s, event).await } 83 }) 84 .await 85 .unwrap(); 86 }); 87 88 let gauge_state = state.clone(); 89 let gauge_handle = tokio::spawn(async move { 90 update_gauges(gauge_state).await; 91 }); 92 93 tokio::select! { 94 _ = api_handle => tracing::error!("API server exited unexpectedly"), 95 _ = tap_handle => tracing::error!("tap consumer exited unexpectedly"), 96 _ = gauge_handle => tracing::error!("gauge updater exited unexpectedly"), 97 } 98 99 Ok(()) 100} 101 102/// Periodically query zone/user counts and update Prometheus gauges. 103async fn update_gauges(state: Arc<AppState>) { 104 loop { 105 tokio::time::sleep(std::time::Duration::from_secs(30)).await; 106 107 let zones: Result<(i64,), _> = 108 sqlx::query_as("SELECT COUNT(*) FROM zone_index") 109 .fetch_one(&state.index) 110 .await; 111 let users: Result<(i64,), _> = 112 sqlx::query_as("SELECT COUNT(DISTINCT did) FROM zone_index") 113 .fetch_one(&state.index) 114 .await; 115 116 if let Ok((count,)) = zones { 117 metrics::gauge!("appview_zones_total").set(count as f64); 118 } 119 if let Ok((count,)) = users { 120 metrics::gauge!("appview_users_total").set(count as f64); 121 } 122 } 123}