Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: few formatting and clippy nits

Mia 636e1ae8 97e80e8d

+74 -42
+4 -1
consumer/src/backfill/db.rs
··· 44 44 sync_state: types::ActorSyncState, 45 45 ) -> QueryResult<usize> { 46 46 diesel::update(schema::actors::table) 47 - .set((schema::actors::status.eq(status), schema::actors::sync_state.eq(sync_state))) 47 + .set(( 48 + schema::actors::status.eq(status), 49 + schema::actors::sync_state.eq(sync_state), 50 + )) 48 51 .filter(schema::actors::did.eq(did)) 49 52 .execute(conn) 50 53 .await
+1 -1
consumer/src/backfill/mod.rs
··· 6 6 use diesel_async::{AsyncConnection, AsyncPgConnection}; 7 7 use flume::{unbounded, Receiver, Sender}; 8 8 use ipld_core::cid::Cid; 9 + use metrics::counter; 9 10 use parakeet_db::types::{ActorStatus, ActorSyncState}; 10 11 use reqwest::{Client, StatusCode}; 11 12 use std::str::FromStr; 12 13 use std::sync::Arc; 13 - use metrics::counter; 14 14 use tracing::{instrument, Instrument}; 15 15 16 16 mod db;
+2 -2
consumer/src/backfill/types.rs
··· 1 + use crate::indexer::types::RecordTypes; 1 2 use ipld_core::cid::Cid; 2 3 use serde::Deserialize; 3 4 use serde_bytes::ByteBuf; 4 - use crate::indexer::types::RecordTypes; 5 5 6 6 #[derive(Debug, Deserialize)] 7 7 #[serde(untagged)] ··· 41 41 pub active: bool, 42 42 pub status: Option<crate::firehose::AtpAccountStatus>, 43 43 pub rev: Option<String>, 44 - } 44 + }
+6 -2
consumer/src/config.rs
··· 37 37 Realtime, 38 38 } 39 39 40 - fn default_backfill_workers() -> u8 { 4 } 40 + fn default_backfill_workers() -> u8 { 41 + 4 42 + } 41 43 42 - fn default_indexer_workers() -> u8 { 4 } 44 + fn default_indexer_workers() -> u8 { 45 + 4 46 + }
+2 -6
consumer/src/firehose/mod.rs
··· 21 21 let cursor = start_cursor.unwrap_or(0); 22 22 let mut request = format!("{url}/xrpc/com.atproto.sync.subscribeRepos?cursor={cursor}") 23 23 .into_client_request()?; 24 - request 25 - .headers_mut() 26 - .insert(USER_AGENT, ua.parse()?); 24 + request.headers_mut().insert(USER_AGENT, ua.parse()?); 27 25 28 26 let (wss, _) = tokio_tungstenite::connect_async(request).await?; 29 27 let (_, stream) = wss.split(); ··· 38 36 let cursor = start_cursor.unwrap_or(0); 39 37 let mut request = format!("{url}/xrpc/com.atproto.label.subscribeLabels?cursor={cursor}") 40 38 .into_client_request()?; 41 - request 42 - .headers_mut() 43 - .insert(USER_AGENT, ua.parse()?); 39 + request.headers_mut().insert(USER_AGENT, ua.parse()?); 44 40 45 41 let (wss, _) = tokio_tungstenite::connect_async(request).await?; 46 42 let (_, stream) = wss.split();
+12 -4
consumer/src/indexer/db.rs
··· 636 636 created_at: rec.created_at.naive_utc(), 637 637 }; 638 638 639 - diesel::insert_into(schema::likes::table).values(&data).execute(conn).await 639 + diesel::insert_into(schema::likes::table) 640 + .values(&data) 641 + .execute(conn) 642 + .await 640 643 } 641 644 642 645 pub async fn delete_like(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> { ··· 660 663 created_at: rec.created_at.naive_utc(), 661 664 }; 662 665 663 - diesel::insert_into(schema::reposts::table).values(&data).execute(conn).await 666 + diesel::insert_into(schema::reposts::table) 667 + .values(&data) 668 + .execute(conn) 669 + .await 664 670 } 665 671 666 672 pub async fn delete_repost(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> { ··· 900 906 .on_conflict(schema::verification::at_uri) 901 907 .do_update() 902 908 .set(&data) 903 - .execute(conn).await 909 + .execute(conn) 910 + .await 904 911 } 905 912 906 913 pub async fn delete_verification(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> { 907 914 diesel::delete(schema::verification::table) 908 915 .filter(schema::verification::at_uri.eq(at_uri)) 909 - .execute(conn).await 916 + .execute(conn) 917 + .await 910 918 }
+6 -3
consumer/src/indexer/records.rs
··· 60 60 61 61 impl AppBskyEmbed { 62 62 pub fn record_with_media_allowed(&self) -> bool { 63 - matches!(self, AppBskyEmbed::Images(_) | AppBskyEmbed::Video(_) | AppBskyEmbed::External(_)) 63 + matches!( 64 + self, 65 + AppBskyEmbed::Images(_) | AppBskyEmbed::Video(_) | AppBskyEmbed::External(_) 66 + ) 64 67 } 65 68 66 69 pub fn as_str(&self) -> &'static str { ··· 196 199 #[serde(tag = "$type")] 197 200 pub enum PostgateEmbeddingRules { 198 201 #[serde(rename = "app.bsky.feed.postgate#disableRule")] 199 - Disable 202 + Disable, 200 203 } 201 204 202 205 impl PostgateEmbeddingRules { ··· 237 240 #[serde(rename = "app.bsky.feed.threadgate#followingRule")] 238 241 Following, 239 242 #[serde(rename = "app.bsky.feed.threadgate#listRule")] 240 - List { list: String } 243 + List { list: String }, 241 244 } 242 245 243 246 impl ThreadgateRule {
+2 -2
consumer/src/indexer/types.rs
··· 1 - use ipld_core::cid::Cid; 2 1 use super::records; 2 + use ipld_core::cid::Cid; 3 3 use serde::{Deserialize, Serialize}; 4 4 5 5 #[derive(Debug, Deserialize, Serialize)] ··· 120 120 Create(RecordTypes), 121 121 Update(RecordTypes), 122 122 Delete, 123 - } 123 + }
+10 -2
consumer/src/main.rs
··· 31 31 ..Default::default() 32 32 })?); 33 33 34 - let (label_mgr, label_svc_tx) = label_indexer::LabelServiceManager::new(&conf.database_url, resolver.clone(), user_agent.clone()).await?; 34 + let (label_mgr, label_svc_tx) = label_indexer::LabelServiceManager::new( 35 + &conf.database_url, 36 + resolver.clone(), 37 + user_agent.clone(), 38 + ) 39 + .await?; 35 40 let relay_firehose = 36 41 firehose::FirehoseConsumer::new_relay(&conf.relay_source, None, &user_agent).await?; 37 42 let (backfiller, backfill_tx) = ··· 52 57 tokio::spawn(label_mgr.run(conf.initial_label_services)), 53 58 }?; 54 59 55 - firehose_res.and(indexer_res).and(backfill_res).and(label_res) 60 + firehose_res 61 + .and(indexer_res) 62 + .and(backfill_res) 63 + .and(label_res) 56 64 } 57 65 58 66 async fn relay_consumer(
+2 -2
consumer/src/utils.rs
··· 43 43 } 44 44 45 45 pub fn empty_str_as_none(input: String) -> Option<String> { 46 - match input.is_empty() { 46 + match input.is_empty() { 47 47 true => None, 48 - false => Some(input) 48 + false => Some(input), 49 49 } 50 50 }
+7 -6
lexica/src/app_bsky/actor.rs
··· 1 1 use crate::com_atproto::label::Label; 2 2 use chrono::prelude::*; 3 3 use serde::{Deserialize, Serialize}; 4 + use std::fmt::Display; 4 5 use std::str::FromStr; 5 6 6 7 #[derive(Clone, Default, Debug, Serialize)] ··· 28 29 Following, 29 30 } 30 31 31 - impl ToString for ChatAllowIncoming { 32 - fn to_string(&self) -> String { 32 + impl Display for ChatAllowIncoming { 33 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 33 34 match self { 34 - ChatAllowIncoming::All => "all".into(), 35 - ChatAllowIncoming::None => "none".into(), 36 - ChatAllowIncoming::Following => "following".into(), 35 + ChatAllowIncoming::All => write!(f, "all"), 36 + ChatAllowIncoming::None => write!(f, "none"), 37 + ChatAllowIncoming::Following => write!(f, "following"), 37 38 } 38 39 } 39 40 } ··· 46 47 "all" => Ok(ChatAllowIncoming::All), 47 48 "none" => Ok(ChatAllowIncoming::None), 48 49 "following" => Ok(ChatAllowIncoming::Following), 49 - x => Err(format!("Unrecognized variant {}", x).into()), 50 + x => Err(format!("Unrecognized variant {}", x)), 50 51 } 51 52 } 52 53 }
+1 -1
lexica/src/app_bsky/mod.rs
··· 14 14 pub repost_count: i64, 15 15 pub like_count: i64, 16 16 pub quote_count: i64, 17 - } 17 + }
+1 -1
parakeet/src/hydration/profile.rs
··· 68 68 let (verifications, verified_status) = match verification { 69 69 Some(verif) => { 70 70 let verifications = 71 - get_verifications(&accept_verifiers, verif, handle, &profile.display_name); 71 + get_verifications(accept_verifiers, verif, handle, &profile.display_name); 72 72 73 73 let status = verifications 74 74 .iter()
+8 -2
parakeet/src/main.rs
··· 27 27 28 28 let dataloaders = Arc::new(loaders::Dataloaders::new(pool.clone())); 29 29 30 + #[allow(unused)] 30 31 hydration::TRUSTED_VERIFIERS.set(conf.trusted_verifiers); 31 32 32 - let cors = CorsLayer::new().allow_headers(AllowHeaders::any()).allow_origin(AllowOrigin::any()); 33 + let cors = CorsLayer::new() 34 + .allow_headers(AllowHeaders::any()) 35 + .allow_origin(AllowOrigin::any()); 33 36 34 37 let did_doc = did_web_doc(&conf.service); 35 38 36 39 let app = axum::Router::new() 37 40 .nest("/xrpc", xrpc::xrpc_routes()) 38 - .route("/.well-known/did.json", axum::routing::get(async || axum::Json(did_doc))) 41 + .route( 42 + "/.well-known/did.json", 43 + axum::routing::get(async || axum::Json(did_doc)), 44 + ) 39 45 .layer(TraceLayer::new_for_http()) 40 46 .layer(cors) 41 47 .with_state(GlobalState { pool, dataloaders });
+8 -5
parakeet/src/xrpc/com_atproto/identity.rs
··· 1 + use crate::xrpc::error::XrpcResult; 2 + use crate::xrpc::get_actor_did; 3 + use crate::GlobalState; 1 4 use axum::extract::{Query, State}; 2 5 use axum::Json; 3 - use crate::GlobalState; 4 - use crate::xrpc::error::XrpcResult; 5 - use crate::xrpc::{get_actor_did}; 6 6 use serde::{Deserialize, Serialize}; 7 7 8 8 #[derive(Debug, Deserialize)] ··· 15 15 pub did: String, 16 16 } 17 17 18 - pub async fn resolve_handle(State(state): State<GlobalState>, Query(query): Query<ResolveHandleQuery>) -> XrpcResult<Json<ResolveHandleRes>> { 18 + pub async fn resolve_handle( 19 + State(state): State<GlobalState>, 20 + Query(query): Query<ResolveHandleQuery>, 21 + ) -> XrpcResult<Json<ResolveHandleRes>> { 19 22 let did = get_actor_did(&state.dataloaders, query.handle).await?; 20 23 21 24 Ok(Json(ResolveHandleRes { did })) 22 - } 25 + }
+2 -2
parakeet/src/xrpc/com_atproto/mod.rs
··· 1 - use axum::Router; 2 1 use axum::routing::get; 2 + use axum::Router; 3 3 4 4 mod identity; 5 5 6 6 pub fn routes() -> Router<crate::GlobalState> { 7 7 Router::new() 8 8 .route("/com.atproto.identity.resolveHandle", get(identity::resolve_handle)) 9 - } 9 + }