Lewis: May this revision serve well! lu5a@proton.me
+239
-98
Diff
round #1
+158
crates/tranquil-comms/tests/email_smtp.rs
+158
crates/tranquil-comms/tests/email_smtp.rs
···
1
+
use std::time::Duration;
2
+
3
+
use chrono::Utc;
4
+
use lettre::message::Mailbox;
5
+
use lettre::transport::smtp::AsyncSmtpTransport;
6
+
use lettre::transport::smtp::extension::ClientId;
7
+
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8
+
use tokio::net::{TcpListener, TcpStream};
9
+
use tranquil_comms::email::transport::SendMode;
10
+
use tranquil_comms::email::{EmailSender, types::HeloName};
11
+
use tranquil_comms::{CommsChannel, CommsSender, CommsStatus, CommsType, QueuedComms, SendError};
12
+
use uuid::Uuid;
13
+
14
+
fn fixture(recipient: &str, subject: &str, body: &str) -> QueuedComms {
15
+
QueuedComms {
16
+
id: Uuid::new_v4(),
17
+
user_id: None,
18
+
channel: CommsChannel::Email,
19
+
comms_type: CommsType::Welcome,
20
+
status: CommsStatus::Pending,
21
+
recipient: recipient.to_string(),
22
+
subject: Some(subject.to_string()),
23
+
body: body.to_string(),
24
+
metadata: None,
25
+
attempts: 0,
26
+
max_attempts: 3,
27
+
last_error: None,
28
+
created_at: Utc::now(),
29
+
updated_at: Utc::now(),
30
+
scheduled_for: Utc::now(),
31
+
processed_at: None,
32
+
}
33
+
}
34
+
35
+
fn build_smarthost_sender(host: &str, port: u16) -> EmailSender {
36
+
build_smarthost_sender_with_total_timeout(host, port, Duration::from_secs(10))
37
+
}
38
+
39
+
fn build_smarthost_sender_with_total_timeout(
40
+
host: &str,
41
+
port: u16,
42
+
total_timeout: Duration,
43
+
) -> EmailSender {
44
+
let from: Mailbox = "Tranquil Test <noreply@nel.pet>".parse().unwrap();
45
+
let helo = HeloName::parse("mta.nel.pet").unwrap();
46
+
let transport = AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host)
47
+
.port(port)
48
+
.hello_name(ClientId::Domain(helo.into_inner()))
49
+
.timeout(Some(Duration::from_secs(5)))
50
+
.build();
51
+
EmailSender::new(
52
+
from,
53
+
SendMode::Smarthost {
54
+
transport: Box::new(transport),
55
+
total_timeout,
56
+
},
57
+
None,
58
+
)
59
+
}
60
+
61
+
async fn drive_stub(stream: TcpStream, rcpt_response: &'static [u8]) -> std::io::Result<()> {
62
+
let (read, mut write) = stream.into_split();
63
+
let mut reader = BufReader::new(read);
64
+
write.write_all(b"220 stub ESMTP\r\n").await?;
65
+
let mut line = String::new();
66
+
loop {
67
+
line.clear();
68
+
let n = reader.read_line(&mut line).await?;
69
+
if n == 0 {
70
+
return Ok(());
71
+
}
72
+
let upper = line.to_ascii_uppercase();
73
+
let response: &[u8] = match upper.split_whitespace().next() {
74
+
Some("EHLO") | Some("HELO") => b"250-stub\r\n250 SIZE 10240000\r\n",
75
+
Some("MAIL") => b"250 OK\r\n",
76
+
Some("RCPT") => rcpt_response,
77
+
Some("DATA") => b"354 end with .\r\n",
78
+
Some("RSET") => b"250 OK\r\n",
79
+
Some("QUIT") => b"221 bye\r\n",
80
+
_ => b"500 unknown\r\n",
81
+
};
82
+
write.write_all(response).await?;
83
+
if upper.starts_with("QUIT") {
84
+
return Ok(());
85
+
}
86
+
}
87
+
}
88
+
89
+
async fn spawn_stub(rcpt_response: &'static [u8]) -> u16 {
90
+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
91
+
let port = listener.local_addr().unwrap().port();
92
+
tokio::spawn(async move {
93
+
let (stream, _) = listener.accept().await.unwrap();
94
+
let _ = drive_stub(stream, rcpt_response).await;
95
+
});
96
+
port
97
+
}
98
+
99
+
#[tokio::test]
100
+
async fn rcpt_550_classifies_as_smtp_permanent() {
101
+
let port = spawn_stub(b"550 5.1.1 user unknown\r\n").await;
102
+
let sender = build_smarthost_sender("127.0.0.1", port);
103
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
104
+
match result {
105
+
Err(SendError::SmtpPermanent(_)) => {}
106
+
other => panic!("expected SmtpPermanent, got {other:?}"),
107
+
}
108
+
}
109
+
110
+
#[tokio::test]
111
+
async fn rcpt_421_classifies_as_smtp_transient() {
112
+
let port = spawn_stub(b"421 4.7.0 try again later\r\n").await;
113
+
let sender = build_smarthost_sender("127.0.0.1", port);
114
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
115
+
match result {
116
+
Err(SendError::SmtpTransient(_)) => {}
117
+
other => panic!("expected SmtpTransient, got {other:?}"),
118
+
}
119
+
}
120
+
121
+
#[tokio::test]
122
+
async fn invalid_recipient_classifies_as_invalid_recipient() {
123
+
let port = spawn_stub(b"250 OK\r\n").await;
124
+
let sender = build_smarthost_sender("127.0.0.1", port);
125
+
let result = sender.send(&fixture("not-an-address", "x", "x")).await;
126
+
match result {
127
+
Err(SendError::InvalidRecipient(_)) => {}
128
+
other => panic!("expected InvalidRecipient, got {other:?}"),
129
+
}
130
+
}
131
+
132
+
async fn spawn_silent_stub() -> u16 {
133
+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
134
+
let port = listener.local_addr().unwrap().port();
135
+
tokio::spawn(async move {
136
+
let (_stream, _) = listener.accept().await.unwrap();
137
+
std::future::pending::<()>().await;
138
+
});
139
+
port
140
+
}
141
+
142
+
#[tokio::test]
143
+
async fn smarthost_silent_relay_hits_total_timeout() {
144
+
let port = spawn_silent_stub().await;
145
+
let sender =
146
+
build_smarthost_sender_with_total_timeout("127.0.0.1", port, Duration::from_millis(500));
147
+
let start = std::time::Instant::now();
148
+
let result = sender.send(&fixture("nel@nel.pet", "x", "x")).await;
149
+
let elapsed = start.elapsed();
150
+
match result {
151
+
Err(SendError::Timeout) => {}
152
+
other => panic!("expected Timeout, got {other:?}"),
153
+
}
154
+
assert!(
155
+
elapsed < Duration::from_secs(2),
156
+
"send returned in {elapsed:?}, expected close to 500ms total_timeout"
157
+
);
158
+
}
+2
-4
crates/tranquil-pds/src/repo_ops.rs
+2
-4
crates/tranquil-pds/src/repo_ops.rs
···
526
526
527
527
let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
528
528
529
-
let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = ops
530
-
.iter()
531
-
.map(|op| (op.collection_rkey(), op))
532
-
.collect();
529
+
let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> =
530
+
ops.iter().map(|op| (op.collection_rkey(), op)).collect();
533
531
534
532
let final_record_uris: HashSet<AtUri> = final_ops
535
533
.iter()
+2
-8
crates/tranquil-pds/tests/repo_batch.rs
+2
-8
crates/tranquil-pds/tests/repo_batch.rs
···
402
402
"{}/xrpc/com.atproto.repo.listRecords",
403
403
base_url().await
404
404
))
405
-
.query(&[
406
-
("repo", did.as_str()),
407
-
("collection", "app.bsky.feed.post"),
408
-
])
405
+
.query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")])
409
406
.send()
410
407
.await
411
408
.expect("Failed to list records");
···
484
481
"{}/xrpc/com.atproto.repo.listRecords",
485
482
base_url().await
486
483
))
487
-
.query(&[
488
-
("repo", did.as_str()),
489
-
("collection", "app.bsky.feed.post"),
490
-
])
484
+
.query(&[("repo", did.as_str()), ("collection", "app.bsky.feed.post")])
491
485
.send()
492
486
.await
493
487
.expect("Failed to list records");
+21
-14
crates/tranquil-server/src/main.rs
+21
-14
crates/tranquil-server/src/main.rs
···
53
53
return ExitCode::FAILURE;
54
54
}
55
55
};
56
-
match config.validate(*ignore_secrets) {
57
-
Ok(()) => {
58
-
println!("Configuration is valid.");
59
-
ExitCode::SUCCESS
60
-
}
61
-
Err(e) => {
62
-
eprint!("{e}");
63
-
ExitCode::FAILURE
64
-
}
56
+
if let Err(e) = config.validate(*ignore_secrets) {
57
+
eprint!("{e}");
58
+
return ExitCode::FAILURE;
65
59
}
60
+
if let Err(e) = EmailSender::from_config(&config) {
61
+
eprintln!("Email configuration invalid: {e}");
62
+
return ExitCode::FAILURE;
63
+
}
64
+
println!("Configuration is valid.");
65
+
ExitCode::SUCCESS
66
66
}
67
67
};
68
68
}
···
141
141
142
142
let cfg = tranquil_config::get();
143
143
144
-
if let Some(email_sender) = EmailSender::from_config(cfg) {
145
-
info!("Email comms enabled");
146
-
comms_service = comms_service.register_sender(email_sender);
147
-
} else {
148
-
warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
144
+
match EmailSender::from_config(cfg) {
145
+
Ok(Some(email_sender)) => {
146
+
info!("Email comms enabled");
147
+
comms_service = comms_service.register_sender(email_sender);
148
+
}
149
+
Ok(None) => {
150
+
warn!("Email comms disabled (MAIL_FROM_ADDRESS unset)");
151
+
}
152
+
Err(e) => {
153
+
error!(error = %e, "Email configuration invalid");
154
+
return Err(e.into());
155
+
}
149
156
}
150
157
151
158
if let Some(discord_sender) = DiscordSender::from_config(cfg) {
+7
-10
crates/tranquil-store/src/blockstore/store.rs
+7
-10
crates/tranquil-store/src/blockstore/store.rs
···
806
806
impl StorageIO for EioOnReadAtRange {
807
807
fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> {
808
808
let fd = self.inner.open(path, opts)?;
809
-
self.fd_paths
810
-
.lock()
811
-
.unwrap()
812
-
.insert(fd, path.to_path_buf());
809
+
self.fd_paths.lock().unwrap().insert(fd, path.to_path_buf());
813
810
Ok(fd)
814
811
}
815
812
···
887
884
let cid_a = [0xAAu8; CID_SIZE];
888
885
let data_a = vec![1u8; 64];
889
886
let block_a_offset = BlockOffset::new(BLOCK_HEADER_SIZE as u64);
890
-
let len_a =
891
-
encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap();
887
+
let len_a = encode_block_record(&setup, fd, block_a_offset, &cid_a, &data_a).unwrap();
892
888
893
889
let block_b_offset_raw = BLOCK_HEADER_SIZE as u64 + len_a;
894
890
let block_b_offset = BlockOffset::new(block_b_offset_raw);
···
982
978
#[test]
983
979
fn retry_with_backoff_passes_attempt_index_to_op() {
984
980
let observed = std::sync::Mutex::new(Vec::<u8>::new());
985
-
let _result: Result<(), RepoError> = retry_with_backoff(instant_policy(4), &mut |attempt| {
986
-
observed.lock().unwrap().push(attempt);
987
-
Err(RepoError::storage(io::Error::other("EIO")))
988
-
});
981
+
let _result: Result<(), RepoError> =
982
+
retry_with_backoff(instant_policy(4), &mut |attempt| {
983
+
observed.lock().unwrap().push(attempt);
984
+
Err(RepoError::storage(io::Error::other("EIO")))
985
+
});
989
986
assert_eq!(*observed.lock().unwrap(), vec![0, 1, 2, 3]);
990
987
}
991
988
}
+1
-4
crates/tranquil-store/src/gauntlet/farm.rs
+1
-4
crates/tranquil-store/src/gauntlet/farm.rs
···
157
157
158
158
#[test]
159
159
fn scratch_for_thread_falls_back_to_root_zero_outside_pool() {
160
-
let roots = vec![
161
-
PathBuf::from("/scratch/a"),
162
-
PathBuf::from("/scratch/b"),
163
-
];
160
+
let roots = vec![PathBuf::from("/scratch/a"), PathBuf::from("/scratch/b")];
164
161
assert_eq!(
165
162
scratch_for_thread(&roots, None),
166
163
Some(PathBuf::from("/scratch/a"))
+1
-1
crates/tranquil-store/src/lib.rs
+1
-1
crates/tranquil-store/src/lib.rs
···
28
28
};
29
29
#[cfg(any(test, feature = "test-harness"))]
30
30
pub use sim::{
31
-
FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow,
31
+
FaultConfig, LatencyNs, OpRecord, PristineGuard, Probability, SimulatedIO, SyncReorderWindow,
32
32
sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed,
33
33
};
34
34
+25
-30
crates/tranquil-store/src/sim.rs
+25
-30
crates/tranquil-store/src/sim.rs
···
1
1
use std::collections::{HashMap, HashSet, VecDeque};
2
2
use std::io;
3
3
use std::path::{Path, PathBuf};
4
+
use std::sync::Arc;
4
5
use std::sync::Mutex;
5
6
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6
-
use std::sync::Arc;
7
7
use std::time::Duration;
8
8
9
9
use crate::io::{FileId, OpenOptions, StorageIO};
···
561
561
return Err(io::Error::other("simulated EIO on read"));
562
562
}
563
563
564
-
let read_offset =
565
-
if state.should_fault(seed, fault.misdirected_read_probability) {
566
-
let drift_sectors = state.next_random_usize(seed, 8) + 1;
567
-
let drift = (drift_sectors * SECTOR_BYTES) as u64;
568
-
if state.next_random(seed) < 0.5 {
569
-
offset.saturating_sub(drift)
570
-
} else {
571
-
offset.saturating_add(drift)
572
-
}
564
+
let read_offset = if state.should_fault(seed, fault.misdirected_read_probability) {
565
+
let drift_sectors = state.next_random_usize(seed, 8) + 1;
566
+
let drift = (drift_sectors * SECTOR_BYTES) as u64;
567
+
if state.next_random(seed) < 0.5 {
568
+
offset.saturating_sub(drift)
573
569
} else {
574
-
offset
575
-
};
570
+
offset.saturating_add(drift)
571
+
}
572
+
} else {
573
+
offset
574
+
};
576
575
577
576
let storage = state.storage.get(&sid).unwrap();
578
577
···
620
619
return Err(io::Error::other("simulated EIO on write"));
621
620
}
622
621
623
-
let torn_len =
624
-
if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) {
625
-
let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES);
626
-
let page_end = page_base + TORN_PAGE_BYTES;
627
-
let cap = page_end.saturating_sub(offset as usize).min(buf.len());
628
-
let max_sectors = cap / SECTOR_BYTES;
629
-
(max_sectors >= 2).then(|| {
630
-
let n = state.next_random_usize(seed, max_sectors - 1) + 1;
631
-
n * SECTOR_BYTES
632
-
})
633
-
} else {
634
-
None
635
-
};
622
+
let torn_len = if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) {
623
+
let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES);
624
+
let page_end = page_base + TORN_PAGE_BYTES;
625
+
let cap = page_end.saturating_sub(offset as usize).min(buf.len());
626
+
let max_sectors = cap / SECTOR_BYTES;
627
+
(max_sectors >= 2).then(|| {
628
+
let n = state.next_random_usize(seed, max_sectors - 1) + 1;
629
+
n * SECTOR_BYTES
630
+
})
631
+
} else {
632
+
None
633
+
};
636
634
637
635
let actual_len = match torn_len {
638
636
Some(n) => n,
639
-
None if buf.len() > 1
640
-
&& state.should_fault(seed, fault.partial_write_probability) =>
641
-
{
637
+
None if buf.len() > 1 && state.should_fault(seed, fault.partial_write_probability) => {
642
638
let partial = state.next_random_usize(seed, buf.len());
643
639
partial.max(1)
644
640
}
···
840
836
}
841
837
842
838
let dir_path = path.to_path_buf();
843
-
let actually_persisted =
844
-
!state.should_fault(seed, fault.dir_sync_failure_probability);
839
+
let actually_persisted = !state.should_fault(seed, fault.dir_sync_failure_probability);
845
840
846
841
if actually_persisted {
847
842
state.dirs_durable.insert(dir_path.clone());
+1
-1
crates/tranquil-store/tests/gauntlet_smoke.rs
+1
-1
crates/tranquil-store/tests/gauntlet_smoke.rs
···
554
554
std::fs::create_dir_all(&root_b).expect("mkdir b");
555
555
let roots = vec![root_a.clone(), root_b.clone()];
556
556
let reports = farm::run_many_timed_with_scratch_roots(
557
-
|seed| fast_sanity_config(seed),
557
+
fast_sanity_config,
558
558
&roots,
559
559
(0..2).map(Seed),
560
560
);
+10
-17
crates/tranquil-store/tests/sim_blockstore.rs
+10
-17
crates/tranquil-store/tests/sim_blockstore.rs
···
717
717
718
718
{
719
719
let s = Arc::clone(&sim);
720
-
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(
721
-
config.clone(),
722
-
move || Arc::clone(&s),
723
-
)
724
-
.unwrap();
720
+
let store =
721
+
TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config.clone(), move || {
722
+
Arc::clone(&s)
723
+
})
724
+
.unwrap();
725
725
store
726
726
.put_blocks_blocking(vec![(cid, data.clone())])
727
727
.unwrap();
···
730
730
sim.crash();
731
731
732
732
let s = Arc::clone(&sim);
733
-
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || {
734
-
Arc::clone(&s)
735
-
})
736
-
.unwrap();
733
+
let store =
734
+
TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || Arc::clone(&s))
735
+
.unwrap();
737
736
738
737
match store.get_block_sync(&cid) {
739
-
Ok(Some(d)) => assert_eq!(
740
-
&d[..],
741
-
&data[..],
742
-
"block content mismatch after crash"
743
-
),
738
+
Ok(Some(d)) => assert_eq!(&d[..], &data[..], "block content mismatch after crash"),
744
739
Ok(None) => panic!(
745
740
"durability bug: put_blocks_blocking returned Ok but block missing after crash"
746
741
),
747
-
Err(e) => panic!(
748
-
"durability bug: block read failed after crash: {e}"
749
-
),
742
+
Err(e) => panic!("durability bug: block read failed after crash: {e}"),
750
743
}
751
744
});
752
745
}
+4
-1
crates/tranquil-store/tests/sim_eventlog.rs
+4
-1
crates/tranquil-store/tests/sim_eventlog.rs
···
1157
1157
"sync must ack only events 1..=2 with corrupt event 3"
1158
1158
);
1159
1159
assert_eq!(result.flushed_events.len(), 2);
1160
-
assert!(writer.is_poisoned(), "writer must be poisoned after partial sync");
1160
+
assert!(
1161
+
writer.is_poisoned(),
1162
+
"writer must be poisoned after partial sync"
1163
+
);
1161
1164
1162
1165
let append_after_poison = writer.append(
1163
1166
DidHash::from_did("did:plc:after"),
+7
-8
crates/tranquil-sync/src/subscribe_repos.rs
+7
-8
crates/tranquil-sync/src/subscribe_repos.rs
···
85
85
};
86
86
for event in events {
87
87
*last_seen = event.seq;
88
-
let bytes =
89
-
match format_event_with_prefetched_blocks(state, event, &prefetched).await {
90
-
Ok(b) => b,
91
-
Err(e) => {
92
-
warn!("Lag recovery format failed: {}", e);
93
-
return Err(());
94
-
}
95
-
};
88
+
let bytes = match format_event_with_prefetched_blocks(state, event, &prefetched).await {
89
+
Ok(b) => b,
90
+
Err(e) => {
91
+
warn!("Lag recovery format failed: {}", e);
92
+
return Err(());
93
+
}
94
+
};
96
95
if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
97
96
warn!("Lag recovery send failed: {}", e);
98
97
return Err(());
History
4 rounds
0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
merge conflicts detected
expand
collapse
expand
collapse
- .config/nextest.toml:68
- Cargo.lock:9
- Cargo.toml:26
- crates/tranquil-comms/Cargo.toml:10
- crates/tranquil-config/src/lib.rs:5
- example.toml:373
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
feat(tranquil-server): email config, tests, fmt
Lewis: May this revision serve well! <lu5a@proton.me>