Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

Sync conformance fixes vs ref

+1928 -351
-29
.sqlx/query-0f8fd9cbb1ff0fd8951ce082a82cc058ec6db0dde3ab0059d6f340a1fd9ddade.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT u.did, r.repo_root_cid\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did > $1\n ORDER BY u.did ASC\n LIMIT $2\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "repo_root_cid", 14 - "type_info": "Text" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text", 20 - "Int8" 21 - ] 22 - }, 23 - "nullable": [ 24 - false, 25 - false 26 - ] 27 - }, 28 - "hash": "0f8fd9cbb1ff0fd8951ce082a82cc058ec6db0dde3ab0059d6f340a1fd9ddade" 29 - }
-25
.sqlx/query-0fdf13907693d130babae38f4bb1df772dc11ab682f47918cacb5ae186b4eb24.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT cid FROM blobs\n WHERE created_by_user = $1 AND cid > $2 AND created_at > $3\n ORDER BY cid ASC\n LIMIT $4\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid", 15 - "Text", 16 - "Timestamptz", 17 - "Int8" 18 - ] 19 - }, 20 - "nullable": [ 21 - false 22 - ] 23 - }, 24 - "hash": "0fdf13907693d130babae38f4bb1df772dc11ab682f47918cacb5ae186b4eb24" 25 - }
-28
.sqlx/query-1d3748694f23a407e26c793cc43e91c4fa9753dc7c7fd964f6c43de27c5bac4a.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT u.did, r.repo_root_cid\n FROM users u\n LEFT JOIN repos r ON u.id = r.user_id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "repo_root_cid", 14 - "type_info": "Text" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - false 25 - ] 26 - }, 27 - "hash": "1d3748694f23a407e26c793cc43e91c4fa9753dc7c7fd964f6c43de27c5bac4a" 28 - }
+23
.sqlx/query-485cd286a085cca2910e3c3de757b66211b8eb7ec5dadcfda79485520f792c16.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT DISTINCT unnest(blobs) as \"cid!\"\n FROM repo_seq\n WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "cid!", 9 + "type_info": "Text" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text", 15 + "Text" 16 + ] 17 + }, 18 + "nullable": [ 19 + null 20 + ] 21 + }, 22 + "hash": "485cd286a085cca2910e3c3de757b66211b8eb7ec5dadcfda79485520f792c16" 23 + }
+100
.sqlx/query-6783bd8e36444e5d6cc25cc1a120618a541ff9eafa457943ca814bd2a3ca72e1.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "created_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "event_type", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "commit_cid", 29 + "type_info": "Text" 30 + }, 31 + { 32 + "ordinal": 5, 33 + "name": "prev_cid", 34 + "type_info": "Text" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "prev_data_cid", 39 + "type_info": "Text" 40 + }, 41 + { 42 + "ordinal": 7, 43 + "name": "ops", 44 + "type_info": "Jsonb" 45 + }, 46 + { 47 + "ordinal": 8, 48 + "name": "blobs", 49 + "type_info": "TextArray" 50 + }, 51 + { 52 + "ordinal": 9, 53 + "name": "blocks_cids", 54 + "type_info": "TextArray" 55 + }, 56 + { 57 + "ordinal": 10, 58 + "name": "handle", 59 + "type_info": "Text" 60 + }, 61 + { 62 + "ordinal": 11, 63 + "name": "active", 64 + "type_info": "Bool" 65 + }, 66 + { 67 + "ordinal": 12, 68 + "name": "status", 69 + "type_info": "Text" 70 + }, 71 + { 72 + "ordinal": 13, 73 + "name": "rev", 74 + "type_info": "Text" 75 + } 76 + ], 77 + "parameters": { 78 + "Left": [ 79 + "Int8" 80 + ] 81 + }, 82 + "nullable": [ 83 + false, 84 + false, 85 + false, 86 + false, 87 + true, 88 + true, 89 + true, 90 + true, 91 + true, 92 + true, 93 + true, 94 + true, 95 + true, 96 + true 97 + ] 98 + }, 99 + "hash": "6783bd8e36444e5d6cc25cc1a120618a541ff9eafa457943ca814bd2a3ca72e1" 100 + }
+46
.sqlx/query-93678a24667d311aaec7c6277aae3764e5761870aa8c10c50f1e3c8e7fdb87d4.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid\n FROM users u\n LEFT JOIN repos r ON r.user_id = u.id\n WHERE u.did = $1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "id", 9 + "type_info": "Uuid" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "deactivated_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "takedown_ref", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "repo_root_cid", 29 + "type_info": "Text" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Text" 35 + ] 36 + }, 37 + "nullable": [ 38 + false, 39 + false, 40 + true, 41 + true, 42 + false 43 + ] 44 + }, 45 + "hash": "93678a24667d311aaec7c6277aae3764e5761870aa8c10c50f1e3c8e7fdb87d4" 46 + }
+29
.sqlx/query-9eeebac027c05ac44afa9f6b163762277849b75b647b7bf2ce5104baca795bf6.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT blocks_cids, commit_cid\n FROM repo_seq\n WHERE did = $1 AND rev > $2\n ORDER BY seq DESC\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "blocks_cids", 9 + "type_info": "TextArray" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "commit_cid", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [ 19 + "Text", 20 + "Text" 21 + ] 22 + }, 23 + "nullable": [ 24 + true, 25 + true 26 + ] 27 + }, 28 + "hash": "9eeebac027c05ac44afa9f6b163762277849b75b647b7bf2ce5104baca795bf6" 29 + }
+20
.sqlx/query-a805ece8ccc38c88a6dbca22dd70a6a330f0cc8d72952d5b72a8766199fbf598.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT MAX(seq) FROM repo_seq", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "max", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [] 14 + }, 15 + "nullable": [ 16 + null 17 + ] 18 + }, 19 + "hash": "a805ece8ccc38c88a6dbca22dd70a6a330f0cc8d72952d5b72a8766199fbf598" 20 + }
+100
.sqlx/query-abed6772d0cb2924c0aa27d479c866bd099105461ffa126dcbe97ce9089a8b5d.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n LIMIT 1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "created_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "event_type", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "commit_cid", 29 + "type_info": "Text" 30 + }, 31 + { 32 + "ordinal": 5, 33 + "name": "prev_cid", 34 + "type_info": "Text" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "prev_data_cid", 39 + "type_info": "Text" 40 + }, 41 + { 42 + "ordinal": 7, 43 + "name": "ops", 44 + "type_info": "Jsonb" 45 + }, 46 + { 47 + "ordinal": 8, 48 + "name": "blobs", 49 + "type_info": "TextArray" 50 + }, 51 + { 52 + "ordinal": 9, 53 + "name": "blocks_cids", 54 + "type_info": "TextArray" 55 + }, 56 + { 57 + "ordinal": 10, 58 + "name": "handle", 59 + "type_info": "Text" 60 + }, 61 + { 62 + "ordinal": 11, 63 + "name": "active", 64 + "type_info": "Bool" 65 + }, 66 + { 67 + "ordinal": 12, 68 + "name": "status", 69 + "type_info": "Text" 70 + }, 71 + { 72 + "ordinal": 13, 73 + "name": "rev", 74 + "type_info": "Text" 75 + } 76 + ], 77 + "parameters": { 78 + "Left": [ 79 + "Int8" 80 + ] 81 + }, 82 + "nullable": [ 83 + false, 84 + false, 85 + false, 86 + false, 87 + true, 88 + true, 89 + true, 90 + true, 91 + true, 92 + true, 93 + true, 94 + true, 95 + true, 96 + true 97 + ] 98 + }, 99 + "hash": "abed6772d0cb2924c0aa27d479c866bd099105461ffa126dcbe97ce9089a8b5d" 100 + }
+22
.sqlx/query-b43902272f2710b849840b29f2e4c7c9959116bbdf4bed09939dfd82749ccb5f.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "min", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Timestamptz" 15 + ] 16 + }, 17 + "nullable": [ 18 + null 19 + ] 20 + }, 21 + "hash": "b43902272f2710b849840b29f2e4c7c9959116bbdf4bed09939dfd82749ccb5f" 22 + }
+34
.sqlx/query-dd1b61d6ec81fd891d4effd3b51e6c22308b878acdc5355dfcb04c5664c9463b.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "storage_key", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "mime_type", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "size_bytes", 19 + "type_info": "Int8" 20 + } 21 + ], 22 + "parameters": { 23 + "Left": [ 24 + "Text" 25 + ] 26 + }, 27 + "nullable": [ 28 + false, 29 + false, 30 + false 31 + ] 32 + }, 33 + "hash": "dd1b61d6ec81fd891d4effd3b51e6c22308b878acdc5355dfcb04c5664c9463b" 34 + }
+47
.sqlx/query-f6723557ad451b8f4349df8ad4ec35f7abc8590262156bfd4cdc4dbf11ddf8c9.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid, r.repo_rev\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did > $1\n ORDER BY u.did ASC\n LIMIT $2\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "deactivated_at", 14 + "type_info": "Timestamptz" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "takedown_ref", 19 + "type_info": "Text" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "repo_root_cid", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "repo_rev", 29 + "type_info": "Text" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Text", 35 + "Int8" 36 + ] 37 + }, 38 + "nullable": [ 39 + false, 40 + true, 41 + true, 42 + false, 43 + true 44 + ] 45 + }, 46 + "hash": "f6723557ad451b8f4349df8ad4ec35f7abc8590262156bfd4cdc4dbf11ddf8c9" 47 + }
+43 -16
justfile
··· 1 1 default: 2 2 @just --list 3 + 3 4 run: 4 5 cargo run 5 6 run-release: ··· 17 18 fmt-check: 18 19 cargo fmt -- --check 19 20 lint: fmt-check clippy 20 - # Run tests (auto-starts and auto-cleans containers) 21 + 22 + test-all *args: 23 + ./scripts/run-tests.sh {{args}} 24 + 25 + test-auth: 26 + ./scripts/run-tests.sh --test oauth --test oauth_lifecycle --test oauth_scopes --test oauth_security --test oauth_client_metadata --test jwt_security --test session_management --test change_password --test password_reset 27 + 28 + test-admin: 29 + ./scripts/run-tests.sh --test admin_email --test admin_invite --test admin_moderation --test admin_search --test admin_stats 30 + 31 + test-sync: 32 + ./scripts/run-tests.sh --test sync_repo --test sync_blob --test sync_conformance --test sync_deprecated --test firehose_validation 33 + 34 + test-repo: 35 + ./scripts/run-tests.sh --test repo_batch --test repo_blob --test record_validation --test lifecycle_record 36 + 37 + test-identity: 38 + ./scripts/run-tests.sh --test identity --test did_web --test plc_migration --test plc_operations --test plc_validation 39 + 40 + test-account: 41 + ./scripts/run-tests.sh --test lifecycle_session --test delete_account --test invite --test email_update --test account_notifications 42 + 43 + test-security: 44 + ./scripts/run-tests.sh --test security_fixes --test banned_words --test rate_limit --test moderation 45 + 46 + test-import: 47 + ./scripts/run-tests.sh --test import_verification --test import_with_verification 48 + 49 + test-misc: 50 + ./scripts/run-tests.sh --test actor --test commit_signing --test image_processing --test lifecycle_social --test notifications --test server --test signing_key --test verify_live_commit 51 + 21 52 test *args: 22 53 ./scripts/run-tests.sh {{args}} 23 - # Run a specific test file 24 - test-file file: 25 - ./scripts/run-tests.sh --test {{file}} 26 - # Run tests with testcontainers (slower, no shared infra) 27 - test-standalone: 28 - TRANQUIL_PDS_ALLOW_INSECURE_SECRETS=1 cargo test 29 - # Manually manage test infrastructure (for debugging) 30 - test-infra-start: 54 + 55 + test-one name: 56 + ./scripts/run-tests.sh --test {{name}} 57 + 58 + infra-start: 31 59 ./scripts/test-infra.sh start 32 - test-infra-stop: 60 + infra-stop: 33 61 ./scripts/test-infra.sh stop 34 - test-infra-status: 62 + infra-status: 35 63 ./scripts/test-infra.sh status 64 + 36 65 clean: 37 66 cargo clean 38 67 doc: ··· 53 82 podman compose logs -f 54 83 podman-build: 55 84 podman compose build 56 - # Frontend commands (Deno) 85 + 57 86 frontend-dev: 58 87 . ~/.deno/env && cd frontend && deno task dev 59 88 frontend-build: 60 89 . ~/.deno/env && cd frontend && deno task build 61 90 frontend-clean: 62 91 rm -rf frontend/dist frontend/node_modules 63 - # Frontend tests 92 + 64 93 frontend-test *args: 65 94 . ~/.deno/env && cd frontend && VITEST=true deno task test:run {{args}} 66 95 frontend-test-watch: ··· 69 98 . ~/.deno/env && cd frontend && VITEST=true deno task test:ui 70 99 frontend-test-coverage: 71 100 . ~/.deno/env && cd frontend && VITEST=true deno task test:run --coverage 72 - # Build all (frontend + backend) 101 + 73 102 build-all: frontend-build build 74 - # Test all (backend + frontend) 75 - test-all: test frontend-test
+1 -19
src/api/admin/account/info.rs
··· 88 88 } 89 89 } 90 90 91 - fn parse_repeated_param(query: Option<&str>, key: &str) -> Vec<String> { 92 - query 93 - .map(|q| { 94 - q.split('&') 95 - .filter_map(|pair| { 96 - let (k, v) = pair.split_once('=')?; 97 - 98 - if k == key { 99 - Some(urlencoding::decode(v).ok()?.into_owned()) 100 - } else { 101 - None 102 - } 103 - }) 104 - .collect() 105 - }) 106 - .unwrap_or_default() 107 - } 108 - 109 91 pub async fn get_account_infos( 110 92 State(state): State<AppState>, 111 93 _auth: BearerAuthAdmin, 112 94 RawQuery(raw_query): RawQuery, 113 95 ) -> Response { 114 - let dids = parse_repeated_param(raw_query.as_deref(), "dids"); 96 + let dids = crate::util::parse_repeated_query_param(raw_query.as_deref(), "dids"); 115 97 if dids.is_empty() { 116 98 return ( 117 99 StatusCode::BAD_REQUEST,
+3 -1
src/moderation/mod.rs
··· 107 107 use base64::Engine; 108 108 109 109 fn d(b64: &str) -> String { 110 - let bytes = base64::engine::general_purpose::STANDARD.decode(b64).unwrap(); 110 + let bytes = base64::engine::general_purpose::STANDARD 111 + .decode(b64) 112 + .unwrap(); 111 113 String::from_utf8(bytes).unwrap() 112 114 } 113 115
+33 -56
src/sync/blob.rs
··· 1 1 use crate::state::AppState; 2 + use crate::sync::util::assert_repo_availability; 2 3 use axum::{ 3 4 Json, 4 5 body::Body, ··· 37 38 ) 38 39 .into_response(); 39 40 } 40 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 41 - .fetch_optional(&state.db) 42 - .await; 43 - match user_exists { 44 - Ok(None) => { 45 - return ( 46 - StatusCode::NOT_FOUND, 47 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 48 - ) 49 - .into_response(); 50 - } 51 - Err(e) => { 52 - error!("DB error in get_blob: {:?}", e); 53 - return ( 54 - StatusCode::INTERNAL_SERVER_ERROR, 55 - Json(json!({"error": "InternalError"})), 56 - ) 57 - .into_response(); 58 - } 59 - Ok(Some(_)) => {} 60 - } 41 + 42 + let _account = match assert_repo_availability(&state.db, did, false).await { 43 + Ok(a) => a, 44 + Err(e) => return e.into_response(), 45 + }; 46 + 61 47 let blob_result = sqlx::query!( 62 - "SELECT storage_key, mime_type FROM blobs WHERE cid = $1", 48 + "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1", 63 49 cid 64 50 ) 65 51 .fetch_optional(&state.db) ··· 68 54 Ok(Some(row)) => { 69 55 let storage_key = &row.storage_key; 70 56 let mime_type = &row.mime_type; 57 + let size_bytes = row.size_bytes; 71 58 match state.blob_store.get(storage_key).await { 72 59 Ok(data) => Response::builder() 73 60 .status(StatusCode::OK) 74 61 .header(header::CONTENT_TYPE, mime_type) 62 + .header(header::CONTENT_LENGTH, size_bytes.to_string()) 63 + .header("x-content-type-options", "nosniff") 64 + .header("content-security-policy", "default-src 'none'; sandbox") 75 65 .body(Body::from(data)) 76 66 .unwrap(), 77 67 Err(e) => { ··· 127 117 ) 128 118 .into_response(); 129 119 } 120 + 121 + let account = match assert_repo_availability(&state.db, did, false).await { 122 + Ok(a) => a, 123 + Err(e) => return e.into_response(), 124 + }; 125 + 130 126 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 131 127 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 132 - let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 133 - .fetch_optional(&state.db) 134 - .await; 135 - let user_id = match user_result { 136 - Ok(Some(row)) => row.id, 137 - Ok(None) => { 138 - return ( 139 - StatusCode::NOT_FOUND, 140 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 141 - ) 142 - .into_response(); 143 - } 144 - Err(e) => { 145 - error!("DB error in list_blobs: {:?}", e); 146 - return ( 147 - StatusCode::INTERNAL_SERVER_ERROR, 148 - Json(json!({"error": "InternalError"})), 149 - ) 150 - .into_response(); 151 - } 152 - }; 128 + let user_id = account.user_id; 129 + 153 130 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 154 - let since_time = chrono::DateTime::parse_from_rfc3339(since) 155 - .map(|dt| dt.with_timezone(&chrono::Utc)) 156 - .unwrap_or_else(|_| chrono::Utc::now()); 157 - sqlx::query!( 131 + sqlx::query_scalar!( 158 132 r#" 159 - SELECT cid FROM blobs 160 - WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 161 - ORDER BY cid ASC 162 - LIMIT $4 133 + SELECT DISTINCT unnest(blobs) as "cid!" 134 + FROM repo_seq 135 + WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL 163 136 "#, 164 - user_id, 165 - cursor_cid, 166 - since_time, 167 - limit + 1 137 + did, 138 + since 168 139 ) 169 140 .fetch_all(&state.db) 170 141 .await 171 - .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 142 + .map(|mut cids| { 143 + cids.sort(); 144 + cids.into_iter() 145 + .filter(|c| c.as_str() > cursor_cid) 146 + .take((limit + 1) as usize) 147 + .collect() 148 + }) 172 149 } else { 173 150 sqlx::query!( 174 151 r#"
+12
src/sync/car.rs
··· 34 34 result.extend_from_slice(&header_cbor); 35 35 Ok(result) 36 36 } 37 + 38 + pub fn encode_car_header_null_root() -> Result<Vec<u8>, String> { 39 + let header = CarHeader::new_v1(vec![]); 40 + let header_cbor = header 41 + .encode() 42 + .map_err(|e| format!("Failed to encode CAR header: {:?}", e))?; 43 + let mut result = Vec::new(); 44 + write_varint(&mut result, header_cbor.len() as u64) 45 + .expect("Writing to Vec<u8> should never fail"); 46 + result.extend_from_slice(&header_cbor); 47 + Ok(result) 48 + }
+97 -68
src/sync/commit.rs
··· 1 1 use crate::state::AppState; 2 + use crate::sync::util::{AccountStatus, assert_repo_availability, get_account_with_status}; 2 3 use axum::{ 3 4 Json, 4 5 extract::{Query, State}, ··· 43 44 ) 44 45 .into_response(); 45 46 } 46 - let result = sqlx::query!( 47 - r#" 48 - SELECT r.repo_root_cid 49 - FROM repos r 50 - JOIN users u ON r.user_id = u.id 51 - WHERE u.did = $1 52 - "#, 53 - did 54 - ) 55 - .fetch_optional(&state.db) 56 - .await; 57 - match result { 58 - Ok(Some(row)) => { 59 - let rev = get_rev_from_commit(&state, &row.repo_root_cid) 60 - .await 61 - .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 62 - ( 63 - StatusCode::OK, 64 - Json(GetLatestCommitOutput { 65 - cid: row.repo_root_cid, 66 - rev, 67 - }), 47 + 48 + let account = match assert_repo_availability(&state.db, did, false).await { 49 + Ok(a) => a, 50 + Err(e) => return e.into_response(), 51 + }; 52 + 53 + let repo_root_cid = match account.repo_root_cid { 54 + Some(cid) => cid, 55 + None => { 56 + return ( 57 + StatusCode::BAD_REQUEST, 58 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 68 59 ) 69 - .into_response() 60 + .into_response(); 70 61 } 71 - Ok(None) => ( 72 - StatusCode::NOT_FOUND, 73 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 74 - ) 75 - .into_response(), 76 - Err(e) => { 77 - error!("DB error in get_latest_commit: {:?}", e); 78 - ( 62 + }; 63 + 64 + let rev = match get_rev_from_commit(&state, &repo_root_cid).await { 65 + Some(r) => r, 66 + None => { 67 + error!( 68 + "Failed to parse commit for DID {}: CID {}", 69 + did, repo_root_cid 70 + ); 71 + return ( 79 72 StatusCode::INTERNAL_SERVER_ERROR, 80 - Json(json!({"error": "InternalError"})), 73 + Json(json!({"error": "InternalError", "message": "Failed to read repo commit"})), 81 74 ) 82 - .into_response() 75 + .into_response(); 83 76 } 84 - } 77 + }; 78 + 79 + ( 80 + StatusCode::OK, 81 + Json(GetLatestCommitOutput { 82 + cid: repo_root_cid, 83 + rev, 84 + }), 85 + ) 86 + .into_response() 85 87 } 86 88 87 89 #[derive(Deserialize)] ··· 97 99 pub head: String, 98 100 pub rev: String, 99 101 pub active: bool, 102 + #[serde(skip_serializing_if = "Option::is_none")] 103 + pub status: Option<String>, 100 104 } 101 105 102 106 #[derive(Serialize)] ··· 114 118 let cursor_did = params.cursor.as_deref().unwrap_or(""); 115 119 let result = sqlx::query!( 116 120 r#" 117 - SELECT u.did, r.repo_root_cid 121 + SELECT u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid, r.repo_rev 118 122 FROM repos r 119 123 JOIN users u ON r.user_id = u.id 120 124 WHERE u.did > $1 ··· 131 135 let has_more = rows.len() as i64 > limit; 132 136 let mut repos: Vec<RepoInfo> = Vec::new(); 133 137 for row in rows.iter().take(limit as usize) { 134 - let rev = get_rev_from_commit(&state, &row.repo_root_cid) 135 - .await 136 - .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 138 + let rev = match get_rev_from_commit(&state, &row.repo_root_cid).await { 139 + Some(r) => r, 140 + None => { 141 + if let Some(ref stored_rev) = row.repo_rev { 142 + stored_rev.clone() 143 + } else { 144 + tracing::warn!( 145 + "Failed to parse commit for DID {} in list_repos: CID {}", 146 + row.did, 147 + row.repo_root_cid 148 + ); 149 + continue; 150 + } 151 + } 152 + }; 153 + let status = if row.takedown_ref.is_some() { 154 + AccountStatus::Takendown 155 + } else if row.deactivated_at.is_some() { 156 + AccountStatus::Deactivated 157 + } else { 158 + AccountStatus::Active 159 + }; 137 160 repos.push(RepoInfo { 138 161 did: row.did.clone(), 139 162 head: row.repo_root_cid.clone(), 140 163 rev, 141 - active: true, 164 + active: status.is_active(), 165 + status: status.as_str().map(String::from), 142 166 }); 143 167 } 144 168 let next_cursor = if has_more { ··· 175 199 pub struct GetRepoStatusOutput { 176 200 pub did: String, 177 201 pub active: bool, 202 + #[serde(skip_serializing_if = "Option::is_none")] 203 + pub status: Option<String>, 204 + #[serde(skip_serializing_if = "Option::is_none")] 178 205 pub rev: Option<String>, 179 206 } 180 207 ··· 190 217 ) 191 218 .into_response(); 192 219 } 193 - let result = sqlx::query!( 194 - r#" 195 - SELECT u.did, r.repo_root_cid 196 - FROM users u 197 - LEFT JOIN repos r ON u.id = r.user_id 198 - WHERE u.did = $1 199 - "#, 200 - did 201 - ) 202 - .fetch_optional(&state.db) 203 - .await; 204 - match result { 205 - Ok(Some(row)) => { 206 - let rev = get_rev_from_commit(&state, &row.repo_root_cid).await; 207 - ( 208 - StatusCode::OK, 209 - Json(GetRepoStatusOutput { 210 - did: row.did, 211 - active: true, 212 - rev, 213 - }), 220 + 221 + let account = match get_account_with_status(&state.db, did).await { 222 + Ok(Some(a)) => a, 223 + Ok(None) => { 224 + return ( 225 + StatusCode::BAD_REQUEST, 226 + Json(json!({"error": "RepoNotFound", "message": format!("Could not find repo for DID: {}", did)})), 214 227 ) 215 228 .into_response() 216 229 } 217 - Ok(None) => ( 218 - StatusCode::NOT_FOUND, 219 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 220 - ) 221 - .into_response(), 222 230 Err(e) => { 223 231 error!("DB error in get_repo_status: {:?}", e); 224 - ( 232 + return ( 225 233 StatusCode::INTERNAL_SERVER_ERROR, 226 234 Json(json!({"error": "InternalError"})), 227 235 ) 228 - .into_response() 236 + .into_response(); 237 + } 238 + }; 239 + 240 + let rev = if account.status.is_active() { 241 + if let Some(ref cid) = account.repo_root_cid { 242 + get_rev_from_commit(&state, cid).await 243 + } else { 244 + None 229 245 } 230 - } 246 + } else { 247 + None 248 + }; 249 + 250 + ( 251 + StatusCode::OK, 252 + Json(GetRepoStatusOutput { 253 + did: account.did, 254 + active: account.status.is_active(), 255 + status: account.status.as_str().map(String::from), 256 + rev, 257 + }), 258 + ) 259 + .into_response() 231 260 }
+19
src/sync/frame.rs
··· 74 74 pub time: String, 75 75 } 76 76 77 + #[derive(Debug, Serialize, Deserialize)] 78 + pub struct InfoFrame { 79 + pub name: String, 80 + #[serde(skip_serializing_if = "Option::is_none")] 81 + pub message: Option<String>, 82 + } 83 + 84 + #[derive(Debug, Serialize, Deserialize)] 85 + pub struct ErrorFrameHeader { 86 + pub op: i64, 87 + } 88 + 89 + #[derive(Debug, Serialize, Deserialize)] 90 + pub struct ErrorFrameBody { 91 + pub error: String, 92 + #[serde(skip_serializing_if = "Option::is_none")] 93 + pub message: Option<String>, 94 + } 95 + 77 96 pub struct CommitFrameBuilder { 78 97 pub seq: i64, 79 98 pub did: String,
+4
src/sync/mod.rs
··· 18 18 pub use deprecated::{get_checkout, get_head}; 19 19 pub use repo::{get_blocks, get_record, get_repo}; 20 20 pub use subscribe_repos::subscribe_repos; 21 + pub use util::{ 22 + AccountStatus, RepoAccount, RepoAvailabilityError, assert_repo_availability, 23 + get_account_with_status, 24 + }; 21 25 pub use verify::{CarVerifier, VerifiedCar, VerifyError};
+210 -72
src/sync/repo.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::car::encode_car_header; 3 + use crate::sync::util::assert_repo_availability; 3 4 use axum::{ 4 5 Json, 5 - extract::{Query, State}, 6 + extract::{Query, RawQuery, State}, 6 7 http::StatusCode, 7 8 response::{IntoResponse, Response}, 8 9 }; ··· 17 18 18 19 const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20 20 - #[derive(Deserialize)] 21 - pub struct GetBlocksQuery { 22 - pub did: String, 23 - pub cids: String, 21 + fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 22 + let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 23 + .into_iter() 24 + .next() 25 + .ok_or("Missing required parameter: did")?; 26 + let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 27 + Ok((did, cids)) 24 28 } 25 29 26 - pub async fn get_blocks( 27 - State(state): State<AppState>, 28 - Query(query): Query<GetBlocksQuery>, 29 - ) -> Response { 30 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 31 - .fetch_optional(&state.db) 32 - .await 33 - .unwrap_or(None); 34 - if user_exists.is_none() { 35 - return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 36 - } 37 - let cids_str: Vec<&str> = query.cids.split(',').collect(); 30 + pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 31 + let query_string = match query { 32 + Some(q) => q, 33 + None => { 34 + return ( 35 + StatusCode::BAD_REQUEST, 36 + Json(json!({"error": "InvalidRequest", "message": "Missing query parameters"})), 37 + ) 38 + .into_response(); 39 + } 40 + }; 41 + 42 + let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 43 + Ok(parsed) => parsed, 44 + Err(msg) => { 45 + return ( 46 + StatusCode::BAD_REQUEST, 47 + Json(json!({"error": "InvalidRequest", "message": msg})), 48 + ) 49 + .into_response(); 50 + } 51 + }; 52 + 53 + let _account = match assert_repo_availability(&state.db, &did, false).await { 54 + Ok(a) => a, 55 + Err(e) => return e.into_response(), 56 + }; 57 + 38 58 let mut cids = Vec::new(); 39 - for s in cids_str { 59 + for s in &cid_strings { 40 60 match Cid::from_str(s) { 41 61 Ok(cid) => cids.push(cid), 42 - Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 62 + Err(_) => return ( 63 + StatusCode::BAD_REQUEST, 64 + Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", s)})), 65 + ) 66 + .into_response(), 43 67 } 44 68 } 69 + 70 + if cids.is_empty() { 71 + return ( 72 + StatusCode::BAD_REQUEST, 73 + Json(json!({"error": "InvalidRequest", "message": "No CIDs provided"})), 74 + ) 75 + .into_response(); 76 + } 77 + 45 78 let blocks_res = state.block_store.get_many(&cids).await; 46 79 let blocks = match blocks_res { 47 80 Ok(blocks) => blocks, 48 81 Err(e) => { 49 82 error!("Failed to get blocks: {}", e); 50 - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 83 + return ( 84 + StatusCode::INTERNAL_SERVER_ERROR, 85 + Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 86 + ) 87 + .into_response(); 51 88 } 52 89 }; 53 - if cids.is_empty() { 54 - return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 90 + 91 + let mut missing_cids: Vec<String> = Vec::new(); 92 + for (i, block_opt) in blocks.iter().enumerate() { 93 + if block_opt.is_none() { 94 + missing_cids.push(cids[i].to_string()); 95 + } 55 96 } 56 - let root_cid = cids[0]; 57 - let header = match encode_car_header(&root_cid) { 97 + if !missing_cids.is_empty() { 98 + return ( 99 + StatusCode::BAD_REQUEST, 100 + Json(json!({ 101 + "error": "InvalidRequest", 102 + "message": format!("Could not find blocks: {}", missing_cids.join(", ")) 103 + })), 104 + ) 105 + .into_response(); 106 + } 107 + 108 + let header = match crate::sync::car::encode_car_header_null_root() { 58 109 Ok(h) => h, 59 110 Err(e) => { 60 111 error!("Failed to encode CAR header: {}", e); 61 - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 112 + return ( 113 + StatusCode::INTERNAL_SERVER_ERROR, 114 + Json(json!({"error": "InternalError", "message": "Failed to encode CAR"})), 115 + ) 116 + .into_response(); 62 117 } 63 118 }; 64 119 let mut car_bytes = header; ··· 97 152 State(state): State<AppState>, 98 153 Query(query): Query<GetRepoQuery>, 99 154 ) -> Response { 100 - let repo_row = sqlx::query!( 101 - r#" 102 - SELECT r.repo_root_cid 103 - FROM repos r 104 - JOIN users u ON u.id = r.user_id 105 - WHERE u.did = $1 106 - "#, 107 - query.did 108 - ) 109 - .fetch_optional(&state.db) 110 - .await 111 - .unwrap_or(None); 112 - let head_str = match repo_row { 113 - Some(r) => r.repo_root_cid, 155 + let account = match assert_repo_availability(&state.db, &query.did, false).await { 156 + Ok(a) => a, 157 + Err(e) => return e.into_response(), 158 + }; 159 + 160 + let head_str = match account.repo_root_cid { 161 + Some(cid) => cid, 114 162 None => { 115 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 116 - .fetch_optional(&state.db) 117 - .await 118 - .unwrap_or(None); 119 - if user_exists.is_none() { 120 - return ( 121 - StatusCode::NOT_FOUND, 122 - Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 123 - ) 124 - .into_response(); 125 - } else { 126 - return ( 127 - StatusCode::NOT_FOUND, 128 - Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 129 - ) 130 - .into_response(); 131 - } 163 + return ( 164 + StatusCode::BAD_REQUEST, 165 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 166 + ) 167 + .into_response(); 132 168 } 133 169 }; 170 + 134 171 let head_cid = match Cid::from_str(&head_str) { 135 172 Ok(c) => c, 136 173 Err(_) => { ··· 141 178 .into_response(); 142 179 } 143 180 }; 181 + 182 + if let Some(since) = &query.since { 183 + return get_repo_since(&state, &query.did, &head_cid, since).await; 184 + } 185 + 144 186 let mut car_bytes = match encode_car_header(&head_cid) { 145 187 Ok(h) => h, 146 188 Err(e) => { ··· 189 231 .into_response() 190 232 } 191 233 234 + async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 235 + let events = sqlx::query!( 236 + r#" 237 + SELECT blocks_cids, commit_cid 238 + FROM repo_seq 239 + WHERE did = $1 AND rev > $2 240 + ORDER BY seq DESC 241 + "#, 242 + did, 243 + since 244 + ) 245 + .fetch_all(&state.db) 246 + .await; 247 + 248 + let events = match events { 249 + Ok(e) => e, 250 + Err(e) => { 251 + error!("DB error in get_repo_since: {:?}", e); 252 + return ( 253 + StatusCode::INTERNAL_SERVER_ERROR, 254 + Json(json!({"error": "InternalError", "message": "Database error"})), 255 + ) 256 + .into_response(); 257 + } 258 + }; 259 + 260 + let mut block_cids: Vec<Cid> = Vec::new(); 261 + for event in &events { 262 + if let Some(cids) = &event.blocks_cids { 263 + for cid_str in cids { 264 + if let Ok(cid) = Cid::from_str(cid_str) 265 + && !block_cids.contains(&cid) 266 + { 267 + block_cids.push(cid); 268 + } 269 + } 270 + } 271 + if let Some(commit_cid_str) = &event.commit_cid 272 + && let Ok(cid) = Cid::from_str(commit_cid_str) 273 + && !block_cids.contains(&cid) 274 + { 275 + block_cids.push(cid); 276 + } 277 + } 278 + 279 + let mut car_bytes = match encode_car_header(head_cid) { 280 + Ok(h) => h, 281 + Err(e) => { 282 + return ( 283 + StatusCode::INTERNAL_SERVER_ERROR, 284 + Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 285 + ) 286 + .into_response(); 287 + } 288 + }; 289 + 290 + if block_cids.is_empty() { 291 + return ( 292 + StatusCode::OK, 293 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 294 + car_bytes, 295 + ) 296 + .into_response(); 297 + } 298 + 299 + let blocks = match state.block_store.get_many(&block_cids).await { 300 + Ok(b) => b, 301 + Err(e) => { 302 + error!("Block store error in get_repo_since: {:?}", e); 303 + return ( 304 + StatusCode::INTERNAL_SERVER_ERROR, 305 + Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 306 + ) 307 + .into_response(); 308 + } 309 + }; 310 + 311 + for (i, block_opt) in blocks.into_iter().enumerate() { 312 + if let Some(block) = block_opt { 313 + let cid = block_cids[i]; 314 + let cid_bytes = cid.to_bytes(); 315 + let total_len = cid_bytes.len() + block.len(); 316 + let mut writer = Vec::new(); 317 + crate::sync::car::write_varint(&mut writer, total_len as u64) 318 + .expect("Writing to Vec<u8> should never fail"); 319 + writer 320 + .write_all(&cid_bytes) 321 + .expect("Writing to Vec<u8> should never fail"); 322 + writer 323 + .write_all(&block) 324 + .expect("Writing to Vec<u8> should never fail"); 325 + car_bytes.extend_from_slice(&writer); 326 + } 327 + } 328 + 329 + ( 330 + StatusCode::OK, 331 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 332 + car_bytes, 333 + ) 334 + .into_response() 335 + } 336 + 192 337 fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 193 338 match value { 194 339 Ipld::Link(cid) => { ··· 224 369 use std::collections::BTreeMap; 225 370 use std::sync::Arc; 226 371 227 - let repo_row = sqlx::query!( 228 - r#" 229 - SELECT r.repo_root_cid 230 - FROM repos r 231 - JOIN users u ON u.id = r.user_id 232 - WHERE u.did = $1 233 - "#, 234 - query.did 235 - ) 236 - .fetch_optional(&state.db) 237 - .await 238 - .unwrap_or(None); 239 - let commit_cid_str = match repo_row { 240 - Some(r) => r.repo_root_cid, 372 + let account = match assert_repo_availability(&state.db, &query.did, false).await { 373 + Ok(a) => a, 374 + Err(e) => return e.into_response(), 375 + }; 376 + 377 + let commit_cid_str = match account.repo_root_cid { 378 + Some(cid) => cid, 241 379 None => { 242 380 return ( 243 - StatusCode::NOT_FOUND, 244 - Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 381 + StatusCode::BAD_REQUEST, 382 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 245 383 ) 246 384 .into_response(); 247 385 }
+120 -2
src/sync/subscribe_repos.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::firehose::SequencedEvent; 3 3 use crate::sync::util::{ 4 - format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events, 4 + format_error_frame, format_event_for_sending, format_event_with_prefetched_blocks, 5 + format_info_frame, prefetch_blocks_for_events, 5 6 }; 6 7 use axum::{ 7 8 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade}, ··· 55 56 info!(subscribers = count, "Firehose subscriber disconnected"); 56 57 } 57 58 59 + fn get_backfill_hours() -> i64 { 60 + std::env::var("FIREHOSE_BACKFILL_HOURS") 61 + .ok() 62 + .and_then(|v| v.parse().ok()) 63 + .unwrap_or(72) 64 + } 65 + 58 66 async fn handle_socket_inner( 59 67 socket: &mut WebSocket, 60 68 state: &AppState, 61 69 params: SubscribeReposParams, 62 70 ) -> Result<(), ()> { 71 + let mut rx = state.firehose_tx.subscribe(); 72 + let mut last_seen: i64 = -1; 73 + 63 74 if let Some(cursor) = params.cursor { 75 + let current_seq = sqlx::query_scalar!("SELECT MAX(seq) FROM repo_seq") 76 + .fetch_one(&state.db) 77 + .await 78 + .ok() 79 + .flatten() 80 + .unwrap_or(0); 81 + 82 + if cursor > current_seq { 83 + if let Ok(error_bytes) = 84 + format_error_frame("FutureCursor", Some("Cursor in the future.")) 85 + { 86 + let _ = socket.send(Message::Binary(error_bytes.into())).await; 87 + } 88 + socket.close().await.ok(); 89 + return Err(()); 90 + } 91 + 92 + let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours()); 93 + 94 + let first_event = sqlx::query_as!( 95 + SequencedEvent, 96 + r#" 97 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 98 + FROM repo_seq 99 + WHERE seq > $1 100 + ORDER BY seq ASC 101 + LIMIT 1 102 + "#, 103 + cursor 104 + ) 105 + .fetch_optional(&state.db) 106 + .await 107 + .ok() 108 + .flatten(); 109 + 64 110 let mut current_cursor = cursor; 111 + 112 + if let Some(ref event) = first_event 113 + && event.created_at < backfill_time 114 + { 115 + if let Ok(info_bytes) = format_info_frame( 116 + "OutdatedCursor", 117 + Some("Requested cursor exceeded limit. Possibly missing events"), 118 + ) { 119 + let _ = socket.send(Message::Binary(info_bytes.into())).await; 120 + } 121 + 122 + let earliest = sqlx::query_scalar!( 123 + "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1", 124 + backfill_time 125 + ) 126 + .fetch_one(&state.db) 127 + .await 128 + .ok() 129 + .flatten(); 130 + 131 + if let Some(earliest_seq) = earliest { 132 + current_cursor = earliest_seq - 1; 133 + } 134 + } 135 + 136 + last_seen = current_cursor; 137 + 65 138 loop { 66 139 let events = sqlx::query_as!( 67 140 SequencedEvent, ··· 93 166 }; 94 167 for event in events { 95 168 current_cursor = event.seq; 169 + last_seen = event.seq; 96 170 let bytes = 97 171 match format_event_with_prefetched_blocks(event, &prefetched).await { 98 172 Ok(b) => b, ··· 118 192 } 119 193 } 120 194 } 195 + 196 + let cutover_events = sqlx::query_as!( 197 + SequencedEvent, 198 + r#" 199 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 200 + FROM repo_seq 201 + WHERE seq > $1 202 + ORDER BY seq ASC 203 + "#, 204 + last_seen 205 + ) 206 + .fetch_all(&state.db) 207 + .await; 208 + 209 + if let Ok(events) = cutover_events 210 + && !events.is_empty() 211 + { 212 + let prefetched = match prefetch_blocks_for_events(state, &events).await { 213 + Ok(blocks) => blocks, 214 + Err(e) => { 215 + error!("Failed to prefetch blocks for cutover: {}", e); 216 + socket.close().await.ok(); 217 + return Err(()); 218 + } 219 + }; 220 + for event in events { 221 + last_seen = event.seq; 222 + let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await { 223 + Ok(b) => b, 224 + Err(e) => { 225 + warn!("Failed to format cutover event: {}", e); 226 + return Err(()); 227 + } 228 + }; 229 + if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 230 + warn!("Failed to send cutover event: {}", e); 231 + return Err(()); 232 + } 233 + crate::metrics::record_firehose_event(); 234 + } 235 + } 121 236 } 122 - let mut rx = state.firehose_tx.subscribe(); 123 237 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 124 238 .ok() 125 239 .and_then(|v| v.parse().ok()) ··· 129 243 result = rx.recv() => { 130 244 match result { 131 245 Ok(event) => { 246 + if event.seq <= last_seen { 247 + continue; 248 + } 249 + last_seen = event.seq; 132 250 if let Err(e) = send_event(socket, state, event).await { 133 251 warn!("Failed to send event: {}", e); 134 252 break;
+179 -1
src/sync/util.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::firehose::SequencedEvent; 3 - use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 3 + use crate::sync::frame::{ 4 + AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 5 + InfoFrame, SyncFrame, 6 + }; 7 + use axum::Json; 8 + use axum::http::StatusCode; 9 + use axum::response::{IntoResponse, Response}; 4 10 use bytes::Bytes; 5 11 use cid::Cid; 6 12 use iroh_car::{CarHeader, CarWriter}; 7 13 use jacquard_repo::commit::Commit; 8 14 use jacquard_repo::storage::BlockStore; 15 + use serde::Serialize; 16 + use serde_json::json; 17 + use sqlx::PgPool; 9 18 use std::collections::{BTreeMap, HashMap}; 10 19 use std::io::Cursor; 11 20 use std::str::FromStr; 12 21 use tokio::io::AsyncWriteExt; 22 + 23 + #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 24 + #[serde(rename_all = "lowercase")] 25 + pub enum AccountStatus { 26 + Active, 27 + Takendown, 28 + Suspended, 29 + Deactivated, 30 + Deleted, 31 + } 32 + 33 + impl AccountStatus { 34 + pub fn as_str(&self) -> Option<&'static str> { 35 + match self { 36 + AccountStatus::Active => None, 37 + AccountStatus::Takendown => Some("takendown"), 38 + AccountStatus::Suspended => Some("suspended"), 39 + AccountStatus::Deactivated => Some("deactivated"), 40 + AccountStatus::Deleted => Some("deleted"), 41 + } 42 + } 43 + 44 + pub fn is_active(&self) -> bool { 45 + matches!(self, AccountStatus::Active) 46 + } 47 + } 48 + 49 + pub struct RepoAccount { 50 + pub did: String, 51 + pub user_id: uuid::Uuid, 52 + pub status: AccountStatus, 53 + pub repo_root_cid: Option<String>, 54 + } 55 + 56 + pub enum RepoAvailabilityError { 57 + NotFound(String), 58 + Takendown(String), 59 + Deactivated(String), 60 + Internal(String), 61 + } 62 + 63 + impl IntoResponse for RepoAvailabilityError { 64 + fn into_response(self) -> Response { 65 + match self { 66 + RepoAvailabilityError::NotFound(did) => ( 67 + StatusCode::BAD_REQUEST, 68 + Json(json!({ 69 + "error": "RepoNotFound", 70 + "message": format!("Could not find repo for DID: {}", did) 71 + })), 72 + ) 73 + .into_response(), 74 + RepoAvailabilityError::Takendown(did) => ( 75 + StatusCode::BAD_REQUEST, 76 + Json(json!({ 77 + "error": "RepoTakendown", 78 + "message": format!("Repo has been takendown: {}", did) 79 + })), 80 + ) 81 + .into_response(), 82 + RepoAvailabilityError::Deactivated(did) => ( 83 + StatusCode::BAD_REQUEST, 84 + Json(json!({ 85 + "error": "RepoDeactivated", 86 + "message": format!("Repo has been deactivated: {}", did) 87 + })), 88 + ) 89 + .into_response(), 90 + RepoAvailabilityError::Internal(msg) => ( 91 + StatusCode::INTERNAL_SERVER_ERROR, 92 + Json(json!({ 93 + "error": "InternalError", 94 + "message": msg 95 + })), 96 + ) 97 + .into_response(), 98 + } 99 + } 100 + } 101 + 102 + pub async fn get_account_with_status( 103 + db: &PgPool, 104 + did: &str, 105 + ) -> Result<Option<RepoAccount>, sqlx::Error> { 106 + let row = sqlx::query!( 107 + r#" 108 + SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid 109 + FROM users u 110 + LEFT JOIN repos r ON r.user_id = u.id 111 + WHERE u.did = $1 112 + "#, 113 + did 114 + ) 115 + .fetch_optional(db) 116 + .await?; 117 + 118 + Ok(row.map(|r| { 119 + let status = if r.takedown_ref.is_some() { 120 + AccountStatus::Takendown 121 + } else if r.deactivated_at.is_some() { 122 + AccountStatus::Deactivated 123 + } else { 124 + AccountStatus::Active 125 + }; 126 + 127 + RepoAccount { 128 + did: r.did, 129 + user_id: r.id, 130 + status, 131 + repo_root_cid: Some(r.repo_root_cid), 132 + } 133 + })) 134 + } 135 + 136 + pub async fn assert_repo_availability( 137 + db: &PgPool, 138 + did: &str, 139 + is_admin_or_self: bool, 140 + ) -> Result<RepoAccount, RepoAvailabilityError> { 141 + let account = get_account_with_status(db, did) 142 + .await 143 + .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 144 + 145 + let account = match account { 146 + Some(a) => a, 147 + None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 148 + }; 149 + 150 + if is_admin_or_self { 151 + return Ok(account); 152 + } 153 + 154 + match account.status { 155 + AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 156 + AccountStatus::Deactivated => { 157 + return Err(RepoAvailabilityError::Deactivated(did.to_string())); 158 + } 159 + _ => {} 160 + } 161 + 162 + Ok(account) 163 + } 13 164 14 165 fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 166 Commit::from_cbor(commit_bytes) ··· 351 502 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 352 503 Ok(bytes) 353 504 } 505 + 506 + pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 507 + let header = FrameHeader { 508 + op: 1, 509 + t: "#info".to_string(), 510 + }; 511 + let frame = InfoFrame { 512 + name: name.to_string(), 513 + message: message.map(String::from), 514 + }; 515 + let mut bytes = Vec::new(); 516 + serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 517 + serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 518 + Ok(bytes) 519 + } 520 + 521 + pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 522 + let header = ErrorFrameHeader { op: -1 }; 523 + let frame = ErrorFrameBody { 524 + error: error.to_string(), 525 + message: message.map(String::from), 526 + }; 527 + let mut bytes = Vec::new(); 528 + serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 529 + serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 530 + Ok(bytes) 531 + }
+69
src/util.rs
··· 73 73 .ok_or(DbLookupError::NotFound) 74 74 } 75 75 76 + pub fn parse_repeated_query_param(query: Option<&str>, key: &str) -> Vec<String> { 77 + query 78 + .map(|q| { 79 + let mut values = Vec::new(); 80 + for pair in q.split('&') { 81 + if let Some((k, v)) = pair.split_once('=') 82 + && k == key 83 + && let Ok(decoded) = urlencoding::decode(v) 84 + { 85 + let decoded = decoded.into_owned(); 86 + if decoded.contains(',') { 87 + for part in decoded.split(',') { 88 + let trimmed = part.trim(); 89 + if !trimmed.is_empty() { 90 + values.push(trimmed.to_string()); 91 + } 92 + } 93 + } else if !decoded.is_empty() { 94 + values.push(decoded); 95 + } 96 + } 97 + } 98 + values 99 + }) 100 + .unwrap_or_default() 101 + } 102 + 76 103 pub fn extract_client_ip(headers: &HeaderMap) -> String { 77 104 if let Some(forwarded) = headers.get("x-forwarded-for") 78 105 && let Ok(value) = forwarded.to_str() ··· 91 118 #[cfg(test)] 92 119 mod tests { 93 120 use super::*; 121 + 122 + #[test] 123 + fn test_parse_repeated_query_param_repeated() { 124 + let query = "did=test&cids=a&cids=b&cids=c"; 125 + let result = parse_repeated_query_param(Some(query), "cids"); 126 + assert_eq!(result, vec!["a", "b", "c"]); 127 + } 128 + 129 + #[test] 130 + fn test_parse_repeated_query_param_comma_separated() { 131 + let query = "did=test&cids=a,b,c"; 132 + let result = parse_repeated_query_param(Some(query), "cids"); 133 + assert_eq!(result, vec!["a", "b", "c"]); 134 + } 135 + 136 + #[test] 137 + fn test_parse_repeated_query_param_mixed() { 138 + let query = "did=test&cids=a,b&cids=c"; 139 + let result = parse_repeated_query_param(Some(query), "cids"); 140 + assert_eq!(result, vec!["a", "b", "c"]); 141 + } 142 + 143 + #[test] 144 + fn test_parse_repeated_query_param_single() { 145 + let query = "did=test&cids=a"; 146 + let result = parse_repeated_query_param(Some(query), "cids"); 147 + assert_eq!(result, vec!["a"]); 148 + } 149 + 150 + #[test] 151 + fn test_parse_repeated_query_param_empty() { 152 + let query = "did=test"; 153 + let result = parse_repeated_query_param(Some(query), "cids"); 154 + assert!(result.is_empty()); 155 + } 156 + 157 + #[test] 158 + fn test_parse_repeated_query_param_url_encoded() { 159 + let query = "did=test&cids=bafyreib%2Btest"; 160 + let result = parse_repeated_query_param(Some(query), "cids"); 161 + assert_eq!(result, vec!["bafyreib+test"]); 162 + } 94 163 95 164 #[test] 96 165 fn test_generate_token_code() {
+194 -5
tests/firehose_validation.rs
··· 200 200 app_port() 201 201 ); 202 202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 203 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 203 204 204 205 let post_text = "Testing firehose validation!"; 205 206 let post_payload = json!({ ··· 224 225 assert_eq!(res.status(), StatusCode::OK); 225 226 226 227 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 227 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 228 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 228 229 loop { 229 230 let msg = ws_stream.next().await.unwrap().unwrap(); 230 231 let raw_bytes = match msg { ··· 392 393 app_port() 393 394 ); 394 395 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 396 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 395 397 396 398 let update_payload = json!({ 397 399 "repo": did, ··· 415 417 assert_eq!(res.status(), StatusCode::OK); 416 418 417 419 let mut frame_opt: Option<CommitFrame> = None; 418 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 420 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(20), async { 419 421 loop { 420 422 let msg = match ws_stream.next().await { 421 423 Some(Ok(m)) => m, ··· 472 474 app_port() 473 475 ); 474 476 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 477 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 475 478 476 479 let post_payload = json!({ 477 480 "repo": did, ··· 494 497 .expect("Failed to create first post"); 495 498 496 499 let mut first_frame_opt: Option<CommitFrame> = None; 497 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 500 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 498 501 loop { 499 502 let msg = ws_stream.next().await.unwrap().unwrap(); 500 503 let raw_bytes = match msg { ··· 544 547 .expect("Failed to create second post"); 545 548 546 549 let mut second_frame_opt: Option<CommitFrame> = None; 547 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 550 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 548 551 loop { 549 552 let msg = ws_stream.next().await.unwrap().unwrap(); 550 553 let raw_bytes = match msg { ··· 593 596 app_port() 594 597 ); 595 598 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 599 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 596 600 597 601 let post_payload = json!({ 598 602 "repo": did, ··· 615 619 .expect("Failed to create post"); 616 620 617 621 let mut raw_bytes_opt: Option<Vec<u8>> = None; 618 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 622 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 619 623 loop { 620 624 let msg = ws_stream.next().await.unwrap().unwrap(); 621 625 let raw = match msg { ··· 661 665 662 666 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 663 667 } 668 + 669 + #[derive(Debug, Deserialize)] 670 + struct ErrorFrameHeader { 671 + op: i64, 672 + } 673 + 674 + #[derive(Debug, Deserialize)] 675 + struct ErrorFrameBody { 676 + error: String, 677 + #[allow(dead_code)] 678 + message: Option<String>, 679 + } 680 + 681 + #[derive(Debug, Deserialize)] 682 + struct InfoFrameHeader { 683 + #[allow(dead_code)] 684 + op: i64, 685 + t: String, 686 + } 687 + 688 + #[derive(Debug, Deserialize)] 689 + struct InfoFrameBody { 690 + name: String, 691 + #[allow(dead_code)] 692 + message: Option<String>, 693 + } 694 + 695 + fn parse_error_frame(bytes: &[u8]) -> Result<(ErrorFrameHeader, ErrorFrameBody), String> { 696 + let header_len = find_cbor_map_end(bytes)?; 697 + let header: ErrorFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 698 + .map_err(|e| format!("Failed to parse error header: {:?}", e))?; 699 + 700 + if header.op != -1 { 701 + return Err(format!("Not an error frame, op: {}", header.op)); 702 + } 703 + 704 + let remaining = &bytes[header_len..]; 705 + let body: ErrorFrameBody = serde_ipld_dagcbor::from_slice(remaining) 706 + .map_err(|e| format!("Failed to parse error body: {:?}", e))?; 707 + 708 + Ok((header, body)) 709 + } 710 + 711 + fn parse_info_frame(bytes: &[u8]) -> Result<(InfoFrameHeader, InfoFrameBody), String> { 712 + let header_len = find_cbor_map_end(bytes)?; 713 + let header: InfoFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 714 + .map_err(|e| format!("Failed to parse info header: {:?}", e))?; 715 + 716 + if header.t != "#info" { 717 + return Err(format!("Not an info frame, t: {}", header.t)); 718 + } 719 + 720 + let remaining = &bytes[header_len..]; 721 + let body: InfoFrameBody = serde_ipld_dagcbor::from_slice(remaining) 722 + .map_err(|e| format!("Failed to parse info body: {:?}", e))?; 723 + 724 + Ok((header, body)) 725 + } 726 + 727 + #[tokio::test] 728 + async fn test_firehose_future_cursor_error() { 729 + let _ = base_url().await; 730 + 731 + let future_cursor = 9999999999i64; 732 + let url = format!( 733 + "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 734 + app_port(), 735 + future_cursor 736 + ); 737 + 738 + let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 739 + 740 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 741 + loop { 742 + match ws_stream.next().await { 743 + Some(Ok(tungstenite::Message::Binary(bin))) => { 744 + if let Ok((header, body)) = parse_error_frame(&bin) { 745 + println!("Received error frame: {:?} {:?}", header, body); 746 + assert_eq!(header.op, -1, "Error frame op should be -1"); 747 + assert_eq!(body.error, "FutureCursor", "Error should be FutureCursor"); 748 + return true; 749 + } 750 + } 751 + Some(Ok(tungstenite::Message::Close(_))) => { 752 + println!("Connection closed"); 753 + return false; 754 + } 755 + None => { 756 + println!("Stream ended"); 757 + return false; 758 + } 759 + _ => continue, 760 + } 761 + } 762 + }) 763 + .await; 764 + 765 + match timeout { 766 + Ok(received_error) => { 767 + assert!( 768 + received_error, 769 + "Should have received FutureCursor error frame before connection closed" 770 + ); 771 + } 772 + Err(_) => { 773 + panic!( 774 + "Timed out waiting for FutureCursor error - connection should close quickly with error" 775 + ); 776 + } 777 + } 778 + } 779 + 780 + #[tokio::test] 781 + async fn test_firehose_outdated_cursor_info() { 782 + let client = client(); 783 + let (token, did) = create_account_and_login(&client).await; 784 + 785 + let post_payload = json!({ 786 + "repo": did, 787 + "collection": "app.bsky.feed.post", 788 + "record": { 789 + "$type": "app.bsky.feed.post", 790 + "text": "Post for outdated cursor test", 791 + "createdAt": chrono::Utc::now().to_rfc3339(), 792 + } 793 + }); 794 + let _ = client 795 + .post(format!( 796 + "{}/xrpc/com.atproto.repo.createRecord", 797 + base_url().await 798 + )) 799 + .bearer_auth(&token) 800 + .json(&post_payload) 801 + .send() 802 + .await 803 + .expect("Failed to create post"); 804 + 805 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 806 + 807 + let outdated_cursor = 1i64; 808 + let url = format!( 809 + "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 810 + app_port(), 811 + outdated_cursor 812 + ); 813 + 814 + let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 815 + 816 + let mut found_info = false; 817 + let mut found_commit = false; 818 + 819 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 820 + loop { 821 + match ws_stream.next().await { 822 + Some(Ok(tungstenite::Message::Binary(bin))) => { 823 + if let Ok((header, body)) = parse_info_frame(&bin) { 824 + println!("Received info frame: {:?} {:?}", header, body); 825 + if body.name == "OutdatedCursor" { 826 + found_info = true; 827 + println!("Found OutdatedCursor info frame!"); 828 + } 829 + } else if let Ok((_, frame)) = parse_frame(&bin) { 830 + if frame.repo == did { 831 + found_commit = true; 832 + println!("Found commit for our DID"); 833 + } 834 + } 835 + if found_commit { 836 + break; 837 + } 838 + } 839 + Some(Ok(tungstenite::Message::Close(_))) => break, 840 + None => break, 841 + _ => continue, 842 + } 843 + } 844 + }) 845 + .await; 846 + 847 + assert!(timeout.is_ok(), "Timed out"); 848 + assert!( 849 + found_commit, 850 + "Should have received commits even with outdated cursor" 851 + ); 852 + }
+38
tests/helpers/mod.rs
··· 214 214 body["cid"].as_str().unwrap().to_string(), 215 215 ) 216 216 } 217 + 218 + #[allow(dead_code)] 219 + pub async fn set_account_takedown(did: &str, takedown_ref: Option<&str>) { 220 + let conn_str = get_db_connection_string().await; 221 + let pool = sqlx::postgres::PgPoolOptions::new() 222 + .max_connections(2) 223 + .connect(&conn_str) 224 + .await 225 + .expect("Failed to connect to test database"); 226 + sqlx::query!( 227 + "UPDATE users SET takedown_ref = $1 WHERE did = $2", 228 + takedown_ref, 229 + did 230 + ) 231 + .execute(&pool) 232 + .await 233 + .expect("Failed to update takedown_ref"); 234 + } 235 + 236 + #[allow(dead_code)] 237 + pub async fn set_account_deactivated(did: &str, deactivated: bool) { 238 + let conn_str = get_db_connection_string().await; 239 + let pool = sqlx::postgres::PgPoolOptions::new() 240 + .max_connections(2) 241 + .connect(&conn_str) 242 + .await 243 + .expect("Failed to connect to test database"); 244 + let deactivated_at: Option<chrono::DateTime<Utc>> = 245 + if deactivated { Some(Utc::now()) } else { None }; 246 + sqlx::query!( 247 + "UPDATE users SET deactivated_at = $1 WHERE did = $2", 248 + deactivated_at, 249 + did 250 + ) 251 + .execute(&pool) 252 + .await 253 + .expect("Failed to update deactivated_at"); 254 + }
+1 -1
tests/sync_blob.rs
··· 50 50 .send() 51 51 .await 52 52 .expect("Failed to send request"); 53 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 53 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 54 54 let body: Value = res.json().await.expect("Response was not valid JSON"); 55 55 assert_eq!(body["error"], "RepoNotFound"); 56 56 }
+443
tests/sync_conformance.rs
··· 1 + mod common; 2 + mod helpers; 3 + 4 + use common::*; 5 + use helpers::*; 6 + use reqwest::StatusCode; 7 + use serde_json::Value; 8 + 9 + #[tokio::test] 10 + async fn test_get_repo_takendown_returns_error() { 11 + let client = client(); 12 + let (_, did) = create_account_and_login(&client).await; 13 + 14 + set_account_takedown(&did, Some("test-takedown-ref")).await; 15 + 16 + let res = client 17 + .get(format!( 18 + "{}/xrpc/com.atproto.sync.getRepo", 19 + base_url().await 20 + )) 21 + .query(&[("did", did.as_str())]) 22 + .send() 23 + .await 24 + .expect("Failed to send request"); 25 + 26 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 27 + let body: Value = res.json().await.expect("Response was not valid JSON"); 28 + assert_eq!(body["error"], "RepoTakendown"); 29 + } 30 + 31 + #[tokio::test] 32 + async fn test_get_repo_deactivated_returns_error() { 33 + let client = client(); 34 + let (_, did) = create_account_and_login(&client).await; 35 + 36 + set_account_deactivated(&did, true).await; 37 + 38 + let res = client 39 + .get(format!( 40 + "{}/xrpc/com.atproto.sync.getRepo", 41 + base_url().await 42 + )) 43 + .query(&[("did", did.as_str())]) 44 + .send() 45 + .await 46 + .expect("Failed to send request"); 47 + 48 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 49 + let body: Value = res.json().await.expect("Response was not valid JSON"); 50 + assert_eq!(body["error"], "RepoDeactivated"); 51 + } 52 + 53 + #[tokio::test] 54 + async fn test_get_latest_commit_takendown_returns_error() { 55 + let client = client(); 56 + let (_, did) = create_account_and_login(&client).await; 57 + 58 + set_account_takedown(&did, Some("test-takedown-ref")).await; 59 + 60 + let res = client 61 + .get(format!( 62 + "{}/xrpc/com.atproto.sync.getLatestCommit", 63 + base_url().await 64 + )) 65 + .query(&[("did", did.as_str())]) 66 + .send() 67 + .await 68 + .expect("Failed to send request"); 69 + 70 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 71 + let body: Value = res.json().await.expect("Response was not valid JSON"); 72 + assert_eq!(body["error"], "RepoTakendown"); 73 + } 74 + 75 + #[tokio::test] 76 + async fn test_get_blocks_takendown_returns_error() { 77 + let client = client(); 78 + let (_, did) = create_account_and_login(&client).await; 79 + 80 + let commit_res = client 81 + .get(format!( 82 + "{}/xrpc/com.atproto.sync.getLatestCommit", 83 + base_url().await 84 + )) 85 + .query(&[("did", did.as_str())]) 86 + .send() 87 + .await 88 + .expect("Failed to get commit"); 89 + let commit_body: Value = commit_res.json().await.unwrap(); 90 + let cid = commit_body["cid"].as_str().unwrap(); 91 + 92 + set_account_takedown(&did, Some("test-takedown-ref")).await; 93 + 94 + let res = client 95 + .get(format!( 96 + "{}/xrpc/com.atproto.sync.getBlocks", 97 + base_url().await 98 + )) 99 + .query(&[("did", did.as_str()), ("cids", cid)]) 100 + .send() 101 + .await 102 + .expect("Failed to send request"); 103 + 104 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 105 + let body: Value = res.json().await.expect("Response was not valid JSON"); 106 + assert_eq!(body["error"], "RepoTakendown"); 107 + } 108 + 109 + #[tokio::test] 110 + async fn test_get_repo_status_shows_takendown_status() { 111 + let client = client(); 112 + let (_, did) = create_account_and_login(&client).await; 113 + 114 + set_account_takedown(&did, Some("test-takedown-ref")).await; 115 + 116 + let res = client 117 + .get(format!( 118 + "{}/xrpc/com.atproto.sync.getRepoStatus", 119 + base_url().await 120 + )) 121 + .query(&[("did", did.as_str())]) 122 + .send() 123 + .await 124 + .expect("Failed to send request"); 125 + 126 + assert_eq!(res.status(), StatusCode::OK); 127 + let body: Value = res.json().await.expect("Response was not valid JSON"); 128 + assert_eq!(body["active"], false); 129 + assert_eq!(body["status"], "takendown"); 130 + assert!(body.get("rev").is_none() || body["rev"].is_null()); 131 + } 132 + 133 + #[tokio::test] 134 + async fn test_get_repo_status_shows_deactivated_status() { 135 + let client = client(); 136 + let (_, did) = create_account_and_login(&client).await; 137 + 138 + set_account_deactivated(&did, true).await; 139 + 140 + let res = client 141 + .get(format!( 142 + "{}/xrpc/com.atproto.sync.getRepoStatus", 143 + base_url().await 144 + )) 145 + .query(&[("did", did.as_str())]) 146 + .send() 147 + .await 148 + .expect("Failed to send request"); 149 + 150 + assert_eq!(res.status(), StatusCode::OK); 151 + let body: Value = res.json().await.expect("Response was not valid JSON"); 152 + assert_eq!(body["active"], false); 153 + assert_eq!(body["status"], "deactivated"); 154 + } 155 + 156 + #[tokio::test] 157 + async fn test_list_repos_shows_status_field() { 158 + let client = client(); 159 + let (_, did) = create_account_and_login(&client).await; 160 + 161 + set_account_takedown(&did, Some("test-takedown-ref")).await; 162 + 163 + let res = client 164 + .get(format!( 165 + "{}/xrpc/com.atproto.sync.listRepos", 166 + base_url().await 167 + )) 168 + .send() 169 + .await 170 + .expect("Failed to send request"); 171 + 172 + assert_eq!(res.status(), StatusCode::OK); 173 + let body: Value = res.json().await.expect("Response was not valid JSON"); 174 + let repos = body["repos"].as_array().unwrap(); 175 + 176 + let takendown_repo = repos.iter().find(|r| r["did"] == did); 177 + assert!(takendown_repo.is_some(), "Takendown repo should be in list"); 178 + let repo = takendown_repo.unwrap(); 179 + assert_eq!(repo["active"], false); 180 + assert_eq!(repo["status"], "takendown"); 181 + } 182 + 183 + #[tokio::test] 184 + async fn test_get_blob_takendown_returns_error() { 185 + let client = client(); 186 + let (jwt, did) = create_account_and_login(&client).await; 187 + 188 + let blob_res = client 189 + .post(format!( 190 + "{}/xrpc/com.atproto.repo.uploadBlob", 191 + base_url().await 192 + )) 193 + .header("Content-Type", "image/png") 194 + .bearer_auth(&jwt) 195 + .body(vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) 196 + .send() 197 + .await 198 + .expect("Failed to upload blob"); 199 + let blob_body: Value = blob_res.json().await.unwrap(); 200 + let cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap(); 201 + 202 + set_account_takedown(&did, Some("test-takedown-ref")).await; 203 + 204 + let res = client 205 + .get(format!( 206 + "{}/xrpc/com.atproto.sync.getBlob", 207 + base_url().await 208 + )) 209 + .query(&[("did", did.as_str()), ("cid", cid)]) 210 + .send() 211 + .await 212 + .expect("Failed to send request"); 213 + 214 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 215 + let body: Value = res.json().await.expect("Response was not valid JSON"); 216 + assert_eq!(body["error"], "RepoTakendown"); 217 + } 218 + 219 + #[tokio::test] 220 + async fn test_get_blob_has_security_headers() { 221 + let client = client(); 222 + let (jwt, did) = create_account_and_login(&client).await; 223 + 224 + let blob_res = client 225 + .post(format!( 226 + "{}/xrpc/com.atproto.repo.uploadBlob", 227 + base_url().await 228 + )) 229 + .header("Content-Type", "image/png") 230 + .bearer_auth(&jwt) 231 + .body(vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) 232 + .send() 233 + .await 234 + .expect("Failed to upload blob"); 235 + let blob_body: Value = blob_res.json().await.unwrap(); 236 + let cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap(); 237 + 238 + let res = client 239 + .get(format!( 240 + "{}/xrpc/com.atproto.sync.getBlob", 241 + base_url().await 242 + )) 243 + .query(&[("did", did.as_str()), ("cid", cid)]) 244 + .send() 245 + .await 246 + .expect("Failed to send request"); 247 + 248 + assert_eq!(res.status(), StatusCode::OK); 249 + 250 + let headers = res.headers(); 251 + assert_eq!( 252 + headers 253 + .get("x-content-type-options") 254 + .map(|v| v.to_str().unwrap()), 255 + Some("nosniff"), 256 + "Missing x-content-type-options: nosniff header" 257 + ); 258 + assert_eq!( 259 + headers 260 + .get("content-security-policy") 261 + .map(|v| v.to_str().unwrap()), 262 + Some("default-src 'none'; sandbox"), 263 + "Missing content-security-policy header" 264 + ); 265 + assert!( 266 + headers.get("content-length").is_some(), 267 + "Missing content-length header" 268 + ); 269 + } 270 + 271 + #[tokio::test] 272 + async fn test_get_blocks_missing_cids_returns_error() { 273 + let client = client(); 274 + let (_, did) = create_account_and_login(&client).await; 275 + 276 + let fake_cid = "bafyreif2pall7dybz7vecqka3zo24irdwabwdi4wc55jznaq75q7eaavvu"; 277 + 278 + let res = client 279 + .get(format!( 280 + "{}/xrpc/com.atproto.sync.getBlocks", 281 + base_url().await 282 + )) 283 + .query(&[("did", did.as_str()), ("cids", fake_cid)]) 284 + .send() 285 + .await 286 + .expect("Failed to send request"); 287 + 288 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 289 + let body: Value = res.json().await.expect("Response was not valid JSON"); 290 + assert_eq!(body["error"], "InvalidRequest"); 291 + assert!( 292 + body["message"] 293 + .as_str() 294 + .unwrap() 295 + .contains("Could not find blocks"), 296 + "Error message should mention missing blocks" 297 + ); 298 + } 299 + 300 + #[tokio::test] 301 + async fn test_get_blocks_accepts_array_format() { 302 + let client = client(); 303 + let (_, did) = create_account_and_login(&client).await; 304 + 305 + let commit_res = client 306 + .get(format!( 307 + "{}/xrpc/com.atproto.sync.getLatestCommit", 308 + base_url().await 309 + )) 310 + .query(&[("did", did.as_str())]) 311 + .send() 312 + .await 313 + .expect("Failed to get commit"); 314 + let commit_body: Value = commit_res.json().await.unwrap(); 315 + let cid = commit_body["cid"].as_str().unwrap(); 316 + 317 + let url = format!( 318 + "{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}&cids={}", 319 + base_url().await, 320 + did, 321 + cid, 322 + cid 323 + ); 324 + let res = client 325 + .get(&url) 326 + .send() 327 + .await 328 + .expect("Failed to send request"); 329 + 330 + assert_eq!(res.status(), StatusCode::OK); 331 + let content_type = res.headers().get("content-type").unwrap().to_str().unwrap(); 332 + assert!( 333 + content_type.contains("application/vnd.ipld.car"), 334 + "Response should be a CAR file" 335 + ); 336 + } 337 + 338 + #[tokio::test] 339 + async fn test_get_repo_since_returns_partial() { 340 + let client = client(); 341 + let (jwt, did) = create_account_and_login(&client).await; 342 + 343 + let initial_commit_res = client 344 + .get(format!( 345 + "{}/xrpc/com.atproto.sync.getLatestCommit", 346 + base_url().await 347 + )) 348 + .query(&[("did", did.as_str())]) 349 + .send() 350 + .await 351 + .expect("Failed to get initial commit"); 352 + let initial_body: Value = initial_commit_res.json().await.unwrap(); 353 + let initial_rev = initial_body["rev"].as_str().unwrap(); 354 + 355 + let full_repo_res = client 356 + .get(format!( 357 + "{}/xrpc/com.atproto.sync.getRepo", 358 + base_url().await 359 + )) 360 + .query(&[("did", did.as_str())]) 361 + .send() 362 + .await 363 + .expect("Failed to get full repo"); 364 + assert_eq!(full_repo_res.status(), StatusCode::OK); 365 + let full_repo_bytes = full_repo_res.bytes().await.unwrap(); 366 + let full_repo_size = full_repo_bytes.len(); 367 + 368 + create_post(&client, &did, &jwt, "Test post for since param").await; 369 + 370 + let partial_repo_res = client 371 + .get(format!( 372 + "{}/xrpc/com.atproto.sync.getRepo", 373 + base_url().await 374 + )) 375 + .query(&[("did", did.as_str()), ("since", initial_rev)]) 376 + .send() 377 + .await 378 + .expect("Failed to get partial repo"); 379 + assert_eq!(partial_repo_res.status(), StatusCode::OK); 380 + let partial_repo_bytes = partial_repo_res.bytes().await.unwrap(); 381 + let partial_repo_size = partial_repo_bytes.len(); 382 + 383 + assert!( 384 + partial_repo_size < full_repo_size, 385 + "Partial export (since={}) should be smaller than full export: {} vs {}", 386 + initial_rev, 387 + partial_repo_size, 388 + full_repo_size 389 + ); 390 + } 391 + 392 + #[tokio::test] 393 + async fn test_list_blobs_takendown_returns_error() { 394 + let client = client(); 395 + let (_, did) = create_account_and_login(&client).await; 396 + 397 + set_account_takedown(&did, Some("test-takedown-ref")).await; 398 + 399 + let res = client 400 + .get(format!( 401 + "{}/xrpc/com.atproto.sync.listBlobs", 402 + base_url().await 403 + )) 404 + .query(&[("did", did.as_str())]) 405 + .send() 406 + .await 407 + .expect("Failed to send request"); 408 + 409 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 410 + let body: Value = res.json().await.expect("Response was not valid JSON"); 411 + assert_eq!(body["error"], "RepoTakendown"); 412 + } 413 + 414 + #[tokio::test] 415 + async fn test_get_record_takendown_returns_error() { 416 + let client = client(); 417 + let (jwt, did) = create_account_and_login(&client).await; 418 + 419 + let (uri, _cid) = create_post(&client, &did, &jwt, "Test post").await; 420 + let parts: Vec<&str> = uri.split('/').collect(); 421 + let collection = parts[parts.len() - 2]; 422 + let rkey = parts[parts.len() - 1]; 423 + 424 + set_account_takedown(&did, Some("test-takedown-ref")).await; 425 + 426 + let res = client 427 + .get(format!( 428 + "{}/xrpc/com.atproto.sync.getRecord", 429 + base_url().await 430 + )) 431 + .query(&[ 432 + ("did", did.as_str()), 433 + ("collection", collection), 434 + ("rkey", rkey), 435 + ]) 436 + .send() 437 + .await 438 + .expect("Failed to send request"); 439 + 440 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 441 + let body: Value = res.json().await.expect("Response was not valid JSON"); 442 + assert_eq!(body["error"], "RepoTakendown"); 443 + }
+1 -1
tests/sync_deprecated.rs
··· 138 138 "CAR file should have at least header length" 139 139 ); 140 140 for i in 0..4 { 141 - tokio::time::sleep(std::time::Duration::from_millis(50)).await; 141 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 142 142 create_post(&client, &did, &jwt, &format!("Checkout post {}", i)).await; 143 143 } 144 144 let multi_res = client
+40 -27
tests/sync_repo.rs
··· 39 39 .send() 40 40 .await 41 41 .expect("Failed to send request"); 42 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 42 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 43 43 let body: Value = res.json().await.expect("Response was not valid JSON"); 44 44 assert_eq!(body["error"], "RepoNotFound"); 45 45 } ··· 106 106 #[tokio::test] 107 107 async fn test_list_repos_pagination() { 108 108 let client = client(); 109 - let _ = create_account_and_login(&client).await; 110 - let _ = create_account_and_login(&client).await; 111 - let _ = create_account_and_login(&client).await; 112 - let params = [("limit", "1")]; 113 - let res = client 114 - .get(format!( 115 - "{}/xrpc/com.atproto.sync.listRepos", 116 - base_url().await 117 - )) 118 - .query(&params) 119 - .send() 120 - .await 121 - .expect("Failed to send request"); 122 - assert_eq!(res.status(), StatusCode::OK); 123 - let body: Value = res.json().await.expect("Response was not valid JSON"); 124 - let repos = body["repos"].as_array().unwrap(); 125 - assert_eq!(repos.len(), 1); 126 - if let Some(cursor) = body["cursor"].as_str() { 127 - let params = [("limit", "1"), ("cursor", cursor)]; 109 + let (_, did1) = create_account_and_login(&client).await; 110 + let (_, did2) = create_account_and_login(&client).await; 111 + let (_, did3) = create_account_and_login(&client).await; 112 + let our_dids: std::collections::HashSet<String> = [did1, did2, did3].into_iter().collect(); 113 + let mut all_dids_seen: std::collections::HashSet<String> = std::collections::HashSet::new(); 114 + let mut cursor: Option<String> = None; 115 + let mut page_count = 0; 116 + let max_pages = 100; 117 + loop { 118 + let mut params: Vec<(&str, String)> = vec![("limit".into(), "10".into())]; 119 + if let Some(ref c) = cursor { 120 + params.push(("cursor", c.clone())); 121 + } 128 122 let res = client 129 123 .get(format!( 130 124 "{}/xrpc/com.atproto.sync.listRepos", ··· 136 130 .expect("Failed to send request"); 137 131 assert_eq!(res.status(), StatusCode::OK); 138 132 let body: Value = res.json().await.expect("Response was not valid JSON"); 139 - let repos2 = body["repos"].as_array().unwrap(); 140 - assert_eq!(repos2.len(), 1); 141 - assert_ne!(repos[0]["did"], repos2[0]["did"]); 133 + let repos = body["repos"].as_array().unwrap(); 134 + for repo in repos { 135 + let did = repo["did"].as_str().unwrap().to_string(); 136 + assert!( 137 + !all_dids_seen.contains(&did), 138 + "Pagination returned duplicate DID: {}", 139 + did 140 + ); 141 + all_dids_seen.insert(did); 142 + } 143 + cursor = body["cursor"].as_str().map(String::from); 144 + page_count += 1; 145 + if cursor.is_none() || page_count >= max_pages { 146 + break; 147 + } 148 + } 149 + for did in &our_dids { 150 + assert!( 151 + all_dids_seen.contains(did), 152 + "Our created DID {} was not found in paginated results", 153 + did 154 + ); 142 155 } 143 156 } 144 157 ··· 176 189 .send() 177 190 .await 178 191 .expect("Failed to send request"); 179 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 192 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 180 193 let body: Value = res.json().await.expect("Response was not valid JSON"); 181 194 assert_eq!(body["error"], "RepoNotFound"); 182 195 } ··· 270 283 .send() 271 284 .await 272 285 .expect("Failed to send request"); 273 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 286 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 274 287 let body: Value = res.json().await.expect("Response was not valid JSON"); 275 288 assert_eq!(body["error"], "RepoNotFound"); 276 289 } ··· 397 410 .send() 398 411 .await 399 412 .expect("Failed to send request"); 400 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 413 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 401 414 } 402 415 403 416 #[tokio::test] ··· 536 549 .expect("Failed to create profile"); 537 550 assert_eq!(profile_res.status(), StatusCode::OK); 538 551 for i in 0..3 { 539 - tokio::time::sleep(std::time::Duration::from_millis(50)).await; 552 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 540 553 create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; 541 554 } 542 555 let blob_data = b"blob data for sync export test";