4.50.0
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m43s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m54s
Build-Release-Image / Merge-Images (push) Successful in 23s
Build-Release-Image / Create-Release (push) Successful in 14s
Build-Release-Image / Notify (push) Successful in 3s

This commit is contained in:
2024-09-19 12:00:06 +01:00
parent 357f0cca57
commit edef254529
22 changed files with 1133 additions and 306 deletions

View File

@ -46,10 +46,11 @@ def test_fire_event_on_alias_creation():
event_content = _get_event_from_string(event_data, user, pu)
assert event_content.alias_created is not None
alias_created = event_content.alias_created
assert alias.id == alias_created.alias_id
assert alias.email == alias_created.alias_email
assert "" == alias_created.alias_note
assert alias.id == alias_created.id
assert alias.email == alias_created.email
assert "" == alias_created.note
assert alias.enabled == alias_created.enabled
assert int(alias.created_at.timestamp) == alias_created.created_at
def test_fire_event_on_alias_creation_with_note():
@ -62,9 +63,9 @@ def test_fire_event_on_alias_creation_with_note():
event_content = _get_event_from_string(event_data, user, pu)
assert event_content.alias_created is not None
alias_created = event_content.alias_created
assert alias.id == alias_created.alias_id
assert alias.email == alias_created.alias_email
assert note == alias_created.alias_note
assert alias.id == alias_created.id
assert alias.email == alias_created.email
assert note == alias_created.note
assert alias.enabled == alias_created.enabled
@ -80,8 +81,8 @@ def test_fire_event_on_alias_deletion():
event_content = _get_event_from_string(event_data, user, pu)
assert event_content.alias_deleted is not None
alias_deleted = event_content.alias_deleted
assert alias_id == alias_deleted.alias_id
assert alias.email == alias_deleted.alias_email
assert alias_id == alias_deleted.id
assert alias.email == alias_deleted.email
def test_fire_event_on_alias_status_change():
@ -95,6 +96,7 @@ def test_fire_event_on_alias_status_change():
event_content = _get_event_from_string(event_data, user, pu)
assert event_content.alias_status_change is not None
event = event_content.alias_status_change
assert alias.id == event.alias_id
assert alias.email == event.alias_email
assert alias.id == event.id
assert alias.email == event.email
assert int(alias.created_at.timestamp) == event.created_at
assert event.enabled

View File

@ -37,10 +37,14 @@ def test_send_alias_creation_events():
event_list = decoded_event.content.alias_create_list.events
assert len(event_list) == 2
# 0 is newsletter alias
assert event_list[1].alias_id == aliases[0].id
assert event_list[1].id == aliases[0].id
assert event_list[1].email == aliases[0].email
assert event_list[1].note == ""
assert event_list[1].enabled == aliases[0].enabled
assert event_list[1].created_at == int(aliases[0].created_at.timestamp)
decoded_event = event_pb2.Event.FromString(dispatcher.events[1])
assert decoded_event.user_id == user.id
assert decoded_event.external_user_id == partner_user.external_user_id
event_list = decoded_event.content.alias_create_list.events
assert len(event_list) == 1
assert event_list[0].alias_id == aliases[1].id
assert event_list[0].id == aliases[1].id

View File

