Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: parakeet control base

Mia 924b6f6a 807cd8c5

+79
+1
crates/parakeet/src/config.rs
··· 27 27 #[serde(default)] 28 28 pub cdn: ConfigCdn, 29 29 pub did_allowlist: Option<Vec<String>>, 30 + pub admin_dids: Option<Vec<String>>, 30 31 #[serde(default)] 31 32 pub migrate: bool, 32 33 }
+2
crates/parakeet/src/main.rs
··· 30 30 pub jwt: Arc<xrpc::jwt::JwtVerifier>, 31 31 pub cdn: Arc<xrpc::cdn::BskyCdn>, 32 32 pub did_allowlist: Option<Vec<String>>, 33 + pub admins: Option<Vec<String>>, 33 34 } 34 35 35 36 #[tokio::main] ··· 107 108 jwt, 108 109 cdn, 109 110 did_allowlist: conf.did_allowlist, 111 + admins: conf.admin_dids, 110 112 }); 111 113 112 114 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
+55
crates/parakeet/src/xrpc/at_parakeet/admin.rs
··· 1 + use super::check_admin_did; 2 + use crate::xrpc::error::{Error, XrpcResult}; 3 + use crate::xrpc::extract::AtpAuth; 4 + use crate::GlobalState; 5 + use axum::extract::State; 6 + use axum::http::StatusCode; 7 + use axum::Json; 8 + use redis::AsyncTypedCommands; 9 + use serde::{Deserialize, Serialize}; 10 + 11 + const BACKFILL_QUEUE: &str = "backfill_queue"; 12 + 13 + #[derive(Debug, Serialize)] 14 + pub struct BackfillQueueSizeRes { 15 + size: usize, 16 + } 17 + 18 + pub async fn backfill_queue_size( 19 + State(mut state): State<GlobalState>, 20 + auth: AtpAuth, 21 + ) -> XrpcResult<Json<BackfillQueueSizeRes>> { 22 + if !check_admin_did(&state, &auth.0) { 23 + return Err(Error::new(StatusCode::FORBIDDEN, "Forbidden", None)); 24 + } 25 + 26 + match state.redis_mp.llen(BACKFILL_QUEUE).await { 27 + Ok(size) => Ok(Json(BackfillQueueSizeRes { size })), 28 + Err(e) => { 29 + tracing::error!("failed to read backfill queue size: {e}"); 30 + Err(Error::server_error(None)) 31 + } 32 + } 33 + } 34 + 35 + #[derive(Debug, Deserialize)] 36 + pub struct RequestBackfillReq { 37 + pub dids: Vec<String>, 38 + } 39 + 40 + pub async fn request_backfill( 41 + State(mut state): State<GlobalState>, 42 + auth: AtpAuth, 43 + Json(form): Json<RequestBackfillReq>, 44 + ) -> XrpcResult<()> { 45 + if !check_admin_did(&state, &auth.0) { 46 + return Err(Error::new(StatusCode::FORBIDDEN, "Forbidden", None)); 47 + } 48 + 49 + if let Err(e) = state.redis_mp.rpush(BACKFILL_QUEUE, form.dids).await { 50 + tracing::error!("failed to push to backfill queue: {e}"); 51 + return Err(Error::server_error(None)); 52 + } 53 + 54 + Ok(()) 55 + }
+19
crates/parakeet/src/xrpc/at_parakeet/mod.rs
··· 1 + use axum::routing::{get, post}; 2 + use axum::Router; 3 + 4 + mod admin; 5 + 6 + #[rustfmt::skip] 7 + pub fn routes() -> Router<crate::GlobalState> { 8 + Router::new() 9 + .route("/at.parakeet.admin.backfillQueueSize", get(admin::backfill_queue_size)) 10 + .route("/at.parakeet.admin.requestBackfill", post(admin::request_backfill)) 11 + } 12 + 13 + pub fn check_admin_did(state: &crate::GlobalState, did: &String) -> bool { 14 + state 15 + .admins 16 + .as_ref() 17 + .map(|admins| admins.contains(did)) 18 + .unwrap_or_default() 19 + }
+2
crates/parakeet/src/xrpc/mod.rs
··· 6 6 use std::str::FromStr; 7 7 8 8 mod app_bsky; 9 + mod at_parakeet; 9 10 pub mod cdn; 10 11 mod com_atproto; 11 12 mod community_lexicon; ··· 16 17 pub fn xrpc_routes() -> Router<crate::GlobalState> { 17 18 Router::new() 18 19 .merge(app_bsky::routes()) 20 + .merge(at_parakeet::routes()) 19 21 .merge(com_atproto::routes()) 20 22 .merge(community_lexicon::routes()) 21 23 }