···2222 Json(input): Json<DeleteAccountInput>,
2323) -> Response {
2424 let did = &input.did;
2525- let user = sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did.as_str())
2626- .fetch_optional(&state.db)
2727- .await;
2828- let (user_id, handle) = match user {
2525+ let (user_id, handle) = match state.user_repo.get_id_and_handle_by_did(did).await {
2926 Ok(Some(row)) => (row.id, row.handle),
3027 Ok(None) => {
3128 return ApiError::AccountNotFound.into_response();
···3532 return ApiError::InternalError(None).into_response();
3633 }
3734 };
3838- let mut tx = match state.db.begin().await {
3939- Ok(tx) => tx,
4040- Err(e) => {
4141- error!("Failed to begin transaction for account deletion: {:?}", e);
4242- return ApiError::InternalError(None).into_response();
4343- }
4444- };
4545- if let Err(e) = sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did.as_str())
4646- .execute(&mut *tx)
4747- .await
4848- {
4949- error!("Failed to delete session tokens for {}: {:?}", did, e);
5050- return ApiError::InternalError(Some("Failed to delete session tokens".into()))
5151- .into_response();
5252- }
5353- if let Err(e) = sqlx::query!("DELETE FROM used_refresh_tokens WHERE session_id IN (SELECT id FROM session_tokens WHERE did = $1)", did.as_str())
5454- .execute(&mut *tx)
5555- .await
5656- {
5757- error!("Failed to delete used refresh tokens for {}: {:?}", did, e);
5858- }
5959- if let Err(e) = sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
6060- .execute(&mut *tx)
6161- .await
6262- {
6363- error!("Failed to delete records for user {}: {:?}", user_id, e);
6464- return ApiError::InternalError(Some("Failed to delete records".into())).into_response();
6565- }
6666- if let Err(e) = sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
6767- .execute(&mut *tx)
6868- .await
6969- {
7070- error!("Failed to delete repos for user {}: {:?}", user_id, e);
7171- return ApiError::InternalError(Some("Failed to delete repos".into())).into_response();
7272- }
7373- if let Err(e) = sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
7474- .execute(&mut *tx)
7575- .await
7676- {
7777- error!("Failed to delete blobs for user {}: {:?}", user_id, e);
7878- return ApiError::InternalError(Some("Failed to delete blobs".into())).into_response();
7979- }
8080- if let Err(e) = sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
8181- .execute(&mut *tx)
8282- .await
8383- {
8484- error!(
8585- "Failed to delete app passwords for user {}: {:?}",
8686- user_id, e
8787- );
8888- return ApiError::InternalError(Some("Failed to delete app passwords".into()))
8989- .into_response();
9090- }
9191- if let Err(e) = sqlx::query!(
9292- "DELETE FROM invite_code_uses WHERE used_by_user = $1",
9393- user_id
9494- )
9595- .execute(&mut *tx)
9696- .await
9797- {
9898- error!(
9999- "Failed to delete invite code uses for user {}: {:?}",
100100- user_id, e
101101- );
102102- }
103103- if let Err(e) = sqlx::query!(
104104- "DELETE FROM invite_codes WHERE created_by_user = $1",
105105- user_id
106106- )
107107- .execute(&mut *tx)
108108- .await
109109- {
110110- error!(
111111- "Failed to delete invite codes for user {}: {:?}",
112112- user_id, e
113113- );
114114- }
115115- if let Err(e) = sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
116116- .execute(&mut *tx)
3535+ if let Err(e) = state
3636+ .user_repo
3737+ .admin_delete_account_complete(user_id, did)
11738 .await
11839 {
119119- error!("Failed to delete user keys for user {}: {:?}", user_id, e);
120120- return ApiError::InternalError(Some("Failed to delete user keys".into())).into_response();
121121- }
122122- if let Err(e) = sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
123123- .execute(&mut *tx)
124124- .await
125125- {
126126- error!("Failed to delete user {}: {:?}", user_id, e);
127127- return ApiError::InternalError(Some("Failed to delete user".into())).into_response();
128128- }
129129- if let Err(e) = tx.commit().await {
130130- error!("Failed to commit account deletion transaction: {:?}", e);
131131- return ApiError::InternalError(Some("Failed to commit deletion".into())).into_response();
4040+ error!("Failed to delete account {}: {:?}", did, e);
4141+ return ApiError::InternalError(Some("Failed to delete account".into())).into_response();
13242 }
13343 if let Err(e) =
13444 crate::api::repo::record::sequence_account_event(&state, did, false, Some("deleted")).await
···66pub mod write;
7788pub use batch::apply_writes;
99-pub use delete::{DeleteRecordInput, delete_record};
99+pub use delete::{DeleteRecordInput, delete_record, delete_record_internal};
1010pub use read::{GetRecordInput, ListRecordsInput, ListRecordsOutput, get_record, list_records};
1111pub use utils::*;
1212pub use write::{
+63-78
crates/tranquil-pds/src/api/repo/record/read.rs
···5959 Query(input): Query<GetRecordInput>,
6060) -> Response {
6161 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
6262+ let hostname_for_handles = hostname.split(':').next().unwrap_or(&hostname);
6263 let user_id_opt = if input.repo.is_did() {
6363- sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
6464- .fetch_optional(&state.db)
6464+ let did: crate::types::Did = match input.repo.as_str().parse() {
6565+ Ok(d) => d,
6666+ Err(_) => return ApiError::InvalidRequest("Invalid DID format".into()).into_response(),
6767+ };
6868+ state
6969+ .user_repo
7070+ .get_id_by_did(&did)
6571 .await
6666- .map(|opt| opt.map(|r| r.id))
7272+ .map_err(|_| ())
6773 } else {
6874 let repo_str = input.repo.as_str();
6969- let handle = if !repo_str.contains('.') {
7070- format!("{}.{}", repo_str, hostname)
7575+ let handle_str = if !repo_str.contains('.') {
7676+ format!("{}.{}", repo_str, hostname_for_handles)
7177 } else {
7278 repo_str.to_string()
7379 };
7474- sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
7575- .fetch_optional(&state.db)
8080+ let handle: crate::types::Handle = match handle_str.parse() {
8181+ Ok(h) => h,
8282+ Err(_) => return ApiError::InvalidRequest("Invalid handle format".into()).into_response(),
8383+ };
8484+ state
8585+ .user_repo
8686+ .get_id_by_handle(&handle)
7687 .await
7777- .map(|opt| opt.map(|r| r.id))
8888+ .map_err(|_| ())
7889 };
7990 let user_id: uuid::Uuid = match user_id_opt {
8091 Ok(Some(id)) => id,
···8596 return ApiError::InternalError(None).into_response();
8697 }
8798 };
8888- let record_row = sqlx::query!(
8989- "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
9090- user_id,
9191- input.collection.as_str(),
9292- input.rkey.as_str()
9393- )
9494- .fetch_optional(&state.db)
9595- .await;
9696- let record_cid_str: String = match record_row {
9797- Ok(Some(row)) => row.record_cid,
9999+ let record_row = state
100100+ .repo_repo
101101+ .get_record_cid(user_id, &input.collection, &input.rkey)
102102+ .await;
103103+ let record_cid_link = match record_row {
104104+ Ok(Some(cid)) => cid,
98105 _ => {
99106 return ApiError::RecordNotFound.into_response();
100107 }
101108 };
109109+ let record_cid_str = record_cid_link.to_string();
102110 if let Some(expected_cid) = &input.cid
103111 && &record_cid_str != expected_cid
104112 {
···152160 Query(input): Query<ListRecordsInput>,
153161) -> Response {
154162 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
163163+ let hostname_for_handles = hostname.split(':').next().unwrap_or(&hostname);
155164 let user_id_opt = if input.repo.is_did() {
156156- sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
157157- .fetch_optional(&state.db)
165165+ let did: crate::types::Did = match input.repo.as_str().parse() {
166166+ Ok(d) => d,
167167+ Err(_) => return ApiError::InvalidRequest("Invalid DID format".into()).into_response(),
168168+ };
169169+ state
170170+ .user_repo
171171+ .get_id_by_did(&did)
158172 .await
159159- .map(|opt| opt.map(|r| r.id))
173173+ .map_err(|_| ())
160174 } else {
161175 let repo_str = input.repo.as_str();
162162- let handle = if !repo_str.contains('.') {
163163- format!("{}.{}", repo_str, hostname)
176176+ let handle_str = if !repo_str.contains('.') {
177177+ format!("{}.{}", repo_str, hostname_for_handles)
164178 } else {
165179 repo_str.to_string()
166180 };
167167- sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
168168- .fetch_optional(&state.db)
181181+ let handle: crate::types::Handle = match handle_str.parse() {
182182+ Ok(h) => h,
183183+ Err(_) => return ApiError::InvalidRequest("Invalid handle format".into()).into_response(),
184184+ };
185185+ state
186186+ .user_repo
187187+ .get_id_by_handle(&handle)
169188 .await
170170- .map(|opt| opt.map(|r| r.id))
189189+ .map_err(|_| ())
171190 };
172191 let user_id: uuid::Uuid = match user_id_opt {
173192 Ok(Some(id)) => id,
···181200 let limit = input.limit.unwrap_or(50).clamp(1, 100);
182201 let reverse = input.reverse.unwrap_or(false);
183202 let limit_i64 = limit as i64;
184184- let order = if reverse { "ASC" } else { "DESC" };
185185- let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
186186- let comparator = if reverse { ">" } else { "<" };
187187- let query = format!(
188188- "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
189189- comparator, order
190190- );
191191- sqlx::query_as(&query)
192192- .bind(user_id)
193193- .bind(input.collection.as_str())
194194- .bind(cursor)
195195- .bind(limit_i64)
196196- .fetch_all(&state.db)
197197- .await
198198- } else {
199199- let mut conditions = vec!["repo_id = $1", "collection = $2"];
200200- let mut param_idx = 3;
201201- if input.rkey_start.is_some() {
202202- conditions.push("rkey > $3");
203203- param_idx += 1;
204204- }
205205- if input.rkey_end.is_some() {
206206- conditions.push(if param_idx == 3 {
207207- "rkey < $3"
208208- } else {
209209- "rkey < $4"
210210- });
211211- param_idx += 1;
212212- }
213213- let limit_idx = param_idx;
214214- let query = format!(
215215- "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
216216- conditions.join(" AND "),
217217- order,
218218- limit_idx
219219- );
220220- let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
221221- .bind(user_id)
222222- .bind(input.collection.as_str());
223223- if let Some(start) = &input.rkey_start {
224224- query_builder = query_builder.bind(start.as_str());
225225- }
226226- if let Some(end) = &input.rkey_end {
227227- query_builder = query_builder.bind(end.as_str());
228228- }
229229- query_builder.bind(limit_i64).fetch_all(&state.db).await
230230- };
231231- let rows = match rows_res {
203203+ let cursor_rkey = input.cursor.as_ref().and_then(|c| c.parse::<crate::types::Rkey>().ok());
204204+ let rows = match state
205205+ .repo_repo
206206+ .list_records(
207207+ user_id,
208208+ &input.collection,
209209+ cursor_rkey.as_ref(),
210210+ limit_i64,
211211+ reverse,
212212+ input.rkey_start.as_ref(),
213213+ input.rkey_end.as_ref(),
214214+ )
215215+ .await
216216+ {
232217 Ok(r) => r,
233218 Err(e) => {
234219 error!("Error listing records: {:?}", e);
235220 return ApiError::InternalError(None).into_response();
236221 }
237222 };
238238- let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
223223+ let last_rkey = rows.last().map(|r| r.rkey.to_string());
239224 let parsed_rows: Vec<(Cid, String, String)> = rows
240225 .iter()
241241- .filter_map(|(rkey, cid_str)| {
242242- Cid::from_str(cid_str)
226226+ .filter_map(|row| {
227227+ Cid::from_str(row.record_cid.as_str())
243228 .ok()
244244- .map(|cid| (cid, rkey.clone(), cid_str.clone()))
229229+ .map(|cid| (cid, row.rkey.to_string(), row.record_cid.to_string()))
245230 })
246231 .collect();
247232 let cids: Vec<Cid> = parsed_rows.iter().map(|(cid, _, _)| *cid).collect();
···9494#[tokio::test]
9595async fn test_external_did_web_no_local_doc() {
9696 let client = client();
9797+ let base = base_url().await;
9798 let mock_server = MockServer::start().await;
9899 let mock_uri = mock_server.uri();
99100 let mock_addr = mock_uri.trim_start_matches("http://");
100101 let did = format!("did:web:{}", mock_addr.replace(":", "%3A"));
101102 let handle = format!("xw{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
102102- let pds_endpoint = base_url().await.replace("http://", "https://");
103103+ let pds_endpoint = common::pds_endpoint();
103104104105 let reserve_res = client
105106 .post(format!(
106107 "{}/xrpc/com.atproto.server.reserveSigningKey",
107107- base_url().await
108108+ base
108109 ))
109110 .json(&json!({ "did": did }))
110111 .send()
···150151 let res = client
151152 .post(format!(
152153 "{}/xrpc/com.atproto.server.createAccount",
153153- base_url().await
154154+ base
154155 ))
155156 .json(&payload)
156157 .send()
···161162 panic!("createAccount failed: {:?}", body);
162163 }
163164 let res = client
164164- .get(format!("{}/u/{}/did.json", base_url().await, handle))
165165+ .get(format!("{}/u/{}/did.json", base, handle))
165166 .send()
166167 .await
167168 .expect("Failed to fetch DID doc");
···383384#[tokio::test]
384385async fn test_did_web_byod_flow() {
385386 let client = client();
387387+ let base = base_url().await;
386388 let mock_server = MockServer::start().await;
387389 let mock_uri = mock_server.uri();
388390 let mock_addr = mock_uri.trim_start_matches("http://");
···393395 unique_id
394396 );
395397 let handle = format!("by{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
396396- let pds_endpoint = base_url().await.replace("http://", "https://");
397397- let pds_did = format!("did:web:{}", pds_endpoint.trim_start_matches("https://"));
398398+ let pds_endpoint = common::pds_endpoint();
399399+ let pds_hostname = common::pds_hostname();
400400+ let pds_did = format!("did:web:{}", pds_hostname);
398401399402 let temp_key = SigningKey::random(&mut rand::thread_rng());
400403 let public_key_multibase = signing_key_to_multibase(&temp_key);
···430433 let res = client
431434 .post(format!(
432435 "{}/xrpc/com.atproto.server.createAccount",
433433- base_url().await
436436+ base
434437 ))
435438 .header("Authorization", format!("Bearer {}", service_jwt))
436439 .json(&payload)
···454457 let res = client
455458 .get(format!(
456459 "{}/xrpc/com.atproto.server.checkAccountStatus",
457457- base_url().await
460460+ base
458461 ))
459462 .bearer_auth(&access_jwt)
460463 .send()
···470473 let res = client
471474 .get(format!(
472475 "{}/xrpc/com.atproto.identity.getRecommendedDidCredentials",
473473- base_url().await
476476+ base
474477 ))
475478 .bearer_auth(&access_jwt)
476479 .send()
···493496 let res = client
494497 .post(format!(
495498 "{}/xrpc/com.atproto.server.activateAccount",
496496- base_url().await
499499+ base
497500 ))
498501 .bearer_auth(&access_jwt)
499502 .send()
···508511 let res = client
509512 .get(format!(
510513 "{}/xrpc/com.atproto.server.checkAccountStatus",
511511- base_url().await
514514+ base
512515 ))
513516 .bearer_auth(&access_jwt)
514517 .send()
···524527 let res = client
525528 .post(format!(
526529 "{}/xrpc/com.atproto.repo.createRecord",
527527- base_url().await
530530+ base
528531 ))
529532 .bearer_auth(&access_jwt)
530533 .json(&json!({
+131
crates/tranquil-pds/tests/firehose_validation.rs
···850850 "Should have received commits even with outdated cursor"
851851 );
852852}
853853+854854+#[tokio::test]
855855+async fn test_firehose_car_contains_mst_blocks() {
856856+ let client = client();
857857+ let (token, did) = create_account_and_login(&client).await;
858858+859859+ for i in 0..3 {
860860+ let post_payload = json!({
861861+ "repo": did,
862862+ "collection": "app.bsky.feed.post",
863863+ "record": {
864864+ "$type": "app.bsky.feed.post",
865865+ "text": format!("Setup post {}", i),
866866+ "createdAt": chrono::Utc::now().to_rfc3339(),
867867+ }
868868+ });
869869+ client
870870+ .post(format!(
871871+ "{}/xrpc/com.atproto.repo.createRecord",
872872+ base_url().await
873873+ ))
874874+ .bearer_auth(&token)
875875+ .json(&post_payload)
876876+ .send()
877877+ .await
878878+ .expect("Failed to create setup post");
879879+ tokio::time::sleep(std::time::Duration::from_millis(50)).await;
880880+ }
881881+882882+ let url = format!(
883883+ "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
884884+ app_port()
885885+ );
886886+ let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
887887+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;
888888+889889+ let post_payload = json!({
890890+ "repo": did,
891891+ "collection": "app.bsky.feed.post",
892892+ "record": {
893893+ "$type": "app.bsky.feed.post",
894894+ "text": "Test post for MST block validation",
895895+ "createdAt": chrono::Utc::now().to_rfc3339(),
896896+ }
897897+ });
898898+ let res = client
899899+ .post(format!(
900900+ "{}/xrpc/com.atproto.repo.createRecord",
901901+ base_url().await
902902+ ))
903903+ .bearer_auth(&token)
904904+ .json(&post_payload)
905905+ .send()
906906+ .await
907907+ .expect("Failed to create post");
908908+ assert_eq!(res.status(), StatusCode::OK);
909909+ let create_result: Value = res.json().await.unwrap();
910910+ let record_cid_str = create_result["cid"].as_str().unwrap();
911911+ let expected_record_cid: Cid = record_cid_str.parse().unwrap();
912912+913913+ let mut frame_opt: Option<CommitFrame> = None;
914914+ let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
915915+ loop {
916916+ let msg = ws_stream.next().await.unwrap().unwrap();
917917+ let raw_bytes = match msg {
918918+ tungstenite::Message::Binary(bin) => bin,
919919+ _ => continue,
920920+ };
921921+ if let Ok((_, f)) = parse_frame(&raw_bytes)
922922+ && f.repo == did
923923+ && f.ops.iter().any(|op| op.cid == Some(expected_record_cid))
924924+ {
925925+ frame_opt = Some(f);
926926+ break;
927927+ }
928928+ }
929929+ })
930930+ .await;
931931+ assert!(timeout.is_ok(), "Timed out waiting for firehose event");
932932+ let frame = frame_opt.expect("No matching frame found");
933933+934934+ let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap();
935935+936936+ let mut block_count = 0;
937937+ let mut found_commit = false;
938938+ let mut found_record = false;
939939+ let mut mst_block_count = 0;
940940+941941+ while let Ok(Some((cid, data))) = car_reader.next_block().await {
942942+ block_count += 1;
943943+944944+ if cid == frame.commit {
945945+ found_commit = true;
946946+ continue;
947947+ }
948948+949949+ if cid == expected_record_cid {
950950+ found_record = true;
951951+ continue;
952952+ }
953953+954954+ if data.len() > 10 && data.len() < 5000 {
955955+ mst_block_count += 1;
956956+ }
957957+ }
958958+959959+ println!("CAR block analysis:");
960960+ println!(" Total blocks: {}", block_count);
961961+ println!(" Found commit: {}", found_commit);
962962+ println!(" Found record: {}", found_record);
963963+ println!(" MST/other blocks: {}", mst_block_count);
964964+965965+ assert!(found_commit, "CAR must contain commit block");
966966+ assert!(found_record, "CAR must contain record block");
967967+968968+ assert!(
969969+ block_count >= 3,
970970+ "CAR should contain at least commit + record + MST node(s), got {} blocks. \
971971+ This may indicate firehose is not including all relevant blocks.",
972972+ block_count
973973+ );
974974+975975+ assert!(
976976+ mst_block_count >= 1,
977977+ "CAR should contain MST node blocks for repo validation, got {} MST blocks. \
978978+ Firehose must include relevant MST blocks, not just new ones.",
979979+ mst_block_count
980980+ );
981981+982982+ ws_stream.send(tungstenite::Message::Close(None)).await.ok();
983983+}
+12-9
crates/tranquil-pds/tests/identity.rs
···4848#[tokio::test]
4949async fn test_resolve_handle_not_found() {
5050 let client = client();
5151- let params = [("handle", "nonexistent_handle_12345")];
5151+ let _base = base_url().await;
5252+ let params = [("handle", "nonexistent.handle.test")];
5253 let res = client
5354 .get(format!(
5455 "{}/xrpc/com.atproto.identity.resolveHandle",
5555- base_url().await
5656+ _base
5657 ))
5758 .query(¶ms)
5859 .send()
···99100 let mock_addr = mock_uri.trim_start_matches("http://");
100101 let did = format!("did:web:{}", mock_addr.replace(":", "%3A"));
101102 let handle = format!("wu{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
102102- let pds_endpoint = base_url().await.replace("http://", "https://");
103103+ let base = base_url().await;
104104+ let pds_endpoint = common::pds_endpoint();
103105104106 let reserve_res = client
105107 .post(format!(
106108 "{}/xrpc/com.atproto.server.reserveSigningKey",
107107- base_url().await
109109+ base
108110 ))
109111 .json(&json!({ "did": did }))
110112 .send()
···149151 let res = client
150152 .post(format!(
151153 "{}/xrpc/com.atproto.server.createAccount",
152152- base_url().await
154154+ base
153155 ))
154156 .json(&payload)
155157 .send()
···169171 .expect("createAccount response was not JSON");
170172 assert_eq!(body["did"], did);
171173 let res = client
172172- .get(format!("{}/u/{}/did.json", base_url().await, handle))
174174+ .get(format!("{}/u/{}/did.json", base, handle))
173175 .send()
174176 .await
175177 .expect("Failed to fetch DID doc");
···217219#[tokio::test]
218220async fn test_did_web_lifecycle() {
219221 let client = client();
222222+ let base = base_url().await;
220223 let mock_server = MockServer::start().await;
221224 let mock_uri = mock_server.uri();
222225 let mock_addr = mock_uri.trim_start_matches("http://");
223226 let handle = format!("lc{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
224227 let did = format!("did:web:{}:u:{}", mock_addr.replace(":", "%3A"), handle);
225228 let email = format!("{}@test.com", handle);
226226- let pds_endpoint = base_url().await.replace("http://", "https://");
229229+ let pds_endpoint = common::pds_endpoint();
227230228231 let reserve_res = client
229232 .post(format!(
230233 "{}/xrpc/com.atproto.server.reserveSigningKey",
231231- base_url().await
234234+ base
232235 ))
233236 .json(&json!({ "did": did }))
234237 .send()
···273276 let res = client
274277 .post(format!(
275278 "{}/xrpc/com.atproto.server.createAccount",
276276- base_url().await
279279+ base
277280 ))
278281 .json(&create_payload)
279282 .send()
+2-2
crates/tranquil-pds/tests/lifecycle_record.rs
···132132 .expect("Failed to send stale update");
133133 assert_eq!(
134134 stale_res.status(),
135135- StatusCode::CONFLICT,
136136- "Stale update should cause 409"
135135+ StatusCode::BAD_REQUEST,
136136+ "Stale update should cause 400 InvalidSwap"
137137 );
138138 let good_update_payload = json!({
139139 "repo": did,
+52-74
crates/tranquil-pds/tests/notifications.rs
···11mod common;
22-use tranquil_pds::comms::{
33- CommsChannel, CommsStatus, CommsType, NewComms, enqueue_comms, enqueue_welcome,
44-};
22+use sqlx::Row;
33+use tranquil_pds::comms::{CommsChannel, CommsStatus, CommsType};
5465#[tokio::test]
76async fn test_enqueue_comms() {
87 let pool = common::get_test_db_pool().await;
98 let (_, did) = common::create_account_and_login(&common::client()).await;
1010- let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
99+ let user_id: uuid::Uuid = sqlx::query_scalar("SELECT id FROM users WHERE did = $1")
1010+ .bind(&did)
1111 .fetch_one(pool)
1212 .await
1313 .expect("User not found");
1414- let item = NewComms::email(
1515- user_id,
1616- CommsType::Welcome,
1717- "test@example.com".to_string(),
1818- "Test Subject".to_string(),
1919- "Test body".to_string(),
2020- );
2121- let comms_id = enqueue_comms(pool, item)
2222- .await
2323- .expect("Failed to enqueue comms");
2424- let row = sqlx::query!(
2525- r#"
2626- SELECT
2727- id, user_id, recipient, subject, body,
2828- channel as "channel: CommsChannel",
2929- comms_type as "comms_type: CommsType",
3030- status as "status: CommsStatus"
3131- FROM comms_queue
3232- WHERE id = $1
3333- "#,
3434- comms_id
1414+ let comms_id: uuid::Uuid = sqlx::query_scalar(
1515+ r#"INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body)
1616+ VALUES ($1, 'email', 'welcome', $2, $3, $4)
1717+ RETURNING id"#,
3518 )
1919+ .bind(user_id)
2020+ .bind("test@example.com")
2121+ .bind("Test Subject")
2222+ .bind("Test body")
3623 .fetch_one(pool)
3724 .await
3838- .expect("Comms not found");
3939- assert_eq!(row.user_id, user_id);
4040- assert_eq!(row.recipient, "test@example.com");
4141- assert_eq!(row.subject.as_deref(), Some("Test Subject"));
4242- assert_eq!(row.body, "Test body");
4343- assert_eq!(row.channel, CommsChannel::Email);
4444- assert_eq!(row.comms_type, CommsType::Welcome);
4545- assert_eq!(row.status, CommsStatus::Pending);
4646-}
4747-4848-#[tokio::test]
4949-async fn test_enqueue_welcome() {
5050- let pool = common::get_test_db_pool().await;
5151- let (_, did) = common::create_account_and_login(&common::client()).await;
5252- let user_row = sqlx::query!("SELECT id, email, handle FROM users WHERE did = $1", did)
5353- .fetch_one(pool)
5454- .await
5555- .expect("User not found");
5656- let comms_id = enqueue_welcome(pool, user_row.id, "example.com")
5757- .await
5858- .expect("Failed to enqueue welcome comms");
5959- let row = sqlx::query!(
2525+ .expect("Failed to enqueue comms");
2626+ let row = sqlx::query(
6027 r#"
6161- SELECT
6262- recipient, subject, body,
6363- comms_type as "comms_type: CommsType"
2828+ SELECT id, user_id, recipient, subject, body, channel, comms_type, status
6429 FROM comms_queue
6530 WHERE id = $1
6631 "#,
6767- comms_id
6832 )
3333+ .bind(comms_id)
6934 .fetch_one(pool)
7035 .await
7136 .expect("Comms not found");
7272- assert_eq!(Some(row.recipient), user_row.email);
7373- assert_eq!(row.subject.as_deref(), Some("Welcome to example.com"));
7474- assert!(row.body.contains(&format!("@{}", user_row.handle)));
7575- assert_eq!(row.comms_type, CommsType::Welcome);
3737+ let row_user_id: uuid::Uuid = row.get("user_id");
3838+ let row_recipient: String = row.get("recipient");
3939+ let row_subject: Option<String> = row.get("subject");
4040+ let row_body: String = row.get("body");
4141+ let row_channel: CommsChannel = row.get("channel");
4242+ let row_comms_type: CommsType = row.get("comms_type");
4343+ let row_status: CommsStatus = row.get("status");
4444+ assert_eq!(row_user_id, user_id);
4545+ assert_eq!(row_recipient, "test@example.com");
4646+ assert_eq!(row_subject.as_deref(), Some("Test Subject"));
4747+ assert_eq!(row_body, "Test body");
4848+ assert_eq!(row_channel, CommsChannel::Email);
4949+ assert_eq!(row_comms_type, CommsType::Welcome);
5050+ assert_eq!(row_status, CommsStatus::Pending);
7651}
77527853#[tokio::test]
7954async fn test_comms_queue_status_index() {
8055 let pool = common::get_test_db_pool().await;
8156 let (_, did) = common::create_account_and_login(&common::client()).await;
8282- let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
5757+ let user_id: uuid::Uuid = sqlx::query_scalar("SELECT id FROM users WHERE did = $1")
5858+ .bind(&did)
8359 .fetch_one(pool)
8460 .await
8561 .expect("User not found");
8686- let initial_count: i64 = sqlx::query_scalar!(
6262+ let initial_count: i64 = sqlx::query_scalar(
8763 "SELECT COUNT(*) FROM comms_queue WHERE status = 'pending' AND user_id = $1",
8888- user_id
8964 )
6565+ .bind(user_id)
9066 .fetch_one(pool)
9167 .await
9292- .expect("Failed to count")
9393- .unwrap_or(0);
9494- for i in 0..5 {
9595- let item = NewComms::email(
9696- user_id,
9797- CommsType::PasswordReset,
9898- format!("test{}@example.com", i),
9999- "Test".to_string(),
100100- "Body".to_string(),
101101- );
102102- enqueue_comms(pool, item).await.expect("Failed to enqueue");
103103- }
104104- let final_count: i64 = sqlx::query_scalar!(
6868+ .expect("Failed to count");
6969+ let inserts = (0..5).map(|i| {
7070+ sqlx::query(
7171+ r#"INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body)
7272+ VALUES ($1, 'email', 'password_reset', $2, $3, $4)"#,
7373+ )
7474+ .bind(user_id)
7575+ .bind(format!("test{}@example.com", i))
7676+ .bind("Test")
7777+ .bind("Body")
7878+ .execute(pool)
7979+ });
8080+ futures::future::try_join_all(inserts)
8181+ .await
8282+ .expect("Failed to enqueue");
8383+ let final_count: i64 = sqlx::query_scalar(
10584 "SELECT COUNT(*) FROM comms_queue WHERE status = 'pending' AND user_id = $1",
106106- user_id
10785 )
8686+ .bind(user_id)
10887 .fetch_one(pool)
10988 .await
110110- .expect("Failed to count")
111111- .unwrap_or(0);
8989+ .expect("Failed to count");
11290 assert_eq!(final_count - initial_count, 5);
11391}
+2-2
crates/tranquil-pds/tests/sync_conformance.rs
···352352 let initial_body: Value = initial_commit_res.json().await.unwrap();
353353 let initial_rev = initial_body["rev"].as_str().unwrap();
354354355355+ create_post(&client, &did, &jwt, "Test post for since param").await;
356356+355357 let full_repo_res = client
356358 .get(format!(
357359 "{}/xrpc/com.atproto.sync.getRepo",
···364366 assert_eq!(full_repo_res.status(), StatusCode::OK);
365367 let full_repo_bytes = full_repo_res.bytes().await.unwrap();
366368 let full_repo_size = full_repo_bytes.len();
367367-368368- create_post(&client, &did, &jwt, "Test post for since param").await;
369369370370 let partial_repo_res = client
371371 .get(format!(