···2626use crate::LuminaError;
2727use crate::database::PgConn;
2828use cynthia_con::{CynthiaColors, CynthiaStyles};
2929-use time::OffsetDateTime;
2929+use time::{OffsetDateTime, PrimitiveDateTime};
30303131/// Levels of logging supported by the Logger.
3232#[derive(Debug)]
···8686 /// asynchronously inserts a log entry in the logs table.
8787 pub async fn log(&self, level: EventType, message: &str) {
8888 // Get the current timestamp.
8989- let now = OffsetDateTime::now_utc();
8989+ let now_odt = OffsetDateTime::now_utc();
9090+ let now_pdt = PrimitiveDateTime::new(now_odt.date(), now_odt.time());
90919192 // Determine the appropriate prefix for stdout.
9293 // These prefixes are colored and styled matching helpers::prefixes().
···166167 .chars()
167168 .filter(|c| !c.is_control() || c.is_whitespace())
168169 .collect();
169169- let ts = now
170170- .format(&time::format_description::well_known::Rfc3339)
171171- .unwrap();
172172-173173- if let Ok(pg_conn) = db_conn.postgres_pool.get().await {
174174- let _ = pg_conn
175175- .execute(
176176- "INSERT INTO logs (type, message, timestamp) VALUES ($1, $2, $3)",
177177- &[&level_str, &message_db, &ts],
170170+ match sqlx::query!(
171171+ "INSERT INTO logs (type, message, timestamp) VALUES ($1, $2, $3)",
172172+ &level_str,
173173+ &message_db,
174174+ &now_pdt,
175175+ )
176176+ .execute(&db_conn.postgres_pool)
177177+ .await
178178+ {
179179+ Ok(_) => (),
180180+ Err(p) => {
181181+ panic!(
182182+ "{0}\n\n\n{1}\n\n\n\n{0}",
183183+ "Could not write logs to database! Crashing.", p
178184 )
179179- .await;
185185+ }
180186 }
181187 }
182188 EventLogger::OnlyStdout => {
+10-19
server/src/main.rs
···156156 error_elog!(ev_log, "While connecting to postgres database: {}", a);
157157 None
158158 }
159159- Err(LuminaError::Bb8RunErrorPg(a)) => {
160160- error_elog!(ev_log, "While setting up database pool: {}", a);
161161- None
162162- }
163159 Err(LuminaError::DbError(crate::errors::LuminaDbError::Redis(a))) => {
164160 error_elog!(ev_log, "While connecting to Redis: {}", a);
165161 None
···203199 if cfg!(debug_assertions) {
204200 let redis_pool = db.get_redis_pool();
205201 let mut redis_conn = redis_pool.get().await.unwrap();
206206- timeline::invalidate_timeline_cache(
207207- &mut redis_conn,
208208- "00000000-0000-0000-0000-000000000000",
209209- )
210210- .await
211211- .unwrap();
202202+ timeline::invalidate_timeline_cache(&mut redis_conn, Uuid::nil())
203203+ .await
204204+ .unwrap();
212205 let global = timeline::fetch_timeline_post_ids(
213206 ev_log.clone(),
214207 &db,
215215- "00000000-0000-0000-0000-000000000000",
208208+ &Uuid::nil(),
216209 None,
217210 )
218211 .await
···227220228221 match db.recreate().await.into() {
229222 DbConn::PgsqlConnection(pg_pool, _) => {
230230- let client = pg_pool.get().await.unwrap();
231223 // Insert Hello World post and timeline entry if not exists
232224 let user_1_: Result<user::User, LuminaError> =
233225 match user::User::create_user(
···277269 println!(
278270 "Created two users with password 'MyTestPassw9292!' and usernames 'testuser1' and 'testuser2'."
279271 );
280280- let _ = client
281281- .execute(
282282- "INSERT INTO post_text (id, author_id, content, created_at) VALUES ($1, $2, $3, CURRENT_TIMESTAMP) ON CONFLICT (id) DO NOTHING",
283283- &[&generated_uuid, &user_1.id, &hello_content],
272272+ sqlx::query!("INSERT INTO post_text (id, author_id, content, created_at) VALUES ($1, $2, $3, CURRENT_TIMESTAMP) ON CONFLICT (id) DO NOTHING",
273273+ &generated_uuid, &user_1.id, &hello_content
284274 )
285285- .await;
275275+ .execute(&pg_pool)
276276+ .await.unwrap_or_default();
286277 let add_clone = ev_log.clone();
287278 timeline::add_to_timeline(
288279 add_clone,
289280 &db,
290290- "00000000-0000-0000-0000-000000000000",
291291- generated_uuid.to_string().as_str(),
281281+ &Uuid::nil(),
282282+ &generated_uuid,
292283 )
293284 .await
294285 .unwrap_or(());
+4-5
server/src/tests.rs
···1616 * along with this program. If not, see <https://www.gnu.org/licenses/>.
1717 */
18181919+use uuid::Uuid;
2020+1921use crate::database::{self, DatabaseConnections};
2022use crate::errors::LuminaError;
2123use crate::timeline;
···2628 let result = database::setup()
2729 .await
2830 .expect("Database setup should succeed.");
2929- assert!(
3030- result.get_postgres_pool().get().await.is_ok(),
3131- "Should get Postgres connection"
3232- );
3331 assert!(
3432 result.get_redis_pool().get().await.is_ok(),
3533 "Should get Redis connection"
···7573 let db = database::setup().await.expect("DB setup");
7674 let redis_pool = db.get_redis_pool();
7775 let mut conn = redis_pool.get().await.expect("Redis conn");
7878- let timeline_id = "test-timeline-invalidation";
7676+ // Global timeline
7777+ let timeline_id = Uuid::nil();
79788079 // Set a test cache key
8180 let cache_key = format!("timeline_cache:{}:page:0", timeline_id);
+44-48
server/src/timeline.rs
···175175async fn fetch_timeline_total_count(db: &DbConn, timeline_id: &str) -> Result<usize, LuminaError> {
176176 match db {
177177 DbConn::PgsqlConnection(pg_pool, _redis_pool) => {
178178- let client = pg_pool.get().await?;
179178 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?;
180180- let row = client
181181- .query_one(
182182- "SELECT COUNT(*) FROM timelines WHERE tlid = $1",
183183- &[&timeline_uuid],
184184- )
185185- .await?;
179179+ let row = sqlx::query!(
180180+ "SELECT COUNT(*) AS count FROM timelines WHERE tlid = $1",
181181+ &timeline_uuid
182182+ )
183183+ .fetch_one(pg_pool)
184184+ .await?;
186185187187- let count: i64 = row.get(0);
186186+ let count: i64 = row.count.unwrap_or(0);
188187 Ok(count as usize)
189188 }
190189 }
···199198) -> Result<Vec<String>, LuminaError> {
200199 match db {
201200 DbConn::PgsqlConnection(pg_pool, _redis_pool) => {
202202- let client = pg_pool.get().await?;
203201 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?;
204204- let rows = client
205205- .query(
202202+ let rows =
203203+ sqlx::query!(
206204 "SELECT item_id FROM timelines WHERE tlid = $1 ORDER BY timestamp DESC LIMIT $2 OFFSET $3",
207207- &[&timeline_uuid, &(limit as i64), &(offset as i64)],
205205+ &timeline_uuid, &(limit as i64), &(offset as i64),
208206 )
207207+ .fetch_all(pg_pool)
209208 .await
210209 ?;
211210212211 let post_ids = rows
213212 .into_iter()
214214- .map(|row| row.get::<_, Uuid>(0).to_string())
213213+ .map(|row| row.item_id.to_string())
215214 .collect();
216215 Ok(post_ids)
217216 }
···223222pub async fn fetch_timeline_post_ids(
224223 event_logger: EventLogger,
225224 db: &DbConn,
226226- timeline_id: &str,
225225+ timeline: &Uuid,
227226 page: Option<usize>,
228227) -> Result<(Vec<String>, usize, bool), LuminaError> {
228228+ let timeline_id = timeline.to_string();
229229 let page = page.unwrap_or(0);
230230 let offset = page * TIMELINE_PAGE_SIZE;
231231···241241 .await?;
242242243243 // Check if this timeline should be cached
244244- let should_cache = is_high_traffic_timeline(&mut redis_conn, timeline_id).await?;
244244+ let should_cache = is_high_traffic_timeline(&mut redis_conn, &timeline_id).await?;
245245246246 // Try to get from cache if it's a high-traffic timeline
247247 if should_cache
248248 && let Some(cached_page) =
249249- get_cached_timeline_page(&mut redis_conn, timeline_id, page).await?
249249+ get_cached_timeline_page(&mut redis_conn, &timeline_id, page).await?
250250 {
251251 let has_more = (page + 1) * TIMELINE_PAGE_SIZE < cached_page.total_count;
252252 return Ok((cached_page.post_ids, cached_page.total_count, has_more));
···255255 // Cache miss or low-traffic timeline - fetch from database
256256 if timeline_id == GLOBAL_TIMELINE_ID || should_cache {
257257 // Get total count
258258- let total_count = fetch_timeline_total_count(db, timeline_id).await?;
258258+ let total_count = fetch_timeline_total_count(db, &timeline_id).await?;
259259260260 // Get page data
261261- let post_ids = fetch_timeline_from_db(db, timeline_id, offset, TIMELINE_PAGE_SIZE).await?;
261261+ let post_ids = fetch_timeline_from_db(db, &timeline_id, offset, TIMELINE_PAGE_SIZE).await?;
262262263263 // Cache the result if it's high-traffic
264264 if should_cache {
265265- match cache_timeline_page(&mut redis_conn, timeline_id, page, &post_ids, total_count)
265265+ match cache_timeline_page(&mut redis_conn, &timeline_id, page, &post_ids, total_count)
266266 .await
267267 {
268268 Ok(_) => info_elog!(
···324324 );
325325 // For now, only global timeline is supported.
326326 if timeline_name == "global" {
327327- let timeline_uuid =
327327+ let global_timeline_uuid =
328328 Uuid::parse_str(GLOBAL_TIMELINE_ID).map_err(|_| LuminaError::UUidError)?;
329329 let (post_ids, total_count, has_more) =
330330- fetch_timeline_post_ids(event_logger, db, GLOBAL_TIMELINE_ID, page).await?;
331331- Ok((timeline_uuid, post_ids, total_count, has_more))
330330+ fetch_timeline_post_ids(event_logger, db, &global_timeline_uuid, page).await?;
331331+ Ok((global_timeline_uuid, post_ids, total_count, has_more))
332332 } else {
333333 // Handle other timelines in the future
334334 error_elog!(
···344344pub async fn add_to_timeline(
345345 event_logger: EventLogger,
346346 db: &DbConn,
347347- timeline_id: &str,
348348- item_id: &str,
347347+ timeline: &Uuid,
348348+ item: &Uuid,
349349) -> Result<(), LuminaError> {
350350 // Add to database
351351 match db {
352352 DbConn::PgsqlConnection(pg_pool, redis_pool) => {
353353- let client = pg_pool.get().await?;
354354- let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?;
355355- let item_uuid = Uuid::parse_str(item_id).map_err(|_| LuminaError::UUidError)?;
356356- client
357357- .execute(
358358- "INSERT INTO timelines (tlid, item_id, timestamp) VALUES ($1, $2, NOW())",
359359- &[&timeline_uuid, &item_uuid],
360360- )
361361- .await?;
353353+ sqlx::query!(
354354+ "INSERT INTO timelines (tlid, item_id, timestamp) VALUES ($1, $2, NOW())",
355355+ *timeline,
356356+ item,
357357+ )
358358+ .execute(pg_pool)
359359+ .await?;
362360363361 // Invalidate cache
364362 let mut redis_conn = redis_pool.get().await?;
365365- if let Err(e) = invalidate_timeline_cache(&mut redis_conn, timeline_id).await {
363363+ if let Err(e) = invalidate_timeline_cache(&mut redis_conn, *timeline).await {
366364 error_elog!(
367365 event_logger,
368366 "Failed to invalidate cache for timeline {}: {:?}",
369369- timeline_id,
367367+ timeline.to_string(),
370368 e
371369 );
372370 }
···381379pub async fn remove_from_timeline(
382380 event_logger: EventLogger,
383381 db: &DbConn,
384384- timeline_id: &str,
385385- item_id: &str,
382382+ timeline: &Uuid,
383383+ item: &Uuid,
386384) -> Result<(), LuminaError> {
387385 // Remove from database
388386 match db {
389387 DbConn::PgsqlConnection(pg_pool, redis_pool) => {
390390- let client = pg_pool.get().await?;
391391- let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?;
392392- let item_uuid = Uuid::parse_str(item_id).map_err(|_| LuminaError::UUidError)?;
393393- client
394394- .execute(
395395- "DELETE FROM timelines WHERE tlid = $1 AND item_id = $2",
396396- &[&timeline_uuid, &item_uuid],
397397- )
398398- .await?;
388388+ sqlx::query!(
389389+ "DELETE FROM timelines WHERE tlid = $1 AND item_id = $2",
390390+ &timeline,
391391+ &item
392392+ )
393393+ .execute(pg_pool)
394394+ .await?;
399395400396 // Invalidate cache
401397 let mut redis_conn = redis_pool.get().await?;
402402- if let Err(e) = invalidate_timeline_cache(&mut redis_conn, timeline_id).await {
398398+ if let Err(e) = invalidate_timeline_cache(&mut redis_conn, *timeline).await {
403399 error_elog!(
404400 event_logger,
405401 "Failed to invalidate cache for timeline {}: {:?}",
406406- timeline_id,
402402+ timeline.to_string(),
407403 e
408404 );
409405 }
+45-34
server/src/user.rs
···2222use crate::{LuminaError, database::DbConn, helpers::events::EventLogger, info_elog};
2323use cynthia_con::CynthiaColors;
2424use uuid::Uuid;
2525-use crate::database::DatabaseConnections;
26252726#[derive(Debug, Clone)]
2827pub struct User {
···6362 async fn get_hashed_password(self, database: &DbConn) -> Result<String, LuminaError> {
6463 match database {
6564 DbConn::PgsqlConnection(pg_pool, _) => {
6666-6765 let row = sqlx::query!("SELECT password FROM users WHERE id = $1", &self.id)
6866 .fetch_one(pg_pool)
6967 .await?;
···9694 let username_exists =
9795 sqlx::query!("SELECT * FROM users WHERE username = $1", &username)
9896 .fetch_optional(pg_pool)
9999- .await?;
9797+ .await?;
10098 if !username_exists.is_none() {
10199 return Err(LuminaError::RegisterUsernameInUse);
102100 }
···118116 identifier: String,
119117 db: &DbConn,
120118 ) -> Result<User, LuminaError> {
121121- let identifyer_type = if identifier.contains('@') {
122122- "email"
119119+ let DbConn::PgsqlConnection(pg_pool, _) = db;
120120+ // todo: Find a way to not repeat here, without it 'never matching' (which is what happens if you
121121+ // parameterise the left side of the WHERE...)
122122+ if identifier.contains('@') {
123123+ match sqlx::query!("SELECT id, email, username, coalesce(foreign_instance_id, '') as foreign_instance_id FROM users WHERE email = $1", &identifier).fetch_optional(pg_pool).await?
124124+ {
125125+ None => Err(LuminaError::AuthenticationNoSuchUser),
126126+ Some(user) => Ok(
127127+ User {
128128+ id: user.id,
129129+ email: user.email,
130130+ username: user.username,
131131+ foreign_instance_id: user.foreign_instance_id.unwrap_or("".to_string()),
132132+ }
133133+ ),
134134+ }
123135 } else {
124124- "username"
125125- };
126126- match db {
127127- DbConn::PgsqlConnection(pg_pool, _) => {
128128- let user = sqlx::query!("SELECT id, email, username, coalesce(foreign_instance_id, '') as foreign_instance_id FROM users WHERE $1 = $2", identifyer_type, &identifier)
129129-130130- .fetch_one(pg_pool)
131131- .await
132132- ?;
133133- Ok(User {
134134- id: user.id,
135135- email: user.email,
136136- username: user.username,
137137- foreign_instance_id: user.foreign_instance_id.unwrap_or("".to_string()),
138138- })
139139- }
136136+ match sqlx::query!("SELECT id, email, username, coalesce(foreign_instance_id, '') as foreign_instance_id FROM users WHERE username = $1", &identifier).fetch_optional(pg_pool).await?
137137+ {
138138+ None => Err(LuminaError::AuthenticationNoSuchUser),
139139+ Some(user) => Ok(
140140+ User {
141141+ id: user.id,
142142+ email: user.email,
143143+ username: user.username,
144144+ foreign_instance_id: user.foreign_instance_id.unwrap_or("".to_string()),
145145+ }
146146+ )
147147+ }
140148 }
141149 }
142150···151159 DbConn::PgsqlConnection(pg_pool, _) => {
152160 let session_key = Uuid::new_v4().to_string();
153161 let id = sqlx::query!(
154154- "INSERT INTO sessions (user_id, session_key) VALUES ($1, $2) RETURNING id",
155155- &user_id, &session_key,
156156- ).fetch_one(pg_pool)
157157- .await?;
162162+ "INSERT INTO sessions (user_id, session_key) VALUES ($1, $2) RETURNING id",
163163+ &user_id,
164164+ &session_key,
165165+ )
166166+ .fetch_one(pg_pool)
167167+ .await?;
158168 info_elog!(
159169 ev_log,
160170 "New session created by {}",
···229239 .unwrap_or(false);
230240 if username_exists {
231241 // Fallback to DB check if in bloom filter
232232- let username_db = sqlx::query!("SELECT * FROM users WHERE username = $1", &username)
233233- .fetch_optional(pg_pool)
234234- .await?;
242242+ let username_db =
243243+ sqlx::query!("SELECT * FROM users WHERE username = $1", &username)
244244+ .fetch_optional(pg_pool)
245245+ .await?;
235246 if !username_db.is_none() {
236247 return Err(LuminaError::RegisterUsernameInUse);
237248 }
238249 }
239250 // Fallback to DB check if not in bloom filter
240240- let email_db =
241241- sqlx::query!("SELECT * FROM users WHERE email = $1", &email)
242242- .fetch_optional(pg_pool)
251251+ let email_db = sqlx::query!("SELECT * FROM users WHERE email = $1", &email)
252252+ .fetch_optional(pg_pool)
243253 .await?;
244254 if !email_db.is_none() {
245255 // Update bloom filter after DB check
···251261 .unwrap_or(());
252262 return Err(LuminaError::RegisterEmailInUse);
253263 }
254254- let username_db = sqlx::query!("SELECT * FROM users WHERE username = $1", &username)
255255- .fetch_optional(pg_pool)
256256- .await?;
264264+ let username_db =
265265+ sqlx::query!("SELECT * FROM users WHERE username = $1", &username)
266266+ .fetch_optional(pg_pool)
267267+ .await?;
257268 if !username_db.is_none() {
258269 let _: () = redis::cmd("BF.ADD")
259270 .arg(&username_key)