auth dns over atproto
1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use onis_common::config::OnisConfig;
6use tracing_subscriber::EnvFilter;
7
8mod api;
9mod materializer;
10mod tap;
11
12use materializer::AppState;
13use tap::TapConsumer;
14
15#[tokio::main]
16async fn main() -> Result<()> {
17 tracing_subscriber::fmt()
18 .with_env_filter(EnvFilter::from_default_env())
19 .json()
20 .init();
21
22 let metrics_handle = onis_common::metrics::init()
23 .map_err(|e| anyhow::anyhow!("failed to install metrics recorder: {e}"))?;
24
25 metrics::describe_counter!(
26 "appview_firehose_events_total",
27 "Total firehose events processed"
28 );
29 metrics::describe_counter!(
30 "appview_records_skipped_no_zone_total",
31 "DNS records skipped due to missing zone declaration"
32 );
33 metrics::describe_histogram!(
34 "appview_sqlite_write_duration_seconds",
35 "SQLite write duration"
36 );
37 metrics::describe_histogram!(
38 "appview_api_request_duration_seconds",
39 "API request duration"
40 );
41 metrics::describe_gauge!("appview_zones_total", "Total declared zones");
42 metrics::describe_gauge!("appview_users_total", "Total unique DIDs with zones");
43
44 let config = OnisConfig::load()?;
45 let cfg = &config.appview;
46
47 tracing::info!(
48 bind = %cfg.bind,
49 tap_url = %cfg.tap_url,
50 "onis-appview starting"
51 );
52
53 let state = Arc::new(
54 AppState::new(
55 &PathBuf::from(&cfg.index_path),
56 PathBuf::from(&cfg.db_dir),
57 cfg.database.clone(),
58 metrics_handle,
59 )
60 .await?,
61 );
62
63 let api_bind = cfg.bind.clone();
64 let api_state = state.clone();
65 let api_handle = tokio::spawn(async move {
66 let app = api::router(api_state);
67 let listener = tokio::net::TcpListener::bind(&api_bind).await.unwrap();
68 tracing::info!("API listening on {api_bind}");
69 axum::serve(listener, app).await.unwrap();
70 });
71
72 let consumer = TapConsumer::new(
73 cfg.tap_url.clone(),
74 cfg.tap_acks,
75 cfg.tap_reconnect_delay,
76 );
77 let tap_state = state.clone();
78 let tap_handle = tokio::spawn(async move {
79 consumer
80 .run(|event| {
81 let s = tap_state.clone();
82 async move { materializer::handle_event(s, event).await }
83 })
84 .await
85 .unwrap();
86 });
87
88 let gauge_state = state.clone();
89 let gauge_handle = tokio::spawn(async move {
90 update_gauges(gauge_state).await;
91 });
92
93 tokio::select! {
94 _ = api_handle => tracing::error!("API server exited unexpectedly"),
95 _ = tap_handle => tracing::error!("tap consumer exited unexpectedly"),
96 _ = gauge_handle => tracing::error!("gauge updater exited unexpectedly"),
97 }
98
99 Ok(())
100}
101
102/// Periodically query zone/user counts and update Prometheus gauges.
103async fn update_gauges(state: Arc<AppState>) {
104 loop {
105 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
106
107 let zones: Result<(i64,), _> =
108 sqlx::query_as("SELECT COUNT(*) FROM zone_index")
109 .fetch_one(&state.index)
110 .await;
111 let users: Result<(i64,), _> =
112 sqlx::query_as("SELECT COUNT(DISTINCT did) FROM zone_index")
113 .fetch_one(&state.index)
114 .await;
115
116 if let Ok((count,)) = zones {
117 metrics::gauge!("appview_zones_total").set(count as f64);
118 }
119 if let Ok((count,)) = users {
120 metrics::gauge!("appview_users_total").set(count as f64);
121 }
122 }
123}