ceres: a small planet in a giant solar system
1extern crate alloc;
2
3use crate::error::{Error, Result};
4use crate::state::AppState;
5use axum::http::{HeaderValue, header::USER_AGENT};
6use env_logger::Env;
7use jacquard::client::credential_session::CredentialSession;
8use jacquard::client::{Agent, MemorySessionStore};
9use jacquard::xrpc::CallOptions;
10use jacquard::{identity::resolver::ResolverOptions, prelude::JacquardResolver, types::did::Did};
11use jacquard_axum::service_auth::ServiceAuthConfig;
12use log::{info, warn};
13use std::env;
14use std::net::SocketAddr;
15use std::path::Path;
16use std::sync::Arc;
17use tokio::task::JoinSet;
18use tokio_util::sync::CancellationToken;
19
20mod error;
21mod helpers;
22mod lexicons;
23mod server;
24mod state;
25mod storage;
26mod sync;
27
28pub use crate::lexicons::builder_types;
29
30static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
31
32#[tokio::main]
33async fn main() -> anyhow::Result<()> {
34 dotenvy::dotenv().ok();
35 env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
36
37 run().await?;
38 Ok(())
39}
40
41async fn run() -> Result<()> {
42 let bind_addr: SocketAddr = env::var("BIND_ADDR")
43 .unwrap_or_else(|_| "127.0.0.1:3000".to_string())
44 .parse()
45 .map_err(|e: std::net::AddrParseError| Error::Other(format!("BIND_ADDR: {e}")))?;
46
47 let reqwest_client = reqwest::Client::builder()
48 .user_agent(APP_USER_AGENT)
49 .build()
50 .map_err(|e| Error::Other(format!("reqwest build: {e}")))?;
51
52 let store = MemorySessionStore::default();
53
54 let resolver = JacquardResolver::new(reqwest_client.clone(), ResolverOptions::default());
55 let session = CredentialSession::new(Arc::new(store), Arc::new(resolver.clone())).with_options(
56 CallOptions {
57 auth: None,
58 atproto_proxy: None,
59 atproto_accept_labelers: None,
60 extra_headers: vec![(USER_AGENT, HeaderValue::from_static(APP_USER_AGENT))],
61 },
62 );
63 let agent = Arc::new(Agent::new(session));
64
65 let app_view_domain = env::var("APP_VIEW_DOMAIN").expect("APP_VIEW_DOMAIN is not set");
66 let service_did = Did::new_owned(format!("did:web:{app_view_domain}"))
67 .expect("APP_VIEW_DOMAIN produced an invalid did:web");
68 // Will need this after the new proposal
69 // let service_auth = ServiceAuthConfig::new(service_did, &["bsky_appview"], resolver.clone());
70 let service_auth = ServiceAuthConfig::new(service_did, resolver.clone());
71
72 let data_dir = env::var("CERES_DATA_DIRECTORY").unwrap_or_else(|_| ".ceres_data".to_string());
73 let db = storage::open(Path::new(&data_dir))?;
74
75 let constellation_host = env::var("CONSTELLATION_HOST")
76 .unwrap_or_else(|_| "https://constellation.microcosm.blue".to_string());
77
78 let state = AppState {
79 service_auth,
80 reqwest_client,
81 agent,
82 resolver,
83 forwarded_app_view: env::var("FORWARDED_APP_VIEW").ok(),
84 constellation_host,
85 db: db.clone(),
86 };
87
88 let token = CancellationToken::new();
89 let mut tasks: JoinSet<Result<()>> = JoinSet::new();
90
91 // Firehose — optional, gated on CERES_FIREHOSE_HOST
92 if let Ok(fh_host) = env::var("CERES_FIREHOSE_HOST") {
93 let db = db.clone();
94 let token = token.clone();
95 tasks.spawn(async move {
96 sync::firehose::Subscriber::new(fh_host, db)
97 .run(token)
98 .await
99 });
100 } else {
101 info!("firehose: CERES_FIREHOSE_HOST unset; not spawning subscriber");
102 }
103
104 // Backfill dispatcher
105 {
106 let db = db.clone();
107 let token = token.clone();
108 tasks.spawn(async move { sync::backfill::dispatcher::run(db, token).await });
109 }
110
111 // HTTP server
112 {
113 let state = state.clone();
114 let token = token.clone();
115 tasks.spawn(async move { server::serve(bind_addr, state, token).await });
116 }
117
118 tokio::select! {
119 _ = tokio::signal::ctrl_c() => info!("ctrl-c received; shutting down"),
120 r = tasks.join_next() => warn!("task exited early: {r:?}"),
121 }
122 token.cancel();
123 while let Some(r) = tasks.join_next().await {
124 match r {
125 Ok(Ok(())) => {}
126 Ok(Err(e)) => warn!("task error: {e}"),
127 Err(e) => warn!("task join: {e}"),
128 }
129 }
130 Ok(())
131}