Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

misc genesis fixes

+159 -87
+15
.sqlx/query-3b791fdb8e29043c980963d4d18e1e492c73c39818a8648a7af70555418fb5d1.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "TextArray", 9 + "Int8" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "3b791fdb8e29043c980963d4d18e1e492c73c39818a8648a7af70555418fb5d1" 15 + }
-22
.sqlx/query-3fae97c8a2551c1ef8db06c4cde5480e44c5f771397e01574d0026e5bac6af55.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT r.repo_root_cid\n FROM repos r\n JOIN users u ON u.id = r.user_id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "repo_root_cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "3fae97c8a2551c1ef8db06c4cde5480e44c5f771397e01574d0026e5bac6af55" 22 - }
-22
.sqlx/query-5b692e8f6d32dcbdcb45a3fff152a2be5672aadd807a4abab6914f80d57cba02.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT r.repo_root_cid\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "repo_root_cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "5b692e8f6d32dcbdcb45a3fff152a2be5672aadd807a4abab6914f80d57cba02" 22 - }
-28
.sqlx/query-933f6585efdafedc82a8b6ac3c1513f25459bd9ab08e385ebc929469666d7747.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT id, deactivated_at FROM users WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "id", 9 - "type_info": "Uuid" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "deactivated_at", 14 - "type_info": "Timestamptz" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - true 25 - ] 26 - }, 27 - "hash": "933f6585efdafedc82a8b6ac3c1513f25459bd9ab08e385ebc929469666d7747" 28 - }
+32
.sqlx/query-d65ebbc09a5756438063cb6eaf8284f17beeedde25d4f41dd6788d9c60d162f7.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, commit_cid\n FROM repo_seq\n WHERE event_type = 'commit'\n AND prev_cid IS NULL\n AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "commit_cid", 19 + "type_info": "Text" 20 + } 21 + ], 22 + "parameters": { 23 + "Left": [] 24 + }, 25 + "nullable": [ 26 + false, 27 + false, 28 + true 29 + ] 30 + }, 31 + "hash": "d65ebbc09a5756438063cb6eaf8284f17beeedde25d4f41dd6788d9c60d162f7" 32 + }
+1 -1
src/api/identity/account.rs
··· 996 996 warn!("Failed to sequence account event for {}: {}", did, e); 997 997 } 998 998 if let Err(e) = 999 - crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 999 + crate::api::repo::record::sequence_genesis_commit(&state, &did, &commit_cid, &mst_root, &rev_str).await 1000 1000 { 1001 1001 warn!("Failed to sequence commit event for {}: {}", did, e); 1002 1002 }
+12 -13
src/api/repo/record/utils.rs
··· 512 512 Ok(seq_row.seq) 513 513 } 514 514 515 - pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 516 - let repo_info = sqlx::query!( 517 - "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 518 - did 519 - ) 520 - .fetch_optional(&state.db) 521 - .await 522 - .map_err(|e| format!("DB Error fetching repo root: {}", e))? 523 - .ok_or_else(|| "Repo not found".to_string())?; 515 + pub async fn sequence_genesis_commit( 516 + state: &AppState, 517 + did: &str, 518 + commit_cid: &Cid, 519 + mst_root_cid: &Cid, 520 + rev: &str, 521 + ) -> Result<i64, String> { 524 522 let ops = serde_json::json!([]); 525 523 let blobs: Vec<String> = vec![]; 526 - let blocks_cids: Vec<String> = vec![]; 524 + let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 527 525 let prev_cid: Option<&str> = None; 526 + let commit_cid_str = commit_cid.to_string(); 528 527 let seq_row = sqlx::query!( 529 528 r#" 530 529 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) ··· 532 531 RETURNING seq 533 532 "#, 534 533 did, 535 - repo_info.repo_root_cid, 534 + commit_cid_str, 536 535 prev_cid, 537 536 ops, 538 537 &blobs, 539 538 &blocks_cids, 540 - repo_info.repo_rev 539 + rev 541 540 ) 542 541 .fetch_one(&state.db) 543 542 .await 544 - .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?; 543 + .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 545 544 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 546 545 .execute(&state.db) 547 546 .await
+2 -1
src/main.rs
··· 5 5 use tracing::{error, info, warn}; 6 6 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7 7 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8 - use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 8 + use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9 9 use tranquil_pds::state::AppState; 10 10 11 11 #[tokio::main] ··· 32 32 let backfill_db = state.db.clone(); 33 33 let backfill_block_store = state.block_store.clone(); 34 34 tokio::spawn(async move { 35 + backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await; 35 36 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 36 37 backfill_user_blocks(&backfill_db, backfill_block_store).await; 37 38 });
+97
src/scheduled.rs
··· 13 13 use crate::repo::PostgresBlockStore; 14 14 use crate::storage::BlobStorage; 15 15 16 + pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { 17 + let broken_genesis_commits = match sqlx::query!( 18 + r#" 19 + SELECT seq, did, commit_cid 20 + FROM repo_seq 21 + WHERE event_type = 'commit' 22 + AND prev_cid IS NULL 23 + AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0) 24 + "# 25 + ) 26 + .fetch_all(db) 27 + .await 28 + { 29 + Ok(rows) => rows, 30 + Err(e) => { 31 + error!("Failed to query repo_seq for genesis commit backfill: {}", e); 32 + return; 33 + } 34 + }; 35 + 36 + if broken_genesis_commits.is_empty() { 37 + debug!("No genesis commits need blocks_cids backfill"); 38 + return; 39 + } 40 + 41 + info!( 42 + count = broken_genesis_commits.len(), 43 + "Backfilling blocks_cids for genesis commits" 44 + ); 45 + 46 + let mut success = 0; 47 + let mut failed = 0; 48 + 49 + for commit_row in broken_genesis_commits { 50 + let commit_cid_str = match &commit_row.commit_cid { 51 + Some(c) => c.clone(), 52 + None => { 53 + warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); 54 + failed += 1; 55 + continue; 56 + } 57 + }; 58 + 59 + let commit_cid = match Cid::from_str(&commit_cid_str) { 60 + Ok(c) => c, 61 + Err(_) => { 62 + warn!(seq = commit_row.seq, "Invalid commit CID"); 63 + failed += 1; 64 + continue; 65 + } 66 + }; 67 + 68 + let block = match block_store.get(&commit_cid).await { 69 + Ok(Some(b)) => b, 70 + Ok(None) => { 71 + warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); 72 + failed += 1; 73 + continue; 74 + } 75 + Err(e) => { 76 + warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); 77 + failed += 1; 78 + continue; 79 + } 80 + }; 81 + 82 + let commit = match Commit::from_cbor(&block) { 83 + Ok(c) => c, 84 + Err(e) => { 85 + warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); 86 + failed += 1; 87 + continue; 88 + } 89 + }; 90 + 91 + let mst_root_cid = commit.data; 92 + let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 93 + 94 + if let Err(e) = sqlx::query!( 95 + "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 96 + &blocks_cids, 97 + commit_row.seq 98 + ) 99 + .execute(db) 100 + .await 101 + { 102 + warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); 103 + failed += 1; 104 + } else { 105 + info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); 106 + success += 1; 107 + } 108 + } 109 + 110 + info!(success, failed, "Completed genesis commit blocks_cids backfill"); 111 + } 112 + 16 113 pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 17 114 let repos_missing_rev = match sqlx::query!( 18 115 "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"