@ -0,0 +1,149 @@
from typing import Optional
from app import config
from app.config import ALIAS_DOMAINS
from app.custom_domain_utils import (
can_domain_be_used,
create_custom_domain,
is_valid_domain,
sanitize_domain,
CannotUseDomainReason,
)
from app.db import Session
from app.models import User, CustomDomain, Mailbox
from tests.utils import get_proton_partner
from tests.utils import create_new_user, random_string, random_domain
user: Optional[User] = None
def setup_module():
global user
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
user.trial_end = None
user.lifetime = True
Session.commit()
# is_valid_domain
def test_is_valid_domain():
assert is_valid_domain("example.com") is True
assert is_valid_domain("sub.example.com") is True
assert is_valid_domain("ex-ample.com") is True
assert is_valid_domain("-example.com") is False
assert is_valid_domain("example-.com") is False
assert is_valid_domain("exa_mple.com") is False
assert is_valid_domain("example..com") is False
assert is_valid_domain("") is False
assert is_valid_domain("a" * 64 + ".com") is False
assert is_valid_domain("a" * 63 + ".com") is True
assert is_valid_domain("example.com.") is True
assert is_valid_domain(".example.com") is False
assert is_valid_domain("example..com") is False
assert is_valid_domain("example.com-") is False
# can_domain_be_used
def test_can_domain_be_used():
domain = f"{random_string(10)}.com"
res = can_domain_be_used(user, domain)
assert res is None
def test_can_domain_be_used_existing_domain():
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, commit=True)
res = can_domain_be_used(user, domain)
assert res is CannotUseDomainReason.DomainAlreadyUsed
def test_can_domain_be_used_sl_domain():
domain = ALIAS_DOMAINS[0]
res = can_domain_be_used(user, domain)
assert res is CannotUseDomainReason.BuiltinDomain
def test_can_domain_be_used_domain_of_user_email():
domain = user.email.split("@")[1]
res = can_domain_be_used(user, domain)
assert res is CannotUseDomainReason.DomainPartOfUserEmail
def test_can_domain_be_used_domain_of_existing_mailbox():
domain = random_domain()
Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True)
res = can_domain_be_used(user, domain)
assert res is CannotUseDomainReason.DomainUserInMailbox
def test_can_domain_be_used_invalid_domain():
domain = f"{random_string(10)}@lol.com"
res = can_domain_be_used(user, domain)
assert res is CannotUseDomainReason.InvalidDomain
# sanitize_domain
def test_can_sanitize_domain_empty():
assert sanitize_domain("") == ""
def test_can_sanitize_domain_starting_with_http():
domain = "test.domain"
assert sanitize_domain(f"http://{domain}") == domain
def test_can_sanitize_domain_starting_with_https():
domain = "test.domain"
assert sanitize_domain(f"https://{domain}") == domain
def test_can_sanitize_domain_correct_domain():
domain = "test.domain"
assert sanitize_domain(domain) == domain
# create_custom_domain
def test_can_create_custom_domain():
domain = random_domain()
res = create_custom_domain(user=user, domain=domain)
assert res.success is True
assert res.redirect is None
assert res.message == ""
assert res.message_category == ""
assert res.instance is not None
assert res.instance.domain == domain
assert res.instance.user_id == user.id
def test_can_create_custom_domain_validates_if_parent_is_validated():
root_domain = random_domain()
subdomain = f"{random_string(10)}.{root_domain}"
# Create custom domain with the root domain
CustomDomain.create(
user_id=user.id,
domain=root_domain,
verified=True,
ownership_verified=True,
commit=True,
)
# Create custom domain with subdomain. Should automatically be verified
res = create_custom_domain(user=user, domain=subdomain)
assert res.success is True
assert res.instance.domain == subdomain
assert res.instance.user_id == user.id
assert res.instance.ownership_verified is True
def test_creates_custom_domain_with_partner_id():
domain = random_domain()
proton_partner = get_proton_partner()
res = create_custom_domain(user=user, domain=domain, partner_id=proton_partner.id)
assert res.success is True
assert res.instance.domain == domain
assert res.instance.user_id == user.id
assert res.instance.partner_id == proton_partner.id

View File

