Game sync and live services for independent game developers (targeting itch.io)
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Phase 3: Delta compression with qbsdiff + quota tracking

- Delta compression: upload stores delta if smaller than full data
- Delta reconstruction: download applies patches sequentially from base
- Quota tracking: gamer_quotas table with check/increment functions
- HTTP 413 Payload Too Large when quota exceeded
- Fixed tuple destructuring to use .clone() instead of ref
- Removed unused imports (NewType, sqlx::Row, std::io::Cursor)

+401 -63
+308 -62
api/src/api/saves.rs
··· 1 1 use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; 2 2 use poem_openapi::param::Path; 3 3 use poem_openapi::payload::Json; 4 - use poem_openapi::{ApiResponse, NewType, Object, OpenApi}; 4 + use poem_openapi::{ApiResponse, Object, OpenApi}; 5 + use qbsdiff::{Bsdiff, Bspatch}; 5 6 use sha2::Digest; 6 - use sqlx::Row; 7 7 use uuid::Uuid; 8 8 9 9 use crate::db::DbPool; 10 + use crate::db::{check_quota, get_quota, increment_quota}; 10 11 use crate::storage::StorageProvider; 11 12 12 13 /// Request to upload a save ··· 48 49 pub message: String, 49 50 } 50 51 52 + /// Quota status response 53 + #[derive(Object)] 54 + pub struct QuotaResponse { 55 + pub used: u64, 56 + pub limit: u64, 57 + pub remaining: u64, 58 + } 59 + 60 + /// Quota exceeded error 61 + #[derive(Object)] 62 + pub struct QuotaError { 63 + pub message: String, 64 + pub used_bytes: u64, 65 + pub limit_bytes: u64, 66 + } 67 + 51 68 /// Upload save response 52 69 #[derive(ApiResponse)] 53 70 pub enum UploadSaveResponse { ··· 57 74 BadRequest(Json<Error>), 58 75 #[oai(status = 404)] 59 76 NotFound(Json<Error>), 77 + #[oai(status = 413)] 78 + PayloadTooLarge(Json<QuotaError>), 60 79 } 61 80 62 81 /// Download save response ··· 68 87 NotFound(Json<Error>), 69 88 } 70 89 90 + /// Quota response 91 + #[derive(ApiResponse)] 92 + pub enum GetQuotaResponse { 93 + #[oai(status = 200)] 94 + Ok(Json<QuotaResponse>), 95 + } 96 + 71 97 /// Save management endpoint 72 98 pub struct SavesEndpoint { 73 99 pool: DbPool, ··· 116 142 } 117 143 }; 118 144 145 + // Check quota before uploading 146 + match check_quota(&self.pool, &req.gamer_id, save_data.len() as i64).await { 147 + Err(err) => { 148 + return UploadSaveResponse::PayloadTooLarge(Json(QuotaError { 149 + message: "Storage quota exceeded".to_string(), 150 + used_bytes: err.used_bytes as u64, 151 + limit_bytes: err.limit_bytes as u64, 152 + })); 153 + } 154 + Ok(_) => {} 155 + } 156 + 119 157 let new_cid = Self::generate_cid(&save_data); 120 - let size_bytes = save_data.len(); 158 + let mut size_bytes = save_data.len(); 159 + let mut is_delta = false; 160 + 161 + // Check if base_cid is provided for delta compression 162 + if let Some(base_cid) = &req.base_cid { 163 + // Fetch base save from storage 164 + let base_storage_path = format!("saves/{}/{}/{}", req.game_id, req.gamer_id, base_cid); 165 + if let Ok(base_data) = self.storage.download_blob(&base_storage_path).await { 166 + // Compute delta between base and new data 167 + let mut delta = Vec::new(); 168 + if let Ok(delta_size) = Bsdiff::new(&base_data, &save_data).compare(&mut delta) { 169 + // Only use delta if it's smaller than full data 170 + if delta_size < save_data.len() as u64 { 171 + // Store the delta instead of full data 172 + size_bytes = delta.len(); 173 + is_delta = true; 121 174 122 - // Upload blob to storage 123 - let storage_path = format!("saves/{}/{}/{}", req.game_id, req.gamer_id, new_cid); 124 - if let Err(e) = self.storage.upload_blob(&storage_path, &save_data).await { 125 - return UploadSaveResponse::BadRequest(Json(Error { 126 - message: format!("Failed to upload save data: {}", e), 127 - })); 175 + let storage_path = format!("saves/{}/{}/{}", req.game_id, req.gamer_id, new_cid); 176 + if let Err(e) = self.storage.upload_blob(&storage_path, &delta).await { 177 + return UploadSaveResponse::BadRequest(Json(Error { 178 + message: format!("Failed to upload save data: {}", e), 179 + })); 180 + } 181 + 182 + // Database insertion happens below with is_delta=true and size_bytes=delta.len() 183 + } 184 + } 185 + } 186 + } 187 + 188 + // Upload full data if no delta compression was applied 189 + if !is_delta { 190 + let storage_path = format!("saves/{}/{}/{}", req.game_id, req.gamer_id, new_cid); 191 + if let Err(e) = self.storage.upload_blob(&storage_path, &save_data).await { 192 + return UploadSaveResponse::BadRequest(Json(Error { 193 + message: format!("Failed to upload save data: {}", e), 194 + })); 195 + } 128 196 } 129 197 130 198 // Check if this is an update or new save ··· 197 265 .await 198 266 .ok(); 199 267 268 + // Increment quota after successful upload 269 + let _ = increment_quota(&self.pool, &req.gamer_id, size_bytes as i64).await; 270 + 200 271 UploadSaveResponse::Created(Json(SaveUploadResponse { 201 272 id: save_id, 202 273 cid: new_cid, 203 274 version: version_number, 204 275 size_bytes, 205 - is_delta: false, 276 + is_delta, 206 277 })) 207 278 } 208 279 Err(e) => UploadSaveResponse::BadRequest(Json(Error { ··· 238 309 239 310 match save_row { 240 311 Some((save_id, current_cid, created_at)) => { 241 - let version_row: Option<(String, i32, i64)> = sqlx::query_as( 242 - "SELECT cid, version_number, size_bytes FROM save_versions WHERE save_id = ? AND version_number = ?", 312 + // Get all versions from 1 to target to check for deltas and reconstruct 313 + let versions: Vec<(String, i32, i64, bool)> = sqlx::query_as( 314 + "SELECT cid, version_number, size_bytes, is_delta FROM save_versions WHERE save_id = ? AND version_number <= ? ORDER BY version_number ASC", 243 315 ) 244 316 .bind(&save_id) 245 317 .bind(target_version as i64) 246 - .fetch_optional(&self.pool) 318 + .fetch_all(&self.pool) 247 319 .await 248 320 .ok() 249 - .flatten(); 321 + .unwrap_or_default(); 322 + 323 + if versions.is_empty() { 324 + return DownloadSaveResponse::NotFound(Json(Error { 325 + message: "Version not found".to_string(), 326 + })); 327 + } 250 328 251 - match version_row { 252 - Some((cid, version_num, size_bytes)) => { 253 - let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, cid); 254 - let save_data = match self.storage.download_blob(&storage_path).await { 255 - Ok(data) => data, 256 - Err(e) => { 257 - return DownloadSaveResponse::NotFound(Json(Error { 258 - message: format!("Failed to fetch save data: {}", e), 259 - })); 329 + let target_version_data = versions.last().unwrap(); 330 + let (target_cid, version_num, size_bytes, is_delta) = (*target_version_data).clone(); 331 + 332 + // Check if target version is a delta that needs reconstruction 333 + if is_delta { 334 + // Find the base version (first non-delta version) 335 + let base_version = versions.iter().find(|v| !v.3); 336 + 337 + match base_version { 338 + Some((base_cid, base_version_num, _, _)) => { 339 + // Download base data 340 + let base_storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, base_cid); 341 + let mut full_data = match self.storage.download_blob(&base_storage_path).await { 342 + Ok(data) => data, 343 + Err(e) => { 344 + return DownloadSaveResponse::NotFound(Json(Error { 345 + message: format!("Failed to fetch base save data: {}", e), 346 + })); 347 + } 348 + }; 349 + 350 + // Apply deltas sequentially from base+1 to target 351 + let mut current_version_num = *base_version_num; 352 + for (cid, version_number, _, _) in versions.iter() { 353 + if *version_number <= current_version_num { 354 + continue; // Skip versions up to and including base 355 + } 356 + 357 + let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, cid); 358 + let delta_data = match self.storage.download_blob(&storage_path).await { 359 + Ok(data) => data, 360 + Err(e) => { 361 + return DownloadSaveResponse::NotFound(Json(Error { 362 + message: format!("Failed to fetch delta data for version {}: {}", version_number, e), 363 + })); 364 + } 365 + }; 366 + 367 + // Apply delta patch 368 + let mut reconstructed = Vec::new(); 369 + if let Err(e) = Bspatch::new(&delta_data) 370 + .and_then(|p| p.apply(&full_data, &mut reconstructed)) 371 + { 372 + return DownloadSaveResponse::NotFound(Json(Error { 373 + message: format!("Failed to reconstruct version {}: {}", version_number, e), 374 + })); 375 + } 376 + full_data = reconstructed; 377 + 378 + current_version_num = *version_number; 260 379 } 261 - }; 262 380 263 - DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 264 - id: save_id, 265 - cid, 266 - data: Self::encode_data(&save_data), 267 - version: version_num, 268 - size_bytes: size_bytes as usize, 269 - is_delta: false, 270 - created_at, 271 - })) 381 + DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 382 + id: save_id, 383 + cid: target_cid, 384 + data: Self::encode_data(&full_data), 385 + version: version_num, 386 + size_bytes: full_data.len(), 387 + is_delta: true, 388 + created_at, 389 + })) 390 + } 391 + None => { 392 + return DownloadSaveResponse::NotFound(Json(Error { 393 + message: "Delta save has no base version".to_string(), 394 + })); 395 + } 272 396 } 273 - None => DownloadSaveResponse::NotFound(Json(Error { 274 - message: "Version not found".to_string(), 275 - })), 397 + } else { 398 + // Not a delta - return data directly 399 + let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, target_cid); 400 + let save_data = match self.storage.download_blob(&storage_path).await { 401 + Ok(data) => data, 402 + Err(e) => { 403 + return DownloadSaveResponse::NotFound(Json(Error { 404 + message: format!("Failed to fetch save data: {}", e), 405 + })); 406 + } 407 + }; 408 + 409 + DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 410 + id: save_id, 411 + cid: target_cid, 412 + data: Self::encode_data(&save_data), 413 + version: version_num, 414 + size_bytes: size_bytes as usize, 415 + is_delta: false, 416 + created_at, 417 + })) 276 418 } 277 419 } 278 420 None => DownloadSaveResponse::NotFound(Json(Error { ··· 306 448 307 449 match save_row { 308 450 Some((save_id, current_cid, created_at)) => { 309 - let version_row: Option<(String, i32, i64)> = sqlx::query_as( 310 - "SELECT cid, version_number, size_bytes FROM save_versions WHERE save_id = ? ORDER BY version_number DESC LIMIT 1", 451 + // Get all versions to check for deltas and reconstruct if needed 452 + let versions: Vec<(String, i32, i64, bool)> = sqlx::query_as( 453 + "SELECT cid, version_number, size_bytes, is_delta FROM save_versions WHERE save_id = ? ORDER BY version_number DESC LIMIT 1", 311 454 ) 312 455 .bind(&save_id) 313 - .fetch_optional(&self.pool) 456 + .fetch_all(&self.pool) 314 457 .await 315 458 .ok() 316 - .flatten(); 459 + .unwrap_or_default(); 460 + 461 + if versions.is_empty() { 462 + return DownloadSaveResponse::NotFound(Json(Error { 463 + message: "No versions found".to_string(), 464 + })); 465 + } 466 + 467 + let target_version_data = versions.first().unwrap(); 468 + let (target_cid, version_num, size_bytes, is_delta) = (*target_version_data).clone(); 469 + 470 + // Check if target version is a delta that needs reconstruction 471 + if is_delta { 472 + // Get all versions from 1 to target for reconstruction 473 + let all_versions: Vec<(String, i32, i64, bool)> = sqlx::query_as( 474 + "SELECT cid, version_number, size_bytes, is_delta FROM save_versions WHERE save_id = ? ORDER BY version_number ASC", 475 + ) 476 + .bind(&save_id) 477 + .fetch_all(&self.pool) 478 + .await 479 + .ok() 480 + .unwrap_or_default(); 481 + 482 + // Find the base version (first non-delta version) 483 + let base_version = all_versions.iter().find(|v| !v.3); 484 + 485 + match base_version { 486 + Some((base_cid, base_version_num, _, _)) => { 487 + // Download base data 488 + let base_storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, base_cid); 489 + let mut full_data = match self.storage.download_blob(&base_storage_path).await { 490 + Ok(data) => data, 491 + Err(e) => { 492 + return DownloadSaveResponse::NotFound(Json(Error { 493 + message: format!("Failed to fetch base save data: {}", e), 494 + })); 495 + } 496 + }; 497 + 498 + // Apply deltas sequentially from base+1 to target 499 + let mut current_version_num = *base_version_num; 500 + for (cid, version_number, _, _) in all_versions.iter() { 501 + if *version_number <= current_version_num { 502 + continue; // Skip versions up to and including base 503 + } 504 + 505 + let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, cid); 506 + let delta_data = match self.storage.download_blob(&storage_path).await { 507 + Ok(data) => data, 508 + Err(e) => { 509 + return DownloadSaveResponse::NotFound(Json(Error { 510 + message: format!("Failed to fetch delta data for version {}: {}", version_number, e), 511 + })); 512 + } 513 + }; 514 + 515 + // Apply delta patch 516 + let mut reconstructed = Vec::new(); 517 + if let Err(e) = Bspatch::new(&delta_data) 518 + .and_then(|p| p.apply(&full_data, &mut reconstructed)) 519 + { 520 + return DownloadSaveResponse::NotFound(Json(Error { 521 + message: format!("Failed to reconstruct version {}: {}", version_number, e), 522 + })); 523 + } 524 + full_data = reconstructed; 317 525 318 - match version_row { 319 - Some((cid, version_num, size_bytes)) => { 320 - let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, cid); 321 - let save_data = match self.storage.download_blob(&storage_path).await { 322 - Ok(data) => data, 323 - Err(e) => { 324 - return DownloadSaveResponse::NotFound(Json(Error { 325 - message: format!("Failed to fetch save data: {}", e), 326 - })); 526 + current_version_num = *version_number; 327 527 } 328 - }; 329 528 330 - DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 331 - id: save_id, 332 - cid, 333 - data: Self::encode_data(&save_data), 334 - version: version_num, 335 - size_bytes: size_bytes as usize, 336 - is_delta: false, 337 - created_at, 338 - })) 529 + DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 530 + id: save_id, 531 + cid: target_cid, 532 + data: Self::encode_data(&full_data), 533 + version: version_num, 534 + size_bytes: full_data.len(), 535 + is_delta: true, 536 + created_at, 537 + })) 538 + } 539 + None => { 540 + return DownloadSaveResponse::NotFound(Json(Error { 541 + message: "Delta save has no base version".to_string(), 542 + })); 543 + } 339 544 } 340 - None => DownloadSaveResponse::NotFound(Json(Error { 341 - message: "No versions found".to_string(), 342 - })), 545 + } else { 546 + // Not a delta - return data directly 547 + let storage_path = format!("saves/{}/{}/{}", game_id, gamer_id, target_cid); 548 + let save_data = match self.storage.download_blob(&storage_path).await { 549 + Ok(data) => data, 550 + Err(e) => { 551 + return DownloadSaveResponse::NotFound(Json(Error { 552 + message: format!("Failed to fetch save data: {}", e), 553 + })); 554 + } 555 + }; 556 + 557 + DownloadSaveResponse::Ok(Json(SaveDownloadResponse { 558 + id: save_id, 559 + cid: target_cid, 560 + data: Self::encode_data(&save_data), 561 + version: version_num, 562 + size_bytes: size_bytes as usize, 563 + is_delta: false, 564 + created_at, 565 + })) 343 566 } 344 567 } 345 568 None => DownloadSaveResponse::NotFound(Json(Error { 346 569 message: "Save not found".to_string(), 347 570 })), 348 571 } 572 + } 573 + 574 + /// Get quota status for a gamer 575 + #[oai(path = "/saves/quota", method = "get")] 576 + async fn get_quota_status( 577 + &self, 578 + gamer_id: poem_openapi::param::Query<String>, 579 + ) -> GetQuotaResponse { 580 + let gamer_id = gamer_id.0; 581 + 582 + // Get quota from database, or use default if not exists 583 + let quota = get_quota(&self.pool, &gamer_id).await.ok().flatten().unwrap_or(crate::db::GamerQuota { 584 + gamer_id: gamer_id.clone(), 585 + used_bytes: 0, 586 + limit_bytes: 2147483648, // 2GB default 587 + warning_sent: false, 588 + }); 589 + 590 + GetQuotaResponse::Ok(Json(QuotaResponse { 591 + used: quota.used_bytes as u64, 592 + limit: quota.limit_bytes as u64, 593 + remaining: (quota.limit_bytes - quota.used_bytes) as u64, 594 + })) 349 595 } 350 596 }
+93 -1
api/src/db/mod.rs
··· 1 1 use anyhow::Result; 2 - use sqlx::{Pool, Sqlite}; 2 + use sqlx::{Pool, Row, Sqlite}; 3 3 use std::path::Path; 4 4 5 5 pub type DbPool = Pool<Sqlite>; 6 + 7 + /// Represents a gamer's storage quota 8 + #[derive(Debug, Clone)] 9 + pub struct GamerQuota { 10 + pub gamer_id: String, 11 + pub used_bytes: i64, 12 + pub limit_bytes: i64, 13 + pub warning_sent: bool, 14 + } 15 + 16 + /// Error when quota is exceeded 17 + #[derive(Debug)] 18 + pub struct QuotaExceeded { 19 + pub used_bytes: i64, 20 + pub limit_bytes: i64, 21 + pub requested_bytes: i64, 22 + } 6 23 7 24 pub async fn init_database(db_path: &Path) -> Result<DbPool> { 8 25 if let Some(parent) = db_path.parent() { ··· 99 116 CREATE INDEX IF NOT EXISTS idx_saves_game ON saves(game_id); 100 117 CREATE INDEX IF NOT EXISTS idx_save_versions_save ON save_versions(save_id); 101 118 119 + CREATE TABLE IF NOT EXISTS gamer_quotas ( 120 + gamer_id TEXT PRIMARY KEY, 121 + used_bytes INTEGER NOT NULL DEFAULT 0, 122 + limit_bytes INTEGER NOT NULL DEFAULT 2147483648, 123 + warning_sent INTEGER NOT NULL DEFAULT 0 124 + ); 125 + 102 126 -- Insert default developer if not exists 103 127 INSERT OR IGNORE INTO developers (id, name) VALUES ('default-developer', 'Default Developer'); 104 128 "#, ··· 108 132 109 133 Ok(pool) 110 134 } 135 + 136 + /// Get quota for a gamer, returns None if no quota record exists (uses default) 137 + pub async fn get_quota(pool: &DbPool, gamer_id: &str) -> Result<Option<GamerQuota>> { 138 + let row = sqlx::query( 139 + "SELECT gamer_id, used_bytes, limit_bytes, warning_sent FROM gamer_quotas WHERE gamer_id = ?", 140 + ) 141 + .bind(gamer_id) 142 + .fetch_optional(pool) 143 + .await?; 144 + 145 + Ok(row.map(|r| GamerQuota { 146 + gamer_id: r.get("gamer_id"), 147 + used_bytes: r.get("used_bytes"), 148 + limit_bytes: r.get("limit_bytes"), 149 + warning_sent: r.get::<i64, _>("warning_sent") != 0, 150 + })) 151 + } 152 + 153 + /// Update the used_bytes for a gamer's quota 154 + pub async fn update_quota(pool: &DbPool, gamer_id: &str, used_bytes: i64) -> Result<()> { 155 + sqlx::query( 156 + "INSERT INTO gamer_quotas (gamer_id, used_bytes) VALUES (?, ?) 157 + ON CONFLICT(gamer_id) DO UPDATE SET used_bytes = ?", 158 + ) 159 + .bind(gamer_id) 160 + .bind(used_bytes) 161 + .bind(used_bytes) 162 + .execute(pool) 163 + .await?; 164 + 165 + Ok(()) 166 + } 167 + 168 + /// Check if a gamer has quota for a new upload, returns Ok(remaining_bytes) or Err(QuotaExceeded) 169 + pub async fn check_quota(pool: &DbPool, gamer_id: &str, new_size: i64) -> Result<u64, QuotaExceeded> { 170 + let quota = get_quota(pool, gamer_id).await.ok().flatten().unwrap_or(GamerQuota { 171 + gamer_id: gamer_id.to_string(), 172 + used_bytes: 0, 173 + limit_bytes: 2147483648, // 2GB default 174 + warning_sent: false, 175 + }); 176 + 177 + let total_needed = quota.used_bytes + new_size; 178 + if total_needed > quota.limit_bytes { 179 + return Err(QuotaExceeded { 180 + used_bytes: quota.used_bytes, 181 + limit_bytes: quota.limit_bytes, 182 + requested_bytes: new_size, 183 + }); 184 + } 185 + 186 + Ok((quota.limit_bytes - quota.used_bytes - new_size) as u64) 187 + } 188 + 189 + /// Increment used_bytes after successful upload 190 + pub async fn increment_quota(pool: &DbPool, gamer_id: &str, size_bytes: i64) -> Result<()> { 191 + sqlx::query( 192 + "INSERT INTO gamer_quotas (gamer_id, used_bytes) VALUES (?, ?) 193 + ON CONFLICT(gamer_id) DO UPDATE SET used_bytes = used_bytes + ?", 194 + ) 195 + .bind(gamer_id) 196 + .bind(size_bytes) 197 + .bind(size_bytes) 198 + .execute(pool) 199 + .await?; 200 + 201 + Ok(()) 202 + }