Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: formatting & clippy fixes i've been neglecting

Mia 9ca43755 e8510d71

+37 -38
+2 -7
consumer/src/db/copy.rs
··· 5 5 use deadpool_postgres::Transaction; 6 6 use futures::pin_mut; 7 7 use ipld_core::cid::Cid; 8 + use lexica::StrongRef; 8 9 use tokio_postgres::binary_copy::BinaryCopyInWriter; 9 10 use tokio_postgres::types::Type; 10 - use lexica::StrongRef; 11 11 12 12 // StrongRefs are used in both likes and reposts 13 13 const STRONGREF_TYPES: &[Type] = &[ ··· 19 19 Type::TEXT, 20 20 Type::TIMESTAMP, 21 21 ]; 22 - type StrongRefRow = ( 23 - String, 24 - StrongRef, 25 - Option<StrongRef>, 26 - DateTime<Utc>, 27 - ); 22 + type StrongRefRow = (String, StrongRef, Option<StrongRef>, DateTime<Utc>); 28 23 29 24 // SubjectRefs are used in both blocks and follows 30 25 const SUBJECT_TYPES: &[Type] = &[Type::TEXT, Type::TEXT, Type::TEXT, Type::TIMESTAMP];
+8 -1
consumer/src/db/record.rs
··· 37 37 38 38 conn.execute( 39 39 include_str!("sql/bookmarks_upsert.sql"), 40 - &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at], 40 + &[ 41 + &repo, 42 + &rkey, 43 + &rec.subject, 44 + &rec_type, 45 + &rec.tags, 46 + &rec.created_at, 47 + ], 41 48 ) 42 49 .await 43 50 }
+1 -2
consumer/src/firehose/mod.rs
··· 119 119 } 120 120 "#sync" => { 121 121 counter!("firehose_events.total", "event" => "sync").increment(1); 122 - let event: AtpSyncEvent = 123 - serde_ipld_dagcbor::from_reader(&mut reader)?; 122 + let event: AtpSyncEvent = serde_ipld_dagcbor::from_reader(&mut reader)?; 124 123 125 124 // increment the seq 126 125 if self.seq < event.seq {
+3 -4
consumer/src/indexer/mod.rs
··· 213 213 rc: &mut MultiplexedConnection, 214 214 sync: AtpSyncEvent, 215 215 ) -> eyre::Result<()> { 216 - let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else { 216 + let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? 217 + else { 217 218 return Ok(()); 218 219 }; 219 220 ··· 866 867 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 867 868 db::chat_decl_delete(conn, repo).await? 868 869 } 869 - CollectionType::CommunityLexiconBookmark => { 870 - db::bookmark_delete(conn, rkey, repo).await? 871 - } 870 + CollectionType::CommunityLexiconBookmark => db::bookmark_delete(conn, rkey, repo).await?, 872 871 _ => unreachable!(), 873 872 }; 874 873
+1 -1
consumer/src/indexer/types.rs
··· 42 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 44 #[serde(rename = "community.lexicon.bookmarks.bookmark")] 45 - CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark) 45 + CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark), 46 46 } 47 47 48 48 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)]
+2 -4
consumer/src/utils.rs
··· 1 - use serde::{Deserialize, Deserializer}; 2 1 use lexica::{Blob, StrongRef}; 2 + use serde::{Deserialize, Deserializer}; 3 3 4 4 // see https://deer.social/profile/did:plc:63y3oh7iakdueqhlj6trojbq/post/3ltuv4skhqs2h 5 5 pub fn safe_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> { ··· 12 12 blob.map(|blob| blob.cid.to_string()) 13 13 } 14 14 15 - pub fn strongref_to_parts( 16 - strongref: Option<&StrongRef>, 17 - ) -> (Option<String>, Option<String>) { 15 + pub fn strongref_to_parts(strongref: Option<&StrongRef>) -> (Option<String>, Option<String>) { 18 16 strongref 19 17 .map(|sr| (sr.uri.clone(), sr.cid.to_string())) 20 18 .unzip()
+1 -3
lexica/src/app_bsky/feed.rs
··· 219 219 #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")] 220 220 Pin {}, 221 221 #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")] 222 - Repost { 223 - repost: String, 224 - }, 222 + Repost { repost: String }, 225 223 }
+1 -1
lexica/src/community_lexicon/bookmarks.rs
··· 11 11 #[serde(skip_serializing_if = "Vec::is_empty")] 12 12 pub tags: Vec<String>, 13 13 pub created_at: DateTime<Utc>, 14 - } 14 + }
+1 -1
lexica/src/community_lexicon/mod.rs
··· 1 - pub mod bookmarks; 1 + pub mod bookmarks;
+2 -2
lexica/src/utils.rs
··· 27 27 LinkRef { 28 28 link: inp.to_string(), 29 29 } 30 - .serialize(serializer) 31 - } 30 + .serialize(serializer) 31 + }
+4 -2
parakeet-db/src/models.rs
··· 292 292 pub indexed_at: NaiveDateTime, 293 293 } 294 294 295 - #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable, Associations)] 295 + #[derive( 296 + Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable, Associations, 297 + )] 296 298 #[diesel(table_name = crate::schema::labeler_defs)] 297 299 #[diesel(belongs_to(LabelerService, foreign_key = labeler))] 298 300 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 408 410 pub subject_cid: Option<String>, 409 411 pub subject_type: &'a str, 410 412 pub tags: Vec<String>, 411 - } 413 + }
+1 -1
parakeet-db/src/types.rs
··· 85 85 use std::io::Write; 86 86 let val = self.to_string(); 87 87 88 - out.write(val.as_bytes())?; 88 + out.write_all(val.as_bytes())?; 89 89 Ok(diesel::serialize::IsNull::No) 90 90 } 91 91 }
+8 -7
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 8 8 use diesel::prelude::*; 9 9 use diesel_async::RunQueryDsl; 10 10 use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use lexica::StrongRef; 11 12 use parakeet_db::{models, schema}; 12 13 use serde::{Deserialize, Serialize}; 13 - use lexica::StrongRef; 14 14 15 15 const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"]; 16 16 ··· 125 125 // otherwise just ditch. we should have one. 126 126 let cid = bookmark.subject_cid.or(maybe_cid)?; 127 127 128 - let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or( 129 - BookmarkViewItem::NotFound { 130 - uri: bookmark.subject.clone(), 131 - not_found: true, 132 - }, 133 - ); 128 + let item = 129 + maybe_item 130 + .map(BookmarkViewItem::Post) 131 + .unwrap_or(BookmarkViewItem::NotFound { 132 + uri: bookmark.subject.clone(), 133 + not_found: true, 134 + }); 134 135 135 136 let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 136 137
+1 -1
parakeet/src/xrpc/app_bsky/graph/relations.rs
··· 50 50 .map(|(last, _)| last.timestamp_millis().to_string()); 51 51 52 52 let dids = results.iter().map(|(_, did)| did.clone()).collect(); 53 - 53 + 54 54 let profiles = hyd.hydrate_profiles(dids).await; 55 55 let blocks = profiles.into_values().collect::<Vec<_>>(); 56 56
+1 -1
parakeet/src/xrpc/app_bsky/mod.rs
··· 68 68 69 69 async fn not_implemented() -> axum::http::StatusCode { 70 70 axum::http::StatusCode::NOT_IMPLEMENTED 71 - } 71 + }