Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
67
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: caching

Mia 39ae2fb9 3f6e355e

+650 -187
+3
Cargo.lock
··· 2770 2770 "multibase", 2771 2771 "parakeet-db", 2772 2772 "parakeet-index", 2773 + "redis", 2773 2774 "reqwest", 2774 2775 "serde", 2776 + "serde_ipld_dagcbor", 2775 2777 "serde_json", 2776 2778 "tokio", 2777 2779 "tower-http", ··· 2786 2788 "chrono", 2787 2789 "diesel", 2788 2790 "postgres-types", 2791 + "serde", 2789 2792 "serde_json", 2790 2793 ] 2791 2794
+12 -7
consumer/src/backfill/mod.rs
··· 97 97 98 98 let mut inner = self.inner.clone(); 99 99 let mut conn = self.pool.get().await?; 100 + let mut rc = self.redis.clone(); 100 101 101 102 tracker.spawn(async move { 102 103 let _p = p; 103 104 tracing::trace!("backfilling {job}"); 104 105 105 - if let Err(e) = backfill_actor(&mut conn, &mut inner, &job).await { 106 + if let Err(e) = backfill_actor(&mut conn, &mut rc, &mut inner, &job).await { 106 107 tracing::error!(did = &job, "backfill failed: {e}"); 107 108 counter!("backfill_failure").increment(1); 108 109 ··· 132 133 #[instrument(skip(conn, inner))] 133 134 async fn backfill_actor( 134 135 conn: &mut Object, 136 + rc: &mut MultiplexedConnection, 135 137 inner: &mut BackfillManagerInner, 136 138 did: &str, 137 139 ) -> eyre::Result<()> { ··· 140 142 141 143 tracing::trace!("loading repo"); 142 144 143 - let (commit, mut deltas, copies) = repo::insert_repo(&mut t, &inner.tmp_dir, did).await?; 145 + let (commit, mut deltas, copies) = repo::insert_repo(&mut t, rc, &inner.tmp_dir, did).await?; 144 146 145 147 db::actor_set_repo_state(&mut t, did, &commit.rev, commit.data).await?; 146 148 ··· 152 154 ) 153 155 .await?; 154 156 155 - handle_backfill_rows(&mut t, &mut deltas, did, &commit.rev).await?; 157 + handle_backfill_rows(&mut t, rc, &mut deltas, did, &commit.rev).await?; 156 158 157 159 tracing::trace!("insertion finished"); 158 160 ··· 192 194 193 195 async fn handle_backfill_rows( 194 196 conn: &mut Transaction<'_>, 197 + rc: &mut MultiplexedConnection, 195 198 deltas: &mut impl AggregateDeltaStore, 196 199 repo: &str, 197 200 rev: &str, 198 - ) -> Result<(), tokio_postgres::Error> { 201 + ) -> eyre::Result<()> { 199 202 // `pull_backfill_rows` filters out anything before the last commit we pulled 200 203 let backfill_rows = db::backfill_rows_get(conn, repo, rev).await?; 201 204 202 205 for row in backfill_rows { 203 206 // blindly unwrap-ing this CID as we've already parsed it and re-serialized it 204 - let repo_cid = 
Cid::from_str(&row.cid).unwrap(); 207 + let repo_cid = Cid::from_str(&row.cid)?; 205 208 db::actor_set_repo_state(conn, repo, &row.repo_ver, repo_cid).await?; 206 209 207 210 // again, we've serialized this. 208 - let items: Vec<BackfillItem> = serde_json::from_value(row.data).unwrap(); 211 + let items: Vec<BackfillItem> = serde_json::from_value(row.data)?; 209 212 210 213 for item in items { 211 214 let Some((_, rkey)) = item.at_uri.rsplit_once("/") else { ··· 218 221 continue; 219 222 }; 220 223 221 - indexer::index_op(conn, deltas, repo, cid, record, &item.at_uri, rkey).await? 224 + indexer::index_op(conn, rc, deltas, repo, cid, record, &item.at_uri, rkey) 225 + .await? 222 226 } 223 227 BackfillItemInner::Delete => { 224 228 indexer::index_op_delete( 225 229 conn, 230 + rc, 226 231 deltas, 227 232 repo, 228 233 item.collection,
+6 -3
consumer/src/backfill/repo.rs
··· 10 10 use iroh_car::CarReader; 11 11 use metrics::counter; 12 12 use parakeet_index::AggregateType; 13 + use redis::aio::MultiplexedConnection; 13 14 use std::collections::HashMap; 14 15 use std::path::Path; 15 16 use tokio::io::BufReader; ··· 18 19 19 20 pub async fn insert_repo( 20 21 t: &mut Transaction<'_>, 22 + rc: &mut MultiplexedConnection, 21 23 tmp_dir: &Path, 22 24 repo: &str, 23 25 ) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> { ··· 54 56 } 55 57 CarEntry::Record(record) => { 56 58 if let Some(path) = mst_nodes.remove(&cid) { 57 - record_index(t, &mut copies, &mut deltas, repo, &path, cid, record).await?; 59 + record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 58 60 } else { 59 61 records.insert(cid, record); 60 62 } ··· 84 86 85 87 for (cid, record) in records { 86 88 if let Some(path) = mst_nodes.remove(&cid) { 87 - record_index(t, &mut copies, &mut deltas, repo, &path, cid, record).await?; 89 + record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 88 90 } else { 89 91 tracing::warn!("couldn't find MST node for record {cid}") 90 92 } ··· 97 99 98 100 async fn record_index( 99 101 t: &mut Transaction<'_>, 102 + rc: &mut MultiplexedConnection, 100 103 copies: &mut CopyStore, 101 104 deltas: &mut BackfillDeltaStore, 102 105 did: &str, ··· 190 193 copies.push_record(&at_uri, cid); 191 194 copies.verifications.push((at_uri, cid, rec)); 192 195 } 193 - _ => indexer::index_op(t, deltas, did, cid, record, &at_uri, rkey).await?, 196 + _ => indexer::index_op(t, rc, deltas, did, cid, record, &at_uri, rkey).await?, 194 197 } 195 198 196 199 Ok(())
+59 -13
consumer/src/indexer/mod.rs
··· 394 394 db::actor_set_repo_state(&mut t, &commit.repo, &commit.rev, commit.commit).await?; 395 395 396 396 for op in &commit.ops { 397 - process_op(&mut t, &mut state.idxc_tx, &commit.repo, op, &blocks).await?; 397 + process_op(&mut t, rc, &mut state.idxc_tx, &commit.repo, op, &blocks).await?; 398 398 } 399 399 400 400 t.commit().await?; ··· 461 461 #[inline(always)] 462 462 async fn process_op( 463 463 conn: &mut Transaction<'_>, 464 + rc: &mut MultiplexedConnection, 464 465 deltas: &mut impl AggregateDeltaStore, 465 466 repo: &str, 466 467 op: &CommitOp, 467 468 blocks: &HashMap<Cid, Vec<u8>>, 468 - ) -> Result<(), tokio_postgres::Error> { 469 + ) -> eyre::Result<()> { 469 470 let Some((collection_raw, rkey)) = op.path.split_once("/") else { 470 471 tracing::warn!("op contained invalid path {}", op.path); 471 472 return Ok(()); ··· 490 491 return Ok(()); 491 492 }; 492 493 493 - index_op(conn, deltas, repo, cid, decoded, &full_path, rkey).await?; 494 + index_op(conn, rc, deltas, repo, cid, decoded, &full_path, rkey).await?; 494 495 } else if op.action == "delete" { 495 - index_op_delete(conn, deltas, repo, collection, &full_path, rkey).await?; 496 + index_op_delete(conn, rc, deltas, repo, collection, &full_path, rkey).await?; 496 497 } else { 497 498 tracing::warn!("op contained invalid action {}", op.action); 498 499 } ··· 517 518 518 519 pub async fn index_op( 519 520 conn: &mut Transaction<'_>, 521 + rc: &mut MultiplexedConnection, 520 522 deltas: &mut impl AggregateDeltaStore, 521 523 repo: &str, 522 524 cid: Cid, 523 525 record: RecordTypes, 524 526 at_uri: &str, 525 527 rkey: &str, 526 - ) -> Result<(), tokio_postgres::Error> { 528 + ) -> eyre::Result<()> { 527 529 match record { 528 530 RecordTypes::AppBskyActorProfile(record) => { 529 531 if rkey == "self" { ··· 533 535 if let Some(labels) = labels { 534 536 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 535 537 } 538 + 539 + redis::AsyncTypedCommands::del(rc, 
format!("profile#{repo}")).await?; 536 540 } 537 541 } 538 542 RecordTypes::AppBskyActorStatus(record) => { 539 543 if rkey == "self" { 540 544 db::status_upsert(conn, repo, record).await?; 545 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 541 546 } 542 547 } 543 548 RecordTypes::AppBskyFeedGenerator(record) => { ··· 550 555 551 556 if did_insert { 552 557 deltas.incr(repo, AggregateType::ProfileFeed).await; 558 + } else { 559 + redis::AsyncTypedCommands::del(rc, format!("feedgen#{at_uri}")).await?; 553 560 } 554 561 } 555 562 RecordTypes::AppBskyFeedLike(record) => { ··· 618 625 disable_effective, 619 626 ) 620 627 .await?; 628 + 629 + // TODO: should we purge embed#{at_uri} for everything in detached_embeding_uris? 630 + // maybe postgate_maintain_detaches should return a list of uris? 621 631 } 622 632 RecordTypes::AppBskyFeedRepost(record) => { 623 633 deltas ··· 633 643 } 634 644 635 645 db::threadgate_upsert(conn, at_uri, cid, record).await?; 646 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 636 647 } 637 648 RecordTypes::AppBskyGraphBlock(record) => { 638 649 db::block_insert(conn, rkey, repo, record).await?; ··· 658 669 659 670 if did_insert { 660 671 deltas.incr(repo, AggregateType::ProfileList).await; 672 + } else { 673 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 661 674 } 662 675 } 663 676 RecordTypes::AppBskyGraphListBlock(record) => { ··· 671 684 return Ok(()); 672 685 } 673 686 687 + redis::AsyncTypedCommands::del(rc, format!("list#{}", &record.list)).await?; 674 688 db::list_item_insert(conn, at_uri, record).await?; 675 689 } 676 690 RecordTypes::AppBskyGraphStarterPack(record) => { ··· 678 692 679 693 if did_insert { 680 694 deltas.incr(repo, AggregateType::ProfileStarterpack).await; 695 + } else { 696 + redis::AsyncTypedCommands::del(rc, format!("starterpacks#{at_uri}")).await?; 681 697 } 682 698 } 683 699 RecordTypes::AppBskyGraphVerification(record) => { ··· 691 707 if 
let Some(labels) = labels { 692 708 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 693 709 } 710 + 711 + redis::AsyncTypedCommands::del(rc, format!("labeler#{repo}")).await?; 694 712 } 695 713 } 696 714 RecordTypes::AppBskyNotificationDeclaration(record) => { 697 715 if rkey == "self" { 698 716 db::notif_decl_upsert(conn, repo, record).await?; 717 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 699 718 } 700 719 } 701 720 RecordTypes::ChatBskyActorDeclaration(record) => { 702 721 if rkey == "self" { 703 722 db::chat_decl_upsert(conn, repo, record).await?; 723 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 704 724 } 705 725 } 706 726 } ··· 712 732 713 733 pub async fn index_op_delete( 714 734 conn: &mut Transaction<'_>, 735 + rc: &mut MultiplexedConnection, 715 736 deltas: &mut impl AggregateDeltaStore, 716 737 repo: &str, 717 738 collection: CollectionType, 718 739 at_uri: &str, 719 740 rkey: &str, 720 - ) -> Result<(), tokio_postgres::Error> { 741 + ) -> eyre::Result<()> { 721 742 match collection { 722 - CollectionType::BskyProfile => db::profile_delete(conn, repo).await?, 723 - CollectionType::BskyStatus => db::status_delete(conn, repo).await?, 743 + CollectionType::BskyProfile => { 744 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 745 + db::profile_delete(conn, repo).await? 746 + } 747 + CollectionType::BskyStatus => { 748 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 749 + db::status_delete(conn, repo).await? 
750 + } 724 751 CollectionType::BskyBlock => db::block_delete(conn, rkey, repo).await?, 725 752 CollectionType::BskyFeedGen => { 753 + redis::AsyncTypedCommands::del(rc, format!("feedgen#{at_uri}")).await?; 726 754 let count = db::feedgen_delete(conn, at_uri).await?; 727 755 deltas 728 756 .add_delta(repo, AggregateType::ProfileFeed, -(count as i32)) ··· 739 767 let post_info = db::post_get_info_for_delete(conn, at_uri).await?; 740 768 741 769 db::post_delete(conn, at_uri).await?; 770 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 742 771 743 772 if let Some((reply_to, embed)) = post_info { 744 773 deltas.decr(repo, AggregateType::ProfilePost).await; ··· 759 788 } 760 789 0 761 790 } 762 - CollectionType::BskyFeedThreadgate => db::threadgate_delete(conn, at_uri).await?, 791 + CollectionType::BskyFeedThreadgate => { 792 + redis::AsyncTypedCommands::del(rc, format!("post#{at_uri}")).await?; 793 + db::threadgate_delete(conn, at_uri).await? 794 + } 763 795 CollectionType::BskyFollow => { 764 796 if let Some(followee) = db::follow_delete(conn, rkey, repo).await? { 765 797 deltas.decr(&followee, AggregateType::Follower).await; ··· 768 800 0 769 801 } 770 802 CollectionType::BskyList => { 803 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 771 804 let count = db::list_delete(conn, at_uri).await?; 772 805 deltas 773 806 .add_delta(repo, AggregateType::ProfileList, -(count as i32)) ··· 775 808 count 776 809 } 777 810 CollectionType::BskyListBlock => db::list_block_delete(conn, at_uri).await?, 778 - CollectionType::BskyListItem => db::list_item_delete(conn, at_uri).await?, 811 + CollectionType::BskyListItem => { 812 + redis::AsyncTypedCommands::del(rc, format!("list#{at_uri}")).await?; 813 + db::list_item_delete(conn, at_uri).await? 
814 + } 779 815 CollectionType::BskyStarterPack => { 816 + redis::AsyncTypedCommands::del(rc, format!("starterpacks#{at_uri}")).await?; 780 817 let count = db::starter_pack_delete(conn, at_uri).await?; 781 818 deltas 782 819 .add_delta(repo, AggregateType::ProfileStarterpack, -(count as i32)) ··· 784 821 count 785 822 } 786 823 CollectionType::BskyVerification => db::verification_delete(conn, at_uri).await?, 787 - CollectionType::BskyLabelerService => db::labeler_delete(conn, at_uri).await?, 788 - CollectionType::BskyNotificationDeclaration => db::notif_decl_delete(conn, repo).await?, 789 - CollectionType::ChatActorDecl => db::chat_decl_delete(conn, repo).await?, 824 + CollectionType::BskyLabelerService => { 825 + redis::AsyncTypedCommands::del(rc, format!("labeler#{repo}")).await?; 826 + db::labeler_delete(conn, at_uri).await? 827 + } 828 + CollectionType::BskyNotificationDeclaration => { 829 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 830 + db::notif_decl_delete(conn, repo).await? 831 + } 832 + CollectionType::ChatActorDecl => { 833 + redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 + db::chat_decl_delete(conn, repo).await? 835 + } 790 836 _ => unreachable!(), 791 837 }; 792 838
+200
dataloader-rs/src/async_cached.rs
··· 1 + use crate::runtime::{Arc, Mutex}; 2 + use crate::{yield_fn, BatchFn, WaitForWorkFn}; 3 + use std::collections::{HashMap, HashSet}; 4 + use std::hash::Hash; 5 + use std::iter::IntoIterator; 6 + 7 + pub trait AsyncCache { 8 + type Key; 9 + type Val; 10 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val>; 11 + async fn insert(&mut self, key: Self::Key, val: Self::Val); 12 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>; 13 + async fn clear(&mut self); 14 + } 15 + 16 + struct State<K, V, C> 17 + where 18 + C: AsyncCache<Key = K, Val = V>, 19 + { 20 + completed: C, 21 + pending: HashSet<K>, 22 + } 23 + 24 + impl<K: Eq + Hash, V, C> State<K, V, C> 25 + where 26 + C: AsyncCache<Key = K, Val = V>, 27 + { 28 + fn with_cache(cache: C) -> Self { 29 + State { 30 + completed: cache, 31 + pending: HashSet::new(), 32 + } 33 + } 34 + } 35 + 36 + #[derive(Clone)] 37 + pub struct Loader<K, V, F, C> 38 + where 39 + K: Eq + Hash + Clone, 40 + V: Clone, 41 + F: BatchFn<K, V>, 42 + C: AsyncCache<Key = K, Val = V>, 43 + { 44 + state: Arc<Mutex<State<K, V, C>>>, 45 + load_fn: Arc<Mutex<F>>, 46 + wait_for_work_fn: Arc<dyn WaitForWorkFn>, 47 + max_batch_size: usize, 48 + } 49 + 50 + impl<K, V, F, C> Loader<K, V, F, C> 51 + where 52 + K: Eq + Hash + Clone, 53 + V: Clone, 54 + F: BatchFn<K, V>, 55 + C: AsyncCache<Key = K, Val = V>, 56 + { 57 + pub fn new(load_fn: F, cache: C) -> Self { 58 + Loader { 59 + state: Arc::new(Mutex::new(State::with_cache(cache))), 60 + load_fn: Arc::new(Mutex::new(load_fn)), 61 + max_batch_size: 200, 62 + wait_for_work_fn: Arc::new(yield_fn(10)), 63 + } 64 + } 65 + 66 + pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self { 67 + self.max_batch_size = max_batch_size; 68 + self 69 + } 70 + 71 + pub fn with_yield_count(mut self, yield_count: usize) -> Self { 72 + self.wait_for_work_fn = Arc::new(yield_fn(yield_count)); 73 + self 74 + } 75 + 76 + /// Replaces the yielding for work behavior with an arbitrary future. 
Rather than yielding 77 + /// the runtime repeatedly this will generate and `.await` a future of your choice. 78 + /// ***This is incompatible with*** [`Self::with_yield_count()`]. 79 + pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self { 80 + self.wait_for_work_fn = Arc::new(wait_for_work_fn); 81 + self 82 + } 83 + 84 + pub fn max_batch_size(&self) -> usize { 85 + self.max_batch_size 86 + } 87 + 88 + pub async fn load(&self, key: K) -> Option<V> { 89 + let mut state = self.state.lock().await; 90 + if let Some(v) = state.completed.get(&key).await { 91 + return Some(v.clone()); 92 + } 93 + 94 + if !state.pending.contains(&key) { 95 + state.pending.insert(key.clone()); 96 + if state.pending.len() >= self.max_batch_size { 97 + let keys = state.pending.drain().collect::<Vec<K>>(); 98 + let mut load_fn = self.load_fn.lock().await; 99 + let load_ret = load_fn.load(keys.as_ref()).await; 100 + drop(load_fn); 101 + for (k, v) in load_ret.into_iter() { 102 + state.completed.insert(k, v).await; 103 + } 104 + return state.completed.get(&key).await.clone(); 105 + } 106 + } 107 + drop(state); 108 + 109 + (self.wait_for_work_fn)().await; 110 + 111 + let mut state = self.state.lock().await; 112 + if let Some(v) = state.completed.get(&key).await { 113 + return Some(v.clone()); 114 + } 115 + 116 + if !state.pending.is_empty() { 117 + let keys = state.pending.drain().collect::<Vec<K>>(); 118 + let mut load_fn = self.load_fn.lock().await; 119 + let load_ret = load_fn.load(keys.as_ref()).await; 120 + drop(load_fn); 121 + for (k, v) in load_ret.into_iter() { 122 + state.completed.insert(k, v).await; 123 + } 124 + } 125 + 126 + state.completed.get(&key).await.clone() 127 + } 128 + 129 + pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> { 130 + let mut state = self.state.lock().await; 131 + let mut ret = HashMap::new(); 132 + let mut rest = Vec::new(); 133 + for key in keys.into_iter() { 134 + if let Some(v) = 
state.completed.get(&key).await.clone() { 135 + ret.insert(key, v); 136 + continue; 137 + } 138 + if !state.pending.contains(&key) { 139 + state.pending.insert(key.clone()); 140 + 141 + if state.pending.len() >= self.max_batch_size { 142 + let keys = state.pending.drain().collect::<Vec<K>>(); 143 + let mut load_fn = self.load_fn.lock().await; 144 + let load_ret = load_fn.load(keys.as_ref()).await; 145 + drop(load_fn); 146 + for (k, v) in load_ret.into_iter() { 147 + state.completed.insert(k, v).await; 148 + } 149 + } 150 + } 151 + rest.push(key); 152 + } 153 + drop(state); 154 + 155 + (self.wait_for_work_fn)().await; 156 + 157 + if !rest.is_empty() { 158 + let mut state = self.state.lock().await; 159 + if !state.pending.is_empty() { 160 + let keys = state.pending.drain().collect::<Vec<K>>(); 161 + let mut load_fn = self.load_fn.lock().await; 162 + let load_ret = load_fn.load(keys.as_ref()).await; 163 + drop(load_fn); 164 + for (k, v) in load_ret.into_iter() { 165 + state.completed.insert(k, v).await; 166 + } 167 + } 168 + 169 + for key in rest.into_iter() { 170 + if let Some(v) = state.completed.get(&key).await.clone() { 171 + ret.insert(key, v); 172 + } 173 + } 174 + } 175 + 176 + ret 177 + } 178 + 179 + pub async fn prime(&self, key: K, val: V) { 180 + let mut state = self.state.lock().await; 181 + state.completed.insert(key, val).await; 182 + } 183 + 184 + pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) { 185 + let mut state = self.state.lock().await; 186 + for (k, v) in values.into_iter() { 187 + state.completed.insert(k, v).await; 188 + } 189 + } 190 + 191 + pub async fn clear(&self, key: K) { 192 + let mut state = self.state.lock().await; 193 + state.completed.remove(&key).await; 194 + } 195 + 196 + pub async fn clear_all(&self) { 197 + let mut state = self.state.lock().await; 198 + state.completed.clear().await 199 + } 200 + }
+1
dataloader-rs/src/lib.rs
··· 1 1 #![allow(async_fn_in_trait)] 2 2 3 + pub mod async_cached; 3 4 mod batch_fn; 4 5 pub mod cached; 5 6 pub mod non_cached;
+1
parakeet-db/Cargo.toml
··· 7 7 chrono = { version = "0.4.39", features = ["serde"] } 8 8 diesel = { version = "2.2.6", features = ["chrono", "serde_json"], optional = true } 9 9 postgres-types = { version = "0.2.9", optional = true } 10 + serde = { version = "1.0.217", features = ["derive"] } 10 11 serde_json = "1.0.134" 11 12 12 13 [features]
+16 -15
parakeet-db/src/models.rs
··· 1 1 use crate::types::*; 2 2 use chrono::prelude::*; 3 3 use diesel::prelude::*; 4 + use serde::{Deserialize, Serialize}; 4 5 5 6 #[derive(Debug, Queryable, Selectable, Identifiable)] 6 7 #[diesel(table_name = crate::schema::actors)] ··· 16 17 pub last_indexed: Option<NaiveDateTime>, 17 18 } 18 19 19 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 20 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 20 21 #[diesel(table_name = crate::schema::profiles)] 21 22 #[diesel(primary_key(did))] 22 23 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 40 41 pub indexed_at: NaiveDateTime, 41 42 } 42 43 43 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 44 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 44 45 #[diesel(table_name = crate::schema::lists)] 45 46 #[diesel(primary_key(at_uri))] 46 47 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 85 86 pub indexed_at: NaiveDateTime, 86 87 } 87 88 88 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 89 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 89 90 #[diesel(table_name = crate::schema::feedgens)] 90 91 #[diesel(primary_key(at_uri))] 91 92 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 121 122 pub indexed_at: NaiveDateTime, 122 123 } 123 124 124 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 125 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 125 126 #[diesel(table_name = crate::schema::posts)] 126 127 #[diesel(primary_key(at_uri))] 127 128 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 148 149 pub indexed_at: NaiveDateTime, 149 150 } 150 151 151 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 152 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 152 153 #[diesel(table_name = crate::schema::post_embed_images)] 153 154 #[diesel(primary_key(post_uri, seq))] 154 
155 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 164 165 pub height: Option<i32>, 165 166 } 166 167 167 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 168 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 168 169 #[diesel(table_name = crate::schema::post_embed_video)] 169 170 #[diesel(primary_key(post_uri))] 170 171 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 179 180 pub height: Option<i32>, 180 181 } 181 182 182 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 183 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 183 184 #[diesel(table_name = crate::schema::post_embed_video_captions)] 184 185 #[diesel(primary_key(post_uri, language))] 185 186 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 191 192 pub cid: String, 192 193 } 193 194 194 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 195 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 195 196 #[diesel(table_name = crate::schema::post_embed_ext)] 196 197 #[diesel(primary_key(post_uri))] 197 198 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 206 207 pub thumb_cid: Option<String>, 207 208 } 208 209 209 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 210 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 210 211 #[diesel(table_name = crate::schema::post_embed_record)] 211 212 #[diesel(primary_key(post_uri))] 212 213 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 236 237 pub indexed_at: NaiveDateTime, 237 238 } 238 239 239 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 240 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 240 241 #[diesel(table_name = crate::schema::threadgates)] 241 242 #[diesel(primary_key(post_uri))] 242 243 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 255 256 pub indexed_at: NaiveDateTime, 256 257 } 257 258 258 - 
#[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 259 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 259 260 #[diesel(table_name = crate::schema::starterpacks)] 260 261 #[diesel(primary_key(at_uri))] 261 262 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 275 276 pub indexed_at: NaiveDateTime, 276 277 } 277 278 278 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 279 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 279 280 #[diesel(table_name = crate::schema::labelers)] 280 281 #[diesel(primary_key(did))] 281 282 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 291 292 pub indexed_at: NaiveDateTime, 292 293 } 293 294 294 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable, Associations)] 295 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable, Associations)] 295 296 #[diesel(table_name = crate::schema::labeler_defs)] 296 297 #[diesel(belongs_to(LabelerService, foreign_key = labeler))] 297 298 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 329 330 pub indexed_at: NaiveDateTime, 330 331 } 331 332 332 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 333 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 333 334 #[diesel(table_name = crate::schema::verification)] 334 335 #[diesel(primary_key(at_uri))] 335 336 #[diesel(check_for_backend(diesel::pg::Pg))] ··· 346 347 pub indexed_at: NaiveDateTime, 347 348 } 348 349 349 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 350 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 350 351 #[diesel(table_name = crate::schema::statuses)] 351 352 #[diesel(primary_key(did))] 352 353 #[diesel(check_for_backend(diesel::pg::Pg))]
+2
parakeet/Cargo.toml
··· 23 23 multibase = "0.9.1" 24 24 parakeet-db = { path = "../parakeet-db" } 25 25 parakeet-index = { path = "../parakeet-index" } 26 + redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 26 27 reqwest = { version = "0.12", features = ["json"] } 27 28 serde = { version = "1.0.217", features = ["derive"] } 29 + serde_ipld_dagcbor = "0.6.1" 28 30 serde_json = "1.0.134" 29 31 tokio = { version = "1.42.0", features = ["full"] } 30 32 tower-http = { version = "0.6.2", features = ["cors", "trace"] }
+150
parakeet/src/cache.rs
··· 1 + use dataloader::async_cached::AsyncCache; 2 + use redis::aio::MultiplexedConnection; 3 + use redis::AsyncTypedCommands; 4 + use serde::{Deserialize, Serialize}; 5 + use std::marker::PhantomData; 6 + 7 + /// General Loader Cache 8 + pub struct LoaderCache<V> { 9 + conn: MultiplexedConnection, 10 + exp: Option<u64>, 11 + _phantom: PhantomData<V>, 12 + } 13 + 14 + impl<V> LoaderCache<V> { 15 + pub fn new(conn: &MultiplexedConnection, exp: Option<u64>) -> Self { 16 + LoaderCache::<V> { 17 + conn: conn.clone(), 18 + exp, 19 + _phantom: PhantomData, 20 + } 21 + } 22 + } 23 + 24 + impl<V> AsyncCache for LoaderCache<V> 25 + where 26 + V: for<'a> Deserialize<'a> + Serialize, 27 + { 28 + type Key = String; 29 + type Val = V; 30 + 31 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 33 + 34 + match serde_ipld_dagcbor::from_slice(&res?) { 35 + Ok(v) => Some(v), 36 + Err(err) => { 37 + tracing::error!(key, "failed to decode cache value: {err}"); 38 + None 39 + } 40 + } 41 + } 42 + 43 + async fn insert(&mut self, key: Self::Key, val: Self::Val) { 44 + let data = match serde_ipld_dagcbor::to_vec(&val) { 45 + Ok(data) => data, 46 + Err(err) => { 47 + tracing::error!(key, "failed to encode cache value: {err}"); 48 + return; 49 + } 50 + }; 51 + 52 + if let Some(exp) = self.exp { 53 + self.conn.set_ex(key, data, exp).await.unwrap(); 54 + } else { 55 + self.conn.set(key, data).await.unwrap(); 56 + } 57 + } 58 + 59 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 61 + .await 62 + .ok()?; 63 + 64 + match serde_ipld_dagcbor::from_slice(&res?) 
{ 65 + Ok(v) => Some(v), 66 + Err(err) => { 67 + tracing::error!(key, "failed to decode cache value: {err}"); 68 + None 69 + } 70 + } 71 + } 72 + 73 + async fn clear(&mut self) {} 74 + } 75 + 76 + /// A Loader Cache in with a key prefix 77 + pub struct PrefixedLoaderCache<V> { 78 + conn: MultiplexedConnection, 79 + prefix: String, 80 + exp: Option<u64>, 81 + _phantom: PhantomData<V>, 82 + } 83 + 84 + impl<V> PrefixedLoaderCache<V> { 85 + pub fn new(conn: &MultiplexedConnection, prefix: String, exp: Option<u64>) -> Self { 86 + PrefixedLoaderCache { 87 + conn: conn.clone(), 88 + prefix, 89 + exp, 90 + _phantom: PhantomData, 91 + } 92 + } 93 + } 94 + 95 + impl<V> AsyncCache for PrefixedLoaderCache<V> 96 + where 97 + V: for<'a> Deserialize<'a> + Serialize, 98 + { 99 + type Key = String; 100 + type Val = V; 101 + 102 + async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 103 + let key = format!("{}#{}", self.prefix, key); 104 + 105 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 106 + 107 + match serde_ipld_dagcbor::from_slice(&res?) 
{ 108 + Ok(v) => Some(v), 109 + Err(err) => { 110 + tracing::error!(key, "failed to decode cache value: {err}"); 111 + None 112 + } 113 + } 114 + } 115 + 116 + async fn insert(&mut self, key: Self::Key, val: Self::Val) { 117 + let key = format!("{}#{}", self.prefix, key); 118 + let data = match serde_ipld_dagcbor::to_vec(&val) { 119 + Ok(data) => data, 120 + Err(err) => { 121 + tracing::error!(key = &key, "failed to encode cache value: {err}"); 122 + return; 123 + } 124 + }; 125 + 126 + if let Some(exp) = self.exp { 127 + self.conn.set_ex(key, data, exp).await.unwrap(); 128 + } else { 129 + self.conn.set(key, data).await.unwrap(); 130 + } 131 + } 132 + 133 + async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 134 + let key = format!("{}#{}", self.prefix, key); 135 + 136 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 137 + .await 138 + .ok()?; 139 + 140 + match serde_ipld_dagcbor::from_slice(&res?) { 141 + Ok(v) => Some(v), 142 + Err(err) => { 143 + tracing::error!(key, "failed to decode cache value: {err}"); 144 + None 145 + } 146 + } 147 + } 148 + 149 + async fn clear(&mut self) {} 150 + }
+1
parakeet/src/config.rs
··· 15 15 pub struct Config { 16 16 pub index_uri: String, 17 17 pub database_url: String, 18 + pub cache_uri: String, 18 19 #[serde(default)] 19 20 pub server: ConfigServer, 20 21 pub service: ConfigService,
+6 -3
parakeet/src/hydration/feedgen.rs
··· 43 43 impl super::StatefulHydrator<'_> { 44 44 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 45 45 let labels = self.get_label(&feedgen).await; 46 - let (feedgen, likes) = self.loaders.feedgen.load(feedgen).await?; 46 + let likes = self.loaders.like.load(feedgen.clone()).await; 47 + let feedgen = self.loaders.feedgen.load(feedgen).await?; 47 48 let profile = self.hydrate_profile(feedgen.owner.clone()).await?; 48 49 49 50 Some(build_feedgen(feedgen, profile, labels, likes, &self.cdn)) ··· 51 52 52 53 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 53 54 let labels = self.get_label_many(&feedgens).await; 55 + let mut likes = self.loaders.like.load_many(feedgens.clone()).await; 54 56 let feedgens = self.loaders.feedgen.load_many(feedgens).await; 55 57 56 58 let creators = feedgens 57 59 .values() 58 - .map(|(feedgen, _)| feedgen.owner.clone()) 60 + .map(|feedgen| feedgen.owner.clone()) 59 61 .collect(); 60 62 61 63 let creators = self.hydrate_profiles(creators).await; 62 64 63 65 feedgens 64 66 .into_iter() 65 - .filter_map(|(uri, (feedgen, likes))| { 67 + .filter_map(|(uri, feedgen)| { 66 68 let creator = creators.get(&feedgen.owner).cloned()?; 67 69 let labels = labels.get(&uri).cloned().unwrap_or_default(); 70 + let likes = likes.remove(&uri); 68 71 69 72 Some(( 70 73 uri,
+23 -14
parakeet/src/hydration/labeler.rs
··· 92 92 impl StatefulHydrator<'_> { 93 93 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 94 94 let labels = self.get_label(&labeler).await; 95 - let (labeler, _, likes) = self.loaders.labeler.load(labeler).await?; 95 + let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 96 + let (labeler, _) = self.loaders.labeler.load(labeler).await?; 96 97 let creator = self.hydrate_profile(labeler.did.clone()).await?; 97 98 98 99 Some(build_view(labeler, creator, labels, likes)) ··· 102 103 let labels = self.get_label_many(&labelers).await; 103 104 let labelers = self.loaders.labeler.load_many(labelers).await; 104 105 105 - let creators = labelers 106 + let (creators, uris) = labelers 106 107 .values() 107 - .map(|(labeler, _, _)| labeler.did.clone()) 108 - .collect(); 108 + .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 109 + .unzip::<_, _, Vec<_>, Vec<_>>(); 109 110 let creators = self.hydrate_profiles(creators).await; 111 + let mut likes = self.loaders.like.load_many(uris.clone()).await; 110 112 111 113 labelers 112 114 .into_iter() 113 - .filter_map(|(k, (labeler, _, likes))| { 115 + .filter_map(|(k, (labeler, _))| { 114 116 let creator = creators.get(&labeler.did).cloned()?; 115 117 let labels = labels.get(&k).cloned().unwrap_or_default(); 118 + let likes = likes.remove(&make_labeler_uri(&labeler.did)); 116 119 117 120 Some((k, build_view(labeler, creator, labels, likes))) 118 121 }) ··· 121 124 122 125 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 123 126 let labels = self.get_label(&labeler).await; 124 - let (labeler, defs, likes) = self.loaders.labeler.load(labeler).await?; 127 + let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 128 + let (labeler, defs) = self.loaders.labeler.load(labeler).await?; 125 129 let creator = self.hydrate_profile(labeler.did.clone()).await?; 126 130 127 131 Some(build_view_detailed(labeler, defs, 
creator, labels, likes)) ··· 134 138 let labels = self.get_label_many(&labelers).await; 135 139 let labelers = self.loaders.labeler.load_many(labelers).await; 136 140 137 - let creators = labelers 141 + let (creators, uris) = labelers 138 142 .values() 139 - .map(|(labeler, _, _)| labeler.did.clone()) 140 - .collect(); 143 + .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 144 + .unzip::<_, _, Vec<_>, Vec<_>>(); 141 145 let creators = self.hydrate_profiles(creators).await; 146 + let mut likes = self.loaders.like.load_many(uris.clone()).await; 142 147 143 148 labelers 144 149 .into_iter() 145 - .filter_map(|(k, (labeler, defs, likes))| { 150 + .filter_map(|(k, (labeler, defs))| { 146 151 let creator = creators.get(&labeler.did).cloned()?; 147 152 let labels = labels.get(&k).cloned().unwrap_or_default(); 153 + let likes = likes.remove(&make_labeler_uri(&labeler.did)); 148 154 149 - Some(( 150 - k, 151 - build_view_detailed(labeler, defs, creator, labels, likes), 152 - )) 155 + let view = build_view_detailed(labeler, defs, creator, labels, likes); 156 + 157 + Some((k, view)) 153 158 }) 154 159 .collect() 155 160 } 156 161 } 162 + 163 + fn make_labeler_uri(did: &str) -> String { 164 + format!("at://{did}/app.bsky.labeler.service/self") 165 + }
+12 -7
parakeet/src/hydration/posts.rs
··· 99 99 } 100 100 101 101 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 102 - let (post, threadgate, stats) = self.loaders.posts.load(post).await?; 102 + let stats = self.loaders.post_stats.load(post.clone()).await; 103 + let (post, threadgate) = self.loaders.posts.load(post).await?; 103 104 let embed = self.hydrate_embed(post.at_uri.clone()).await; 104 105 let author = self.hydrate_profile_basic(post.did.clone()).await?; 105 106 let threadgate = self.hydrate_threadgate(threadgate).await; ··· 111 112 } 112 113 113 114 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 115 + let stats = self.loaders.post_stats.load_many(posts.clone()).await; 114 116 let posts = self.loaders.posts.load_many(posts).await; 115 117 116 118 let (authors, post_uris) = posts 117 119 .values() 118 - .map(|(post, _, _)| (post.did.clone(), post.at_uri.clone())) 120 + .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 119 121 .unzip::<_, _, Vec<_>, Vec<_>>(); 120 122 let authors = self.hydrate_profiles_basic(authors).await; 121 123 ··· 123 125 124 126 let threadgates = posts 125 127 .values() 126 - .filter_map(|(_, threadgate, _)| threadgate.clone()) 128 + .filter_map(|(_, threadgate)| threadgate.clone()) 127 129 .collect(); 128 130 let threadgates = self.hydrate_threadgates(threadgates).await; 129 131 ··· 131 133 132 134 posts 133 135 .into_iter() 134 - .filter_map(|(uri, (post, threadgate, stats))| { 136 + .filter_map(|(uri, (post, threadgate))| { 135 137 let author = authors.get(&post.did)?; 136 138 let embed = embeds.get(&uri).cloned(); 137 139 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 138 140 let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 141 + let stats = stats.get(&uri).cloned(); 139 142 140 143 Some(( 141 144 uri, ··· 146 149 } 147 150 148 151 pub async fn hydrate_feed_posts(&self, posts: Vec<String>) -> HashMap<String, FeedViewPost> { 152 + let stats = 
self.loaders.post_stats.load_many(posts.clone()).await; 149 153 let posts = self.loaders.posts.load_many(posts).await; 150 154 151 155 let (authors, post_uris) = posts 152 156 .values() 153 - .map(|(post, _, _)| (post.did.clone(), post.at_uri.clone())) 157 + .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 154 158 .unzip::<_, _, Vec<_>, Vec<_>>(); 155 159 let authors = self.hydrate_profiles_basic(authors).await; 156 160 ··· 160 164 161 165 let reply_refs = posts 162 166 .values() 163 - .flat_map(|(post, _, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 167 + .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 164 168 .flatten() 165 169 .collect::<Vec<_>>(); 166 170 ··· 168 172 169 173 posts 170 174 .into_iter() 171 - .filter_map(|(post_uri, (post, _, stats))| { 175 + .filter_map(|(post_uri, (post, _))| { 172 176 let author = authors.get(&post.did)?; 173 177 174 178 let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); ··· 199 203 200 204 let embed = embeds.get(&post_uri).cloned(); 201 205 let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 206 + let stats = stats.get(&post_uri).cloned(); 202 207 let post = build_postview(post, author.to_owned(), labels, embed, None, stats); 203 208 204 209 Some((
+27 -11
parakeet/src/hydration/profile.rs
··· 152 152 } 153 153 154 154 fn build_basic( 155 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 155 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 156 + stats: Option<ProfileStats>, 156 157 labels: Vec<models::Label>, 157 158 verifications: Option<Vec<models::VerificationEntry>>, 158 159 cdn: &BskyCdn, ··· 176 177 } 177 178 178 179 fn build_profile( 179 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 180 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 181 + stats: Option<ProfileStats>, 180 182 labels: Vec<models::Label>, 181 183 verifications: Option<Vec<models::VerificationEntry>>, 182 184 cdn: &BskyCdn, ··· 202 204 } 203 205 204 206 fn build_detailed( 205 - (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 207 + (handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 208 + stats: Option<ProfileStats>, 206 209 labels: Vec<models::Label>, 207 210 verifications: Option<Vec<models::VerificationEntry>>, 208 211 cdn: &BskyCdn, ··· 235 238 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 236 239 let labels = self.get_profile_label(&did).await; 237 240 let verif = self.loaders.verification.load(did.clone()).await; 241 + let stats = self.loaders.profile_stats.load(did.clone()).await; 238 242 let profile_info = self.loaders.profile.load(did).await?; 239 243 240 - Some(build_basic(profile_info, labels, verif, &self.cdn)) 244 + Some(build_basic(profile_info, stats, labels, verif, &self.cdn)) 241 245 } 242 246 243 247 pub async fn hydrate_profiles_basic( ··· 246 250 ) -> HashMap<String, ProfileViewBasic> { 247 251 let labels = self.get_profile_label_many(&dids).await; 248 252 let verif = self.loaders.verification.load_many(dids.clone()).await; 253 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 249 254 let 
profiles = self.loaders.profile.load_many(dids).await; 250 255 251 256 profiles ··· 253 258 .map(|(k, profile_info)| { 254 259 let labels = labels.get(&k).cloned().unwrap_or_default(); 255 260 let verif = verif.get(&k).cloned(); 261 + let stats = stats.get(&k).cloned(); 256 262 257 - let v = build_basic(profile_info, labels, verif, &self.cdn); 263 + let v = build_basic(profile_info, stats, labels, verif, &self.cdn); 258 264 (k, v) 259 265 }) 260 266 .collect() ··· 262 268 263 269 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 264 270 let labels = self.get_profile_label(&did).await; 265 - 266 271 let verif = self.loaders.verification.load(did.clone()).await; 272 + let stats = self.loaders.profile_stats.load(did.clone()).await; 267 273 let profile_info = self.loaders.profile.load(did).await?; 268 274 269 - Some(build_profile(profile_info, labels, verif, &self.cdn)) 275 + Some(build_profile(profile_info, stats, labels, verif, &self.cdn)) 270 276 } 271 277 272 278 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 273 279 let labels = self.get_profile_label_many(&dids).await; 274 280 let verif = self.loaders.verification.load_many(dids.clone()).await; 281 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 275 282 let profiles = self.loaders.profile.load_many(dids).await; 276 283 277 284 profiles ··· 279 286 .map(|(k, profile_info)| { 280 287 let labels = labels.get(&k).cloned().unwrap_or_default(); 281 288 let verif = verif.get(&k).cloned(); 289 + let stats = stats.get(&k).cloned(); 282 290 283 - let v = build_profile(profile_info, labels, verif, &self.cdn); 291 + let v = build_profile(profile_info, stats, labels, verif, &self.cdn); 284 292 (k, v) 285 293 }) 286 294 .collect() ··· 288 296 289 297 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 290 298 let labels = self.get_profile_label(&did).await; 291 - 292 299 let verif = 
self.loaders.verification.load(did.clone()).await; 300 + let stats = self.loaders.profile_stats.load(did.clone()).await; 293 301 let profile_info = self.loaders.profile.load(did).await?; 294 302 295 - Some(build_detailed(profile_info, labels, verif, &self.cdn)) 303 + Some(build_detailed( 304 + profile_info, 305 + stats, 306 + labels, 307 + verif, 308 + &self.cdn, 309 + )) 296 310 } 297 311 298 312 pub async fn hydrate_profiles_detailed( ··· 301 315 ) -> HashMap<String, ProfileViewDetailed> { 302 316 let labels = self.get_profile_label_many(&dids).await; 303 317 let verif = self.loaders.verification.load_many(dids.clone()).await; 318 + let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 304 319 let profiles = self.loaders.profile.load_many(dids).await; 305 320 306 321 profiles ··· 308 323 .map(|(k, profile_info)| { 309 324 let labels = labels.get(&k).cloned().unwrap_or_default(); 310 325 let verif = verif.get(&k).cloned(); 326 + let stats = stats.get(&k).cloned(); 311 327 312 - let v = build_detailed(profile_info, labels, verif, &self.cdn); 328 + let v = build_detailed(profile_info, stats, labels, verif, &self.cdn); 313 329 (k, v) 314 330 }) 315 331 .collect()
+123 -114
parakeet/src/loaders.rs
··· 1 + use crate::cache::PrefixedLoaderCache; 1 2 use crate::xrpc::extract::LabelConfigItem; 2 - use dataloader::cached::Loader; 3 + use dataloader::async_cached::Loader; 4 + use dataloader::non_cached::Loader as NonCachedLoader; 3 5 use dataloader::BatchFn; 4 6 use diesel::prelude::*; 5 7 use diesel_async::pooled_connection::deadpool::Pool; ··· 7 9 use itertools::Itertools; 8 10 use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions}; 9 11 use parakeet_db::{models, schema}; 12 + use redis::aio::MultiplexedConnection; 13 + use serde::{Deserialize, Serialize}; 10 14 use std::collections::HashMap; 11 15 use std::str::FromStr; 12 16 17 + type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 18 + 19 + fn new_plc_loader<V, F>( 20 + load_fn: F, 21 + conn: &MultiplexedConnection, 22 + prefix: &str, 23 + exp: u64, 24 + ) -> Loader<String, V, F, PrefixedLoaderCache<V>> 25 + where 26 + V: Clone + Serialize + for<'a> Deserialize<'a>, 27 + F: BatchFn<String, V>, 28 + { 29 + Loader::new( 30 + load_fn, 31 + PrefixedLoaderCache::new(conn, prefix.to_string(), Some(exp)), 32 + ) 33 + } 34 + 13 35 pub struct Dataloaders { 14 - pub embed: Loader<String, (EmbedLoaderRet, String), EmbedLoader>, 15 - pub feedgen: Loader<String, FeedGenLoaderRet, FeedGenLoader>, 16 - pub handle: Loader<String, String, HandleLoader>, 36 + pub embed: CachingLoader<String, (EmbedLoaderRet, String), EmbedLoader>, 37 + pub feedgen: CachingLoader<String, models::FeedGen, FeedGenLoader>, 38 + pub handle: CachingLoader<String, String, HandleLoader>, 17 39 pub label: LabelLoader, 18 - pub labeler: Loader<String, LabelServiceLoaderRet, LabelServiceLoader>, 19 - pub list: Loader<String, ListLoaderRet, ListLoader>, 20 - pub posts: Loader<String, PostLoaderRet, PostLoader>, 21 - pub profile: Loader<String, ProfileLoaderRet, ProfileLoader>, 22 - pub starterpacks: Loader<String, StarterPackLoaderRet, StarterPackLoader>, 23 - pub verification: Loader<String, 
Vec<models::VerificationEntry>, VerificationLoader>, 40 + pub labeler: CachingLoader<String, LabelServiceLoaderRet, LabelServiceLoader>, 41 + pub list: CachingLoader<String, ListLoaderRet, ListLoader>, 42 + pub like: NonCachedLoader<String, i32, LikeLoader>, 43 + pub posts: CachingLoader<String, PostLoaderRet, PostLoader>, 44 + pub post_stats: NonCachedLoader<String, parakeet_index::PostStats, PostStatsLoader>, 45 + pub profile: CachingLoader<String, ProfileLoaderRet, ProfileLoader>, 46 + pub profile_stats: NonCachedLoader<String, parakeet_index::ProfileStats, ProfileStatsLoader>, 47 + pub starterpacks: CachingLoader<String, StarterPackLoaderRet, StarterPackLoader>, 48 + pub verification: CachingLoader<String, Vec<models::VerificationEntry>, VerificationLoader>, 24 49 } 25 50 26 51 impl Dataloaders { 27 - // for the moment, we set up memory cached loaders 28 - // we should build a redis/valkey backend at some point in the future. 29 - pub fn new(pool: Pool<AsyncPgConnection>, idxc: parakeet_index::Client) -> Dataloaders { 52 + #[rustfmt::skip] 53 + pub fn new( 54 + pool: Pool<AsyncPgConnection>, 55 + rc: MultiplexedConnection, 56 + idxc: parakeet_index::Client, 57 + ) -> Dataloaders { 30 58 Dataloaders { 31 - embed: Loader::new(EmbedLoader(pool.clone())), 32 - feedgen: Loader::new(FeedGenLoader(pool.clone(), idxc.clone())), 33 - handle: Loader::new(HandleLoader(pool.clone())), 59 + embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 60 + feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 61 + handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 34 62 label: LabelLoader(pool.clone()), // CARE: never cache this. 
35 - labeler: Loader::new(LabelServiceLoader(pool.clone(), idxc.clone())), 36 - list: Loader::new(ListLoader(pool.clone())), 37 - posts: Loader::new(PostLoader(pool.clone(), idxc.clone())), 38 - profile: Loader::new(ProfileLoader(pool.clone(), idxc.clone())), 39 - starterpacks: Loader::new(StarterPackLoader(pool.clone())), 40 - verification: Loader::new(VerificationLoader(pool.clone())), 63 + labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 64 + like: NonCachedLoader::new(LikeLoader(idxc.clone())), 65 + list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), 66 + posts: new_plc_loader(PostLoader(pool.clone()), &rc, "post", 3600), 67 + post_stats: NonCachedLoader::new(PostStatsLoader(idxc.clone())), 68 + profile: new_plc_loader(ProfileLoader(pool.clone()), &rc, "profile", 3600), 69 + profile_stats: NonCachedLoader::new(ProfileStatsLoader(idxc.clone())), 70 + starterpacks: new_plc_loader(StarterPackLoader(pool.clone()), &rc, "starterpacks", 600), 71 + verification: new_plc_loader(VerificationLoader(pool.clone()), &rc, "verification", 60), 72 + } 73 + } 74 + } 75 + 76 + pub struct LikeLoader(parakeet_index::Client); 77 + impl BatchFn<String, i32> for LikeLoader { 78 + async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 79 + let res = self 80 + .0 81 + .get_like_count_many(parakeet_index::GetStatsManyReq { 82 + uris: keys.to_vec(), 83 + }) 84 + .await 85 + .map(|v| v.into_inner()); 86 + 87 + match res { 88 + Ok(data) => data 89 + .entries 90 + .into_iter() 91 + .map(|(k, v)| (k, v.likes)) 92 + .collect(), 93 + Err(_) => HashMap::new(), 41 94 } 42 95 } 43 96 } ··· 66 119 } 67 120 } 68 121 69 - pub struct ProfileLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 122 + pub struct ProfileLoader(Pool<AsyncPgConnection>); 70 123 pub type ProfileLoaderRet = ( 71 124 Option<String>, 72 125 models::Profile, 73 126 Option<ChatAllowIncoming>, 74 127 bool, 75 - Option<parakeet_index::ProfileStats>, 76 128 
Option<models::Status>, 77 129 Option<ProfileAllowSubscriptions>, 78 130 ); ··· 115 167 )>(&mut conn) 116 168 .await; 117 169 118 - let stats_req = parakeet_index::GetStatsManyReq { 119 - uris: keys.to_vec(), 120 - }; 121 - let mut stats = self 122 - .1 123 - .get_profile_stats_many(stats_req) 124 - .await 125 - .unwrap() 126 - .into_inner() 127 - .entries; 128 - 129 170 match res { 130 171 Ok(res) => HashMap::from_iter(res.into_iter().map( 131 172 |(did, handle, profile, chat_decl, labeler_cid, status, notif_decl)| { ··· 133 174 let notif_decl = 134 175 notif_decl.and_then(|v| ProfileAllowSubscriptions::from_str(&v).ok()); 135 176 let is_labeler = labeler_cid.is_some(); 136 - let maybe_stats = stats.remove(&did); 137 177 138 - let val = ( 139 - handle, 140 - profile, 141 - chat_decl, 142 - is_labeler, 143 - maybe_stats, 144 - status, 145 - notif_decl, 146 - ); 178 + let val = (handle, profile, chat_decl, is_labeler, status, notif_decl); 147 179 148 180 (did, val) 149 181 }, ··· 153 185 HashMap::new() 154 186 } 155 187 } 188 + } 189 + } 190 + 191 + pub struct ProfileStatsLoader(parakeet_index::Client); 192 + impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 193 + async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 194 + let stats_req = parakeet_index::GetStatsManyReq { 195 + uris: keys.to_vec(), 196 + }; 197 + 198 + self.0 199 + .get_profile_stats_many(stats_req) 200 + .await 201 + .unwrap() 202 + .into_inner() 203 + .entries 156 204 } 157 205 } 158 206 ··· 189 237 } 190 238 191 239 pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 192 - type FeedGenLoaderRet = (models::FeedGen, Option<i32>); 193 - impl BatchFn<String, FeedGenLoaderRet> for FeedGenLoader { 194 - async fn load(&mut self, keys: &[String]) -> HashMap<String, FeedGenLoaderRet> { 240 + impl BatchFn<String, models::FeedGen> for FeedGenLoader { 241 + async fn load(&mut self, keys: &[String]) -> HashMap<String, 
models::FeedGen> { 195 242 let mut conn = self.0.get().await.unwrap(); 196 243 197 244 let res = schema::feedgens::table ··· 200 247 .load(&mut conn) 201 248 .await; 202 249 203 - let stats_req = parakeet_index::GetStatsManyReq { 204 - uris: keys.to_vec(), 205 - }; 206 - let mut stats = self 207 - .1 208 - .get_like_count_many(stats_req) 209 - .await 210 - .unwrap() 211 - .into_inner() 212 - .entries; 213 - 214 250 match res { 215 - Ok(res) => HashMap::from_iter(res.into_iter().map(|feedgen| { 216 - let likes = stats.remove(&feedgen.at_uri).map(|v| v.likes); 217 - 218 - (feedgen.at_uri.clone(), (feedgen, likes)) 219 - })), 251 + Ok(res) => HashMap::from_iter( 252 + res.into_iter() 253 + .map(|feedgen| (feedgen.at_uri.clone(), feedgen)), 254 + ), 220 255 Err(e) => { 221 256 tracing::error!("feedgen load failed: {e}"); 222 257 HashMap::new() ··· 225 260 } 226 261 } 227 262 228 - pub struct PostLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 229 - type PostLoaderRet = ( 230 - models::Post, 231 - Option<models::Threadgate>, 232 - Option<parakeet_index::PostStats>, 233 - ); 263 + pub struct PostLoader(Pool<AsyncPgConnection>); 264 + type PostLoaderRet = (models::Post, Option<models::Threadgate>); 234 265 impl BatchFn<String, PostLoaderRet> for PostLoader { 235 266 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 236 267 let mut conn = self.0.get().await.unwrap(); ··· 245 276 .load(&mut conn) 246 277 .await; 247 278 279 + match res { 280 + Ok(res) => HashMap::from_iter( 281 + res.into_iter() 282 + .map(|(post, threadgate)| (post.at_uri.clone(), (post, threadgate))), 283 + ), 284 + Err(e) => { 285 + tracing::error!("post load failed: {e}"); 286 + HashMap::new() 287 + } 288 + } 289 + } 290 + } 291 + 292 + pub struct PostStatsLoader(parakeet_index::Client); 293 + impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 294 + async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 248 295 
let stats_req = parakeet_index::GetStatsManyReq { 249 296 uris: keys.to_vec(), 250 297 }; 251 - let mut stats = self 252 - .1 298 + 299 + self.0 253 300 .get_post_stats_many(stats_req) 254 301 .await 255 302 .unwrap() 256 303 .into_inner() 257 - .entries; 258 - 259 - match res { 260 - Ok(res) => HashMap::from_iter(res.into_iter().map(|(post, threadgate)| { 261 - let maybe_stats = stats.remove(&post.at_uri); 262 - 263 - (post.at_uri.clone(), (post, threadgate, maybe_stats)) 264 - })), 265 - Err(e) => { 266 - tracing::error!("post load failed: {e}"); 267 - HashMap::new() 268 - } 269 - } 304 + .entries 270 305 } 271 306 } 272 307 273 308 pub struct EmbedLoader(Pool<AsyncPgConnection>); 274 - #[derive(Debug, Clone)] 309 + #[derive(Debug, Clone, Serialize, Deserialize)] 275 310 pub enum EmbedLoaderRet { 276 311 Images(Vec<models::PostEmbedImage>), 277 312 Video(models::PostEmbedVideo), ··· 385 420 } 386 421 387 422 pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 388 - type LabelServiceLoaderRet = ( 389 - models::LabelerService, 390 - Vec<models::LabelDefinition>, 391 - Option<i32>, 392 - ); 423 + type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 393 424 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 394 425 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 395 426 let mut conn = self.0.get().await.unwrap(); ··· 408 439 409 440 let defs = defs.grouped_by(&labelers); 410 441 411 - let uris = keys 412 - .iter() 413 - .map(|v| format!("at://{v}/app.bsky.labeler.service/self")) 414 - .collect(); 415 - let stats_req = parakeet_index::GetStatsManyReq { uris }; 416 - let mut stats = self 417 - .1 418 - .get_like_count_many(stats_req) 419 - .await 420 - .unwrap() 421 - .into_inner() 422 - .entries; 423 - 424 442 labelers 425 443 .into_iter() 426 444 .zip(defs) 427 - .map(|(labeler, defs)| { 428 - let likes = stats 429 - .remove(&format!( 430 - 
"at://{}/app.bsky.labeler.service/self", 431 - &labeler.did 432 - )) 433 - .map(|v| v.likes); 434 - 435 - (labeler.did.clone(), (labeler, defs, likes)) 436 - }) 445 + .map(|(labeler, defs)| (labeler.did.clone(), (labeler, defs))) 437 446 .collect() 438 447 } 439 448 }
+8
parakeet/src/main.rs
··· 3 3 use diesel_async::pooled_connection::AsyncDieselConnectionManager; 4 4 use diesel_async::AsyncPgConnection; 5 5 use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; 6 + use redis::aio::MultiplexedConnection; 6 7 use std::sync::Arc; 7 8 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer}; 8 9 use tower_http::trace::TraceLayer; 9 10 10 11 const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); 11 12 13 + mod cache; 12 14 mod config; 13 15 mod db; 14 16 mod hydration; ··· 18 20 #[derive(Clone)] 19 21 pub struct GlobalState { 20 22 pub pool: Pool<AsyncPgConnection>, 23 + pub redis_mp: MultiplexedConnection, 21 24 pub dataloaders: Arc<loaders::Dataloaders>, 22 25 pub resolver: Arc<did_resolver::Resolver>, 23 26 pub index_client: parakeet_index::Client, ··· 46 49 tracing::info!("database migrations complete"); 47 50 } 48 51 52 + let redis_client = redis::Client::open(conf.cache_uri)?; 53 + let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 54 + 49 55 let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 50 56 51 57 let dataloaders = Arc::new(loaders::Dataloaders::new( 52 58 pool.clone(), 59 + redis_mp.clone(), 53 60 index_client.clone(), 54 61 )); 55 62 let resolver = Arc::new(did_resolver::Resolver::new(did_resolver::ResolverOpts { ··· 82 89 .layer(cors) 83 90 .with_state(GlobalState { 84 91 pool, 92 + redis_mp, 85 93 dataloaders, 86 94 resolver, 87 95 index_client,