this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

overhaul commit.rs

move to using `ConfigValue` which acts like a `LazyLock`
but supports async initializers (mostly)

also rename INDEX_USER to USER_DID and add support for
overriding the PDS url, repo export url, and (later)
the subscribe repos url

+226 -105
+8
.env.template
··· 1 + # required 2 + USER_DID=did:plc:4zht3z4caxwrw3dlsybodywc # did 3 + DATABASE_URL=postgres://admin@localhost:5432/meview # postgres uri 4 + 5 + # optional 6 + USER_PDS_URL=katproto.girlonthemoon.xyz # if ommited, will resolve from did 7 + USER_EXPORT_URL=katproto.girlonthemoon.xyz # if ommited, will copy USER_PDS_URL 8 + USER_SUBSCRIBE_URL=katproto.girlonthemoon.xyz # if ommited, will copy USER_PDS_URL
+5 -5
src/backfill/mod.rs
··· 6 6 //! 4. convert cbor data to json 7 7 //! 5. store in db (limit to DB_MAX_REQ / 4 to avoid err) 8 8 9 - use std::str::FromStr; 10 - 11 9 use ipld_core::cid::multibase::Base; 12 10 use jacquard::url::Url; 13 11 use sqlx::{Pool, Postgres, query}; ··· 36 34 } 37 35 38 36 pub async fn backfill( 39 - pds: &str, 40 37 conn: &Pool<Postgres>, 41 38 time: Option<std::time::Instant>, 42 39 ) -> Result<(), Error> { 43 - let pds = Url::from_str(&format!("https://{pds}/")).unwrap(); 44 - let car = load_car(config::USER.clone(), pds).await?; 40 + let car = load_car( 41 + config::USER_DID.clone(), 42 + Url::parse(&format!("https://{}/", config::USER_EXPORT_URL)).unwrap(), 43 + ) 44 + .await?; 45 45 46 46 if let Some(time) = time { 47 47 println!("Downloaded car file ({:?})", time.elapsed());
-62
src/config.rs
··· 1 - //! get static and parsed environment variables 2 - //! 3 - //! USER is from env variable USER and parsed into a jacquard Did 4 - //! POSTGRES_URL is from POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_HOST 5 - 6 - use jacquard::types::string::Did; 7 - use std::env; 8 - use std::sync::LazyLock; 9 - 10 - pub const DB_MAX_REQ: usize = 65535; 11 - 12 - // this should be loaded before the program starts any threads 13 - // if this panics threads that access it will be poisoned 14 - pub static USER: LazyLock<Did<'static>> = LazyLock::new(|| { 15 - let Ok(env) = env::var("INDEX_USER") else { 16 - panic!("INDEX_USER not set"); 17 - }; 18 - 19 - let Ok(did) = Did::new_owned(env) else { 20 - panic!("INDEX_USER was not a valid did") 21 - }; 22 - 23 - did 24 - }); 25 - 26 - pub static POSTGRES_URL: LazyLock<String> = LazyLock::new(|| { 27 - if let Ok(url) = env::var("DATABASE_URL") { 28 - return url; 29 - } 30 - 31 - let user = env::var("POSTGRES_USER"); 32 - let db = env::var("POSTGRES_DATABASE").or_else(|_| user.clone()); 33 - let password = env::var("POSTGRES_PASSWORD"); 34 - let host = env::var("POSTGRES_HOST"); 35 - 36 - if let Ok(user) = user.clone() 37 - && let Ok(db) = db.clone() 38 - && let Ok(password) = password.clone() 39 - && let Ok(host) = host.clone() 40 - { 41 - format!("postgres://{}:{}@{}/{}", user, password, host, db) 42 - } else { 43 - let missing = [ 44 - (user, "USER"), 45 - (db, "DATABASE"), 46 - (password, "PASSWORD"), 47 - (host, "HOST"), 48 - ] 49 - .iter() 50 - .filter_map(|x| { 51 - if x.0.is_err() { 52 - Some(String::from("POSTGRES_") + x.1) 53 - } else { 54 - None 55 - } 56 - }) 57 - .collect::<Vec<String>>() 58 - .join(", "); 59 - 60 - panic!("Could not generate database url. Missing environment variables {}. Set DATABASE_URL to define the postgres url manually", missing); 61 - } 62 - });
+96
src/config/config_value.rs
··· 1 + //! Define `ConfigValue` & helpers 2 + 3 + use std::fmt::Display; 4 + use std::ops::Deref; 5 + use std::pin::Pin; 6 + use std::sync::OnceLock; 7 + 8 + /// A struct which acts like a `OnceLock` but with async support 9 + pub struct ConfigValue<T> { 10 + inner: Inner<T>, 11 + } 12 + 13 + enum Inner<T> { 14 + Sync { 15 + lock: OnceLock<T>, 16 + func: fn() -> T, 17 + }, 18 + Async { 19 + lock: OnceLock<T>, 20 + func: fn() -> Pin<Box<dyn Future<Output = T>>>, 21 + }, 22 + } 23 + 24 + impl<T> ConfigValue<T> { 25 + /// create a new synchronous ConfigValue 26 + /// 27 + /// this is the same as a OnceLock basically 28 + pub const fn new_sync(func: fn() -> T) -> ConfigValue<T> { 29 + ConfigValue { 30 + inner: Inner::Sync { 31 + lock: OnceLock::new(), 32 + func, 33 + }, 34 + } 35 + } 36 + 37 + /// create a new async based ConfigValue 38 + pub const fn new_async(func: fn() -> Pin<Box<dyn Future<Output = T>>>) -> ConfigValue<T> { 39 + ConfigValue { 40 + inner: Inner::Async { 41 + lock: OnceLock::new(), 42 + func, 43 + }, 44 + } 45 + } 46 + 47 + /// get the value. if the value is uninitialized, initialize it 48 + pub async fn get(&self) -> &T { 49 + match &self.inner { 50 + Inner::Sync { lock, func } => lock.get_or_init(func), 51 + Inner::Async { lock, func } => { 52 + if let Some(val) = lock.get() { 53 + return val; 54 + } 55 + let res = func().await; 56 + let _ = lock.set(res); 57 + lock.wait() 58 + } 59 + } 60 + } 61 + 62 + /// same as `get` but discards the result 63 + /// this should be called in main to make sure that values are properly initialized before deref 64 + pub async fn init(&self) { 65 + let _ = self.get().await; 66 + } 67 + } 68 + 69 + impl<T> Deref for ConfigValue<T> { 70 + type Target = T; 71 + fn deref(&self) -> &Self::Target { 72 + match &self.inner { 73 + Inner::Sync { lock, func: _ } => lock.wait(), 74 + Inner::Async { lock, func: _ } => lock.wait(), 75 + } 76 + } 77 + } 78 + 79 + impl<T> Display for ConfigValue<T> 80 + where 81 + T: Display, 82 + { 83 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 84 + self.deref().fmt(f) 85 + } 86 + } 87 + 88 + #[macro_export] 89 + /// initializes all values passed in. useful to batch init without tons of .init values 90 + macro_rules! init { 91 + ( $( $x:expr ),* ) => { 92 + $( 93 + $x.init().await; 94 + )* 95 + }; 96 + }
+95
src/config/mod.rs
··· 1 + //! get static and parsed environment variables 2 + //! 3 + //! USER_DID is parsed into a jacquard Did 4 + //! DATABASE_URL is from DATABASE_URL or POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, and POSTGRES_DATABASE 5 + //! USER_PDS_URL is resolved from USER_DID if ommited 6 + //! USER_EXPORT_URL falls back to USER_PDS_URL 7 + //! USER_SUBSCRIBE_URL falls back to USER_PDS_URL 8 + 9 + use jacquard::prelude::IdentityResolver; 10 + use jacquard::types::string::Did; 11 + use std::env; 12 + 13 + pub const DB_MAX_REQ: usize = 65535; 14 + 15 + mod config_value; 16 + 17 + use crate::config::config_value::ConfigValue; 18 + 19 + // this should be loaded before the program starts any threads 20 + // if this panics threads that access it will be poisoned 21 + pub static USER_DID: ConfigValue<Did<'static>> = ConfigValue::new_sync(|| { 22 + let Ok(env) = env::var("USER_DID") else { 23 + panic!("USER_DID not set"); 24 + }; 25 + 26 + let Ok(did) = Did::new_owned(env) else { 27 + panic!("USER_DID was not a valid did") 28 + }; 29 + 30 + did 31 + }); 32 + 33 + pub static DATABASE_URL: ConfigValue<String> = ConfigValue::new_sync(|| { 34 + if let Ok(url) = env::var("DATABASE_URL") { 35 + return url; 36 + } 37 + 38 + let user = env::var("POSTGRES_USER"); 39 + let db = env::var("POSTGRES_DATABASE").or_else(|_| user.clone()); 40 + let password = env::var("POSTGRES_PASSWORD"); 41 + let host = env::var("POSTGRES_HOST"); 42 + 43 + if let Ok(user) = user.clone() 44 + && let Ok(db) = db.clone() 45 + && let Ok(password) = password.clone() 46 + && let Ok(host) = host.clone() 47 + { 48 + format!("postgres://{}:{}@{}/{}", user, password, host, db) 49 + } else { 50 + let missing = [ 51 + (user, "USER"), 52 + (db, "DATABASE"), 53 + (password, "PASSWORD"), 54 + (host, "HOST"), 55 + ] 56 + .iter() 57 + .filter_map(|x| { 58 + if x.0.is_err() { 59 + Some(String::from("POSTGRES_") + x.1) 60 + } else { 61 + None 62 + } 63 + }) 64 + .collect::<Vec<String>>() 65 + .join(", "); 66 + 67 + panic!( 68 + "Could not generate database url. Missing environment variables {}. Set DATABASE_URL to define the postgres url manually", 69 + missing 70 + ); 71 + } 72 + }); 73 + 74 + pub static USER_PDS_URL: ConfigValue<String> = ConfigValue::new_async(|| { 75 + Box::pin(async { 76 + if let Ok(url) = env::var("USER_PDS_URL") { 77 + url 78 + } else { 79 + let resolver = jacquard::identity::PublicResolver::default(); 80 + resolver 81 + .pds_for_did(&self::USER_DID) 82 + .await 83 + .unwrap() 84 + .domain() 85 + .unwrap() 86 + .to_string() 87 + } 88 + }) 89 + }); 90 + 91 + pub static USER_EXPORT_URL: ConfigValue<String> = 92 + ConfigValue::new_sync(|| env::var("USER_EXPORT_URL").unwrap_or(USER_PDS_URL.clone())); 93 + 94 + pub static USER_SUBSCRIBE_URL: ConfigValue<String> = 95 + ConfigValue::new_sync(|| env::var("USER_SUBSCRIBE_URL").unwrap_or(USER_PDS_URL.clone()));
+1 -1
src/db.rs
··· 4 4 use sqlx::{Pool, Postgres, migrate, postgres::PgPool}; 5 5 6 6 pub async fn conn() -> Pool<Postgres> { 7 - let conn = match PgPool::connect(&config::POSTGRES_URL).await { 7 + let conn = match PgPool::connect(&config::DATABASE_URL).await { 8 8 Ok(val) => val, 9 9 Err(err) => { 10 10 panic!("Could not connect to the database. Got error {err}");
+21 -7
src/main.rs
··· 13 13 #[tokio::main] 14 14 async fn main() -> Result<(), Error> { 15 15 env_logger::init(); 16 - println!("User: {}", *config::USER); 16 + init![ 17 + config::USER_DID, 18 + config::USER_PDS_URL, 19 + config::USER_EXPORT_URL, 20 + config::USER_SUBSCRIBE_URL, 21 + config::DATABASE_URL 22 + ]; 23 + println!( 24 + "Starting meview: 25 + User: {} 26 + PDS URL: {} 27 + Repo Export URL: {} 28 + Subscribe Repos URL: {} 29 + Database: {}", 30 + config::USER_DID.get().await, 31 + config::USER_PDS_URL.get().await, 32 + config::USER_EXPORT_URL.get().await, 33 + config::USER_SUBSCRIBE_URL.get().await, 34 + config::DATABASE_URL.get().await 35 + ); 17 36 let conn: Pool<Postgres> = db::conn().await; 18 37 println!("Database connected and initialized"); 19 38 20 - let pds = utils::resolver::resolve(&config::USER) 21 - .await 22 - .unwrap_or_else(|err| panic!("{}", err)); 23 - 24 39 println!("Starting backfill"); 25 40 let timer = std::time::Instant::now(); 26 - 27 - backfill(&pds, &conn, Some(timer)) 41 + backfill(&conn, Some(timer)) 28 42 .await 29 43 .unwrap_or_else(|err| panic!("{}", err)); 30 44
-1
src/utils/mod.rs
··· 3 3 //! see sub modules for more details 4 4 5 5 pub mod ipld_json; 6 - pub mod resolver;
-29
src/utils/resolver.rs
··· 1 - //! resolve a Did to a pds domain 2 - 3 - use jacquard::prelude::IdentityResolver; 4 - use jacquard::types::did::Did; 5 - use thiserror::Error; 6 - 7 - #[derive(Debug, Error)] 8 - pub enum Error { 9 - #[error("Identity error: {}", .0)] 10 - IdentityError(Box<jacquard::identity::resolver::IdentityError>), 11 - #[error("Missing domain")] 12 - MissingDomain, 13 - } 14 - 15 - impl From<jacquard::identity::resolver::IdentityError> for Error { 16 - fn from(value: jacquard::identity::resolver::IdentityError) -> Self { 17 - Self::IdentityError(Box::new(value)) 18 - } 19 - } 20 - 21 - pub async fn resolve(did: &Did<'_>) -> Result<String, Error> { 22 - // resolve did to pds 23 - let resolver = jacquard::identity::PublicResolver::default(); 24 - let pds = resolver.pds_for_did(did).await?; 25 - let Some(pds) = pds.domain() else { 26 - return Err(Error::MissingDomain); 27 - }; 28 - Ok(String::from(pds)) 29 - }