4.61.0
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 4m9s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 4m14s
Build-Release-Image / Merge-Images (push) Successful in 47s
Build-Release-Image / Create-Release (push) Successful in 16s
Build-Release-Image / Notify (push) Successful in 3s

This commit is contained in:
2024-11-29 12:00:12 +00:00
parent 545eeda79b
commit 3c77f8af4b
17 changed files with 385 additions and 154 deletions

View File

@ -36,6 +36,24 @@ def test_delete_mailbox_transfer_mailbox_primary(flask_client):
assert str(mails_sent[0].msg).find("alias have been transferred") > -1
@mail_sender.store_emails_test_decorator
def test_delete_mailbox_no_email(flask_client):
user = create_new_user()
m1 = Mailbox.create(
user_id=user.id, email=random_email(), verified=True, flush=True
)
job = Job.create(
name=JOB_DELETE_MAILBOX,
payload={"mailbox_id": m1.id, "transfer_mailbox_id": None, "send_mail": False},
run_at=arrow.now(),
commit=True,
)
Session.commit()
delete_mailbox_job(job)
mails_sent = mail_sender.get_stored_emails()
assert len(mails_sent) == 0
@mail_sender.store_emails_test_decorator
def test_delete_mailbox_transfer_mailbox_in_list(flask_client):
user = create_new_user()

View File

@ -0,0 +1,100 @@
import arrow
from app.account_linking import (
SLPlan,
SLPlanType,
set_plan_for_partner_user,
)
from app.db import Session
from app.models import User, PartnerUser, PartnerSubscription
from app.proton.utils import get_proton_partner
from app.utils import random_string
from tests.utils import random_email
partner_user_id: int = 0
def setup_module():
global partner_user_id
email = random_email()
external_id = random_string()
sl_user = User.create(email, commit=True)
partner_user_id = PartnerUser.create(
user_id=sl_user.id,
partner_id=get_proton_partner().id,
external_user_id=external_id,
partner_email=email,
commit=True,
).id
def setup_function(func):
Session.query(PartnerSubscription).delete()
def test_free_plan_removes_sub():
pu = PartnerUser.get(partner_user_id)
sub_id = PartnerSubscription.create(
partner_user_id=partner_user_id,
end_at=arrow.utcnow(),
lifetime=False,
commit=True,
).id
set_plan_for_partner_user(pu, plan=SLPlan(type=SLPlanType.Free, expiration=None))
assert PartnerSubscription.get(sub_id) is None
def test_premium_plan_updates_expiration():
pu = PartnerUser.get(partner_user_id)
sub_id = PartnerSubscription.create(
partner_user_id=partner_user_id,
end_at=arrow.utcnow(),
lifetime=False,
commit=True,
).id
new_expiration = arrow.utcnow().shift(days=+10)
set_plan_for_partner_user(
pu, plan=SLPlan(type=SLPlanType.Premium, expiration=new_expiration)
)
assert PartnerSubscription.get(sub_id).end_at == new_expiration
def test_premium_plan_creates_sub():
pu = PartnerUser.get(partner_user_id)
new_expiration = arrow.utcnow().shift(days=+10)
set_plan_for_partner_user(
pu, plan=SLPlan(type=SLPlanType.Premium, expiration=new_expiration)
)
assert (
PartnerSubscription.get_by(partner_user_id=partner_user_id).end_at
== new_expiration
)
def test_lifetime_creates_sub():
pu = PartnerUser.get(partner_user_id)
new_expiration = arrow.utcnow().shift(days=+10)
set_plan_for_partner_user(
pu, plan=SLPlan(type=SLPlanType.PremiumLifetime, expiration=new_expiration)
)
sub = PartnerSubscription.get_by(partner_user_id=partner_user_id)
assert sub is not None
assert sub.end_at is None
assert sub.lifetime
def test_lifetime_updates_sub():
pu = PartnerUser.get(partner_user_id)
sub_id = PartnerSubscription.create(
partner_user_id=partner_user_id,
end_at=arrow.utcnow(),
lifetime=False,
commit=True,
).id
set_plan_for_partner_user(
pu, plan=SLPlan(type=SLPlanType.PremiumLifetime, expiration=arrow.utcnow())
)
sub = PartnerSubscription.get(sub_id)
assert sub is not None
assert sub.end_at is None
assert sub.lifetime

View File

