Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'feat/feeds' into 'main'

feat: Feeds

Closes #7

See merge request parakeet-social/parakeet!17

Mia 69f231be 4f611891

+281 -30
+1
Cargo.lock
··· 2748 2748 "multibase", 2749 2749 "parakeet-db", 2750 2750 "parakeet-index", 2751 + "reqwest", 2751 2752 "serde", 2752 2753 "serde_json", 2753 2754 "tokio",
+46 -8
lexica/src/app_bsky/feed.rs
··· 5 5 use crate::app_bsky::richtext::FacetMain; 6 6 use crate::com_atproto::label::Label; 7 7 use chrono::prelude::*; 8 - use serde::Serialize; 8 + use serde::{Deserialize, Serialize}; 9 9 use std::str::FromStr; 10 10 11 11 #[derive(Clone, Debug, Serialize)] ··· 39 39 pub reply: Option<ReplyRef>, 40 40 #[serde(skip_serializing_if = "Option::is_none")] 41 41 pub reason: Option<FeedViewPostReason>, 42 - // #[serde(skip_serializing_if = "Option::is_none")] 43 - // pub feed_context: Option<String>, 42 + #[serde(skip_serializing_if = "Option::is_none")] 43 + pub feed_context: Option<String>, 44 44 } 45 45 46 46 #[derive(Debug, Serialize)] ··· 75 75 #[serde(tag = "$type")] 76 76 pub enum FeedViewPostReason { 77 77 #[serde(rename = "app.bsky.feed.defs#reasonRepost")] 78 - Repost { 79 - by: ProfileViewBasic, 80 - #[serde(rename = "indexedAt")] 81 - indexed_at: DateTime<Utc>, 82 - }, 78 + Repost(FeedReasonRepost), 83 79 #[serde(rename = "app.bsky.feed.defs#reasonPin")] 84 80 Pin, 81 + } 82 + 83 + #[derive(Debug, Serialize)] 84 + #[serde(rename_all = "camelCase")] 85 + pub struct FeedReasonRepost { 86 + pub by: ProfileViewBasic, 87 + #[serde(skip_serializing_if = "Option::is_none")] 88 + pub uri: Option<String>, 89 + #[serde(skip_serializing_if = "Option::is_none")] 90 + pub cid: Option<String>, 91 + pub indexed_at: DateTime<Utc>, 85 92 } 86 93 87 94 #[derive(Debug, Serialize)] ··· 185 192 pub created_at: DateTime<Utc>, 186 193 pub indexed_at: DateTime<Utc>, 187 194 } 195 + 196 + #[derive(Clone, Debug, Deserialize, Serialize)] 197 + #[serde(rename_all = "camelCase")] 198 + pub struct FeedSkeletonResponse { 199 + pub feed: Vec<SkeletonFeedPost>, 200 + #[serde(skip_serializing_if = "Option::is_none")] 201 + pub cursor: Option<String>, 202 + #[serde(skip_serializing_if = "Option::is_none")] 203 + pub req_id: Option<String>, 204 + } 205 + 206 + #[derive(Clone, Debug, Deserialize, Serialize)] 207 + #[serde(rename_all = "camelCase")] 208 + pub struct SkeletonFeedPost { 209 + pub post: String, 210 + #[serde(skip_serializing_if = "Option::is_none")] 211 + pub reason: Option<SkeletonReason>, 212 + #[serde(skip_serializing_if = "Option::is_none")] 213 + pub feed_context: Option<String>, 214 + } 215 + 216 + #[derive(Clone, Debug, Deserialize, Serialize)] 217 + #[serde(tag = "$type")] 218 + pub enum SkeletonReason { 219 + #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")] 220 + Pin {}, 221 + #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")] 222 + Repost { 223 + repost: String, 224 + }, 225 + }
+1
parakeet/Cargo.toml
··· 23 23 multibase = "0.9.1" 24 24 parakeet-db = { path = "../parakeet-db" } 25 25 parakeet-index = { path = "../parakeet-index" } 26 + reqwest = { version = "0.12", features = ["json"] } 26 27 serde = { version = "1.0.217", features = ["derive"] } 27 28 serde_json = "1.0.134" 28 29 tokio = { version = "1.42.0", features = ["full"] }
+1
parakeet/src/hydration/posts.rs
··· 207 207 post, 208 208 reply, 209 209 reason: None, 210 + feed_context: None, 210 211 }, 211 212 )) 212 213 })
+5 -3
parakeet/src/main.rs
··· 19 19 pub struct GlobalState { 20 20 pub pool: Pool<AsyncPgConnection>, 21 21 pub dataloaders: Arc<loaders::Dataloaders>, 22 + pub resolver: Arc<did_resolver::Resolver>, 22 23 pub index_client: parakeet_index::Client, 23 24 pub jwt: Arc<xrpc::jwt::JwtVerifier>, 24 25 pub cdn: Arc<xrpc::cdn::BskyCdn>, ··· 51 52 pool.clone(), 52 53 index_client.clone(), 53 54 )); 54 - let resolver = did_resolver::Resolver::new(did_resolver::ResolverOpts { 55 + let resolver = Arc::new(did_resolver::Resolver::new(did_resolver::ResolverOpts { 55 56 plc_directory: conf.plc_directory, 56 57 ..Default::default() 57 - })?; 58 + })?); 58 59 let jwt = Arc::new(xrpc::jwt::JwtVerifier::new( 59 60 conf.service.did.clone(), 60 - resolver, 61 + resolver.clone(), 61 62 )); 62 63 63 64 let cdn = Arc::new(xrpc::cdn::BskyCdn::new(conf.cdn.base, conf.cdn.video_base)); ··· 82 83 .with_state(GlobalState { 83 84 pool, 84 85 dataloaders, 86 + resolver, 85 87 index_client, 86 88 jwt, 87 89 cdn,
+215 -10
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 5 5 use crate::xrpc::{check_actor_status, datetime_cursor, get_actor_did, normalise_at_uri}; 6 6 use crate::GlobalState; 7 7 use axum::extract::{Query, State}; 8 + use axum::http::StatusCode; 8 9 use axum::Json; 9 10 use axum_extra::extract::Query as ExtraQuery; 11 + use axum_extra::headers::authorization::Bearer; 12 + use axum_extra::headers::Authorization; 13 + use axum_extra::TypedHeader; 14 + use chrono::prelude::*; 10 15 use diesel::prelude::*; 11 - use diesel_async::RunQueryDsl; 16 + use diesel_async::{AsyncPgConnection, RunQueryDsl}; 12 17 use lexica::app_bsky::actor::ProfileView; 13 18 use lexica::app_bsky::feed::{ 14 - FeedViewPost, PostView, ThreadViewPost, ThreadViewPostType, ThreadgateView, 19 + FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, PostView, 20 + SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 15 21 }; 16 22 use parakeet_db::schema; 23 + use reqwest::Url; 17 24 use serde::{Deserialize, Serialize}; 18 25 use std::collections::HashMap; 19 26 20 - // TODO: getFeed: once we get auth! 27 + const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 28 + 29 + #[derive(Debug, Serialize)] 30 + pub struct FeedRes { 31 + #[serde(skip_serializing_if = "Option::is_none")] 32 + cursor: Option<String>, 33 + feed: Vec<FeedViewPost>, 34 + } 35 + 36 + #[derive(Debug, Deserialize)] 37 + pub struct GetFeedQuery { 38 + pub feed: String, 39 + pub limit: Option<u8>, 40 + pub cursor: Option<String>, 41 + } 42 + 43 + pub async fn get_feed( 44 + State(state): State<GlobalState>, 45 + // we have to use Bearer because the tokens come with `aud` set to the feedgen did. 46 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 47 + maybe_tok: Option<TypedHeader<Authorization<Bearer>>>, 48 + Query(query): Query<GetFeedQuery>, 49 + ) -> XrpcResult<Json<FeedRes>> { 50 + let mut conn = state.pool.get().await?; 51 + 52 + // first, look up the feedgen 53 + let service_did: String = schema::feedgens::table 54 + .select(schema::feedgens::service_did) 55 + .find(&query.feed) 56 + .get_result(&mut conn) 57 + .await?; 58 + 59 + // resolve the did 60 + let did_doc = match state.resolver.resolve_did(&service_did).await { 61 + Ok(Some(did_doc)) => did_doc, 62 + Ok(None) => return Err(Error::invalid_request(None)), 63 + Err(err) => { 64 + tracing::error!( 65 + feedgen = service_did, 66 + "failed to resolve feedgen service did: {err}" 67 + ); 68 + return Err(Error::invalid_request(None)); 69 + } 70 + }; 71 + 72 + // find the service 73 + let Some(service) = did_doc.find_service_by_id(FEEDGEN_SERVICE_ID) else { 74 + tracing::error!( 75 + feedgen = service_did, 76 + "DID doc didn't contain BskyFeedGenerator service" 77 + ); 78 + return Err(Error::invalid_request(None)); 79 + }; 80 + 81 + let endpoint = service.service_endpoint.clone(); 82 + let skeleton = get_feed_skeleton( 83 + &query.feed, 84 + &endpoint, 85 + maybe_tok.as_ref(), 86 + query.limit, 87 + query.cursor, 88 + ) 89 + .await?; 90 + 91 + let maybe_auth = match maybe_tok { 92 + Some(hdr) => { 93 + match state 94 + .jwt 95 + .resolve_and_verify_jwt(hdr.token(), Some(&service_did)) 96 + .await 97 + { 98 + Some(claims) => match &state.did_allowlist { 99 + Some(allowlist) if !allowlist.contains(&claims.iss) => { 100 + return Err(Error::new( 101 + StatusCode::FORBIDDEN, 102 + "forbidden".to_string(), 103 + None, 104 + )); 105 + } 106 + _ => Some(AtpAuth(claims.iss)), 107 + }, 108 + None => None, 109 + } 110 + } 111 + None => None, 112 + }; 113 + 114 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 115 + 116 + let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect(); 117 + let repost_skeleton = skeleton 118 + .feed 119 + .iter() 120 + .filter_map(|v| match &v.reason { 121 + Some(SkeletonReason::Repost { repost }) => Some(repost.clone()), 122 + _ => None, 123 + }) 124 + .collect::<Vec<_>>(); 125 + 126 + let mut posts = hyd.hydrate_feed_posts(at_uris).await; 127 + let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; 128 + 129 + let feed = skeleton 130 + .feed 131 + .into_iter() 132 + .filter_map(|item| { 133 + let mut post = posts.remove(&item.post)?; 134 + let reason = match item.reason { 135 + Some(SkeletonReason::Repost { repost }) => { 136 + repost_data.remove(&repost).map(FeedViewPostReason::Repost) 137 + } 138 + Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin), 139 + _ => None, 140 + }; 141 + 142 + post.reason = reason; 143 + post.feed_context = item.feed_context; 144 + 145 + Some(post) 146 + }) 147 + .collect(); 148 + 149 + Ok(Json(FeedRes { 150 + cursor: skeleton.cursor, 151 + feed, 152 + })) 153 + } 21 154 22 155 #[derive(Debug, Deserialize)] 23 156 #[serde(rename_all = "snake_case")] ··· 45 178 pub filter: GetAuthorFeedFilter, 46 179 #[serde(default)] 47 180 pub include_pins: bool, 48 - } 49 - 50 - #[derive(Debug, Serialize)] 51 - pub struct FeedRes { 52 - #[serde(skip_serializing_if = "Option::is_none")] 53 - cursor: Option<String>, 54 - feed: Vec<FeedViewPost>, 55 181 } 56 182 57 183 pub async fn get_author_feed( ··· 460 586 .eq_any(filter) 461 587 .or(schema::posts::embed_subtype.eq_any(filter)) 462 588 } 589 + 590 + async fn get_feed_skeleton( 591 + feed: &str, 592 + service: &str, 593 + maybe_tok: Option<&TypedHeader<Authorization<Bearer>>>, 594 + limit: Option<u8>, 595 + cursor: Option<String>, 596 + ) -> XrpcResult<FeedSkeletonResponse> { 597 + let mut params = vec![("feed", feed.to_string())]; 598 + 599 + if let Some(cursor) = cursor { 600 + params.push(("cursor", cursor)); 601 + } 602 + if let Some(limit) = limit { 603 + params.push(("limit", limit.to_string())); 604 + } 605 + let url = Url::parse_with_params( 606 + &format!("{service}/xrpc/app.bsky.feed.getFeedSkeleton"), 607 + params, 608 + ) 609 + .unwrap(); 610 + 611 + let mut req = reqwest::Client::new().get(url); 612 + if let Some(auth) = maybe_tok { 613 + req = req.bearer_auth(auth.token()); 614 + } 615 + 616 + match req.send().await { 617 + Ok(skeleton) => match skeleton.json().await { 618 + Ok(skeleton) => Ok(skeleton), 619 + Err(err) => { 620 + tracing::error!("Failed to parse feed skeleton: {err}"); 621 + Err(Error::server_error(Some("Failed to fetch feed skeleton"))) 622 + } 623 + }, 624 + Err(err) => { 625 + tracing::error!("Failed to fetch feed skeleton: {err}"); 626 + Err(Error::server_error(Some("Failed to fetch feed skeleton"))) 627 + } 628 + } 629 + } 630 + 631 + async fn get_skeleton_repost_data<'a>( 632 + conn: &mut AsyncPgConnection, 633 + hyd: &StatefulHydrator<'a>, 634 + reposts: Vec<String>, 635 + ) -> HashMap<String, FeedReasonRepost> { 636 + let Ok(repost_data) = schema::records::table 637 + .select(( 638 + schema::records::at_uri, 639 + schema::records::did, 640 + schema::records::indexed_at, 641 + )) 642 + .filter(schema::records::at_uri.eq_any(&reposts)) 643 + .get_results::<(String, String, NaiveDateTime)>(conn) 644 + .await 645 + else { 646 + return HashMap::new(); 647 + }; 648 + 649 + let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect(); 650 + let profiles = hyd.hydrate_profiles_basic(profiles).await; 651 + 652 + repost_data 653 + .into_iter() 654 + .filter_map(|(uri, did, indexed_at)| { 655 + let by = profiles.get(&did).cloned()?; 656 + 657 + let repost = FeedReasonRepost { 658 + by, 659 + uri: Some(uri.clone()), 660 + cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it. 661 + indexed_at: indexed_at.and_utc(), 662 + }; 663 + 664 + Some((uri, repost)) 665 + }) 666 + .collect() 667 + }
+1
parakeet/src/xrpc/app_bsky/mod.rs
··· 14 14 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 15 15 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 16 16 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed)) 17 + .route("/app.bsky.feed.getFeed", get(feed::posts::get_feed)) 17 18 .route("/app.bsky.feed.getLikes", get(feed::likes::get_likes)) 18 19 .route("/app.bsky.feed.getListFeed", get(feed::posts::get_list_feed)) 19 20 .route("/app.bsky.feed.getPostThread", get(feed::posts::get_post_thread))
+2 -2
parakeet/src/xrpc/extract.rs
··· 81 81 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))? 82 82 .ok_or((StatusCode::UNAUTHORIZED, "missing JWT".to_string()))?; 83 83 84 - match state.jwt.resolve_and_verify_jwt(hdr.token()).await { 84 + match state.jwt.resolve_and_verify_jwt(hdr.token(), None).await { 85 85 Some(claims) => match &state.did_allowlist { 86 86 Some(allowlist) if !allowlist.contains(&claims.iss) => { 87 87 Err((StatusCode::FORBIDDEN, "forbidden".to_string())) ··· 110 110 return Ok(None); 111 111 }; 112 112 113 - match state.jwt.resolve_and_verify_jwt(hdr.token()).await { 113 + match state.jwt.resolve_and_verify_jwt(hdr.token(), None).await { 114 114 Some(claims) => match &state.did_allowlist { 115 115 Some(allowlist) if !allowlist.contains(&claims.iss) => { 116 116 Err((StatusCode::FORBIDDEN, "forbidden".to_string()))
+9 -7
parakeet/src/xrpc/jwt.rs
··· 2 2 use jsonwebtoken::{Algorithm, DecodingKey, Validation}; 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::collections::HashMap; 5 - use std::sync::LazyLock; 5 + use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 7 8 8 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); ··· 25 25 26 26 pub struct JwtVerifier { 27 27 aud: String, 28 - resolver: Resolver, 28 + resolver: Arc<Resolver>, 29 29 key_cache: RwLock<HashMap<String, String>>, 30 30 } 31 31 32 32 impl JwtVerifier { 33 - pub fn new(aud: String, resolver: Resolver) -> Self { 33 + pub fn new(aud: String, resolver: Arc<Resolver>) -> Self { 34 34 JwtVerifier { 35 35 aud, 36 36 resolver, ··· 38 38 } 39 39 } 40 40 41 - pub async fn resolve_and_verify_jwt(&self, token: &str) -> Option<Claims> { 41 + pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 42 42 // first we need to decode without verifying, to get iss. 43 43 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; 44 44 let unsafe_iss = unsafe_data.claims.iss; ··· 52 52 None => self.resolve_key(&unsafe_iss).await?, 53 53 }; 54 54 55 - self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg) 55 + let aud = aud.unwrap_or(&self.aud); 56 + self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 56 57 } 57 58 58 59 async fn resolve_key(&self, did: &str) -> Option<String> { ··· 73 74 pub fn verify_jwt_multibase(&self, token: &str, multibase_key: &str) -> Option<Claims> { 74 75 let alg = jsonwebtoken::decode_header(token).ok()?.alg; 75 76 76 - self.verify_jwt_multibase_with_alg(token, multibase_key, alg) 77 + self.verify_jwt_multibase_with_alg(token, multibase_key, alg, &self.aud) 77 78 } 78 79 79 80 pub fn verify_jwt_multibase_with_alg( ··· 81 82 token: &str, 82 83 multibase_key: &str, 83 84 alg: Algorithm, 85 + aud: &str, 84 86 ) -> Option<Claims> { 85 87 // decode the multibase key 86 88 let (_, key) = multibase::decode(multibase_key).ok()?; ··· 88 90 let key = DecodingKey::from_ec_der(&key[2..]); 89 91 90 92 let mut validation = Validation::new(alg); 91 - validation.set_audience(&[&self.aud]); 93 + validation.set_audience(&[&aud]); 92 94 93 95 let decoded = jsonwebtoken::decode::<Claims>(token, &key, &validation).ok()?; 94 96