Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'index-rocksdb' into 'main'

Index to RocksDB

See merge request parakeet-social/parakeet!14

Mia e0347740 ce6af714

+224 -152
+56 -1
Cargo.lock
··· 545 545 checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" 546 546 547 547 [[package]] 548 + name = "bzip2-sys" 549 + version = "0.1.13+1.0.8" 550 + source = "registry+https://github.com/rust-lang/crates.io-index" 551 + checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" 552 + dependencies = [ 553 + "cc", 554 + "pkg-config", 555 + ] 556 + 557 + [[package]] 548 558 name = "cbor4ii" 549 559 version = "0.2.14" 550 560 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2225 2235 checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" 2226 2236 2227 2237 [[package]] 2238 + name = "librocksdb-sys" 2239 + version = "0.17.1+9.9.3" 2240 + source = "registry+https://github.com/rust-lang/crates.io-index" 2241 + checksum = "2b7869a512ae9982f4d46ba482c2a304f1efd80c6412a3d4bf57bb79a619679f" 2242 + dependencies = [ 2243 + "bindgen", 2244 + "bzip2-sys", 2245 + "cc", 2246 + "libc", 2247 + "libz-sys", 2248 + "lz4-sys", 2249 + ] 2250 + 2251 + [[package]] 2252 + name = "libz-sys" 2253 + version = "1.1.22" 2254 + source = "registry+https://github.com/rust-lang/crates.io-index" 2255 + checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" 2256 + dependencies = [ 2257 + "cc", 2258 + "pkg-config", 2259 + "vcpkg", 2260 + ] 2261 + 2262 + [[package]] 2228 2263 name = "linked-hash-map" 2229 2264 version = "0.5.6" 2230 2265 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2268 2303 checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" 2269 2304 dependencies = [ 2270 2305 "linked-hash-map", 2306 + ] 2307 + 2308 + [[package]] 2309 + name = "lz4-sys" 2310 + version = "1.11.1+lz4-1.10.0" 2311 + source = "registry+https://github.com/rust-lang/crates.io-index" 2312 + checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" 2313 + dependencies = [ 2314 + "cc", 2315 + "libc", 2271 2316 ] 2272 2317 2273 2318 [[package]] ··· 2651 2696 "figment", 2652 2697 "itertools 0.14.0", 2653 2698 "prost", 2699 + "rocksdb", 2654 2700 "serde", 2655 - "sled", 2656 2701 "tokio", 2657 2702 "tonic", 2658 2703 "tonic-build", ··· 3304 3349 "spin", 3305 3350 "untrusted", 3306 3351 "windows-sys 0.52.0", 3352 + ] 3353 + 3354 + [[package]] 3355 + name = "rocksdb" 3356 + version = "0.23.0" 3357 + source = "registry+https://github.com/rust-lang/crates.io-index" 3358 + checksum = "26ec73b20525cb235bad420f911473b69f9fe27cc856c5461bccd7e4af037f43" 3359 + dependencies = [ 3360 + "libc", 3361 + "librocksdb-sys", 3307 3362 ] 3308 3363 3309 3364 [[package]]
+7 -4
consumer/src/db/record.rs
··· 1 - use super::{PgExecResult, PgOptResult}; 1 + use super::{PgExecResult, PgOptResult, PgResult}; 2 2 use crate::indexer::records::*; 3 3 use crate::utils::{blob_ref, strongref_to_parts}; 4 4 use chrono::prelude::*; ··· 61 61 repo: &str, 62 62 cid: Cid, 63 63 rec: AppBskyFeedGenerator, 64 - ) -> PgExecResult { 64 + ) -> PgResult<bool> { 65 65 let cid = cid.to_string(); 66 66 let description_facets = rec 67 67 .description_facets ··· 84 84 ], 85 85 ) 86 86 .await 87 + .map(|v| v == 0) 87 88 } 88 89 89 90 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 181 182 repo: &str, 182 183 cid: Cid, 183 184 rec: AppBskyGraphList, 184 - ) -> PgExecResult { 185 + ) -> PgResult<bool> { 185 186 let cid = cid.to_string(); 186 187 let description_facets = rec 187 188 .description_facets ··· 203 204 ], 204 205 ) 205 206 .await 207 + .map(|v| v == 0) 206 208 } 207 209 208 210 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 559 561 repo: &str, 560 562 cid: Cid, 561 563 rec: AppBskyGraphStarterPack, 562 - ) -> PgExecResult { 564 + ) -> PgResult<bool> { 563 565 let cid = cid.to_string(); 564 566 let record = serde_json::to_value(&rec).unwrap(); 565 567 let description_facets = rec ··· 585 587 ], 586 588 ) 587 589 .await 590 + .map(|v| v == 0) 588 591 } 589 592 590 593 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
+2 -1
consumer/src/db/sql/feedgen_upsert.sql
··· 8 8 description=EXCLUDED.description, 9 9 description_facets=EXCLUDED.description_facets, 10 10 avatar_cid=EXCLUDED.avatar_cid, 11 - indexed_at=NOW() 11 + indexed_at=NOW() 12 + RETURNING XMAX
+2 -1
consumer/src/db/sql/list_upsert.sql
··· 6 6 description=EXCLUDED.description, 7 7 description_facets=EXCLUDED.description_facets, 8 8 avatar_cid=EXCLUDED.avatar_cid, 9 - indexed_at=NOW() 9 + indexed_at=NOW() 10 + RETURNING XMAX
+2 -1
consumer/src/db/sql/starterpack_upsert.sql
··· 7 7 description_facets=EXCLUDED.description_facets, 8 8 list=EXCLUDED.list, 9 9 feeds=EXCLUDED.feeds, 10 - indexed_at=NOW() 10 + indexed_at=NOW() 11 + RETURNING XMAX
+13 -12
consumer/src/indexer/mod.rs
··· 527 527 } 528 528 RecordTypes::AppBskyFeedGenerator(record) => { 529 529 let labels = record.labels.clone(); 530 - let count = db::feedgen_upsert(conn, at_uri, repo, cid, record).await?; 530 + let did_insert = db::feedgen_upsert(conn, at_uri, repo, cid, record).await?; 531 531 532 532 if let Some(labels) = labels { 533 533 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 534 534 } 535 535 536 - deltas 537 - .add_delta(repo, AggregateType::ProfileFeed, count as i32) 538 - .await; 536 + if did_insert { 537 + deltas.incr(repo, AggregateType::ProfileFeed).await; 538 + } 539 539 } 540 540 RecordTypes::AppBskyFeedLike(record) => { 541 541 let subject = record.subject.uri.clone(); ··· 635 635 } 636 636 RecordTypes::AppBskyGraphList(record) => { 637 637 let labels = record.labels.clone(); 638 - let count = db::list_upsert(conn, at_uri, repo, cid, record).await?; 638 + let did_insert = db::list_upsert(conn, at_uri, repo, cid, record).await?; 639 639 640 640 if let Some(labels) = labels { 641 641 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 642 642 } 643 643 644 - deltas 645 - .add_delta(repo, AggregateType::ProfileList, count as i32) 646 - .await; 644 + if did_insert { 645 + deltas.incr(repo, AggregateType::ProfileList).await; 646 + } 647 647 } 648 648 RecordTypes::AppBskyGraphListBlock(record) => { 649 649 db::list_block_insert(conn, at_uri, repo, record).await?; ··· 659 659 db::list_item_insert(conn, at_uri, record).await?; 660 660 } 661 661 RecordTypes::AppBskyGraphStarterPack(record) => { 662 - let count = db::starter_pack_upsert(conn, at_uri, repo, cid, record).await?; 663 - deltas 664 - .add_delta(repo, AggregateType::ProfileStarterpack, count as i32) 665 - .await; 662 + let did_insert = db::starter_pack_upsert(conn, at_uri, repo, cid, record).await?; 663 + 664 + if did_insert { 665 + deltas.incr(repo, AggregateType::ProfileStarterpack).await; 666 + } 666 667 } 667 668 RecordTypes::AppBskyGraphVerification(record) => { 668 669 db::verification_insert(conn, at_uri, repo, cid, record).await?;
+2 -2
parakeet-index/Cargo.toml
··· 14 14 eyre = { version = "0.6.12", optional = true } 15 15 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 16 16 itertools = { version = "0.14.0", optional = true } 17 + rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 17 18 serde = { version = "1.0.217", features = ["derive"], optional = true } 18 - sled = { version = "0.34.7", optional = true } 19 19 tokio = { version = "1.42.0", features = ["full"], optional = true } 20 20 tonic-health = { version = "0.13.0", optional = true } 21 21 tracing = { version = "0.1.40", optional = true } ··· 25 25 tonic-build = "0.13.0" 26 26 27 27 [features] 28 - server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:serde", "dep:sled", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"] 28 + server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
+1 -1
parakeet-index/Dockerfile
··· 1 1 FROM rust:1.85-slim-bookworm AS builder 2 2 WORKDIR /work 3 - RUN apt-get update && apt-get install -y --no-install-recommends wget libssl-dev protobuf-compiler pkg-config && rm -rf /var/lib/apt/lists/* 3 + RUN apt-get update && apt-get install -y --no-install-recommends wget libssl-dev protobuf-compiler pkg-config clang && rm -rf /var/lib/apt/lists/* 4 4 RUN wget -qO /bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.38/grpc_health_probe-linux-amd64 && \ 5 5 chmod +x /bin/grpc_health_probe 6 6 COPY . .
+112 -49
parakeet-index/src/server/db.rs
··· 1 - use crate::all_none; 2 - use crate::server::utils::{ToIntExt, TreeExt, slice_as_i32}; 3 - use sled::{Db, MergeOperator, Tree}; 1 + use crate::server::utils::{ToIntExt, slice_as_i32, to_key_name}; 2 + use crate::{AggregateType, all_none}; 3 + use itertools::izip; 4 + use rocksdb::{DB, MergeOperands}; 5 + use std::collections::HashMap; 4 6 use std::path::PathBuf; 5 7 6 8 pub struct DbStore { 7 - pub agg_db: Db, 8 - pub label_db: Db, 9 - 10 - pub follows: Tree, 11 - pub followers: Tree, 12 - pub likes: Tree, 13 - pub replies: Tree, 14 - pub reposts: Tree, 15 - pub embeds: Tree, 16 - pub profile_posts: Tree, 17 - pub profile_lists: Tree, 18 - pub profile_feeds: Tree, 19 - pub profile_starterpacks: Tree, 9 + pub agg_db: DB, 10 + pub label_db: DB, 20 11 } 21 12 22 13 impl DbStore { 23 14 pub fn new(db_root: PathBuf) -> eyre::Result<Self> { 24 - let agg_db = sled::open(db_root.join("aggdb"))?; 25 - let label_db = sled::open(db_root.join("labeldb"))?; 15 + let mut opts = rocksdb::Options::default(); 16 + opts.create_if_missing(true); 17 + opts.set_compression_type(rocksdb::DBCompressionType::Lz4); 26 18 27 - Ok(DbStore { 28 - follows: open_tree(&agg_db, "follows", merge_delta)?, 29 - followers: open_tree(&agg_db, "followers", merge_delta)?, 30 - likes: open_tree(&agg_db, "likes", merge_delta)?, 31 - replies: open_tree(&agg_db, "replies", merge_delta)?, 32 - reposts: open_tree(&agg_db, "reposts", merge_delta)?, 33 - embeds: open_tree(&agg_db, "embeds", merge_delta)?, 34 - profile_posts: open_tree(&agg_db, "profile_posts", merge_delta)?, 35 - profile_lists: open_tree(&agg_db, "profile_lists", merge_delta)?, 36 - profile_feeds: open_tree(&agg_db, "profile_feeds", merge_delta)?, 37 - profile_starterpacks: open_tree(&agg_db, "profile_starterpacks", merge_delta)?, 19 + let mut agg_opts = opts.clone(); 20 + agg_opts.set_merge_operator_associative("pk_i32_merge", pk_i32_merge); 21 + let agg_db = DB::open(&agg_opts, db_root.join("aggdb"))?; 22 + 23 + let label_db = DB::open(&opts, db_root.join("labeldb"))?; 38 24 39 - agg_db, 40 - label_db, 41 - }) 25 + Ok(DbStore { agg_db, label_db }) 42 26 } 43 27 44 28 pub fn get_post_stats(&self, post: &str) -> Option<crate::PostStats> { 45 - let replies = self.replies.get_i32(post); 46 - let likes = self.likes.get_i32(post); 47 - let reposts = self.reposts.get_i32(post); 48 - let quotes = self.embeds.get_i32(post); 29 + let replies = self.get_aggregate(post, AggregateType::Reply); 30 + let likes = self.get_aggregate(post, AggregateType::Like); 31 + let reposts = self.get_aggregate(post, AggregateType::Repost); 32 + let quotes = self.get_aggregate(post, AggregateType::Embed); 49 33 50 34 if all_none![replies, likes, reposts, quotes] { 51 35 return None; ··· 59 43 }) 60 44 } 61 45 46 + pub fn get_post_stats_many(&self, posts: Vec<String>) -> HashMap<String, crate::PostStats> { 47 + let replies = self.get_aggregate_many(&posts, AggregateType::Reply); 48 + let likes = self.get_aggregate_many(&posts, AggregateType::Like); 49 + let reposts = self.get_aggregate_many(&posts, AggregateType::Repost); 50 + let quotes = self.get_aggregate_many(&posts, AggregateType::Embed); 51 + 52 + izip!(posts, replies, likes, reposts, quotes) 53 + .filter_map(|(key, replies, likes, reposts, quotes)| { 54 + if all_none![replies, likes, reposts, quotes] { 55 + return None; 56 + } 57 + 58 + let stats = crate::PostStats { 59 + replies: replies.unwrap_or_default(), 60 + likes: likes.unwrap_or_default(), 61 + reposts: reposts.unwrap_or_default(), 62 + quotes: quotes.unwrap_or_default(), 63 + }; 64 + 65 + Some((key, stats)) 66 + }) 67 + .collect() 68 + } 69 + 62 70 pub fn get_profile_stats(&self, did: &str) -> Option<crate::ProfileStats> { 63 - let followers = self.followers.get_i32(did); 64 - let following = self.follows.get_i32(did); 65 - let posts = self.profile_posts.get_i32(did); 66 - let lists = self.profile_lists.get_i32(did); 67 - let feeds = self.profile_feeds.get_i32(did); 68 - let starterpacks = self.profile_starterpacks.get_i32(did); 71 + let followers = self.get_aggregate(did, AggregateType::Follower); 72 + let following = self.get_aggregate(did, AggregateType::Follow); 73 + let posts = self.get_aggregate(did, AggregateType::ProfilePost); 74 + let lists = self.get_aggregate(did, AggregateType::ProfileList); 75 + let feeds = self.get_aggregate(did, AggregateType::ProfileFeed); 76 + let starterpacks = self.get_aggregate(did, AggregateType::ProfileStarterpack); 69 77 70 78 if all_none![followers, following, posts, lists, feeds, starterpacks] { 71 79 return None; ··· 80 88 starterpacks: starterpacks.unwrap_or_default(), 81 89 }) 82 90 } 83 - } 84 91 85 - fn open_tree(db: &Db, name: &str, merge: impl MergeOperator + 'static) -> eyre::Result<Tree> { 86 - let tree = db.open_tree(name)?; 92 + pub fn get_profile_stats_many( 93 + &self, 94 + dids: Vec<String>, 95 + ) -> HashMap<String, crate::ProfileStats> { 96 + let followers = self.get_aggregate_many(&dids, AggregateType::Follower); 97 + let following = self.get_aggregate_many(&dids, AggregateType::Follow); 98 + let posts = self.get_aggregate_many(&dids, AggregateType::ProfilePost); 99 + let lists = self.get_aggregate_many(&dids, AggregateType::ProfileList); 100 + let feeds = self.get_aggregate_many(&dids, AggregateType::ProfileFeed); 101 + let starterpacks = self.get_aggregate_many(&dids, AggregateType::ProfileStarterpack); 87 102 88 - tree.set_merge_operator(merge); 103 + izip!( 104 + dids, 105 + followers, 106 + following, 107 + posts, 108 + lists, 109 + feeds, 110 + starterpacks 111 + ) 112 + .filter_map( 113 + |(key, followers, following, posts, lists, feeds, starterpacks)| { 114 + if all_none![followers, following, posts, lists, feeds, starterpacks] { 115 + return None; 116 + } 89 117 90 - Ok(tree) 118 + let stats = crate::ProfileStats { 119 + followers: followers.unwrap_or_default(), 120 + following: following.unwrap_or_default(), 121 + posts: posts.unwrap_or_default(), 122 + lists: lists.unwrap_or_default(), 123 + feeds: feeds.unwrap_or_default(), 124 + starterpacks: starterpacks.unwrap_or_default(), 125 + }; 126 + 127 + Some((key, stats)) 128 + }, 129 + ) 130 + .collect() 131 + } 132 + 133 + pub fn get_aggregate(&self, uri: &str, typ: AggregateType) -> Option<i32> { 134 + let key = to_key_name(uri, typ); 135 + 136 + self.agg_db 137 + .get_pinned(&key) 138 + .ok() 139 + .flatten() 140 + .and_then(|data| slice_as_i32(&data)) 141 + } 142 + 143 + pub fn get_aggregate_many(&self, uris: &[String], typ: AggregateType) -> Vec<Option<i32>> { 144 + let keys = uris.iter().map(|uri| to_key_name(uri, typ)); 145 + 146 + self.agg_db 147 + .multi_get(keys) 148 + .into_iter() 149 + .filter_map(|v| v.ok()) 150 + .map(|v| v.and_then(|data| slice_as_i32(&data))) 151 + .collect() 152 + } 91 153 } 92 154 93 - fn merge_delta(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { 155 + fn pk_i32_merge(_key: &[u8], old: Option<&[u8]>, op: &MergeOperands) -> Option<Vec<u8>> { 94 156 let old = old.and_then(slice_as_i32); 95 - let new = slice_as_i32(new)?; 157 + 158 + let new = op.iter().map(slice_as_i32).sum::<Option<i32>>()?; 96 159 97 160 let res = match old { 98 161 Some(old) => old + new,
+21 -54
parakeet-index/src/server/service.rs
··· 1 1 use crate::index::*; 2 2 use crate::server::GlobalState; 3 - use crate::server::utils::TreeExt; 4 - use std::collections::HashMap; 3 + use crate::server::utils::to_key_name; 5 4 use std::ops::Deref; 6 5 use std::sync::Arc; 7 6 use tonic::codegen::tokio_stream::StreamExt; ··· 14 13 Service(state) 15 14 } 16 15 17 - fn apply_delta( 18 - &self, 19 - uri: &str, 20 - typ: AggregateType, 21 - delta: i32, 22 - ) -> sled::Result<Option<sled::IVec>> { 16 + fn apply_delta(&self, uri: &str, typ: AggregateType, delta: i32) -> Result<(), rocksdb::Error> { 23 17 let val = delta.to_le_bytes(); 18 + let key = to_key_name(uri, typ); 24 19 25 - match typ { 26 - AggregateType::Unknown => todo!(), 27 - AggregateType::Follow => self.dbs.follows.merge(uri, val), 28 - AggregateType::Follower => self.dbs.followers.merge(uri, val), 29 - AggregateType::Like => self.dbs.likes.merge(uri, val), 30 - AggregateType::Reply => self.dbs.replies.merge(uri, val), 31 - AggregateType::Repost => self.dbs.reposts.merge(uri, val), 32 - AggregateType::Embed => self.dbs.embeds.merge(uri, val), 33 - AggregateType::ProfilePost => self.dbs.profile_posts.merge(uri, val), 34 - AggregateType::ProfileList => self.dbs.profile_lists.merge(uri, val), 35 - AggregateType::ProfileFeed => self.dbs.profile_feeds.merge(uri, val), 36 - AggregateType::ProfileStarterpack => self.dbs.profile_starterpacks.merge(uri, val), 37 - } 20 + self.dbs.agg_db.merge(key, val) 38 21 } 39 22 } 40 23 ··· 69 52 request: Request<AggregateDeltaBatchReq>, 70 53 ) -> Result<Response<AggregateDeltaRes>, Status> { 71 54 let inner = request.into_inner(); 55 + let mut batch = rocksdb::WriteBatch::default(); 72 56 73 57 for data in inner.deltas { 74 - let res = self.apply_delta(&data.uri, data.typ(), data.delta); 58 + let val = data.delta.to_le_bytes(); 59 + let key = to_key_name(&data.uri, data.typ()); 60 + 61 + batch.merge(key, val); 62 + } 75 63 76 - if let Err(e) = res { 77 - tracing::error!("failed to update stats DB: {e}"); 78 - return Err(Status::unknown("failed to update stats DB")); 79 - } 64 + if let Err(e) = self.dbs.agg_db.write(batch) { 65 + tracing::error!("failed to update stats DB: {e}"); 66 + return Err(Status::unknown("failed to update stats DB")); 80 67 } 81 68 82 69 Ok(Response::new(AggregateDeltaRes {})) ··· 121 108 ) -> Result<Response<GetProfileStatsManyRes>, Status> { 122 109 let inner = request.into_inner(); 123 110 124 - // idk if this is the best way of doing this???? 125 - let entries = inner 126 - .uris 127 - .into_iter() 128 - .filter_map(|uri| { 129 - let stats = self.dbs.get_profile_stats(&uri)?; 130 - 131 - Some((uri, stats)) 132 - }) 133 - .collect::<HashMap<_, _>>(); 111 + let entries = self.dbs.get_profile_stats_many(inner.uris); 134 112 135 113 Ok(Response::new(GetProfileStatsManyRes { entries })) 136 114 } ··· 152 130 ) -> Result<Response<GetPostStatsManyRes>, Status> { 153 131 let inner = request.into_inner(); 154 132 155 - let entries = inner 156 - .uris 157 - .into_iter() 158 - .filter_map(|uri| { 159 - let stats = self.dbs.get_post_stats(&uri)?; 160 - 161 - Some((uri, stats)) 162 - }) 163 - .collect::<HashMap<_, _>>(); 133 + let entries = self.dbs.get_post_stats_many(inner.uris); 164 134 165 135 Ok(Response::new(GetPostStatsManyRes { entries })) 166 136 } ··· 173 143 174 144 let likes = self 175 145 .dbs 176 - .likes 177 - .get_i32(inner.uri) 146 + .get_aggregate(&inner.uri, AggregateType::Like) 178 147 .map(|likes| LikeCount { likes }); 179 148 180 149 Ok(Response::new(GetLikeCountRes { likes })) ··· 186 155 ) -> Result<Response<GetLikeCountManyRes>, Status> { 187 156 let inner = request.into_inner(); 188 157 189 - let entries = inner 190 - .uris 158 + let entries = self 159 + .dbs 160 + .get_aggregate_many(&inner.uris, AggregateType::Like) 191 161 .into_iter() 192 - .filter_map(|uri| { 193 - let likes = self.dbs.likes.get_i32(&uri)?; 194 - 195 - Some((uri, LikeCount { likes })) 196 - }) 162 + .zip(inner.uris) 163 + .filter_map(|(val, key)| val.map(|likes| (key, LikeCount { likes }))) 197 164 .collect(); 198 165 199 166 Ok(Response::new(GetLikeCountManyRes { entries }))
+6 -26
parakeet-index/src/server/utils.rs
··· 1 - use sled::{IVec, Tree}; 1 + use crate::AggregateType; 2 2 3 3 pub trait ToIntExt { 4 4 fn as_i32(&self) -> Option<i32>; 5 5 fn from_i32(i: i32) -> Self; 6 6 } 7 7 8 - impl ToIntExt for IVec { 9 - fn as_i32(&self) -> Option<i32> { 10 - if self.len() == 4 { 11 - let bytes = self[0..4].try_into().ok()?; 12 - Some(i32::from_le_bytes(bytes)) 13 - } else { 14 - None 15 - } 16 - } 17 - 18 - fn from_i32(i: i32) -> Self { 19 - IVec::from(&i.to_le_bytes()) 20 - } 21 - } 22 - 23 8 impl ToIntExt for Vec<u8> { 24 9 fn as_i32(&self) -> Option<i32> { 25 10 if self.len() == 4 { ··· 41 26 Some(i32::from_le_bytes(bytes)) 42 27 } 43 28 44 - pub trait TreeExt { 45 - fn get_i32(&self, key: impl AsRef<[u8]>) -> Option<i32>; 46 - } 47 - 48 - impl TreeExt for Tree { 49 - fn get_i32(&self, key: impl AsRef<[u8]>) -> Option<i32> { 50 - self.get(key).ok().flatten().and_then(|v| v.as_i32()) 51 - } 52 - } 53 - 54 29 #[macro_export] 55 30 macro_rules! all_none { 56 31 ($var0:ident, $($var:ident),*) => { 57 32 $var0.is_none() $(&& $var.is_none())* 58 33 }; 59 34 } 35 + 36 + #[inline(always)] 37 + pub fn to_key_name(uri: &str, typ: AggregateType) -> String { 38 + format!("{uri}#{}", typ.as_str_name()) 39 + }