diff --git a/app/.github/workflows/main.yml b/app/.github/workflows/main.yml index 360a4ba..b14160a 100644 --- a/app/.github/workflows/main.yml +++ b/app/.github/workflows/main.yml @@ -163,7 +163,7 @@ jobs: uses: docker/build-push-action@v3 with: context: . - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64 push: true tags: ${{ steps.meta.outputs.tags }} diff --git a/app/app/admin_model.py b/app/app/admin_model.py index 7b53cd4..03830a3 100644 --- a/app/app/admin_model.py +++ b/app/app/admin_model.py @@ -34,6 +34,7 @@ from app.models import ( DeletedAlias, DomainDeletedAlias, PartnerUser, + AliasMailbox, ) from app.newsletter_utils import send_newsletter_to_user, send_newsletter_to_address @@ -785,6 +786,25 @@ class EmailSearchHelpers: def mailbox_count(user: User) -> int: return Mailbox.filter_by(user_id=user.id).order_by(Mailbox.id.desc()).count() + @staticmethod + def alias_mailboxes(alias: Alias) -> list[Mailbox]: + return ( + Session.query(Mailbox) + .filter(Mailbox.id == Alias.mailbox_id, Alias.id == alias.id) + .union( + Session.query(Mailbox) + .join(AliasMailbox, Mailbox.id == AliasMailbox.mailbox_id) + .filter(AliasMailbox.alias_id == alias.id) + ) + .order_by(Mailbox.id) + .limit(10) + .all() + ) + + @staticmethod + def alias_mailbox_count(alias: Alias) -> int: + return len(alias.mailboxes) + @staticmethod def alias_list(user: User) -> list[Alias]: return ( diff --git a/app/app/alias_utils.py b/app/app/alias_utils.py index 8e51c4e..47851f8 100644 --- a/app/app/alias_utils.py +++ b/app/app/alias_utils.py @@ -1,6 +1,7 @@ import csv from io import StringIO import re +from dataclasses import dataclass from typing import Optional, Tuple from email_validator import validate_email, EmailNotValidError @@ -23,6 +24,7 @@ from app.email_utils import ( send_cannot_create_domain_alias, send_email, render, + sl_formataddr, ) from app.errors import AliasInTrashError from app.events.event_dispatcher import EventDispatcher @@ -30,6 +32,7 @@ from app.events.generated.event_pb2 import ( AliasDeleted, AliasStatusChanged, EventContent, + AliasCreated, ) from app.log import LOG from app.models import ( @@ -501,6 +504,28 @@ def transfer_alias(alias, new_user, new_mailboxes: [Mailbox]): alias.disable_pgp = False alias.pinned = False + EventDispatcher.send_event( + old_user, + EventContent( + alias_deleted=AliasDeleted( + id=alias.id, + email=alias.email, + ) + ), + ) + EventDispatcher.send_event( + new_user, + EventContent( + alias_created=AliasCreated( + id=alias.id, + email=alias.email, + note=alias.note, + enabled=alias.enabled, + created_at=int(alias.created_at.timestamp), + ) + ), + ) + Session.commit() @@ -518,3 +543,30 @@ def change_alias_status(alias: Alias, enabled: bool, commit: bool = False): if commit: Session.commit() + + +@dataclass +class AliasRecipientName: + name: str + message: Optional[str] = None + + +def get_alias_recipient_name(alias: Alias) -> AliasRecipientName: + """ + Logic: + 1. If alias has name, use it + 2. If alias has custom domain, and custom domain has name, use it + 3. Otherwise, use the alias email as the recipient + """ + if alias.name: + return AliasRecipientName( + name=sl_formataddr((alias.name, alias.email)), + message=f"Put alias name {alias.name} in from header", + ) + elif alias.custom_domain: + if alias.custom_domain.name: + return AliasRecipientName( + name=sl_formataddr((alias.custom_domain.name, alias.email)), + message=f"Put domain default alias name {alias.custom_domain.name} in from header", + ) + return AliasRecipientName(name=alias.email) diff --git a/app/app/api/views/auth.py b/app/app/api/views/auth.py index a22b30d..c1cacef 100644 --- a/app/app/api/views/auth.py +++ b/app/app/api/views/auth.py @@ -52,8 +52,12 @@ def auth_login(): password = data.get("password") device = data.get("device") - email = sanitize_email(data.get("email")) - canonical_email = canonicalize_email(data.get("email")) + email = data.get("email") + if not email: + LoginEvent(LoginEvent.ActionType.failed, LoginEvent.Source.api).send() + return jsonify(error="Email or password incorrect"), 400 + email = sanitize_email(email) + canonical_email = canonicalize_email(email) user = User.get_by(email=email) or User.get_by(email=canonical_email) diff --git a/app/app/api/views/user_info.py b/app/app/api/views/user_info.py index 44c530d..e52e05a 100644 --- a/app/app/api/views/user_info.py +++ b/app/app/api/views/user_info.py @@ -87,7 +87,7 @@ def update_user_info(): File.delete(file.id) s3.delete(file.path) Session.flush() - else: + if data["profile_picture"] is not None: raw_data = base64.decodebytes(data["profile_picture"].encode()) if detect_image_format(raw_data) == ImageFormat.Unknown: return jsonify(error="Unsupported image format"), 400 diff --git a/app/app/config.py b/app/app/config.py index 1962976..0ad1d37 100644 --- a/app/app/config.py +++ b/app/app/config.py @@ -653,9 +653,11 @@ def read_partner_dict(var: str) -> dict[int, str]: return res -PARTNER_DOMAINS: dict[int, str] = read_partner_dict("PARTNER_DOMAINS") -PARTNER_DOMAIN_VALIDATION_PREFIXES: dict[int, str] = read_partner_dict( - "PARTNER_DOMAIN_VALIDATION_PREFIXES" +PARTNER_DNS_CUSTOM_DOMAINS: dict[int, str] = read_partner_dict( + "PARTNER_DNS_CUSTOM_DOMAINS" +) +PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES: dict[int, str] = read_partner_dict( + "PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES" ) MAILBOX_VERIFICATION_OVERRIDE_CODE: Optional[str] = os.environ.get( diff --git a/app/app/custom_domain_validation.py b/app/app/custom_domain_validation.py index 4dbf201..ce4b844 100644 --- a/app/app/custom_domain_validation.py +++ b/app/app/custom_domain_validation.py @@ -1,15 +1,17 @@ from dataclasses import dataclass -from typing import Optional +from typing import List, Optional from app import config from app.constants import DMARC_RECORD from app.db import Session from app.dns_utils import ( + MxRecord, DNSClient, is_mx_equivalent, get_network_dns_client, ) from app.models import CustomDomain +from app.utils import random_string @dataclass @@ -28,10 +30,10 @@ class CustomDomainValidation: ): self.dkim_domain = dkim_domain self._dns_client = dns_client - self._partner_domains = partner_domains or config.PARTNER_DOMAINS + self._partner_domains = partner_domains or config.PARTNER_DNS_CUSTOM_DOMAINS self._partner_domain_validation_prefixes = ( partner_domains_validation_prefixes - or config.PARTNER_DOMAIN_VALIDATION_PREFIXES + or config.PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES ) def get_ownership_verification_record(self, domain: CustomDomain) -> str: @@ -41,8 +43,36 @@ class CustomDomainValidation: and domain.partner_id in self._partner_domain_validation_prefixes ): prefix = self._partner_domain_validation_prefixes[domain.partner_id] + + if not domain.ownership_txt_token: + domain.ownership_txt_token = random_string(30) + Session.commit() + return f"{prefix}-verification={domain.ownership_txt_token}" + def get_expected_mx_records(self, domain: CustomDomain) -> list[MxRecord]: + records = [] + if domain.partner_id is not None and domain.partner_id in self._partner_domains: + domain = self._partner_domains[domain.partner_id] + records.append(MxRecord(10, f"mx1.{domain}.")) + records.append(MxRecord(20, f"mx2.{domain}.")) + else: + # Default ones + for priority, domain in config.EMAIL_SERVERS_WITH_PRIORITY: + records.append(MxRecord(priority, domain)) + + return records + + def get_expected_spf_domain(self, domain: CustomDomain) -> str: + if domain.partner_id is not None and domain.partner_id in self._partner_domains: + return self._partner_domains[domain.partner_id] + else: + return config.EMAIL_DOMAIN + + def get_expected_spf_record(self, domain: CustomDomain) -> str: + spf_domain = self.get_expected_spf_domain(domain) + return f"v=spf1 include:{spf_domain} ~all" + def get_dkim_records(self, domain: CustomDomain) -> {str: str}: """ Get a list of dkim records to set up. Depending on the custom_domain, whether if it's from a partner or not, @@ -116,11 +146,12 @@ class CustomDomainValidation: self, custom_domain: CustomDomain ) -> DomainValidationResult: mx_domains = self._dns_client.get_mx_domains(custom_domain.domain) + expected_mx_records = self.get_expected_mx_records(custom_domain) - if not is_mx_equivalent(mx_domains, config.EMAIL_SERVERS_WITH_PRIORITY): + if not is_mx_equivalent(mx_domains, expected_mx_records): return DomainValidationResult( success=False, - errors=[f"{priority} {domain}" for (priority, domain) in mx_domains], + errors=[f"{record.priority} {record.domain}" for record in mx_domains], ) else: custom_domain.verified = True @@ -131,16 +162,19 @@ class CustomDomainValidation: self, custom_domain: CustomDomain ) -> DomainValidationResult: spf_domains = self._dns_client.get_spf_domain(custom_domain.domain) - if config.EMAIL_DOMAIN in spf_domains: + expected_spf_domain = self.get_expected_spf_domain(custom_domain) + if expected_spf_domain in spf_domains: custom_domain.spf_verified = True Session.commit() return DomainValidationResult(success=True, errors=[]) else: custom_domain.spf_verified = False Session.commit() + txt_records = self._dns_client.get_txt_record(custom_domain.domain) + cleaned_records = self.__clean_spf_records(txt_records, custom_domain) return DomainValidationResult( success=False, - errors=self._dns_client.get_txt_record(custom_domain.domain), + errors=cleaned_records, ) def validate_dmarc_records( @@ -155,3 +189,13 @@ class CustomDomainValidation: custom_domain.dmarc_verified = False Session.commit() return DomainValidationResult(success=False, errors=txt_records) + + def __clean_spf_records( + self, txt_records: List[str], custom_domain: CustomDomain + ) -> List[str]: + final_records = [] + verification_record = self.get_ownership_verification_record(custom_domain) + for record in txt_records: + if record != verification_record: + final_records.append(record) + return final_records diff --git a/app/app/dashboard/views/custom_domain.py b/app/app/dashboard/views/custom_domain.py index b410b30..5b70249 100644 --- a/app/app/dashboard/views/custom_domain.py +++ b/app/app/dashboard/views/custom_domain.py @@ -21,7 +21,9 @@ class NewCustomDomainForm(FlaskForm): @parallel_limiter.lock(only_when=lambda: request.method == "POST") def custom_domain(): custom_domains = CustomDomain.filter_by( - user_id=current_user.id, is_sl_subdomain=False + user_id=current_user.id, + is_sl_subdomain=False, + pending_deletion=False, ).all() new_custom_domain_form = NewCustomDomainForm() diff --git a/app/app/dashboard/views/domain_detail.py b/app/app/dashboard/views/domain_detail.py index 0911a74..2b1ac32 100644 --- a/app/app/dashboard/views/domain_detail.py +++ b/app/app/dashboard/views/domain_detail.py @@ -36,8 +36,6 @@ def domain_detail_dns(custom_domain_id): custom_domain.ownership_txt_token = random_string(30) Session.commit() - spf_record = f"v=spf1 include:{EMAIL_DOMAIN} ~all" - domain_validator = CustomDomainValidation(EMAIL_DOMAIN) csrf_form = CSRFValidationForm() @@ -141,7 +139,9 @@ def domain_detail_dns(custom_domain_id): ownership_record=domain_validator.get_ownership_verification_record( custom_domain ), + expected_mx_records=domain_validator.get_expected_mx_records(custom_domain), dkim_records=domain_validator.get_dkim_records(custom_domain), + spf_record=domain_validator.get_expected_spf_record(custom_domain), dmarc_record=DMARC_RECORD, **locals(), ) diff --git a/app/app/dns_utils.py b/app/app/dns_utils.py index 2ce6993..202f109 100644 --- a/app/app/dns_utils.py +++ b/app/app/dns_utils.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod -from typing import List, Tuple, Optional +from dataclasses import dataclass +from typing import List, Optional import dns.resolver @@ -8,8 +9,14 @@ from app.config import NAMESERVERS _include_spf = "include:" +@dataclass +class MxRecord: + priority: int + domain: str + + def is_mx_equivalent( - mx_domains: List[Tuple[int, str]], ref_mx_domains: List[Tuple[int, str]] + mx_domains: List[MxRecord], ref_mx_domains: List[MxRecord] ) -> bool: """ Compare mx_domains with ref_mx_domains to see if they are equivalent. @@ -18,14 +25,14 @@ def is_mx_equivalent( The priority order is taken into account but not the priority number. For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)] """ - mx_domains = sorted(mx_domains, key=lambda x: x[0]) - ref_mx_domains = sorted(ref_mx_domains, key=lambda x: x[0]) + mx_domains = sorted(mx_domains, key=lambda x: x.priority) + ref_mx_domains = sorted(ref_mx_domains, key=lambda x: x.priority) if len(mx_domains) < len(ref_mx_domains): return False - for i in range(len(ref_mx_domains)): - if mx_domains[i][1] != ref_mx_domains[i][1]: + for actual, expected in zip(mx_domains, ref_mx_domains): + if actual.domain != expected.domain: return False return True @@ -37,7 +44,7 @@ class DNSClient(ABC): pass @abstractmethod - def get_mx_domains(self, hostname: str) -> List[Tuple[int, str]]: + def get_mx_domains(self, hostname: str) -> List[MxRecord]: pass def get_spf_domain(self, hostname: str) -> List[str]: @@ -81,7 +88,7 @@ class NetworkDNSClient(DNSClient): except Exception: return None - def get_mx_domains(self, hostname: str) -> List[Tuple[int, str]]: + def get_mx_domains(self, hostname: str) -> List[MxRecord]: """ return list of (priority, domain name) sorted by priority (lowest priority first) domain name ends with a "." at the end. @@ -92,14 +99,14 @@ class NetworkDNSClient(DNSClient): for a in answers: record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.' parts = record.split(" ") - ret.append((int(parts[0]), parts[1])) - return sorted(ret, key=lambda x: x[0]) + ret.append(MxRecord(priority=int(parts[0]), domain=parts[1])) + return sorted(ret, key=lambda x: x.priority) except Exception: return [] def get_txt_record(self, hostname: str) -> List[str]: try: - answers = self._resolver.resolve(hostname, "TXT", search=True) + answers = self._resolver.resolve(hostname, "TXT", search=False) ret = [] for a in answers: # type: dns.rdtypes.ANY.TXT.TXT for record in a.strings: @@ -112,14 +119,14 @@ class NetworkDNSClient(DNSClient): class InMemoryDNSClient(DNSClient): def __init__(self): self.cname_records: dict[str, Optional[str]] = {} - self.mx_records: dict[str, List[Tuple[int, str]]] = {} + self.mx_records: dict[str, List[MxRecord]] = {} self.spf_records: dict[str, List[str]] = {} self.txt_records: dict[str, List[str]] = {} def set_cname_record(self, hostname: str, cname: str): self.cname_records[hostname] = cname - def set_mx_records(self, hostname: str, mx_list: List[Tuple[int, str]]): + def set_mx_records(self, hostname: str, mx_list: List[MxRecord]): self.mx_records[hostname] = mx_list def set_txt_record(self, hostname: str, txt_list: List[str]): @@ -128,9 +135,9 @@ class InMemoryDNSClient(DNSClient): def get_cname_record(self, hostname: str) -> Optional[str]: return self.cname_records.get(hostname) - def get_mx_domains(self, hostname: str) -> List[Tuple[int, str]]: + def get_mx_domains(self, hostname: str) -> List[MxRecord]: mx_list = self.mx_records.get(hostname, []) - return sorted(mx_list, key=lambda x: x[0]) + return sorted(mx_list, key=lambda x: x.priority) def get_txt_record(self, hostname: str) -> List[str]: return self.txt_records.get(hostname, []) @@ -140,5 +147,5 @@ def get_network_dns_client() -> NetworkDNSClient: return NetworkDNSClient(NAMESERVERS) -def get_mx_domains(hostname: str) -> [(int, str)]: +def get_mx_domains(hostname: str) -> List[MxRecord]: return get_network_dns_client().get_mx_domains(hostname) diff --git a/app/app/email_utils.py b/app/app/email_utils.py index 5ff34d0..742806d 100644 --- a/app/app/email_utils.py +++ b/app/app/email_utils.py @@ -592,7 +592,7 @@ def email_can_be_used_as_mailbox(email_address: str) -> bool: from app.models import CustomDomain - if CustomDomain.get_by(domain=domain, verified=True): + if CustomDomain.get_by(domain=domain, is_sl_subdomain=True, verified=True): LOG.d("domain %s is a SimpleLogin custom domain", domain) return False @@ -657,7 +657,7 @@ def get_mx_domain_list(domain) -> [str]: """ priority_domains = get_mx_domains(domain) - return [d[:-1] for _, d in priority_domains] + return [d.domain[:-1] for d in priority_domains] def personal_email_already_used(email_address: str) -> bool: diff --git a/app/app/models.py b/app/app/models.py index c86c5df..092c106 100644 --- a/app/app/models.py +++ b/app/app/models.py @@ -2766,9 +2766,9 @@ class Mailbox(Base, ModelMixin): from app.email_utils import get_email_local_part - mx_domains: [(int, str)] = get_mx_domains(get_email_local_part(self.email)) + mx_domains = get_mx_domains(get_email_local_part(self.email)) # Proton is the first domain - if mx_domains and mx_domains[0][1] in ( + if mx_domains and mx_domains[0].domain in ( "mail.protonmail.ch.", "mailsec.protonmail.ch.", ): diff --git a/app/cron.py b/app/cron.py index bfc150d..f8bb09f 100644 --- a/app/cron.py +++ b/app/cron.py @@ -14,6 +14,7 @@ from sqlalchemy.sql import Insert, text from app import s3, config from app.alias_utils import nb_email_log_for_mailbox from app.api.views.apple import verify_receipt +from app.custom_domain_validation import CustomDomainValidation from app.db import Session from app.dns_utils import get_mx_domains, is_mx_equivalent from app.email_utils import ( @@ -905,9 +906,11 @@ def check_custom_domain(): LOG.i("custom domain has been deleted") -def check_single_custom_domain(custom_domain): +def check_single_custom_domain(custom_domain: CustomDomain): mx_domains = get_mx_domains(custom_domain.domain) - if not is_mx_equivalent(mx_domains, config.EMAIL_SERVERS_WITH_PRIORITY): + validator = CustomDomainValidation(dkim_domain=config.EMAIL_DOMAIN) + expected_custom_domains = validator.get_expected_mx_records(custom_domain) + if not is_mx_equivalent(mx_domains, expected_custom_domains): user = custom_domain.user LOG.w( "The MX record is not correctly set for %s %s %s", diff --git a/app/email_handler.py b/app/email_handler.py index 2baee1a..d44555b 100644 --- a/app/email_handler.py +++ b/app/email_handler.py @@ -53,7 +53,11 @@ from flanker.addresslib.address import EmailAddress from sqlalchemy.exc import IntegrityError from app import pgp_utils, s3, config, contact_utils -from app.alias_utils import try_auto_create, change_alias_status +from app.alias_utils import ( + try_auto_create, + change_alias_status, + get_alias_recipient_name, +) from app.config import ( EMAIL_DOMAIN, URL, @@ -1161,23 +1165,11 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str): Session.commit() - # make the email comes from alias - from_header = alias.email - # add alias name from alias - if alias.name: - LOG.d("Put alias name %s in from header", alias.name) - from_header = sl_formataddr((alias.name, alias.email)) - elif alias.custom_domain: - # add alias name from domain - if alias.custom_domain.name: - LOG.d( - "Put domain default alias name %s in from header", - alias.custom_domain.name, - ) - from_header = sl_formataddr((alias.custom_domain.name, alias.email)) - - LOG.d("From header is %s", from_header) - add_or_replace_header(msg, headers.FROM, from_header) + recipient_name = get_alias_recipient_name(alias) + if recipient_name.message: + LOG.d(recipient_name.message) + LOG.d("From header is %s", recipient_name.name) + add_or_replace_header(msg, headers.FROM, recipient_name.name) try: if str(msg[headers.TO]).lower() == "undisclosed-recipients:;": diff --git a/app/templates/admin/email_search.html b/app/templates/admin/email_search.html index 49eef26..291ec4e 100644 --- a/app/templates/admin/email_search.html +++ b/app/templates/admin/email_search.html @@ -1,220 +1,220 @@ {% extends 'admin/master.html' %} {% macro show_user(user) -%} -

User {{ user.email }} with ID {{ user.id }}.

- {% set pu = helper.partner_user(user) %} - - - - - - - - - - - - - - - - - - {% if user.disabled %} - - - {% else %} - - {% endif %} - - - - - {% if pu %} - - - {% else %} - - {% endif %} - - -
User IDEmailStatusPaidSubscriptionCreated AtUpdated AtConnected with Proton account
{{ user.id }}{{ user.email }}DisabledEnabled{{ "yes" if user.is_paid() else "No" }}{{ user.get_active_subscription() }}{{ user.created_at }}{{ user.updated_at }}{{ pu.partner_email }}No
-{%- endmacro %} -{% macro list_mailboxes(mbox_count, mboxes) %} -

- {{ mbox_count }} Mailboxes found. - {% if mbox_count>10 %}Showing only the last 10.{% endif %} -

- - - - - - - - - - - {% for mailbox in mboxes %} - +

User {{ user.email }} with ID {{ user.id }}.

+ {% set pu = helper.partner_user(user) %} +
Mailbox IDEmailVerifiedCreated At
+ - - - - + + + + + + + + - {% endfor %} - -
{{ mailbox.id }}{{mailbox.email}}{{ "Yes" if mailbox.verified else "No" }} - {{ mailbox.created_at }} - User IDEmailStatusPaidSubscriptionCreated AtUpdated AtConnected with Proton account
+ + + + {{ user.id }} + {{ user.email }} + {% if user.disabled %} + + Disabled + {% else %} + Enabled + {% endif %} + {{ "yes" if user.is_paid() else "No" }} + {{ user.get_active_subscription() }} + {{ user.created_at }} + {{ user.updated_at }} + {% if pu %} + + {{ pu.partner_email }} + {% else %} + No + {% endif %} + + + +{%- endmacro %} +{% macro list_mailboxes(message, mbox_count, mboxes) %} +

+ {{ mbox_count }} {{ message }}. + {% if mbox_count>10 %}Showing only the last 10.{% endif %} +

+ + + + + + + + + + + {% for mailbox in mboxes %} + + + + + + + + {% endfor %} + +
Mailbox IDEmailVerifiedCreated At
{{ mailbox.id }}{{ mailbox.email }}{{ "Yes" if mailbox.verified else "No" }} + {{ mailbox.created_at }} +
{% endmacro %} {% macro list_alias(alias_count, aliases) %} -

- {{ alias_count }} Aliases found. - {% if alias_count>10 %}Showing only the last 10.{% endif %} -

- - - - - - - - - - - {% for alias in aliases %} +

+ {{ alias_count }} Aliases found. + {% if alias_count>10 %}Showing only the last 10.{% endif %} +

+
- Alias ID - - Email - - Verified - - Created At -
+ - - - - + + + + - {% endfor %} - -
{{ alias.id }}{{alias.email}}{{ "Yes" if alias.verified else "No" }}{{ alias.created_at }} + Alias ID + + Email + + Verified + + Created At +
+ + + {% for alias in aliases %} + + {{ alias.id }} + {{ alias.email }} + {{ "Yes" if alias.verified else "No" }} + {{ alias.created_at }} + + {% endfor %} + + {% endmacro %} {% macro show_deleted_alias(deleted_alias) -%} -

Deleted Alias {{ deleted_alias.email }} with ID {{ deleted_alias.id }}.

- - - - - - - - - - - - - - - - - -
Deleted Alias IDEmailDeleted AtReason
{{ deleted_alias.id }}{{ deleted_alias.email }}{{ deleted_alias.created_at }}{{ deleted_alias.reason }}
+

Deleted Alias {{ deleted_alias.email }} with ID {{ deleted_alias.id }}.

+ + + + + + + + + + + + + + + + + +
Deleted Alias IDEmailDeleted AtReason
{{ deleted_alias.id }}{{ deleted_alias.email }}{{ deleted_alias.created_at }}{{ deleted_alias.reason }}
{%- endmacro %} {% macro show_domain_deleted_alias(dom_deleted_alias) -%} -

- Domain Deleted Alias {{ dom_deleted_alias.email }} with ID {{ dom_deleted_alias.id }} for domain {{ dom_deleted_alias.domain.domain }} -

- - - - - - - - - - - - - - - - - - - - - - -
Deleted Alias IDEmailDomainDomain IDDomain owner user IDDomain owner user emailDeleted At
{{ dom_deleted_alias.id }}{{ dom_deleted_alias.email }}{{ dom_deleted_alias.domain.domain }}{{ dom_deleted_alias.domain.id }}{{ dom_deleted_alias.domain.user_id }}{{ dom_deleted_alias.created_at }}
- {{ show_user(data.domain_deleted_alias.domain.user) }} +

+ Domain Deleted Alias {{ dom_deleted_alias.email }} with ID {{ dom_deleted_alias.id }} for + domain {{ dom_deleted_alias.domain.domain }} +

+ + + + + + + + + + + + + + + + + + + + + + +
Deleted Alias IDEmailDomainDomain IDDomain owner user IDDomain owner user emailDeleted At
{{ dom_deleted_alias.id }}{{ dom_deleted_alias.email }}{{ dom_deleted_alias.domain.domain }}{{ dom_deleted_alias.domain.id }}{{ dom_deleted_alias.domain.user_id }}{{ dom_deleted_alias.created_at }}
+ {{ show_user(data.domain_deleted_alias.domain.user) }} {%- endmacro %} {% block body %} -
-
-
- - -
- -
-
- {% if data.no_match and email %} - - - {% endif %} - {% if data.alias %} -
-

Found Alias {{ data.alias.email }}

- {{ list_alias(1,[data.alias]) }} - {{ show_user(data.alias.user) }} - {{ list_mailboxes(helper.mailbox_count(data.alias.user) , helper.mailbox_list(data.alias.user) ) }} +
+
+ + +
+ +
- {% endif %} - {% if data.user %} - -
-

Found User {{ data.user.email }}

- {{ show_user(data.user) }} - {{ list_mailboxes(helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }} - {{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }} -
- {% endif %} - {% if data.mailbox_count > 10 %} -

Found more than 10 mailboxes for {{ email }}. Showing the last 10

- {% elif data.mailbox_count > 0 %} -

Found {{ data.mailbox_count }} mailbox(es) for {{ email }}

+ {% if data.no_match and email %} + {% endif %} - {% for mailbox in data.mailbox %} -
-

Found Mailbox {{ mailbox.email }}

- {{ list_mailboxes(1, [mailbox]) }} - {{ show_user(mailbox.user) }} -
- {% endfor %} - {% if data.deleted_alias %} + {% if data.alias %} +
+

Found Alias {{ data.alias.email }}

+ {{ list_alias(1,[data.alias]) }} + {{ list_mailboxes("Mailboxes for alias", helper.alias_mailbox_count(data.alias), helper.alias_mailboxes(data.alias)) }} + {{ show_user(data.alias.user) }} +
+ {% endif %} -
-

Found DeletedAlias {{ data.deleted_alias.email }}

- {{ show_deleted_alias(data.deleted_alias) }} -
- {% endif %} - {% if data.domain_deleted_alias %} + {% if data.user %} +
+

Found User {{ data.user.email }}

+ {{ show_user(data.user) }} + {{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }} + {{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }} +
+ {% endif %} + {% if data.mailbox_count > 10 %} +

Found more than 10 mailboxes for {{ email }}. Showing the last 10

+ {% elif data.mailbox_count > 0 %} +

Found {{ data.mailbox_count }} mailbox(es) for {{ email }}

+ {% endif %} + {% for mailbox in data.mailbox %} -
-

Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}

- {{ show_domain_deleted_alias(data.domain_deleted_alias) }} -
- {% endif %} +
+

Found Mailbox {{ mailbox.email }}

+ {{ list_mailboxes("Mailbox found", 1, [mailbox]) }} + {{ show_user(mailbox.user) }} +
+ {% endfor %} + {% if data.deleted_alias %} + +
+

Found DeletedAlias {{ data.deleted_alias.email }}

+ {{ show_deleted_alias(data.deleted_alias) }} +
+ {% endif %} + {% if data.domain_deleted_alias %} + +
+

Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}

+ {{ show_domain_deleted_alias(data.domain_deleted_alias) }} +
+ {% endif %} {% endblock %} diff --git a/app/templates/dashboard/domain_detail/dns.html b/app/templates/dashboard/domain_detail/dns.html index 4058f5e..e100fee 100644 --- a/app/templates/dashboard/domain_detail/dns.html +++ b/app/templates/dashboard/domain_detail/dns.html @@ -91,7 +91,8 @@
Some domain registrars (Namecheap, CloudFlare, etc) might also use @ for the root domain. - {% for priority, email_server in EMAIL_SERVERS_WITH_PRIORITY %} + + {% for record in expected_mx_records %}
Record: MX @@ -99,14 +100,15 @@ Domain: {{ custom_domain.domain }} or @
- Priority: {{ priority }} + Priority: {{ record.priority }}
Target: {{ email_server }} + data-clipboard-text="{{ record.domain }}">{{ record.domain }}
{% endfor %} +
{{ csrf_form.csrf_token }} diff --git a/app/tests/dashboard/test_alias_transfer.py b/app/tests/dashboard/test_alias_transfer.py index 32a063f..8680fa5 100644 --- a/app/tests/dashboard/test_alias_transfer.py +++ b/app/tests/dashboard/test_alias_transfer.py @@ -1,38 +1,72 @@ import app.alias_utils +from app import config from app.db import Session +from app.events.event_dispatcher import GlobalDispatcher from app.models import ( Alias, Mailbox, - User, AliasMailbox, ) +from tests.events.event_test_utils import ( + OnMemoryDispatcher, + _get_event_from_string, + _create_linked_user, +) from tests.utils import login +on_memory_dispatcher = OnMemoryDispatcher() + + +def setup_module(): + GlobalDispatcher.set_dispatcher(on_memory_dispatcher) + config.EVENT_WEBHOOK = "http://test" + + +def teardown_module(): + GlobalDispatcher.set_dispatcher(None) + config.EVENT_WEBHOOK = None + def test_alias_transfer(flask_client): - user = login(flask_client) - mb = Mailbox.create(user_id=user.id, email="mb@gmail.com", commit=True) + (source_user, source_user_pu) = _create_linked_user() + source_user = login(flask_client, source_user) + mb = Mailbox.create(user_id=source_user.id, email="mb@gmail.com", commit=True) - alias = Alias.create_new_random(user) + alias = Alias.create_new_random(source_user) Session.commit() AliasMailbox.create(alias_id=alias.id, mailbox_id=mb.id, commit=True) - new_user = User.create( - email="hey@example.com", - password="password", - activated=True, - commit=True, - ) + (target_user, target_user_pu) = _create_linked_user() Mailbox.create( - user_id=new_user.id, email="hey2@example.com", verified=True, commit=True + user_id=target_user.id, email="hey2@example.com", verified=True, commit=True ) - app.alias_utils.transfer_alias(alias, new_user, new_user.mailboxes()) + on_memory_dispatcher.clear() + app.alias_utils.transfer_alias(alias, target_user, target_user.mailboxes()) # refresh from db alias = Alias.get(alias.id) - assert alias.user == new_user - assert set(alias.mailboxes) == set(new_user.mailboxes()) + assert alias.user == target_user + assert set(alias.mailboxes) == set(target_user.mailboxes()) assert len(alias.mailboxes) == 2 + + # Check events + assert len(on_memory_dispatcher.memory) == 2 + # 1st delete event + event_data = on_memory_dispatcher.memory[0] + event_content = _get_event_from_string(event_data, source_user, source_user_pu) + assert event_content.alias_deleted is not None + alias_deleted = event_content.alias_deleted + assert alias_deleted.id == alias.id + assert alias_deleted.email == alias.email + # 2nd create event + event_data = on_memory_dispatcher.memory[1] + event_content = _get_event_from_string(event_data, target_user, target_user_pu) + assert event_content.alias_created is not None + alias_created = event_content.alias_created + assert alias.id == alias_created.id + assert alias.email == alias_created.email + assert alias.note or "" == alias_created.note + assert alias.enabled == alias_created.enabled diff --git a/app/tests/events/event_test_utils.py b/app/tests/events/event_test_utils.py index 3a5342c..d7b6e96 100644 --- a/app/tests/events/event_test_utils.py +++ b/app/tests/events/event_test_utils.py @@ -1,4 +1,5 @@ from app.events.event_dispatcher import Dispatcher +from app.events.generated import event_pb2 from app.models import PartnerUser, User from app.proton.utils import get_proton_partner from tests.utils import create_new_user, random_token @@ -30,3 +31,14 @@ def _create_linked_user() -> Tuple[User, PartnerUser]: ) return user, partner_user + + +def _get_event_from_string( + data: str, user: User, pu: PartnerUser +) -> event_pb2.EventContent: + event = event_pb2.Event() + event.ParseFromString(data) + assert user.id == event.user_id + assert pu.external_user_id == event.external_user_id + assert pu.partner_id == event.partner_id + return event.content diff --git a/app/tests/events/test_sent_events.py b/app/tests/events/test_sent_events.py index 20d4645..910e3db 100644 --- a/app/tests/events/test_sent_events.py +++ b/app/tests/events/test_sent_events.py @@ -1,12 +1,12 @@ from app import config, alias_utils from app.db import Session from app.events.event_dispatcher import GlobalDispatcher -from app.events.generated import event_pb2 -from app.models import Alias, User, PartnerUser +from app.models import Alias from tests.utils import random_token from .event_test_utils import ( OnMemoryDispatcher, _create_linked_user, + _get_event_from_string, ) on_memory_dispatcher = OnMemoryDispatcher() @@ -26,17 +26,6 @@ def setup_function(func): on_memory_dispatcher.clear() -def _get_event_from_string( - data: str, user: User, pu: PartnerUser -) -> event_pb2.EventContent: - event = event_pb2.Event() - event.ParseFromString(data) - assert user.id == event.user_id - assert pu.external_user_id == event.external_user_id - assert pu.partner_id == event.partner_id - return event.content - - def test_fire_event_on_alias_creation(): (user, pu) = _create_linked_user() alias = Alias.create_new_random(user) diff --git a/app/tests/test_alias_utils.py b/app/tests/test_alias_utils.py index 2499e5f..507b507 100644 --- a/app/tests/test_alias_utils.py +++ b/app/tests/test_alias_utils.py @@ -4,6 +4,7 @@ from app.alias_utils import ( delete_alias, check_alias_prefix, get_user_if_alias_would_auto_create, + get_alias_recipient_name, try_auto_create, ) from app.config import ALIAS_DOMAINS @@ -18,7 +19,8 @@ from app.models import ( User, DomainDeletedAlias, ) -from tests.utils import create_new_user, random_domain, random_token +from app.utils import random_string +from tests.utils import create_new_user, random_domain, random_token, random_email def test_delete_alias(flask_client): @@ -131,3 +133,91 @@ def test_auto_create_alias(flask_client): assert result, f"Case {test_id} - Failed address {address}" else: assert result is None, f"Case {test_id} - Failed address {address}" + + +# get_alias_recipient_name +def test_get_alias_recipient_name_no_overrides(): + user = create_new_user() + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + commit=True, + ) + res = get_alias_recipient_name(alias) + assert res.message is None + assert res.name == alias.email + + +def test_get_alias_recipient_name_alias_name(): + user = create_new_user() + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + name=random_string(), + commit=True, + ) + res = get_alias_recipient_name(alias) + assert res.message is not None + assert res.name == f"{alias.name} <{alias.email}>" + + +def test_get_alias_recipient_alias_with_name_and_custom_domain_name(): + user = create_new_user() + custom_domain = CustomDomain.create( + user_id=user.id, + domain=random_domain(), + name=random_string(), + verified=True, + ) + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + name=random_string(), + custom_domain_id=custom_domain.id, + commit=True, + ) + res = get_alias_recipient_name(alias) + assert res.message is not None + assert res.name == f"{alias.name} <{alias.email}>" + + +def test_get_alias_recipient_alias_without_name_and_custom_domain_without_name(): + user = create_new_user() + custom_domain = CustomDomain.create( + user_id=user.id, + domain=random_domain(), + verified=True, + ) + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + custom_domain_id=custom_domain.id, + commit=True, + ) + res = get_alias_recipient_name(alias) + assert res.message is None + assert res.name == alias.email + + +def test_get_alias_recipient_alias_without_name_and_custom_domain_name(): + user = create_new_user() + custom_domain = CustomDomain.create( + user_id=user.id, + domain=random_domain(), + name=random_string(), + verified=True, + ) + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + custom_domain_id=custom_domain.id, + commit=True, + ) + res = get_alias_recipient_name(alias) + assert res.message is not None + assert res.name == f"{custom_domain.name} <{alias.email}>" diff --git a/app/tests/test_custom_domain_validation.py b/app/tests/test_custom_domain_validation.py index b6e4386..d0de3db 100644 --- a/app/tests/test_custom_domain_validation.py +++ b/app/tests/test_custom_domain_validation.py @@ -5,7 +5,7 @@ from app.constants import DMARC_RECORD from app.custom_domain_validation import CustomDomainValidation from app.db import Session from app.models import CustomDomain, User -from app.dns_utils import InMemoryDNSClient +from app.dns_utils import InMemoryDNSClient, MxRecord from app.proton.utils import get_proton_partner from app.utils import random_string from tests.utils import create_new_user, random_domain @@ -58,6 +58,123 @@ def test_custom_domain_validation_get_dkim_records_for_partner(): assert records["dkim._domainkey"] == f"dkim._domainkey.{dkim_domain}" +# get_expected_mx_records +def test_custom_domain_validation_get_expected_mx_records_regular_domain(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + + dkim_domain = random_domain() + validator = CustomDomainValidation( + domain, partner_domains={partner_id: dkim_domain} + ) + records = validator.get_expected_mx_records(custom_domain) + # As the domain is not a partner_domain,default records should be used even if + # there is a config for the partner + assert len(records) == len(config.EMAIL_SERVERS_WITH_PRIORITY) + for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)): + config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i] + assert records[i].priority == config_record[0] + assert records[i].domain == config_record[1] + + +def test_custom_domain_validation_get_expected_mx_records_domain_from_partner(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + custom_domain.partner_id = partner_id + Session.commit() + + dkim_domain = random_domain() + validator = CustomDomainValidation(dkim_domain) + records = validator.get_expected_mx_records(custom_domain) + # As the domain is a partner_domain but there is no custom config for partner, default records + # should be used + assert len(records) == len(config.EMAIL_SERVERS_WITH_PRIORITY) + for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)): + config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i] + assert records[i].priority == config_record[0] + assert records[i].domain == config_record[1] + + +def test_custom_domain_validation_get_expected_mx_records_domain_from_partner_with_custom_config(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + custom_domain.partner_id = partner_id + Session.commit() + + dkim_domain = random_domain() + expected_mx_domain = random_domain() + validator = CustomDomainValidation( + dkim_domain, partner_domains={partner_id: expected_mx_domain} + ) + records = validator.get_expected_mx_records(custom_domain) + # As the domain is a partner_domain and there is a custom config for partner, partner records + # should be used + assert len(records) == 2 + + assert records[0].priority == 10 + assert records[0].domain == f"mx1.{expected_mx_domain}." + assert records[1].priority == 20 + assert records[1].domain == f"mx2.{expected_mx_domain}." + + +# get_expected_spf_records +def test_custom_domain_validation_get_expected_spf_record_regular_domain(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + + dkim_domain = random_domain() + validator = CustomDomainValidation( + domain, partner_domains={partner_id: dkim_domain} + ) + record = validator.get_expected_spf_record(custom_domain) + # As the domain is not a partner_domain, default records should be used even if + # there is a config for the partner + assert record == f"v=spf1 include:{config.EMAIL_DOMAIN} ~all" + + +def test_custom_domain_validation_get_expected_spf_record_domain_from_partner(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + custom_domain.partner_id = partner_id + Session.commit() + + dkim_domain = random_domain() + validator = CustomDomainValidation(dkim_domain) + record = validator.get_expected_spf_record(custom_domain) + # As the domain is a partner_domain but there is no custom config for partner, default records + # should be used + assert record == f"v=spf1 include:{config.EMAIL_DOMAIN} ~all" + + +def test_custom_domain_validation_get_expected_spf_record_domain_from_partner_with_custom_config(): + domain = random_domain() + custom_domain = create_custom_domain(domain) + + partner_id = get_proton_partner().id + custom_domain.partner_id = partner_id + Session.commit() + + dkim_domain = random_domain() + expected_mx_domain = random_domain() + validator = CustomDomainValidation( + dkim_domain, partner_domains={partner_id: expected_mx_domain} + ) + record = validator.get_expected_spf_record(custom_domain) + # As the domain is a partner_domain and there is a custom config for partner, partner records + # should be used + assert record == f"v=spf1 include:{expected_mx_domain} ~all" + + # validate_dkim_records def test_custom_domain_validation_validate_dkim_records_empty_records_failure(): dns_client = InMemoryDNSClient() @@ -253,7 +370,7 @@ def test_custom_domain_validation_validate_mx_records_wrong_records_failure(): wrong_record_1 = random_string() wrong_record_2 = random_string() - wrong_records = [(10, wrong_record_1), (20, wrong_record_2)] + wrong_records = [MxRecord(10, wrong_record_1), MxRecord(20, wrong_record_2)] dns_client.set_mx_records(domain.domain, wrong_records) res = validator.validate_mx_records(domain) @@ -270,7 +387,7 @@ def test_custom_domain_validation_validate_mx_records_success(): domain = create_custom_domain(random_domain()) - dns_client.set_mx_records(domain.domain, config.EMAIL_SERVERS_WITH_PRIORITY) + dns_client.set_mx_records(domain.domain, validator.get_expected_mx_records(domain)) res = validator.validate_mx_records(domain) assert res.success is True @@ -328,6 +445,58 @@ def test_custom_domain_validation_validate_spf_records_success(): assert db_domain.spf_verified is True +def test_custom_domain_validation_validate_spf_records_partner_domain_success(): + dns_client = InMemoryDNSClient() + proton_partner_id = get_proton_partner().id + + expected_domain = random_domain() + validator = CustomDomainValidation( + dkim_domain=random_domain(), + dns_client=dns_client, + partner_domains={proton_partner_id: expected_domain}, + ) + + domain = create_custom_domain(random_domain()) + domain.partner_id = proton_partner_id + Session.commit() + + dns_client.set_txt_record(domain.domain, [f"v=spf1 include:{expected_domain}"]) + res = validator.validate_spf_records(domain) + + assert res.success is True + assert len(res.errors) == 0 + + db_domain = CustomDomain.get_by(id=domain.id) + assert db_domain.spf_verified is True + + +def test_custom_domain_validation_validate_spf_cleans_verification_record(): + dns_client = InMemoryDNSClient() + proton_partner_id = get_proton_partner().id + + expected_domain = random_domain() + validator = CustomDomainValidation( + dkim_domain=random_domain(), + dns_client=dns_client, + partner_domains={proton_partner_id: expected_domain}, + ) + + domain = create_custom_domain(random_domain()) + domain.partner_id = proton_partner_id + Session.commit() + + wrong_record = random_string() + dns_client.set_txt_record( + hostname=domain.domain, + txt_list=[wrong_record, validator.get_ownership_verification_record(domain)], + ) + res = validator.validate_spf_records(domain) + + assert res.success is False + assert len(res.errors) == 1 + assert res.errors[0] == wrong_record + + # validate_dmarc_records def test_custom_domain_validation_validate_dmarc_records_empty_failure(): dns_client = InMemoryDNSClient() diff --git a/app/tests/test_dns_utils.py b/app/tests/test_dns_utils.py index 374983c..15b2b9a 100644 --- a/app/tests/test_dns_utils.py +++ b/app/tests/test_dns_utils.py @@ -3,6 +3,7 @@ from app.dns_utils import ( get_network_dns_client, is_mx_equivalent, InMemoryDNSClient, + MxRecord, ) from tests.utils import random_domain @@ -17,8 +18,8 @@ def test_get_mx_domains(): assert len(r) > 0 for x in r: - assert x[0] > 0 - assert x[1] + assert x.priority > 0 + assert x.domain def test_get_spf_domain(): @@ -33,20 +34,32 @@ def test_get_txt_record(): def test_is_mx_equivalent(): assert is_mx_equivalent([], []) - assert is_mx_equivalent([(1, "domain")], [(1, "domain")]) assert is_mx_equivalent( - [(10, "domain1"), (20, "domain2")], [(10, "domain1"), (20, "domain2")] + mx_domains=[MxRecord(1, "domain")], ref_mx_domains=[MxRecord(1, "domain")] ) assert is_mx_equivalent( - [(5, "domain1"), (10, "domain2")], [(10, "domain1"), (20, "domain2")] + mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], + ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], ) assert is_mx_equivalent( - [(5, "domain1"), (10, "domain2"), (20, "domain3")], - [(10, "domain1"), (20, "domain2")], + mx_domains=[MxRecord(5, "domain1"), MxRecord(10, "domain2")], + ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], + ) + assert is_mx_equivalent( + mx_domains=[ + MxRecord(5, "domain1"), + MxRecord(10, "domain2"), + MxRecord(20, "domain3"), + ], + ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], ) assert not is_mx_equivalent( - [(5, "domain1"), (10, "domain2")], - [(10, "domain1"), (20, "domain2"), (20, "domain3")], + mx_domains=[MxRecord(5, "domain1"), MxRecord(10, "domain2")], + ref_mx_domains=[ + MxRecord(10, "domain1"), + MxRecord(20, "domain2"), + MxRecord(20, "domain3"), + ], ) diff --git a/app/tests/test_email_utils.py b/app/tests/test_email_utils.py index 7e133f1..c2cc8ab 100644 --- a/app/tests/test_email_utils.py +++ b/app/tests/test_email_utils.py @@ -90,12 +90,19 @@ def test_can_be_used_as_personal_email(flask_client): assert not email_can_be_used_as_mailbox("ab@sl.local") assert not email_can_be_used_as_mailbox("hey@d1.test") - # custom domain + # custom domain as SL domain domain = random_domain() user = create_new_user() - CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True) + domain_obj = CustomDomain.create( + user_id=user.id, domain=domain, verified=True, is_sl_subdomain=True, flush=True + ) assert not email_can_be_used_as_mailbox(f"hey@{domain}") + # custom domain is NOT SL domain + domain_obj.is_sl_subdomain = False + Session.flush() + assert email_can_be_used_as_mailbox(f"hey@{domain}") + # disposable domain disposable_domain = random_domain() InvalidMailboxDomain.create(domain=disposable_domain, commit=True)