lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

very basic admin stats

oops these scans are slowwwww

phil 6df8317f c4417d21

+169 -8
+2
Cargo.lock
··· 2309 2309 version = "0.1.0" 2310 2310 dependencies = [ 2311 2311 "axum", 2312 + "base64", 2312 2313 "bytes", 2313 2314 "cid", 2314 2315 "clap", ··· 2335 2336 "serde", 2336 2337 "serde_ipld_dagcbor", 2337 2338 "serde_json", 2339 + "subtle", 2338 2340 "thiserror 2.0.18", 2339 2341 "tokio", 2340 2342 "tokio-util",
+4 -2
Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 axum = "0.8.8" 8 + base64 = "0.22.1" 8 9 bytes = "1" 9 10 cid = { version = "0.11", default-features = false, features = ["alloc"] } 10 11 clap = { version = "4.5.60", features = ["derive", "env"] } ··· 16 17 jacquard-api = { version = "0.9.5", default-features = false, features = ["com_atproto", "streaming"] } 17 18 jacquard-axum = { version = "0.9.6", default-features = false, features = ["tracing"] } 18 19 jacquard-common = { version = "0.9.5", features = ["websocket", "reqwest-client", "streaming"] } 19 - n0-future = "0.1" 20 20 jacquard-derive = "0.9.5" 21 21 jacquard-identity = "0.9.5" 22 22 jacquard-lexicon = "0.9.5" ··· 24 24 metrics = "0.24.3" 25 25 metrics-exporter-prometheus = { version = "0.18.1", features = ["http-listener"] } 26 26 mini-moka = "0.10" 27 - reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } 27 + n0-future = "0.1" 28 28 repo-stream = { version = "0.5.0-alpha.3", features = ["jacquard"] } 29 + reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } 29 30 rustls = { version = "0.23", default-features = false, features = ["aws-lc-rs"] } 30 31 rustversion = "1" 31 32 serde = { version = "1", features = ["derive"] } 32 33 serde_ipld_dagcbor = "0.6" 33 34 serde_json = "1" 35 + subtle = "2.6.1" 34 36 thiserror = "2.0.18" 35 37 tokio = { version = "1.49.0", features = ["full"] } 36 38 tokio-util = { version = "0.7", features = ["rt", "io-util"] }
+7 -1
src/main.rs
··· 188 188 let token = token.clone(); 189 189 let db = db.clone(); 190 190 let addr = args.listen; 191 - async move { lightrail::server::serve(addr, db, token).await } 191 + let admin_config = args 192 + .admin_password 193 + .map(|pw| lightrail::server::AdminConfig { 194 + subscribe_host: subscribe_host.clone(), 195 + admin_password: pw, 196 + }); 197 + async move { lightrail::server::serve(addr, db, token, admin_config).await } 192 198 }); 193 199 194 200 if args.deep_crawl {
+111
src/server/admin.rs
··· 1 + use axum::{ 2 + Extension, Json, 3 + extract::State, 4 + http::{HeaderMap, StatusCode, header}, 5 + response::{IntoResponse, Response}, 6 + }; 7 + use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; 8 + use serde::Serialize; 9 + use serde_json::json; 10 + use subtle::ConstantTimeEq; 11 + 12 + use crate::storage::{DbRef, backfill_progress, error::StorageError, repo, resync_queue}; 13 + 14 + use super::AdminConfig; 15 + 16 + #[derive(Serialize)] 17 + pub(super) struct AdminStatus { 18 + repos_synced: u64, 19 + resync_queue_depth: u64, 20 + upstream_backfill_complete: bool, 21 + upstream_backfill_completed_at: Option<String>, 22 + } 23 + 24 + pub(super) enum AdminStatusError { 25 + Unauthorized, 26 + Storage, 27 + } 28 + 29 + impl From<StorageError> for AdminStatusError { 30 + fn from(e: StorageError) -> Self { 31 + tracing::error!("storage error in admin status: {e:#}"); 32 + Self::Storage 33 + } 34 + } 35 + 36 + impl IntoResponse for AdminStatusError { 37 + fn into_response(self) -> Response { 38 + match self { 39 + AdminStatusError::Unauthorized => ( 40 + StatusCode::UNAUTHORIZED, 41 + [(header::WWW_AUTHENTICATE, "Basic realm=\"lightrail admin\"")], 42 + Json(json!({ "error": "Unauthorized", "message": "basic auth required" })), 43 + ) 44 + .into_response(), 45 + AdminStatusError::Storage => ( 46 + StatusCode::INTERNAL_SERVER_ERROR, 47 + Json(json!({ "error": "InternalError", "message": "storage error" })), 48 + ) 49 + .into_response(), 50 + } 51 + } 52 + } 53 + 54 + /// `GET /admin/status` — high-level operational overview. 55 + /// 56 + /// Requires HTTP Basic Auth (any username, password = `--admin-password`). 57 + /// The `WWW-Authenticate` response on 401 triggers browser credential prompts. 58 + pub async fn admin_status( 59 + State(db): State<DbRef>, 60 + Extension(config): Extension<AdminConfig>, 61 + headers: HeaderMap, 62 + ) -> Result<Json<AdminStatus>, AdminStatusError> { 63 + if !check_basic_auth(&headers, &config.admin_password) { 64 + return Err(AdminStatusError::Unauthorized); 65 + } 66 + 67 + let host = config.subscribe_host.clone(); 68 + let (repos_synced, queue_depth, backfill) = tokio::task::spawn_blocking(move || { 69 + let repos_synced = repo::count_repos(&db) as u64; 70 + let queue_depth = resync_queue::count_queued(&db) as u64; 71 + let backfill = backfill_progress::get(&db, &host)?; 72 + Ok::<_, StorageError>((repos_synced, queue_depth, backfill)) 73 + }) 74 + .await 75 + .unwrap()?; 76 + 77 + Ok(Json(AdminStatus { 78 + repos_synced, 79 + resync_queue_depth: queue_depth, 80 + upstream_backfill_complete: backfill 81 + .as_ref() 82 + .and_then(|b| b.completed_at.as_ref()) 83 + .is_some(), 84 + upstream_backfill_completed_at: backfill.and_then(|b| b.completed_at), 85 + })) 86 + } 87 + 88 + /// Check an HTTP Basic Auth header against the expected password. 89 + /// 90 + /// Accepts any username. Uses constant-time byte comparison to avoid leaking 91 + /// whether a guessed password shares a prefix with the real one. 92 + fn check_basic_auth(headers: &HeaderMap, expected_password: &str) -> bool { 93 + let Some(auth) = headers.get(header::AUTHORIZATION) else { 94 + return false; 95 + }; 96 + let Ok(auth_str) = auth.to_str() else { 97 + return false; 98 + }; 99 + let Some(encoded) = auth_str.strip_prefix("Basic ") else { 100 + return false; 101 + }; 102 + let Ok(decoded) = BASE64.decode(encoded.trim()) else { 103 + return false; 104 + }; 105 + // Format is "username:password"; take everything after the first colon. 106 + let password = match decoded.iter().position(|&b| b == b':') { 107 + Some(i) => &decoded[i + 1..], 108 + None => &decoded[..], 109 + }; 110 + bool::from(password.ct_eq(expected_password.as_bytes())) 111 + }
+31 -5
src/server/mod.rs
··· 3 3 //! Serves XRPC endpoints via axum routers built with `jacquard-axum`'s 4 4 //! `IntoRouter` helper. 5 5 6 + mod admin; 6 7 mod get_repo_status; 7 8 mod hello; 8 9 mod list_repos; 9 10 mod list_repos_by_collection; 10 11 12 + use admin::admin_status; 11 13 use get_repo_status::get_repo_status; 12 14 use list_repos::list_repos; 13 15 use list_repos_by_collection::list_repos_by_collection; 14 16 15 17 use std::net::SocketAddr; 16 18 19 + use jacquard_common::url::Host; 20 + 17 21 use crate::com_atproto::sync::list_repos_by_collection::ListReposByCollectionRequest; 18 22 use jacquard_api::com_atproto::sync::{ 19 23 get_repo_status::GetRepoStatusRequest, list_repos::ListReposRequest, ··· 23 27 use crate::error::Result; 24 28 use crate::storage::DbRef; 25 29 30 + /// Config for the admin endpoints. Only constructed when `--admin-password` is 31 + /// set; when absent the `/admin/*` routes are not registered at all. 32 + #[derive(Clone)] 33 + pub struct AdminConfig { 34 + /// The relay/PDS host that lightrail is subscribed to and backfilling from. 35 + pub subscribe_host: Host, 36 + /// Required Bearer token for all admin endpoints. 37 + pub admin_password: String, 38 + } 39 + 26 40 /// Build and serve the axum application on `addr`. 27 41 /// 28 - /// Routes: 42 + /// Routes always registered: 29 43 /// GET /xrpc/com.atproto.sync.getRepoStatus 30 44 /// GET /xrpc/com.atproto.sync.listRepos 31 45 /// GET /xrpc/com.atproto.sync.listReposByCollection 46 + /// 47 + /// Registered only when `admin_config` is `Some`: 48 + /// GET /admin/status 32 49 pub async fn serve( 33 50 addr: SocketAddr, 34 51 db: DbRef, 35 52 token: tokio_util::sync::CancellationToken, 53 + admin_config: Option<AdminConfig>, 36 54 ) -> Result<()> { 37 - let app = GetRepoStatusRequest::into_router(get_repo_status) 55 + let base = GetRepoStatusRequest::into_router(get_repo_status) 38 56 .merge(ListReposRequest::into_router(list_repos)) 39 57 .merge(ListReposByCollectionRequest::into_router( 40 58 list_repos_by_collection, 41 - )) 42 - .with_state(db) 43 - .route("/", axum::routing::get(hello::hello)); 59 + )); 60 + 61 + let app = if let Some(config) = admin_config { 62 + base.route("/admin/status", axum::routing::get(admin_status)) 63 + .route("/", axum::routing::get(hello::hello)) 64 + .with_state(db) 65 + .layer(axum::Extension(config)) 66 + } else { 67 + base.route("/", axum::routing::get(hello::hello)) 68 + .with_state(db) 69 + }; 44 70 45 71 let listener = tokio::net::TcpListener::bind(addr).await?; 46 72 axum::serve(listener, app)
+7
src/storage/repo.rs
··· 260 260 Ok(Some(info.status)) 261 261 } 262 262 263 + /// Count the total number of repos under `rev` 264 + /// 265 + /// Counts how many repos have been sync'd ever, sort of 266 + pub fn count_repos(db: &DbRef) -> usize { 267 + db.ks.prefix(PREFIX_REPO_PREV).count() 268 + } 269 + 263 270 /// Insert a [`RepoInfo`] with `state = Pending` for `did` if no record exists. 264 271 /// 265 272 /// Returns `true` if a new record was inserted, `false` if one already existed.
+7
src/storage/resync_queue.rs
··· 166 166 batch.insert(&db.ks, key(ts, &item.did), encode(item)); 167 167 } 168 168 169 + /// Count the total number of entries currently in the resync queue. 170 + /// 171 + /// Performs a full prefix scan; use only for admin/diagnostic views. 172 + pub fn count_queued(db: &DbRef) -> usize { 173 + db.ks.prefix(key_prefix_all()).count() 174 + } 175 + 169 176 /// Enqueue a repo for resync at the given Unix timestamp (seconds). 170 177 pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> { 171 178 let mut batch = db.database.batch();