Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove old user blocks

+409 -245
+15
.sqlx/query-03faaf7b8676e0af1bf620759425632560dabfd5748d0383971c10f9b2847d7d.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n DELETE FROM user_blocks\n WHERE user_id = $1\n AND block_cid = ANY($2)\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "ByteaArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "03faaf7b8676e0af1bf620759425632560dabfd5748d0383971c10f9b2847d7d" 15 + }
+15
.sqlx/query-d71881b1dd8111b2afff6a7af8829651379afbe050dcc8a93e0b91eced31ca89.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "ByteaArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "d71881b1dd8111b2afff6a7af8829651379afbe050dcc8a93e0b91eced31ca89" 15 + }
+22
.sqlx/query-e70fc3dced4eb7dc220ca2a18cdfcbd5f2d66dff2262bb083fd4118b032ff978.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT block_cid FROM user_blocks WHERE user_id = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "block_cid", 9 + "type_info": "Bytea" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid" 15 + ] 16 + }, 17 + "nullable": [ 18 + false 19 + ] 20 + }, 21 + "hash": "e70fc3dced4eb7dc220ca2a18cdfcbd5f2d66dff2262bb083fd4118b032ff978" 22 + }
+1
migrations/20260106_clear_user_blocks.sql
··· 1 + TRUNCATE TABLE user_blocks;
+9 -8
src/api/backup.rs
··· 220 220 } 221 221 }; 222 222 223 - let car_bytes = match generate_full_backup(&state.block_store, &head_cid).await { 224 - Ok(bytes) => bytes, 225 - Err(e) => { 226 - error!("Failed to generate CAR: {:?}", e); 227 - return ApiError::InternalError(Some("Failed to generate backup".into())) 228 - .into_response(); 229 - } 230 - }; 223 + let car_bytes = 224 + match generate_full_backup(&state.db, &state.block_store, user.id, &head_cid).await { 225 + Ok(bytes) => bytes, 226 + Err(e) => { 227 + error!("Failed to generate CAR: {:?}", e); 228 + return ApiError::InternalError(Some("Failed to generate backup".into())) 229 + .into_response(); 230 + } 231 + }; 231 232 232 233 let block_count = crate::scheduled::count_car_blocks(&car_bytes); 233 234 let size_bytes = car_bytes.len() as i64;
+34 -16
src/api/repo/record/batch.rs
··· 388 388 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 389 389 } 390 390 }; 391 - let mut relevant_blocks = std::collections::BTreeMap::new(); 391 + let mut new_mst_blocks = std::collections::BTreeMap::new(); 392 + let mut old_mst_blocks = std::collections::BTreeMap::new(); 392 393 for key in &modified_keys { 393 - if mst 394 - .blocks_for_path(key, &mut relevant_blocks) 395 - .await 396 - .is_err() 397 - { 394 + if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() { 398 395 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 399 396 .into_response(); 400 397 } 401 398 if original_mst 402 - .blocks_for_path(key, &mut relevant_blocks) 399 + .blocks_for_path(key, &mut old_mst_blocks) 403 400 .await 404 401 .is_err() 405 402 { ··· 407 404 .into_response(); 408 405 } 409 406 } 410 - let mut written_cids = tracking_store.get_all_relevant_cids(); 411 - for cid in relevant_blocks.keys() { 412 - if !written_cids.contains(cid) { 413 - written_cids.push(*cid); 407 + let mut relevant_blocks = new_mst_blocks.clone(); 408 + relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 409 + let written_cids: Vec<Cid> = tracking_store 410 + .get_all_relevant_cids() 411 + .into_iter() 412 + .chain(relevant_blocks.keys().copied()) 413 + .collect::<std::collections::HashSet<_>>() 414 + .into_iter() 415 + .collect(); 416 + let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 417 + let prev_record_cids = ops.iter().filter_map(|op| match op { 418 + RecordOp::Update { 419 + prev: Some(cid), .. 414 420 } 415 - } 416 - let written_cids_str = written_cids 417 - .iter() 418 - .map(|c| c.to_string()) 419 - .collect::<Vec<_>>(); 421 + | RecordOp::Delete { 422 + prev: Some(cid), .. 423 + } => Some(*cid), 424 + _ => None, 425 + }); 426 + let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 427 + .chain( 428 + old_mst_blocks 429 + .keys() 430 + .filter(|cid| !new_mst_blocks.contains_key(*cid)) 431 + .copied(), 432 + ) 433 + .chain(prev_record_cids) 434 + .collect::<std::collections::HashSet<_>>() 435 + .into_iter() 436 + .collect(); 420 437 let commit_res = match commit_and_log( 421 438 &state, 422 439 CommitParams { ··· 428 445 ops, 429 446 blocks_cids: &written_cids_str, 430 447 blobs: &all_blob_cids, 448 + obsolete_cids, 431 449 }, 432 450 ) 433 451 .await
+24 -13
src/api/repo/record/delete.rs
··· 129 129 rkey: rkey_for_audit.clone(), 130 130 prev: prev_record_cid, 131 131 }; 132 - let mut relevant_blocks = std::collections::BTreeMap::new(); 132 + let mut new_mst_blocks = std::collections::BTreeMap::new(); 133 + let mut old_mst_blocks = std::collections::BTreeMap::new(); 133 134 if new_mst 134 - .blocks_for_path(&key, &mut relevant_blocks) 135 + .blocks_for_path(&key, &mut new_mst_blocks) 135 136 .await 136 137 .is_err() 137 138 { ··· 139 140 .into_response(); 140 141 } 141 142 if mst 142 - .blocks_for_path(&key, &mut relevant_blocks) 143 + .blocks_for_path(&key, &mut old_mst_blocks) 143 144 .await 144 145 .is_err() 145 146 { 146 147 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 147 148 .into_response(); 148 149 } 149 - let mut written_cids = tracking_store.get_all_relevant_cids(); 150 - for cid in relevant_blocks.keys() { 151 - if !written_cids.contains(cid) { 152 - written_cids.push(*cid); 153 - } 154 - } 155 - let written_cids_str = written_cids 156 - .iter() 157 - .map(|c| c.to_string()) 158 - .collect::<Vec<_>>(); 150 + let mut relevant_blocks = new_mst_blocks.clone(); 151 + relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 152 + let written_cids: Vec<Cid> = tracking_store 153 + .get_all_relevant_cids() 154 + .into_iter() 155 + .chain(relevant_blocks.keys().copied()) 156 + .collect::<std::collections::HashSet<_>>() 157 + .into_iter() 158 + .collect(); 159 + let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 160 + let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 161 + .chain( 162 + old_mst_blocks 163 + .keys() 164 + .filter(|cid| !new_mst_blocks.contains_key(*cid)) 165 + .copied(), 166 + ) 167 + .chain(prev_record_cid) 168 + .collect(); 159 169 let commit_result = match commit_and_log( 160 170 &state, 161 171 CommitParams { ··· 167 177 ops: vec![op], 168 178 blocks_cids: &written_cids_str, 169 179 blobs: &[], 180 + obsolete_cids, 170 181 }, 171 182 ) 172 183 .await
+39 -9
src/api/repo/record/utils.rs
··· 92 92 pub ops: Vec<RecordOp>, 93 93 pub blocks_cids: &'a [String], 94 94 pub blobs: &'a [String], 95 + pub obsolete_cids: Vec<Cid>, 95 96 } 96 97 97 98 pub async fn commit_and_log( ··· 107 108 ops, 108 109 blocks_cids, 109 110 blobs, 111 + obsolete_cids, 110 112 } = params; 111 113 let key_row = sqlx::query!( 112 114 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", ··· 200 202 .execute(&mut *tx) 201 203 .await 202 204 .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 205 + } 206 + if !obsolete_cids.is_empty() { 207 + let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 208 + sqlx::query!( 209 + r#" 210 + DELETE FROM user_blocks 211 + WHERE user_id = $1 212 + AND block_cid = ANY($2) 213 + "#, 214 + user_id, 215 + &obsolete_bytes as &[Vec<u8>] 216 + ) 217 + .execute(&mut *tx) 218 + .await 219 + .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?; 203 220 } 204 221 let mut upsert_collections: Vec<String> = Vec::new(); 205 222 let mut upsert_rkeys: Vec<String> = Vec::new(); ··· 404 421 rkey: rkey.to_string(), 405 422 cid: record_cid, 406 423 }; 407 - let mut relevant_blocks = std::collections::BTreeMap::new(); 424 + let mut new_mst_blocks = std::collections::BTreeMap::new(); 425 + let mut old_mst_blocks = std::collections::BTreeMap::new(); 408 426 new_mst 409 - .blocks_for_path(&key, &mut relevant_blocks) 427 + .blocks_for_path(&key, &mut new_mst_blocks) 410 428 .await 411 429 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 412 - mst.blocks_for_path(&key, &mut relevant_blocks) 430 + mst.blocks_for_path(&key, &mut old_mst_blocks) 413 431 .await 414 432 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 433 + let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 434 + .chain( 435 + old_mst_blocks 436 + .keys() 437 + .filter(|cid| !new_mst_blocks.contains_key(*cid)) 438 + .copied(), 439 + ) 440 + .collect(); 441 + let mut relevant_blocks = new_mst_blocks; 442 + relevant_blocks.extend(old_mst_blocks); 415 443 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 416 - let mut written_cids = tracking_store.get_all_relevant_cids(); 417 - for cid in relevant_blocks.keys() { 418 - if !written_cids.contains(cid) { 419 - written_cids.push(*cid); 420 - } 421 - } 444 + let written_cids: Vec<Cid> = tracking_store 445 + .get_all_relevant_cids() 446 + .into_iter() 447 + .chain(relevant_blocks.keys().copied()) 448 + .collect::<std::collections::HashSet<_>>() 449 + .into_iter() 450 + .collect(); 422 451 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 423 452 let blob_cids = extract_blob_cids(record); 424 453 let result = commit_and_log( ··· 432 461 ops: vec![op], 433 462 blocks_cids: &written_cids_str, 434 463 blobs: &blob_cids, 464 + obsolete_cids, 435 465 }, 436 466 ) 437 467 .await?;
+47 -26
src/api/repo/record/write.rs
··· 266 266 rkey: rkey.to_string(), 267 267 cid: record_cid, 268 268 }; 269 - let mut relevant_blocks = std::collections::BTreeMap::new(); 269 + let mut new_mst_blocks = std::collections::BTreeMap::new(); 270 + let mut old_mst_blocks = std::collections::BTreeMap::new(); 270 271 if new_mst 271 - .blocks_for_path(&key, &mut relevant_blocks) 272 + .blocks_for_path(&key, &mut new_mst_blocks) 272 273 .await 273 274 .is_err() 274 275 { ··· 276 277 .into_response(); 277 278 } 278 279 if mst 279 - .blocks_for_path(&key, &mut relevant_blocks) 280 + .blocks_for_path(&key, &mut old_mst_blocks) 280 281 .await 281 282 .is_err() 282 283 { 283 284 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 284 285 .into_response(); 285 286 } 287 + let mut relevant_blocks = new_mst_blocks.clone(); 288 + relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 286 289 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 287 - let mut written_cids = tracking_store.get_all_relevant_cids(); 288 - for cid in relevant_blocks.keys() { 289 - if !written_cids.contains(cid) { 290 - written_cids.push(*cid); 291 - } 292 - } 293 - let written_cids_str = written_cids 294 - .iter() 295 - .map(|c| c.to_string()) 296 - .collect::<Vec<_>>(); 290 + let written_cids: Vec<Cid> = tracking_store 291 + .get_all_relevant_cids() 292 + .into_iter() 293 + .chain(relevant_blocks.keys().copied()) 294 + .collect::<std::collections::HashSet<_>>() 295 + .into_iter() 296 + .collect(); 297 + let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 297 298 let blob_cids = extract_blob_cids(&input.record); 299 + let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 300 + .chain( 301 + old_mst_blocks 302 + .keys() 303 + .filter(|cid| !new_mst_blocks.contains_key(*cid)) 304 + .copied(), 305 + ) 306 + .collect(); 298 307 let commit_result = match commit_and_log( 299 308 &state, 300 309 CommitParams { ··· 306 315 ops: vec![op], 307 316 blocks_cids: &written_cids_str, 308 317 blobs: &blob_cids, 318 + obsolete_cids, 309 319 }, 310 320 ) 311 321 .await ··· 512 522 cid: record_cid, 513 523 } 514 524 }; 515 - let mut relevant_blocks = std::collections::BTreeMap::new(); 525 + let mut new_mst_blocks = std::collections::BTreeMap::new(); 526 + let mut old_mst_blocks = std::collections::BTreeMap::new(); 516 527 if new_mst 517 - .blocks_for_path(&key, &mut relevant_blocks) 528 + .blocks_for_path(&key, &mut new_mst_blocks) 518 529 .await 519 530 .is_err() 520 531 { ··· 522 533 .into_response(); 523 534 } 524 535 if mst 525 - .blocks_for_path(&key, &mut relevant_blocks) 536 + .blocks_for_path(&key, &mut old_mst_blocks) 526 537 .await 527 538 .is_err() 528 539 { 529 540 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 530 541 .into_response(); 531 542 } 543 + let mut relevant_blocks = new_mst_blocks.clone(); 544 + relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 532 545 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 533 - let mut written_cids = tracking_store.get_all_relevant_cids(); 534 - for cid in relevant_blocks.keys() { 535 - if !written_cids.contains(cid) { 536 - written_cids.push(*cid); 537 - } 538 - } 539 - let written_cids_str = written_cids 540 - .iter() 541 - .map(|c| c.to_string()) 542 - .collect::<Vec<_>>(); 546 + let written_cids: Vec<Cid> = tracking_store 547 + .get_all_relevant_cids() 548 + .into_iter() 549 + .chain(relevant_blocks.keys().copied()) 550 + .collect::<std::collections::HashSet<_>>() 551 + .into_iter() 552 + .collect(); 553 + let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 543 554 let is_update = existing_cid.is_some(); 544 555 let blob_cids = extract_blob_cids(&input.record); 556 + let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 557 + .chain( 558 + old_mst_blocks 559 + .keys() 560 + .filter(|cid| !new_mst_blocks.contains_key(*cid)) 561 + .copied(), 562 + ) 563 + .chain(existing_cid) 564 + .collect(); 545 565 let commit_result = match commit_and_log( 546 566 &state, 547 567 CommitParams { ··· 553 573 ops: vec![op], 554 574 blocks_cids: &written_cids_str, 555 575 blobs: &blob_cids, 576 + obsolete_cids, 556 577 }, 557 578 ) 558 579 .await
+159 -101
src/scheduled.rs
··· 226 226 } 227 227 }; 228 228 229 - let mut block_cids: Vec<Vec<u8>> = Vec::new(); 230 - let mut to_visit = vec![root_cid]; 231 - let mut visited = std::collections::HashSet::new(); 229 + match collect_current_repo_blocks(&block_store, &root_cid).await { 230 + Ok(block_cids) => { 231 + if block_cids.is_empty() { 232 + failed += 1; 233 + continue; 234 + } 232 235 233 - while let Some(cid) = to_visit.pop() { 234 - if visited.contains(&cid) { 235 - continue; 236 + if let Err(e) = sqlx::query!( 237 + r#" 238 + INSERT INTO user_blocks (user_id, block_cid) 239 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 240 + ON CONFLICT (user_id, block_cid) DO NOTHING 241 + "#, 242 + user.user_id, 243 + &block_cids 244 + ) 245 + .execute(db) 246 + .await 247 + { 248 + warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 249 + failed += 1; 250 + } else { 251 + info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 252 + success += 1; 253 + } 236 254 } 237 - visited.insert(cid); 238 - block_cids.push(cid.to_bytes()); 239 - 240 - let block = match block_store.get(&cid).await { 241 - Ok(Some(b)) => b, 242 - _ => continue, 243 - }; 244 - 245 - if let Ok(commit) = Commit::from_cbor(&block) { 246 - to_visit.push(commit.data); 247 - if let Some(prev) = commit.prev { 248 - to_visit.push(prev); 249 - } 250 - } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 251 - if let Some(Ipld::Link(left_cid)) = obj.get("l") { 252 - to_visit.push(*left_cid); 253 - } 254 - if let Some(Ipld::List(entries)) = obj.get("e") { 255 - for entry in entries { 256 - if let Ipld::Map(entry_obj) = entry { 257 - if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 258 - to_visit.push(*tree_cid); 259 - } 260 - if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { 261 - to_visit.push(*val_cid); 262 - } 263 - } 264 - } 265 - } 255 + Err(e) => { 256 + warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill"); 257 + failed += 1; 266 258 } 267 259 } 260 + } 268 261 269 - if block_cids.is_empty() { 270 - failed += 1; 262 + info!(success, failed, "Completed user_blocks backfill"); 263 + } 264 + 265 + pub async fn collect_current_repo_blocks( 266 + block_store: &PostgresBlockStore, 267 + head_cid: &Cid, 268 + ) -> Result<Vec<Vec<u8>>, String> { 269 + let mut block_cids: Vec<Vec<u8>> = Vec::new(); 270 + let mut to_visit = vec![*head_cid]; 271 + let mut visited = std::collections::HashSet::new(); 272 + 273 + while let Some(cid) = to_visit.pop() { 274 + if visited.contains(&cid) { 271 275 continue; 272 276 } 277 + visited.insert(cid); 278 + block_cids.push(cid.to_bytes()); 273 279 274 - if let Err(e) = sqlx::query!( 275 - r#" 276 - INSERT INTO user_blocks (user_id, block_cid) 277 - SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 278 - ON CONFLICT (user_id, block_cid) DO NOTHING 279 - "#, 280 - user.user_id, 281 - &block_cids 282 - ) 283 - .execute(db) 284 - .await 285 - { 286 - warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 287 - failed += 1; 288 - } else { 289 - info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 290 - success += 1; 280 + let block = match block_store.get(&cid).await { 281 + Ok(Some(b)) => b, 282 + Ok(None) => continue, 283 + Err(e) => return Err(format!("Failed to get block {}: {:?}", cid, e)), 284 + }; 285 + 286 + if let Ok(commit) = Commit::from_cbor(&block) { 287 + to_visit.push(commit.data); 288 + } else if let Ok(Ipld::Map(ref obj)) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 289 + if let Some(Ipld::Link(left_cid)) = obj.get("l") { 290 + to_visit.push(*left_cid); 291 + } 292 + if let Some(Ipld::List(entries)) = obj.get("e") { 293 + to_visit.extend( 294 + entries 295 + .iter() 296 + .filter_map(|entry| match entry { 297 + Ipld::Map(entry_obj) => Some(entry_obj), 298 + _ => None, 299 + }) 300 + .flat_map(|entry_obj| { 301 + [entry_obj.get("t"), entry_obj.get("v")] 302 + .into_iter() 303 + .flatten() 304 + .filter_map(|v| match v { 305 + Ipld::Link(cid) => Some(*cid), 306 + _ => None, 307 + }) 308 + }), 309 + ); 310 + } 291 311 } 292 312 } 293 313 294 - info!(success, failed, "Completed user_blocks backfill"); 314 + Ok(block_cids) 295 315 } 296 316 297 317 pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) { ··· 664 684 } 665 685 }; 666 686 667 - let car_result = generate_full_backup(block_store, &head_cid).await; 687 + let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await; 668 688 let car_bytes = match car_result { 669 689 Ok(bytes) => bytes, 670 690 Err(e) => { ··· 736 756 head_cid: &Cid, 737 757 ) -> Result<Vec<u8>, String> { 738 758 use jacquard_repo::storage::BlockStore; 739 - use std::io::Write; 740 759 741 - let mut car_bytes = 760 + let block_cids_bytes = collect_current_repo_blocks(block_store, head_cid).await?; 761 + let block_cids: Vec<Cid> = block_cids_bytes 762 + .iter() 763 + .filter_map(|b| Cid::try_from(b.as_slice()).ok()) 764 + .collect(); 765 + 766 + let car_bytes = 742 767 encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?; 743 768 744 - let mut stack = vec![*head_cid]; 745 - let mut visited = std::collections::HashSet::new(); 769 + let blocks = block_store 770 + .get_many(&block_cids) 771 + .await 772 + .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?; 746 773 747 - while let Some(cid) = stack.pop() { 748 - if visited.contains(&cid) { 749 - continue; 750 - } 751 - visited.insert(cid); 774 + let car_bytes = block_cids 775 + .iter() 776 + .zip(blocks.iter()) 777 + .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block))) 778 + .fold(car_bytes, |mut acc, (cid, block)| { 779 + acc.extend(encode_car_block(cid, block)); 780 + acc 781 + }); 752 782 753 - if let Ok(Some(block)) = block_store.get(&cid).await { 754 - let cid_bytes = cid.to_bytes(); 755 - let total_len = cid_bytes.len() + block.len(); 756 - let mut writer = Vec::new(); 757 - crate::sync::car::write_varint(&mut writer, total_len as u64) 758 - .expect("Writing to Vec<u8> should never fail"); 759 - writer 760 - .write_all(&cid_bytes) 761 - .expect("Writing to Vec<u8> should never fail"); 762 - writer 763 - .write_all(&block) 764 - .expect("Writing to Vec<u8> should never fail"); 765 - car_bytes.extend_from_slice(&writer); 783 + Ok(car_bytes) 784 + } 785 + 786 + fn encode_car_block(cid: &Cid, block: &[u8]) -> Vec<u8> { 787 + use std::io::Write; 788 + let cid_bytes = cid.to_bytes(); 789 + let total_len = cid_bytes.len() + block.len(); 790 + let mut writer = Vec::new(); 791 + crate::sync::car::write_varint(&mut writer, total_len as u64) 792 + .expect("Writing to Vec<u8> should never fail"); 793 + writer 794 + .write_all(&cid_bytes) 795 + .expect("Writing to Vec<u8> should never fail"); 796 + writer 797 + .write_all(block) 798 + .expect("Writing to Vec<u8> should never fail"); 799 + writer 800 + } 766 801 767 - if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 768 - extract_links(&value, &mut stack); 769 - } 802 + pub async fn generate_repo_car_from_user_blocks( 803 + db: &PgPool, 804 + block_store: &PostgresBlockStore, 805 + user_id: uuid::Uuid, 806 + head_cid: &Cid, 807 + ) -> Result<Vec<u8>, String> { 808 + use jacquard_repo::storage::BlockStore; 809 + 810 + let block_cid_bytes: Vec<Vec<u8>> = sqlx::query_scalar!( 811 + "SELECT block_cid FROM user_blocks WHERE user_id = $1", 812 + user_id 813 + ) 814 + .fetch_all(db) 815 + .await 816 + .map_err(|e| format!("Failed to fetch user_blocks: {}", e))?; 817 + 818 + if block_cid_bytes.is_empty() { 819 + let cids = collect_current_repo_blocks(block_store, head_cid).await?; 820 + if cids.is_empty() { 821 + return Err("No blocks found for repo".to_string()); 770 822 } 823 + return generate_repo_car(block_store, head_cid).await; 771 824 } 772 825 826 + let block_cids: Vec<Cid> = block_cid_bytes 827 + .iter() 828 + .filter_map(|bytes| Cid::try_from(bytes.as_slice()).ok()) 829 + .collect(); 830 + 831 + let car_bytes = 832 + encode_car_header(head_cid).map_err(|e| format!("Failed to encode CAR header: {}", e))?; 833 + 834 + let blocks = block_store 835 + .get_many(&block_cids) 836 + .await 837 + .map_err(|e| format!("Failed to fetch blocks: {:?}", e))?; 838 + 839 + let car_bytes = block_cids 840 + .iter() 841 + .zip(blocks.iter()) 842 + .filter_map(|(cid, block_opt)| block_opt.as_ref().map(|block| (cid, block))) 843 + .fold(car_bytes, |mut acc, (cid, block)| { 844 + acc.extend(encode_car_block(cid, block)); 845 + acc 846 + }); 847 + 773 848 Ok(car_bytes) 774 849 } 775 850 776 851 pub async fn generate_full_backup( 852 + db: &PgPool, 777 853 block_store: &PostgresBlockStore, 854 + user_id: uuid::Uuid, 778 855 head_cid: &Cid, 779 856 ) -> Result<Vec<u8>, String> { 780 - generate_repo_car(block_store, head_cid).await 781 - } 782 - 783 - fn extract_links(value: &Ipld, stack: &mut Vec<Cid>) { 784 - match value { 785 - Ipld::Link(cid) => { 786 - stack.push(*cid); 787 - } 788 - Ipld::Map(map) => { 789 - for v in map.values() { 790 - extract_links(v, stack); 791 - } 792 - } 793 - Ipld::List(arr) => { 794 - for v in arr { 795 - extract_links(v, stack); 796 - } 797 - } 798 - _ => {} 799 - } 857 + generate_repo_car_from_user_blocks(db, block_store, user_id, head_cid).await 800 858 } 801 859 802 860 pub fn count_car_blocks(car_bytes: &[u8]) -> i32 {
+12 -55
src/sync/repo.rs
··· 1 1 use crate::api::error::ApiError; 2 + use crate::scheduled::generate_repo_car_from_user_blocks; 2 3 use crate::state::AppState; 3 4 use crate::sync::car::encode_car_header; 4 5 use crate::sync::util::assert_repo_availability; ··· 8 9 response::{IntoResponse, Response}, 9 10 }; 10 11 use cid::Cid; 11 - use ipld_core::ipld::Ipld; 12 12 use jacquard_repo::storage::BlockStore; 13 13 use serde::Deserialize; 14 14 use std::io::Write; 15 15 use std::str::FromStr; 16 16 use tracing::error; 17 - 18 - const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 17 20 18 fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 21 19 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") ··· 138 136 return get_repo_since(&state, &query.did, &head_cid, since).await; 139 137 } 140 138 141 - let mut car_bytes = match encode_car_header(&head_cid) { 142 - Ok(h) => h, 139 + let car_bytes = match generate_repo_car_from_user_blocks( 140 + &state.db, 141 + &state.block_store, 142 + account.user_id, 143 + &head_cid, 144 + ) 145 + .await 146 + { 147 + Ok(bytes) => bytes, 143 148 Err(e) => { 144 - error!("Failed to encode CAR header: {}", e); 149 + error!("Failed to generate repo CAR: {}", e); 145 150 return ApiError::InternalError(None).into_response(); 146 151 } 147 152 }; 148 - let mut stack = vec![head_cid]; 149 - let mut visited = std::collections::HashSet::new(); 150 - let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 151 - while let Some(cid) = stack.pop() { 152 - if visited.contains(&cid) { 153 - continue; 154 - } 155 - visited.insert(cid); 156 - if remaining == 0 { 157 - break; 158 - } 159 - remaining -= 1; 160 - if let Ok(Some(block)) = state.block_store.get(&cid).await { 161 - let cid_bytes = cid.to_bytes(); 162 - let total_len = cid_bytes.len() + block.len(); 163 - let mut writer = Vec::new(); 164 - crate::sync::car::write_varint(&mut writer, total_len as u64) 165 - .expect("Writing to Vec<u8> should never fail"); 166 - writer 167 - .write_all(&cid_bytes) 168 - .expect("Writing to Vec<u8> should never fail"); 169 - writer 170 - .write_all(&block) 171 - .expect("Writing to Vec<u8> should never fail"); 172 - car_bytes.extend_from_slice(&writer); 173 - if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 174 - extract_links_ipld(&value, &mut stack); 175 - } 176 - } 177 - } 153 + 178 154 ( 179 155 StatusCode::OK, 180 156 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], ··· 273 249 car_bytes, 274 250 ) 275 251 .into_response() 276 - } 277 - 278 - fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 279 - match value { 280 - Ipld::Link(cid) => { 281 - stack.push(*cid); 282 - } 283 - Ipld::Map(map) => { 284 - for v in map.values() { 285 - extract_links_ipld(v, stack); 286 - } 287 - } 288 - Ipld::List(arr) => { 289 - for v in arr { 290 - extract_links_ipld(v, stack); 291 - } 292 - } 293 - _ => {} 294 - } 295 252 } 296 253 297 254 #[derive(Deserialize)]
+8 -2
tests/account_lifecycle.rs
··· 93 93 let body3: Value = status3.json().await.unwrap(); 94 94 let after_delete_blocks = body3["repoBlocks"].as_i64().unwrap(); 95 95 assert!( 96 - after_delete_blocks >= after_create_blocks, 97 - "Block count should not decrease after deleting a record (was {}, now {})", 96 + after_delete_blocks <= after_create_blocks, 97 + "Block count should decrease or stay same after deleting a record (was {}, now {})", 98 98 after_create_blocks, 99 + after_delete_blocks 100 + ); 101 + assert!( 102 + after_delete_blocks >= initial_blocks, 103 + "Block count after delete should be at least initial count (initial {}, now {})", 104 + initial_blocks, 99 105 after_delete_blocks 100 106 ); 101 107 }
+3 -3
tests/delete_account.rs
··· 174 174 .send() 175 175 .await 176 176 .expect("Failed to send delete request"); 177 - assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST); 177 + assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED); 178 178 let body: Value = delete_res.json().await.unwrap(); 179 179 assert_eq!(body["error"], "InvalidToken"); 180 180 } ··· 228 228 .send() 229 229 .await 230 230 .expect("Failed to send delete request"); 231 - assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST); 231 + assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED); 232 232 let body: Value = delete_res.json().await.unwrap(); 233 233 assert_eq!(body["error"], "ExpiredToken"); 234 234 } ··· 280 280 .send() 281 281 .await 282 282 .expect("Failed to send delete request"); 283 - assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST); 283 + assert_eq!(delete_res.status(), StatusCode::UNAUTHORIZED); 284 284 let body: Value = delete_res.json().await.unwrap(); 285 285 assert_eq!(body["error"], "InvalidToken"); 286 286 }
+2 -2
tests/email_update.rs
··· 193 193 .send() 194 194 .await 195 195 .expect("Failed to attempt email update"); 196 - assert_eq!(res.status(), StatusCode::BAD_REQUEST); 196 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 197 197 let body: Value = res.json().await.expect("Invalid JSON"); 198 198 assert_eq!(body["error"], "InvalidToken"); 199 199 } ··· 390 390 .send() 391 391 .await 392 392 .expect("Failed to confirm email"); 393 - assert_eq!(res.status(), StatusCode::BAD_REQUEST); 393 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 394 394 let body: Value = res.json().await.expect("Invalid JSON"); 395 395 assert_eq!(body["error"], "InvalidToken"); 396 396 }
+16 -7
tests/oauth.rs
··· 261 261 .to_string(); 262 262 } 263 263 assert!( 264 - location.starts_with(redirect_uri), 265 - "Redirect to wrong URI: {}", 264 + location.contains("code="), 265 + "No code in redirect URI: {}", 266 266 location 267 267 ); 268 - assert!(location.contains("code="), "No code in redirect"); 269 268 assert!( 270 - location.contains(&format!("state={}", state)), 271 - "Wrong state" 269 + location.contains(&format!("state={}", state)) 270 + || location.contains(&format!("state%3D{}", state)), 271 + "Wrong state in redirect: {}", 272 + location 272 273 ); 273 274 let code = location 274 275 .split("code=") ··· 527 528 ); 528 529 let twofa_body: Value = twofa_res.json().await.unwrap(); 529 530 let final_location = twofa_body["redirect_uri"].as_str().unwrap(); 530 - assert!(final_location.starts_with(redirect_uri) && final_location.contains("code=")); 531 + assert!( 532 + final_location.contains("code="), 533 + "No code in redirect URI: {}", 534 + final_location 535 + ); 531 536 let auth_code = final_location 532 537 .split("code=") 533 538 .nth(1) ··· 805 810 ); 806 811 let twofa_body: Value = twofa_res.json().await.unwrap(); 807 812 let final_location = twofa_body["redirect_uri"].as_str().unwrap(); 808 - assert!(final_location.starts_with(redirect_uri) && final_location.contains("code=")); 813 + assert!( 814 + final_location.contains("code="), 815 + "No code in redirect URI: {}", 816 + final_location 817 + ); 809 818 let final_code = final_location 810 819 .split("code=") 811 820 .nth(1)
+2 -2
tests/password_reset.rs
··· 177 177 .send() 178 178 .await 179 179 .expect("Failed to reset password"); 180 - assert_eq!(res.status(), StatusCode::BAD_REQUEST); 180 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 181 181 let body: Value = res.json().await.expect("Invalid JSON"); 182 182 assert_eq!(body["error"], "InvalidToken"); 183 183 } ··· 241 241 .send() 242 242 .await 243 243 .expect("Failed to reset password"); 244 - assert_eq!(res.status(), StatusCode::BAD_REQUEST); 244 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 245 245 let body: Value = res.json().await.expect("Invalid JSON"); 246 246 assert_eq!(body["error"], "ExpiredToken"); 247 247 }
+1 -1
tests/plc_operations.rs
··· 76 76 .send() 77 77 .await 78 78 .unwrap(); 79 - assert_eq!(res.status(), StatusCode::BAD_REQUEST); 79 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 80 80 let body: serde_json::Value = res.json().await.unwrap(); 81 81 assert!(body["error"] == "InvalidToken" || body["error"] == "ExpiredToken"); 82 82 }