Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,19 @@ impl MsgId {
let result = context
.sql
.query_row_optional(
"SELECT m.state, mdns.msg_id
FROM msgs m LEFT JOIN msgs_mdns mdns ON mdns.msg_id=m.id
WHERE id=?
LIMIT 1",
"
SELECT m.state, mdns.msg_id, c.type
FROM msgs m
LEFT JOIN msgs_mdns mdns ON mdns.msg_id=m.id
LEFT JOIN chats c ON c.id=m.chat_id
WHERE m.id=?
LIMIT 1",
(self,),
|row| {
let state: MessageState = row.get(0)?;
let mdn_msg_id: Option<MsgId> = row.get(1)?;
Ok(state.with_mdns(mdn_msg_id.is_some()))
let chat_type: Option<Chattype> = row.get(2)?;
Ok(state.with_mdns(mdn_msg_id.is_some(), chat_type))
},
)
.await?
Expand Down Expand Up @@ -537,6 +541,7 @@ impl Message {
m.param AS param,
m.hidden AS hidden,
m.location_id AS location,
c.type AS chat_type,
c.archived AS visibility,
c.blocked AS blocked
FROM msgs m
Expand All @@ -548,6 +553,7 @@ impl Message {
|row| {
let state: MessageState = row.get("state")?;
let mdn_msg_id: Option<MsgId> = row.get("mdn_msg_id")?;
let chat_type: Option<Chattype> = row.get("chat_type")?;
let text = match row.get_ref("txt")? {
rusqlite::types::ValueRef::Text(buf) => {
match String::from_utf8(buf.to_vec()) {
Expand Down Expand Up @@ -583,7 +589,7 @@ impl Message {
ephemeral_timer: row.get("ephemeral_timer")?,
ephemeral_timestamp: row.get("ephemeral_timestamp")?,
viewtype: row.get("type").unwrap_or_default(),
state: state.with_mdns(mdn_msg_id.is_some()),
state: state.with_mdns(mdn_msg_id.is_some(), chat_type),
download_state: row.get("download_state")?,
error: Some(row.get::<_, String>("error")?)
.filter(|error| !error.is_empty()),
Expand Down Expand Up @@ -1486,11 +1492,17 @@ impl MessageState {
}

/// Returns adjusted message state if the message has MDNs.
pub(crate) fn with_mdns(self, has_mdns: bool) -> Self {
if self == MessageState::OutDelivered && has_mdns {
return MessageState::OutMdnRcvd;
pub(crate) fn with_mdns(self, has_mdns: bool, chat_type: Option<Chattype>) -> Self {
if !has_mdns {
return self;
}
match self {
MessageState::OutFailed if chat_type == Some(Chattype::Single) => {
MessageState::OutMdnRcvd
}
MessageState::OutDelivered => MessageState::OutMdnRcvd,
_ => self,
}
self
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/message/message_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::chatlist::Chatlist;
use crate::config::Config;
use crate::reaction::send_reaction;
use crate::receive_imf::receive_imf;
use crate::smtp;
use crate::test_utils;
use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};

Expand Down Expand Up @@ -450,6 +451,13 @@ async fn test_get_state() -> Result<()> {
markseen_msgs(&bob, vec![bob_msg.id]).await?;
assert_state(&bob, bob_msg.id, MessageState::InSeen).await;

smtp::queue_mdn(&bob).await?;
let sent = bob.pop_sent_msg().await;
alice.recv_msg_trash(&sent).await;
let alice_msg = Message::load_from_db(&alice, alice_msg.id).await?;
assert_eq!(alice_msg.get_state(), MessageState::OutMdnRcvd);
assert_eq!(alice_msg.error().unwrap(), "badly failed");

Ok(())
}

Expand Down
108 changes: 72 additions & 36 deletions src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,39 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
return Ok(());
}

let more_mdns = send_mdn(context, connection).await?;
let more_mdns = send_mdn(
context,
connection,
async |context, rfc724_mid, body, recipients, connection| {
let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();

match smtp_send(context, &recipients, &body, connection, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
}
Ok(true)
}
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
}
SendResult::Failure(err) => Err(err),
}
},
)
.await?;
if !more_mdns {
// No more MDNs to send or one of them failed.
return Ok(());
Expand Down Expand Up @@ -551,6 +583,7 @@ async fn send_mdn_rfc724_mid(
rfc724_mid: &str,
contact_id: ContactId,
smtp: &mut Smtp,
send_fn: impl AsyncFn(&Context, &str, String, Vec<String>, &mut Smtp) -> Result<bool>,
) -> Result<bool> {
let contact = Contact::get_by_id(context, contact_id).await?;
if contact.is_blocked() {
Expand Down Expand Up @@ -590,48 +623,51 @@ async fn send_mdn_rfc724_mid(
if context.get_config_bool(Config::BccSelf).await? {
add_self_recipients(context, &mut recipients, encrypted).await?;
}
let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();
let sent = send_fn(context, rfc724_mid, body, recipients, smtp).await?;
if sent {
context
.sql
.transaction(|transaction| {
let mut stmt = transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
}
Ok(sent)
}

match smtp_send(context, &recipients, &body, smtp, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
}
#[cfg(test)]
pub(crate) async fn queue_mdn(context: &Context) -> Result<()> {
let queued = send_mdn(
context,
&mut Smtp::new(),
async |context, rfc724_mid, body, recipients, _smtp| {
context
.sql
.transaction(|transaction| {
let mut stmt =
transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.execute(
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
VALUES (?, ?, ?, ?)",
(rfc724_mid, recipients.join(" "), body, u32::MAX),
)
.await?;
Ok(true)
}
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
}
SendResult::Failure(err) => Err(err),
}
},
)
.await?;
assert!(queued);
Ok(())
}

/// Tries to send a single MDN. Returns true if more MDNs should be sent.
async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
async fn send_mdn(
context: &Context,
smtp: &mut Smtp,
send_fn: impl AsyncFn(&Context, &str, String, Vec<String>, &mut Smtp) -> Result<bool>,
) -> Result<bool> {
if !context.should_send_mdns().await? {
context.sql.execute("DELETE FROM smtp_mdns", []).await?;
return Ok(false);
Expand Down Expand Up @@ -668,7 +704,7 @@ async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
.await
.context("Failed to update MDN retries count")?;

match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp, send_fn).await {
Err(err) => {
// If there is an error, for example there is no message corresponding to the msg_id in the
// database, do not try to send this MDN again.
Expand Down
Loading