For now? I'm experimenting on an old concept.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Migrate to SQLx: database.rs done.

+59 -54
+5
.idea/inspectionProfiles/Project_Default.xml
··· 1 1 <component name="InspectionProjectProfileManager"> 2 2 <profile version="1.0"> 3 3 <option name="myName" value="Project Default" /> 4 + <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true"> 5 + <Languages> 6 + <language minSize="58" name="Rust" /> 7 + </Languages> 8 + </inspection_tool> 4 9 <inspection_tool class="RsUnusedImport" enabled="false" level="WARNING" enabled_by_default="false" /> 5 10 <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false"> 6 11 <option name="processCode" value="true" />
+4
Cargo.lock
··· 2270 2270 "sha2", 2271 2271 "smallvec", 2272 2272 "thiserror 2.0.18", 2273 + "time", 2273 2274 "tokio", 2274 2275 "tokio-stream", 2275 2276 "tracing", ··· 2353 2354 "sqlx-core", 2354 2355 "stringprep", 2355 2356 "thiserror 2.0.18", 2357 + "time", 2356 2358 "tracing", 2357 2359 "uuid", 2358 2360 "whoami", ··· 2391 2393 "sqlx-core", 2392 2394 "stringprep", 2393 2395 "thiserror 2.0.18", 2396 + "time", 2394 2397 "tracing", 2395 2398 "uuid", 2396 2399 "whoami", ··· 2416 2419 "serde_urlencoded", 2417 2420 "sqlx-core", 2418 2421 "thiserror 2.0.18", 2422 + "time", 2419 2423 "tracing", 2420 2424 "url", 2421 2425 "uuid",
+2 -2
flake.nix
··· 47 47 # nodePackages.prettier 48 48 sqlx-cli 49 49 50 - # Helpers on OS level 51 - pkg-config 50 + # Pkg config 51 + pkg-config-unwrapped 52 52 53 53 # Podman 54 54 podman
+1
server/Cargo.toml
··· 20 20 "postgres", 21 21 "runtime-tokio", 22 22 "tls-native-tls", 23 + "time", 23 24 "uuid", 24 25 ] } 25 26 anyhow = "1.0.101"
+40 -40
server/src/database.rs
··· 29 29 use cynthia_con::{CynthiaColors, CynthiaStyles}; 30 30 use sqlx::postgres::PgPool; 31 31 use std::time::Duration; 32 + use sqlx::{Postgres}; 32 33 33 34 struct DatabaseConfig { 34 35 postgres_username: String, ··· 153 154 pg_config.postgres_port, 154 155 pg_config.postgres_dbname 155 156 ); 156 - let pg_pool = PgPool::connect(uri.as_str()).await?; 157 + let pg_pool: sqlx::Pool<Postgres> = PgPool::connect(uri.as_str()).await?; 157 158 { 158 159 // This is where previously the database schema was created if it did not exist, but now 159 160 // we use sqlx and let it do that :) ··· 222 223 } 223 224 224 225 impl DatabaseConnections for DbConn { 225 - /// Recreate the database connection. 226 - /// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections. 227 - // This function converts a generic DbConn to the more concrete PgConn type. 228 - async fn recreate(&self) -> PgConn { 229 - PgConn { 230 - postgres_pool: self.get_postgres_pool(), 231 - redis_pool: self.get_redis_pool(), 232 - } 233 - } 234 - 235 226 fn get_redis_pool(&self) -> Pool<RedisConnectionManager> { 236 227 match self { 237 228 DbConn::PgsqlConnection(_, redis_pool) => redis_pool.clone(), 238 229 } 239 230 } 231 + 240 232 fn get_postgres_pool(&self) -> PgPool { 241 233 match self { 242 - DbConn::PgsqlConnection(pg_pool, _) => return pg_pool.clone(), 234 + DbConn::PgsqlConnection(pg_pool, _) => pg_pool.clone(), 235 + } 236 + } 237 + /// Recreate the database connection. 238 + /// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections. 239 + // This function converts a generic DbConn to the more concrete PgConn type. 
240 + async fn recreate(&self) -> PgConn { 241 + PgConn { 242 + postgres_pool: self.get_postgres_pool(), 243 + redis_pool: self.get_redis_pool(), 243 244 } 244 245 } 245 246 } ··· 289 290 let db = DbConn::from(db); 290 291 match db { 291 292 DbConn::PgsqlConnection(pg_pool, redis_pool) => { 292 - let mut session_interval = tokio::time::interval(std::time::Duration::from_secs(60)); 293 - let mut cache_interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes 293 + let mut session_interval = tokio::time::interval(Duration::from_secs(60)); 294 + let mut cache_interval = tokio::time::interval(Duration::from_secs(300)); // 5 minutes 294 295 295 296 loop { 296 297 tokio::select! { 297 298 _ = session_interval.tick() => { 298 299 // Delete any sessions older than 20 days 299 - if let Ok(client) = pg_pool.get().await { 300 - let _ = client 301 - .execute( 302 - "DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'", 303 - &[], 304 - ) 305 - .await; 306 - } 300 + match sqlx::query!("DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'").execute(&pg_pool).await { 301 + Ok(_) => (), 302 + Err(err) => { 303 + error!("Failed to delete session: {}", err); 304 + } 305 + }; 306 + 307 307 } 308 308 _ = cache_interval.tick() => { 309 309 // Clean up expired timeline caches and manage cache invalidation 310 310 if let Ok(mut redis_conn) = redis_pool.get().await { 311 311 let _ = cleanup_timeline_caches(&mut redis_conn).await; 312 - if let Ok(pg_conn) = pg_pool.get().await { 313 - let _ = check_timeline_invalidations(&mut redis_conn, &pg_conn).await; 314 - } 312 + 313 + let _ = check_timeline_invalidations(&mut redis_conn, &pg_pool).await; 315 314 } 316 315 } 317 316 } ··· 369 368 // Check for timeline changes and invalidate caches accordingly (PostgreSQL) 370 369 async fn check_timeline_invalidations( 371 370 redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>, 372 - client: &bb8::PooledConnection<'_, 
PostgresConnectionManager<NoTls>>, 371 + pg_pool: &PgPool, 373 372 ) -> Result<(), LuminaError> { 374 373 // Get the last check timestamp 375 - let last_check: Option<String> = redis::cmd("GET") 374 + let last_check = redis::cmd("GET") 376 375 .arg("timeline_cache_last_check") 377 376 .query_async(&mut **redis_conn) 378 377 .await 379 - .unwrap_or(None); 378 + .unwrap_or(None) 379 + .map(|a: String| time::OffsetDateTime::parse(a.as_str(), &time::format_description::well_known::Rfc3339)); 380 380 381 - let query = if let Some(timestamp) = last_check { 382 - client 383 - .query( 384 - "SELECT DISTINCT tlid FROM timelines WHERE timestamp > $1", 385 - &[&timestamp], 381 + let query = if let Some(Ok(timestamp)) = last_check { 382 + sqlx::query!("SELECT DISTINCT tlid FROM timelines WHERE timestamp > $1", timestamp) 383 + .fetch_all( 384 + pg_pool, 386 385 ) 387 386 .await 387 + } else if let Some(Err(_)) = last_check { 388 + panic!("timeline_cache_last_check returned an error, this means there's probably been tampering with the Redis DB."); 388 389 } else { 389 390 // First run, don't invalidate anything 390 391 let _: () = redis::cmd("SET") ··· 400 401 }; 401 402 402 403 match query { 403 - Ok(rows) => { 404 - for row in rows { 405 - let timeline_id: String = row.get(0); 406 - let _ = timeline::invalidate_timeline_cache(redis_conn, &timeline_id).await; 404 + Ok(timelines) => { 405 + for timeline in timelines { 406 + 407 + let _ = timeline::invalidate_timeline_cache(redis_conn, timeline.tlid).await; 407 408 } 408 409 409 410 // Update last check timestamp ··· 436 437 437 438 mod operations { 438 439 use super::*; 439 - use anyhow::Result; 440 440 /// List all users and their emails from the database, used for populating bloom filters on 441 441 ///startup 442 442 /// ··· 444 444 /// ```rust 445 445 /// Vec<(String, String)> // (email, username) 446 446 /// ``` 447 - pub async fn list_users_and_emails(pool: &PgPool) -> Result<Vec<(String, String)>> { 447 + pub async fn 
list_users_and_emails(pool: &PgPool) -> Result<Vec<(String, String)>, sqlx::Error> { 448 448 let recs = sqlx::query!( 449 449 r#" 450 450 SELECT email, username ··· 457 457 for rec in recs { 458 458 res.push((rec.email, rec.username)); 459 459 } 460 - return Ok(res); 460 + Ok(res) 461 461 } 462 462 }
+4 -10
server/src/errors.rs
··· 24 24 pub(crate) enum LuminaError { 25 25 ConfInvalid(crate::EnvVar), 26 26 DbError(LuminaDbError), 27 - Bb8RunErrorPg(bb8::RunError<crate::postgres::Error>), 28 27 Bb8RunErrorRedis(Box<bb8::RunError<redis::RedisError>>), 29 28 Unknown, 30 29 RocketFaillure(Box<rocket::Error>), ··· 50 49 #[derive(Debug)] 51 50 pub(crate) enum LuminaDbError { 52 51 Redis(Box<redis::RedisError>), 53 - Postgres(crate::postgres::Error), 52 + Postgres(sqlx::Error), 54 53 } 55 54 56 55 impl From<rocket::Error> for LuminaError { ··· 65 64 } 66 65 } 67 66 68 - impl From<crate::postgres::Error> for LuminaError { 69 - fn from(err: crate::postgres::Error) -> Self { 67 + impl From<sqlx::Error> for LuminaError { 68 + fn from(err: sqlx::Error) -> Self { 70 69 LuminaError::DbError(LuminaDbError::Postgres(err)) 71 70 } 72 71 } ··· 76 75 LuminaError::DbError(LuminaDbError::Redis(Box::new(err))) 77 76 } 78 77 } 79 - impl From<bb8::RunError<crate::postgres::Error>> for LuminaError { 80 - fn from(err: bb8::RunError<crate::postgres::Error>) -> Self { 81 - LuminaError::Bb8RunErrorPg(err) 82 - } 83 - } 78 + 84 79 impl From<bb8::RunError<redis::RedisError>> for LuminaError { 85 80 fn from(err: bb8::RunError<redis::RedisError>) -> Self { 86 81 LuminaError::Bb8RunErrorRedis(Box::new(err)) ··· 106 101 LuminaDbError::Redis(re) => format!("Redis error: {}", re), 107 102 LuminaDbError::Postgres(pe) => format!("Postgres error: {}", pe), 108 103 }, 109 - LuminaError::Bb8RunErrorPg(e) => format!("Postgres connection pool error: {}", e), 110 104 LuminaError::Bb8RunErrorRedis(e) => format!("Redis connection pool error: {}", e), 111 105 LuminaError::RocketFaillure(e) => format!("Rocket error: {}", e), 112 106 LuminaError::BcryptError => "Bcrypt error".to_string(),
+3 -2
server/src/timeline.rs
··· 138 138 /// Invalidate all cache entries for a timeline 139 139 pub async fn invalidate_timeline_cache( 140 140 redis_conn: &mut bb8::PooledConnection<'_, bb8_redis::RedisConnectionManager>, 141 - timeline_id: &str, 141 + timeline_id: Uuid, 142 142 ) -> Result<(), LuminaError> { 143 + let timeline_id_string = Uuid::to_string(&timeline_id); 143 144 // Use SCAN to find all cache keys for this timeline 144 - let pattern = format!("timeline_cache:{}:*", timeline_id); 145 + let pattern = format!("timeline_cache:{}:*", timeline_id_string); 145 146 146 147 let mut cursor = 0; 147 148 loop {