Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat!: break at_uri into did/rkey for likes,reposts,follows,blocks

Mia 1a188ea3 ed4aedf1

+102 -67
+13 -6
consumer/src/backfill/mod.rs
··· 282 282 let items: Vec<BackfillItem> = serde_json::from_value(row.data).unwrap(); 283 283 284 284 for item in items { 285 + let Some((_, rkey)) = item.at_uri.rsplit_once("/") else { 286 + return Ok(()); 287 + }; 288 + 285 289 match item.inner { 286 290 BackfillItemInner::Create(record) | BackfillItemInner::Update(record) => { 287 - let Some((_, rkey)) = item.at_uri.rsplit_once("/") else { 288 - return Ok(()); 289 - }; 290 - 291 291 let Some(cid) = item.cid else { 292 292 continue; 293 293 }; ··· 295 295 indexer::index_op(conn, deltas, repo, cid, record, &item.at_uri, rkey).await? 296 296 } 297 297 BackfillItemInner::Delete => { 298 - indexer::index_op_delete(conn, deltas, repo, item.collection, &item.at_uri) 299 - .await? 298 + indexer::index_op_delete( 299 + conn, 300 + deltas, 301 + repo, 302 + item.collection, 303 + &item.at_uri, 304 + rkey, 305 + ) 306 + .await? 300 307 } 301 308 } 302 309 }
+8 -4
consumer/src/backfill/repo.rs
··· 133 133 copies.push_record(&at_uri, cid); 134 134 copies 135 135 .likes 136 - .push((at_uri, rec.subject, rec.via, rec.created_at)); 136 + .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 137 137 } 138 138 RecordTypes::AppBskyFeedPost(rec) => { 139 139 let maybe_reply = rec.reply.as_ref().map(|v| v.parent.uri.clone()); ··· 171 171 copies.push_record(&at_uri, cid); 172 172 copies 173 173 .reposts 174 - .push((at_uri, rec.subject, rec.via, rec.created_at)); 174 + .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 175 175 } 176 176 RecordTypes::AppBskyGraphBlock(rec) => { 177 177 copies.push_record(&at_uri, cid); 178 - copies.blocks.push((at_uri, rec.subject, rec.created_at)); 178 + copies 179 + .blocks 180 + .push((rkey.to_string(), rec.subject, rec.created_at)); 179 181 } 180 182 RecordTypes::AppBskyGraphFollow(rec) => { 181 183 deltas.incr(did, AggregateType::Follow).await; 182 184 deltas.incr(&rec.subject, AggregateType::Follower).await; 183 185 184 186 copies.push_record(&at_uri, cid); 185 - copies.follows.push((at_uri, rec.subject, rec.created_at)); 187 + copies 188 + .follows 189 + .push((rkey.to_string(), rec.subject, rec.created_at)); 186 190 } 187 191 RecordTypes::AppBskyGraphListItem(rec) => { 188 192 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>();
+4 -4
consumer/src/db/copy.rs
··· 46 46 47 47 let writer = conn 48 48 .copy_in( 49 - "COPY likes_tmp (at_uri, did, subject, subject_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)", 49 + "COPY likes_tmp (rkey, did, subject, subject_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)", 50 50 ) 51 51 .await?; 52 52 let writer = BinaryCopyInWriter::new(writer, STRONGREF_TYPES); ··· 93 93 94 94 let writer = conn 95 95 .copy_in( 96 - "COPY reposts_tmp (at_uri, did, post, post_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)", 96 + "COPY reposts_tmp (rkey, did, post, post_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)", 97 97 ) 98 98 .await?; 99 99 let writer = BinaryCopyInWriter::new(writer, STRONGREF_TYPES); ··· 213 213 .await?; 214 214 215 215 let writer = conn 216 - .copy_in("COPY blocks_tmp (at_uri, did, subject, created_at) FROM STDIN (FORMAT binary)") 216 + .copy_in("COPY blocks_tmp (rkey, did, subject, created_at) FROM STDIN (FORMAT binary)") 217 217 .await?; 218 218 let writer = BinaryCopyInWriter::new(writer, SUBJECT_TYPES); 219 219 ··· 292 292 .await?; 293 293 294 294 let writer = conn 295 - .copy_in("COPY follows_tmp (at_uri, did, subject, created_at) FROM STDIN (FORMAT binary)") 295 + .copy_in("COPY follows_tmp (rkey, did, subject, created_at) FROM STDIN (FORMAT binary)") 296 296 .await?; 297 297 let writer = BinaryCopyInWriter::new(writer, SUBJECT_TYPES); 298 298
+39 -24
consumer/src/db/record.rs
··· 24 24 25 25 pub async fn block_insert<C: GenericClient>( 26 26 conn: &mut C, 27 - at_uri: &str, 27 + rkey: &str, 28 28 repo: &str, 29 29 rec: AppBskyGraphBlock, 30 30 ) -> PgExecResult { 31 31 conn.execute( 32 - "INSERT INTO blocks (at_uri, did, subject, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 33 - &[&at_uri, &repo, &rec.subject, &rec.created_at], 32 + "INSERT INTO blocks (rkey, did, subject, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 33 + &[&rkey, &repo, &rec.subject, &rec.created_at], 34 34 ).await 35 35 } 36 36 37 - pub async fn block_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 38 - conn.execute("DELETE FROM blocks WHERE at_uri=$1", &[&at_uri]) 39 - .await 37 + pub async fn block_delete<C: GenericClient>(conn: &mut C, rkey: &str, repo: &str) -> PgExecResult { 38 + conn.execute( 39 + "DELETE FROM blocks WHERE rkey=$1 AND did=$2", 40 + &[&rkey, &repo], 41 + ) 42 + .await 40 43 } 41 44 42 45 pub async fn chat_decl_upsert<C: GenericClient>( ··· 94 97 95 98 pub async fn follow_insert<C: GenericClient>( 96 99 conn: &mut C, 97 - at_uri: &str, 100 + rkey: &str, 98 101 repo: &str, 99 102 rec: AppBskyGraphFollow, 100 103 ) -> PgExecResult { 101 104 conn.execute( 102 - "INSERT INTO follows (at_uri, did, subject, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 103 - &[&at_uri, &repo, &rec.subject, &rec.created_at], 105 + "INSERT INTO follows (rkey, did, subject, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 106 + &[&rkey, &repo, &rec.subject, &rec.created_at], 104 107 ).await 105 108 } 106 109 107 - pub async fn follow_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgOptResult<String> { 110 + pub async fn follow_delete<C: GenericClient>( 111 + conn: &mut C, 112 + rkey: &str, 113 + repo: &str, 114 + ) -> PgOptResult<String> { 108 115 let res = conn 109 116 .query_opt( 110 - "DELETE FROM follows WHERE at_uri=$1 RETURNING subject", 111 - &[&at_uri], 117 + "DELETE FROM follows WHERE rkey=$1 AND did=$2 RETURNING subject", 118 + &[&rkey, &repo], 112 119 ) 113 120 .await?; 114 121 ··· 153 160 154 161 pub async fn like_insert<C: GenericClient>( 155 162 conn: &mut C, 156 - at_uri: &str, 163 + rkey: &str, 157 164 repo: &str, 158 165 rec: AppBskyFeedLike, 159 166 ) -> PgExecResult { 160 167 let (via_uri, via_cid) = strongref_to_parts(rec.via.as_ref()); 161 168 162 169 conn.execute( 163 - "INSERT INTO likes (at_uri, did, subject, subject_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", 164 - &[&at_uri, &repo, &rec.subject.uri, &rec.subject.cid.to_string(), &via_uri, &via_cid, &rec.created_at] 170 + "INSERT INTO likes (rkey, did, subject, subject_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", 171 + &[&rkey, &repo, &rec.subject.uri, &rec.subject.cid.to_string(), &via_uri, &via_cid, &rec.created_at] 165 172 ).await 166 173 } 167 174 168 - pub async fn like_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgOptResult<String> { 175 + pub async fn like_delete<C: GenericClient>( 176 + conn: &mut C, 177 + rkey: &str, 178 + repo: &str, 179 + ) -> PgOptResult<String> { 169 180 let res = conn 170 181 .query_opt( 171 - "DELETE FROM likes WHERE at_uri=$1 RETURNING subject", 172 - &[&at_uri], 182 + "DELETE FROM likes WHERE rkey=$1 AND did=$2 RETURNING subject", 183 + &[&rkey, &repo], 173 184 ) 174 185 .await?; 175 186 ··· 523 534 524 535 pub async fn repost_insert<C: GenericClient>( 525 536 conn: &mut C, 526 - at_uri: &str, 537 + rkey: &str, 527 538 repo: &str, 528 539 rec: AppBskyFeedRepost, 529 540 ) -> PgExecResult { 530 541 let (via_uri, via_cid) = strongref_to_parts(rec.via.as_ref()); 531 542 532 543 conn.execute( 533 - "INSERT INTO reposts (at_uri, did, post, post_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", 544 + "INSERT INTO reposts (rkey, did, post, post_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", 534 545 &[ 535 - &at_uri, 546 + &rkey, 536 547 &repo, 537 548 &rec.subject.uri, 538 549 &rec.subject.cid.to_string(), ··· 544 555 .await 545 556 } 546 557 547 - pub async fn repost_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgOptResult<String> { 558 + pub async fn repost_delete<C: GenericClient>( 559 + conn: &mut C, 560 + rkey: &str, 561 + repo: &str, 562 + ) -> PgOptResult<String> { 548 563 let res = conn 549 564 .query_opt( 550 - "DELETE FROM reposts WHERE at_uri=$1 RETURNING post", 551 - &[&at_uri], 565 + "DELETE FROM reposts WHERE rkey=$1 AND repo=$2 RETURNING post", 566 + &[&rkey, &repo], 552 567 ) 553 568 .await?; 554 569
+10 -9
consumer/src/indexer/mod.rs
··· 477 477 478 478 index_op(conn, deltas, repo, cid, decoded, &full_path, rkey).await?; 479 479 } else if op.action == "delete" { 480 - index_op_delete(conn, deltas, repo, collection, &full_path).await?; 480 + index_op_delete(conn, deltas, repo, collection, &full_path, rkey).await?; 481 481 } else { 482 482 tracing::warn!("op contained invalid action {}", op.action); 483 483 } ··· 539 539 } 540 540 RecordTypes::AppBskyFeedLike(record) => { 541 541 let subject = record.subject.uri.clone(); 542 - let count = db::like_insert(conn, at_uri, repo, record).await?; 542 + let count = db::like_insert(conn, rkey, repo, record).await?; 543 543 544 544 deltas 545 545 .add_delta(&subject, AggregateType::Like, count as i32) ··· 608 608 deltas 609 609 .incr(&record.subject.uri, AggregateType::Repost) 610 610 .await; 611 - db::repost_insert(conn, at_uri, repo, record).await?; 611 + db::repost_insert(conn, rkey, repo, record).await?; 612 612 } 613 613 RecordTypes::AppBskyFeedThreadgate(record) => { 614 614 let split_aturi = record.post.rsplitn(4, '/').collect::<Vec<_>>(); ··· 620 620 db::threadgate_upsert(conn, at_uri, cid, record).await?; 621 621 } 622 622 RecordTypes::AppBskyGraphBlock(record) => { 623 - db::block_insert(conn, at_uri, repo, record).await?; 623 + db::block_insert(conn, rkey, repo, record).await?; 624 624 } 625 625 RecordTypes::AppBskyGraphFollow(record) => { 626 626 let subject = record.subject.clone(); 627 - let count = db::follow_insert(conn, at_uri, repo, record).await?; 627 + let count = db::follow_insert(conn, rkey, repo, record).await?; 628 628 629 629 deltas 630 630 .add_delta(repo, AggregateType::Follow, count as i32) ··· 696 696 repo: &str, 697 697 collection: CollectionType, 698 698 at_uri: &str, 699 + rkey: &str, 699 700 ) -> Result<(), tokio_postgres::Error> { 700 701 match collection { 701 702 CollectionType::BskyProfile => db::profile_delete(conn, repo).await?, 702 703 CollectionType::BskyStatus => db::status_delete(conn, repo).await?, 703 - CollectionType::BskyBlock => db::block_delete(conn, at_uri).await?, 704 + CollectionType::BskyBlock => db::block_delete(conn, rkey, repo).await?, 704 705 CollectionType::BskyFeedGen => { 705 706 let count = db::feedgen_delete(conn, at_uri).await?; 706 707 deltas ··· 709 710 count 710 711 } 711 712 CollectionType::BskyFeedLike => { 712 - if let Some(subject) = db::like_delete(conn, at_uri).await? { 713 + if let Some(subject) = db::like_delete(conn, rkey, repo).await? { 713 714 deltas.decr(&subject, AggregateType::Like).await; 714 715 } 715 716 0 ··· 733 734 } 734 735 CollectionType::BskyFeedPostgate => db::postgate_delete(conn, at_uri).await?, 735 736 CollectionType::BskyFeedRepost => { 736 - if let Some(subject) = db::repost_delete(conn, at_uri).await? { 737 + if let Some(subject) = db::repost_delete(conn, rkey, repo).await? { 737 738 deltas.decr(&subject, AggregateType::Repost).await; 738 739 } 739 740 0 740 741 } 741 742 CollectionType::BskyFeedThreadgate => db::threadgate_delete(conn, at_uri).await?, 742 743 CollectionType::BskyFollow => { 743 - if let Some(followee) = db::follow_delete(conn, at_uri).await? { 744 + if let Some(followee) = db::follow_delete(conn, rkey, repo).await? { 744 745 deltas.decr(&followee, AggregateType::Follower).await; 745 746 deltas.decr(repo, AggregateType::Follow).await; 746 747 }
+8 -4
migrations/2025-01-29-213341_follows_and_blocks/up.sql
··· 1 1 create table blocks 2 2 ( 3 - at_uri text primary key, 3 + rkey text not null, 4 4 did text not null references actors (did), 5 5 subject text not null, 6 - created_at timestamptz not null 6 + created_at timestamptz not null, 7 + 8 + primary key (did, rkey) 7 9 ); 8 10 9 11 create index blocks_did_index on blocks (did); ··· 11 13 12 14 create table follows 13 15 ( 14 - at_uri text primary key, 16 + rkey text not null, 15 17 did text not null references actors (did), 16 18 subject text not null, 17 - created_at timestamptz not null 19 + created_at timestamptz not null, 20 + 21 + primary key (did, rkey) 18 22 ); 19 23 20 24 create index follow_did_index on follows (did);
+8 -4
migrations/2025-04-05-114428_likes_and_reposts/up.sql
··· 1 1 create table likes 2 2 ( 3 - at_uri text primary key, 3 + rkey text not null, 4 4 did text not null references actors (did), 5 5 subject text not null, 6 6 subject_cid text not null, 7 7 created_at timestamptz not null, 8 - indexed_at timestamp not null default now() 8 + indexed_at timestamp not null default now(), 9 + 10 + primary key (did, rkey) 9 11 ); 10 12 11 13 create index likes_did_index on likes (did); ··· 13 15 14 16 create table reposts 15 17 ( 16 - at_uri text primary key, 18 + rkey text not null, 17 19 did text not null references actors (did), 18 20 post text not null, 19 21 post_cid text not null, 20 22 created_at timestamptz not null, 21 - indexed_at timestamp not null default now() 23 + indexed_at timestamp not null default now(), 24 + 25 + primary key (did, rkey) 22 26 ); 23 27 24 28 create index reposts_did_index on reposts (did);
+4 -4
parakeet-db/src/models.rs
··· 31 31 #[diesel(table_name = crate::schema::blocks)] 32 32 #[diesel(check_for_backend(diesel::pg::Pg))] 33 33 pub struct NewBlock<'a> { 34 - pub at_uri: &'a str, 34 + pub rkey: &'a str, 35 35 pub did: &'a str, 36 36 pub subject: &'a str, 37 37 pub created_at: NaiveDateTime, ··· 41 41 #[diesel(table_name = crate::schema::follows)] 42 42 #[diesel(check_for_backend(diesel::pg::Pg))] 43 43 pub struct NewFollow<'a> { 44 - pub at_uri: &'a str, 44 + pub rkey: &'a str, 45 45 pub did: &'a str, 46 46 pub subject: &'a str, 47 47 pub created_at: NaiveDateTime, ··· 519 519 #[diesel(table_name = crate::schema::likes)] 520 520 #[diesel(check_for_backend(diesel::pg::Pg))] 521 521 pub struct NewLike<'a> { 522 - pub at_uri: &'a str, 522 + pub rkey: &'a str, 523 523 pub did: &'a str, 524 524 pub subject: &'a str, 525 525 pub subject_cid: String, ··· 530 530 #[diesel(table_name = crate::schema::reposts)] 531 531 #[diesel(check_for_backend(diesel::pg::Pg))] 532 532 pub struct NewRepost<'a> { 533 - pub at_uri: &'a str, 533 + pub rkey: &'a str, 534 534 pub did: &'a str, 535 535 pub post: &'a str, 536 536 pub post_cid: String,
+8 -8
parakeet-db/src/schema.rs
··· 34 34 } 35 35 36 36 diesel::table! { 37 - blocks (at_uri) { 38 - at_uri -> Text, 37 + blocks (did, rkey) { 38 + rkey -> Text, 39 39 did -> Text, 40 40 subject -> Text, 41 41 created_at -> Timestamptz, ··· 68 68 } 69 69 70 70 diesel::table! { 71 - follows (at_uri) { 72 - at_uri -> Text, 71 + follows (did, rkey) { 72 + rkey -> Text, 73 73 did -> Text, 74 74 subject -> Text, 75 75 created_at -> Timestamptz, ··· 118 118 } 119 119 120 120 diesel::table! { 121 - likes (at_uri) { 122 - at_uri -> Text, 121 + likes (did, rkey) { 122 + rkey -> Text, 123 123 did -> Text, 124 124 subject -> Text, 125 125 subject_cid -> Text, ··· 278 278 } 279 279 280 280 diesel::table! { 281 - reposts (at_uri) { 282 - at_uri -> Text, 281 + reposts (did, rkey) { 282 + rkey -> Text, 283 283 did -> Text, 284 284 post -> Text, 285 285 post_cid -> Text,