Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-server): email config, tests, fmt #10

open opened by oyster.cafe targeting main from feat/inline-emailing
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mkuoecysgl22
+239 -98
Diff #1
+158
crates/tranquil-comms/tests/email_smtp.rs
··· 1 + use std::time::Duration; 2 + 3 + use chrono::Utc; 4 + use lettre::message::Mailbox; 5 + use lettre::transport::smtp::AsyncSmtpTransport; 6 + use lettre::transport::smtp::extension::ClientId; 7 + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 8 + use tokio::net::{TcpListener, TcpStream}; 9 + use tranquil_comms::email::transport::SendMode; 10 + use tranquil_comms::email::{EmailSender, types::HeloName}; 11 + use tranquil_comms::{CommsChannel, CommsSender, CommsStatus, CommsType, QueuedComms, SendError}; 12 + use uuid::Uuid; 13 + 14 + fn fixture(recipient: &str, subject: &str, body: &str) -> QueuedComms { 15 + QueuedComms { 16 + id: Uuid::new_v4(), 17 + user_id: None, 18 + channel: CommsChannel::Email, 19 + comms_type: CommsType::Welcome, 20 + status: CommsStatus::Pending, 21 + recipient: recipient.to_string(), 22 + subject: Some(subject.to_string()), 23 + body: body.to_string(), 24 + metadata: None, 25 + attempts: 0, 26 + max_attempts: 3, 27 + last_error: None, 28 + created_at: Utc::now(), 29 + updated_at: Utc::now(), 30 + scheduled_for: Utc::now(), 31 + processed_at: None, 32 + } 33 + } 34 + 35 + fn build_smarthost_sender(host: &str, port: u16) -> EmailSender { 36 + build_smarthost_sender_with_total_timeout(host, port, Duration::from_secs(10)) 37 + } 38 + 39 + fn build_smarthost_sender_with_total_timeout( 40 + host: &str, 41 + port: u16, 42 + total_timeout: Duration, 43 + ) -> EmailSender { 44 + let from: Mailbox = "Tranquil Test <noreply@nel.pet>".parse().unwrap(); 45 + let helo = HeloName::parse("mta.nel.pet").unwrap(); 46 + let transport = AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host) 47 + .port(port) 48 + .hello_name(ClientId::Domain(helo.into_inner())) 49 + .timeout(Some(Duration::from_secs(5))) 50 + .build(); 51 + EmailSender::new( 52 + from, 53 + SendMode::Smarthost { 54 + transport: Box::new(transport), 55 + total_timeout, 56 + }, 57 + None, 58 + ) 59 + } 60 + 61 + async fn drive_stub(stream: TcpStream, rcpt_response: &'static [u8]) -> std::io::Result<()> { 62 + let (read, mut write) = stream.into_split(); 63 + let mut reader = BufReader::new(read); 64 + write.write_all(b"220 stub ESMTP\r\n").await?; 65 + let mut line = String::new(); 66 + loop { 67 + line.clear(); 68 + let n = reader.read_line(&mut line).await?; 69 + if n == 0 { 70 + return Ok(()); 71 + } 72 + let upper = line.to_ascii_uppercase(); 73 + let response: &[u8] = match upper.split_whitespace().next() { 74 + Some("EHLO") | Some("HELO") => b"250-stub\r\n250 SIZE 10240000\r\n", 75 + Some("MAIL") => b"250 OK\r\n", 76 + Some("RCPT") => rcpt_response, 77 + Some("DATA") => b"354 end with .\r\n", 78 + Some("RSET") => b"250 OK\r\n", 79 + Some("QUIT") => b"221 bye\r\n", 80 + _ => b"500 unknown\r\n", 81 + }; 82 + write.write_all(response).await?; 83 + if upper.starts_with("QUIT") { 84 + return Ok(()); 85 + } 86 + } 87 + } 88 + 89 + async fn spawn_stub(rcpt_response: &'static [u8]) -> u16 { 90 + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 91 + let port = listener.local_addr().unwrap().port(); 92 + tokio::spawn(async move { 93 + let (stream, _) = listener.accept().await.unwrap(); 94 + let _ = drive_stub(stream, rcpt_response).await; 95 + }); 96 + port 97 + } 98 + 99 + #[tokio::test] 100 + async fn rcpt_550_classifies_as_smtp_permanent() { 101 + let port = spawn_stub(b"550 5.1.1 user unknown\r\n").await; 102 + let sender = build_smarthost_sender("127.0.0.1", port); 103 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 104 + match result { 105 + Err(SendError::SmtpPermanent(_)) => {} 106 + other => panic!("expected SmtpPermanent, got {other:?}"), 107 + } 108 + } 109 + 110 + #[tokio::test] 111 + async fn rcpt_421_classifies_as_smtp_transient() { 112 + let port = spawn_stub(b"421 4.7.0 try again later\r\n").await; 113 + let sender = build_smarthost_sender("127.0.0.1", port); 114 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 115 + match result { 116 + Err(SendError::SmtpTransient(_)) => {} 117 + other => panic!("expected SmtpTransient, got {other:?}"), 118 + } 119 + } 120 + 121 + #[tokio::test] 122 + async fn invalid_recipient_classifies_as_invalid_recipient() { 123 + let port = spawn_stub(b"250 OK\r\n").await; 124 + let sender = build_smarthost_sender("127.0.0.1", port); 125 + let result = sender.send(&fixture("not-an-address", "x", "x")).await; 126 + match result { 127 + Err(SendError::InvalidRecipient(_)) => {} 128 + other => panic!("expected InvalidRecipient, got {other:?}"), 129 + } 130 + } 131 + 132 + async fn spawn_silent_stub() -> u16 { 133 + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 134 + let port = listener.local_addr().unwrap().port(); 135 + tokio::spawn(async move { 136 + let (_stream, _) = listener.accept().await.unwrap(); 137 + std::future::pending::<()>().await; 138 + }); 139 + port 140 + } 141 + 142 + #[tokio::test] 143 + async fn smarthost_silent_relay_hits_total_timeout() { 144 + let port = spawn_silent_stub().await; 145 + let sender = 146 + build_smarthost_sender_with_total_timeout("127.0.0.1", port, Duration::from_millis(500)); 147 + let start = std::time::Instant::now(); 148 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 149 + let elapsed = start.elapsed(); 150 + match result { 151 + Err(SendError::Timeout) => {} 152 + other => panic!("expected Timeout, got {other:?}"), 153 + } 154 + assert!( 155 + elapsed < Duration::from_secs(2), 156 + "send returned in {elapsed:?}, expected close to 500ms total_timeout" 157 + ); 158 + }
+2 -4
crates/tranquil-pds/src/repo_ops.rs
··· 526 526 527 527 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 528 528 529 - let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = ops 530 - .iter() 531 - .map(|op| (op.collection_rkey(), op)) 532 - .collect(); 529 + let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = 530 + ops.iter().map(|op| (op.collection_rkey(), op)).collect(); 533 531 534 532 let final_record_uris: HashSet<AtUri> = final_ops 535 533 .iter()
+2 -8
crates/tranquil-pds/tests/repo_batch.rs
··· 402 402 "{}/xrpc/com.atproto.repo.listRecords", 403 403 base_url().await 404 404 )) 405 - .query(&[ 406 - ("repo", did.as_str()), 407 - ("collection", "app.bsky.feed.post"), 408 - ]) 405 + .query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")]) 409 406 .send() 410 407 .await 411 408 .expect("Failed to list records"); ··· 484 481 "{}/xrpc/com.atproto.repo.listRecords", 485 482 base_url().await 486 483 )) 487 - .query(&[ 488 - ("repo", did.as_str()), 489 - ("collection", "app.bsky.feed.post"), 490 - ]) 484 + .query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")]) 491 485 .send() 492 486 .await 493 487 .expect("Failed to list records");
+21 -14
crates/tranquil-server/src/main.rs
··· 53 53 return ExitCode::FAILURE; 54 54 } 55 55 }; 56 - match config.validate(*ignore_secrets) { 57 - Ok(()) => { 58 - println!("Configuration is valid."); 59 - ExitCode::SUCCESS 60 - } 61 - Err(e) => { 62 - eprint!("{e}"); 63 - ExitCode::FAILURE 64 - } 56 + if let Err(e) = config.validate(*ignore_secrets) { 57 + eprint!("{e}"); 58 + return ExitCode::FAILURE; 65 59 } 60 + if let Err(e) = EmailSender::from_config(&config) { 61 + eprintln!("Email configuration invalid: {e}"); 62 + return ExitCode::FAILURE; 63 + } 64 + println!("Configuration is valid."); 65 + ExitCode::SUCCESS 66 66 } 67 67 }; 68 68 } ··· 141 141 142 142 let cfg = tranquil_config::get(); 143 143 144 - if let Some(email_sender) = EmailSender::from_config(cfg) { 145 - info!("Email comms enabled"); 146 - comms_service = comms_service.register_sender(email_sender); 147 - } else { 148 - warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 144 + match EmailSender::from_config(cfg) { 145 + Ok(Some(email_sender)) => { 146 + info!("Email comms enabled"); 147 + comms_service = comms_service.register_sender(email_sender); 148 + } 149 + Ok(None) => { 150 + warn!("Email comms disabled (MAIL_FROM_ADDRESS unset)"); 151 + } 152 + Err(e) => { 153 + error!(error = %e, "Email configuration invalid"); 154 + return Err(e.into()); 155 + } 149 156 } 150 157 151 158 if let Some(discord_sender) = DiscordSender::from_config(cfg) {
+7 -10
crates/tranquil-store/src/blockstore/store.rs
··· 806 806 impl StorageIO for EioOnReadAtRange { 807 807 fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> { 808 808 let fd = self.inner.open(path, opts)?; 809 - self.fd_paths 810 - .lock() 811 - .unwrap() 812 - .insert(fd, path.to_path_buf()); 809 + self.fd_paths.lock().unwrap().insert(fd, path.to_path_buf()); 813 810 Ok(fd) 814 811 } 815 812 ··· 887 884 let cid_a = [0xAAu8; CID_SIZE]; 888 885 let data_a = vec![1u8; 64]; 889 886 let block_a_offset = BlockOffset::new(BLOCK_HEADER_SIZE as u64); 890 - let len_a = 891 - encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap(); 887 + let len_a = encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap(); 892 888 893 889 let block_b_offset_raw = BLOCK_HEADER_SIZE as u64 + len_a; 894 890 let block_b_offset = BlockOffset::new(block_b_offset_raw); ··· 982 978 #[test] 983 979 fn retry_with_backoff_passes_attempt_index_to_op() { 984 980 let observed = std::sync::Mutex::new(Vec::<u8>::new()); 985 - let _result: Result<(), RepoError> = retry_with_backoff(instant_policy(4), &mut |attempt| { 986 - observed.lock().unwrap().push(attempt); 987 - Err(RepoError::storage(io::Error::other("EIO"))) 988 - }); 981 + let _result: Result<(), RepoError> = 982 + retry_with_backoff(instant_policy(4), &mut |attempt| { 983 + observed.lock().unwrap().push(attempt); 984 + Err(RepoError::storage(io::Error::other("EIO"))) 985 + }); 989 986 assert_eq!(*observed.lock().unwrap(), vec![0, 1, 2, 3]); 990 987 } 991 988 }
+1 -4
crates/tranquil-store/src/gauntlet/farm.rs
··· 157 157 158 158 #[test] 159 159 fn scratch_for_thread_falls_back_to_root_zero_outside_pool() { 160 - let roots = vec![ 161 - PathBuf::from("/scratch/a"), 162 - PathBuf::from("/scratch/b"), 163 - ]; 160 + let roots = vec![PathBuf::from("/scratch/a"), PathBuf::from("/scratch/b")]; 164 161 assert_eq!( 165 162 scratch_for_thread(&roots, None), 166 163 Some(PathBuf::from("/scratch/a"))
+1 -1
crates/tranquil-store/src/lib.rs
··· 28 28 }; 29 29 #[cfg(any(test, feature = "test-harness"))] 30 30 pub use sim::{ 31 - FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow, 31 + FaultConfig, LatencyNs, OpRecord, PristineGuard, Probability, SimulatedIO, SyncReorderWindow, 32 32 sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed, 33 33 }; 34 34
+25 -30
crates/tranquil-store/src/sim.rs
··· 1 1 use std::collections::{HashMap, HashSet, VecDeque}; 2 2 use std::io; 3 3 use std::path::{Path, PathBuf}; 4 + use std::sync::Arc; 4 5 use std::sync::Mutex; 5 6 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 6 - use std::sync::Arc; 7 7 use std::time::Duration; 8 8 9 9 use crate::io::{FileId, OpenOptions, StorageIO}; ··· 561 561 return Err(io::Error::other("simulated EIO on read")); 562 562 } 563 563 564 - let read_offset = 565 - if state.should_fault(seed, fault.misdirected_read_probability) { 566 - let drift_sectors = state.next_random_usize(seed, 8) + 1; 567 - let drift = (drift_sectors * SECTOR_BYTES) as u64; 568 - if state.next_random(seed) < 0.5 { 569 - offset.saturating_sub(drift) 570 - } else { 571 - offset.saturating_add(drift) 572 - } 564 + let read_offset = if state.should_fault(seed, fault.misdirected_read_probability) { 565 + let drift_sectors = state.next_random_usize(seed, 8) + 1; 566 + let drift = (drift_sectors * SECTOR_BYTES) as u64; 567 + if state.next_random(seed) < 0.5 { 568 + offset.saturating_sub(drift) 573 569 } else { 574 - offset 575 - }; 570 + offset.saturating_add(drift) 571 + } 572 + } else { 573 + offset 574 + }; 576 575 577 576 let storage = state.storage.get(&sid).unwrap(); 578 577 ··· 620 619 return Err(io::Error::other("simulated EIO on write")); 621 620 } 622 621 623 - let torn_len = 624 - if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) { 625 - let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); 626 - let page_end = page_base + TORN_PAGE_BYTES; 627 - let cap = page_end.saturating_sub(offset as usize).min(buf.len()); 628 - let max_sectors = cap / SECTOR_BYTES; 629 - (max_sectors >= 2).then(|| { 630 - let n = state.next_random_usize(seed, max_sectors - 1) + 1; 631 - n * SECTOR_BYTES 632 - }) 633 - } else { 634 - None 635 - }; 622 + let torn_len = if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) { 623 + let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); 624 + let page_end = page_base + TORN_PAGE_BYTES; 625 + let cap = page_end.saturating_sub(offset as usize).min(buf.len()); 626 + let max_sectors = cap / SECTOR_BYTES; 627 + (max_sectors >= 2).then(|| { 628 + let n = state.next_random_usize(seed, max_sectors - 1) + 1; 629 + n * SECTOR_BYTES 630 + }) 631 + } else { 632 + None 633 + }; 636 634 637 635 let actual_len = match torn_len { 638 636 Some(n) => n, 639 - None if buf.len() > 1 640 - && state.should_fault(seed, fault.partial_write_probability) => 641 - { 637 + None if buf.len() > 1 && state.should_fault(seed, fault.partial_write_probability) => { 642 638 let partial = state.next_random_usize(seed, buf.len()); 643 639 partial.max(1) 644 640 } ··· 840 836 } 841 837 842 838 let dir_path = path.to_path_buf(); 843 - let actually_persisted = 844 - !state.should_fault(seed, fault.dir_sync_failure_probability); 839 + let actually_persisted = !state.should_fault(seed, fault.dir_sync_failure_probability); 845 840 846 841 if actually_persisted { 847 842 state.dirs_durable.insert(dir_path.clone());
+1 -1
crates/tranquil-store/tests/gauntlet_smoke.rs
··· 554 554 std::fs::create_dir_all(&root_b).expect("mkdir b"); 555 555 let roots = vec![root_a.clone(), root_b.clone()]; 556 556 let reports = farm::run_many_timed_with_scratch_roots( 557 - |seed| fast_sanity_config(seed), 557 + fast_sanity_config, 558 558 &roots, 559 559 (0..2).map(Seed), 560 560 );
+10 -17
crates/tranquil-store/tests/sim_blockstore.rs
··· 717 717 718 718 { 719 719 let s = Arc::clone(&sim); 720 - let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io( 721 - config.clone(), 722 - move || Arc::clone(&s), 723 - ) 724 - .unwrap(); 720 + let store = 721 + TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config.clone(), move || { 722 + Arc::clone(&s) 723 + }) 724 + .unwrap(); 725 725 store 726 726 .put_blocks_blocking(vec![(cid, data.clone())]) 727 727 .unwrap(); ··· 730 730 sim.crash(); 731 731 732 732 let s = Arc::clone(&sim); 733 - let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || { 734 - Arc::clone(&s) 735 - }) 736 - .unwrap(); 733 + let store = 734 + TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || Arc::clone(&s)) 735 + .unwrap(); 737 736 738 737 match store.get_block_sync(&cid) { 739 - Ok(Some(d)) => assert_eq!( 740 - &d[..], 741 - &data[..], 742 - "block content mismatch after crash" 743 - ), 738 + Ok(Some(d)) => assert_eq!(&d[..], &data[..], "block content mismatch after crash"), 744 739 Ok(None) => panic!( 745 740 "durability bug: put_blocks_blocking returned Ok but block missing after crash" 746 741 ), 747 - Err(e) => panic!( 748 - "durability bug: block read failed after crash: {e}" 749 - ), 742 + Err(e) => panic!("durability bug: block read failed after crash: {e}"), 750 743 } 751 744 }); 752 745 }
+4 -1
crates/tranquil-store/tests/sim_eventlog.rs
··· 1157 1157 "sync must ack only events 1..=2 with corrupt event 3" 1158 1158 ); 1159 1159 assert_eq!(result.flushed_events.len(), 2); 1160 - assert!(writer.is_poisoned(), "writer must be poisoned after partial sync"); 1160 + assert!( 1161 + writer.is_poisoned(), 1162 + "writer must be poisoned after partial sync" 1163 + ); 1161 1164 1162 1165 let append_after_poison = writer.append( 1163 1166 DidHash::from_did("did:plc:after"),
+7 -8
crates/tranquil-sync/src/subscribe_repos.rs
··· 85 85 }; 86 86 for event in events { 87 87 *last_seen = event.seq; 88 - let bytes = 89 - match format_event_with_prefetched_blocks(state, event, &prefetched).await { 90 - Ok(b) => b, 91 - Err(e) => { 92 - warn!("Lag recovery format failed: {}", e); 93 - return Err(()); 94 - } 95 - }; 88 + let bytes = match format_event_with_prefetched_blocks(state, event, &prefetched).await { 89 + Ok(b) => b, 90 + Err(e) => { 91 + warn!("Lag recovery format failed: {}", e); 92 + return Err(()); 93 + } 94 + }; 96 95 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 97 96 warn!("Lag recovery send failed: {}", e); 98 97 return Err(());

History

4 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
feat(tranquil-server): email config, tests, fmt
merge conflicts detected
expand
  • .config/nextest.toml:68
  • Cargo.lock:9
  • Cargo.toml:26
  • crates/tranquil-comms/Cargo.toml:10
  • crates/tranquil-config/src/lib.rs:5
  • example.toml:373
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments