Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
294232a329 | |||
fae9d7bc17 | |||
d666f5af3f | |||
556fae02d5 |
@ -424,7 +424,7 @@ def create_contact_route(alias_id):
|
||||
contact_address = data.get("contact")
|
||||
|
||||
try:
|
||||
contact = create_contact(g.user, alias, contact_address)
|
||||
contact = create_contact(alias, contact_address)
|
||||
except ErrContactErrorUpgradeNeeded as err:
|
||||
return jsonify(error=err.error_for_user()), 403
|
||||
except (ErrAddressInvalid, CannotCreateContactForReverseAlias) as err:
|
||||
|
@ -657,3 +657,7 @@ PARTNER_DOMAINS: dict[int, str] = read_partner_dict("PARTNER_DOMAINS")
|
||||
PARTNER_DOMAIN_VALIDATION_PREFIXES: dict[int, str] = read_partner_dict(
|
||||
"PARTNER_DOMAIN_VALIDATION_PREFIXES"
|
||||
)
|
||||
|
||||
MAILBOX_VERIFICATION_OVERRIDE_CODE: Optional[str] = os.environ.get(
|
||||
"MAILBOX_VERIFICATION_OVERRIDE_CODE", None
|
||||
)
|
||||
|
@ -5,7 +5,7 @@ from typing import Optional
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from app.db import Session
|
||||
from app.email_utils import generate_reply_email
|
||||
from app.email_utils import generate_reply_email, parse_full_address
|
||||
from app.email_validation import is_valid_email
|
||||
from app.log import LOG
|
||||
from app.models import Contact, Alias
|
||||
@ -14,11 +14,13 @@ from app.utils import sanitize_email
|
||||
|
||||
class ContactCreateError(Enum):
|
||||
InvalidEmail = "Invalid email"
|
||||
NotAllowed = "Your plan does not allow to create contacts"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ContactCreateResult:
|
||||
contact: Optional[Contact]
|
||||
created: bool
|
||||
error: Optional[ContactCreateError]
|
||||
|
||||
|
||||
@ -33,34 +35,56 @@ def __update_contact_if_needed(
|
||||
LOG.d(f"Setting {contact} mail_from to {mail_from}")
|
||||
contact.mail_from = mail_from
|
||||
Session.commit()
|
||||
return ContactCreateResult(contact, None)
|
||||
return ContactCreateResult(contact, created=False, error=None)
|
||||
|
||||
|
||||
def create_contact(
|
||||
email: str,
|
||||
name: Optional[str],
|
||||
alias: Alias,
|
||||
name: Optional[str] = None,
|
||||
mail_from: Optional[str] = None,
|
||||
allow_empty_email: bool = False,
|
||||
automatic_created: bool = False,
|
||||
from_partner: bool = False,
|
||||
) -> ContactCreateResult:
|
||||
if name is not None:
|
||||
# If user cannot create contacts, they still need to be created when receiving an email for an alias
|
||||
if not automatic_created and not alias.user.can_create_contacts():
|
||||
return ContactCreateResult(
|
||||
None, created=False, error=ContactCreateError.NotAllowed
|
||||
)
|
||||
# Parse emails with form 'name <email>'
|
||||
try:
|
||||
email_name, email = parse_full_address(email)
|
||||
except ValueError:
|
||||
email = ""
|
||||
email_name = ""
|
||||
# If no name is explicitly given try to get it from the parsed email
|
||||
if name is None:
|
||||
name = email_name[: Contact.MAX_NAME_LENGTH]
|
||||
else:
|
||||
name = name[: Contact.MAX_NAME_LENGTH]
|
||||
# If still no name is there, make sure the name is None instead of empty string
|
||||
if not name:
|
||||
name = None
|
||||
if name is not None and "\x00" in name:
|
||||
LOG.w("Cannot use contact name because has \\x00")
|
||||
name = ""
|
||||
# Sanitize email and if it's not valid only allow to create a contact if it's explicitly allowed. Otherwise fail
|
||||
email = sanitize_email(email, not_lower=True)
|
||||
if not is_valid_email(email):
|
||||
LOG.w(f"invalid contact email {email}")
|
||||
if not allow_empty_email:
|
||||
return ContactCreateResult(None, ContactCreateError.InvalidEmail)
|
||||
return ContactCreateResult(
|
||||
None, created=False, error=ContactCreateError.InvalidEmail
|
||||
)
|
||||
LOG.d("Create a contact with invalid email for %s", alias)
|
||||
# either reuse a contact with empty email or create a new contact with empty email
|
||||
email = ""
|
||||
email = sanitize_email(email, not_lower=True)
|
||||
# If contact exists, update name and mail_from if needed
|
||||
contact = Contact.get_by(alias_id=alias.id, website_email=email)
|
||||
if contact is not None:
|
||||
return __update_contact_if_needed(contact, name, mail_from)
|
||||
# Create the contact
|
||||
reply_email = generate_reply_email(email, alias)
|
||||
try:
|
||||
flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0
|
||||
@ -86,4 +110,4 @@ def create_contact(
|
||||
)
|
||||
contact = Contact.get_by(alias_id=alias.id, website_email=email)
|
||||
return __update_contact_if_needed(contact, name, mail_from)
|
||||
return ContactCreateResult(contact, None)
|
||||
return ContactCreateResult(contact, created=True, error=None)
|
||||
|
@ -3,15 +3,16 @@ import re
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
|
||||
from app.config import JOB_DELETE_DOMAIN
|
||||
from app.db import Session
|
||||
from app.email_utils import get_email_domain_part
|
||||
from app.log import LOG
|
||||
from app.models import User, CustomDomain, SLDomain, Mailbox, Job
|
||||
from app.models import User, CustomDomain, SLDomain, Mailbox, Job, DomainMailbox
|
||||
|
||||
_ALLOWED_DOMAIN_REGEX = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$")
|
||||
_MAX_MAILBOXES_PER_DOMAIN = 20
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -45,6 +46,20 @@ class CannotUseDomainReason(Enum):
|
||||
raise Exception("Invalid CannotUseDomainReason")
|
||||
|
||||
|
||||
class CannotSetCustomDomainMailboxesCause(Enum):
|
||||
InvalidMailbox = "Something went wrong, please retry"
|
||||
NoMailboxes = "You must select at least 1 mailbox"
|
||||
TooManyMailboxes = (
|
||||
f"You can only set up to {_MAX_MAILBOXES_PER_DOMAIN} mailboxes per domain"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SetCustomDomainMailboxesResult:
|
||||
success: bool
|
||||
reason: Optional[CannotSetCustomDomainMailboxesCause] = None
|
||||
|
||||
|
||||
def is_valid_domain(domain: str) -> bool:
|
||||
"""
|
||||
Checks that a domain is valid according to RFC 1035
|
||||
@ -140,3 +155,40 @@ def delete_custom_domain(domain: CustomDomain):
|
||||
run_at=arrow.now(),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
|
||||
def set_custom_domain_mailboxes(
|
||||
user_id: int, custom_domain: CustomDomain, mailbox_ids: List[int]
|
||||
) -> SetCustomDomainMailboxesResult:
|
||||
if len(mailbox_ids) == 0:
|
||||
return SetCustomDomainMailboxesResult(
|
||||
success=False, reason=CannotSetCustomDomainMailboxesCause.NoMailboxes
|
||||
)
|
||||
elif len(mailbox_ids) > _MAX_MAILBOXES_PER_DOMAIN:
|
||||
return SetCustomDomainMailboxesResult(
|
||||
success=False, reason=CannotSetCustomDomainMailboxesCause.TooManyMailboxes
|
||||
)
|
||||
|
||||
mailboxes = (
|
||||
Session.query(Mailbox)
|
||||
.filter(
|
||||
Mailbox.id.in_(mailbox_ids),
|
||||
Mailbox.user_id == user_id,
|
||||
Mailbox.verified == True, # noqa: E712
|
||||
)
|
||||
.all()
|
||||
)
|
||||
if len(mailboxes) != len(mailbox_ids):
|
||||
return SetCustomDomainMailboxesResult(
|
||||
success=False, reason=CannotSetCustomDomainMailboxesCause.InvalidMailbox
|
||||
)
|
||||
|
||||
# first remove all existing domain-mailboxes links
|
||||
DomainMailbox.filter_by(domain_id=custom_domain.id).delete()
|
||||
Session.flush()
|
||||
|
||||
for mailbox in mailboxes:
|
||||
DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id)
|
||||
|
||||
Session.commit()
|
||||
return SetCustomDomainMailboxesResult(success=True)
|
||||
|
@ -9,13 +9,10 @@ from sqlalchemy import and_, func, case
|
||||
from wtforms import StringField, validators, ValidationError
|
||||
|
||||
# Need to import directly from config to allow modification from the tests
|
||||
from app import config, parallel_limiter
|
||||
from app import config, parallel_limiter, contact_utils
|
||||
from app.contact_utils import ContactCreateError
|
||||
from app.dashboard.base import dashboard_bp
|
||||
from app.db import Session
|
||||
from app.email_utils import (
|
||||
generate_reply_email,
|
||||
parse_full_address,
|
||||
)
|
||||
from app.email_validation import is_valid_email
|
||||
from app.errors import (
|
||||
CannotCreateContactForReverseAlias,
|
||||
@ -24,8 +21,8 @@ from app.errors import (
|
||||
ErrContactAlreadyExists,
|
||||
)
|
||||
from app.log import LOG
|
||||
from app.models import Alias, Contact, EmailLog, User
|
||||
from app.utils import sanitize_email, CSRFValidationForm
|
||||
from app.models import Alias, Contact, EmailLog
|
||||
from app.utils import CSRFValidationForm
|
||||
|
||||
|
||||
def email_validator():
|
||||
@ -51,7 +48,7 @@ def email_validator():
|
||||
return _check
|
||||
|
||||
|
||||
def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
|
||||
def create_contact(alias: Alias, contact_address: str) -> Contact:
|
||||
"""
|
||||
Create a contact for a user. Can be restricted for new free users by enabling DISABLE_CREATE_CONTACTS_FOR_FREE_USERS.
|
||||
Can throw exceptions:
|
||||
@ -61,37 +58,23 @@ def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
|
||||
"""
|
||||
if not contact_address:
|
||||
raise ErrAddressInvalid("Empty address")
|
||||
try:
|
||||
contact_name, contact_email = parse_full_address(contact_address)
|
||||
except ValueError:
|
||||
output = contact_utils.create_contact(email=contact_address, alias=alias)
|
||||
if output.error == ContactCreateError.InvalidEmail:
|
||||
raise ErrAddressInvalid(contact_address)
|
||||
|
||||
contact_email = sanitize_email(contact_email)
|
||||
if not is_valid_email(contact_email):
|
||||
raise ErrAddressInvalid(contact_email)
|
||||
|
||||
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
|
||||
if contact:
|
||||
raise ErrContactAlreadyExists(contact)
|
||||
|
||||
if not user.can_create_contacts():
|
||||
elif output.error == ContactCreateError.NotAllowed:
|
||||
raise ErrContactErrorUpgradeNeeded()
|
||||
elif output.error is not None:
|
||||
raise ErrAddressInvalid("Invalid address")
|
||||
elif not output.created:
|
||||
raise ErrContactAlreadyExists(output.contact)
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=alias.user_id,
|
||||
alias_id=alias.id,
|
||||
website_email=contact_email,
|
||||
name=contact_name,
|
||||
reply_email=generate_reply_email(contact_email, alias),
|
||||
)
|
||||
|
||||
contact = output.contact
|
||||
LOG.d(
|
||||
"create reverse-alias for %s %s, reverse alias:%s",
|
||||
contact_address,
|
||||
alias,
|
||||
contact.reply_email,
|
||||
)
|
||||
Session.commit()
|
||||
|
||||
return contact
|
||||
|
||||
@ -261,7 +244,7 @@ def alias_contact_manager(alias_id):
|
||||
if new_contact_form.validate():
|
||||
contact_address = new_contact_form.email.data.strip()
|
||||
try:
|
||||
contact = create_contact(current_user, alias, contact_address)
|
||||
contact = create_contact(alias, contact_address)
|
||||
except (
|
||||
ErrContactErrorUpgradeNeeded,
|
||||
ErrAddressInvalid,
|
||||
|
@ -7,7 +7,7 @@ from wtforms import StringField, validators, IntegerField
|
||||
|
||||
from app.constants import DMARC_RECORD
|
||||
from app.config import EMAIL_SERVERS_WITH_PRIORITY, EMAIL_DOMAIN
|
||||
from app.custom_domain_utils import delete_custom_domain
|
||||
from app.custom_domain_utils import delete_custom_domain, set_custom_domain_mailboxes
|
||||
from app.custom_domain_validation import CustomDomainValidation
|
||||
from app.dashboard.base import dashboard_bp
|
||||
from app.db import Session
|
||||
@ -16,7 +16,6 @@ from app.models import (
|
||||
Alias,
|
||||
DomainDeletedAlias,
|
||||
Mailbox,
|
||||
DomainMailbox,
|
||||
AutoCreateRule,
|
||||
AutoCreateRuleMailbox,
|
||||
)
|
||||
@ -220,40 +219,16 @@ def domain_detail(custom_domain_id):
|
||||
)
|
||||
elif request.form.get("form-name") == "update":
|
||||
mailbox_ids = request.form.getlist("mailbox_ids")
|
||||
# check if mailbox is not tempered with
|
||||
mailboxes = []
|
||||
for mailbox_id in mailbox_ids:
|
||||
mailbox = Mailbox.get(mailbox_id)
|
||||
if (
|
||||
not mailbox
|
||||
or mailbox.user_id != current_user.id
|
||||
or not mailbox.verified
|
||||
):
|
||||
flash("Something went wrong, please retry", "warning")
|
||||
return redirect(
|
||||
url_for(
|
||||
"dashboard.domain_detail", custom_domain_id=custom_domain.id
|
||||
)
|
||||
)
|
||||
mailboxes.append(mailbox)
|
||||
result = set_custom_domain_mailboxes(
|
||||
user_id=current_user.id,
|
||||
custom_domain=custom_domain,
|
||||
mailbox_ids=mailbox_ids,
|
||||
)
|
||||
|
||||
if not mailboxes:
|
||||
flash("You must select at least 1 mailbox", "warning")
|
||||
return redirect(
|
||||
url_for(
|
||||
"dashboard.domain_detail", custom_domain_id=custom_domain.id
|
||||
)
|
||||
)
|
||||
|
||||
# first remove all existing domain-mailboxes links
|
||||
DomainMailbox.filter_by(domain_id=custom_domain.id).delete()
|
||||
Session.flush()
|
||||
|
||||
for mailbox in mailboxes:
|
||||
DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id)
|
||||
|
||||
Session.commit()
|
||||
flash(f"{custom_domain.domain} mailboxes has been updated", "success")
|
||||
if result.success:
|
||||
flash(f"{custom_domain.domain} mailboxes has been updated", "success")
|
||||
else:
|
||||
flash(result.reason.value, "warning")
|
||||
|
||||
return redirect(
|
||||
url_for("dashboard.domain_detail", custom_domain_id=custom_domain.id)
|
||||
|
@ -64,10 +64,6 @@ class EventDispatcher:
|
||||
)
|
||||
return
|
||||
|
||||
if config.EVENT_WEBHOOK_ENABLED_USER_IDS is not None:
|
||||
if user.id not in config.EVENT_WEBHOOK_ENABLED_USER_IDS:
|
||||
return
|
||||
|
||||
partner_user = EventDispatcher.__partner_user(user.id)
|
||||
if not partner_user:
|
||||
LOG.i(f"Not sending events because there's no partner user for user {user}")
|
||||
|
@ -213,7 +213,10 @@ def generate_activation_code(
|
||||
) -> MailboxActivation:
|
||||
clear_activation_codes_for_mailbox(mailbox)
|
||||
if use_digit_code:
|
||||
code = "{:06d}".format(random.randint(1, 999999))
|
||||
if config.MAILBOX_VERIFICATION_OVERRIDE_CODE:
|
||||
code = config.MAILBOX_VERIFICATION_OVERRIDE_CODE
|
||||
else:
|
||||
code = "{:06d}".format(random.randint(1, 999999))
|
||||
else:
|
||||
code = secrets.token_urlsafe(16)
|
||||
return MailboxActivation.create(
|
||||
|
@ -336,7 +336,7 @@ class Fido(Base, ModelMixin):
|
||||
class User(Base, ModelMixin, UserMixin, PasswordOracle):
|
||||
__tablename__ = "users"
|
||||
|
||||
FLAG_FREE_DISABLE_CREATE_ALIAS = 1 << 0
|
||||
FLAG_DISABLE_CREATE_CONTACTS = 1 << 0
|
||||
FLAG_CREATED_FROM_PARTNER = 1 << 1
|
||||
FLAG_FREE_OLD_ALIAS_LIMIT = 1 << 2
|
||||
FLAG_CREATED_ALIAS_FROM_PARTNER = 1 << 3
|
||||
@ -543,7 +543,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
|
||||
# bitwise flags. Allow for future expansion
|
||||
flags = sa.Column(
|
||||
sa.BigInteger,
|
||||
default=FLAG_FREE_DISABLE_CREATE_ALIAS,
|
||||
default=FLAG_DISABLE_CREATE_CONTACTS,
|
||||
server_default="0",
|
||||
nullable=False,
|
||||
)
|
||||
@ -1168,7 +1168,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
|
||||
def can_create_contacts(self) -> bool:
|
||||
if self.is_premium():
|
||||
return True
|
||||
if self.flags & User.FLAG_FREE_DISABLE_CREATE_ALIAS == 0:
|
||||
if self.flags & User.FLAG_DISABLE_CREATE_CONTACTS == 0:
|
||||
return True
|
||||
return not config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS
|
||||
|
||||
@ -2443,6 +2443,8 @@ class CustomDomain(Base, ModelMixin):
|
||||
unique=True,
|
||||
postgresql_where=Column("ownership_verified"),
|
||||
), # The condition
|
||||
Index("ix_custom_domain_user_id", "user_id"),
|
||||
Index("ix_custom_domain_pending_deletion", "pending_deletion"),
|
||||
)
|
||||
|
||||
user = orm.relationship(User, foreign_keys=[user_id], backref="custom_domains")
|
||||
|
@ -197,8 +197,8 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
|
||||
contact_email = mail_from
|
||||
contact_result = contact_utils.create_contact(
|
||||
email=contact_email,
|
||||
name=contact_name,
|
||||
alias=alias,
|
||||
name=contact_name,
|
||||
mail_from=mail_from,
|
||||
allow_empty_email=True,
|
||||
automatic_created=True,
|
||||
@ -229,7 +229,7 @@ def get_or_create_reply_to_contact(
|
||||
)
|
||||
return None
|
||||
|
||||
return contact_utils.create_contact(contact_address, contact_name, alias).contact
|
||||
return contact_utils.create_contact(contact_address, alias, contact_name).contact
|
||||
|
||||
|
||||
def replace_header_when_forward(msg: Message, alias: Alias, header: str):
|
||||
|
@ -72,7 +72,9 @@ class PostgresEventSource(EventSource):
|
||||
Session.close() # Ensure we get a new connection and we don't leave a dangling tx
|
||||
|
||||
def __connect(self):
|
||||
self.__connection = psycopg2.connect(self.__connection_string)
|
||||
self.__connection = psycopg2.connect(
|
||||
self.__connection_string, application_name="sl-event-listen"
|
||||
)
|
||||
|
||||
from app.db import Session
|
||||
|
||||
|
@ -247,12 +247,13 @@ def process_job(job: Job):
|
||||
domain_name = custom_domain.domain
|
||||
user = custom_domain.user
|
||||
|
||||
custom_domain_partner_id = custom_domain.partner_id
|
||||
CustomDomain.delete(custom_domain.id)
|
||||
Session.commit()
|
||||
|
||||
LOG.d("Domain %s deleted", domain_name)
|
||||
|
||||
if custom_domain.partner_id is None:
|
||||
if custom_domain_partner_id is None:
|
||||
send_email(
|
||||
user.email,
|
||||
f"Your domain {domain_name} has been deleted",
|
||||
|
@ -0,0 +1,27 @@
|
||||
"""custom domain indices
|
||||
|
||||
Revision ID: 62afa3a10010
|
||||
Revises: 88dd7a0abf54
|
||||
Create Date: 2024-09-30 11:40:04.127791
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '62afa3a10010'
|
||||
down_revision = '88dd7a0abf54'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
with op.get_context().autocommit_block():
|
||||
op.create_index('ix_custom_domain_pending_deletion', 'custom_domain', ['pending_deletion'], unique=False, postgresql_concurrently=True)
|
||||
op.create_index('ix_custom_domain_user_id', 'custom_domain', ['user_id'], unique=False, postgresql_concurrently=True)
|
||||
|
||||
|
||||
def downgrade():
|
||||
with op.get_context().autocommit_block():
|
||||
op.drop_index('ix_custom_domain_user_id', table_name='custom_domain', postgresql_concurrently=True)
|
||||
op.drop_index('ix_custom_domain_pending_deletion', table_name='custom_domain', postgresql_concurrently=True)
|
@ -94,6 +94,20 @@ def log_nb_db_connection():
|
||||
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
|
||||
|
||||
|
||||
@newrelic.agent.background_task()
|
||||
def log_nb_db_connection_by_app_name():
|
||||
# get the number of connections to the DB
|
||||
rows = Session.execute(
|
||||
"SELECT application_name, count(datid) FROM pg_stat_activity group by application_name"
|
||||
)
|
||||
for row in rows:
|
||||
if row[0].find("sl-") == 0:
|
||||
LOG.d("number of db connections for app %s = %s", row[0], row[1])
|
||||
newrelic.agent.record_custom_metric(
|
||||
f"Custom/nb_db_app_connection/{row[0]}", row[1]
|
||||
)
|
||||
|
||||
|
||||
@newrelic.agent.background_task()
|
||||
def log_pending_to_process_events():
|
||||
r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;")
|
||||
@ -131,7 +145,7 @@ def log_failed_events():
|
||||
"""
|
||||
SELECT COUNT(*)
|
||||
FROM sync_event
|
||||
WHERE retries >= 10;
|
||||
WHERE retry_count >= 10;
|
||||
""",
|
||||
)
|
||||
failed_events = list(r)[0][0]
|
||||
@ -148,6 +162,7 @@ if __name__ == "__main__":
|
||||
log_pending_to_process_events()
|
||||
log_events_pending_dead_letter()
|
||||
log_failed_events()
|
||||
log_nb_db_connection_by_app_name()
|
||||
Session.close()
|
||||
|
||||
exporter.run()
|
||||
|
@ -536,7 +536,7 @@ def test_create_contact_route_free_users(flask_client):
|
||||
assert r.status_code == 201
|
||||
|
||||
# End trial and disallow for new free users. Config should allow it
|
||||
user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS
|
||||
user.flags = User.FLAG_DISABLE_CREATE_CONTACTS
|
||||
Session.commit()
|
||||
r = flask_client.post(
|
||||
url_for("api.create_contact_route", alias_id=alias.id),
|
||||
|
@ -4,7 +4,7 @@ from app.models import (
|
||||
Alias,
|
||||
Contact,
|
||||
)
|
||||
from tests.utils import login
|
||||
from tests.utils import login, random_email
|
||||
|
||||
|
||||
def test_add_contact_success(flask_client):
|
||||
@ -13,26 +13,28 @@ def test_add_contact_success(flask_client):
|
||||
|
||||
assert Contact.filter_by(user_id=user.id).count() == 0
|
||||
|
||||
email = random_email()
|
||||
# <<< Create a new contact >>>
|
||||
flask_client.post(
|
||||
url_for("dashboard.alias_contact_manager", alias_id=alias.id),
|
||||
data={
|
||||
"form-name": "create",
|
||||
"email": "abcd@gmail.com",
|
||||
"email": email,
|
||||
},
|
||||
follow_redirects=True,
|
||||
)
|
||||
# a new contact is added
|
||||
assert Contact.filter_by(user_id=user.id).count() == 1
|
||||
contact = Contact.filter_by(user_id=user.id).first()
|
||||
assert contact.website_email == "abcd@gmail.com"
|
||||
assert contact.website_email == email
|
||||
|
||||
# <<< Create a new contact using a full email format >>>
|
||||
email = random_email()
|
||||
flask_client.post(
|
||||
url_for("dashboard.alias_contact_manager", alias_id=alias.id),
|
||||
data={
|
||||
"form-name": "create",
|
||||
"email": "First Last <another@gmail.com>",
|
||||
"email": f"First Last <{email}>",
|
||||
},
|
||||
follow_redirects=True,
|
||||
)
|
||||
@ -41,7 +43,7 @@ def test_add_contact_success(flask_client):
|
||||
contact = (
|
||||
Contact.filter_by(user_id=user.id).filter(Contact.id != contact.id).first()
|
||||
)
|
||||
assert contact.website_email == "another@gmail.com"
|
||||
assert contact.website_email == email
|
||||
assert contact.name == "First Last"
|
||||
|
||||
# <<< Create a new contact with invalid email address >>>
|
||||
|
@ -1,15 +1,26 @@
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
|
||||
from app import config
|
||||
from app.contact_utils import create_contact, ContactCreateError
|
||||
from app.db import Session
|
||||
from app.models import (
|
||||
Alias,
|
||||
Contact,
|
||||
User,
|
||||
)
|
||||
from tests.utils import create_new_user, random_email, random_token
|
||||
|
||||
|
||||
def setup_module(module):
|
||||
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = True
|
||||
|
||||
|
||||
def teardown_module(module):
|
||||
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False
|
||||
|
||||
|
||||
def create_provider():
|
||||
# name auto_created from_partner
|
||||
yield ["name", "a@b.c", True, True]
|
||||
@ -34,8 +45,8 @@ def test_create_contact(
|
||||
email = random_email()
|
||||
contact_result = create_contact(
|
||||
email,
|
||||
name,
|
||||
alias,
|
||||
name=name,
|
||||
mail_from=mail_from,
|
||||
automatic_created=automatic_created,
|
||||
from_partner=from_partner,
|
||||
@ -57,7 +68,7 @@ def test_create_contact_email_email_not_allowed():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
contact_result = create_contact("", "", alias)
|
||||
contact_result = create_contact("", alias)
|
||||
assert contact_result.contact is None
|
||||
assert contact_result.error == ContactCreateError.InvalidEmail
|
||||
|
||||
@ -66,21 +77,84 @@ def test_create_contact_email_email_allowed():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
contact_result = create_contact("", "", alias, allow_empty_email=True)
|
||||
contact_result = create_contact("", alias, allow_empty_email=True)
|
||||
assert contact_result.error is None
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.website_email == ""
|
||||
assert contact_result.contact.invalid_email
|
||||
|
||||
|
||||
def test_create_contact_name_overrides_email_name():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
email = random_email()
|
||||
name = random_token()
|
||||
contact_result = create_contact(f"superseeded <{email}>", alias, name=name)
|
||||
assert contact_result.error is None
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.website_email == email
|
||||
assert contact_result.contact.name == name
|
||||
|
||||
|
||||
def test_create_contact_name_taken_from_email():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
email = random_email()
|
||||
name = random_token()
|
||||
contact_result = create_contact(f"{name} <{email}>", alias)
|
||||
assert contact_result.error is None
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.website_email == email
|
||||
assert contact_result.contact.name == name
|
||||
|
||||
|
||||
def test_create_contact_empty_name_is_none():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
email = random_email()
|
||||
contact_result = create_contact(email, alias, name="")
|
||||
assert contact_result.error is None
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.website_email == email
|
||||
assert contact_result.contact.name is None
|
||||
|
||||
|
||||
def test_create_contact_free_user():
|
||||
user = create_new_user()
|
||||
user.trial_end = None
|
||||
user.flags = 0
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.flush()
|
||||
# Free users without the FREE_DISABLE_CREATE_CONTACTS
|
||||
result = create_contact(random_email(), alias)
|
||||
assert result.error is None
|
||||
assert result.created
|
||||
assert result.contact is not None
|
||||
assert not result.contact.automatic_created
|
||||
# Free users with the flag should be able to still create automatic emails
|
||||
user.flags = User.FLAG_DISABLE_CREATE_CONTACTS
|
||||
Session.flush()
|
||||
result = create_contact(random_email(), alias, automatic_created=True)
|
||||
assert result.error is None
|
||||
assert result.created
|
||||
assert result.contact is not None
|
||||
assert result.contact.automatic_created
|
||||
# Free users with the flag cannot create non-automatic emails
|
||||
result = create_contact(random_email(), alias)
|
||||
assert result.error == ContactCreateError.NotAllowed
|
||||
|
||||
|
||||
def test_do_not_allow_invalid_email():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
contact_result = create_contact("potato", "", alias)
|
||||
contact_result = create_contact("potato", alias)
|
||||
assert contact_result.contact is None
|
||||
assert contact_result.error == ContactCreateError.InvalidEmail
|
||||
contact_result = create_contact("asdf\x00@gmail.com", "", alias)
|
||||
contact_result = create_contact("asdf\x00@gmail.com", alias)
|
||||
assert contact_result.contact is None
|
||||
assert contact_result.error == ContactCreateError.InvalidEmail
|
||||
|
||||
@ -90,13 +164,15 @@ def test_update_name_for_existing():
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
email = random_email()
|
||||
contact_result = create_contact(email, "", alias)
|
||||
contact_result = create_contact(email, alias)
|
||||
assert contact_result.error is None
|
||||
assert contact_result.created
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.name == ""
|
||||
assert contact_result.contact.name is None
|
||||
name = random_token()
|
||||
contact_result = create_contact(email, name, alias)
|
||||
contact_result = create_contact(email, alias, name=name)
|
||||
assert contact_result.error is None
|
||||
assert not contact_result.created
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.name == name
|
||||
|
||||
@ -106,12 +182,15 @@ def test_update_mail_from_for_existing():
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
email = random_email()
|
||||
contact_result = create_contact(email, "", alias)
|
||||
contact_result = create_contact(email, alias)
|
||||
assert contact_result.error is None
|
||||
assert contact_result.created
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.mail_from is None
|
||||
mail_from = random_email()
|
||||
contact_result = create_contact(email, "", alias, mail_from=mail_from)
|
||||
contact_result = create_contact(email, alias, mail_from=mail_from)
|
||||
assert contact_result.error is None
|
||||
assert not contact_result.created
|
||||
assert contact_result.contact is not None
|
||||
assert contact_result.contact.mail_from == mail_from
|
||||
|
@ -7,11 +7,13 @@ from app.custom_domain_utils import (
|
||||
create_custom_domain,
|
||||
is_valid_domain,
|
||||
sanitize_domain,
|
||||
set_custom_domain_mailboxes,
|
||||
CannotUseDomainReason,
|
||||
CannotSetCustomDomainMailboxesCause,
|
||||
)
|
||||
from app.db import Session
|
||||
from app.models import User, CustomDomain, Mailbox
|
||||
from tests.utils import get_proton_partner
|
||||
from app.models import User, CustomDomain, Mailbox, DomainMailbox
|
||||
from tests.utils import get_proton_partner, random_email
|
||||
from tests.utils import create_new_user, random_string, random_domain
|
||||
|
||||
user: Optional[User] = None
|
||||
@ -147,3 +149,119 @@ def test_creates_custom_domain_with_partner_id():
|
||||
assert res.instance.domain == domain
|
||||
assert res.instance.user_id == user.id
|
||||
assert res.instance.partner_id == proton_partner.id
|
||||
|
||||
|
||||
# set_custom_domain_mailboxes
|
||||
def test_set_custom_domain_mailboxes_empty_list():
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
res = set_custom_domain_mailboxes(user.id, domain, [])
|
||||
assert res.success is False
|
||||
assert res.reason == CannotSetCustomDomainMailboxesCause.NoMailboxes
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_mailbox_from_another_user():
|
||||
other_user = create_new_user()
|
||||
other_mailbox = Mailbox.create(
|
||||
user_id=other_user.id, email=random_email(), verified=True
|
||||
)
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
|
||||
res = set_custom_domain_mailboxes(user.id, domain, [other_mailbox.id])
|
||||
assert res.success is False
|
||||
assert res.reason == CannotSetCustomDomainMailboxesCause.InvalidMailbox
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_mailbox_from_current_user_and_another_user():
|
||||
other_user = create_new_user()
|
||||
other_mailbox = Mailbox.create(
|
||||
user_id=other_user.id, email=random_email(), verified=True
|
||||
)
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
|
||||
res = set_custom_domain_mailboxes(
|
||||
user.id, domain, [user.default_mailbox_id, other_mailbox.id]
|
||||
)
|
||||
assert res.success is False
|
||||
assert res.reason == CannotSetCustomDomainMailboxesCause.InvalidMailbox
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_success():
|
||||
other_mailbox = Mailbox.create(user_id=user.id, email=random_email(), verified=True)
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
|
||||
res = set_custom_domain_mailboxes(
|
||||
user.id, domain, [user.default_mailbox_id, other_mailbox.id]
|
||||
)
|
||||
assert res.success is True
|
||||
assert res.reason is None
|
||||
|
||||
domain_mailboxes = DomainMailbox.filter_by(domain_id=domain.id).all()
|
||||
assert len(domain_mailboxes) == 2
|
||||
assert domain_mailboxes[0].domain_id == domain.id
|
||||
assert domain_mailboxes[0].mailbox_id == user.default_mailbox_id
|
||||
assert domain_mailboxes[1].domain_id == domain.id
|
||||
assert domain_mailboxes[1].mailbox_id == other_mailbox.id
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_set_twice():
|
||||
other_mailbox = Mailbox.create(user_id=user.id, email=random_email(), verified=True)
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
|
||||
res = set_custom_domain_mailboxes(
|
||||
user.id, domain, [user.default_mailbox_id, other_mailbox.id]
|
||||
)
|
||||
assert res.success is True
|
||||
assert res.reason is None
|
||||
|
||||
res = set_custom_domain_mailboxes(
|
||||
user.id, domain, [user.default_mailbox_id, other_mailbox.id]
|
||||
)
|
||||
assert res.success is True
|
||||
assert res.reason is None
|
||||
|
||||
domain_mailboxes = DomainMailbox.filter_by(domain_id=domain.id).all()
|
||||
assert len(domain_mailboxes) == 2
|
||||
assert domain_mailboxes[0].domain_id == domain.id
|
||||
assert domain_mailboxes[0].mailbox_id == user.default_mailbox_id
|
||||
assert domain_mailboxes[1].domain_id == domain.id
|
||||
assert domain_mailboxes[1].mailbox_id == other_mailbox.id
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_removes_old_association():
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain(), commit=True)
|
||||
|
||||
res = set_custom_domain_mailboxes(user.id, domain, [user.default_mailbox_id])
|
||||
assert res.success is True
|
||||
assert res.reason is None
|
||||
|
||||
other_mailbox = Mailbox.create(
|
||||
user_id=user.id, email=random_email(), verified=True, commit=True
|
||||
)
|
||||
res = set_custom_domain_mailboxes(user.id, domain, [other_mailbox.id])
|
||||
assert res.success is True
|
||||
assert res.reason is None
|
||||
|
||||
domain_mailboxes = DomainMailbox.filter_by(domain_id=domain.id).all()
|
||||
assert len(domain_mailboxes) == 1
|
||||
assert domain_mailboxes[0].domain_id == domain.id
|
||||
assert domain_mailboxes[0].mailbox_id == other_mailbox.id
|
||||
|
||||
|
||||
def test_set_custom_domain_mailboxes_with_unverified_mailbox():
|
||||
domain = CustomDomain.create(user_id=user.id, domain=random_domain())
|
||||
verified_mailbox = Mailbox.create(
|
||||
user_id=user.id,
|
||||
email=random_email(),
|
||||
verified=True,
|
||||
)
|
||||
unverified_mailbox = Mailbox.create(
|
||||
user_id=user.id,
|
||||
email=random_email(),
|
||||
verified=False,
|
||||
)
|
||||
|
||||
res = set_custom_domain_mailboxes(
|
||||
user.id, domain, [verified_mailbox.id, unverified_mailbox.id]
|
||||
)
|
||||
assert res.success is False
|
||||
assert res.reason is CannotSetCustomDomainMailboxesCause.InvalidMailbox
|
||||
|
Reference in New Issue
Block a user