ceres: a small planet in a giant solar system
32
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 131 lines 4.3 kB view raw
1extern crate alloc; 2 3use crate::error::{Error, Result}; 4use crate::state::AppState; 5use axum::http::{HeaderValue, header::USER_AGENT}; 6use env_logger::Env; 7use jacquard::client::credential_session::CredentialSession; 8use jacquard::client::{Agent, MemorySessionStore}; 9use jacquard::xrpc::CallOptions; 10use jacquard::{identity::resolver::ResolverOptions, prelude::JacquardResolver, types::did::Did}; 11use jacquard_axum::service_auth::ServiceAuthConfig; 12use log::{info, warn}; 13use std::env; 14use std::net::SocketAddr; 15use std::path::Path; 16use std::sync::Arc; 17use tokio::task::JoinSet; 18use tokio_util::sync::CancellationToken; 19 20mod error; 21mod helpers; 22mod lexicons; 23mod server; 24mod state; 25mod storage; 26mod sync; 27 28pub use crate::lexicons::builder_types; 29 30static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); 31 32#[tokio::main] 33async fn main() -> anyhow::Result<()> { 34 dotenvy::dotenv().ok(); 35 env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); 36 37 run().await?; 38 Ok(()) 39} 40 41async fn run() -> Result<()> { 42 let bind_addr: SocketAddr = env::var("BIND_ADDR") 43 .unwrap_or_else(|_| "127.0.0.1:3000".to_string()) 44 .parse() 45 .map_err(|e: std::net::AddrParseError| Error::Other(format!("BIND_ADDR: {e}")))?; 46 47 let reqwest_client = reqwest::Client::builder() 48 .user_agent(APP_USER_AGENT) 49 .build() 50 .map_err(|e| Error::Other(format!("reqwest build: {e}")))?; 51 52 let store = MemorySessionStore::default(); 53 54 let resolver = JacquardResolver::new(reqwest_client.clone(), ResolverOptions::default()); 55 let session = CredentialSession::new(Arc::new(store), Arc::new(resolver.clone())).with_options( 56 CallOptions { 57 auth: None, 58 atproto_proxy: None, 59 atproto_accept_labelers: None, 60 extra_headers: vec![(USER_AGENT, HeaderValue::from_static(APP_USER_AGENT))], 61 }, 62 ); 63 let agent = Arc::new(Agent::new(session)); 64 65 let app_view_domain = env::var("APP_VIEW_DOMAIN").expect("APP_VIEW_DOMAIN is not set"); 66 let service_did = Did::new_owned(format!("did:web:{app_view_domain}")) 67 .expect("APP_VIEW_DOMAIN produced an invalid did:web"); 68 // Will need this after the new proposal 69 // let service_auth = ServiceAuthConfig::new(service_did, &["bsky_appview"], resolver.clone()); 70 let service_auth = ServiceAuthConfig::new(service_did, resolver.clone()); 71 72 let data_dir = env::var("CERES_DATA_DIRECTORY").unwrap_or_else(|_| ".ceres_data".to_string()); 73 let db = storage::open(Path::new(&data_dir))?; 74 75 let constellation_host = env::var("CONSTELLATION_HOST") 76 .unwrap_or_else(|_| "https://constellation.microcosm.blue".to_string()); 77 78 let state = AppState { 79 service_auth, 80 reqwest_client, 81 agent, 82 resolver, 83 forwarded_app_view: env::var("FORWARDED_APP_VIEW").ok(), 84 constellation_host, 85 db: db.clone(), 86 }; 87 88 let token = CancellationToken::new(); 89 let mut tasks: JoinSet<Result<()>> = JoinSet::new(); 90 91 // Firehose — optional, gated on CERES_FIREHOSE_HOST 92 if let Ok(fh_host) = env::var("CERES_FIREHOSE_HOST") { 93 let db = db.clone(); 94 let token = token.clone(); 95 tasks.spawn(async move { 96 sync::firehose::Subscriber::new(fh_host, db) 97 .run(token) 98 .await 99 }); 100 } else { 101 info!("firehose: CERES_FIREHOSE_HOST unset; not spawning subscriber"); 102 } 103 104 // Backfill dispatcher 105 { 106 let db = db.clone(); 107 let token = token.clone(); 108 tasks.spawn(async move { sync::backfill::dispatcher::run(db, token).await }); 109 } 110 111 // HTTP server 112 { 113 let state = state.clone(); 114 let token = token.clone(); 115 tasks.spawn(async move { server::serve(bind_addr, state, token).await }); 116 } 117 118 tokio::select! { 119 _ = tokio::signal::ctrl_c() => info!("ctrl-c received; shutting down"), 120 r = tasks.join_next() => warn!("task exited early: {r:?}"), 121 } 122 token.cancel(); 123 while let Some(r) = tasks.join_next().await { 124 match r { 125 Ok(Ok(())) => {} 126 Ok(Err(e)) => warn!("task error: {e}"), 127 Err(e) => warn!("task join: {e}"), 128 } 129 } 130 Ok(()) 131}