this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feature: cache did verification methods with plc lookups

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+187 -3
+1
.gitignore
··· 20 20 21 21 # release hooks 22 22 create-release-*.sh 23 + .tmp.release_info
+1
Cargo.toml
··· 37 37 tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono", "json"] } 38 38 tracing = { version = "0.1.40", features = ["async-await", "log", "valuable"] } 39 39 zstd = "0.13.2" 40 + reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] }
+40
src/bin/supercell.rs
··· 3 3 use std::collections::HashMap; 4 4 use std::collections::HashSet; 5 5 use std::env; 6 + use supercell::vmc::VerificationMethodCacheTask; 6 7 use tokio::net::TcpListener; 7 8 use tokio::signal; 8 9 use tokio_util::{sync::CancellationToken, task::TaskTracker}; ··· 33 34 34 35 let config = supercell::config::Config::new()?; 35 36 37 + let mut client_builder = reqwest::Client::builder(); 38 + for ca_certificate in config.certificate_bundles.as_ref() { 39 + tracing::info!("Loading CA certificate: {:?}", ca_certificate); 40 + let cert = std::fs::read(ca_certificate)?; 41 + let cert = reqwest::Certificate::from_pem(&cert)?; 42 + client_builder = client_builder.add_root_certificate(cert); 43 + } 44 + 45 + client_builder = client_builder.user_agent(config.user_agent.clone()); 46 + let http_client = client_builder.build()?; 47 + 36 48 let pool = SqlitePool::connect(&config.database_url).await?; 37 49 sqlx::migrate!().run(&pool).await?; 38 50 ··· 42 54 .iter() 43 55 .map(|feed| (feed.uri.clone(), (feed.deny.clone(), feed.allow.clone()))) 44 56 .collect(); 57 + 58 + let all_dids = feeds 59 + .iter() 60 + .flat_map(|(_, (_, allow))| allow.iter().cloned()) 61 + .collect::<HashSet<String>>(); 45 62 46 63 let web_context = WebContext::new(pool.clone(), config.external_base.as_str(), feeds); 47 64 ··· 84 101 let task_enable = *inner_config.consumer_task_enable.as_ref(); 85 102 if task_enable { 86 103 let consumer_task_config = ConsumerTaskConfig { 104 + user_agent: inner_config.user_agent.clone(), 87 105 zstd_dictionary_location: inner_config.zstd_dictionary.clone(), 88 106 jetstream_hostname: inner_config.jetstream_hostname.clone(), 89 107 feeds: inner_config.feeds.clone(), ··· 92 110 let inner_token = token.clone(); 93 111 tracker.spawn(async move { 94 112 if let Err(err) = task.run_background().await { 113 + tracing::warn!(error = ?err, "consumer task error"); 114 + } 115 + inner_token.cancel(); 116 + }); 117 + } 118 + } 119 + 120 + { 121 + let inner_config = config.clone(); 122 + let task_enable = *inner_config.vmc_task_enable.as_ref(); 123 + if task_enable { 124 + let task = VerificationMethodCacheTask::new( 125 + pool.clone(), 126 + http_client, 127 + inner_config.plc_hostname.clone(), 128 + all_dids, 129 + token.clone(), 130 + ); 131 + task.main().await?; 132 + let inner_token = token.clone(); 133 + tracker.spawn(async move { 134 + if let Err(err) = task.run_background(chrono::Duration::hours(4)).await { 95 135 tracing::warn!(error = ?err, "consumer task error"); 96 136 } 97 137 inner_token.cancel();
+10 -2
src/config.rs
··· 48 48 pub database_url: String, 49 49 pub certificate_bundles: CertificateBundles, 50 50 pub consumer_task_enable: TaskEnable, 51 + pub vmc_task_enable: TaskEnable, 52 + pub plc_hostname: String, 51 53 pub user_agent: String, 52 54 pub zstd_dictionary: String, 53 55 pub jetstream_hostname: String, ··· 56 58 57 59 impl Config { 58 60 pub fn new() -> Result<Self> { 59 - let http_port: HttpPort = default_env("HTTP_PORT", "4040").try_into()?; 61 + let http_port: HttpPort = default_env("HTTP_PORT", "4050").try_into()?; 60 62 let external_base = require_env("EXTERNAL_BASE")?; 61 63 62 64 let database_url = default_env("DATABASE_URL", "sqlite://development.db"); ··· 68 70 let zstd_dictionary = require_env("ZSTD_DICTIONARY")?; 69 71 70 72 let consumer_task_enable: TaskEnable = 71 - default_env("CONSUMER_TASK_ENABLE", "false").try_into()?; 73 + default_env("CONSUMER_TASK_ENABLE", "true").try_into()?; 74 + 75 + let vmc_task_enable: TaskEnable = default_env("VMC_TASK_ENABLE", "true").try_into()?; 76 + 77 + let plc_hostname = default_env("PLC_HOSTNAME", "plc.directory"); 72 78 73 79 let default_user_agent = format!( 74 80 "supercell ({}; +https://github.com/astrenoxcoop/supercell)", ··· 86 92 database_url, 87 93 certificate_bundles, 88 94 consumer_task_enable, 95 + vmc_task_enable, 96 + plc_hostname, 89 97 user_agent, 90 98 jetstream_hostname, 91 99 zstd_dictionary,
+6
src/consumer.rs
··· 3 3 use anyhow::{Context, Result}; 4 4 use futures_util::SinkExt; 5 5 use futures_util::StreamExt; 6 + use http::HeaderValue; 6 7 use http::Uri; 7 8 use tokio::time::{sleep, Instant}; 8 9 use tokio_util::sync::CancellationToken; ··· 20 21 21 22 #[derive(Clone)] 22 23 pub struct ConsumerTaskConfig { 24 + pub user_agent: String, 23 25 pub zstd_dictionary_location: String, 24 26 pub jetstream_hostname: String, 25 27 pub feeds: config::Feeds, ··· 65 67 .context("invalid jetstream URL")?; 66 68 67 69 let (mut client, _) = ClientBuilder::from_uri(uri) 70 + .add_header( 71 + http::header::USER_AGENT, 72 + HeaderValue::from_str(&self.config.user_agent)?, 73 + ) 68 74 .connect() 69 75 .await 70 76 .map_err(|err| anyhow::Error::new(err).context("cannot connect to jetstream"))?;
+1
src/lib.rs
··· 5 5 pub mod http; 6 6 pub mod matcher; 7 7 pub mod storage; 8 + pub mod vmc;
+15 -1
src/storage.rs
··· 1 1 use anyhow::{Context, Result}; 2 - use chrono::prelude::*; 2 + use chrono::{prelude::*, Duration}; 3 3 use sqlx::{Pool, Sqlite}; 4 4 5 5 use model::FeedContent; ··· 168 168 .bind(now) 169 169 .execute(tx.as_mut()) 170 170 .await.context("failed to update verification method cache")?; 171 + 172 + tx.commit().await.context("failed to commit transaction") 173 + } 174 + 175 + pub async fn verification_method_cleanup(pool: &StoragePool) -> Result<()> { 176 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 177 + 178 + let now = Utc::now(); 179 + let seven_days_ago = now - Duration::days(7); 180 + sqlx::query("DELETE FROM verification_method_cache WHERE updated_at < ?") 181 + .bind(seven_days_ago) 182 + .execute(tx.as_mut()) 183 + .await 184 + .context("failed to delete old verification method cache records")?; 171 185 172 186 tx.commit().await.context("failed to commit transaction") 173 187 }
+113
src/vmc.rs
··· 1 + use std::collections::HashSet; 2 + 3 + use anyhow::{anyhow, Result}; 4 + use chrono::Duration; 5 + use serde::Deserialize; 6 + use tokio::time::{sleep, Instant}; 7 + use tokio_util::sync::CancellationToken; 8 + 9 + use crate::storage::{verifcation_method_insert, verification_method_cleanup, StoragePool}; 10 + 11 + #[derive(Deserialize)] 12 + struct VerificationMethod { 13 + #[serde(rename = "publicKeyMultibase")] 14 + public_key_multibase: String, 15 + } 16 + 17 + #[derive(Deserialize)] 18 + struct ResolvedPlcDid { 19 + id: String, 20 + #[serde(rename = "verificationMethod")] 21 + verification_method: Vec<VerificationMethod>, 22 + } 23 + 24 + pub struct VerificationMethodCacheTask { 25 + pool: StoragePool, 26 + http_client: reqwest::Client, 27 + plc_hostname: String, 28 + dids: HashSet<String>, 29 + cancellation_token: CancellationToken, 30 + } 31 + 32 + impl VerificationMethodCacheTask { 33 + pub fn new( 34 + pool: StoragePool, 35 + http_client: reqwest::Client, 36 + plc_hostname: String, 37 + dids: HashSet<String>, 38 + cancellation_token: CancellationToken, 39 + ) -> Self { 40 + Self { 41 + pool, 42 + http_client, 43 + plc_hostname, 44 + dids, 45 + cancellation_token, 46 + } 47 + } 48 + 49 + pub async fn run_background(&self, interval: Duration) -> Result<()> { 50 + let interval = interval.to_std()?; 51 + 52 + let sleeper = sleep(interval); 53 + tokio::pin!(sleeper); 54 + 55 + loop { 56 + tokio::select! { 57 + () = self.cancellation_token.cancelled() => { 58 + break; 59 + }, 60 + () = &mut sleeper => { 61 + 62 + if let Err(err) = self.main().await { 63 + tracing::error!("StatsTask task failed: {}", err); 64 + } 65 + 66 + 67 + sleeper.as_mut().reset(Instant::now() + interval); 68 + } 69 + } 70 + } 71 + Ok(()) 72 + } 73 + 74 + pub async fn main(&self) -> Result<()> { 75 + for did in &self.dids { 76 + let query_response = self.plc_query(did).await; 77 + if let Err(err) = query_response { 78 + tracing::error!(error = ?err, "Failed to query PLC for DID: {}", did); 79 + continue; 80 + } 81 + let key = query_response.unwrap(); 82 + 83 + verifcation_method_insert(&self.pool, did, &key).await?; 84 + } 85 + 86 + verification_method_cleanup(&self.pool).await?; 87 + Ok(()) 88 + } 89 + 90 + async fn plc_query(&self, did: &str) -> Result<String> { 91 + let url = format!("https://{}/{}", self.plc_hostname, did); 92 + 93 + let resolved_did: ResolvedPlcDid = self 94 + .http_client 95 + .get(url) 96 + .timeout(Duration::seconds(10).to_std()?) 97 + .send() 98 + .await? 99 + .json() 100 + .await?; 101 + 102 + if resolved_did.id != did { 103 + return Err(anyhow!("DID mismatch")); 104 + } 105 + 106 + let key = resolved_did 107 + .verification_method 108 + .first() 109 + .map(|value| value.public_key_multibase.clone()); 110 + 111 + key.ok_or(anyhow!("No key found")) 112 + } 113 + }