ceres: a small planet in a giant solar system
32
fork

Configure Feed

Select the types of activity you want to include in your feed.

lightrail inspired rework

+825 -146
+1
.gitignore
··· 1 1 /target 2 2 .env 3 3 .ceres_data 4 + lightrail
+116
Cargo.lock
··· 405 405 "dotenvy", 406 406 "env_logger", 407 407 "fjall", 408 + "futures", 408 409 "jacquard", 409 410 "jacquard-api", 410 411 "jacquard-axum", 412 + "jacquard-common", 411 413 "jacquard-identity", 412 414 "log", 415 + "postcard", 413 416 "reqwest", 414 417 "serde", 415 418 "serde_json", 419 + "thiserror 2.0.18", 416 420 "tokio", 421 + "tokio-util", 417 422 "tower-http", 423 + "url", 418 424 ] 419 425 420 426 [[package]] ··· 1142 1148 ] 1143 1149 1144 1150 [[package]] 1151 + name = "futures" 1152 + version = "0.3.32" 1153 + source = "registry+https://github.com/rust-lang/crates.io-index" 1154 + checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" 1155 + dependencies = [ 1156 + "futures-channel", 1157 + "futures-core", 1158 + "futures-executor", 1159 + "futures-io", 1160 + "futures-sink", 1161 + "futures-task", 1162 + "futures-util", 1163 + ] 1164 + 1165 + [[package]] 1145 1166 name = "futures-buffered" 1146 1167 version = "0.2.13" 1147 1168 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1161 1182 checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" 1162 1183 dependencies = [ 1163 1184 "futures-core", 1185 + "futures-sink", 1164 1186 ] 1165 1187 1166 1188 [[package]] ··· 1170 1192 checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" 1171 1193 1172 1194 [[package]] 1195 + name = "futures-executor" 1196 + version = "0.3.32" 1197 + source = "registry+https://github.com/rust-lang/crates.io-index" 1198 + checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" 1199 + dependencies = [ 1200 + "futures-core", 1201 + "futures-task", 1202 + "futures-util", 1203 + ] 1204 + 1205 + [[package]] 1173 1206 name = "futures-io" 1174 1207 version = "0.3.32" 1175 1208 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1217 1250 source = "registry+https://github.com/rust-lang/crates.io-index" 1218 1251 checksum = 
"389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" 1219 1252 dependencies = [ 1253 + "futures-channel", 1220 1254 "futures-core", 1221 1255 "futures-io", 1222 1256 "futures-macro", ··· 1921 1955 "ciborium-io", 1922 1956 "cid", 1923 1957 "fluent-uri", 1958 + "futures", 1924 1959 "getrandom 0.2.17", 1925 1960 "getrandom 0.3.4", 1926 1961 "hashbrown 0.15.5", ··· 1931 1966 "miette", 1932 1967 "multibase", 1933 1968 "multihash", 1969 + "n0-future", 1934 1970 "ouroboros", 1935 1971 "oxilangtag", 1936 1972 "p256", ··· 1952 1988 "spin 0.10.0", 1953 1989 "thiserror 2.0.18", 1954 1990 "tokio", 1991 + "tokio-tungstenite-wasm", 1955 1992 "tokio-util", 1956 1993 "trait-variant", 1957 1994 "unicode-segmentation", ··· 3390 3427 ] 3391 3428 3392 3429 [[package]] 3430 + name = "rustls-native-certs" 3431 + version = "0.8.3" 3432 + source = "registry+https://github.com/rust-lang/crates.io-index" 3433 + checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" 3434 + dependencies = [ 3435 + "openssl-probe", 3436 + "rustls-pki-types", 3437 + "schannel", 3438 + "security-framework", 3439 + ] 3440 + 3441 + [[package]] 3393 3442 name = "rustls-pki-types" 3394 3443 version = "1.14.0" 3395 3444 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3663 3712 ] 3664 3713 3665 3714 [[package]] 3715 + name = "sha1" 3716 + version = "0.10.6" 3717 + source = "registry+https://github.com/rust-lang/crates.io-index" 3718 + checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 3719 + dependencies = [ 3720 + "cfg-if", 3721 + "cpufeatures", 3722 + "digest", 3723 + ] 3724 + 3725 + [[package]] 3666 3726 name = "sha1_smol" 3667 3727 version = "1.0.1" 3668 3728 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4049 4109 "bytes", 4050 4110 "libc", 4051 4111 "mio", 4112 + "parking_lot", 4052 4113 "pin-project-lite", 4053 4114 "signal-hook-registry", 4054 4115 "socket2 0.6.3", ··· 4085 4146 dependencies = [ 4086 4147 
"rustls", 4087 4148 "tokio", 4149 + ] 4150 + 4151 + [[package]] 4152 + name = "tokio-tungstenite" 4153 + version = "0.24.0" 4154 + source = "registry+https://github.com/rust-lang/crates.io-index" 4155 + checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" 4156 + dependencies = [ 4157 + "futures-util", 4158 + "log", 4159 + "rustls", 4160 + "rustls-native-certs", 4161 + "rustls-pki-types", 4162 + "tokio", 4163 + "tokio-rustls", 4164 + "tungstenite", 4165 + ] 4166 + 4167 + [[package]] 4168 + name = "tokio-tungstenite-wasm" 4169 + version = "0.4.0" 4170 + source = "registry+https://github.com/rust-lang/crates.io-index" 4171 + checksum = "e21a5c399399c3db9f08d8297ac12b500e86bca82e930253fdc62eaf9c0de6ae" 4172 + dependencies = [ 4173 + "futures-channel", 4174 + "futures-util", 4175 + "http", 4176 + "httparse", 4177 + "js-sys", 4178 + "rustls", 4179 + "thiserror 1.0.69", 4180 + "tokio", 4181 + "tokio-tungstenite", 4182 + "wasm-bindgen", 4183 + "web-sys", 4088 4184 ] 4089 4185 4090 4186 [[package]] ··· 4237 4333 version = "0.2.5" 4238 4334 source = "registry+https://github.com/rust-lang/crates.io-index" 4239 4335 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 4336 + 4337 + [[package]] 4338 + name = "tungstenite" 4339 + version = "0.24.0" 4340 + source = "registry+https://github.com/rust-lang/crates.io-index" 4341 + checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" 4342 + dependencies = [ 4343 + "byteorder", 4344 + "bytes", 4345 + "data-encoding", 4346 + "http", 4347 + "httparse", 4348 + "log", 4349 + "rand 0.8.5", 4350 + "rustls", 4351 + "rustls-pki-types", 4352 + "sha1", 4353 + "thiserror 1.0.69", 4354 + "utf-8", 4355 + ] 4240 4356 4241 4357 [[package]] 4242 4358 name = "twoway"
+9 -2
Cargo.toml
··· 9 9 dotenvy = "0.15.7" 10 10 env_logger = "0.11.10" 11 11 fjall = "3.1.4" 12 + futures = "0.3" 12 13 # jacquard = "0.11.0" 13 14 jacquard = { path = "../jacquard/crates/jacquard" } 14 15 # jacquard-api = { version = "0.11.1", features = ["default", "app_bsky"] } 15 - jacquard-api = { path = "../jacquard/crates/jacquard-api", features = ["default", "app_bsky"] } 16 + jacquard-api = { path = "../jacquard/crates/jacquard-api", features = ["default", "app_bsky", "streaming"] } 16 17 # jacquard-axum = "0.11.0" 17 18 jacquard-axum = { path = "../jacquard/crates/jacquard-axum" } 19 + # jacquard-common = "0.11.0" 20 + jacquard-common = { path = "../jacquard/crates/jacquard-common", features = ["websocket", "streaming", "reqwest-client"] } 18 21 # jacquard-identity = { version = "0.11.0", features = ["cache", "dns"] } 19 22 jacquard-identity = { path = "../jacquard/crates/jacquard-identity", features = ["cache", "dns"] } 20 23 log = "0.4.29" 24 + postcard = { version = "1", features = ["alloc"] } 21 25 reqwest = { version = "0.12.23", features = ["stream", "json"] } 22 26 serde = { version = "1.0.228", features = ["derive"] } 23 27 serde_json = "1.0.149" 24 - tokio = { version = "1.52.1", features = ["macros", "rt-multi-thread", "net", "signal"] } 28 + thiserror = "2" 29 + tokio = { version = "1.52.1", features = ["full"] } 30 + tokio-util = { version = "0.7", features = ["rt"] } 25 31 tower-http = { version = "0.6.8", features = ["cors"] } 32 + url = "2"
+1 -1
README.md
··· 4 4 5 5 # Work done currently 6 6 - `app.bsky.actor.getProfile` - Just started, and let me tell you. It got hands 7 - - `app.bsky.actor.getPreferences` - Works, but not as expected. It has it's own internal preferences. Not the ones from your PDS 7 + - `app.bsky.actor.getPreferences` - Works, but not as expected. It has its own internal preferences. Not the ones from your PDS. best to just use a social app that clears the atproto-proxy to get from your own PDS 8 8 - `app.bsky.actor.putPreferences` - [lol](https://github.com/bluesky-social/atproto/issues/4193). Best to just use a bsky-social app fork like https://blacksky.community for running this AppView
+16
src/error.rs
··· 1 + use crate::storage::StorageError; 2 + use thiserror::Error; 3 + 4 + #[derive(Debug, Error)] 5 + pub enum Error { 6 + #[error("storage: {0}")] 7 + Storage(#[from] StorageError), 8 + #[error("I/O: {0}")] 9 + Io(#[from] std::io::Error), 10 + #[error("task: {0}")] 11 + Join(#[from] tokio::task::JoinError), 12 + #[error("{0}")] 13 + Other(String), 14 + } 15 + 16 + pub type Result<T> = std::result::Result<T, Error>;
-2
src/handlers/mod.rs
··· 1 - pub mod well_known; 2 - pub mod xrpc;
+5 -12
src/handlers/well_known.rs src/server/well_known.rs
··· 50 50 "z42tusbpVy6BCE8tqcQAkvvRi9W3M7LEqLWNdTxtW2KicMNJ".to_string(), 51 51 ), 52 52 }], 53 - service: vec![ 54 - // Service { 55 - // id: "#bsky_notif".to_string(), 56 - // type_: "BskyNotificationService".to_string(), 57 - // service_endpoint: endpoint.clone(), 58 - // }, 59 - Service { 60 - id: "#bsky_appview".to_string(), 61 - type_: "BskyAppView".to_string(), 62 - service_endpoint: endpoint, 63 - }, 64 - ], 53 + service: vec![Service { 54 + id: "#bsky_appview".to_string(), 55 + type_: "BskyAppView".to_string(), 56 + service_endpoint: endpoint, 57 + }], 65 58 id: did_id, 66 59 }) 67 60 }
+8 -19
src/handlers/xrpc/app_bsky_actor.rs src/server/xrpc/app_bsky_actor.rs
··· 1 - use crate::handlers::xrpc::XrpcErrorResponse; 1 + use crate::server::xrpc::XrpcErrorResponse; 2 2 use crate::state::AppState; 3 - use axum::{ 4 - Json, Router, 5 - extract::State, 6 - response::{IntoResponse, Response}, 7 - }; 3 + use crate::storage; 4 + use axum::{Json, Router, extract::State}; 8 5 use jacquard::{ 9 6 IntoStatic, 10 7 prelude::IdentityResolver, 11 8 types::{datetime::Datetime, did::Did, uri::UriValue}, 12 - xrpc::XrpcEndpoint, 13 9 }; 14 10 use jacquard_api::app_bsky::actor::{ 15 11 PreferencesItem, ProfileViewDetailed, ··· 101 97 Ok(profile.into()) 102 98 } 103 99 104 - fn pref_key(did: &Did<'_>) -> Vec<u8> { 105 - format!("preferences:{}", did.as_ref()).into_bytes() 106 - } 107 - 108 100 pub async fn get_preferences( 109 101 State(state): State<AppState>, 110 102 ExtractOptionalServiceAuth(auth): ExtractOptionalServiceAuth, ··· 112 104 let auth = auth.ok_or_else(XrpcErrorResponse::auth_missing)?; 113 105 let did = auth.did(); 114 106 115 - let stored = state.persistent_ks.get(pref_key(did)).map_err(|err| { 107 + let stored = storage::preferences::get(&state.db, did).map_err(|err| { 116 108 log::error!("fjall get preferences: {err}"); 117 109 XrpcErrorResponse::internal_server_error() 118 110 })?; ··· 146 138 XrpcErrorResponse::internal_server_error() 147 139 })?; 148 140 149 - state 150 - .persistent_ks 151 - .insert(pref_key(did), bytes) 152 - .map_err(|err| { 153 - log::error!("fjall insert preferences: {err}"); 154 - XrpcErrorResponse::internal_server_error() 155 - })?; 141 + storage::preferences::put(&state.db, did, &bytes).map_err(|err| { 142 + log::error!("fjall insert preferences: {err}"); 143 + XrpcErrorResponse::internal_server_error() 144 + })?; 156 145 157 146 Ok(Json(())) 158 147 }
src/handlers/xrpc/mod.rs src/server/xrpc/mod.rs
+62 -107
src/main.rs
··· 1 - use axum::body::Bytes; 2 - use axum::{ 3 - Json, Router, body, 4 - extract::{Request, State}, 5 - http::{Method, StatusCode}, 6 - response::{IntoResponse, Response}, 7 - routing::get, 8 - }; 1 + use std::env; 2 + use std::net::SocketAddr; 3 + use std::path::Path; 4 + 9 5 use env_logger::Env; 10 - use fjall::Database; 11 - use handlers::{well_known::did_document, xrpc::app_bsky_actor}; 12 6 use jacquard::{identity::resolver::ResolverOptions, prelude::JacquardResolver, types::did::Did}; 13 7 use jacquard_axum::service_auth::ServiceAuthConfig; 14 - use log::info; 15 - use serde_json::{Value, json}; 16 - use state::AppState; 17 - use std::env; 18 - use tokio::net::TcpListener; 19 - use tower_http::cors::CorsLayer; 8 + use log::{info, warn}; 9 + use tokio::task::JoinSet; 10 + use tokio_util::sync::CancellationToken; 11 + 12 + use crate::error::{Error, Result}; 13 + use crate::state::AppState; 20 14 21 - mod handlers; 15 + mod error; 16 + mod server; 22 17 mod state; 18 + mod storage; 19 + mod sync; 23 20 24 - static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 21 + static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); 25 22 26 23 #[tokio::main] 27 24 async fn main() -> anyhow::Result<()> { 28 25 dotenvy::dotenv().ok(); 29 26 env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); 30 27 31 - let bind_addr = env::var("BIND_ADDR").unwrap_or_else(|_| "127.0.0.1:3000".to_string()); 28 + run().await?; 29 + Ok(()) 30 + } 31 + 32 + async fn run() -> Result<()> { 33 + let bind_addr: SocketAddr = env::var("BIND_ADDR") 34 + .unwrap_or_else(|_| "127.0.0.1:3000".to_string()) 35 + .parse() 36 + .map_err(|e: std::net::AddrParseError| Error::Other(format!("BIND_ADDR: {e}")))?; 32 37 33 38 let reqwest_client = reqwest::Client::builder() 34 39 .user_agent(APP_USER_AGENT) 35 - .build()?; 40 + .build() 41 + .map_err(|e| Error::Other(format!("reqwest build: {e}")))?; 36 42 37 
43 let resolver = JacquardResolver::new(reqwest_client.clone(), ResolverOptions::default()); 38 44 ··· 41 47 .expect("APP_VIEW_DOMAIN produced an invalid did:web"); 42 48 let service_auth = ServiceAuthConfig::new(service_did, &["bsky_appview"], resolver.clone()); 43 49 44 - //fjall db setup 45 - let data_directory = 46 - env::var("CERES_DATA_DIRECTORY").unwrap_or_else(|_| ".ceres_data".to_string()); 47 - let db = Database::builder(data_directory).open()?; 48 - let persistent_ks = db.keyspace("persistent", fjall::KeyspaceCreateOptions::default)?; 49 - let cache_ks = db.keyspace("cache", fjall::KeyspaceCreateOptions::default)?; 50 + let data_dir = env::var("CERES_DATA_DIRECTORY").unwrap_or_else(|_| ".ceres_data".to_string()); 51 + let db = storage::open(Path::new(&data_dir))?; 50 52 51 53 let state = AppState { 52 54 service_auth, 53 55 reqwest_client, 54 56 resolver, 55 57 forwarded_app_view: env::var("FORWARDED_APP_VIEW").ok(), 56 - database: db, 57 - persistent_ks, 58 - cache_ks, 58 + db: db.clone(), 59 59 }; 60 60 61 - let app = Router::new() 62 - .route("/.well-known/did.json", get(did_document)) 63 - .merge(app_bsky_actor::routes()) 64 - .fallback(log_request) 65 - .layer(CorsLayer::permissive()) 66 - .with_state(state); 61 + let token = CancellationToken::new(); 62 + let mut tasks: JoinSet<Result<()>> = JoinSet::new(); 67 63 68 - let listener = TcpListener::bind(&bind_addr) 69 - .await 70 - .expect("failed to bind listener"); 71 - info!("listening on https://{bind_addr}"); 64 + // Firehose — optional, gated on CERES_FIREHOSE_HOST 65 + if let Ok(fh_host) = env::var("CERES_FIREHOSE_HOST") { 66 + let db = db.clone(); 67 + let token = token.clone(); 68 + tasks.spawn(async move { 69 + sync::firehose::Subscriber::new(fh_host, db).run(token).await 70 + }); 71 + } else { 72 + info!("firehose: CERES_FIREHOSE_HOST unset; not spawning subscriber"); 73 + } 72 74 73 - axum::serve(listener, app).await.expect("server error"); 74 - Ok(()) 75 - } 76 - 77 - async fn 
log_request(State(state): State<AppState>, req: Request) -> Response { 78 - let (parts, body) = req.into_parts(); 79 - let method = parts.method.clone(); 80 - let path = parts.uri.path().to_string(); 81 - let query = parts 82 - .uri 83 - .query() 84 - .map(|q| format!("?{}", q)) 85 - .unwrap_or_default(); 86 - 87 - info!("{method} {path}{query}"); 88 - 89 - if method == Method::GET { 90 - match parts.uri.query() { 91 - Some(q) => info!(" query: {q}"), 92 - None => info!(" query: <none>"), 93 - } 75 + // Backfill dispatcher 76 + { 77 + let db = db.clone(); 78 + let token = token.clone(); 79 + tasks.spawn(async move { sync::backfill::dispatcher::run(db, token).await }); 94 80 } 95 81 96 - let bytes = match body::to_bytes(body, usize::MAX).await { 97 - Ok(b) => b, 98 - Err(e) => { 99 - info!(" body: <failed to read: {e}>"); 100 - if state.forwarded_app_view.is_none() { 101 - return (StatusCode::OK, Json(json!({}))).into_response(); 102 - } 103 - // If we are proxying, we might want to try anyway or fail. 104 - // Let's just use empty bytes if it failed to read. 
105 - Bytes::new() 106 - } 107 - }; 108 - 109 - if !bytes.is_empty() { 110 - match serde_json::from_slice::<Value>(&bytes) { 111 - Ok(value) => { 112 - let pretty = 113 - serde_json::to_string_pretty(&value).unwrap_or_else(|_| value.to_string()); 114 - info!(" body (json):\n{pretty}"); 115 - } 116 - Err(_) => { 117 - info!(" body: <non-json, {} bytes>", bytes.len()); 118 - } 119 - } 120 - } else if method != Method::GET { 121 - info!(" body: <empty>"); 82 + // HTTP server 83 + { 84 + let state = state.clone(); 85 + let token = token.clone(); 86 + tasks.spawn(async move { server::serve(bind_addr, state, token).await }); 122 87 } 123 88 124 - if let Some(ref base_url) = state.forwarded_app_view { 125 - let target_url = format!("{base_url}{path}{query}"); 126 - let proxy_res = state 127 - .reqwest_client 128 - .request(method, &target_url) 129 - .body(bytes) 130 - .send() 131 - .await; 132 - 133 - match proxy_res { 134 - Ok(res) => { 135 - let status = res.status(); 136 - let body_bytes = res.bytes().await.unwrap_or_default(); 137 - return (status, body_bytes).into_response(); 138 - } 139 - Err(e) => { 140 - log::error!("Proxy request failed: {e}"); 141 - return (StatusCode::INTERNAL_SERVER_ERROR, "Proxy Error").into_response(); 142 - } 89 + tokio::select! { 90 + _ = tokio::signal::ctrl_c() => info!("ctrl-c received; shutting down"), 91 + r = tasks.join_next() => warn!("task exited early: {r:?}"), 92 + } 93 + token.cancel(); 94 + while let Some(r) = tasks.join_next().await { 95 + match r { 96 + Ok(Ok(())) => {} 97 + Ok(Err(e)) => warn!("task error: {e}"), 98 + Err(e) => warn!("task join: {e}"), 143 99 } 144 100 } 145 - 146 - (StatusCode::OK, Json(json!({}))).into_response() 101 + Ok(()) 147 102 }
+81
src/server/fallback.rs
··· 1 + use axum::{ 2 + Json, 3 + body::{self, Bytes}, 4 + extract::{Request, State}, 5 + http::{Method, StatusCode}, 6 + response::{IntoResponse, Response}, 7 + }; 8 + use log::info; 9 + use serde_json::{Value, json}; 10 + 11 + use crate::state::AppState; 12 + 13 + pub async fn log_request(State(state): State<AppState>, req: Request) -> Response { 14 + let (parts, body) = req.into_parts(); 15 + let method = parts.method.clone(); 16 + let path = parts.uri.path().to_string(); 17 + let query = parts 18 + .uri 19 + .query() 20 + .map(|q| format!("?{}", q)) 21 + .unwrap_or_default(); 22 + 23 + info!("{method} {path}{query}"); 24 + 25 + if method == Method::GET { 26 + match parts.uri.query() { 27 + Some(q) => info!(" query: {q}"), 28 + None => info!(" query: <none>"), 29 + } 30 + } 31 + 32 + let bytes = match body::to_bytes(body, usize::MAX).await { 33 + Ok(b) => b, 34 + Err(e) => { 35 + info!(" body: <failed to read: {e}>"); 36 + if state.forwarded_app_view.is_none() { 37 + return (StatusCode::OK, Json(json!({}))).into_response(); 38 + } 39 + Bytes::new() 40 + } 41 + }; 42 + 43 + if !bytes.is_empty() { 44 + match serde_json::from_slice::<Value>(&bytes) { 45 + Ok(value) => { 46 + let pretty = 47 + serde_json::to_string_pretty(&value).unwrap_or_else(|_| value.to_string()); 48 + info!(" body (json):\n{pretty}"); 49 + } 50 + Err(_) => { 51 + info!(" body: <non-json, {} bytes>", bytes.len()); 52 + } 53 + } 54 + } else if method != Method::GET { 55 + info!(" body: <empty>"); 56 + } 57 + 58 + if let Some(ref base_url) = state.forwarded_app_view { 59 + let target_url = format!("{base_url}{path}{query}"); 60 + let proxy_res = state 61 + .reqwest_client 62 + .request(method, &target_url) 63 + .body(bytes) 64 + .send() 65 + .await; 66 + 67 + match proxy_res { 68 + Ok(res) => { 69 + let status = res.status(); 70 + let body_bytes = res.bytes().await.unwrap_or_default(); 71 + return (status, body_bytes).into_response(); 72 + } 73 + Err(e) => { 74 + log::error!("Proxy request failed: 
{e}"); 75 + return (StatusCode::INTERNAL_SERVER_ERROR, "Proxy Error").into_response(); 76 + } 77 + } 78 + } 79 + 80 + (StatusCode::OK, Json(json!({}))).into_response() 81 + }
+34
src/server/mod.rs
··· 1 + pub mod fallback; 2 + pub mod well_known; 3 + pub mod xrpc; 4 + 5 + use std::net::SocketAddr; 6 + 7 + use axum::{Router, routing::get}; 8 + use log::info; 9 + use tokio::net::TcpListener; 10 + use tokio_util::sync::CancellationToken; 11 + use tower_http::cors::CorsLayer; 12 + 13 + use crate::error::Result; 14 + use crate::state::AppState; 15 + 16 + pub async fn serve( 17 + addr: SocketAddr, 18 + state: AppState, 19 + token: CancellationToken, 20 + ) -> Result<()> { 21 + let app: Router = Router::new() 22 + .route("/.well-known/did.json", get(well_known::did_document)) 23 + .merge(xrpc::app_bsky_actor::routes()) 24 + .fallback(fallback::log_request) 25 + .layer(CorsLayer::permissive()) 26 + .with_state(state); 27 + 28 + let listener = TcpListener::bind(addr).await?; 29 + info!("listening on http://{addr}"); 30 + axum::serve(listener, app) 31 + .with_graceful_shutdown(token.cancelled_owned()) 32 + .await?; 33 + Ok(()) 34 + }
+3 -3
src/state.rs
··· 1 1 use jacquard::{prelude::JacquardResolver, types::did::Did}; 2 2 use jacquard_axum::service_auth::{ServiceAuth, ServiceAuthConfig}; 3 3 4 + use crate::storage::DbRef; 5 + 4 6 #[derive(Clone)] 5 7 pub struct AppState { 6 8 pub service_auth: ServiceAuthConfig<JacquardResolver>, 7 9 pub reqwest_client: reqwest::Client, 8 10 pub resolver: JacquardResolver, 9 11 pub forwarded_app_view: Option<String>, 10 - pub database: fjall::Database, 11 - pub persistent_ks: fjall::Keyspace, 12 - pub cache_ks: fjall::Keyspace, 12 + pub db: DbRef, 13 13 } 14 14 15 15 impl ServiceAuth for AppState {
+113
src/storage/backfill_queue.rs
··· 1 + //! Timestamp-ordered backfill queue backed by fjall. 2 + //! 3 + //! Keys: `"bfq"<ts_be:u64>\0<did>` — big-endian timestamp gives FIFO ordering. 4 + //! Values: postcard-encoded [`BackfillJob`]. 5 + 6 + use std::collections::HashSet; 7 + use std::time::{SystemTime, UNIX_EPOCH}; 8 + 9 + use serde::{Deserialize, Serialize}; 10 + 11 + use crate::storage::{DbRef, PREFIX_BACKFILL, StorageError, StorageResult}; 12 + 13 + const NUL: u8 = b'\0'; 14 + 15 + #[derive(Debug, Clone, Serialize, Deserialize)] 16 + pub struct BackfillJob { 17 + pub did: String, 18 + pub pds_host: String, 19 + pub cursor: Option<String>, 20 + pub retry_count: u16, 21 + pub reason: String, 22 + } 23 + 24 + fn to_millis(t: SystemTime) -> u64 { 25 + t.duration_since(UNIX_EPOCH) 26 + .expect("system clock before Unix epoch") 27 + .as_millis() as u64 28 + } 29 + 30 + fn key(ts: u64, did: &str) -> Vec<u8> { 31 + let mut k = Vec::with_capacity(PREFIX_BACKFILL.len() + 8 + 1 + did.len()); 32 + k.extend_from_slice(&PREFIX_BACKFILL); 33 + k.extend_from_slice(&ts.to_be_bytes()); 34 + k.push(NUL); 35 + k.extend_from_slice(did.as_bytes()); 36 + k 37 + } 38 + 39 + fn parse_did(raw: &[u8]) -> StorageResult<String> { 40 + let rest = raw 41 + .strip_prefix(PREFIX_BACKFILL.as_slice()) 42 + .ok_or(StorageError::Corrupt { 43 + key: String::from_utf8_lossy(raw).into_owned(), 44 + reason: "wrong prefix", 45 + })?; 46 + if rest.len() < 9 { 47 + return Err(StorageError::Corrupt { 48 + key: String::from_utf8_lossy(raw).into_owned(), 49 + reason: "key too short", 50 + }); 51 + } 52 + let did_bytes = rest[9..].to_vec(); 53 + String::from_utf8(did_bytes).map_err(|_| StorageError::Corrupt { 54 + key: String::from_utf8_lossy(raw).into_owned(), 55 + reason: "did not utf-8", 56 + }) 57 + } 58 + 59 + /// Enqueue a job to be claimed when wall-clock time reaches `when`. 
60 + pub fn enqueue(db: &DbRef, when: SystemTime, job: &BackfillJob) -> StorageResult<()> { 61 + let ts = to_millis(when); 62 + let k = key(ts, &job.did); 63 + let v = postcard::to_allocvec(job)?; 64 + db.persistent.insert(k, v)?; 65 + Ok(()) 66 + } 67 + 68 + /// Claim the next ready job whose timestamp is `<= now` and whose DID is not already in flight. 69 + /// 70 + /// Atomically removes the key before returning. `since` is an opaque cursor from a prior 71 + /// `claim` call — callers can hold it in memory to skip over tombstones left by prior claims. 72 + pub fn claim( 73 + db: &DbRef, 74 + now: SystemTime, 75 + since: Option<Vec<u8>>, 76 + busy: &HashSet<String>, 77 + ) -> StorageResult<Option<(BackfillJob, Vec<u8>)>> { 78 + let now_ms = to_millis(now); 79 + 80 + let lower = match since { 81 + Some(suffix) => { 82 + let mut k = PREFIX_BACKFILL.to_vec(); 83 + k.extend_from_slice(&suffix); 84 + k 85 + } 86 + None => PREFIX_BACKFILL.to_vec(), 87 + }; 88 + // Exclusive upper bound at "now": only claim jobs whose ts < now_ms. 89 + let mut upper = PREFIX_BACKFILL.to_vec(); 90 + upper.extend_from_slice(&now_ms.to_be_bytes()); 91 + upper.push(NUL); 92 + 93 + for guard in db.persistent.range(lower..upper) { 94 + let (key_slice, val_slice) = guard.into_inner()?; 95 + let key_bytes = key_slice.as_ref(); 96 + let did = parse_did(key_bytes)?; 97 + 98 + if busy.contains(&did) { 99 + continue; 100 + } 101 + 102 + let job: BackfillJob = postcard::from_bytes(val_slice.as_ref())?; 103 + let next_since = key_bytes[PREFIX_BACKFILL.len()..].to_vec(); 104 + 105 + let mut batch = db.database.batch(); 106 + batch.remove_weak(&db.persistent, key_bytes); 107 + batch.commit()?; 108 + 109 + return Ok(Some((job, next_since))); 110 + } 111 + 112 + Ok(None) 113 + }
+16
src/storage/error.rs
··· 1 + use thiserror::Error; 2 + 3 + #[derive(Debug, Error)] 4 + pub enum StorageError { 5 + #[error("fjall: {0}")] 6 + Fjall(#[from] fjall::Error), 7 + #[error("postcard: {0}")] 8 + Postcard(#[from] postcard::Error), 9 + #[error("corrupt key {key}: {reason}")] 10 + Corrupt { 11 + key: String, 12 + reason: &'static str, 13 + }, 14 + } 15 + 16 + pub type StorageResult<T> = Result<T, StorageError>;
+33
src/storage/firehose_cursor.rs
··· 1 + use crate::storage::{DbRef, PREFIX_FH_CURSOR, StorageError, StorageResult}; 2 + 3 + const NUL: u8 = b'\0'; 4 + const CURSOR_SUFFIX: &[u8] = b"cursor"; 5 + 6 + fn key(host: &str) -> Vec<u8> { 7 + let mut k = 8 + Vec::with_capacity(PREFIX_FH_CURSOR.len() + host.len() + 1 + CURSOR_SUFFIX.len()); 9 + k.extend_from_slice(&PREFIX_FH_CURSOR); 10 + k.extend_from_slice(host.as_bytes()); 11 + k.push(NUL); 12 + k.extend_from_slice(CURSOR_SUFFIX); 13 + k 14 + } 15 + 16 + pub fn get(db: &DbRef, host: &str) -> StorageResult<Option<u64>> { 17 + let k = key(host); 18 + match db.persistent.get(&k)? { 19 + None => Ok(None), 20 + Some(v) => { 21 + let bytes: [u8; 8] = v.as_ref().try_into().map_err(|_| StorageError::Corrupt { 22 + key: String::from_utf8_lossy(&k).into_owned(), 23 + reason: "cursor value is not 8 bytes", 24 + })?; 25 + Ok(Some(u64::from_be_bytes(bytes))) 26 + } 27 + } 28 + } 29 + 30 + pub fn set(db: &DbRef, host: &str, cursor: u64) -> StorageResult<()> { 31 + db.persistent.insert(key(host), cursor.to_be_bytes())?; 32 + Ok(()) 33 + }
+40
src/storage/mod.rs
··· 1 + pub mod backfill_queue; 2 + pub mod error; 3 + pub mod firehose_cursor; 4 + pub mod preferences; 5 + 6 + pub use error::{StorageError, StorageResult}; 7 + 8 + use std::path::Path; 9 + use std::sync::Arc; 10 + 11 + /// Fixed-length 3-byte key prefix per data type. 12 + pub(crate) type KeyPrefix = [u8; 3]; 13 + 14 + pub(crate) const PREFIX_PREFS: KeyPrefix = *b"prf"; 15 + pub(crate) const PREFIX_FH_CURSOR: KeyPrefix = *b"fhc"; 16 + pub(crate) const PREFIX_BACKFILL: KeyPrefix = *b"bfq"; 17 + 18 + /// Shared handle to the fjall database and its keyspaces. 19 + pub struct Db { 20 + pub(crate) database: fjall::Database, 21 + /// Durable state: preferences, queue jobs, firehose cursor. 22 + pub(crate) persistent: fjall::Keyspace, 23 + /// Ephemeral cache — currently unused; kept as a home for future read-through caches. 24 + #[allow(dead_code)] 25 + pub(crate) cache: fjall::Keyspace, 26 + } 27 + 28 + pub type DbRef = Arc<Db>; 29 + 30 + /// Open (or create) the fjall database at `path` and return a shared handle. 31 + pub fn open(path: &Path) -> StorageResult<DbRef> { 32 + let database = fjall::Database::builder(path).open()?; 33 + let persistent = database.keyspace("persistent", fjall::KeyspaceCreateOptions::default)?; 34 + let cache = database.keyspace("cache", fjall::KeyspaceCreateOptions::default)?; 35 + Ok(Arc::new(Db { 36 + database, 37 + persistent, 38 + cache, 39 + })) 40 + }
+20
src/storage/preferences.rs
··· 1 + use jacquard::types::did::Did; 2 + 3 + use crate::storage::{DbRef, PREFIX_PREFS, StorageResult}; 4 + 5 + fn key(did: &Did<'_>) -> Vec<u8> { 6 + let d = did.as_str(); 7 + let mut k = Vec::with_capacity(PREFIX_PREFS.len() + d.len()); 8 + k.extend_from_slice(&PREFIX_PREFS); 9 + k.extend_from_slice(d.as_bytes()); 10 + k 11 + } 12 + 13 + pub fn get(db: &DbRef, did: &Did<'_>) -> StorageResult<Option<Vec<u8>>> { 14 + Ok(db.persistent.get(key(did))?.map(|v| v.as_ref().to_vec())) 15 + } 16 + 17 + pub fn put(db: &DbRef, did: &Did<'_>, bytes: &[u8]) -> StorageResult<()> { 18 + db.persistent.insert(key(did), bytes)?; 19 + Ok(()) 20 + }
+79
src/sync/backfill/dispatcher.rs
··· 1 + //! Backfill job dispatcher. 2 + //! 3 + //! Claims [`BackfillJob`]s from the on-disk queue and runs each through a stub worker. 4 + //! The worker currently just logs the job — real repo-walking logic lands later. 5 + 6 + use std::collections::HashSet; 7 + use std::time::{Duration, SystemTime}; 8 + 9 + use log::{debug, info}; 10 + use tokio::task::JoinSet; 11 + use tokio_util::sync::CancellationToken; 12 + 13 + use crate::error::Result; 14 + use crate::storage::{DbRef, backfill_queue::{self, BackfillJob}}; 15 + 16 + const MAX_WORKERS: usize = 4; 17 + const IDLE_POLL: Duration = Duration::from_millis(500); 18 + 19 + pub async fn run(db: DbRef, token: CancellationToken) -> Result<()> { 20 + info!("backfill dispatcher started (max_workers={MAX_WORKERS})"); 21 + 22 + let mut busy: HashSet<String> = HashSet::new(); 23 + let mut since: Option<Vec<u8>> = None; 24 + let mut workers: JoinSet<String> = JoinSet::new(); 25 + 26 + loop { 27 + // Fill worker slots from the queue. 28 + while workers.len() < MAX_WORKERS { 29 + let claim_db = db.clone(); 30 + let claim_since = since.clone(); 31 + let claim_busy = busy.clone(); 32 + let claimed = tokio::task::spawn_blocking(move || { 33 + backfill_queue::claim(&claim_db, SystemTime::now(), claim_since, &claim_busy) 34 + }) 35 + .await??; 36 + 37 + match claimed { 38 + Some((job, next_since)) => { 39 + since = Some(next_since); 40 + busy.insert(job.did.clone()); 41 + workers.spawn(run_worker(job)); 42 + } 43 + None => { 44 + since = None; 45 + break; 46 + } 47 + } 48 + } 49 + 50 + tokio::select! 
{ 51 + _ = token.cancelled() => { 52 + info!("backfill dispatcher: cancelled; waiting for {} workers to drain", workers.len()); 53 + while let Some(r) = workers.join_next().await { 54 + if let Err(e) = r { 55 + log::warn!("worker join: {e}"); 56 + } 57 + } 58 + return Ok(()); 59 + } 60 + Some(res) = workers.join_next(), if !workers.is_empty() => { 61 + match res { 62 + Ok(did) => { busy.remove(&did); } 63 + Err(e) => log::warn!("worker panicked: {e}"), 64 + } 65 + } 66 + _ = tokio::time::sleep(IDLE_POLL), if workers.is_empty() => { 67 + debug!("backfill dispatcher: idle"); 68 + } 69 + } 70 + } 71 + } 72 + 73 + async fn run_worker(job: BackfillJob) -> String { 74 + info!( 75 + "backfill stub: did={} pds={} cursor={:?} retry={} reason={}", 76 + job.did, job.pds_host, job.cursor, job.retry_count, job.reason 77 + ); 78 + job.did 79 + }
+3
src/sync/backfill/mod.rs
··· 1 + pub mod dispatcher; 2 + 3 + pub use crate::storage::backfill_queue::BackfillJob;
+183
src/sync/firehose/mod.rs
··· 1 + //! Firehose WebSocket subscriber. 2 + //! 3 + //! Connects to an ATProto relay's `com.atproto.sync.subscribeRepos` endpoint, 4 + //! reconnects with exponential backoff on failure, and persists the latest 5 + //! sequence number as a cursor. 6 + //! 7 + //! Events are **dropped** — real commit/account/identity handling lands later. 8 + 9 + use std::time::{Duration, Instant}; 10 + 11 + use futures::StreamExt; 12 + use jacquard_api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 13 + use jacquard_common::deps::fluent_uri::Uri; 14 + use jacquard_common::xrpc::SubscriptionExt; 15 + use jacquard_common::{StreamErrorKind, TungsteniteClient}; 16 + use log::{debug, info, trace, warn}; 17 + use tokio_util::sync::CancellationToken; 18 + 19 + use crate::error::{Error, Result}; 20 + use crate::storage::{self, DbRef}; 21 + 22 + const MAX_BACKOFF_SECS: u64 = 64; 23 + const CURSOR_SAVE_EVERY: u64 = 1000; 24 + 25 + pub struct Subscriber { 26 + host: String, 27 + db: DbRef, 28 + } 29 + 30 + impl Subscriber { 31 + pub fn new(host: String, db: DbRef) -> Self { 32 + Self { host, db } 33 + } 34 + 35 + pub async fn run(&mut self, token: CancellationToken) -> Result<()> { 36 + let base = Uri::parse(format!("wss://{}", self.host)) 37 + .map_err(|e| Error::Other(format!("invalid firehose host: {e:?}")))? 38 + .to_owned(); 39 + 40 + let client = TungsteniteClient::new(); 41 + let mut backoff_secs: u64 = 1; 42 + 43 + loop { 44 + if token.is_cancelled() { 45 + return Ok(()); 46 + } 47 + 48 + let connect_cursor = { 49 + let db = self.db.clone(); 50 + let host = self.host.clone(); 51 + tokio::task::spawn_blocking(move || storage::firehose_cursor::get(&db, &host)) 52 + .await?? 53 + }; 54 + 55 + info!( 56 + "firehose: connecting host={} cursor={:?}", 57 + self.host, connect_cursor 58 + ); 59 + 60 + let params = SubscribeRepos { 61 + cursor: connect_cursor.map(|c| c as i64), 62 + }; 63 + 64 + let stream = tokio::select! 
{ 65 + _ = token.cancelled() => return Ok(()), 66 + r = client.subscription(base.clone()).subscribe(&params) => r, 67 + }; 68 + 69 + let stream = match stream { 70 + Ok(s) => s, 71 + Err(e) => { 72 + warn!( 73 + "firehose: connect failed host={} err={e} backoff_secs={backoff_secs}", 74 + self.host 75 + ); 76 + if !sleep_cancellable(&token, Duration::from_secs(backoff_secs)).await { 77 + return Ok(()); 78 + } 79 + backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); 80 + continue; 81 + } 82 + }; 83 + 84 + info!("firehose: connected host={}", self.host); 85 + backoff_secs = 1; 86 + 87 + let (_sink, mut messages) = stream.into_stream(); 88 + let mut processed: u64 = 0; 89 + let mut last_seq: Option<i64> = None; 90 + let mut last_message_at = Instant::now(); 91 + 92 + loop { 93 + tokio::select! { 94 + _ = token.cancelled() => { 95 + persist_cursor(&self.db, &self.host, last_seq).await; 96 + return Ok(()); 97 + } 98 + next = messages.next() => { 99 + match next { 100 + None => { 101 + info!("firehose: stream ended host={}; reconnecting", self.host); 102 + persist_cursor(&self.db, &self.host, last_seq).await; 103 + break; 104 + } 105 + Some(Err(e)) => { 106 + last_message_at = Instant::now(); 107 + match e.kind() { 108 + StreamErrorKind::Decode 109 + | StreamErrorKind::WrongMessageFormat => { 110 + warn!("firehose: decode error host={} err={e}", self.host); 111 + } 112 + StreamErrorKind::Closed => { 113 + info!("firehose: closed host={}; reconnecting", self.host); 114 + persist_cursor(&self.db, &self.host, last_seq).await; 115 + break; 116 + } 117 + _ => { 118 + warn!( 119 + "firehose: stream error host={} err={e} backoff={backoff_secs}s", 120 + self.host 121 + ); 122 + persist_cursor(&self.db, &self.host, last_seq).await; 123 + if !sleep_cancellable(&token, Duration::from_secs(backoff_secs)).await { 124 + return Ok(()); 125 + } 126 + backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); 127 + break; 128 + } 129 + } 130 + } 131 + Some(Ok(msg)) => { 132 + 
last_message_at = Instant::now(); 133 + processed += 1; 134 + if let Some(seq) = seq_of(&msg) { 135 + trace!("firehose: msg seq={seq}"); 136 + last_seq = Some(seq); 137 + } 138 + if processed % CURSOR_SAVE_EVERY == 0 { 139 + persist_cursor(&self.db, &self.host, last_seq).await; 140 + } 141 + // Events are dropped — real handling lands later. 142 + let _ = msg; 143 + let _ = last_message_at; 144 + } 145 + } 146 + } 147 + } 148 + } 149 + } 150 + } 151 + } 152 + 153 + fn seq_of(msg: &SubscribeReposMessage<'_>) -> Option<i64> { 154 + match msg { 155 + SubscribeReposMessage::Commit(c) => Some(c.seq), 156 + SubscribeReposMessage::Sync(s) => Some(s.seq), 157 + SubscribeReposMessage::Identity(i) => Some(i.seq), 158 + SubscribeReposMessage::Account(a) => Some(a.seq), 159 + _ => None, 160 + } 161 + } 162 + 163 + async fn sleep_cancellable(token: &CancellationToken, d: Duration) -> bool { 164 + tokio::select! { 165 + _ = token.cancelled() => false, 166 + _ = tokio::time::sleep(d) => true, 167 + } 168 + } 169 + 170 + async fn persist_cursor(db: &DbRef, host: &str, seq: Option<i64>) { 171 + let Some(seq) = seq else { return }; 172 + let db = db.clone(); 173 + let host = host.to_string(); 174 + let res = tokio::task::spawn_blocking(move || { 175 + storage::firehose_cursor::set(&db, &host, seq as u64) 176 + }) 177 + .await; 178 + match res { 179 + Ok(Ok(())) => debug!("firehose: cursor saved seq={seq}"), 180 + Ok(Err(e)) => warn!("firehose: cursor save failed: {e}"), 181 + Err(e) => warn!("firehose: cursor save task panicked: {e}"), 182 + } 183 + }
+2
src/sync/mod.rs
// Repo backfill: queued full-repo fetches for accounts discovered out of band.
pub mod backfill;
// Live relay subscription (`com.atproto.sync.subscribeRepos`).
pub mod firehose;