@ -14,7 +14,6 @@ from app.email_utils import generate_verp_email
from app.mail_sender import mail_sender
from app.models import (
Alias,
AuthorizedAddress,
IgnoredEmail,
EmailLog,
Notification,
@ -24,35 +23,12 @@ from app.models import (
)
from app.utils import random_string, canonicalize_email
from email_handler import (
get_mailbox_from_mail_from,
should_ignore,
is_automatic_out_of_office,
)
from tests.utils import load_eml_file, create_new_user, random_email
def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
mb = get_mailbox_from_mail_from(user.email, alias)
assert mb.email == user.email
mb = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
assert mb is None
# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="unauthorized@gmail.com",
commit=True,
)
mb = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
assert mb.email == user.email
def test_should_ignore(flask_client):
assert should_ignore("mail_from", []) is False

View File

@ -791,12 +791,21 @@ def test_parse_id_from_bounce():
assert parse_id_from_bounce("anything+1234+@local") == 1234
def test_get_queue_id():
def test_get_queue_id_esmtps():
for id_type in ["SMTP", "ESMTP", "ESMTPA", "ESMTPS"]:
msg = email.message_from_string(
f"Received: from mail-wr1-x434.google.com (mail-wr1-x434.google.com [IPv6:2a00:1450:4864:20::434])\r\n\t(using TLSv1.3 with cipher TLS_AES_128_GCM_SHA256 (128/128 bits))\r\n\t(No client certificate requested)\r\n\tby mx1.simplelogin.co (Postfix) with {id_type} id 4FxQmw1DXdz2vK2\r\n\tfor <jglfdjgld@alias.com>; Fri, 4 Jun 2021 14:55:43 +0000 (UTC)"
)
assert get_queue_id(msg) == "4FxQmw1DXdz2vK2", f"Failed for {id_type}"
def test_get_queue_id_postfix():
msg = email.message_from_string(
"Received: from mail-wr1-x434.google.com (mail-wr1-x434.google.com [IPv6:2a00:1450:4864:20::434])\r\n\t(using TLSv1.3 with cipher TLS_AES_128_GCM_SHA256 (128/128 bits))\r\n\t(No client certificate requested)\r\n\tby mx1.simplelogin.co (Postfix) with ESMTPS id 4FxQmw1DXdz2vK2\r\n\tfor <jglfdjgld@alias.com>; Fri, 4 Jun 2021 14:55:43 +0000 (UTC)"
"Received: by mailin001.somewhere.net (Postfix)\r\n\tid 4Xz5pb2nMszGrqpL; Wed, 27 Nov 2024 17:21:59 +0000 (UTC)'] by mailin001.somewhere.net (Postfix)"
)
assert get_queue_id(msg) == "4FxQmw1DXdz2vK2"
assert get_queue_id(msg) == "4Xz5pb2nMszGrqpL"
def test_get_queue_id_from_double_header():

View File

@ -6,9 +6,18 @@ import pytest
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.mailbox_utils import MailboxEmailChangeError
from app.models import Mailbox, MailboxActivation, User, Job, UserAuditLog
from app.mailbox_utils import MailboxEmailChangeError, get_mailbox_for_reply_phase
from app.models import (
Mailbox,
MailboxActivation,
User,
Job,
UserAuditLog,
Alias,
AuthorizedAddress,
)
from app.user_audit_log_utils import UserAuditLogAction
from app.utils import random_string, canonicalize_email
from tests.utils import create_new_user, random_email
@ -50,6 +59,14 @@ def test_already_used():
mailbox_utils.create_mailbox(user, user.email)
def test_already_used_with_different_case():
user.lifetime = True
email = random_email()
mailbox_utils.create_mailbox(user, email)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, email.upper())
@mail_sender.store_emails_test_decorator
def test_create_mailbox():
email = random_email()
@ -418,3 +435,75 @@ def test_perform_mailbox_email_change_success():
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 1
def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
mb = get_mailbox_for_reply_phase(user.email, "", alias)
assert mb.email == user.email
mb = get_mailbox_for_reply_phase("unauthorized@gmail.com", "", alias)
assert mb is None
# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="unauthorized@gmail.com",
commit=True,
)
mb = get_mailbox_for_reply_phase("unauthorized@gmail.com", "", alias)
assert mb.email == user.email
def test_get_mailbox_from_mail_from_for_canonical_email(flask_client):
prefix = random_string(10)
email = f"{prefix}+subaddresxs@gmail.com"
canonical_email = canonicalize_email(email)
assert canonical_email != email
user = create_new_user()
mbox = Mailbox.create(
email=canonical_email, user_id=user.id, verified=True, flush=True
)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()
mb = get_mailbox_for_reply_phase(email, "", alias)
assert mb.email == canonical_email
mb = get_mailbox_for_reply_phase(canonical_email, "", alias)
assert mb.email == canonical_email
def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()
mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb.email == mail_from
def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@other_{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()
mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb is None