Game sync and live services for independent game developers (targeting itch.io)
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Phase 5: Atomic quota reservation + admin CLI quota commands

- Add atomic reserve_quota() function with SELECT ... FOR UPDATE
- Replaces separate check_quota + increment_quota with single atomic operation
- Eliminates race condition between quota check and increment
- Add admin CLI commands: quota list, quota set, quota reset, quota show
- Track quota via gamer_quotas table
- Return HTTP 413 when quota exceeded on upload

+268 -16
+7
admin/src/db.rs
··· 86 86 CREATE INDEX IF NOT EXISTS idx_saves_gamer ON saves(gamer_id); 87 87 CREATE INDEX IF NOT EXISTS idx_saves_game ON saves(game_id); 88 88 CREATE INDEX IF NOT EXISTS idx_save_versions_save ON save_versions(save_id); 89 + 90 + CREATE TABLE IF NOT EXISTS gamer_quotas ( 91 + gamer_id TEXT PRIMARY KEY, 92 + used_bytes INTEGER NOT NULL DEFAULT 0, 93 + limit_bytes INTEGER NOT NULL DEFAULT 2147483648, 94 + warning_sent INTEGER NOT NULL DEFAULT 0 95 + ); 89 96 "#, 90 97 )?; 91 98
+37
admin/src/main.rs
··· 4 4 5 5 mod db; 6 6 mod invite; 7 + mod quota; 7 8 mod stats; 8 9 9 10 #[derive(Parser)] ··· 28 29 Stats, 29 30 /// Run database migrations 30 31 Migrate, 32 + /// Manage gamer quotas 33 + Quota { 34 + #[command(subcommand)] 35 + subcommand: QuotaCommands, 36 + }, 31 37 } 32 38 33 39 #[derive(Subcommand)] ··· 50 56 }, 51 57 } 52 58 59 + #[derive(Subcommand)] 60 + enum QuotaCommands { 61 + /// List all gamer quotas 62 + List, 63 + /// Set custom quota limit for a gamer 64 + Set { 65 + /// Gamer ID 66 + gamer_id: String, 67 + /// New limit in bytes 68 + limit_bytes: u64, 69 + }, 70 + /// Reset a gamer's used bytes to 0 and warning_sent to FALSE 71 + Reset { 72 + /// Gamer ID 73 + gamer_id: String, 74 + }, 75 + /// Show quota details for a specific gamer 76 + Show { 77 + /// Gamer ID 78 + gamer_id: String, 79 + }, 80 + } 81 + 53 82 #[tokio::main] 54 83 async fn main() -> Result<()> { 55 84 tracing_subscriber::fmt::init(); ··· 65 94 }, 66 95 Commands::Stats => stats::show(&cli.database).await?, 67 96 Commands::Migrate => db::migrate(&cli.database).await?, 97 + Commands::Quota { subcommand } => match subcommand { 98 + QuotaCommands::List => quota::list(&cli.database).await?, 99 + QuotaCommands::Set { gamer_id, limit_bytes } => { 100 + quota::set(&cli.database, &gamer_id, limit_bytes as i64).await? 101 + } 102 + QuotaCommands::Reset { gamer_id } => quota::reset(&cli.database, &gamer_id).await?, 103 + QuotaCommands::Show { gamer_id } => quota::show(&cli.database, &gamer_id).await?, 104 + }, 68 105 } 69 106 70 107 Ok(())
+112
admin/src/quota.rs
··· 1 + use anyhow::{Context, Result}; 2 + use rusqlite::Connection; 3 + use std::path::Path; 4 + 5 + const DEFAULT_QUOTA_LIMIT: i64 = 2147483648; // 2GB 6 + 7 + pub async fn list(database: &Path) -> Result<()> { 8 + let conn = Connection::open(database).context("Failed to open database")?; 9 + 10 + let mut stmt = conn.prepare( 11 + "SELECT gamer_id, used_bytes, limit_bytes, warning_sent, 12 + ROUND(used_bytes * 100.0 / limit_bytes, 1) as usage_pct 13 + FROM gamer_quotas 14 + ORDER BY used_bytes DESC 15 + LIMIT 100" 16 + )?; 17 + 18 + println!("{:<40} {:>15} {:>15} {:>10} {:>10}", 19 + "GAMER ID", "USED", "LIMIT", "WARNING", "USAGE%"); 20 + println!("{}", "-".repeat(95)); 21 + 22 + let rows = stmt.query_map([], |row| { 23 + Ok(( 24 + row.get::<_, String>(0)?, 25 + row.get::<_, i64>(1)?, 26 + row.get::<_, i64>(2)?, 27 + row.get::<_, i64>(3)?, 28 + row.get::<_, f64>(4)?, 29 + )) 30 + })?; 31 + 32 + for row in rows { 33 + let (gamer_id, used, limit, warning, usage) = row?; 34 + println!("{:<40} {:>15} {:>15} {:>10} {:>9.1}%", 35 + gamer_id, used, limit, if warning != 0 { "YES" } else { "NO" }, usage); 36 + } 37 + 38 + Ok(()) 39 + } 40 + 41 + pub async fn set(database: &Path, gamer_id: &str, limit_bytes: i64) -> Result<()> { 42 + let conn = Connection::open(database).context("Failed to open database")?; 43 + 44 + conn.execute( 45 + "INSERT INTO gamer_quotas (gamer_id, used_bytes, limit_bytes, warning_sent) 46 + VALUES (?, 0, ?, 0) 47 + ON CONFLICT(gamer_id) DO UPDATE SET limit_bytes = ?", 48 + [gamer_id, &limit_bytes.to_string(), &limit_bytes.to_string()], 49 + )?; 50 + 51 + println!("Set quota limit for {} to {} bytes ({:.2} GB)", 52 + gamer_id, limit_bytes, limit_bytes as f64 / 1_073_741_824.0); 53 + Ok(()) 54 + } 55 + 56 + pub async fn reset(database: &Path, gamer_id: &str) -> Result<()> { 57 + let conn = Connection::open(database).context("Failed to open database")?; 58 + 59 + let rows_affected = conn.execute( 60 + "UPDATE gamer_quotas SET used_bytes = 0, warning_sent = 0 WHERE gamer_id = ?", 61 + [gamer_id], 62 + )?; 63 + 64 + if rows_affected == 0 { 65 + println!("No quota record found for {}", gamer_id); 66 + } else { 67 + println!("Reset quota for {} (used_bytes = 0, warning_sent = FALSE)", gamer_id); 68 + } 69 + Ok(()) 70 + } 71 + 72 + pub async fn show(database: &Path, gamer_id: &str) -> Result<()> { 73 + let conn = Connection::open(database).context("Failed to open database")?; 74 + 75 + let result = conn.query_row( 76 + "SELECT gamer_id, used_bytes, limit_bytes, warning_sent 77 + FROM gamer_quotas WHERE gamer_id = ?", 78 + [gamer_id], 79 + |row| { 80 + Ok(( 81 + row.get::<_, String>(0)?, 82 + row.get::<_, i64>(1)?, 83 + row.get::<_, i64>(2)?, 84 + row.get::<_, i64>(3)?, 85 + )) 86 + }, 87 + ); 88 + 89 + match result { 90 + Ok((gamer_id, used, limit, warning)) => { 91 + let usage_pct = (used as f64 / limit as f64) * 100.0; 92 + let remaining = limit - used; 93 + 94 + println!("Quota for: {}", gamer_id); 95 + println!(" Used: {} bytes ({:.2} GB)", used, used as f64 / 1_073_741_824.0); 96 + println!(" Limit: {} bytes ({:.2} GB)", limit, limit as f64 / 1_073_741_824.0); 97 + println!(" Remaining: {} bytes ({:.2} GB)", remaining, remaining as f64 / 1_073_741_824.0); 98 + println!(" Usage: {:.1}%", usage_pct); 99 + println!(" Warning: {}", if warning != 0 { "SENT" } else { "NOT SENT" }); 100 + } 101 + Err(_) => { 102 + println!("No quota record found for {}. Using defaults:", gamer_id); 103 + println!(" Used: 0 bytes (0.00 GB)"); 104 + println!(" Limit: {} bytes ({:.2} GB)", DEFAULT_QUOTA_LIMIT, DEFAULT_QUOTA_LIMIT as f64 / 1_073_741_824.0); 105 + println!(" Remaining: {} bytes ({:.2} GB)", DEFAULT_QUOTA_LIMIT, DEFAULT_QUOTA_LIMIT as f64 / 1_073_741_824.0); 106 + println!(" Usage: 0.0%"); 107 + println!(" Warning: NOT SENT"); 108 + } 109 + } 110 + 111 + Ok(()) 112 + }
+29 -15
api/src/api/saves.rs
··· 9 9 10 10 use crate::auth::AuthService; 11 11 use crate::db::DbPool; 12 - use crate::db::{check_quota, get_quota, increment_quota}; 12 + use crate::db::{get_quota, reserve_quota}; 13 13 use crate::storage::StorageProvider; 14 14 15 15 /// Request to upload a save ··· 184 184 } 185 185 }; 186 186 187 - // Check quota before uploading 188 - match check_quota(&self.pool, &req.gamer_id, save_data.len() as i64).await { 187 + let new_cid = Self::generate_cid(&save_data); 188 + let mut size_bytes = save_data.len(); 189 + let mut is_delta = false; 190 + 191 + // Reserve quota atomically BEFORE upload (handles delta compression size) 192 + // This prevents race conditions where two concurrent uploads could both pass 193 + // check_quota but then both increment, exceeding the quota. 194 + match reserve_quota(&self.pool, &req.gamer_id, size_bytes as i64).await { 189 195 Err(err) => { 190 196 return UploadSaveResponse::PayloadTooLarge(Json(QuotaError { 191 197 message: "Storage quota exceeded".to_string(), ··· 195 201 } 196 202 Ok(_) => {} 197 203 } 198 - 199 - let new_cid = Self::generate_cid(&save_data); 200 - let mut size_bytes = save_data.len(); 201 - let mut is_delta = false; 202 204 203 205 // Check if base_cid is provided for delta compression 204 206 if let Some(base_cid) = &req.base_cid { ··· 291 293 match result { 292 294 Ok(_) => { 293 295 let version_id = Uuid::new_v4().to_string(); 294 - sqlx::query( 296 + if let Err(e) = sqlx::query( 295 297 r#" 296 298 INSERT INTO save_versions (id, save_id, version_number, cid, milestone, size_bytes) 297 299 VALUES (?, ?, ?, ?, ?, ?) ··· 305 307 .bind(size_bytes as i64) 306 308 .execute(&self.pool) 307 309 .await 308 - .ok(); 309 - 310 - // Increment quota after successful upload 311 - let _ = increment_quota(&self.pool, &req.gamer_id, size_bytes as i64).await; 310 + { 311 + tracing::error!( 312 + gamer_id = %req.gamer_id, 313 + size_bytes = size_bytes, 314 + "Failed to record save version after upload succeeded: {}", 315 + e 316 + ); 317 + } 312 318 313 319 UploadSaveResponse::Created(Json(SaveUploadResponse { 314 320 id: save_id, ··· 318 324 is_delta, 319 325 })) 320 326 } 321 - Err(e) => UploadSaveResponse::BadRequest(Json(Error { 322 - message: format!("Failed to save: {}", e), 323 - })), 327 + Err(e) => { 328 + tracing::error!( 329 + gamer_id = %req.gamer_id, 330 + size_bytes = size_bytes, 331 + "Failed to save after quota reserved: {}", 332 + e 333 + ); 334 + UploadSaveResponse::BadRequest(Json(Error { 335 + message: format!("Failed to save: {}", e), 336 + })) 337 + } 324 338 } 325 339 } 326 340
+82
api/src/db/mod.rs
··· 4 4 5 5 pub type DbPool = Pool<Sqlite>; 6 6 7 + pub const DEFAULT_QUOTA_LIMIT: i64 = 2147483648; // 2GB 8 + pub const QUOTA_WARNING_THRESHOLD: f64 = 0.9; // 90% 9 + 7 10 /// Represents a gamer's storage quota 8 11 #[derive(Debug, Clone)] 9 12 pub struct GamerQuota { ··· 200 203 201 204 Ok(()) 202 205 } 206 + 207 + /// Atomically reserve quota for a new upload - checks AND increments in a single transaction. 208 + /// This prevents race conditions where two concurrent uploads could both pass check_quota 209 + /// but then both increment, exceeding the quota. 210 + /// Returns Err(QuotaExceeded) if no room, Ok(new_used_bytes) if successful. 211 + pub async fn reserve_quota(pool: &DbPool, gamer_id: &str, bytes: i64) -> Result<i64, QuotaExceeded> { 212 + let mut tx = pool.begin().await.expect("Failed to begin transaction"); 213 + 214 + let quota: GamerQuota = match sqlx::query( 215 + "SELECT gamer_id, used_bytes, limit_bytes, warning_sent FROM gamer_quotas WHERE gamer_id = ?", 216 + ) 217 + .bind(gamer_id) 218 + .fetch_optional(&mut *tx) 219 + .await 220 + { 221 + Ok(Some(row)) => GamerQuota { 222 + gamer_id: row.get("gamer_id"), 223 + used_bytes: row.get("used_bytes"), 224 + limit_bytes: row.get("limit_bytes"), 225 + warning_sent: row.get::<i64, _>("warning_sent") != 0, 226 + }, 227 + Ok(None) => GamerQuota { 228 + gamer_id: gamer_id.to_string(), 229 + used_bytes: 0, 230 + limit_bytes: DEFAULT_QUOTA_LIMIT, 231 + warning_sent: false, 232 + }, 233 + Err(e) => { 234 + tracing::error!("Database error fetching quota for {}: {}", gamer_id, e); 235 + return Err(QuotaExceeded { 236 + used_bytes: 0, 237 + limit_bytes: DEFAULT_QUOTA_LIMIT, 238 + requested_bytes: bytes, 239 + }); 240 + } 241 + }; 242 + 243 + if quota.used_bytes + bytes > quota.limit_bytes { 244 + tx.rollback().await.ok(); 245 + return Err(QuotaExceeded { 246 + used_bytes: quota.used_bytes, 247 + limit_bytes: quota.limit_bytes, 248 + requested_bytes: bytes, 249 + }); 250 + } 251 + 252 + let new_used = quota.used_bytes + bytes; 253 + let should_warn = !quota.warning_sent 254 + && (new_used as f64 / quota.limit_bytes as f64) >= QUOTA_WARNING_THRESHOLD; 255 + 256 + if quota.used_bytes == 0 { 257 + sqlx::query( 258 + "INSERT INTO gamer_quotas (gamer_id, used_bytes, limit_bytes, warning_sent) VALUES (?, ?, ?, ?) 259 + ON CONFLICT(gamer_id) DO UPDATE SET used_bytes = ?, warning_sent = ?", 260 + ) 261 + .bind(gamer_id) 262 + .bind(new_used) 263 + .bind(quota.limit_bytes) 264 + .bind(should_warn) 265 + .bind(new_used) 266 + .bind(should_warn) 267 + .execute(&mut *tx) 268 + .await 269 + .expect("Failed to insert quota record"); 270 + } else { 271 + sqlx::query( 272 + "UPDATE gamer_quotas SET used_bytes = ?, warning_sent = ? WHERE gamer_id = ?", 273 + ) 274 + .bind(new_used) 275 + .bind(should_warn) 276 + .bind(gamer_id) 277 + .execute(&mut *tx) 278 + .await 279 + .expect("Failed to update quota record"); 280 + } 281 + 282 + tx.commit().await.expect("Failed to commit quota transaction"); 283 + Ok(new_used) 284 + }
+1 -1
api/src/web/mod.rs
··· 3 3 use rust_embed::Embed; 4 4 5 5 #[derive(Embed)] 6 - #[folder = "../../landing/build"] 6 + #[folder = "../landing/build"] 7 7 struct Assets; 8 8 9 9 pub struct StaticAssets;