Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
227
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-server): email config, tests, fmt #10

open opened by oyster.cafe targeting main from feat/inline-emailing
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mkuoecysgl22
+543 -176
Diff #3
+158
crates/tranquil-comms/tests/email_smtp.rs
··· 1 + use std::time::Duration; 2 + 3 + use chrono::Utc; 4 + use lettre::message::Mailbox; 5 + use lettre::transport::smtp::AsyncSmtpTransport; 6 + use lettre::transport::smtp::extension::ClientId; 7 + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 8 + use tokio::net::{TcpListener, TcpStream}; 9 + use tranquil_comms::email::transport::SendMode; 10 + use tranquil_comms::email::{EmailSender, types::HeloName}; 11 + use tranquil_comms::{CommsChannel, CommsSender, CommsStatus, CommsType, QueuedComms, SendError}; 12 + use uuid::Uuid; 13 + 14 + fn fixture(recipient: &str, subject: &str, body: &str) -> QueuedComms { 15 + QueuedComms { 16 + id: Uuid::new_v4(), 17 + user_id: None, 18 + channel: CommsChannel::Email, 19 + comms_type: CommsType::Welcome, 20 + status: CommsStatus::Pending, 21 + recipient: recipient.to_string(), 22 + subject: Some(subject.to_string()), 23 + body: body.to_string(), 24 + metadata: None, 25 + attempts: 0, 26 + max_attempts: 3, 27 + last_error: None, 28 + created_at: Utc::now(), 29 + updated_at: Utc::now(), 30 + scheduled_for: Utc::now(), 31 + processed_at: None, 32 + } 33 + } 34 + 35 + fn build_smarthost_sender(host: &str, port: u16) -> EmailSender { 36 + build_smarthost_sender_with_total_timeout(host, port, Duration::from_secs(10)) 37 + } 38 + 39 + fn build_smarthost_sender_with_total_timeout( 40 + host: &str, 41 + port: u16, 42 + total_timeout: Duration, 43 + ) -> EmailSender { 44 + let from: Mailbox = "Tranquil Test <noreply@nel.pet>".parse().unwrap(); 45 + let helo = HeloName::parse("mta.nel.pet").unwrap(); 46 + let transport = AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host) 47 + .port(port) 48 + .hello_name(ClientId::Domain(helo.into_inner())) 49 + .timeout(Some(Duration::from_secs(5))) 50 + .build(); 51 + EmailSender::new( 52 + from, 53 + SendMode::Smarthost { 54 + transport: Box::new(transport), 55 + total_timeout, 56 + }, 57 + None, 58 + ) 59 + } 60 + 61 + async fn drive_stub(stream: TcpStream, rcpt_response: &'static [u8]) -> std::io::Result<()> { 62 + let (read, mut write) = stream.into_split(); 63 + let mut reader = BufReader::new(read); 64 + write.write_all(b"220 stub ESMTP\r\n").await?; 65 + let mut line = String::new(); 66 + loop { 67 + line.clear(); 68 + let n = reader.read_line(&mut line).await?; 69 + if n == 0 { 70 + return Ok(()); 71 + } 72 + let upper = line.to_ascii_uppercase(); 73 + let response: &[u8] = match upper.split_whitespace().next() { 74 + Some("EHLO") | Some("HELO") => b"250-stub\r\n250 SIZE 10240000\r\n", 75 + Some("MAIL") => b"250 OK\r\n", 76 + Some("RCPT") => rcpt_response, 77 + Some("DATA") => b"354 end with .\r\n", 78 + Some("RSET") => b"250 OK\r\n", 79 + Some("QUIT") => b"221 bye\r\n", 80 + _ => b"500 unknown\r\n", 81 + }; 82 + write.write_all(response).await?; 83 + if upper.starts_with("QUIT") { 84 + return Ok(()); 85 + } 86 + } 87 + } 88 + 89 + async fn spawn_stub(rcpt_response: &'static [u8]) -> u16 { 90 + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 91 + let port = listener.local_addr().unwrap().port(); 92 + tokio::spawn(async move { 93 + let (stream, _) = listener.accept().await.unwrap(); 94 + let _ = drive_stub(stream, rcpt_response).await; 95 + }); 96 + port 97 + } 98 + 99 + #[tokio::test] 100 + async fn rcpt_550_classifies_as_smtp_permanent() { 101 + let port = spawn_stub(b"550 5.1.1 user unknown\r\n").await; 102 + let sender = build_smarthost_sender("127.0.0.1", port); 103 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 104 + match result { 105 + Err(SendError::SmtpPermanent(_)) => {} 106 + other => panic!("expected SmtpPermanent, got {other:?}"), 107 + } 108 + } 109 + 110 + #[tokio::test] 111 + async fn rcpt_421_classifies_as_smtp_transient() { 112 + let port = spawn_stub(b"421 4.7.0 try again later\r\n").await; 113 + let sender = build_smarthost_sender("127.0.0.1", port); 114 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 115 + match result { 116 + Err(SendError::SmtpTransient(_)) => {} 117 + other => panic!("expected SmtpTransient, got {other:?}"), 118 + } 119 + } 120 + 121 + #[tokio::test] 122 + async fn invalid_recipient_classifies_as_invalid_recipient() { 123 + let port = spawn_stub(b"250 OK\r\n").await; 124 + let sender = build_smarthost_sender("127.0.0.1", port); 125 + let result = sender.send(&fixture("not-an-address", "x", "x")).await; 126 + match result { 127 + Err(SendError::InvalidRecipient(_)) => {} 128 + other => panic!("expected InvalidRecipient, got {other:?}"), 129 + } 130 + } 131 + 132 + async fn spawn_silent_stub() -> u16 { 133 + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 134 + let port = listener.local_addr().unwrap().port(); 135 + tokio::spawn(async move { 136 + let (_stream, _) = listener.accept().await.unwrap(); 137 + std::future::pending::<()>().await; 138 + }); 139 + port 140 + } 141 + 142 + #[tokio::test] 143 + async fn smarthost_silent_relay_hits_total_timeout() { 144 + let port = spawn_silent_stub().await; 145 + let sender = 146 + build_smarthost_sender_with_total_timeout("127.0.0.1", port, Duration::from_millis(500)); 147 + let start = std::time::Instant::now(); 148 + let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await; 149 + let elapsed = start.elapsed(); 150 + match result { 151 + Err(SendError::Timeout) => {} 152 + other => panic!("expected Timeout, got {other:?}"), 153 + } 154 + assert!( 155 + elapsed < Duration::from_secs(2), 156 + "send returned in {elapsed:?}, expected close to 500ms total_timeout" 157 + ); 158 + }
+308 -78
crates/tranquil-config/src/lib.rs
··· 226 226 } 227 227 } 228 228 229 - // -- email smarthost -------------------------------------------------- 230 - match self.email.smarthost.tls.to_ascii_lowercase().as_str() { 231 - "implicit" | "starttls" => {} 232 - "none" => { 233 - if self.email.smarthost.password.is_some() { 234 - errors.push( 235 - "email.smarthost.tls = \"none\" with email.smarthost.password set \ 236 - would transmit credentials in plaintext; use \"starttls\" or \"implicit\"" 237 - .to_string(), 238 - ); 239 - } 240 - } 241 - other => errors.push(format!( 242 - "email.smarthost.tls must be \"implicit\", \"starttls\", or \"none\", got \"{other}\"" 243 - )), 244 - } 245 - 246 - let smarthost_host_set = self 247 - .email 248 - .smarthost 249 - .host 250 - .as_deref() 251 - .is_some_and(|h| !h.is_empty()); 252 - let username_set = self.email.smarthost.username.is_some(); 253 - let password_set = self.email.smarthost.password.is_some(); 254 - if !smarthost_host_set && (username_set || password_set) { 255 - errors.push( 256 - "email.smarthost.username or email.smarthost.password is set but \ 257 - email.smarthost.host is empty; credentials would be silently ignored" 258 - .to_string(), 259 - ); 260 - } 261 - if smarthost_host_set && username_set != password_set { 262 - errors.push( 263 - "email.smarthost.username and email.smarthost.password must both be set or \ 264 - both unset; otherwise authentication would silently degrade to anonymous" 265 - .to_string(), 266 - ); 267 - } 268 - 269 - if self.email.smarthost.command_timeout_secs == 0 { 270 - errors.push("email.smarthost.command_timeout_secs must be at least 1".to_string()); 271 - } 272 - if self.email.smarthost.total_timeout_secs == 0 { 273 - errors.push("email.smarthost.total_timeout_secs must be at least 1".to_string()); 274 - } 275 - if self.email.smarthost.pool_size == 0 { 276 - errors.push("email.smarthost.pool_size must be at least 1".to_string()); 277 - } 278 - 279 - if self.email.direct_mx.max_concurrent_sends == 0 { 280 - errors.push("email.direct_mx.max_concurrent_sends must be at least 1".to_string()); 281 - } 282 - if self.email.direct_mx.command_timeout_secs == 0 { 283 - errors.push("email.direct_mx.command_timeout_secs must be at least 1".to_string()); 284 - } 285 - if self.email.direct_mx.total_timeout_secs == 0 { 286 - errors.push("email.direct_mx.total_timeout_secs must be at least 1".to_string()); 287 - } 288 - 289 - let dkim_set = self.email.dkim.selector.is_some() 290 - || self.email.dkim.domain.is_some() 291 - || self.email.dkim.private_key_path.is_some(); 292 - if dkim_set { 293 - if self.email.dkim.selector.is_none() { 294 - errors 295 - .push("email.dkim.selector is required when any DKIM field is set".to_string()); 296 - } 297 - if self.email.dkim.domain.is_none() { 298 - errors.push("email.dkim.domain is required when any DKIM field is set".to_string()); 299 - } 300 - if self.email.dkim.private_key_path.is_none() { 301 - errors.push( 302 - "email.dkim.private_key_path is required when any DKIM field is set" 303 - .to_string(), 304 - ); 305 - } 306 - } 229 + // -- email ----------------------------------------------------------- 230 + self.email 231 + .validate(self.server.hostname_without_port(), &mut errors); 307 232 308 233 // -- telegram --------------------------------------------------------- 309 234 if self.telegram.bot_token.is_some() && self.telegram.webhook_secret.is_none() { ··· 864 789 pub dkim: DkimConfig, 865 790 } 866 791 792 + impl EmailConfig { 793 + pub fn validate(&self, server_hostname: &str, errors: &mut Vec<String>) { 794 + match self.smarthost.tls.to_ascii_lowercase().as_str() { 795 + "implicit" | "starttls" => {} 796 + "none" => { 797 + if self.smarthost.password.is_some() { 798 + errors.push( 799 + "email.smarthost.tls = \"none\" with email.smarthost.password set \ 800 + would transmit credentials in plaintext; use \"starttls\" or \"implicit\"" 801 + .to_string(), 802 + ); 803 + } 804 + } 805 + other => errors.push(format!( 806 + "email.smarthost.tls must be \"implicit\", \"starttls\", or \"none\", got \"{other}\"" 807 + )), 808 + } 809 + 810 + let smarthost_host_set = self 811 + .smarthost 812 + .host 813 + .as_deref() 814 + .is_some_and(|h| !h.is_empty()); 815 + let username_set = self.smarthost.username.is_some(); 816 + let password_set = self.smarthost.password.is_some(); 817 + if !smarthost_host_set && (username_set || password_set) { 818 + errors.push( 819 + "email.smarthost.username or email.smarthost.password is set but \ 820 + email.smarthost.host is empty; credentials would be silently ignored" 821 + .to_string(), 822 + ); 823 + } 824 + if smarthost_host_set && username_set != password_set { 825 + errors.push( 826 + "email.smarthost.username and email.smarthost.password must both be set or \ 827 + both unset; otherwise authentication would silently degrade to anonymous" 828 + .to_string(), 829 + ); 830 + } 831 + 832 + if self.smarthost.command_timeout_secs == 0 { 833 + errors.push("email.smarthost.command_timeout_secs must be at least 1".to_string()); 834 + } 835 + if self.smarthost.total_timeout_secs == 0 { 836 + errors.push("email.smarthost.total_timeout_secs must be at least 1".to_string()); 837 + } 838 + if self.smarthost.pool_size == 0 { 839 + errors.push("email.smarthost.pool_size must be at least 1".to_string()); 840 + } 841 + 842 + if self.direct_mx.max_concurrent_sends == 0 { 843 + errors.push("email.direct_mx.max_concurrent_sends must be at least 1".to_string()); 844 + } 845 + if self.direct_mx.command_timeout_secs == 0 { 846 + errors.push("email.direct_mx.command_timeout_secs must be at least 1".to_string()); 847 + } 848 + if self.direct_mx.total_timeout_secs == 0 { 849 + errors.push("email.direct_mx.total_timeout_secs must be at least 1".to_string()); 850 + } 851 + 852 + let dkim_set = self.dkim.selector.is_some() 853 + || self.dkim.domain.is_some() 854 + || self.dkim.private_key_path.is_some(); 855 + if dkim_set { 856 + if self.dkim.selector.is_none() { 857 + errors 858 + .push("email.dkim.selector is required when any DKIM field is set".to_string()); 859 + } 860 + if self.dkim.domain.is_none() { 861 + errors.push("email.dkim.domain is required when any DKIM field is set".to_string()); 862 + } 863 + if self.dkim.private_key_path.is_none() { 864 + errors.push( 865 + "email.dkim.private_key_path is required when any DKIM field is set" 866 + .to_string(), 867 + ); 868 + } 869 + } 870 + 871 + let Some(from_address) = self.from_address.as_deref().filter(|s| !s.is_empty()) else { 872 + return; 873 + }; 874 + 875 + if !looks_like_email_address(from_address) { 876 + errors.push(format!( 877 + "email.from_address {from_address:?} is not a valid email address" 878 + )); 879 + } 880 + if self.from_name.chars().any(|c| c.is_control()) { 881 + errors.push("email.from_name must not contain control characters".to_string()); 882 + } 883 + 884 + let helo_raw = self 885 + .helo_name 886 + .as_deref() 887 + .map(str::to_string) 888 + .unwrap_or_else(|| server_hostname.to_string()); 889 + if !is_non_whitespace_token(&helo_raw) { 890 + errors.push(format!( 891 + "email HELO name {helo_raw:?} must be non-empty and contain no whitespace" 892 + )); 893 + } 894 + 895 + if smarthost_host_set { 896 + let host = self.smarthost.host.as_deref().unwrap_or(""); 897 + if !is_non_whitespace_token(host) { 898 + errors.push(format!( 899 + "email.smarthost.host {host:?} must contain no whitespace" 900 + )); 901 + } 902 + if self.smarthost.port == 0 { 903 + errors.push("email.smarthost.port must be non-zero".to_string()); 904 + } 905 + if let Some(u) = self.smarthost.username.as_deref() 906 + && u.is_empty() 907 + { 908 + errors.push("email.smarthost.username must be non-empty".to_string()); 909 + } 910 + if let Some(p) = self.smarthost.password.as_deref() 911 + && p.is_empty() 912 + { 913 + errors.push("email.smarthost.password must be non-empty".to_string()); 914 + } 915 + } 916 + 917 + if let Some(selector) = self.dkim.selector.as_deref() 918 + && !is_valid_dkim_selector(selector) 919 + { 920 + errors.push(format!( 921 + "email.dkim.selector {selector:?} must be valid subdomain syntax" 922 + )); 923 + } 924 + if let Some(domain) = self.dkim.domain.as_deref() 925 + && !is_non_whitespace_token(domain) 926 + { 927 + errors.push(format!( 928 + "email.dkim.domain {domain:?} must be non-empty and contain no whitespace" 929 + )); 930 + } 931 + if let Some(key_path) = self.dkim.private_key_path.as_deref() 932 + && key_path.trim().is_empty() 933 + { 934 + errors.push("email.dkim.private_key_path must be non-empty".to_string()); 935 + } 936 + } 937 + } 938 + 939 + fn looks_like_email_address(s: &str) -> bool { 940 + let trimmed = s.trim(); 941 + if trimmed.is_empty() || trimmed.chars().any(char::is_whitespace) { 942 + return false; 943 + } 944 + let mut parts = trimmed.split('@'); 945 + let local = parts.next().unwrap_or(""); 946 + let domain = parts.next().unwrap_or(""); 947 + parts.next().is_none() && !local.is_empty() && !domain.is_empty() && domain.contains('.') 948 + } 949 + 950 + fn is_non_whitespace_token(s: &str) -> bool { 951 + let trimmed = s.trim(); 952 + !trimmed.is_empty() && !trimmed.chars().any(char::is_whitespace) 953 + } 954 + 955 + fn is_valid_dkim_selector(s: &str) -> bool { 956 + let trimmed = s.trim(); 957 + !trimmed.is_empty() 958 + && trimmed.split('.').all(|seg| { 959 + let starts_alnum = seg 960 + .chars() 961 + .next() 962 + .is_some_and(|c| c.is_ascii_alphanumeric()); 963 + let ends_alnum = seg 964 + .chars() 965 + .next_back() 966 + .is_some_and(|c| c.is_ascii_alphanumeric()); 967 + let body_ok = seg.chars().all(|c| c.is_ascii_alphanumeric() || c == '-'); 968 + starts_alnum && ends_alnum && body_ok 969 + }) 970 + } 971 + 867 972 #[derive(Debug, Config)] 868 973 pub struct SmarthostConfig { 869 974 /// SMTP relay host. When set, mail is delivered through this host ··· 1440 1545 result 1441 1546 ); 1442 1547 } 1548 + 1549 + #[test] 1550 + fn email_address_predicate_accepts_typical_addresses() { 1551 + assert!(looks_like_email_address("alice@nel.pet")); 1552 + assert!(looks_like_email_address("a.b+tag@example.co.uk")); 1553 + } 1554 + 1555 + #[test] 1556 + fn email_address_predicate_rejects_malformed() { 1557 + assert!(!looks_like_email_address("")); 1558 + assert!(!looks_like_email_address("no-at-sign")); 1559 + assert!(!looks_like_email_address("@nel.pet")); 1560 + assert!(!looks_like_email_address("alice@")); 1561 + assert!(!looks_like_email_address("alice@nel")); 1562 + assert!(!looks_like_email_address("a@b@c.com")); 1563 + assert!(!looks_like_email_address("alice @nel.pet")); 1564 + } 1565 + 1566 + #[test] 1567 + fn dkim_selector_predicate_matches_subdomain_syntax() { 1568 + assert!(is_valid_dkim_selector("default")); 1569 + assert!(is_valid_dkim_selector("s2024-q1")); 1570 + assert!(is_valid_dkim_selector("mailo-2024.nel.pet")); 1571 + assert!(!is_valid_dkim_selector("")); 1572 + assert!(!is_valid_dkim_selector("a..b")); 1573 + assert!(!is_valid_dkim_selector("-leading")); 1574 + assert!(!is_valid_dkim_selector("trailing-")); 1575 + assert!(!is_valid_dkim_selector("s_under")); 1576 + } 1577 + 1578 + #[test] 1579 + fn email_validate_disabled_when_from_address_unset() { 1580 + let cfg = email_config_for_test(EmailOverrides::default()); 1581 + let mut errors = Vec::new(); 1582 + cfg.validate("test.local", &mut errors); 1583 + assert!(errors.is_empty(), "expected no errors, got {errors:?}"); 1584 + } 1585 + 1586 + #[test] 1587 + fn email_validate_rejects_bad_from_address() { 1588 + let cfg = email_config_for_test(EmailOverrides { 1589 + from_address: Some("not-an-email"), 1590 + ..Default::default() 1591 + }); 1592 + let mut errors = Vec::new(); 1593 + cfg.validate("test.local", &mut errors); 1594 + assert!( 1595 + errors.iter().any(|e| e.contains("from_address")), 1596 + "expected from_address error, got {errors:?}" 1597 + ); 1598 + } 1599 + 1600 + #[test] 1601 + fn email_validate_rejects_smarthost_with_bad_credentials() { 1602 + let cfg = email_config_for_test(EmailOverrides { 1603 + from_address: Some("alice@nel.pet"), 1604 + smarthost_host: Some("smtp.nel.pet"), 1605 + smarthost_username: Some(""), 1606 + smarthost_password: Some("hunter2"), 1607 + ..Default::default() 1608 + }); 1609 + let mut errors = Vec::new(); 1610 + cfg.validate("test.local", &mut errors); 1611 + assert!( 1612 + errors.iter().any(|e| e.contains("smarthost.username")), 1613 + "expected smarthost.username error, got {errors:?}" 1614 + ); 1615 + } 1616 + 1617 + #[test] 1618 + fn email_validate_rejects_bad_dkim_selector() { 1619 + let cfg = email_config_for_test(EmailOverrides { 1620 + from_address: Some("alice@nel.pet"), 1621 + dkim_selector: Some("-bad"), 1622 + dkim_domain: Some("nel.pet"), 1623 + dkim_key_path: Some("/etc/dkim.key"), 1624 + ..Default::default() 1625 + }); 1626 + let mut errors = Vec::new(); 1627 + cfg.validate("test.local", &mut errors); 1628 + assert!( 1629 + errors.iter().any(|e| e.contains("dkim.selector")), 1630 + "expected dkim.selector error, got {errors:?}" 1631 + ); 1632 + } 1633 + 1634 + #[derive(Default)] 1635 + struct EmailOverrides { 1636 + from_address: Option<&'static str>, 1637 + smarthost_host: Option<&'static str>, 1638 + smarthost_username: Option<&'static str>, 1639 + smarthost_password: Option<&'static str>, 1640 + dkim_selector: Option<&'static str>, 1641 + dkim_domain: Option<&'static str>, 1642 + dkim_key_path: Option<&'static str>, 1643 + } 1644 + 1645 + fn email_config_for_test(o: EmailOverrides) -> EmailConfig { 1646 + EmailConfig { 1647 + from_address: o.from_address.map(str::to_string), 1648 + from_name: "Tranquil PDS".to_string(), 1649 + helo_name: None, 1650 + smarthost: SmarthostConfig { 1651 + host: o.smarthost_host.map(str::to_string), 1652 + port: 587, 1653 + username: o.smarthost_username.map(str::to_string), 1654 + password: o.smarthost_password.map(str::to_string), 1655 + tls: "starttls".to_string(), 1656 + pool_size: 4, 1657 + command_timeout_secs: 30, 1658 + total_timeout_secs: 60, 1659 + }, 1660 + direct_mx: DirectMxConfig { 1661 + command_timeout_secs: 30, 1662 + total_timeout_secs: 60, 1663 + max_concurrent_sends: 8, 1664 + require_tls: false, 1665 + }, 1666 + dkim: DkimConfig { 1667 + selector: o.dkim_selector.map(str::to_string), 1668 + domain: o.dkim_domain.map(str::to_string), 1669 + private_key_path: o.dkim_key_path.map(str::to_string), 1670 + }, 1671 + } 1672 + } 1443 1673 }
+2 -4
crates/tranquil-pds/src/repo_ops.rs
··· 526 526 527 527 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 528 528 529 - let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = ops 530 - .iter() 531 - .map(|op| (op.collection_rkey(), op)) 532 - .collect(); 529 + let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = 530 + ops.iter().map(|op| (op.collection_rkey(), op)).collect(); 533 531 534 532 let final_record_uris: HashSet<AtUri> = final_ops 535 533 .iter()
+2 -8
crates/tranquil-pds/tests/repo_batch.rs
··· 402 402 "{}/xrpc/com.atproto.repo.listRecords", 403 403 base_url().await 404 404 )) 405 - .query(&[ 406 - ("repo", did.as_str()), 407 - ("collection", "app.bsky.feed.post"), 408 - ]) 405 + .query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")]) 409 406 .send() 410 407 .await 411 408 .expect("Failed to list records"); ··· 484 481 "{}/xrpc/com.atproto.repo.listRecords", 485 482 base_url().await 486 483 )) 487 - .query(&[ 488 - ("repo", did.as_str()), 489 - ("collection", "app.bsky.feed.post"), 490 - ]) 484 + .query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")]) 491 485 .send() 492 486 .await 493 487 .expect("Failed to list records");
+17 -14
crates/tranquil-server/src/main.rs
··· 53 53 return ExitCode::FAILURE; 54 54 } 55 55 }; 56 - match config.validate(*ignore_secrets) { 57 - Ok(()) => { 58 - println!("Configuration is valid."); 59 - ExitCode::SUCCESS 60 - } 61 - Err(e) => { 62 - eprint!("{e}"); 63 - ExitCode::FAILURE 64 - } 56 + if let Err(e) = config.validate(*ignore_secrets) { 57 + eprint!("{e}"); 58 + return ExitCode::FAILURE; 65 59 } 60 + println!("Configuration is valid."); 61 + ExitCode::SUCCESS 66 62 } 67 63 }; 68 64 } ··· 141 137 142 138 let cfg = tranquil_config::get(); 143 139 144 - if let Some(email_sender) = EmailSender::from_config(cfg) { 145 - info!("Email comms enabled"); 146 - comms_service = comms_service.register_sender(email_sender); 147 - } else { 148 - warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 140 + match EmailSender::from_config(cfg) { 141 + Ok(Some(email_sender)) => { 142 + info!("Email comms enabled"); 143 + comms_service = comms_service.register_sender(email_sender); 144 + } 145 + Ok(None) => { 146 + warn!("Email comms disabled (MAIL_FROM_ADDRESS unset)"); 147 + } 148 + Err(e) => { 149 + error!(error = %e, "Email configuration invalid"); 150 + return Err(e.into()); 151 + } 149 152 } 150 153 151 154 if let Some(discord_sender) = DiscordSender::from_config(cfg) {
+7 -10
crates/tranquil-store/src/blockstore/store.rs
··· 806 806 impl StorageIO for EioOnReadAtRange { 807 807 fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> { 808 808 let fd = self.inner.open(path, opts)?; 809 - self.fd_paths 810 - .lock() 811 - .unwrap() 812 - .insert(fd, path.to_path_buf()); 809 + self.fd_paths.lock().unwrap().insert(fd, path.to_path_buf()); 813 810 Ok(fd) 814 811 } 815 812 ··· 887 884 let cid_a = [0xAAu8; CID_SIZE]; 888 885 let data_a = vec![1u8; 64]; 889 886 let block_a_offset = BlockOffset::new(BLOCK_HEADER_SIZE as u64); 890 - let len_a = 891 - encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap(); 887 + let len_a = encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap(); 892 888 893 889 let block_b_offset_raw = BLOCK_HEADER_SIZE as u64 + len_a; 894 890 let block_b_offset = BlockOffset::new(block_b_offset_raw); ··· 982 978 #[test] 983 979 fn retry_with_backoff_passes_attempt_index_to_op() { 984 980 let observed = std::sync::Mutex::new(Vec::<u8>::new()); 985 - let _result: Result<(), RepoError> = retry_with_backoff(instant_policy(4), &mut |attempt| { 986 - observed.lock().unwrap().push(attempt); 987 - Err(RepoError::storage(io::Error::other("EIO"))) 988 - }); 981 + let _result: Result<(), RepoError> = 982 + retry_with_backoff(instant_policy(4), &mut |attempt| { 983 + observed.lock().unwrap().push(attempt); 984 + Err(RepoError::storage(io::Error::other("EIO"))) 985 + }); 989 986 assert_eq!(*observed.lock().unwrap(), vec![0, 1, 2, 3]); 990 987 } 991 988 }
+1 -4
crates/tranquil-store/src/gauntlet/farm.rs
··· 157 157 158 158 #[test] 159 159 fn scratch_for_thread_falls_back_to_root_zero_outside_pool() { 160 - let roots = vec![ 161 - PathBuf::from("/scratch/a"), 162 - PathBuf::from("/scratch/b"), 163 - ]; 160 + let roots = vec![PathBuf::from("/scratch/a"), PathBuf::from("/scratch/b")]; 164 161 assert_eq!( 165 162 scratch_for_thread(&roots, None), 166 163 Some(PathBuf::from("/scratch/a"))
+1 -1
crates/tranquil-store/src/lib.rs
··· 28 28 }; 29 29 #[cfg(any(test, feature = "test-harness"))] 30 30 pub use sim::{ 31 - FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow, 31 + FaultConfig, LatencyNs, OpRecord, PristineGuard, Probability, SimulatedIO, SyncReorderWindow, 32 32 sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed, 33 33 }; 34 34
+25 -30
crates/tranquil-store/src/sim.rs
··· 1 1 use std::collections::{HashMap, HashSet, VecDeque}; 2 2 use std::io; 3 3 use std::path::{Path, PathBuf}; 4 + use std::sync::Arc; 4 5 use std::sync::Mutex; 5 6 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 6 - use std::sync::Arc; 7 7 use std::time::Duration; 8 8 9 9 use crate::io::{FileId, OpenOptions, StorageIO}; ··· 561 561 return Err(io::Error::other("simulated EIO on read")); 562 562 } 563 563 564 - let read_offset = 565 - if state.should_fault(seed, fault.misdirected_read_probability) { 566 - let drift_sectors = state.next_random_usize(seed, 8) + 1; 567 - let drift = (drift_sectors * SECTOR_BYTES) as u64; 568 - if state.next_random(seed) < 0.5 { 569 - offset.saturating_sub(drift) 570 - } else { 571 - offset.saturating_add(drift) 572 - } 564 + let read_offset = if state.should_fault(seed, fault.misdirected_read_probability) { 565 + let drift_sectors = state.next_random_usize(seed, 8) + 1; 566 + let drift = (drift_sectors * SECTOR_BYTES) as u64; 567 + if state.next_random(seed) < 0.5 { 568 + offset.saturating_sub(drift) 573 569 } else { 574 - offset 575 - }; 570 + offset.saturating_add(drift) 571 + } 572 + } else { 573 + offset 574 + }; 576 575 577 576 let storage = state.storage.get(&sid).unwrap(); 578 577 ··· 620 619 return Err(io::Error::other("simulated EIO on write")); 621 620 } 622 621 623 - let torn_len = 624 - if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) { 625 - let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); 626 - let page_end = page_base + TORN_PAGE_BYTES; 627 - let cap = page_end.saturating_sub(offset as usize).min(buf.len()); 628 - let max_sectors = cap / SECTOR_BYTES; 629 - (max_sectors >= 2).then(|| { 630 - let n = state.next_random_usize(seed, max_sectors - 1) + 1; 631 - n * SECTOR_BYTES 632 - }) 633 - } else { 634 - None 635 - }; 622 + let torn_len = if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) { 623 + let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); 624 + let page_end = page_base + TORN_PAGE_BYTES; 625 + let cap = page_end.saturating_sub(offset as usize).min(buf.len()); 626 + let max_sectors = cap / SECTOR_BYTES; 627 + (max_sectors >= 2).then(|| { 628 + let n = state.next_random_usize(seed, max_sectors - 1) + 1; 629 + n * SECTOR_BYTES 630 + }) 631 + } else { 632 + None 633 + }; 636 634 637 635 let actual_len = match torn_len { 638 636 Some(n) => n, 639 - None if buf.len() > 1 640 - && state.should_fault(seed, fault.partial_write_probability) => 641 - { 637 + None if buf.len() > 1 && state.should_fault(seed, fault.partial_write_probability) => { 642 638 let partial = state.next_random_usize(seed, buf.len()); 643 639 partial.max(1) 644 640 } ··· 840 836 } 841 837 842 838 let dir_path = path.to_path_buf(); 843 - let actually_persisted = 844 - !state.should_fault(seed, fault.dir_sync_failure_probability); 839 + let actually_persisted = !state.should_fault(seed, fault.dir_sync_failure_probability); 845 840 846 841 if actually_persisted { 847 842 state.dirs_durable.insert(dir_path.clone());
+1 -1
crates/tranquil-store/tests/gauntlet_smoke.rs
··· 554 554 std::fs::create_dir_all(&root_b).expect("mkdir b"); 555 555 let roots = vec![root_a.clone(), root_b.clone()]; 556 556 let reports = farm::run_many_timed_with_scratch_roots( 557 - |seed| fast_sanity_config(seed), 557 + fast_sanity_config, 558 558 &roots, 559 559 (0..2).map(Seed), 560 560 );
+10 -17
crates/tranquil-store/tests/sim_blockstore.rs
··· 717 717 718 718 { 719 719 let s = Arc::clone(&sim); 720 - let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io( 721 - config.clone(), 722 - move || Arc::clone(&s), 723 - ) 724 - .unwrap(); 720 + let store = 721 + TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config.clone(), move || { 722 + Arc::clone(&s) 723 + }) 724 + .unwrap(); 725 725 store 726 726 .put_blocks_blocking(vec![(cid, data.clone())]) 727 727 .unwrap(); ··· 730 730 sim.crash(); 731 731 732 732 let s = Arc::clone(&sim); 733 - let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || { 734 - Arc::clone(&s) 735 - }) 736 - .unwrap(); 733 + let store = 734 + TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || Arc::clone(&s)) 735 + .unwrap(); 737 736 738 737 match store.get_block_sync(&cid) { 739 - Ok(Some(d)) => assert_eq!( 740 - &d[..], 741 - &data[..], 742 - "block content mismatch after crash" 743 - ), 738 + Ok(Some(d)) => assert_eq!(&d[..], &data[..], "block content mismatch after crash"), 744 739 Ok(None) => panic!( 745 740 "durability bug: put_blocks_blocking returned Ok but block missing after crash" 746 741 ), 747 - Err(e) => panic!( 748 - "durability bug: block read failed after crash: {e}" 749 - ), 742 + Err(e) => panic!("durability bug: block read failed after crash: {e}"), 750 743 } 751 744 }); 752 745 }
+4 -1
crates/tranquil-store/tests/sim_eventlog.rs
··· 1157 1157 "sync must ack only events 1..=2 with corrupt event 3" 1158 1158 ); 1159 1159 assert_eq!(result.flushed_events.len(), 2); 1160 - assert!(writer.is_poisoned(), "writer must be poisoned after partial sync"); 1160 + assert!( 1161 + writer.is_poisoned(), 1162 + "writer must be poisoned after partial sync" 1163 + ); 1161 1164 1162 1165 let append_after_poison = writer.append( 1163 1166 DidHash::from_did("did:plc:after"),
+7 -8
crates/tranquil-sync/src/subscribe_repos.rs
··· 85 85 }; 86 86 for event in events { 87 87 *last_seen = event.seq; 88 - let bytes = 89 - match format_event_with_prefetched_blocks(state, event, &prefetched).await { 90 - Ok(b) => b, 91 - Err(e) => { 92 - warn!("Lag recovery format failed: {}", e); 93 - return Err(()); 94 - } 95 - }; 88 + let bytes = match format_event_with_prefetched_blocks(state, event, &prefetched).await { 89 + Ok(b) => b, 90 + Err(e) => { 91 + warn!("Lag recovery format failed: {}", e); 92 + return Err(()); 93 + } 94 + }; 96 95 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 97 96 warn!("Lag recovery send failed: {}", e); 98 97 return Err(());

History

4 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
feat(tranquil-server): email config, tests, fmt
merge conflicts detected
expand
  • .config/nextest.toml:68
  • Cargo.lock:9
  • Cargo.toml:26
  • crates/tranquil-comms/Cargo.toml:10
  • crates/tranquil-config/src/lib.rs:5
  • example.toml:373
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments
1 commit
expand
feat(tranquil-server): email config, tests, fmt
expand 0 comments