extern crate alloc; use crate::error::{Error, Result}; use crate::state::AppState; use axum::http::{HeaderValue, header::USER_AGENT}; use env_logger::Env; use jacquard::client::credential_session::CredentialSession; use jacquard::client::{Agent, MemorySessionStore}; use jacquard::xrpc::CallOptions; use jacquard::{identity::resolver::ResolverOptions, prelude::JacquardResolver, types::did::Did}; use jacquard_axum::service_auth::ServiceAuthConfig; use log::{info, warn}; use std::env; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; mod error; mod helpers; mod lexicons; mod server; mod state; mod storage; mod sync; pub use crate::lexicons::builder_types; static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); run().await?; Ok(()) } async fn run() -> Result<()> { let bind_addr: SocketAddr = env::var("BIND_ADDR") .unwrap_or_else(|_| "127.0.0.1:3000".to_string()) .parse() .map_err(|e: std::net::AddrParseError| Error::Other(format!("BIND_ADDR: {e}")))?; let reqwest_client = reqwest::Client::builder() .user_agent(APP_USER_AGENT) .build() .map_err(|e| Error::Other(format!("reqwest build: {e}")))?; let store = MemorySessionStore::default(); let resolver = JacquardResolver::new(reqwest_client.clone(), ResolverOptions::default()); let session = CredentialSession::new(Arc::new(store), Arc::new(resolver.clone())).with_options( CallOptions { auth: None, atproto_proxy: None, atproto_accept_labelers: None, extra_headers: vec![(USER_AGENT, HeaderValue::from_static(APP_USER_AGENT))], }, ); let agent = Arc::new(Agent::new(session)); let app_view_domain = env::var("APP_VIEW_DOMAIN").expect("APP_VIEW_DOMAIN is not set"); let service_did = Did::new_owned(format!("did:web:{app_view_domain}")) .expect("APP_VIEW_DOMAIN produced an invalid did:web"); // Will need this after the new proposal // let service_auth = ServiceAuthConfig::new(service_did, &["bsky_appview"], resolver.clone()); let service_auth = ServiceAuthConfig::new(service_did, resolver.clone()); let data_dir = env::var("CERES_DATA_DIRECTORY").unwrap_or_else(|_| ".ceres_data".to_string()); let db = storage::open(Path::new(&data_dir))?; let constellation_host = env::var("CONSTELLATION_HOST") .unwrap_or_else(|_| "https://constellation.microcosm.blue".to_string()); let state = AppState { service_auth, reqwest_client, agent, resolver, forwarded_app_view: env::var("FORWARDED_APP_VIEW").ok(), constellation_host, db: db.clone(), }; let token = CancellationToken::new(); let mut tasks: JoinSet> = JoinSet::new(); // Firehose — optional, gated on CERES_FIREHOSE_HOST if let Ok(fh_host) = env::var("CERES_FIREHOSE_HOST") { let db = db.clone(); let token = token.clone(); tasks.spawn(async move { sync::firehose::Subscriber::new(fh_host, db) .run(token) .await }); } else { info!("firehose: CERES_FIREHOSE_HOST unset; not spawning subscriber"); } // Backfill dispatcher { let db = db.clone(); let token = token.clone(); tasks.spawn(async move { sync::backfill::dispatcher::run(db, token).await }); } // HTTP server { let state = state.clone(); let token = token.clone(); tasks.spawn(async move { server::serve(bind_addr, state, token).await }); } tokio::select! { _ = tokio::signal::ctrl_c() => info!("ctrl-c received; shutting down"), r = tasks.join_next() => warn!("task exited early: {r:?}"), } token.cancel(); while let Some(r) = tasks.join_next().await { match r { Ok(Ok(())) => {} Ok(Err(e)) => warn!("task error: {e}"), Err(e) => warn!("task join: {e}"), } } Ok(()) }