Lewis: May this revision serve well! lu5a@proton.me
+535
-158
Diff
round #0
+15
.sqlx/query-5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349.json
+15
.sqlx/query-5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349.json
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "UPDATE comms_queue\n SET status = 'failed'::comms_status,\n attempts = max_attempts,\n last_error = $2,\n updated_at = NOW()\n WHERE id = $1",
4
+
"describe": {
5
+
"columns": [],
6
+
"parameters": {
7
+
"Left": [
8
+
"Uuid",
9
+
"Text"
10
+
]
11
+
},
12
+
"nullable": []
13
+
},
14
+
"hash": "5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349"
15
+
}
+158
.sqlx/query-890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae.json
+158
.sqlx/query-890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae.json
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "UPDATE comms_queue\n SET status = 'processing', updated_at = NOW()\n WHERE id IN (\n SELECT id FROM comms_queue\n WHERE attempts < max_attempts\n AND scheduled_for <= $1\n AND (\n status = 'pending'\n OR (status = 'processing'\n AND updated_at < $1 - INTERVAL '10 minutes')\n )\n ORDER BY scheduled_for ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n RETURNING\n id, user_id,\n channel as \"channel: CommsChannel\",\n comms_type as \"comms_type: CommsType\",\n status as \"status: CommsStatus\",\n recipient, subject, body, metadata,\n attempts, max_attempts, last_error,\n created_at, updated_at, scheduled_for, processed_at",
4
+
"describe": {
5
+
"columns": [
6
+
{
7
+
"ordinal": 0,
8
+
"name": "id",
9
+
"type_info": "Uuid"
10
+
},
11
+
{
12
+
"ordinal": 1,
13
+
"name": "user_id",
14
+
"type_info": "Uuid"
15
+
},
16
+
{
17
+
"ordinal": 2,
18
+
"name": "channel: CommsChannel",
19
+
"type_info": {
20
+
"Custom": {
21
+
"name": "comms_channel",
22
+
"kind": {
23
+
"Enum": [
24
+
"email",
25
+
"discord",
26
+
"telegram",
27
+
"signal"
28
+
]
29
+
}
30
+
}
31
+
}
32
+
},
33
+
{
34
+
"ordinal": 3,
35
+
"name": "comms_type: CommsType",
36
+
"type_info": {
37
+
"Custom": {
38
+
"name": "comms_type",
39
+
"kind": {
40
+
"Enum": [
41
+
"welcome",
42
+
"email_verification",
43
+
"password_reset",
44
+
"email_update",
45
+
"account_deletion",
46
+
"admin_email",
47
+
"plc_operation",
48
+
"two_factor_code",
49
+
"channel_verification",
50
+
"passkey_recovery",
51
+
"legacy_login_alert",
52
+
"migration_verification",
53
+
"channel_verified"
54
+
]
55
+
}
56
+
}
57
+
}
58
+
},
59
+
{
60
+
"ordinal": 4,
61
+
"name": "status: CommsStatus",
62
+
"type_info": {
63
+
"Custom": {
64
+
"name": "comms_status",
65
+
"kind": {
66
+
"Enum": [
67
+
"pending",
68
+
"processing",
69
+
"sent",
70
+
"failed"
71
+
]
72
+
}
73
+
}
74
+
}
75
+
},
76
+
{
77
+
"ordinal": 5,
78
+
"name": "recipient",
79
+
"type_info": "Text"
80
+
},
81
+
{
82
+
"ordinal": 6,
83
+
"name": "subject",
84
+
"type_info": "Text"
85
+
},
86
+
{
87
+
"ordinal": 7,
88
+
"name": "body",
89
+
"type_info": "Text"
90
+
},
91
+
{
92
+
"ordinal": 8,
93
+
"name": "metadata",
94
+
"type_info": "Jsonb"
95
+
},
96
+
{
97
+
"ordinal": 9,
98
+
"name": "attempts",
99
+
"type_info": "Int4"
100
+
},
101
+
{
102
+
"ordinal": 10,
103
+
"name": "max_attempts",
104
+
"type_info": "Int4"
105
+
},
106
+
{
107
+
"ordinal": 11,
108
+
"name": "last_error",
109
+
"type_info": "Text"
110
+
},
111
+
{
112
+
"ordinal": 12,
113
+
"name": "created_at",
114
+
"type_info": "Timestamptz"
115
+
},
116
+
{
117
+
"ordinal": 13,
118
+
"name": "updated_at",
119
+
"type_info": "Timestamptz"
120
+
},
121
+
{
122
+
"ordinal": 14,
123
+
"name": "scheduled_for",
124
+
"type_info": "Timestamptz"
125
+
},
126
+
{
127
+
"ordinal": 15,
128
+
"name": "processed_at",
129
+
"type_info": "Timestamptz"
130
+
}
131
+
],
132
+
"parameters": {
133
+
"Left": [
134
+
"Timestamptz",
135
+
"Int8"
136
+
]
137
+
},
138
+
"nullable": [
139
+
false,
140
+
false,
141
+
false,
142
+
false,
143
+
false,
144
+
false,
145
+
true,
146
+
false,
147
+
true,
148
+
false,
149
+
false,
150
+
true,
151
+
false,
152
+
false,
153
+
false,
154
+
true
155
+
]
156
+
},
157
+
"hash": "890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae"
158
+
}
+192
crates/tranquil-comms/src/email/mod.rs
+192
crates/tranquil-comms/src/email/mod.rs
···
1
+
pub mod dkim;
2
+
pub mod message;
3
+
mod mx;
4
+
pub mod transport;
5
+
pub mod types;
6
+
7
+
use std::sync::Arc;
8
+
use std::time::Duration;
9
+
10
+
use async_trait::async_trait;
11
+
use hickory_resolver::TokioAsyncResolver;
12
+
use lettre::message::Mailbox;
13
+
use lettre::transport::smtp::AsyncSmtpTransport;
14
+
use lettre::transport::smtp::PoolConfig;
15
+
use lettre::transport::smtp::authentication::Credentials;
16
+
use lettre::transport::smtp::extension::ClientId;
17
+
use tokio::sync::Semaphore;
18
+
use tracing::{info, warn};
19
+
20
+
pub use self::dkim::DkimSigner;
21
+
pub use self::transport::SendMode;
22
+
use self::types::{
23
+
DkimKeyPath, DkimSelector, EmailDomain, HeloName, SmtpHost, SmtpPassword, SmtpPort,
24
+
SmtpUsername, TlsMode,
25
+
};
26
+
use crate::sender::{CommsSender, SendError};
27
+
use crate::types::{CommsChannel, QueuedComms};
28
+
29
+
pub struct EmailSender {
30
+
from: Mailbox,
31
+
mode: SendMode,
32
+
dkim: Option<DkimSigner>,
33
+
}
34
+
35
+
impl EmailSender {
36
+
pub fn new(from: Mailbox, mode: SendMode, dkim: Option<DkimSigner>) -> Self {
37
+
Self { from, mode, dkim }
38
+
}
39
+
40
+
pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Result<Option<Self>, SendError> {
41
+
let Some(from_address) = cfg.email.from_address.as_deref().filter(|s| !s.is_empty()) else {
42
+
info!("Email sender disabled: MAIL_FROM_ADDRESS unset");
43
+
return Ok(None);
44
+
};
45
+
let from = build_from(&cfg.email.from_name, from_address)?;
46
+
let dkim = build_dkim(&cfg.email.dkim)?;
47
+
let mode = match cfg
48
+
.email
49
+
.smarthost
50
+
.host
51
+
.as_deref()
52
+
.filter(|h| !h.is_empty())
53
+
{
54
+
Some(host) => build_smarthost(cfg, host)?,
55
+
None => build_direct_mx(cfg)?,
56
+
};
57
+
info!(?mode, dkim = dkim.is_some(), "Email sender initialized");
58
+
Ok(Some(Self { from, mode, dkim }))
59
+
}
60
+
}
61
+
62
+
fn config_invalid(field: &str, error: impl std::fmt::Display) -> SendError {
63
+
SendError::ConfigInvalid(format!("{field}: {error}"))
64
+
}
65
+
66
+
fn build_from(from_name: &str, from_address: &str) -> Result<Mailbox, SendError> {
67
+
let raw = match from_name.is_empty() {
68
+
true => from_address.to_string(),
69
+
false => format!("\"{}\" <{}>", from_name.replace('"', "'"), from_address),
70
+
};
71
+
raw.parse::<Mailbox>()
72
+
.map_err(|e| config_invalid("MAIL_FROM_ADDRESS / MAIL_FROM_NAME", e))
73
+
}
74
+
75
+
fn build_smarthost(
76
+
cfg: &tranquil_config::TranquilConfig,
77
+
host_raw: &str,
78
+
) -> Result<SendMode, SendError> {
79
+
let host = SmtpHost::parse(host_raw).map_err(|e| config_invalid("MAIL_SMARTHOST_HOST", e))?;
80
+
let port = SmtpPort::parse(cfg.email.smarthost.port)
81
+
.map_err(|e| config_invalid("MAIL_SMARTHOST_PORT", e))?;
82
+
let tls = TlsMode::parse(&cfg.email.smarthost.tls)
83
+
.map_err(|e| config_invalid("MAIL_SMARTHOST_TLS", e))?;
84
+
let helo = resolve_helo(cfg)?;
85
+
let pool = PoolConfig::new()
86
+
.max_size(cfg.email.smarthost.pool_size)
87
+
.idle_timeout(Duration::from_secs(60));
88
+
let command_timeout = Duration::from_secs(cfg.email.smarthost.command_timeout_secs);
89
+
let total_timeout = Duration::from_secs(cfg.email.smarthost.total_timeout_secs);
90
+
91
+
let builder = match tls {
92
+
TlsMode::Implicit => AsyncSmtpTransport::<lettre::Tokio1Executor>::relay(host.as_str())
93
+
.map_err(|e| config_invalid("smarthost TLS setup", e))?,
94
+
TlsMode::Starttls => AsyncSmtpTransport::<lettre::Tokio1Executor>::starttls_relay(
95
+
host.as_str(),
96
+
)
97
+
.map_err(|e| config_invalid("smarthost TLS setup", e))?,
98
+
TlsMode::None => AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host.as_str()),
99
+
};
100
+
let builder = builder
101
+
.port(port.as_u16())
102
+
.hello_name(ClientId::Domain(helo.into_inner()))
103
+
.timeout(Some(command_timeout))
104
+
.pool_config(pool);
105
+
let builder = match (
106
+
cfg.email.smarthost.username.as_deref(),
107
+
cfg.email.smarthost.password.as_deref(),
108
+
) {
109
+
(Some(u), Some(p)) => {
110
+
let username =
111
+
SmtpUsername::parse(u).map_err(|e| config_invalid("MAIL_SMARTHOST_USERNAME", e))?;
112
+
let password =
113
+
SmtpPassword::parse(p).map_err(|e| config_invalid("MAIL_SMARTHOST_PASSWORD", e))?;
114
+
builder.credentials(Credentials::new(
115
+
username.into_inner(),
116
+
password.expose().to_string(),
117
+
))
118
+
}
119
+
_ => builder,
120
+
};
121
+
Ok(SendMode::Smarthost {
122
+
transport: Box::new(builder.build()),
123
+
total_timeout,
124
+
})
125
+
}
126
+
127
+
fn build_direct_mx(cfg: &tranquil_config::TranquilConfig) -> Result<SendMode, SendError> {
128
+
let helo = resolve_helo(cfg)?;
129
+
let resolver = TokioAsyncResolver::tokio_from_system_conf()
130
+
.map(Arc::new)
131
+
.map_err(|e| config_invalid("system DNS configuration", e))?;
132
+
let max_concurrent = cfg.email.direct_mx.max_concurrent_sends.max(1);
133
+
Ok(SendMode::DirectMx {
134
+
resolver,
135
+
helo,
136
+
command_timeout: Duration::from_secs(cfg.email.direct_mx.command_timeout_secs),
137
+
total_timeout: Duration::from_secs(cfg.email.direct_mx.total_timeout_secs),
138
+
require_tls: cfg.email.direct_mx.require_tls,
139
+
inflight: Arc::new(Semaphore::new(max_concurrent)),
140
+
})
141
+
}
142
+
143
+
fn resolve_helo(cfg: &tranquil_config::TranquilConfig) -> Result<HeloName, SendError> {
144
+
let raw = cfg
145
+
.email
146
+
.helo_name
147
+
.clone()
148
+
.unwrap_or_else(|| cfg.server.hostname_without_port().to_string());
149
+
HeloName::parse(&raw).map_err(|e| config_invalid(&format!("HELO name {raw:?}"), e))
150
+
}
151
+
152
+
fn build_dkim(cfg: &tranquil_config::DkimConfig) -> Result<Option<DkimSigner>, SendError> {
153
+
let selector = match cfg.selector.as_deref() {
154
+
Some(s) => s,
155
+
None => return Ok(None),
156
+
};
157
+
let domain = cfg
158
+
.domain
159
+
.as_deref()
160
+
.ok_or_else(|| SendError::DkimSign("MAIL_DKIM_DOMAIN required when selector set".into()))?;
161
+
let key_path = cfg.private_key_path.as_deref().ok_or_else(|| {
162
+
SendError::DkimSign("MAIL_DKIM_KEY_PATH required when selector set".into())
163
+
})?;
164
+
let selector = DkimSelector::parse(selector)
165
+
.map_err(|e| SendError::DkimSign(format!("invalid DKIM selector: {e}")))?;
166
+
let domain = EmailDomain::parse(domain)
167
+
.map_err(|e| SendError::DkimSign(format!("invalid DKIM domain: {e}")))?;
168
+
let path = DkimKeyPath::parse(key_path)
169
+
.map_err(|e| SendError::DkimSign(format!("DKIM key path invalid: {e}")))?;
170
+
DkimSigner::load(selector, domain, path).map(Some)
171
+
}
172
+
173
+
#[async_trait]
174
+
impl CommsSender for EmailSender {
175
+
fn channel(&self) -> CommsChannel {
176
+
CommsChannel::Email
177
+
}
178
+
179
+
async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
180
+
let mut message = message::build(&self.from, notification)?;
181
+
if let Some(signer) = &self.dkim {
182
+
signer.sign(&mut message);
183
+
}
184
+
match transport::dispatch(&self.mode, message).await {
185
+
Ok(()) => Ok(()),
186
+
Err(e) => {
187
+
warn!(comms_id = %notification.id, error = %e, "SMTP send failed");
188
+
Err(e)
189
+
}
190
+
}
191
+
}
192
+
}
+4
-2
crates/tranquil-comms/src/lib.rs
+4
-2
crates/tranquil-comms/src/lib.rs
···
1
+
pub mod email;
1
2
mod locale;
2
3
mod sender;
3
4
mod types;
4
5
6
+
pub use email::EmailSender;
5
7
pub use locale::{
6
8
DEFAULT_LOCALE, NotificationStrings, VALID_LOCALES, format_message, get_strings,
7
9
validate_locale,
8
10
};
9
11
pub use sender::{
10
-
CommsSender, DiscordSender, EmailSender, SendError, SignalSender, TelegramSender,
11
-
is_valid_phone_number, is_valid_signal_username, mime_encode_header, sanitize_header_value,
12
+
CommsSender, DiscordSender, SendError, SignalSender, TelegramSender, is_valid_phone_number,
13
+
is_valid_signal_username,
12
14
};
13
15
pub use types::{CommsChannel, CommsStatus, CommsType, NewComms, QueuedComms};
+60
-113
crates/tranquil-comms/src/sender.rs
+60
-113
crates/tranquil-comms/src/sender.rs
···
1
1
use async_trait::async_trait;
2
-
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
3
2
use reqwest::Client;
4
3
use serde_json::json;
5
-
use std::process::Stdio;
6
4
use std::time::Duration;
7
-
use tokio::io::AsyncWriteExt;
8
-
use tokio::process::Command;
9
5
10
6
use super::types::{CommsChannel, QueuedComms};
11
7
···
21
17
22
18
#[derive(Debug, thiserror::Error)]
23
19
pub enum SendError {
24
-
#[error("Failed to spawn {command}: {source}")]
25
-
ProcessSpawn {
26
-
command: String,
27
-
source: std::io::Error,
28
-
},
29
-
#[error("{command} exited with non-zero status: {detail}")]
30
-
ProcessFailed { command: String, detail: String },
31
20
#[error("Channel not configured: {0:?}")]
32
21
NotConfigured(CommsChannel),
33
-
#[error("External service error: {0}")]
34
-
ExternalService(String),
22
+
#[error("Email configuration invalid: {0}")]
23
+
ConfigInvalid(String),
35
24
#[error("Invalid recipient format: {0}")]
36
25
InvalidRecipient(String),
26
+
#[error("Message construction failed: {0}")]
27
+
MessageBuild(String),
28
+
#[error("transient DNS lookup failure: {0}")]
29
+
DnsTransient(String),
30
+
#[error("permanent DNS lookup failure: {0}")]
31
+
DnsPermanent(String),
32
+
#[error("SMTP transient error: {0}")]
33
+
SmtpTransient(String),
34
+
#[error("SMTP permanent error: {0}")]
35
+
SmtpPermanent(String),
36
+
#[error("DKIM signing failed: {0}")]
37
+
DkimSign(String),
38
+
#[error("External service error: {0}")]
39
+
ExternalService(String),
37
40
#[error("Request timeout")]
38
41
Timeout,
39
42
#[error("Max retries exceeded: {0}")]
40
43
MaxRetriesExceeded(String),
41
44
}
42
45
46
+
impl SendError {
47
+
pub fn is_permanent(&self) -> bool {
48
+
match self {
49
+
Self::SmtpPermanent(_)
50
+
| Self::DnsPermanent(_)
51
+
| Self::InvalidRecipient(_)
52
+
| Self::MessageBuild(_)
53
+
| Self::DkimSign(_)
54
+
| Self::ConfigInvalid(_) => true,
55
+
Self::SmtpTransient(_)
56
+
| Self::DnsTransient(_)
57
+
| Self::Timeout
58
+
| Self::ExternalService(_)
59
+
| Self::MaxRetriesExceeded(_)
60
+
| Self::NotConfigured(_) => false,
61
+
}
62
+
}
63
+
}
64
+
43
65
fn create_http_client() -> Client {
44
66
Client::builder()
45
67
.timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
···
100
122
))
101
123
}
102
124
103
-
pub fn sanitize_header_value(value: &str) -> String {
104
-
value.replace(['\r', '\n'], " ").trim().to_string()
105
-
}
106
-
107
-
pub fn mime_encode_header(value: &str) -> String {
108
-
if value.is_ascii() {
109
-
sanitize_header_value(value)
110
-
} else {
111
-
let sanitized = sanitize_header_value(value);
112
-
format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes()))
113
-
}
114
-
}
115
-
116
125
pub fn escape_html(text: &str) -> String {
117
126
text.replace('&', "&")
118
127
.replace('<', "<")
···
135
144
tranquil_signal::SignalUsername::parse(username).is_ok()
136
145
}
137
146
138
-
pub struct EmailSender {
139
-
from_address: String,
140
-
from_name: String,
141
-
sendmail_path: String,
142
-
}
143
-
144
-
impl EmailSender {
145
-
pub fn new(from_address: String, from_name: String, sendmail_path: String) -> Self {
146
-
Self {
147
-
from_address,
148
-
from_name,
149
-
sendmail_path,
150
-
}
151
-
}
152
-
153
-
pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Option<Self> {
154
-
let from_address = cfg.email.from_address.clone()?;
155
-
let from_name = cfg.email.from_name.clone();
156
-
let sendmail_path = cfg.email.sendmail_path.clone();
157
-
Some(Self::new(from_address, from_name, sendmail_path))
158
-
}
159
-
160
-
pub fn format_email(&self, notification: &QueuedComms) -> String {
161
-
let subject = mime_encode_header(notification.subject.as_deref().unwrap_or("Notification"));
162
-
let recipient = sanitize_header_value(¬ification.recipient);
163
-
let from_header = if self.from_name.is_empty() {
164
-
self.from_address.clone()
165
-
} else {
166
-
format!(
167
-
"{} <{}>",
168
-
sanitize_header_value(&self.from_name),
169
-
self.from_address
170
-
)
171
-
};
172
-
format!(
173
-
"From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
174
-
from_header, recipient, subject, notification.body
175
-
)
176
-
}
177
-
}
178
-
179
-
#[async_trait]
180
-
impl CommsSender for EmailSender {
181
-
fn channel(&self) -> CommsChannel {
182
-
CommsChannel::Email
183
-
}
184
-
185
-
async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
186
-
let email_content = self.format_email(notification);
187
-
let mut child = Command::new(&self.sendmail_path)
188
-
.arg("-t")
189
-
.arg("-oi")
190
-
.stdin(Stdio::piped())
191
-
.stdout(Stdio::piped())
192
-
.stderr(Stdio::piped())
193
-
.spawn()
194
-
.map_err(|e| SendError::ProcessSpawn {
195
-
command: self.sendmail_path.clone(),
196
-
source: e,
197
-
})?;
198
-
if let Some(mut stdin) = child.stdin.take() {
199
-
stdin
200
-
.write_all(email_content.as_bytes())
201
-
.await
202
-
.map_err(|e| SendError::ProcessSpawn {
203
-
command: self.sendmail_path.clone(),
204
-
source: e,
205
-
})?;
206
-
}
207
-
let output = child
208
-
.wait_with_output()
209
-
.await
210
-
.map_err(|e| SendError::ProcessSpawn {
211
-
command: self.sendmail_path.clone(),
212
-
source: e,
213
-
})?;
214
-
if !output.status.success() {
215
-
let stderr = String::from_utf8_lossy(&output.stderr);
216
-
return Err(SendError::ProcessFailed {
217
-
command: self.sendmail_path.clone(),
218
-
detail: stderr.to_string(),
219
-
});
220
-
}
221
-
Ok(())
222
-
}
223
-
}
224
-
225
147
const DISCORD_API_BASE: &str = "https://discord.com/api/v10";
226
148
227
149
#[derive(Clone)]
···
610
532
))
611
533
}
612
534
}
535
+
536
+
#[cfg(test)]
537
+
mod is_permanent_matrix {
538
+
use super::{CommsChannel, SendError};
539
+
540
+
#[test]
541
+
fn permanent_variants_are_permanent() {
542
+
assert!(SendError::SmtpPermanent("x".into()).is_permanent());
543
+
assert!(SendError::DnsPermanent("x".into()).is_permanent());
544
+
assert!(SendError::InvalidRecipient("x".into()).is_permanent());
545
+
assert!(SendError::MessageBuild("x".into()).is_permanent());
546
+
assert!(SendError::DkimSign("x".into()).is_permanent());
547
+
assert!(SendError::ConfigInvalid("x".into()).is_permanent());
548
+
}
549
+
550
+
#[test]
551
+
fn transient_variants_are_not_permanent() {
552
+
assert!(!SendError::SmtpTransient("x".into()).is_permanent());
553
+
assert!(!SendError::DnsTransient("x".into()).is_permanent());
554
+
assert!(!SendError::Timeout.is_permanent());
555
+
assert!(!SendError::ExternalService("x".into()).is_permanent());
556
+
assert!(!SendError::MaxRetriesExceeded("x".into()).is_permanent());
557
+
assert!(!SendError::NotConfigured(CommsChannel::Email).is_permanent());
558
+
}
559
+
}
+2
crates/tranquil-db-traits/src/infra.rs
+2
crates/tranquil-db-traits/src/infra.rs
+24
-2
crates/tranquil-db/src/postgres/infra.rs
+24
-2
crates/tranquil-db/src/postgres/infra.rs
···
65
65
SET status = 'processing', updated_at = NOW()
66
66
WHERE id IN (
67
67
SELECT id FROM comms_queue
68
-
WHERE status = 'pending'
68
+
WHERE attempts < max_attempts
69
69
AND scheduled_for <= $1
70
-
AND attempts < max_attempts
70
+
AND (
71
+
status = 'pending'
72
+
OR (status = 'processing'
73
+
AND updated_at < $1 - INTERVAL '10 minutes')
74
+
)
71
75
ORDER BY scheduled_for ASC
72
76
LIMIT $2
73
77
FOR UPDATE SKIP LOCKED
···
127
131
Ok(())
128
132
}
129
133
134
+
async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> {
135
+
sqlx::query!(
136
+
r#"UPDATE comms_queue
137
+
SET status = 'failed'::comms_status,
138
+
attempts = max_attempts,
139
+
last_error = $2,
140
+
updated_at = NOW()
141
+
WHERE id = $1"#,
142
+
id,
143
+
error
144
+
)
145
+
.execute(&self.pool)
146
+
.await
147
+
.map_err(map_sqlx_error)?;
148
+
149
+
Ok(())
150
+
}
151
+
130
152
async fn create_invite_code(
131
153
&self,
132
154
code: &str,
+1
-1
crates/tranquil-pds/src/comms/mod.rs
+1
-1
crates/tranquil-pds/src/comms/mod.rs
···
4
4
CommsChannel, CommsSender, CommsStatus, CommsType, DEFAULT_LOCALE, DiscordSender, EmailSender,
5
5
NewComms, NotificationStrings, QueuedComms, SendError, SignalSender, TelegramSender,
6
6
VALID_LOCALES, format_message, get_strings, is_valid_phone_number, is_valid_signal_username,
7
-
mime_encode_header, sanitize_header_value, validate_locale,
7
+
validate_locale,
8
8
};
9
9
10
10
pub use service::{CommsService, repo as comms_repo, resolve_delivery_channel};
+15
-1
crates/tranquil-pds/src/comms/service.rs
+15
-1
crates/tranquil-pds/src/comms/service.rs
···
149
149
}
150
150
}
151
151
Err(e) => {
152
+
let permanent = e.is_permanent();
152
153
let error_msg = e.to_string();
153
154
warn!(
154
155
comms_id = %comms_id,
155
156
error = %error_msg,
157
+
permanent,
156
158
"Failed to send comms"
157
159
);
158
-
if let Err(db_err) = self.mark_failed(comms_id, &error_msg).await {
160
+
let db_result = match permanent {
161
+
true => self.mark_failed_permanent(comms_id, &error_msg).await,
162
+
false => self.mark_failed(comms_id, &error_msg).await,
163
+
};
164
+
if let Err(db_err) = db_result {
159
165
error!(
160
166
comms_id = %comms_id,
161
167
error = %db_err,
···
173
179
async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), tranquil_db_traits::DbError> {
174
180
self.infra_repo.mark_comms_failed(id, error).await
175
181
}
182
+
183
+
async fn mark_failed_permanent(
184
+
&self,
185
+
id: Uuid,
186
+
error: &str,
187
+
) -> Result<(), tranquil_db_traits::DbError> {
188
+
self.infra_repo.mark_comms_failed_permanent(id, error).await
189
+
}
176
190
}
177
191
178
192
struct ResolvedRecipient {
+1
-37
crates/tranquil-pds/tests/security_fixes.rs
+1
-37
crates/tranquil-pds/tests/security_fixes.rs
···
1
1
mod common;
2
-
use tranquil_pds::comms::{
3
-
SendError, is_valid_phone_number, is_valid_signal_username, sanitize_header_value,
4
-
};
2
+
use tranquil_pds::comms::{SendError, is_valid_phone_number, is_valid_signal_username};
5
3
use tranquil_pds::image::{ImageError, ImageProcessor};
6
4
7
-
#[test]
8
-
fn test_header_injection_sanitization() {
9
-
let malicious = "Injected\r\nBcc: attacker@evil.com";
10
-
let sanitized = sanitize_header_value(malicious);
11
-
assert!(!sanitized.contains('\r') && !sanitized.contains('\n'));
12
-
assert!(sanitized.contains("Injected") && sanitized.contains("Bcc:"));
13
-
14
-
let normal = "Normal Subject Line";
15
-
assert_eq!(sanitize_header_value(normal), "Normal Subject Line");
16
-
17
-
let padded = " Subject ";
18
-
assert_eq!(sanitize_header_value(padded), "Subject");
19
-
20
-
let multi_newline = "Line1\r\nLine2\nLine3\rLine4";
21
-
let sanitized = sanitize_header_value(multi_newline);
22
-
assert!(!sanitized.contains('\r') && !sanitized.contains('\n'));
23
-
assert!(sanitized.contains("Line1") && sanitized.contains("Line4"));
24
-
25
-
let header_injection = "Normal Subject\r\nBcc: attacker@evil.com\r\nX-Injected: value";
26
-
let sanitized = sanitize_header_value(header_injection);
27
-
assert_eq!(sanitized.split("\r\n").count(), 1);
28
-
assert!(
29
-
sanitized.contains("Normal Subject")
30
-
&& sanitized.contains("Bcc:")
31
-
&& sanitized.contains("X-Injected:")
32
-
);
33
-
34
-
let with_null = "client\0id";
35
-
assert!(sanitize_header_value(with_null).contains("client"));
36
-
37
-
let long_input = "x".repeat(10000);
38
-
assert!(!sanitize_header_value(&long_input).is_empty());
39
-
}
40
-
41
5
#[test]
42
6
fn test_phone_number_validation() {
43
7
assert!(is_valid_phone_number("+1234567890"));
+12
crates/tranquil-store/src/metastore/client.rs
+12
crates/tranquil-store/src/metastore/client.rs
···
1860
1860
recv(rx).await
1861
1861
}
1862
1862
1863
+
async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> {
1864
+
let (tx, rx) = oneshot::channel();
1865
+
self.pool.send(MetastoreRequest::Infra(
1866
+
InfraRequest::MarkCommsFailedPermanent {
1867
+
id,
1868
+
error: error.to_owned(),
1869
+
tx,
1870
+
},
1871
+
))?;
1872
+
recv(rx).await
1873
+
}
1874
+
1863
1875
async fn create_invite_code(
1864
1876
&self,
1865
1877
code: &str,
+13
crates/tranquil-store/src/metastore/handler.rs
+13
crates/tranquil-store/src/metastore/handler.rs
···
1789
1789
error: String,
1790
1790
tx: Tx<()>,
1791
1791
},
1792
+
MarkCommsFailedPermanent {
1793
+
id: Uuid,
1794
+
error: String,
1795
+
tx: Tx<()>,
1796
+
},
1792
1797
CreateInviteCode {
1793
1798
code: String,
1794
1799
use_count: i32,
···
3888
3893
.map_err(metastore_to_db);
3889
3894
let _ = tx.send(result);
3890
3895
}
3896
+
InfraRequest::MarkCommsFailedPermanent { id, error, tx } => {
3897
+
let result = state
3898
+
.metastore
3899
+
.infra_ops()
3900
+
.mark_comms_failed_permanent(id, &error)
3901
+
.map_err(metastore_to_db);
3902
+
let _ = tx.send(result);
3903
+
}
3891
3904
InfraRequest::CreateInviteCode {
3892
3905
code,
3893
3906
use_count,
+38
-2
crates/tranquil-store/src/metastore/infra_ops.rs
+38
-2
crates/tranquil-store/src/metastore/infra_ops.rs
···
247
247
248
248
val.status = status_to_u8(CommsStatus::Sent);
249
249
val.sent_at_ms = Some(Utc::now().timestamp_millis());
250
-
val.attempts = val.attempts.saturating_add(1);
251
250
252
251
let mut batch = self.db.batch();
253
252
batch.insert(&self.infra, key.as_slice(), val.serialize());
···
272
271
)?
273
272
.ok_or(MetastoreError::InvalidInput("comms entry not found"))?;
274
273
274
+
let next_attempts = val.attempts.saturating_add(1);
275
+
let exhausted = next_attempts >= val.max_attempts;
276
+
let next_status = match exhausted {
277
+
true => CommsStatus::Failed,
278
+
false => CommsStatus::Pending,
279
+
};
280
+
let now_ms = Utc::now().timestamp_millis();
281
+
let backoff_ms = i64::from(next_attempts).saturating_mul(60_000);
282
+
283
+
val.status = status_to_u8(next_status);
284
+
val.error_message = Some(error.to_owned());
285
+
val.attempts = next_attempts;
286
+
val.scheduled_for_ms = now_ms.saturating_add(backoff_ms);
287
+
288
+
let mut batch = self.db.batch();
289
+
batch.insert(&self.infra, key.as_slice(), val.serialize());
290
+
291
+
if let Some((hk, mut hv)) =
292
+
self.find_history_entry(val.user_id.unwrap_or(Uuid::nil()), val.id)?
293
+
{
294
+
hv.status = status_to_u8(next_status);
295
+
batch.insert(&self.infra, hk.as_slice(), hv.serialize());
296
+
}
297
+
298
+
batch.commit().map_err(MetastoreError::Fjall)
299
+
}
300
+
301
+
pub fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), MetastoreError> {
302
+
let key = comms_queue_key(id);
303
+
let mut val: QueuedCommsValue = point_lookup(
304
+
&self.infra,
305
+
key.as_slice(),
306
+
QueuedCommsValue::deserialize,
307
+
"corrupt comms queue entry",
308
+
)?
309
+
.ok_or(MetastoreError::InvalidInput("comms entry not found"))?;
310
+
275
311
val.status = status_to_u8(CommsStatus::Failed);
276
312
val.error_message = Some(error.to_owned());
277
-
val.attempts = val.attempts.saturating_add(1);
313
+
val.attempts = val.max_attempts;
278
314
279
315
let mut batch = self.db.batch();
280
316
batch.insert(&self.infra, key.as_slice(), val.serialize());
History
4 rounds
0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
feat(comms): EmailSender, permanent/transient routing
Lewis: May this revision serve well! <lu5a@proton.me>
merge conflicts detected
expand
collapse
expand
collapse
- .config/nextest.toml:68
- Cargo.lock:9
- Cargo.toml:26
- crates/tranquil-comms/Cargo.toml:10
- crates/tranquil-config/src/lib.rs:5
- example.toml:373
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
feat(comms): EmailSender, permanent/transient routing
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
feat(comms): EmailSender, permanent/transient routing
Lewis: May this revision serve well! <lu5a@proton.me>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
feat(comms): EmailSender, permanent/transient routing
Lewis: May this revision serve well! <lu5a@proton.me>