this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feature: feed caching

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+371 -25
+1
Cargo.toml
··· 42 42 zstd = "0.13.2" 43 43 reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] } 44 44 rhai = { version = "1.20.0", features = ["serde", "std", "sync"]} 45 + duration-str = "0.11.2"
+26 -1
src/bin/supercell.rs
··· 3 3 use std::collections::HashMap; 4 4 use std::collections::HashSet; 5 5 use std::env; 6 + use supercell::cache::Cache; 7 + use supercell::cache::CacheTask; 6 8 use supercell::vmc::VerificationMethodCacheTask; 7 9 use tokio::net::TcpListener; 8 10 use tokio::signal; ··· 60 62 .flat_map(|(_, (_, allow))| allow.iter().cloned()) 61 63 .collect::<HashSet<String>>(); 62 64 63 - let web_context = WebContext::new(pool.clone(), config.external_base.as_str(), feeds); 65 + let cache = Cache::default(); 66 + 67 + let web_context = WebContext::new(pool.clone(), config.external_base.as_str(), feeds, cache.clone()); 64 68 65 69 let app = build_router(web_context.clone()); 66 70 ··· 134 138 let inner_token = token.clone(); 135 139 tracker.spawn(async move { 136 140 if let Err(err) = task.run_background(chrono::Duration::hours(4)).await { 141 + tracing::warn!(error = ?err, "consumer task error"); 142 + } 143 + inner_token.cancel(); 144 + }); 145 + } 146 + } 147 + { 148 + let inner_config = config.clone(); 149 + let task_enable = *inner_config.cache_task_enable.as_ref(); 150 + if task_enable { 151 + let task = CacheTask::new( 152 + pool.clone(), 153 + cache.clone(), 154 + inner_config.clone(), 155 + token.clone(), 156 + ); 157 + task.main().await?; 158 + let inner_token = token.clone(); 159 + let interval = *inner_config.cache_task_interval.as_ref(); 160 + tracker.spawn(async move { 161 + if let Err(err) = task.run_background(interval).await { 137 162 tracing::warn!(error = ?err, "consumer task error"); 138 163 } 139 164 inner_token.cancel();
+225
src/cache.rs
··· 1 + use anyhow::Result; 2 + use chrono::Utc; 3 + use std::{collections::HashMap, sync::Arc}; 4 + use tokio::sync::RwLock; 5 + use tokio_util::sync::CancellationToken; 6 + 7 + use crate::storage::{feed_content_cached, StoragePool}; 8 + 9 + pub(crate) struct InnerCache { 10 + pub(crate) page_size: u8, 11 + pub(crate) cached_feeds: HashMap<String, Vec<Vec<String>>>, 12 + } 13 + 14 + #[derive(Clone)] 15 + pub struct Cache { 16 + pub(crate) inner_cache: Arc<RwLock<InnerCache>>, 17 + } 18 + 19 + impl Default for InnerCache { 20 + fn default() -> Self { 21 + Self { 22 + page_size: 20, 23 + cached_feeds: HashMap::new(), 24 + } 25 + } 26 + } 27 + 28 + impl Default for Cache { 29 + fn default() -> Self { 30 + Self { 31 + inner_cache: Arc::new(RwLock::new(InnerCache::default())), 32 + } 33 + } 34 + } 35 + 36 + impl InnerCache { 37 + pub(crate) fn new(page_size: u8) -> Self { 38 + Self { 39 + page_size, 40 + cached_feeds: HashMap::new(), 41 + } 42 + } 43 + } 44 + 45 + impl Cache { 46 + pub(crate) fn new(page_size: u8) -> Self { 47 + Self { 48 + inner_cache: Arc::new(RwLock::new(InnerCache::new(page_size))), 49 + } 50 + } 51 + 52 + pub(crate) async fn get_posts(&self, feed_id: &str, page: usize) -> Option<Vec<String>> { 53 + let inner = self.inner_cache.read().await; 54 + 55 + let feed_chunks = inner.cached_feeds.get(feed_id)?; 56 + 57 + if page as usize > feed_chunks.len() { 58 + return None; 59 + } 60 + 61 + feed_chunks.get(page).cloned() 62 + } 63 + 64 + pub(crate) async fn update_feed(&self, feed_id: &str, posts: &Vec<String>) { 65 + let mut inner = self.inner_cache.write().await; 66 + 67 + let chunks = posts 68 + .chunks(inner.page_size.into()) 69 + .map(|chunk| chunk.to_vec()) 70 + .collect(); 71 + 72 + inner.cached_feeds.insert(feed_id.to_string(), chunks); 73 + } 74 + } 75 + 76 + pub struct CacheTask { 77 + pub(crate) pool: StoragePool, 78 + pub(crate) cache: Cache, 79 + pub(crate) config: crate::config::Config, 80 + 81 + pub(crate) cancellation_token: CancellationToken, 82 + } 83 + 84 + impl CacheTask { 85 + pub fn new( 86 + pool: StoragePool, 87 + cache: Cache, 88 + config: crate::config::Config, 89 + cancellation_token: CancellationToken, 90 + ) -> Self { 91 + Self { 92 + pool, 93 + cache, 94 + config, 95 + cancellation_token, 96 + } 97 + } 98 + 99 + pub async fn run_background(&self, interval: chrono::Duration) -> Result<()> { 100 + let interval = interval.to_std()?; 101 + 102 + let sleeper = tokio::time::sleep(interval); 103 + tokio::pin!(sleeper); 104 + 105 + loop { 106 + tokio::select! { 107 + () = self.cancellation_token.cancelled() => { 108 + break; 109 + }, 110 + () = &mut sleeper => { 111 + 112 + if let Err(err) = self.main().await { 113 + tracing::error!("CacheTask task failed: {}", err); 114 + } 115 + 116 + 117 + sleeper.as_mut().reset(tokio::time::Instant::now() + interval); 118 + } 119 + } 120 + } 121 + Ok(()) 122 + } 123 + 124 + pub async fn main(&self) -> Result<()> { 125 + for feed in &self.config.feeds.feeds { 126 + let query = feed.query.clone(); 127 + 128 + match query { 129 + crate::config::FeedQuery::Simple { limit } => { 130 + if let Err(err) = self.generate_simple(&feed.uri, *limit.as_ref()).await { 131 + tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed"); 132 + } 133 + } 134 + crate::config::FeedQuery::Popular { gravity, limit } => { 135 + if let Err(err) = self 136 + .generate_popular(&feed.uri, gravity, *limit.as_ref()) 137 + .await 138 + { 139 + 140 + tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed"); 141 + } 142 + } 143 + } 144 + } 145 + 146 + Ok(()) 147 + } 148 + 149 + async fn generate_simple(&self, feed_uri: &str, limit: u32) -> Result<()> { 150 + let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 151 + let posts = posts.iter().map(|post| post.uri.clone()).collect(); 152 + self.cache.update_feed(feed_uri, &posts).await; 153 + Ok(()) 154 + } 155 + 156 + async fn generate_popular(&self, feed_uri: &str, gravity: f64, limit: u32) -> Result<()> { 157 + let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 158 + 159 + let now = Utc::now().timestamp(); 160 + let mut scored_posts = posts 161 + .iter() 162 + .map(|post| { 163 + let age = post.age_in_hours(now); 164 + 165 + let score = ((post.score - 1).min(0) as f64) / ((2 + age) as f64).powf(gravity); 166 + 167 + (score, post.uri.clone(), age) 168 + }) 169 + .collect::<Vec<(f64, String, i64)>>(); 170 + 171 + scored_posts.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap()); 172 + 173 + println!("{:?}", scored_posts); 174 + 175 + let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect(); 176 + 177 + self.cache.update_feed(feed_uri, &sorted_posts).await; 178 + Ok(()) 179 + } 180 + } 181 + 182 + #[cfg(test)] 183 + mod tests { 184 + 185 + use super::*; 186 + use anyhow::Result; 187 + 188 + #[tokio::test] 189 + async fn record_feed_content() -> Result<()> { 190 + let sorted_posts = (0..12) 191 + .map(|value| format!("at://did:not:real/post/{}", value)) 192 + .collect(); 193 + 194 + let cache = Cache::new(5); 195 + cache.update_feed("feed", &sorted_posts).await; 196 + 197 + assert_eq!( 198 + cache.get_posts("feed", 0).await, 199 + Some( 200 + (0..5) 201 + .map(|value| format!("at://did:not:real/post/{}", value)) 202 + .collect() 203 + ) 204 + ); 205 + assert_eq!( 206 + cache.get_posts("feed", 1).await, 207 + Some( 208 + (5..10) 209 + .map(|value| format!("at://did:not:real/post/{}", value)) 210 + .collect() 211 + ) 212 + ); 213 + assert_eq!( 214 + cache.get_posts("feed", 2).await, 215 + Some( 216 + (10..12) 217 + .map(|value| format!("at://did:not:real/post/{}", value)) 218 + .collect() 219 + ) 220 + ); 221 + assert_eq!(cache.get_posts("feed", 3).await, None); 222 + 223 + Ok(()) 224 + } 225 + }
+57 -7
src/config.rs
··· 4 4 use std::str::FromStr; 5 5 6 6 use anyhow::{anyhow, Result}; 7 + use chrono::Duration; 7 8 use serde::de::{self, MapAccess, Visitor}; 8 9 use serde::{Deserialize, Deserializer}; 9 10 ··· 13 14 } 14 15 15 16 #[derive(Clone, Debug, Deserialize)] 17 + pub struct FeedQueryLimit(pub u32); 18 + 19 + impl Default for FeedQueryLimit { 20 + fn default() -> Self { 21 + FeedQueryLimit(500) 22 + } 23 + } 24 + 25 + #[derive(Clone, Debug, Deserialize)] 16 26 #[serde(tag = "type")] 17 27 pub enum FeedQuery { 18 28 #[serde(rename = "simple")] 19 - Simple {}, 29 + Simple { 30 + #[serde(default)] 31 + limit: FeedQueryLimit, 32 + }, 20 33 21 34 #[serde(rename = "popular")] 22 35 Popular { 23 36 #[serde(default)] 24 - age_floor: i64, 37 + gravity: f64, 25 38 26 39 #[serde(default)] 27 - gravity: f64, 40 + limit: FeedQueryLimit, 28 41 }, 29 42 } 30 43 ··· 87 100 pub struct TaskEnable(bool); 88 101 89 102 #[derive(Clone)] 103 + pub struct TaskInterval(Duration); 104 + 105 + #[derive(Clone)] 90 106 pub struct Compression(bool); 91 107 92 108 #[derive(Clone)] ··· 100 116 pub database_url: String, 101 117 pub certificate_bundles: CertificateBundles, 102 118 pub consumer_task_enable: TaskEnable, 119 + pub cache_task_enable: TaskEnable, 120 + pub cache_task_interval: TaskInterval, 103 121 pub vmc_task_enable: TaskEnable, 104 122 pub plc_hostname: String, 105 123 pub user_agent: String, ··· 133 151 let consumer_task_enable: TaskEnable = 134 152 default_env("CONSUMER_TASK_ENABLE", "true").try_into()?; 135 153 154 + let cache_task_enable: TaskEnable = default_env("CACHE_TASK_ENABLE", "true").try_into()?; 155 + 156 + let cache_task_interval: TaskInterval = 157 + default_env("CACHE_TASK_INTERVAL", "3m").try_into()?; 158 + 136 159 let vmc_task_enable: TaskEnable = default_env("VMC_TASK_ENABLE", "true").try_into()?; 137 160 138 161 let plc_hostname = default_env("PLC_HOSTNAME", "plc.directory"); ··· 156 179 database_url, 157 180 certificate_bundles, 158 181 consumer_task_enable, 182 + cache_task_enable, 183 + cache_task_interval, 159 184 vmc_task_enable, 160 185 plc_hostname, 161 186 user_agent, ··· 247 272 } 248 273 } 249 274 275 + impl AsRef<Duration> for TaskInterval { 276 + fn as_ref(&self) -> &Duration { 277 + &self.0 278 + } 279 + } 280 + 281 + impl TryFrom<String> for TaskInterval { 282 + type Error = anyhow::Error; 283 + fn try_from(value: String) -> Result<Self, Self::Error> { 284 + let duration = duration_str::parse_chrono(&value) 285 + .map_err(|err| anyhow!(err).context("parsing task interval into duration failed"))?; 286 + Ok(Self(duration)) 287 + } 288 + } 289 + 250 290 impl AsRef<bool> for Compression { 251 291 fn as_ref(&self) -> &bool { 252 292 &self.0 ··· 302 342 303 343 impl Default for FeedQuery { 304 344 fn default() -> Self { 305 - FeedQuery::Simple {} 345 + FeedQuery::Simple { 346 + limit: FeedQueryLimit::default(), 347 + } 306 348 } 307 349 } 308 350 ··· 311 353 312 354 fn from_str(value: &str) -> Result<Self, Self::Err> { 313 355 match value { 314 - "simple" => Ok(FeedQuery::Simple {}), 356 + "simple" => Ok(FeedQuery::Simple { 357 + limit: FeedQueryLimit::default(), 358 + }), 315 359 "popular" => Ok(FeedQuery::Popular { 316 - age_floor: 0, 317 - gravity: 2.0, 360 + gravity: 1.8, 361 + limit: FeedQueryLimit::default(), 318 362 }), 319 363 _ => Err(anyhow!("unsupported query")), 320 364 } ··· 355 399 356 400 deserializer.deserialize_any(StringOrStruct(PhantomData)) 357 401 } 402 + 403 + impl AsRef<u32> for FeedQueryLimit { 404 + fn as_ref(&self) -> &u32 { 405 + &self.0 406 + } 407 + }
+4 -1
src/http/context.rs
··· 5 5 sync::Arc, 6 6 }; 7 7 8 - use crate::storage::StoragePool; 8 + use crate::{cache::Cache, storage::StoragePool}; 9 9 10 10 #[derive(Clone, Debug)] 11 11 pub(crate) struct FeedControl { ··· 17 17 pub(crate) pool: StoragePool, 18 18 pub(crate) external_base: String, 19 19 pub(crate) feeds: HashMap<String, FeedControl>, 20 + pub(crate) cache: Cache, 20 21 } 21 22 22 23 #[derive(Clone, FromRef)] ··· 35 36 pool: StoragePool, 36 37 external_base: &str, 37 38 feeds: HashMap<String, (Option<String>, HashSet<String>)>, 39 + cache: Cache, 38 40 ) -> Self { 39 41 let feeds = feeds 40 42 .into_iter() ··· 44 46 pool, 45 47 external_base: external_base.to_string(), 46 48 feeds, 49 + cache, 47 50 })) 48 51 } 49 52 }
+22 -15
src/http/handle_get_feed_skeleton.rs
··· 8 8 use serde_json::json; 9 9 10 10 use crate::errors::SupercellError; 11 - use crate::storage::feed_content_paginate; 12 11 use crate::storage::{verification_method_get, StoragePool}; 13 12 14 13 use crate::crypto::{validate, JwtClaims, JwtHeader}; ··· 104 103 } 105 104 } 106 105 107 - let parsed_cursor = parse_cursor(feed_params.cursor); 108 - let feed_items = feed_content_paginate( 109 - &web_context.pool, 110 - &feed_uri, 111 - feed_params.limit, 112 - parsed_cursor, 113 - ) 114 - .await?; 106 + let parsed_cursor = parse_cursor(feed_params.cursor).map(|value| value.clamp(0, 10000)).unwrap_or(0) as usize; 115 107 116 - let cursor = feed_items 117 - .iter() 118 - .last() 119 - .map(|last_feed_item| last_feed_item.indexed_at.to_string()); 108 + let posts = web_context.cache.get_posts(&feed_uri, parsed_cursor).await; 120 109 121 - let feed_item_views = feed_items 110 + if posts.is_none() { 111 + return Ok(( 112 + StatusCode::BAD_REQUEST, 113 + Json(json!({ 114 + "error": "UnknownFeed", 115 + "message": "unknown feed", 116 + })), 117 + ) 118 + .into_response()); 119 + } 120 + let posts = posts.unwrap(); 121 + 122 + let cursor = if posts.len() != 0 { 123 + Some((parsed_cursor + 1).to_string()) 124 + } else { 125 + Some(parsed_cursor.to_string()) 126 + }; 127 + 128 + let feed_item_views = posts 122 129 .iter() 123 130 .map(|feed_item| FeedItemView { 124 - post: feed_item.uri.clone(), 131 + post: feed_item.clone(), 125 132 }) 126 133 .collect::<Vec<_>>(); 127 134
+1
src/lib.rs
··· 1 + pub mod cache; 1 2 pub mod config; 2 3 pub mod consumer; 3 4 pub mod crypto;
+35 -1
src/storage.rs
··· 7 7 pub type StoragePool = Pool<Sqlite>; 8 8 9 9 pub mod model { 10 + use chrono::{DateTime, SubsecRound}; 10 11 use serde::Serialize; 11 12 use sqlx::prelude::FromRow; 12 13 ··· 16 17 pub uri: String, 17 18 pub indexed_at: i64, 18 19 pub score: i32, 20 + } 21 + 22 + impl FeedContent { 23 + pub(crate) fn age_in_hours(&self, now: i64) -> i64 { 24 + let target = DateTime::from_timestamp_micros(self.indexed_at) 25 + .map(|value| value.trunc_subsecs(0).timestamp()); 26 + if target.is_none() { 27 + return 1; 28 + } 29 + let target = target.unwrap(); 30 + let diff_seconds = now - target; 31 + std::cmp::max((diff_seconds / (60 * 60)) + 1, 1) 32 + } 19 33 } 20 34 } 21 35 ··· 132 146 Ok(results) 133 147 } 134 148 149 + pub async fn feed_content_cached( 150 + pool: &StoragePool, 151 + feed_uri: &str, 152 + limit: u32, 153 + ) -> Result<Vec<FeedContent>> { 154 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 155 + 156 + let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?"; 157 + 158 + let results = sqlx::query_as::<_, FeedContent>(query) 159 + .bind(feed_uri) 160 + .bind(limit) 161 + .fetch_all(tx.as_mut()) 162 + .await?; 163 + 164 + tx.commit().await.context("failed to commit transaction")?; 165 + 166 + Ok(results) 167 + } 168 + 135 169 pub async fn consumer_control_insert(pool: &StoragePool, source: &str, time_us: i64) -> Result<()> { 136 170 let mut tx = pool.begin().await.context("failed to begin transaction")?; 137 171 ··· 244 278 indexed_at: 1730673934229172_i64, 245 279 score: 1, 246 280 }; 247 - super::feed_content_insert(&pool, &record) 281 + super::feed_content_upsert(&pool, &record) 248 282 .await 249 283 .expect("failed to insert record"); 250 284