Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
237
fork

Configure Feed

Select the types of activity you want to include in your feed.

Moderation conf. vs ref

+431 -9
+9
.env.example
··· 119 119 # How often to check for scheduled account deletions (default: 3600 = 1 hour) 120 120 # SCHEDULED_DELETE_CHECK_INTERVAL_SECS=3600 121 121 # ============================================================================= 122 + # Moderation / Report Service 123 + # ============================================================================= 124 + # If configured, moderation reports will be proxied to this service 125 + # instead of being stored locally. The service should implement the 126 + # com.atproto.moderation.createReport endpoint (e.g., Bluesky's Ozone). 127 + # Both URL and DID must be set for proxying to be enabled. 128 + # REPORT_SERVICE_URL=https://mod.bsky.app 129 + # REPORT_SERVICE_DID=did:plc:ar7c4by46qjdydhdevvrndac 130 + # ============================================================================= 122 131 # Miscellaneous 123 132 # ============================================================================= 124 133 # Allow HTTP for proxy requests (development only)
-2
src/api/identity/did.rs
··· 447 447 } 448 448 449 449 #[derive(serde::Serialize)] 450 - #[serde(rename_all = "camelCase")] 451 450 pub struct Services { 452 - #[serde(rename = "atproto_pds")] 453 451 pub atproto_pds: AtprotoPds, 454 452 } 455 453
+194 -5
src/api/moderation/mod.rs
··· 1 1 use crate::api::ApiError; 2 + use crate::api::proxy_client::{is_ssrf_safe, proxy_client}; 2 3 use crate::state::AppState; 3 4 use axum::{ 4 5 Json, ··· 8 9 }; 9 10 use serde::{Deserialize, Serialize}; 10 11 use serde_json::{Value, json}; 11 - use tracing::error; 12 + use tracing::{error, info}; 12 13 13 14 #[derive(Deserialize)] 14 15 #[serde(rename_all = "camelCase")] ··· 29 30 pub created_at: String, 30 31 } 31 32 33 + fn get_report_service_config() -> Option<(String, String)> { 34 + let url = std::env::var("REPORT_SERVICE_URL").ok()?; 35 + let did = std::env::var("REPORT_SERVICE_DID").ok()?; 36 + if url.is_empty() || did.is_empty() { 37 + return None; 38 + } 39 + Some((url, did)) 40 + } 41 + 32 42 pub async fn create_report( 33 43 State(state): State<AppState>, 34 44 headers: axum::http::HeaderMap, ··· 40 50 Some(t) => t, 41 51 None => return ApiError::AuthenticationRequired.into_response(), 42 52 }; 43 - let did = match crate::auth::validate_bearer_token(&state.db, &token).await { 44 - Ok(user) => user.did, 53 + 54 + let auth_user = match crate::auth::validate_bearer_token_allow_takendown(&state.db, &token).await 55 + { 56 + Ok(user) => user, 45 57 Err(e) => return ApiError::from(e).into_response(), 46 58 }; 59 + 60 + let did = &auth_user.did; 61 + 62 + if let Some((service_url, service_did)) = get_report_service_config() { 63 + return proxy_to_report_service( 64 + &state, 65 + &auth_user, 66 + &service_url, 67 + &service_did, 68 + &input, 69 + ) 70 + .await; 71 + } 72 + 73 + create_report_locally(&state, did, auth_user.is_takendown, input).await 74 + } 75 + 76 + async fn proxy_to_report_service( 77 + state: &AppState, 78 + auth_user: &crate::auth::AuthenticatedUser, 79 + service_url: &str, 80 + service_did: &str, 81 + input: &CreateReportInput, 82 + ) -> Response { 83 + if let Err(e) = is_ssrf_safe(service_url) { 84 + error!("Report service URL failed SSRF check: {:?}", e); 85 + return ( 86 + StatusCode::INTERNAL_SERVER_ERROR, 87 + Json(json!({"error": "InternalError", "message": "Invalid report service configuration"})), 88 + ) 89 + .into_response(); 90 + } 91 + 92 + let key_bytes = match &auth_user.key_bytes { 93 + Some(kb) => kb.clone(), 94 + None => { 95 + match sqlx::query_as::<_, (Vec<u8>, Option<i32>)>( 96 + "SELECT k.key_bytes, k.encryption_version 97 + FROM users u 98 + JOIN user_keys k ON u.id = k.user_id 99 + WHERE u.did = $1", 100 + ) 101 + .bind(&auth_user.did) 102 + .fetch_optional(&state.db) 103 + .await 104 + { 105 + Ok(Some((key_bytes_enc, encryption_version))) => { 106 + match crate::config::decrypt_key(&key_bytes_enc, encryption_version) { 107 + Ok(key) => key, 108 + Err(e) => { 109 + error!(error = ?e, "Failed to decrypt user key for report service auth"); 110 + return ApiError::AuthenticationFailedMsg( 111 + "Failed to get signing key".into(), 112 + ) 113 + .into_response(); 114 + } 115 + } 116 + } 117 + Ok(None) => { 118 + return ApiError::AuthenticationFailedMsg("User has no signing key".into()) 119 + .into_response(); 120 + } 121 + Err(e) => { 122 + error!(error = ?e, "DB error fetching user key for report"); 123 + return ApiError::AuthenticationFailedMsg("Failed to get signing key".into()) 124 + .into_response(); 125 + } 126 + } 127 + } 128 + }; 129 + 130 + let service_token = match crate::auth::create_service_token( 131 + &auth_user.did, 132 + service_did, 133 + "com.atproto.moderation.createReport", 134 + &key_bytes, 135 + ) { 136 + Ok(t) => t, 137 + Err(e) => { 138 + error!("Failed to create service token for report: {:?}", e); 139 + return ( 140 + StatusCode::INTERNAL_SERVER_ERROR, 141 + Json(json!({"error": "InternalError"})), 142 + ) 143 + .into_response(); 144 + } 145 + }; 146 + 147 + let target_url = format!("{}/xrpc/com.atproto.moderation.createReport", service_url); 148 + info!( 149 + did = %auth_user.did, 150 + service_did = %service_did, 151 + "Proxying createReport to report service" 152 + ); 153 + 154 + let request_body = json!({ 155 + "reasonType": input.reason_type, 156 + "reason": input.reason, 157 + "subject": input.subject 158 + }); 159 + 160 + let client = proxy_client(); 161 + let result = client 162 + .post(&target_url) 163 + .header("Authorization", format!("Bearer {}", service_token)) 164 + .header("Content-Type", "application/json") 165 + .json(&request_body) 166 + .send() 167 + .await; 168 + 169 + match result { 170 + Ok(resp) => { 171 + let status = resp.status(); 172 + let headers = resp.headers().clone(); 173 + 174 + let body = match resp.bytes().await { 175 + Ok(b) => b, 176 + Err(e) => { 177 + error!("Error reading report service response: {:?}", e); 178 + return (StatusCode::BAD_GATEWAY, "Error reading upstream response") 179 + .into_response(); 180 + } 181 + }; 182 + 183 + let mut response_builder = Response::builder().status(status); 184 + 185 + if let Some(ct) = headers.get("content-type") { 186 + response_builder = response_builder.header("content-type", ct); 187 + } 188 + 189 + match response_builder.body(axum::body::Body::from(body)) { 190 + Ok(r) => r, 191 + Err(e) => { 192 + error!("Error building proxy response: {:?}", e); 193 + (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error").into_response() 194 + } 195 + } 196 + } 197 + Err(e) => { 198 + error!("Error sending report to service: {:?}", e); 199 + if e.is_timeout() { 200 + (StatusCode::GATEWAY_TIMEOUT, "Report service timeout").into_response() 201 + } else { 202 + (StatusCode::BAD_GATEWAY, "Report service error").into_response() 203 + } 204 + } 205 + } 206 + } 207 + 208 + async fn create_report_locally( 209 + state: &AppState, 210 + did: &str, 211 + is_takendown: bool, 212 + input: CreateReportInput, 213 + ) -> Response { 214 + const REASON_APPEAL: &str = "com.atproto.moderation.defs#reasonAppeal"; 215 + 216 + if is_takendown && input.reason_type != REASON_APPEAL { 217 + return ( 218 + StatusCode::BAD_REQUEST, 219 + Json(json!({"error": "InvalidRequest", "message": "Report not accepted from takendown account"})), 220 + ) 221 + .into_response(); 222 + } 223 + 47 224 let valid_reason_types = [ 48 225 "com.atproto.moderation.defs#reasonSpam", 49 226 "com.atproto.moderation.defs#reasonViolation", ··· 51 228 "com.atproto.moderation.defs#reasonSexual", 52 229 "com.atproto.moderation.defs#reasonRude", 53 230 "com.atproto.moderation.defs#reasonOther", 54 - "com.atproto.moderation.defs#reasonAppeal", 231 + REASON_APPEAL, 55 232 ]; 233 + 56 234 if !valid_reason_types.contains(&input.reason_type.as_str()) { 57 235 return ( 58 236 StatusCode::BAD_REQUEST, ··· 60 238 ) 61 239 .into_response(); 62 240 } 241 + 63 242 let created_at = chrono::Utc::now(); 64 243 let report_id = created_at.timestamp_millis(); 65 244 let subject_json = json!(input.subject); 245 + 66 246 let insert = sqlx::query!( 67 247 "INSERT INTO reports (id, reason_type, reason, subject_json, reported_by_did, created_at) VALUES ($1, $2, $3, $4, $5, $6)", 68 248 report_id, ··· 74 254 ) 75 255 .execute(&state.db) 76 256 .await; 257 + 77 258 if let Err(e) = insert { 78 259 error!("Failed to insert report: {:?}", e); 79 260 return ( ··· 82 263 ) 83 264 .into_response(); 84 265 } 266 + 267 + info!( 268 + report_id = %report_id, 269 + reported_by = %did, 270 + reason_type = %input.reason_type, 271 + "Report created locally (no report service configured)" 272 + ); 273 + 85 274 ( 86 275 StatusCode::OK, 87 276 Json(CreateReportOutput { ··· 89 278 reason_type: input.reason_type, 90 279 reason: input.reason, 91 280 subject: input.subject, 92 - reported_by: did, 281 + reported_by: did.to_string(), 93 282 created_at: created_at.to_rfc3339(), 94 283 }), 95 284 )
+1
src/api/server/service_auth.rs
··· 95 95 did: result.did, 96 96 is_oauth: true, 97 97 is_admin: false, 98 + is_takendown: false, 98 99 scope: result.scope, 99 100 key_bytes: None, 100 101 controller_did: None,
+15 -2
src/auth/mod.rs
··· 62 62 pub key_bytes: Option<Vec<u8>>, 63 63 pub is_oauth: bool, 64 64 pub is_admin: bool, 65 + pub is_takendown: bool, 65 66 pub scope: Option<String>, 66 67 pub controller_did: Option<String>, 67 68 } ··· 115 116 token: &str, 116 117 ) -> Result<AuthenticatedUser, TokenValidationError> { 117 118 validate_bearer_token_with_options_internal(db, None, token, true, true).await 119 + } 120 + 121 + pub async fn validate_bearer_token_allow_takendown( 122 + db: &PgPool, 123 + token: &str, 124 + ) -> Result<AuthenticatedUser, TokenValidationError> { 125 + validate_bearer_token_with_options_internal(db, None, token, false, true).await 118 126 } 119 127 120 128 async fn validate_bearer_token_with_options_internal( ··· 254 262 key_bytes: Some(decrypted_key), 255 263 is_oauth: false, 256 264 is_admin, 265 + is_takendown: takedown_ref.is_some(), 257 266 scope: token_data.claims.scope.clone(), 258 267 controller_did, 259 268 }); ··· 286 295 return Err(TokenValidationError::AccountDeactivated); 287 296 } 288 297 289 - if oauth_token.takedown_ref.is_some() { 298 + let is_takendown = oauth_token.takedown_ref.is_some(); 299 + if !allow_takendown && is_takendown { 290 300 return Err(TokenValidationError::AccountTakedown); 291 301 } 292 302 ··· 304 314 key_bytes, 305 315 is_oauth: true, 306 316 is_admin: oauth_token.is_admin, 317 + is_takendown, 307 318 scope: oauth_info.scope, 308 319 controller_did: oauth_info.controller_did, 309 320 }); ··· 364 375 if !allow_deactivated && user_info.deactivated_at.is_some() { 365 376 return Err(TokenValidationError::AccountDeactivated); 366 377 } 367 - if user_info.takedown_ref.is_some() { 378 + let is_takendown = user_info.takedown_ref.is_some(); 379 + if is_takendown { 368 380 return Err(TokenValidationError::AccountTakedown); 369 381 } 370 382 let key_bytes = if let (Some(kb), Some(ev)) = ··· 379 391 key_bytes, 380 392 is_oauth: true, 381 393 is_admin: user_info.is_admin, 394 + is_takendown, 382 395 scope: result.scope, 383 396 controller_did: None, 384 397 })
+212
tests/moderation.rs
··· 59 59 .expect("Failed to create account report"); 60 60 assert_eq!(account_report_res.status(), StatusCode::OK); 61 61 } 62 + 63 + #[tokio::test] 64 + async fn test_moderation_report_invalid_reason_type() { 65 + let client = client(); 66 + let (alice_did, alice_jwt) = setup_new_user("alice-invalid-reason").await; 67 + let report_payload = json!({ 68 + "reasonType": "invalid.reason.type", 69 + "reason": "Testing invalid reason", 70 + "subject": { 71 + "$type": "com.atproto.admin.defs#repoRef", 72 + "did": alice_did 73 + } 74 + }); 75 + let res = client 76 + .post(format!( 77 + "{}/xrpc/com.atproto.moderation.createReport", 78 + base_url().await 79 + )) 80 + .bearer_auth(&alice_jwt) 81 + .json(&report_payload) 82 + .send() 83 + .await 84 + .expect("Failed to send request"); 85 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 86 + let body: Value = res.json().await.unwrap(); 87 + assert_eq!(body["error"], "InvalidRequest"); 88 + assert!(body["message"] 89 + .as_str() 90 + .unwrap() 91 + .contains("reasonType")); 92 + } 93 + 94 + #[tokio::test] 95 + async fn test_moderation_report_unauthenticated() { 96 + let client = client(); 97 + let report_payload = json!({ 98 + "reasonType": "com.atproto.moderation.defs#reasonSpam", 99 + "reason": "Spam report", 100 + "subject": { 101 + "$type": "com.atproto.admin.defs#repoRef", 102 + "did": "did:plc:test" 103 + } 104 + }); 105 + let res = client 106 + .post(format!( 107 + "{}/xrpc/com.atproto.moderation.createReport", 108 + base_url().await 109 + )) 110 + .json(&report_payload) 111 + .send() 112 + .await 113 + .expect("Failed to send request"); 114 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 115 + } 116 + 117 + #[tokio::test] 118 + async fn test_moderation_report_all_reason_types() { 119 + let client = client(); 120 + let (alice_did, alice_jwt) = setup_new_user("alice-all-reasons").await; 121 + let (bob_did, _) = setup_new_user("bob-all-reasons").await; 122 + let reason_types = [ 123 + "com.atproto.moderation.defs#reasonSpam", 124 + "com.atproto.moderation.defs#reasonViolation", 125 + "com.atproto.moderation.defs#reasonMisleading", 126 + "com.atproto.moderation.defs#reasonSexual", 127 + "com.atproto.moderation.defs#reasonRude", 128 + "com.atproto.moderation.defs#reasonOther", 129 + "com.atproto.moderation.defs#reasonAppeal", 130 + ]; 131 + for reason_type in reason_types { 132 + let report_payload = json!({ 133 + "reasonType": reason_type, 134 + "subject": { 135 + "$type": "com.atproto.admin.defs#repoRef", 136 + "did": bob_did 137 + } 138 + }); 139 + let res = client 140 + .post(format!( 141 + "{}/xrpc/com.atproto.moderation.createReport", 142 + base_url().await 143 + )) 144 + .bearer_auth(&alice_jwt) 145 + .json(&report_payload) 146 + .send() 147 + .await 148 + .expect("Failed to send request"); 149 + assert_eq!( 150 + res.status(), 151 + StatusCode::OK, 152 + "Failed for reason type: {}", 153 + reason_type 154 + ); 155 + let body: Value = res.json().await.unwrap(); 156 + assert_eq!(body["reasonType"], reason_type); 157 + assert_eq!(body["reportedBy"], alice_did); 158 + } 159 + } 160 + 161 + #[tokio::test] 162 + async fn test_moderation_report_takendown_user_can_appeal() { 163 + let client = client(); 164 + let (admin_jwt, _) = create_admin_account_and_login(&client).await; 165 + let (target_jwt, target_did) = create_account_and_login(&client).await; 166 + let takedown_payload = json!({ 167 + "subject": { 168 + "$type": "com.atproto.admin.defs#repoRef", 169 + "did": target_did 170 + }, 171 + "takedown": { 172 + "applied": true, 173 + "ref": "mod-action-test" 174 + } 175 + }); 176 + let takedown_res = client 177 + .post(format!( 178 + "{}/xrpc/com.atproto.admin.updateSubjectStatus", 179 + base_url().await 180 + )) 181 + .bearer_auth(&admin_jwt) 182 + .json(&takedown_payload) 183 + .send() 184 + .await 185 + .expect("Failed to takedown"); 186 + assert_eq!(takedown_res.status(), StatusCode::OK); 187 + let appeal_payload = json!({ 188 + "reasonType": "com.atproto.moderation.defs#reasonAppeal", 189 + "reason": "I believe this takedown was a mistake", 190 + "subject": { 191 + "$type": "com.atproto.admin.defs#repoRef", 192 + "did": target_did 193 + } 194 + }); 195 + let appeal_res = client 196 + .post(format!( 197 + "{}/xrpc/com.atproto.moderation.createReport", 198 + base_url().await 199 + )) 200 + .bearer_auth(&target_jwt) 201 + .json(&appeal_payload) 202 + .send() 203 + .await 204 + .expect("Failed to send appeal"); 205 + assert_eq!( 206 + appeal_res.status(), 207 + StatusCode::OK, 208 + "Takendown user should be able to file appeal reports" 209 + ); 210 + let appeal_body: Value = appeal_res.json().await.unwrap(); 211 + assert_eq!( 212 + appeal_body["reasonType"], 213 + "com.atproto.moderation.defs#reasonAppeal" 214 + ); 215 + assert_eq!(appeal_body["reportedBy"], target_did); 216 + } 217 + 218 + #[tokio::test] 219 + async fn test_moderation_report_takendown_user_cannot_file_non_appeal() { 220 + let client = client(); 221 + let (admin_jwt, _) = create_admin_account_and_login(&client).await; 222 + let (target_jwt, target_did) = create_account_and_login(&client).await; 223 + let takedown_payload = json!({ 224 + "subject": { 225 + "$type": "com.atproto.admin.defs#repoRef", 226 + "did": target_did 227 + }, 228 + "takedown": { 229 + "applied": true, 230 + "ref": "mod-action-test-non-appeal" 231 + } 232 + }); 233 + let takedown_res = client 234 + .post(format!( 235 + "{}/xrpc/com.atproto.admin.updateSubjectStatus", 236 + base_url().await 237 + )) 238 + .bearer_auth(&admin_jwt) 239 + .json(&takedown_payload) 240 + .send() 241 + .await 242 + .expect("Failed to takedown"); 243 + assert_eq!(takedown_res.status(), StatusCode::OK); 244 + let report_payload = json!({ 245 + "reasonType": "com.atproto.moderation.defs#reasonSpam", 246 + "reason": "Trying to report spam", 247 + "subject": { 248 + "$type": "com.atproto.admin.defs#repoRef", 249 + "did": "did:plc:test" 250 + } 251 + }); 252 + let report_res = client 253 + .post(format!( 254 + "{}/xrpc/com.atproto.moderation.createReport", 255 + base_url().await 256 + )) 257 + .bearer_auth(&target_jwt) 258 + .json(&report_payload) 259 + .send() 260 + .await 261 + .expect("Failed to send report"); 262 + assert_eq!( 263 + report_res.status(), 264 + StatusCode::BAD_REQUEST, 265 + "Takendown user should not be able to file non-appeal reports" 266 + ); 267 + let body: Value = report_res.json().await.unwrap(); 268 + assert_eq!(body["error"], "InvalidRequest"); 269 + assert!(body["message"] 270 + .as_str() 271 + .unwrap() 272 + .contains("takendown")); 273 + }