@ -0,0 +1,376 @@
from typing import Optional
from app import config
from app.constants import DMARC_RECORD
from app.custom_domain_validation import CustomDomainValidation
from app.db import Session
from app.models import CustomDomain, User
from app.dns_utils import InMemoryDNSClient
from app.proton.utils import get_proton_partner
from app.utils import random_string
from tests.utils import create_new_user, random_domain
user: Optional[User] = None
def setup_module():
global user
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
user.trial_end = None
user.lifetime = True
Session.commit()
def create_custom_domain(domain: str) -> CustomDomain:
return CustomDomain.create(user_id=user.id, domain=domain, commit=True)
def test_custom_domain_validation_get_dkim_records():
domain = random_domain()
custom_domain = create_custom_domain(domain)
validator = CustomDomainValidation(domain)
records = validator.get_dkim_records(custom_domain)
assert len(records) == 3
assert records["dkim02._domainkey"] == f"dkim02._domainkey.{domain}"
assert records["dkim03._domainkey"] == f"dkim03._domainkey.{domain}"
assert records["dkim._domainkey"] == f"dkim._domainkey.{domain}"
def test_custom_domain_validation_get_dkim_records_for_partner():
domain = random_domain()
custom_domain = create_custom_domain(domain)
partner_id = get_proton_partner().id
custom_domain.partner_id = partner_id
Session.commit()
dkim_domain = random_domain()
validator = CustomDomainValidation(
domain, partner_domains={partner_id: dkim_domain}
)
records = validator.get_dkim_records(custom_domain)
assert len(records) == 3
assert records["dkim02._domainkey"] == f"dkim02._domainkey.{dkim_domain}"
assert records["dkim03._domainkey"] == f"dkim03._domainkey.{dkim_domain}"
assert records["dkim._domainkey"] == f"dkim._domainkey.{dkim_domain}"
# validate_dkim_records
def test_custom_domain_validation_validate_dkim_records_empty_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
res = validator.validate_dkim_records(domain)
assert len(res) == 3
for record_value in res.values():
assert record_value == "empty"
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dkim_verified is False
def test_custom_domain_validation_validate_dkim_records_wrong_records_failure():
dkim_domain = random_domain()
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(dkim_domain, dns_client)
user_domain = random_domain()
# One domain right, two domains wrong
dns_client.set_cname_record(
f"dkim._domainkey.{user_domain}", f"dkim._domainkey.{dkim_domain}"
)
dns_client.set_cname_record(f"dkim02._domainkey.{user_domain}", "wrong")
dns_client.set_cname_record(f"dkim03._domainkey.{user_domain}", "wrong")
domain = create_custom_domain(user_domain)
res = validator.validate_dkim_records(domain)
assert len(res) == 2
for record_value in res.values():
assert record_value == "wrong"
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dkim_verified is False
def test_custom_domain_validation_validate_dkim_records_success_with_old_system():
dkim_domain = random_domain()
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(dkim_domain, dns_client)
user_domain = random_domain()
# One domain right, other domains missing
dns_client.set_cname_record(
f"dkim._domainkey.{user_domain}", f"dkim._domainkey.{dkim_domain}"
)
domain = create_custom_domain(user_domain)
# DKIM is verified
domain.dkim_verified = True
Session.commit()
res = validator.validate_dkim_records(domain)
assert len(res) == 2
assert f"dkim02._domainkey.{user_domain}" in res
assert f"dkim03._domainkey.{user_domain}" in res
# Flag is not cleared
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dkim_verified is True
def test_custom_domain_validation_validate_dkim_records_success():
dkim_domain = random_domain()
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(dkim_domain, dns_client)
user_domain = random_domain()
# One domain right, two domains wrong
dns_client.set_cname_record(
f"dkim._domainkey.{user_domain}", f"dkim._domainkey.{dkim_domain}"
)
dns_client.set_cname_record(
f"dkim02._domainkey.{user_domain}", f"dkim02._domainkey.{dkim_domain}"
)
dns_client.set_cname_record(
f"dkim03._domainkey.{user_domain}", f"dkim03._domainkey.{dkim_domain}"
)
domain = create_custom_domain(user_domain)
res = validator.validate_dkim_records(domain)
assert len(res) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dkim_verified is True
# validate_ownership
def test_custom_domain_validation_validate_ownership_empty_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
res = validator.validate_domain_ownership(domain)
assert res.success is False
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.ownership_verified is False
def test_custom_domain_validation_validate_ownership_wrong_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
wrong_records = [random_string()]
dns_client.set_txt_record(domain.domain, wrong_records)
res = validator.validate_domain_ownership(domain)
assert res.success is False
assert res.errors == wrong_records
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.ownership_verified is False
def test_custom_domain_validation_validate_ownership_success():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
dns_client.set_txt_record(
domain.domain, [validator.get_ownership_verification_record(domain)]
)
res = validator.validate_domain_ownership(domain)
assert res.success is True
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.ownership_verified is True
def test_custom_domain_validation_validate_ownership_from_partner_success():
dns_client = InMemoryDNSClient()
partner_id = get_proton_partner().id
prefix = random_string()
validator = CustomDomainValidation(
random_domain(),
dns_client,
partner_domains_validation_prefixes={partner_id: prefix},
)
domain = create_custom_domain(random_domain())
domain.partner_id = partner_id
Session.commit()
dns_client.set_txt_record(
domain.domain, [validator.get_ownership_verification_record(domain)]
)
res = validator.validate_domain_ownership(domain)
assert res.success is True
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.ownership_verified is True
# validate_mx_records
def test_custom_domain_validation_validate_mx_records_empty_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
res = validator.validate_mx_records(domain)
assert res.success is False
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.verified is False
def test_custom_domain_validation_validate_mx_records_wrong_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
wrong_record_1 = random_string()
wrong_record_2 = random_string()
wrong_records = [(10, wrong_record_1), (20, wrong_record_2)]
dns_client.set_mx_records(domain.domain, wrong_records)
res = validator.validate_mx_records(domain)
assert res.success is False
assert res.errors == [f"10 {wrong_record_1}", f"20 {wrong_record_2}"]
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.verified is False
def test_custom_domain_validation_validate_mx_records_success():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
dns_client.set_mx_records(domain.domain, config.EMAIL_SERVERS_WITH_PRIORITY)
res = validator.validate_mx_records(domain)
assert res.success is True
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.verified is True
# validate_spf_records
def test_custom_domain_validation_validate_spf_records_empty_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
res = validator.validate_spf_records(domain)
assert res.success is False
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.spf_verified is False
def test_custom_domain_validation_validate_spf_records_wrong_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
wrong_records = [random_string()]
dns_client.set_txt_record(domain.domain, wrong_records)
res = validator.validate_spf_records(domain)
assert res.success is False
assert res.errors == wrong_records
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.spf_verified is False
def test_custom_domain_validation_validate_spf_records_success():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
dns_client.set_txt_record(domain.domain, [f"v=spf1 include:{config.EMAIL_DOMAIN}"])
res = validator.validate_spf_records(domain)
assert res.success is True
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.spf_verified is True
# validate_dmarc_records
def test_custom_domain_validation_validate_dmarc_records_empty_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
res = validator.validate_dmarc_records(domain)
assert res.success is False
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dmarc_verified is False
def test_custom_domain_validation_validate_dmarc_records_wrong_records_failure():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
wrong_records = [random_string()]
dns_client.set_txt_record(f"_dmarc.{domain.domain}", wrong_records)
res = validator.validate_dmarc_records(domain)
assert res.success is False
assert res.errors == wrong_records
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dmarc_verified is False
def test_custom_domain_validation_validate_dmarc_records_success():
dns_client = InMemoryDNSClient()
validator = CustomDomainValidation(random_domain(), dns_client)
domain = create_custom_domain(random_domain())
dns_client.set_txt_record(f"_dmarc.{domain.domain}", [DMARC_RECORD])
res = validator.validate_dmarc_records(domain)
assert res.success is True
assert len(res.errors) == 0
db_domain = CustomDomain.get_by(id=domain.id)
assert db_domain.dmarc_verified is True

View File

@ -1,10 +1,12 @@
from app.dns_utils import (
get_mx_domains,
get_spf_domain,
get_txt_record,
get_network_dns_client,
is_mx_equivalent,
InMemoryDNSClient,
)
from tests.utils import random_domain
# use our own domain for test
_DOMAIN = "simplelogin.io"
@ -20,12 +22,12 @@ def test_get_mx_domains():
def test_get_spf_domain():
r = get_spf_domain(_DOMAIN)
r = get_network_dns_client().get_spf_domain(_DOMAIN)
assert r == ["simplelogin.co"]
def test_get_txt_record():
r = get_txt_record(_DOMAIN)
r = get_network_dns_client().get_txt_record(_DOMAIN)
assert len(r) > 0
@ -46,3 +48,15 @@ def test_is_mx_equivalent():
[(5, "domain1"), (10, "domain2")],
[(10, "domain1"), (20, "domain2"), (20, "domain3")],
)
def test_get_spf_record():
client = InMemoryDNSClient()
sl_domain = random_domain()
domain = random_domain()
spf_record = f"v=spf1 include:{sl_domain}"
client.set_txt_record(domain, [spf_record, "another record"])
res = client.get_spf_domain(domain)
assert res == [sl_domain]