Lewis: May this revision serve well! lu5a@proton.me
+543
-176
Diff
round #3
+158
crates/tranquil-comms/tests/email_smtp.rs
+158
crates/tranquil-comms/tests/email_smtp.rs
···
1
+
use std::time::Duration;
2
+
3
+
use chrono::Utc;
4
+
use lettre::message::Mailbox;
5
+
use lettre::transport::smtp::AsyncSmtpTransport;
6
+
use lettre::transport::smtp::extension::ClientId;
7
+
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8
+
use tokio::net::{TcpListener, TcpStream};
9
+
use tranquil_comms::email::transport::SendMode;
10
+
use tranquil_comms::email::{EmailSender, types::HeloName};
11
+
use tranquil_comms::{CommsChannel, CommsSender, CommsStatus, CommsType, QueuedComms, SendError};
12
+
use uuid::Uuid;
13
+
14
+
fn fixture(recipient: &str, subject: &str, body: &str) -> QueuedComms {
15
+
QueuedComms {
16
+
id: Uuid::new_v4(),
17
+
user_id: None,
18
+
channel: CommsChannel::Email,
19
+
comms_type: CommsType::Welcome,
20
+
status: CommsStatus::Pending,
21
+
recipient: recipient.to_string(),
22
+
subject: Some(subject.to_string()),
23
+
body: body.to_string(),
24
+
metadata: None,
25
+
attempts: 0,
26
+
max_attempts: 3,
27
+
last_error: None,
28
+
created_at: Utc::now(),
29
+
updated_at: Utc::now(),
30
+
scheduled_for: Utc::now(),
31
+
processed_at: None,
32
+
}
33
+
}
34
+
35
+
fn build_smarthost_sender(host: &str, port: u16) -> EmailSender {
36
+
build_smarthost_sender_with_total_timeout(host, port, Duration::from_secs(10))
37
+
}
38
+
39
+
fn build_smarthost_sender_with_total_timeout(
40
+
host: &str,
41
+
port: u16,
42
+
total_timeout: Duration,
43
+
) -> EmailSender {
44
+
let from: Mailbox = "Tranquil Test <noreply@nel.pet>".parse().unwrap();
45
+
let helo = HeloName::parse("mta.nel.pet").unwrap();
46
+
let transport = AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host)
47
+
.port(port)
48
+
.hello_name(ClientId::Domain(helo.into_inner()))
49
+
.timeout(Some(Duration::from_secs(5)))
50
+
.build();
51
+
EmailSender::new(
52
+
from,
53
+
SendMode::Smarthost {
54
+
transport: Box::new(transport),
55
+
total_timeout,
56
+
},
57
+
None,
58
+
)
59
+
}
60
+
61
+
async fn drive_stub(stream: TcpStream, rcpt_response: &'static [u8]) -> std::io::Result<()> {
62
+
let (read, mut write) = stream.into_split();
63
+
let mut reader = BufReader::new(read);
64
+
write.write_all(b"220 stub ESMTP\r\n").await?;
65
+
let mut line = String::new();
66
+
loop {
67
+
line.clear();
68
+
let n = reader.read_line(&mut line).await?;
69
+
if n == 0 {
70
+
return Ok(());
71
+
}
72
+
let upper = line.to_ascii_uppercase();
73
+
let response: &[u8] = match upper.split_whitespace().next() {
74
+
Some("EHLO") | Some("HELO") => b"250-stub\r\n250 SIZE 10240000\r\n",
75
+
Some("MAIL") => b"250 OK\r\n",
76
+
Some("RCPT") => rcpt_response,
77
+
Some("DATA") => b"354 end with .\r\n",
78
+
Some("RSET") => b"250 OK\r\n",
79
+
Some("QUIT") => b"221 bye\r\n",
80
+
_ => b"500 unknown\r\n",
81
+
};
82
+
write.write_all(response).await?;
83
+
if upper.starts_with("QUIT") {
84
+
return Ok(());
85
+
}
86
+
}
87
+
}
88
+
89
+
async fn spawn_stub(rcpt_response: &'static [u8]) -> u16 {
90
+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
91
+
let port = listener.local_addr().unwrap().port();
92
+
tokio::spawn(async move {
93
+
let (stream, _) = listener.accept().await.unwrap();
94
+
let _ = drive_stub(stream, rcpt_response).await;
95
+
});
96
+
port
97
+
}
98
+
99
+
#[tokio::test]
100
+
async fn rcpt_550_classifies_as_smtp_permanent() {
101
+
let port = spawn_stub(b"550 5.1.1 user unknown\r\n").await;
102
+
let sender = build_smarthost_sender("127.0.0.1", port);
103
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
104
+
match result {
105
+
Err(SendError::SmtpPermanent(_)) => {}
106
+
other => panic!("expected SmtpPermanent, got {other:?}"),
107
+
}
108
+
}
109
+
110
+
#[tokio::test]
111
+
async fn rcpt_421_classifies_as_smtp_transient() {
112
+
let port = spawn_stub(b"421 4.7.0 try again later\r\n").await;
113
+
let sender = build_smarthost_sender("127.0.0.1", port);
114
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
115
+
match result {
116
+
Err(SendError::SmtpTransient(_)) => {}
117
+
other => panic!("expected SmtpTransient, got {other:?}"),
118
+
}
119
+
}
120
+
121
+
#[tokio::test]
122
+
async fn invalid_recipient_classifies_as_invalid_recipient() {
123
+
let port = spawn_stub(b"250 OK\r\n").await;
124
+
let sender = build_smarthost_sender("127.0.0.1", port);
125
+
let result = sender.send(&fixture("not-an-address", "x", "x")).await;
126
+
match result {
127
+
Err(SendError::InvalidRecipient(_)) => {}
128
+
other => panic!("expected InvalidRecipient, got {other:?}"),
129
+
}
130
+
}
131
+
132
+
async fn spawn_silent_stub() -> u16 {
133
+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
134
+
let port = listener.local_addr().unwrap().port();
135
+
tokio::spawn(async move {
136
+
let (_stream, _) = listener.accept().await.unwrap();
137
+
std::future::pending::<()>().await;
138
+
});
139
+
port
140
+
}
141
+
142
+
#[tokio::test]
143
+
async fn smarthost_silent_relay_hits_total_timeout() {
144
+
let port = spawn_silent_stub().await;
145
+
let sender =
146
+
build_smarthost_sender_with_total_timeout("127.0.0.1", port, Duration::from_millis(500));
147
+
let start = std::time::Instant::now();
148
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
149
+
let elapsed = start.elapsed();
150
+
match result {
151
+
Err(SendError::Timeout) => {}
152
+
other => panic!("expected Timeout, got {other:?}"),
153
+
}
154
+
assert!(
155
+
elapsed < Duration::from_secs(2),
156
+
"send returned in {elapsed:?}, expected close to 500ms total_timeout"
157
+
);
158
+
}
+308
-78
crates/tranquil-config/src/lib.rs
+308
-78
crates/tranquil-config/src/lib.rs
···
226
226
}
227
227
}
228
228
229
-
// -- email smarthost --------------------------------------------------
230
-
match self.email.smarthost.tls.to_ascii_lowercase().as_str() {
231
-
"implicit" | "starttls" => {}
232
-
"none" => {
233
-
if self.email.smarthost.password.is_some() {
234
-
errors.push(
235
-
"email.smarthost.tls = \"none\" with email.smarthost.password set \
236
-
would transmit credentials in plaintext; use \"starttls\" or \"implicit\""
237
-
.to_string(),
238
-
);
239
-
}
240
-
}
241
-
other => errors.push(format!(
242
-
"email.smarthost.tls must be \"implicit\", \"starttls\", or \"none\", got \"{other}\""
243
-
)),
244
-
}
245
-
246
-
let smarthost_host_set = self
247
-
.email
248
-
.smarthost
249
-
.host
250
-
.as_deref()
251
-
.is_some_and(|h| !h.is_empty());
252
-
let username_set = self.email.smarthost.username.is_some();
253
-
let password_set = self.email.smarthost.password.is_some();
254
-
if !smarthost_host_set && (username_set || password_set) {
255
-
errors.push(
256
-
"email.smarthost.username or email.smarthost.password is set but \
257
-
email.smarthost.host is empty; credentials would be silently ignored"
258
-
.to_string(),
259
-
);
260
-
}
261
-
if smarthost_host_set && username_set != password_set {
262
-
errors.push(
263
-
"email.smarthost.username and email.smarthost.password must both be set or \
264
-
both unset; otherwise authentication would silently degrade to anonymous"
265
-
.to_string(),
266
-
);
267
-
}
268
-
269
-
if self.email.smarthost.command_timeout_secs == 0 {
270
-
errors.push("email.smarthost.command_timeout_secs must be at least 1".to_string());
271
-
}
272
-
if self.email.smarthost.total_timeout_secs == 0 {
273
-
errors.push("email.smarthost.total_timeout_secs must be at least 1".to_string());
274
-
}
275
-
if self.email.smarthost.pool_size == 0 {
276
-
errors.push("email.smarthost.pool_size must be at least 1".to_string());
277
-
}
278
-
279
-
if self.email.direct_mx.max_concurrent_sends == 0 {
280
-
errors.push("email.direct_mx.max_concurrent_sends must be at least 1".to_string());
281
-
}
282
-
if self.email.direct_mx.command_timeout_secs == 0 {
283
-
errors.push("email.direct_mx.command_timeout_secs must be at least 1".to_string());
284
-
}
285
-
if self.email.direct_mx.total_timeout_secs == 0 {
286
-
errors.push("email.direct_mx.total_timeout_secs must be at least 1".to_string());
287
-
}
288
-
289
-
let dkim_set = self.email.dkim.selector.is_some()
290
-
|| self.email.dkim.domain.is_some()
291
-
|| self.email.dkim.private_key_path.is_some();
292
-
if dkim_set {
293
-
if self.email.dkim.selector.is_none() {
294
-
errors
295
-
.push("email.dkim.selector is required when any DKIM field is set".to_string());
296
-
}
297
-
if self.email.dkim.domain.is_none() {
298
-
errors.push("email.dkim.domain is required when any DKIM field is set".to_string());
299
-
}
300
-
if self.email.dkim.private_key_path.is_none() {
301
-
errors.push(
302
-
"email.dkim.private_key_path is required when any DKIM field is set"
303
-
.to_string(),
304
-
);
305
-
}
306
-
}
229
+
// -- email -----------------------------------------------------------
230
+
self.email
231
+
.validate(self.server.hostname_without_port(), &mut errors);
307
232
308
233
// -- telegram ---------------------------------------------------------
309
234
if self.telegram.bot_token.is_some() && self.telegram.webhook_secret.is_none() {
···
864
789
pub dkim: DkimConfig,
865
790
}
866
791
792
+
impl EmailConfig {
793
+
pub fn validate(&self, server_hostname: &str, errors: &mut Vec<String>) {
794
+
match self.smarthost.tls.to_ascii_lowercase().as_str() {
795
+
"implicit" | "starttls" => {}
796
+
"none" => {
797
+
if self.smarthost.password.is_some() {
798
+
errors.push(
799
+
"email.smarthost.tls = \"none\" with email.smarthost.password set \
800
+
would transmit credentials in plaintext; use \"starttls\" or \"implicit\""
801
+
.to_string(),
802
+
);
803
+
}
804
+
}
805
+
other => errors.push(format!(
806
+
"email.smarthost.tls must be \"implicit\", \"starttls\", or \"none\", got \"{other}\""
807
+
)),
808
+
}
809
+
810
+
let smarthost_host_set = self
811
+
.smarthost
812
+
.host
813
+
.as_deref()
814
+
.is_some_and(|h| !h.is_empty());
815
+
let username_set = self.smarthost.username.is_some();
816
+
let password_set = self.smarthost.password.is_some();
817
+
if !smarthost_host_set && (username_set || password_set) {
818
+
errors.push(
819
+
"email.smarthost.username or email.smarthost.password is set but \
820
+
email.smarthost.host is empty; credentials would be silently ignored"
821
+
.to_string(),
822
+
);
823
+
}
824
+
if smarthost_host_set && username_set != password_set {
825
+
errors.push(
826
+
"email.smarthost.username and email.smarthost.password must both be set or \
827
+
both unset; otherwise authentication would silently degrade to anonymous"
828
+
.to_string(),
829
+
);
830
+
}
831
+
832
+
if self.smarthost.command_timeout_secs == 0 {
833
+
errors.push("email.smarthost.command_timeout_secs must be at least 1".to_string());
834
+
}
835
+
if self.smarthost.total_timeout_secs == 0 {
836
+
errors.push("email.smarthost.total_timeout_secs must be at least 1".to_string());
837
+
}
838
+
if self.smarthost.pool_size == 0 {
839
+
errors.push("email.smarthost.pool_size must be at least 1".to_string());
840
+
}
841
+
842
+
if self.direct_mx.max_concurrent_sends == 0 {
843
+
errors.push("email.direct_mx.max_concurrent_sends must be at least 1".to_string());
844
+
}
845
+
if self.direct_mx.command_timeout_secs == 0 {
846
+
errors.push("email.direct_mx.command_timeout_secs must be at least 1".to_string());
847
+
}
848
+
if self.direct_mx.total_timeout_secs == 0 {
849
+
errors.push("email.direct_mx.total_timeout_secs must be at least 1".to_string());
850
+
}
851
+
852
+
let dkim_set = self.dkim.selector.is_some()
853
+
|| self.dkim.domain.is_some()
854
+
|| self.dkim.private_key_path.is_some();
855
+
if dkim_set {
856
+
if self.dkim.selector.is_none() {
857
+
errors
858
+
.push("email.dkim.selector is required when any DKIM field is set".to_string());
859
+
}
860
+
if self.dkim.domain.is_none() {
861
+
errors.push("email.dkim.domain is required when any DKIM field is set".to_string());
862
+
}
863
+
if self.dkim.private_key_path.is_none() {
864
+
errors.push(
865
+
"email.dkim.private_key_path is required when any DKIM field is set"
866
+
.to_string(),
867
+
);
868
+
}
869
+
}
870
+
871
+
let Some(from_address) = self.from_address.as_deref().filter(|s| !s.is_empty()) else {
872
+
return;
873
+
};
874
+
875
+
if !looks_like_email_address(from_address) {
876
+
errors.push(format!(
877
+
"email.from_address {from_address:?} is not a valid email address"
878
+
));
879
+
}
880
+
if self.from_name.chars().any(|c| c.is_control()) {
881
+
errors.push("email.from_name must not contain control characters".to_string());
882
+
}
883
+
884
+
let helo_raw = self
885
+
.helo_name
886
+
.as_deref()
887
+
.map(str::to_string)
888
+
.unwrap_or_else(|| server_hostname.to_string());
889
+
if !is_non_whitespace_token(&helo_raw) {
890
+
errors.push(format!(
891
+
"email HELO name {helo_raw:?} must be non-empty and contain no whitespace"
892
+
));
893
+
}
894
+
895
+
if smarthost_host_set {
896
+
let host = self.smarthost.host.as_deref().unwrap_or("");
897
+
if !is_non_whitespace_token(host) {
898
+
errors.push(format!(
899
+
"email.smarthost.host {host:?} must contain no whitespace"
900
+
));
901
+
}
902
+
if self.smarthost.port == 0 {
903
+
errors.push("email.smarthost.port must be non-zero".to_string());
904
+
}
905
+
if let Some(u) = self.smarthost.username.as_deref()
906
+
&& u.is_empty()
907
+
{
908
+
errors.push("email.smarthost.username must be non-empty".to_string());
909
+
}
910
+
if let Some(p) = self.smarthost.password.as_deref()
911
+
&& p.is_empty()
912
+
{
913
+
errors.push("email.smarthost.password must be non-empty".to_string());
914
+
}
915
+
}
916
+
917
+
if let Some(selector) = self.dkim.selector.as_deref()
918
+
&& !is_valid_dkim_selector(selector)
919
+
{
920
+
errors.push(format!(
921
+
"email.dkim.selector {selector:?} must be valid subdomain syntax"
922
+
));
923
+
}
924
+
if let Some(domain) = self.dkim.domain.as_deref()
925
+
&& !is_non_whitespace_token(domain)
926
+
{
927
+
errors.push(format!(
928
+
"email.dkim.domain {domain:?} must be non-empty and contain no whitespace"
929
+
));
930
+
}
931
+
if let Some(key_path) = self.dkim.private_key_path.as_deref()
932
+
&& key_path.trim().is_empty()
933
+
{
934
+
errors.push("email.dkim.private_key_path must be non-empty".to_string());
935
+
}
936
+
}
937
+
}
938
+
939
+
fn looks_like_email_address(s: &str) -> bool {
940
+
let trimmed = s.trim();
941
+
if trimmed.is_empty() || trimmed.chars().any(char::is_whitespace) {
942
+
return false;
943
+
}
944
+
let mut parts = trimmed.split('@');
945
+
let local = parts.next().unwrap_or("");
946
+
let domain = parts.next().unwrap_or("");
947
+
parts.next().is_none() && !local.is_empty() && !domain.is_empty() && domain.contains('.')
948
+
}
949
+
950
+
fn is_non_whitespace_token(s: &str) -> bool {
951
+
let trimmed = s.trim();
952
+
!trimmed.is_empty() && !trimmed.chars().any(char::is_whitespace)
953
+
}
954
+
955
+
fn is_valid_dkim_selector(s: &str) -> bool {
956
+
let trimmed = s.trim();
957
+
!trimmed.is_empty()
958
+
&& trimmed.split('.').all(|seg| {
959
+
let starts_alnum = seg
960
+
.chars()
961
+
.next()
962
+
.is_some_and(|c| c.is_ascii_alphanumeric());
963
+
let ends_alnum = seg
964
+
.chars()
965
+
.next_back()
966
+
.is_some_and(|c| c.is_ascii_alphanumeric());
967
+
let body_ok = seg.chars().all(|c| c.is_ascii_alphanumeric() || c == '-');
968
+
starts_alnum && ends_alnum && body_ok
969
+
})
970
+
}
971
+
867
972
#[derive(Debug, Config)]
868
973
pub struct SmarthostConfig {
869
974
/// SMTP relay host. When set, mail is delivered through this host
···
1440
1545
result
1441
1546
);
1442
1547
}
1548
+
1549
+
#[test]
1550
+
fn email_address_predicate_accepts_typical_addresses() {
1551
+
assert!(looks_like_email_address("alice@nel.pet"));
1552
+
assert!(looks_like_email_address("a.b+tag@example.co.uk"));
1553
+
}
1554
+
1555
+
#[test]
1556
+
fn email_address_predicate_rejects_malformed() {
1557
+
assert!(!looks_like_email_address(""));
1558
+
assert!(!looks_like_email_address("no-at-sign"));
1559
+
assert!(!looks_like_email_address("@nel.pet"));
1560
+
assert!(!looks_like_email_address("alice@"));
1561
+
assert!(!looks_like_email_address("alice@nel"));
1562
+
assert!(!looks_like_email_address("a@b@c.com"));
1563
+
assert!(!looks_like_email_address("alice @nel.pet"));
1564
+
}
1565
+
1566
+
#[test]
1567
+
fn dkim_selector_predicate_matches_subdomain_syntax() {
1568
+
assert!(is_valid_dkim_selector("default"));
1569
+
assert!(is_valid_dkim_selector("s2024-q1"));
1570
+
assert!(is_valid_dkim_selector("mailo-2024.nel.pet"));
1571
+
assert!(!is_valid_dkim_selector(""));
1572
+
assert!(!is_valid_dkim_selector("a..b"));
1573
+
assert!(!is_valid_dkim_selector("-leading"));
1574
+
assert!(!is_valid_dkim_selector("trailing-"));
1575
+
assert!(!is_valid_dkim_selector("s_under"));
1576
+
}
1577
+
1578
+
#[test]
1579
+
fn email_validate_disabled_when_from_address_unset() {
1580
+
let cfg = email_config_for_test(EmailOverrides::default());
1581
+
let mut errors = Vec::new();
1582
+
cfg.validate("test.local", &mut errors);
1583
+
assert!(errors.is_empty(), "expected no errors, got {errors:?}");
1584
+
}
1585
+
1586
+
#[test]
1587
+
fn email_validate_rejects_bad_from_address() {
1588
+
let cfg = email_config_for_test(EmailOverrides {
1589
+
from_address: Some("not-an-email"),
1590
+
..Default::default()
1591
+
});
1592
+
let mut errors = Vec::new();
1593
+
cfg.validate("test.local", &mut errors);
1594
+
assert!(
1595
+
errors.iter().any(|e| e.contains("from_address")),
1596
+
"expected from_address error, got {errors:?}"
1597
+
);
1598
+
}
1599
+
1600
+
#[test]
1601
+
fn email_validate_rejects_smarthost_with_bad_credentials() {
1602
+
let cfg = email_config_for_test(EmailOverrides {
1603
+
from_address: Some("alice@nel.pet"),
1604
+
smarthost_host: Some("smtp.nel.pet"),
1605
+
smarthost_username: Some(""),
1606
+
smarthost_password: Some("hunter2"),
1607
+
..Default::default()
1608
+
});
1609
+
let mut errors = Vec::new();
1610
+
cfg.validate("test.local", &mut errors);
1611
+
assert!(
1612
+
errors.iter().any(|e| e.contains("smarthost.username")),
1613
+
"expected smarthost.username error, got {errors:?}"
1614
+
);
1615
+
}
1616
+
1617
+
#[test]
1618
+
fn email_validate_rejects_bad_dkim_selector() {
1619
+
let cfg = email_config_for_test(EmailOverrides {
1620
+
from_address: Some("alice@nel.pet"),
1621
+
dkim_selector: Some("-bad"),
1622
+
dkim_domain: Some("nel.pet"),
1623
+
dkim_key_path: Some("/etc/dkim.key"),
1624
+
..Default::default()
1625
+
});
1626
+
let mut errors = Vec::new();
1627
+
cfg.validate("test.local", &mut errors);
1628
+
assert!(
1629
+
errors.iter().any(|e| e.contains("dkim.selector")),
1630
+
"expected dkim.selector error, got {errors:?}"
1631
+
);
1632
+
}
1633
+
1634
+
#[derive(Default)]
1635
+
struct EmailOverrides {
1636
+
from_address: Option<&'static str>,
1637
+
smarthost_host: Option<&'static str>,
1638
+
smarthost_username: Option<&'static str>,
1639
+
smarthost_password: Option<&'static str>,
1640
+
dkim_selector: Option<&'static str>,
1641
+
dkim_domain: Option<&'static str>,
1642
+
dkim_key_path: Option<&'static str>,
1643
+
}
1644
+
1645
+
fn email_config_for_test(o: EmailOverrides) -> EmailConfig {
1646
+
EmailConfig {
1647
+
from_address: o.from_address.map(str::to_string),
1648
+
from_name: "Tranquil PDS".to_string(),
1649
+
helo_name: None,
1650
+
smarthost: SmarthostConfig {
1651
+
host: o.smarthost_host.map(str::to_string),
1652
+
port: 587,
1653
+
username: o.smarthost_username.map(str::to_string),
1654
+
password: o.smarthost_password.map(str::to_string),
1655
+
tls: "starttls".to_string(),
1656
+
pool_size: 4,
1657
+
command_timeout_secs: 30,
1658
+
total_timeout_secs: 60,
1659
+
},
1660
+
direct_mx: DirectMxConfig {
1661
+
command_timeout_secs: 30,
1662
+
total_timeout_secs: 60,
1663
+
max_concurrent_sends: 8,
1664
+
require_tls: false,
1665
+
},
1666
+
dkim: DkimConfig {
1667
+
selector: o.dkim_selector.map(str::to_string),
1668
+
domain: o.dkim_domain.map(str::to_string),
1669
+
private_key_path: o.dkim_key_path.map(str::to_string),
1670
+
},
1671
+
}
1672
+
}
1443
1673
}
+2
-4
crates/tranquil-pds/src/repo_ops.rs
+2
-4
crates/tranquil-pds/src/repo_ops.rs
···
526
526
527
527
let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
528
528
529
-
let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = ops
530
-
.iter()
531
-
.map(|op| (op.collection_rkey(), op))
532
-
.collect();
529
+
let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> =
530
+
ops.iter().map(|op| (op.collection_rkey(), op)).collect();
533
531
534
532
let final_record_uris: HashSet<AtUri> = final_ops
535
533
.iter()
+2
-8
crates/tranquil-pds/tests/repo_batch.rs
+2
-8
crates/tranquil-pds/tests/repo_batch.rs
···
402
402
"{}/xrpc/com.atproto.repo.listRecords",
403
403
base_url().await
404
404
))
405
-
.query(&[
406
-
("repo", did.as_str()),
407
-
("collection", "app.bsky.feed.post"),
408
-
])
405
+
.query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")])
409
406
.send()
410
407
.await
411
408
.expect("Failed to list records");
···
484
481
"{}/xrpc/com.atproto.repo.listRecords",
485
482
base_url().await
486
483
))
487
-
.query(&[
488
-
("repo", did.as_str()),
489
-
("collection", "app.bsky.feed.post"),
490
-
])
484
+
.query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")])
491
485
.send()
492
486
.await
493
487
.expect("Failed to list records");
+17
-14
crates/tranquil-server/src/main.rs
+17
-14
crates/tranquil-server/src/main.rs
···
53
53
return ExitCode::FAILURE;
54
54
}
55
55
};
56
-
match config.validate(*ignore_secrets) {
57
-
Ok(()) => {
58
-
println!("Configuration is valid.");
59
-
ExitCode::SUCCESS
60
-
}
61
-
Err(e) => {
62
-
eprint!("{e}");
63
-
ExitCode::FAILURE
64
-
}
56
+
if let Err(e) = config.validate(*ignore_secrets) {
57
+
eprint!("{e}");
58
+
return ExitCode::FAILURE;
65
59
}
60
+
println!("Configuration is valid.");
61
+
ExitCode::SUCCESS
66
62
}
67
63
};
68
64
}
···
141
137
142
138
let cfg = tranquil_config::get();
143
139
144
-
if let Some(email_sender) = EmailSender::from_config(cfg) {
145
-
info!("Email comms enabled");
146
-
comms_service = comms_service.register_sender(email_sender);
147
-
} else {
148
-
warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
140
+
match EmailSender::from_config(cfg) {
141
+
Ok(Some(email_sender)) => {
142
+
info!("Email comms enabled");
143
+
comms_service = comms_service.register_sender(email_sender);
144
+
}
145
+
Ok(None) => {
146
+
warn!("Email comms disabled (MAIL_FROM_ADDRESS unset)");
147
+
}
148
+
Err(e) => {
149
+
error!(error = %e, "Email configuration invalid");
150
+
return Err(e.into());
151
+
}
149
152
}
150
153
151
154
if let Some(discord_sender) = DiscordSender::from_config(cfg) {
+7
-10
crates/tranquil-store/src/blockstore/store.rs
+7
-10
crates/tranquil-store/src/blockstore/store.rs
···
806
806
impl StorageIO for EioOnReadAtRange {
807
807
fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> {
808
808
let fd = self.inner.open(path, opts)?;
809
-
self.fd_paths
810
-
.lock()
811
-
.unwrap()
812
-
.insert(fd, path.to_path_buf());
809
+
self.fd_paths.lock().unwrap().insert(fd, path.to_path_buf());
813
810
Ok(fd)
814
811
}
815
812
···
887
884
let cid_a = [0xAAu8; CID_SIZE];
888
885
let data_a = vec![1u8; 64];
889
886
let block_a_offset = BlockOffset::new(BLOCK_HEADER_SIZE as u64);
890
-
let len_a =
891
-
encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap();
887
+
let len_a = encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap();
892
888
893
889
let block_b_offset_raw = BLOCK_HEADER_SIZE as u64 + len_a;
894
890
let block_b_offset = BlockOffset::new(block_b_offset_raw);
···
982
978
#[test]
983
979
fn retry_with_backoff_passes_attempt_index_to_op() {
984
980
let observed = std::sync::Mutex::new(Vec::<u8>::new());
985
-
let _result: Result<(), RepoError> = retry_with_backoff(instant_policy(4), &mut |attempt| {
986
-
observed.lock().unwrap().push(attempt);
987
-
Err(RepoError::storage(io::Error::other("EIO")))
988
-
});
981
+
let _result: Result<(), RepoError> =
982
+
retry_with_backoff(instant_policy(4), &mut |attempt| {
983
+
observed.lock().unwrap().push(attempt);
984
+
Err(RepoError::storage(io::Error::other("EIO")))
985
+
});
989
986
assert_eq!(*observed.lock().unwrap(), vec![0, 1, 2, 3]);
990
987
}
991
988
}
+1
-4
crates/tranquil-store/src/gauntlet/farm.rs
+1
-4
crates/tranquil-store/src/gauntlet/farm.rs
···
157
157
158
158
#[test]
159
159
fn scratch_for_thread_falls_back_to_root_zero_outside_pool() {
160
-
let roots = vec![
161
-
PathBuf::from("/scratch/a"),
162
-
PathBuf::from("/scratch/b"),
163
-
];
160
+
let roots = vec![PathBuf::from("/scratch/a"), PathBuf::from("/scratch/b")];
164
161
assert_eq!(
165
162
scratch_for_thread(&roots, None),
166
163
Some(PathBuf::from("/scratch/a"))
+1
-1
crates/tranquil-store/src/lib.rs
+1
-1
crates/tranquil-store/src/lib.rs
···
28
28
};
29
29
#[cfg(any(test, feature = "test-harness"))]
30
30
pub use sim::{
31
-
FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow,
31
+
FaultConfig, LatencyNs, OpRecord, PristineGuard, Probability, SimulatedIO, SyncReorderWindow,
32
32
sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed,
33
33
};
34
34
+25
-30
crates/tranquil-store/src/sim.rs
+25
-30
crates/tranquil-store/src/sim.rs
···
1
1
use std::collections::{HashMap, HashSet, VecDeque};
2
2
use std::io;
3
3
use std::path::{Path, PathBuf};
4
+
use std::sync::Arc;
4
5
use std::sync::Mutex;
5
6
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6
-
use std::sync::Arc;
7
7
use std::time::Duration;
8
8
9
9
use crate::io::{FileId, OpenOptions, StorageIO};
···
561
561
return Err(io::Error::other("simulated EIO on read"));
562
562
}
563
563
564
-
let read_offset =
565
-
if state.should_fault(seed, fault.misdirected_read_probability) {
566
-
let drift_sectors = state.next_random_usize(seed, 8) + 1;
567
-
let drift = (drift_sectors * SECTOR_BYTES) as u64;
568
-
if state.next_random(seed) < 0.5 {
569
-
offset.saturating_sub(drift)
570
-
} else {
571
-
offset.saturating_add(drift)
572
-
}
564
+
let read_offset = if state.should_fault(seed, fault.misdirected_read_probability) {
565
+
let drift_sectors = state.next_random_usize(seed, 8) + 1;
566
+
let drift = (drift_sectors * SECTOR_BYTES) as u64;
567
+
if state.next_random(seed) < 0.5 {
568
+
offset.saturating_sub(drift)
573
569
} else {
574
-
offset
575
-
};
570
+
offset.saturating_add(drift)
571
+
}
572
+
} else {
573
+
offset
574
+
};
576
575
577
576
let storage = state.storage.get(&sid).unwrap();
578
577
···
620
619
return Err(io::Error::other("simulated EIO on write"));
621
620
}
622
621
623
-
let torn_len =
624
-
if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) {
625
-
let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES);
626
-
let page_end = page_base + TORN_PAGE_BYTES;
627
-
let cap = page_end.saturating_sub(offset as usize).min(buf.len());
628
-
let max_sectors = cap / SECTOR_BYTES;
629
-
(max_sectors >= 2).then(|| {
630
-
let n = state.next_random_usize(seed, max_sectors - 1) + 1;
631
-
n * SECTOR_BYTES
632
-
})
633
-
} else {
634
-
None
635
-
};
622
+
let torn_len = if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) {
623
+
let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES);
624
+
let page_end = page_base + TORN_PAGE_BYTES;
625
+
let cap = page_end.saturating_sub(offset as usize).min(buf.len());
626
+
let max_sectors = cap / SECTOR_BYTES;
627
+
(max_sectors >= 2).then(|| {
628
+
let n = state.next_random_usize(seed, max_sectors - 1) + 1;
629
+
n * SECTOR_BYTES
630
+
})
631
+
} else {
632
+
None
633
+
};
636
634
637
635
let actual_len = match torn_len {
638
636
Some(n) => n,
639
-
None if buf.len() > 1
640
-
&& state.should_fault(seed, fault.partial_write_probability) =>
641
-
{
637
+
None if buf.len() > 1 && state.should_fault(seed, fault.partial_write_probability) => {
642
638
let partial = state.next_random_usize(seed, buf.len());
643
639
partial.max(1)
644
640
}
···
840
836
}
841
837
842
838
let dir_path = path.to_path_buf();
843
-
let actually_persisted =
844
-
!state.should_fault(seed, fault.dir_sync_failure_probability);
839
+
let actually_persisted = !state.should_fault(seed, fault.dir_sync_failure_probability);
845
840
846
841
if actually_persisted {
847
842
state.dirs_durable.insert(dir_path.clone());
+1
-1
crates/tranquil-store/tests/gauntlet_smoke.rs
+1
-1
crates/tranquil-store/tests/gauntlet_smoke.rs
···
554
554
std::fs::create_dir_all(&root_b).expect("mkdir b");
555
555
let roots = vec![root_a.clone(), root_b.clone()];
556
556
let reports = farm::run_many_timed_with_scratch_roots(
557
-
|seed| fast_sanity_config(seed),
557
+
fast_sanity_config,
558
558
&roots,
559
559
(0..2).map(Seed),
560
560
);
+10
-17
crates/tranquil-store/tests/sim_blockstore.rs
+10
-17
crates/tranquil-store/tests/sim_blockstore.rs
···
717
717
718
718
{
719
719
let s = Arc::clone(&sim);
720
-
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(
721
-
config.clone(),
722
-
move || Arc::clone(&s),
723
-
)
724
-
.unwrap();
720
+
let store =
721
+
TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config.clone(), move || {
722
+
Arc::clone(&s)
723
+
})
724
+
.unwrap();
725
725
store
726
726
.put_blocks_blocking(vec![(cid, data.clone())])
727
727
.unwrap();
···
730
730
sim.crash();
731
731
732
732
let s = Arc::clone(&sim);
733
-
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || {
734
-
Arc::clone(&s)
735
-
})
736
-
.unwrap();
733
+
let store =
734
+
TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || Arc::clone(&s))
735
+
.unwrap();
737
736
738
737
match store.get_block_sync(&cid) {
739
-
Ok(Some(d)) => assert_eq!(
740
-
&d[..],
741
-
&data[..],
742
-
"block content mismatch after crash"
743
-
),
738
+
Ok(Some(d)) => assert_eq!(&d[..], &data[..], "block content mismatch after crash"),
744
739
Ok(None) => panic!(
745
740
"durability bug: put_blocks_blocking returned Ok but block missing after crash"
746
741
),
747
-
Err(e) => panic!(
748
-
"durability bug: block read failed after crash: {e}"
749
-
),
742
+
Err(e) => panic!("durability bug: block read failed after crash: {e}"),
750
743
}
751
744
});
752
745
}
+4
-1
crates/tranquil-store/tests/sim_eventlog.rs
+4
-1
crates/tranquil-store/tests/sim_eventlog.rs
···
1157
1157
"sync must ack only events 1..=2 with corrupt event 3"
1158
1158
);
1159
1159
assert_eq!(result.flushed_events.len(), 2);
1160
-
assert!(writer.is_poisoned(), "writer must be poisoned after partial sync");
1160
+
assert!(
1161
+
writer.is_poisoned(),
1162
+
"writer must be poisoned after partial sync"
1163
+
);
1161
1164
1162
1165
let append_after_poison = writer.append(
1163
1166
DidHash::from_did("did:plc:after"),
+7
-8
crates/tranquil-sync/src/subscribe_repos.rs
+7
-8
crates/tranquil-sync/src/subscribe_repos.rs
···
85
85
};
86
86
for event in events {
87
87
*last_seen = event.seq;
88
-
let bytes =
89
-
match format_event_with_prefetched_blocks(state, event, &prefetched).await {
90
-
Ok(b) => b,
91
-
Err(e) => {
92
-
warn!("Lag recovery format failed: {}", e);
93
-
return Err(());
94
-
}
95
-
};
88
+
let bytes = match format_event_with_prefetched_blocks(state, event, &prefetched).await {
89
+
Ok(b) => b,
90
+
Err(e) => {
91
+
warn!("Lag recovery format failed: {}", e);
92
+
return Err(());
93
+
}
94
+
};
96
95
if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
97
96
warn!("Lag recovery send failed: {}", e);
98
97
return Err(());
History
4 rounds
0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
merge conflicts detected
expand
collapse
expand
collapse
- .config/nextest.toml:68
- Cargo.lock:9
- Cargo.toml:26
- crates/tranquil-comms/Cargo.toml:10
- crates/tranquil-config/src/lib.rs:5
- example.toml:373
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>