diff --git a/app/app/email_utils.py b/app/app/email_utils.py index 53c1896..70cec70 100644 --- a/app/app/email_utils.py +++ b/app/app/email_utils.py @@ -951,6 +951,8 @@ def add_header(msg: Message, text_header, html_header=None) -> Message: for part in msg.get_payload(): if isinstance(part, Message): new_parts.append(add_header(part, text_header, html_header)) + elif isinstance(part, str): + new_parts.append(MIMEText(part)) else: new_parts.append(part) clone_msg = copy(msg) @@ -959,7 +961,14 @@ def add_header(msg: Message, text_header, html_header=None) -> Message: elif content_type in ("multipart/mixed", "multipart/signed"): new_parts = [] - parts = list(msg.get_payload()) + payload = msg.get_payload() + if isinstance(payload, str): + # The message is badly formatted inject as new + new_parts = [MIMEText(text_header, "plain"), MIMEText(payload, "plain")] + clone_msg = copy(msg) + clone_msg.set_payload(new_parts) + return clone_msg + parts = list(payload) LOG.d("only add header for the first part for %s", content_type) for ix, part in enumerate(parts): if ix == 0: diff --git a/app/tests/example_emls/add_header_multipart.eml b/app/tests/example_emls/add_header_multipart.eml new file mode 100644 index 0000000..2c6539e --- /dev/null +++ b/app/tests/example_emls/add_header_multipart.eml @@ -0,0 +1,21 @@ +Sender: somebody@somewhere.net +Content-Type: multipart/mixed; boundary="----=_Part_3946_1099248058.1688752298149" + +--0c916c9b5fe3c925d7bafeb988bb6794 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: quoted-printable + +notification test + +--0c916c9b5fe3c925d7bafeb988bb6794 +Content-Type: text/html; charset="UTF-8" +Content-Transfer-Encoding: quoted-printable + +
+ +--0c916c9b5fe3c925d7bafeb988bb6794-- + + diff --git a/app/tests/example_emls/email_to_pgp_encrypt.eml b/app/tests/example_emls/email_to_pgp_encrypt.eml new file mode 100644 index 0000000..fb3b969 --- /dev/null +++ b/app/tests/example_emls/email_to_pgp_encrypt.eml @@ -0,0 +1,27 @@ +From: {{sender_address}} +To: {{recipient_address}} +Subject: Test subject +Content-Type: multipart/alternative; boundary="MLF8fvg556fdhFDH7=_?: + +--MLF8fvg556fdhFDH7=_?: +Content-Type: text/plain; + charset="utf-8" +Content-Transfer-Encoding: quoted-printable + +************************************************************************* + +This five-part limited series, based on the brilliant graphic novel by Me + +--MLF8fvg556fdhFDH7=_?: +Content-Type: text/html; + charset="utf-8" +Content-Transfer-Encoding: 8bit +--MLF8fvg556fdhFDH7=_?: +Content-Type: text/plain; + charset="utf-8" +Content-Transfer-Encoding: quoted-printable + +************************************************************************* +************************************************************************* + + diff --git a/app/tests/handler/test_encrypt_pgp.py b/app/tests/handler/test_encrypt_pgp.py new file mode 100644 index 0000000..d265a84 --- /dev/null +++ b/app/tests/handler/test_encrypt_pgp.py @@ -0,0 +1,33 @@ +from aiosmtpd.smtp import Envelope + +import email_handler +from app.config import get_abs_path +from app.db import Session +from app.pgp_utils import load_public_key +from tests.utils import create_new_user, load_eml_file, random_email + +from app.models import Alias + + +def test_encrypt_with_pgp(): + user = create_new_user() + pgp_public_key = open(get_abs_path("local_data/public-pgp.asc")).read() + mailbox = user.default_mailbox + mailbox.pgp_public_key = pgp_public_key + mailbox.generic_subject = True + mailbox.pgp_finger_print = load_public_key(pgp_public_key) + alias = Alias.create_new_random(user) + Session.flush() + sender_address = random_email() + msg = load_eml_file( + "email_to_pgp_encrypt.eml", + { + "sender_address": sender_address, + "recipient_address": alias.email, + }, + ) + envelope = Envelope() + envelope.mail_from = sender_address + envelope.rcpt_tos = [alias.email] + result = email_handler.MailHandler()._handle(envelope, msg) + assert result is not None diff --git a/app/tests/test_email_utils.py b/app/tests/test_email_utils.py index 2b96312..749603a 100644 --- a/app/tests/test_email_utils.py +++ b/app/tests/test_email_utils.py @@ -810,7 +810,7 @@ def test_add_header_multipart_with_invalid_part(): if i < 2: assert part.get_payload().index("INJECT") > -1 else: - assert part == "invalid" + assert part.get_payload() == "invalid" def test_sl_formataddr(): @@ -822,3 +822,10 @@ def test_sl_formataddr(): # test that the same name-address can't be handled by the built-in formataddr with pytest.raises(UnicodeEncodeError): formataddr(("é", "è@ç.à")) + + +def test_add_header_to_invalid_multipart(): + msg = load_eml_file("add_header_multipart.eml") + msg = add_header(msg, "test", "test") + data = msg.as_string() + assert data != ""