Compare commits

...

10 Commits

Author SHA1 Message Date
63ac89e952 4.49.6
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m15s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m18s
Build-Release-Image / Merge-Images (push) Successful in 13s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s
2024-08-28 12:00:07 +01:00
8896f00124 4.49.5
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m16s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m21s
Build-Release-Image / Merge-Images (push) Successful in 11s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 2s
2024-08-26 12:00:07 +01:00
d313c94f77 4.49.4
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m6s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m35s
Build-Release-Image / Merge-Images (push) Successful in 14s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s
2024-08-24 12:00:07 +01:00
39fcf2e48f 4.49.3
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m11s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m17s
Build-Release-Image / Merge-Images (push) Successful in 13s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 4s
2024-08-23 12:00:07 +01:00
41a5a65f51 4.49.2
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m14s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m27s
Build-Release-Image / Merge-Images (push) Successful in 13s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 2s
2024-08-21 12:00:07 +01:00
1d0c7ec4a0 4.49.0
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m32s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m40s
Build-Release-Image / Merge-Images (push) Successful in 18s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s
2024-08-19 12:00:06 +01:00
4de5b8eb6d 4.48.2
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m9s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m14s
Build-Release-Image / Merge-Images (push) Successful in 24s
Build-Release-Image / Create-Release (push) Successful in 8s
Build-Release-Image / Notify (push) Successful in 3s
2024-08-09 12:00:06 +01:00
0942f5eba3 4.48.0
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m58s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m24s
Build-Release-Image / Merge-Images (push) Successful in 12s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 19s
2024-08-05 12:00:06 +01:00
dae6f64482 4.47.2
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m33s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m39s
Build-Release-Image / Merge-Images (push) Successful in 12s
Build-Release-Image / Create-Release (push) Successful in 8s
Build-Release-Image / Notify (push) Successful in 5s
2024-07-30 12:00:06 +01:00
e7f0f81d85 4.46.4
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m53s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m13s
Build-Release-Image / Merge-Images (push) Successful in 12s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 2s
2024-07-13 12:00:06 +01:00
55 changed files with 1729 additions and 395 deletions

View File

@ -14,4 +14,4 @@ venv/
.venv
.coverage
htmlcov
.git/
.git/

3
app/.gitignore vendored
View File

@ -11,8 +11,7 @@ db.sqlite-journal
static/upload
venv/
.venv
.python-version
.coverage
htmlcov
adhoc
.env.*
.env.*

View File

@ -20,15 +20,15 @@ SimpleLogin backend consists of 2 main components:
## Install dependencies
The project requires:
- Python 3.7+ and [poetry](https://python-poetry.org/) to manage dependencies
- Python 3.10 and poetry to manage dependencies
- Node v10 for front-end.
- Postgres 12+
- Postgres 13+
First, install all dependencies by running the following command.
Feel free to use `virtualenv` or similar tools to isolate development environment.
```bash
poetry install
poetry sync
```
On Mac, sometimes you might need to install some other packages via `brew`:
@ -225,4 +225,27 @@ Now open http://localhost:1080/ (or http://localhost:1080/ for MailHog), you sho
Some features require a job handler (such as GDPR data export). To test such feature you need to run the job_runner
```bash
python job_runner.py
```
# Setup for Mac
There are several ways to setup Python and manage the project dependencies on Mac. For info we have successfully used this setup on a Mac silicon:
```bash
# we haven't managed to make python 3.12 work
brew install python3.10
# make sure to update the PATH so python, pip point to Python3
# for us it can be done by adding "export PATH=/opt/homebrew/opt/python@3.10/libexec/bin:$PATH" to .zprofile
# Although pipx is the recommended way to install poetry,
# install pipx via brew will automatically install python 3.12
# and poetry will then use python 3.12
# so we recommend using poetry this way instead
curl -sSL https://install.python-poetry.org | python3 -
poetry install
# activate the virtualenv and you should be good to go!
source .venv/bin/activate
```

View File

@ -541,7 +541,7 @@ exit
Once you've created all your desired login accounts, add these lines to `/simplelogin.env` to disable further registrations:
```
```.env
DISABLE_REGISTRATION=1
DISABLE_ONBOARDING=true
```

View File

@ -1,7 +1,10 @@
from __future__ import annotations
from typing import Optional
import arrow
import sqlalchemy
from flask_admin import BaseView
from flask_admin.form import SecureForm
from flask_admin.model.template import EndpointLinkRowAction
from markupsafe import Markup
@ -27,10 +30,26 @@ from app.models import (
Alias,
Newsletter,
PADDLE_SUBSCRIPTION_GRACE_DAYS,
Mailbox,
DeletedAlias,
DomainDeletedAlias,
)
from app.newsletter_utils import send_newsletter_to_user, send_newsletter_to_address
def _admin_action_formatter(view, context, model, name):
action_name = AuditLogActionEnum.get_name(model.action)
return "{} ({})".format(action_name, model.action)
def _admin_date_formatter(view, context, model, name):
return model.created_at.format()
def _user_upgrade_channel_formatter(view, context, model, name):
return Markup(model.upgrade_channel)
class SLModelView(sqla.ModelView):
column_default_sort = ("id", True)
column_display_pk = True
@ -95,11 +114,8 @@ class SLAdminIndexView(AdminIndexView):
return redirect("/admin/user")
def _user_upgrade_channel_formatter(view, context, model, name):
return Markup(model.upgrade_channel)
class UserAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["email", "id"]
column_exclude_list = [
"salt",
@ -118,6 +134,8 @@ class UserAdmin(SLModelView):
column_formatters = {
"upgrade_channel": _user_upgrade_channel_formatter,
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
@ -344,17 +362,29 @@ def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
class EmailLogAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id"]
column_filters = ["id", "user.email", "mailbox.email", "contact.website_email"]
can_edit = False
can_create = False
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
class AliasAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "email", "mailbox.email"]
column_filters = ["id", "user.email", "email", "mailbox.email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
"disable_email_spoofing_check",
"Disable email spoofing protection",
@ -377,9 +407,15 @@ class AliasAdmin(SLModelView):
class MailboxAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "email"]
column_filters = ["id", "user.email", "email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
# class LifetimeCouponAdmin(SLModelView):
# can_edit = True
@ -387,14 +423,26 @@ class MailboxAdmin(SLModelView):
class CouponAdmin(SLModelView):
form_base_class = SecureForm
can_edit = False
can_create = True
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
class ManualSubscriptionAdmin(SLModelView):
form_base_class = SecureForm
can_edit = True
column_searchable_list = ["id", "user.email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
"extend_1y",
"Extend for 1 year",
@ -433,15 +481,27 @@ class ManualSubscriptionAdmin(SLModelView):
class CustomDomainAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["domain", "user.email", "user.id"]
column_exclude_list = ["ownership_txt_token"]
can_edit = False
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
class ReferralAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "code", "name"]
column_filters = ["id", "user.email", "code", "name"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
def scaffold_list_columns(self):
ret = super().scaffold_list_columns()
ret.insert(0, "nb_user")
@ -457,16 +517,8 @@ class ReferralAdmin(SLModelView):
# can_delete = True
def _admin_action_formatter(view, context, model, name):
action_name = AuditLogActionEnum.get_name(model.action)
return "{} ({})".format(action_name, model.action)
def _admin_created_at_formatter(view, context, model, name):
return model.created_at.format()
class AdminAuditLogAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["admin.id", "admin.email", "model_id", "created_at"]
column_filters = ["admin.id", "admin.email", "model_id", "created_at"]
column_exclude_list = ["id"]
@ -477,7 +529,8 @@ class AdminAuditLogAdmin(SLModelView):
column_formatters = {
"action": _admin_action_formatter,
"created_at": _admin_created_at_formatter,
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@ -497,6 +550,7 @@ def _transactionalcomplaint_refused_email_id_formatter(view, context, model, nam
class ProviderComplaintAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.id", "created_at"]
column_filters = ["user.id", "state"]
column_hide_backrefs = False
@ -505,8 +559,8 @@ class ProviderComplaintAdmin(SLModelView):
can_delete = False
column_formatters = {
"created_at": _admin_created_at_formatter,
"updated_at": _admin_created_at_formatter,
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
"state": _transactionalcomplaint_state_formatter,
"phase": _transactionalcomplaint_phase_formatter,
"refused_email": _transactionalcomplaint_refused_email_id_formatter,
@ -567,6 +621,7 @@ def _newsletter_html_formatter(view, context, model: Newsletter, name):
class NewsletterAdmin(SLModelView):
form_base_class = SecureForm
list_template = "admin/model/newsletter-list.html"
edit_template = "admin/model/newsletter-edit.html"
edit_modal = False
@ -648,6 +703,7 @@ class NewsletterAdmin(SLModelView):
class NewsletterUserAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id"]
column_filters = ["id", "user.email", "newsletter.subject"]
column_exclude_list = ["created_at", "updated_at", "id"]
@ -657,17 +713,107 @@ class NewsletterUserAdmin(SLModelView):
class DailyMetricAdmin(SLModelView):
form_base_class = SecureForm
column_exclude_list = ["created_at", "updated_at", "id"]
can_export = True
class MetricAdmin(SLModelView):
form_base_class = SecureForm
column_exclude_list = ["created_at", "updated_at", "id"]
can_export = True
class InvalidMailboxDomainAdmin(SLModelView):
form_base_class = SecureForm
can_create = True
can_delete = True
class EmailSearchResult:
no_match: bool = True
alias: Optional[Alias] = None
mailbox: Optional[Mailbox] = None
deleted_alias: Optional[DeletedAlias] = None
deleted_custom_alias: Optional[DomainDeletedAlias] = None
user: Optional[User] = None
@staticmethod
def from_email(email: str) -> EmailSearchResult:
output = EmailSearchResult()
alias = Alias.get_by(email=email)
if alias:
output.alias = alias
output.no_match = False
return output
user = User.get_by(email=email)
if user:
output.user = user
output.no_match = False
return output
mailbox = Mailbox.get_by(email=email)
if mailbox:
output.mailbox = mailbox
output.no_match = False
return output
deleted_alias = DeletedAlias.get_by(email=email)
if deleted_alias:
output.deleted_alias = deleted_alias
output.no_match = False
return output
domain_deleted_alias = DomainDeletedAlias.get_by(email=email)
if domain_deleted_alias:
output.domain_deleted_alias = domain_deleted_alias
output.no_match = False
return output
class EmailSearchHelpers:
@staticmethod
def mailbox_list(user: User) -> list[Mailbox]:
return (
Mailbox.filter_by(user_id=user.id)
.order_by(Mailbox.id.asc())
.limit(10)
.all()
)
@staticmethod
def mailbox_count(user: User) -> int:
return Mailbox.filter_by(user_id=user.id).order_by(Mailbox.id.asc()).count()
@staticmethod
def alias_list(user: User) -> list[Alias]:
return Alias.filter_by(user_id=user.id).order_by(Alias.id.asc()).limit(10).all()
@staticmethod
def alias_count(user: User) -> int:
return Alias.filter_by(user_id=user.id).count()
class EmailSearchAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/", methods=["GET", "POST"])
def index(self):
search = EmailSearchResult()
email = ""
if request.form and request.form["email"]:
email = request.form["email"]
email = email.strip()
search = EmailSearchResult.from_email(email)
return self.render(
"admin/email_search.html",
email=email,
data=search,
helper=EmailSearchHelpers,
)

View File

@ -64,8 +64,12 @@ def verify_prefix_suffix(
# SimpleLogin domain case:
# 1) alias_suffix must start with "." and
# 2) alias_domain_prefix must come from the word list
available_sl_domains = [
sl_domain.domain
for sl_domain in user.get_sl_domains(alias_options=alias_options)
]
if (
alias_domain in user.available_sl_domains(alias_options=alias_options)
alias_domain in available_sl_domains
and alias_domain not in user_custom_domains
# when DISABLE_ALIAS_SUFFIX is true, alias_domain_prefix is empty
and not config.DISABLE_ALIAS_SUFFIX
@ -80,9 +84,7 @@ def verify_prefix_suffix(
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
return False
if alias_domain not in user.available_sl_domains(
alias_options=alias_options
):
if alias_domain not in available_sl_domains:
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
return False

View File

@ -63,12 +63,16 @@ def get_user_if_alias_would_auto_create(
# Prevent addresses with unicode characters (🤯) in them for now.
validate_email(address, check_deliverability=False, allow_smtputf8=False)
except EmailNotValidError:
LOG.i(f"Not creating alias for {address} because email is invalid")
return None
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user
)
if DomainDeletedAlias.get_by(email=address):
LOG.i(
f"Not creating alias for {address} because it was previously deleted for this domain"
)
return None
if domain_and_rule:
return domain_and_rule[0].user
@ -93,6 +97,9 @@ def check_if_alias_can_be_auto_created_for_custom_domain(
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
LOG.i(
f"Cannot auto-create custom domain alias for {address} because there's no custom domain for {alias_domain}"
)
return None
user: User = custom_domain.user
@ -108,6 +115,9 @@ def check_if_alias_can_be_auto_created_for_custom_domain(
if not custom_domain.catch_all:
if len(custom_domain.auto_create_rules) == 0:
LOG.i(
f"Cannot create alias {address} for domain {custom_domain} because it has no catch-all and no rules"
)
return None
local = get_email_local_part(address)
@ -121,7 +131,7 @@ def check_if_alias_can_be_auto_created_for_custom_domain(
)
return custom_domain, rule
else: # no rule passes
LOG.d("no rule passed to create %s", local)
LOG.d(f"No rule matches auto-create {address} for domain {custom_domain}")
return None
LOG.d("Create alias via catchall")
@ -148,6 +158,7 @@ def check_if_alias_can_be_auto_created_for_a_directory(
sep = "#"
else:
# if there's no directory separator in the alias, no way to auto-create it
LOG.info(f"Cannot auto-create {address} since it has no directory separator")
return None
directory_name = address[: address.find(sep)]
@ -155,6 +166,9 @@ def check_if_alias_can_be_auto_created_for_a_directory(
directory = Directory.get_by(name=directory_name)
if not directory:
LOG.info(
f"Cannot auto-create {address} because there is no directory for {directory_name}"
)
return None
user: User = directory.user
@ -163,12 +177,17 @@ def check_if_alias_can_be_auto_created_for_a_directory(
return None
if not user.can_create_new_alias():
LOG.d(f"{user} can't create new directory alias {address}")
LOG.d(
f"{user} can't create new directory alias {address} because user cannot create aliases"
)
if notify_user:
send_cannot_create_directory_alias(user, address, directory_name)
return None
if directory.disabled:
LOG.d(
f"{user} can't create new directory alias {address} bcause directory is disabled"
)
if notify_user:
send_cannot_create_directory_alias_disabled(user, address, directory_name)
return None
@ -311,7 +330,10 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]:
def delete_alias(
alias: Alias, user: User, reason: AliasDeleteReason = AliasDeleteReason.Unspecified
alias: Alias,
user: User,
reason: AliasDeleteReason = AliasDeleteReason.Unspecified,
commit: bool = False,
):
"""
Delete an alias and add it to either global or domain trash
@ -347,6 +369,8 @@ def delete_alias(
EventDispatcher.send_event(
user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
)
if commit:
Session.commit()
def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:

View File

@ -5,7 +5,6 @@ import arrow
from flask import Blueprint, request, jsonify, g
from flask_login import current_user
from app import constants
from app.db import Session
from app.models import ApiKey
@ -19,9 +18,10 @@ def authorize_request() -> Optional[Tuple[str, int]]:
api_key = ApiKey.get_by(code=api_code)
if not api_key:
if current_user.is_authenticated and request.headers.get(
constants.HEADER_ALLOW_API_COOKIES
):
if current_user.is_authenticated:
# if current_user.is_authenticated and request.headers.get(
# constants.HEADER_ALLOW_API_COOKIES
# ):
g.user = current_user
else:
return jsonify(error="Wrong api key"), 401

View File

@ -1,22 +1,18 @@
from smtplib import SMTPRecipientsRefused
import arrow
from flask import g
from flask import jsonify
from flask import request
from app import mailbox_utils
from app.api.base import api_bp, require_api_auth
from app.config import JOB_DELETE_MAILBOX
from app.dashboard.views.mailbox import send_verification_email
from app.dashboard.views.mailbox_detail import verify_mailbox_change
from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.models import Mailbox
from app.utils import sanitize_email
@ -44,31 +40,15 @@ def create_mailbox():
user = g.user
mailbox_email = sanitize_email(request.get_json().get("email"))
if not user.is_premium():
return jsonify(error="Only premium plan can add additional mailbox"), 400
try:
new_mailbox = mailbox_utils.create_mailbox(user, mailbox_email).mailbox
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
if not is_valid_email(mailbox_email):
return jsonify(error=f"{mailbox_email} invalid"), 400
elif mailbox_already_used(mailbox_email, user):
return jsonify(error=f"{mailbox_email} already used"), 400
elif not email_can_be_used_as_mailbox(mailbox_email):
return (
jsonify(
error=f"{mailbox_email} cannot be used. Please note a mailbox cannot "
f"be a disposable email address"
),
400,
)
else:
new_mailbox = Mailbox.create(email=mailbox_email, user_id=user.id)
Session.commit()
send_verification_email(user, new_mailbox)
return (
jsonify(mailbox_to_dict(new_mailbox)),
201,
)
return (
jsonify(mailbox_to_dict(new_mailbox)),
201,
)
@api_bp.route("/mailboxes/<int:mailbox_id>", methods=["DELETE"])
@ -86,47 +66,17 @@ def delete_mailbox(mailbox_id):
"""
user = g.user
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
return jsonify(error="Forbidden"), 403
if mailbox.id == user.default_mailbox_id:
return jsonify(error="You cannot delete the default mailbox"), 400
data = request.get_json() or {}
transfer_mailbox_id = data.get("transfer_aliases_to")
if transfer_mailbox_id and int(transfer_mailbox_id) >= 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
transfer_mailbox_id = int(transfer_mailbox_id)
else:
transfer_mailbox_id = None
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
return (
jsonify(error="You must transfer the aliases to a mailbox you own."),
403,
)
if transfer_mailbox_id == mailbox_id:
return (
jsonify(
error="You can not transfer the aliases to the mailbox you want to delete."
),
400,
)
if not transfer_mailbox.verified:
return jsonify(error="Your new mailbox is not verified"), 400
# Schedule delete account job
LOG.w("schedule delete mailbox job for %s", mailbox)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id,
},
run_at=arrow.now(),
commit=True,
)
try:
mailbox_utils.delete_mailbox(user, mailbox_id, transfer_mailbox_id)
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
return jsonify(deleted=True), 200

View File

@ -10,6 +10,7 @@ from app.api.base import api_bp, require_api_auth
from app.config import SESSION_COOKIE_NAME
from app.dashboard.views.index import get_stats
from app.db import Session
from app.image_validation import detect_image_format, ImageFormat
from app.models import ApiKey, File, PartnerUser, User
from app.proton.utils import get_proton_partner
from app.session import logout_session
@ -78,17 +79,18 @@ def update_user_info():
data = request.get_json() or {}
if "profile_picture" in data:
if data["profile_picture"] is None:
if user.profile_picture_id:
file = user.profile_picture
user.profile_picture_id = None
if user.profile_picture_id:
file = user.profile_picture
user.profile_picture_id = None
Session.flush()
if file:
File.delete(file.id)
s3.delete(file.path)
Session.flush()
if file:
File.delete(file.id)
s3.delete(file.path)
Session.flush()
else:
raw_data = base64.decodebytes(data["profile_picture"].encode())
if detect_image_format(raw_data) == ImageFormat.Unknown:
return jsonify(error="Unsupported image format"), 400
file_path = random_string(30)
file = File.create(user_id=user.id, path=file_path)
Session.flush()

View File

@ -115,7 +115,8 @@ def register():
def send_activation_email(user, next_url):
# the activation code is valid for 1h
# the activation code is valid for 1h and delete all previous codes
Session.query(ActivationCode).filter(ActivationCode.user_id == user.id).delete()
activation = ActivationCode.create(user_id=user.id, code=random_string(30))
Session.commit()

View File

@ -3,7 +3,7 @@ import random
import socket
import string
from ast import literal_eval
from typing import Callable, List
from typing import Callable, List, Optional
from urllib.parse import urlparse
from dotenv import load_dotenv
@ -588,3 +588,24 @@ EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
# We want it disabled by default, so only skip if defined
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
def read_webhook_enabled_user_ids() -> Optional[List[int]]:
user_ids = os.environ.get("EVENT_WEBHOOK_ENABLED_USER_IDS", None)
if user_ids is None:
return None
ids = []
for user_id in user_ids.split(","):
try:
ids.append(int(user_id.strip()))
except ValueError:
pass
return ids
EVENT_WEBHOOK_ENABLED_USER_IDS: Optional[List[int]] = read_webhook_enabled_user_ids()
# Allow to define a different DB_URI for the event listener, in case we want to skip the connection pool
# It defaults to the regular DB_URI in case it's needed
EVENT_LISTENER_DB_URI = os.environ.get("EVENT_LISTENER_DB_URI", DB_URI)

View File

@ -145,7 +145,7 @@ def index():
LOG.i(f"User {current_user} requested deletion of alias {alias}")
email = alias.email
alias_utils.delete_alias(
alias, current_user, AliasDeleteReason.ManualAction
alias, current_user, AliasDeleteReason.ManualAction, commit=True
)
flash(f"Alias {email} has been deleted", "success")
elif request.form.get("form-name") == "disable-alias":

View File

@ -2,7 +2,6 @@ import base64
import binascii
import json
import arrow
from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from flask_wtf import FlaskForm
@ -10,19 +9,12 @@ from itsdangerous import TimestampSigner
from wtforms import validators, IntegerField
from wtforms.fields.html5 import EmailField
from app import parallel_limiter
from app.config import MAILBOX_SECRET, URL, JOB_DELETE_MAILBOX
from app import parallel_limiter, mailbox_utils, user_settings
from app.config import MAILBOX_SECRET
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import (
email_can_be_used_as_mailbox,
mailbox_already_used,
render,
send_email,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.models import Mailbox
from app.utils import CSRFValidationForm
@ -58,120 +50,61 @@ def mailbox_route():
if not delete_mailbox_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox = Mailbox.get(delete_mailbox_form.mailbox_id.data)
if not mailbox or mailbox.user_id != current_user.id:
flash("Invalid mailbox. Refresh the page", "warning")
try:
mailbox = mailbox_utils.delete_mailbox(
current_user,
delete_mailbox_form.mailbox_id.data,
delete_mailbox_form.transfer_mailbox_id.data,
)
except mailbox_utils.MailboxError as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if mailbox.id == current_user.default_mailbox_id:
flash("You cannot delete default mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
transfer_mailbox_id = delete_mailbox_form.transfer_mailbox_id.data
if transfer_mailbox_id and transfer_mailbox_id > 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
if not transfer_mailbox or transfer_mailbox.user_id != current_user.id:
flash(
"You must transfer the aliases to a mailbox you own.", "error"
)
return redirect(url_for("dashboard.mailbox_route"))
if transfer_mailbox.id == mailbox.id:
flash(
"You can not transfer the aliases to the mailbox you want to delete.",
"error",
)
return redirect(url_for("dashboard.mailbox_route"))
if not transfer_mailbox.verified:
flash("Your new mailbox is not verified", "error")
return redirect(url_for("dashboard.mailbox_route"))
# Schedule delete account job
LOG.w(
f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id
if transfer_mailbox_id > 0
else None,
},
run_at=arrow.now(),
commit=True,
)
flash(
f"Mailbox {mailbox.email} scheduled for deletion."
f"You will receive a confirmation email when the deletion is finished",
"success",
)
return redirect(url_for("dashboard.mailbox_route"))
if request.form.get("form-name") == "set-default":
if not csrf_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox_id = request.form.get("mailbox_id")
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != current_user.id:
flash("Unknown error. Refresh the page", "warning")
try:
mailbox_id = request.form.get("mailbox_id")
mailbox = user_settings.set_default_mailbox(current_user, mailbox_id)
except user_settings.CannotSetMailbox as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if mailbox.id == current_user.default_mailbox_id:
flash("This mailbox is already default one", "error")
return redirect(url_for("dashboard.mailbox_route"))
if not mailbox.verified:
flash("Cannot set unverified mailbox as default", "error")
return redirect(url_for("dashboard.mailbox_route"))
current_user.default_mailbox_id = mailbox.id
Session.commit()
flash(f"Mailbox {mailbox.email} is set as Default Mailbox", "success")
return redirect(url_for("dashboard.mailbox_route"))
elif request.form.get("form-name") == "create":
if not current_user.is_premium():
flash("Only premium plan can add additional mailbox", "warning")
if not new_mailbox_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox_email = new_mailbox_form.email.data.lower().strip().replace(" ", "")
try:
mailbox = mailbox_utils.create_mailbox(
current_user, mailbox_email
).mailbox
except mailbox_utils.MailboxError as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if new_mailbox_form.validate():
mailbox_email = (
new_mailbox_form.email.data.lower().strip().replace(" ", "")
flash(
f"You are going to receive an email to confirm {mailbox.email}.",
"success",
)
return redirect(
url_for(
"dashboard.mailbox_detail_route",
mailbox_id=mailbox.id,
)
if not is_valid_email(mailbox_email):
flash(f"{mailbox_email} invalid", "error")
elif mailbox_already_used(mailbox_email, current_user):
flash(f"{mailbox_email} already used", "error")
elif not email_can_be_used_as_mailbox(mailbox_email):
flash(f"You cannot use {mailbox_email}.", "error")
else:
new_mailbox = Mailbox.create(
email=mailbox_email, user_id=current_user.id
)
Session.commit()
send_verification_email(current_user, new_mailbox)
flash(
f"You are going to receive an email to confirm {mailbox_email}.",
"success",
)
return redirect(
url_for(
"dashboard.mailbox_detail_route",
mailbox_id=new_mailbox.id,
)
)
)
return render_template(
"dashboard/mailbox.html",
@ -182,34 +115,20 @@ def mailbox_route():
)
def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET)
encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
)
send_email(
mailbox.email,
f"Please confirm your mailbox {mailbox.email}",
render(
"transactional/verify-mailbox.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
),
render(
"transactional/verify-mailbox.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
),
)
@dashboard_bp.route("/mailbox_verify")
@login_required
def mailbox_verify():
mailbox_id = request.args.get("mailbox_id")
code = request.args.get("code")
if not code:
# Old way
return verify_with_signed_secret(mailbox_id)
mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code)
LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
def verify_with_signed_secret(request: str):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_verify_request = request.args.get("mailbox_id")
try:

View File

@ -14,7 +14,7 @@ from flask_wtf import FlaskForm
from flask_wtf.file import FileField
from wtforms import StringField, validators
from app import s3
from app import s3, user_settings
from app.config import (
FIRST_ALIAS_DOMAIN,
ALIAS_RANDOM_SUFFIX_LENGTH,
@ -31,12 +31,10 @@ from app.models import (
PlanEnum,
File,
EmailChange,
CustomDomain,
AliasGeneratorEnum,
AliasSuffixEnum,
ManualSubscription,
SenderFormatEnum,
SLDomain,
CoinbaseSubscription,
AppleSubscription,
PartnerUser,
@ -166,38 +164,11 @@ def setting():
return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "change-random-alias-default-domain":
default_domain = request.form.get("random-alias-default-domain")
if default_domain:
sl_domain: SLDomain = SLDomain.get_by(domain=default_domain)
if sl_domain:
if sl_domain.premium_only and not current_user.is_premium():
flash("You cannot use this domain", "error")
return redirect(url_for("dashboard.setting"))
current_user.default_alias_public_domain_id = sl_domain.id
current_user.default_alias_custom_domain_id = None
else:
custom_domain = CustomDomain.get_by(domain=default_domain)
if custom_domain:
# sanity check
if (
custom_domain.user_id != current_user.id
or not custom_domain.verified
):
LOG.w(
"%s cannot use domain %s", current_user, custom_domain
)
flash(f"Domain {default_domain} can't be used", "error")
return redirect(request.url)
else:
current_user.default_alias_custom_domain_id = (
custom_domain.id
)
current_user.default_alias_public_domain_id = None
else:
current_user.default_alias_custom_domain_id = None
current_user.default_alias_public_domain_id = None
try:
user_settings.set_default_alias_domain(current_user, default_domain)
except user_settings.CannotSetAlias as e:
flash(e.msg, "error")
return redirect(url_for("dashboard.setting"))
Session.commit()
flash("Your preference has been updated", "success")

View File

@ -1,4 +1,5 @@
from io import BytesIO
from urllib.parse import urlparse
from flask import request, render_template, redirect, url_for, flash
from flask_login import current_user, login_required
@ -11,6 +12,7 @@ from app.config import ADMIN_EMAIL
from app.db import Session
from app.developer.base import developer_bp
from app.email_utils import send_email
from app.image_validation import detect_image_format, ImageFormat
from app.log import LOG
from app.models import Client, RedirectUri, File, Referral
from app.utils import random_string
@ -46,16 +48,25 @@ def client_detail(client_id):
approval_form.description.data = client.description
if action == "edit" and form.validate_on_submit():
parsed_url = urlparse(form.url.data)
if parsed_url.scheme != "https":
flash("Only https urls are allowed", "error")
return redirect(url_for("developer.index"))
client.name = form.name.data
client.home_url = form.url.data
if form.icon.data:
# todo: remove current icon if any
# todo: handle remove icon
icon_data = form.icon.data.read(10240)
if detect_image_format(icon_data) == ImageFormat.Unknown:
flash("Unknown file format", "warning")
return redirect(url_for("developer.index"))
if client.icon:
s3.delete(client.icon_id)
File.delete(client.icon)
file_path = random_string(30)
file = File.create(path=file_path, user_id=client.user_id)
s3.upload_from_bytesio(file_path, BytesIO(form.icon.data.read()))
s3.upload_from_bytesio(file_path, BytesIO(icon_data))
Session.flush()
LOG.d("upload file %s to s3", file)

View File

@ -1,3 +1,5 @@
from urllib.parse import urlparse
from flask import render_template, redirect, url_for, flash
from flask_login import current_user, login_required
from flask_wtf import FlaskForm
@ -20,6 +22,10 @@ def new_client():
if form.validate_on_submit():
client = Client.create_new(form.name.data, current_user.id)
parsed_url = urlparse(form.url.data)
if parsed_url.scheme != "https":
flash("Only https urls are allowed", "error")
return redirect(url_for("developer.new_client"))
client.home_url = form.url.data
Session.commit()

View File

@ -548,7 +548,9 @@ def can_create_directory_for_address(email_address: str) -> bool:
for domain in config.ALIAS_DOMAINS:
if email_address.endswith("@" + domain):
return True
LOG.i(
f"Cannot create address in directory for {email_address} since it does not belong to a valid directory domain"
)
return False

View File

@ -40,6 +40,10 @@ class EventDispatcher:
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
return
if config.EVENT_WEBHOOK_ENABLED_USER_IDS is not None:
if user.id not in config.EVENT_WEBHOOK_ENABLED_USER_IDS:
return
partner_user = EventDispatcher.__partner_user(user.id)
if not partner_user:
return

260
app/app/mailbox_utils.py Normal file
View File

@ -0,0 +1,260 @@
import dataclasses
import secrets
import random
from typing import Optional
import arrow
from app import config
from app.config import JOB_DELETE_MAILBOX
from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
send_email,
render,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import User, Mailbox, Job, MailboxActivation
@dataclasses.dataclass
class CreateMailboxOutput:
mailbox: Mailbox
activation: Optional[MailboxActivation]
class MailboxError(Exception):
def __init__(self, msg: str):
self.msg = msg
class OnlyPaidError(MailboxError):
def __init__(self):
self.msg = "Only available for paid plans"
class CannotVerifyError(MailboxError):
def __init__(self, msg: str):
self.msg = msg
MAX_ACTIVATION_TRIES = 3
def create_mailbox(
user: User,
email: str,
verified: bool = False,
send_email: bool = True,
use_digit_codes: bool = False,
send_link: bool = True,
) -> CreateMailboxOutput:
if not user.is_premium():
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not premium"
)
raise OnlyPaidError()
if not is_valid_email(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not valid email"
)
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is already used"
)
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is invalid"
)
raise MailboxError("Invalid email")
new_mailbox = Mailbox.create(
email=email, user_id=user.id, verified=verified, commit=True
)
if verified:
LOG.i(f"User {user} as created a pre-verified mailbox with {email}")
return CreateMailboxOutput(mailbox=new_mailbox, activation=None)
LOG.i(f"User {user} has created mailbox with {email}")
activation = generate_activation_code(new_mailbox, use_digit_code=use_digit_codes)
output = CreateMailboxOutput(mailbox=new_mailbox, activation=activation)
if not send_email:
LOG.i(f"Skipping sending validation email for mailbox {new_mailbox}")
return output
send_verification_email(
user,
new_mailbox,
activation=activation,
send_link=send_link,
)
return output
def delete_mailbox(
user: User, mailbox_id: int, transfer_mailbox_id: Optional[int]
) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
LOG.i(
f"User {user} has tried to delete another user's mailbox with {mailbox_id}"
)
raise MailboxError("Invalid mailbox")
if mailbox.id == user.default_mailbox_id:
LOG.i(f"User {user} has tried to delete the default mailbox")
raise MailboxError("Cannot delete your default mailbox")
if transfer_mailbox_id and transfer_mailbox_id > 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
LOG.i(
f"User {user} has tried to transfer to a mailbox owned by another user"
)
raise MailboxError("You must transfer the aliases to a mailbox you own")
if transfer_mailbox.id == mailbox.id:
LOG.i(
f"User {user} has tried to transfer to the same mailbox he is deleting"
)
raise MailboxError(
"You can not transfer the aliases to the mailbox you want to delete"
)
if not transfer_mailbox.verified:
LOG.i(f"User {user} has tried to transfer to a non verified mailbox")
MailboxError("Your new mailbox is not verified")
# Schedule delete account job
LOG.i(
f"User {user} has scheduled delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id
if transfer_mailbox_id and transfer_mailbox_id > 0
else None,
},
run_at=arrow.now(),
commit=True,
)
return mailbox
def clear_activation_codes_for_mailbox(mailbox: Mailbox):
Session.query(MailboxActivation).filter(
MailboxActivation.mailbox_id == mailbox.id
).delete()
Session.commit()
def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because it does not exist"
)
raise MailboxError("Invalid mailbox")
if mailbox.verified:
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because it's already verified"
)
clear_activation_codes_for_mailbox(mailbox)
return mailbox
if mailbox.user_id != user.id:
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user"
)
raise MailboxError("Invalid mailbox")
activation = (
MailboxActivation.filter(MailboxActivation.mailbox_id == mailbox_id)
.order_by(MailboxActivation.created_at.desc())
.first()
)
if not activation:
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because there is no activation"
)
raise MailboxError("Invalid code")
if activation.tries >= MAX_ACTIVATION_TRIES:
LOG.i(f"User {user} failed to verify mailbox {mailbox_id} more than 3 times")
clear_activation_codes_for_mailbox(mailbox)
raise CannotVerifyError("Invalid activation code. Please request another code.")
if activation.created_at < arrow.now().shift(minutes=-15):
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because code is too old"
)
clear_activation_codes_for_mailbox(mailbox)
raise CannotVerifyError("Invalid activation code. Please request another code.")
if code != activation.code:
LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because code does not match"
)
activation.tries = activation.tries + 1
Session.commit()
raise CannotVerifyError("Invalid activation code")
LOG.i(f"User {user} has verified mailbox {mailbox_id}")
mailbox.verified = True
clear_activation_codes_for_mailbox(mailbox)
return mailbox
def generate_activation_code(
mailbox: Mailbox, use_digit_code: bool = False
) -> MailboxActivation:
clear_activation_codes_for_mailbox(mailbox)
if use_digit_code:
code = "{:06d}".format(random.randint(1, 999999))
else:
code = secrets.token_urlsafe(16)
return MailboxActivation.create(
mailbox_id=mailbox.id,
code=code,
tries=0,
commit=True,
)
def send_verification_email(
user: User, mailbox: Mailbox, activation: MailboxActivation, send_link: bool = True
):
LOG.i(
f"Sending mailbox verification email to {mailbox.email} with send link={send_link}"
)
if send_link:
verification_url = (
config.URL
+ "/dashboard/mailbox_verify"
+ f"?mailbox_id={mailbox.id}&code={activation.code}"
)
else:
verification_url = None
send_email(
mailbox.email,
f"Please confirm your mailbox {mailbox.email}",
render(
"transactional/verify-mailbox.txt.jinja2",
user=user,
code=activation.code,
link=verification_url,
mailbox_email=mailbox.email,
),
render(
"transactional/verify-mailbox.html",
user=user,
code=activation.code,
link=verification_url,
mailbox_email=mailbox.email,
),
)

View File

@ -985,8 +985,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
- the domain
"""
res = []
for domain in self.available_sl_domains(alias_options=alias_options):
res.append((True, domain))
for domain in self.get_sl_domains(alias_options=alias_options):
res.append((True, domain.domain))
for custom_domain in self.verified_custom_domains():
res.append((False, custom_domain.domain))
@ -1128,7 +1128,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
- Verified custom domains
"""
domains = self.available_sl_domains(alias_options=alias_options)
domains = [
sl_domain.domain
for sl_domain in self.get_sl_domains(alias_options=alias_options)
]
for custom_domain in self.verified_custom_domains():
domains.append(custom_domain.domain)
@ -2483,7 +2486,7 @@ class CustomDomain(Base, ModelMixin):
return sorted(self._auto_create_rules, key=lambda rule: rule.order)
def __repr__(self):
return f"<Custom Domain {self.domain}>"
return f"<Custom Domain {self.id} {self.domain}>"
class AutoCreateRule(Base, ModelMixin):
@ -2801,6 +2804,16 @@ class Mailbox(Base, ModelMixin):
return f"<Mailbox {self.id} {self.email}>"
class MailboxActivation(Base, ModelMixin):
__tablename__ = "mailbox_activation"
mailbox_id = sa.Column(
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False, index=True
)
code = sa.Column(sa.String(32), nullable=False, index=True)
tries = sa.Column(sa.Integer, default=0, nullable=False)
class AccountActivation(Base, ModelMixin):
"""contains code to activate the user account when they sign up on mobile"""
@ -3114,7 +3127,7 @@ class SLDomain(Base, ModelMixin):
)
def __repr__(self):
return f"<SLDomain {self.domain} {'Premium' if self.premium_only else 'Free'}"
return f"<SLDomain {self.id} {self.domain} {'Premium' if self.premium_only else 'Free'}>"
class Monitoring(Base, ModelMixin):
@ -3484,6 +3497,7 @@ class AdminAuditLog(Base):
action=AuditLogActionEnum.stop_trial.value,
model="User",
model_id=user_id,
data={},
)
@classmethod
@ -3729,6 +3743,7 @@ class SyncEvent(Base, ModelMixin):
taken_time = sa.Column(
ArrowType, default=None, nullable=True, server_default=None, index=True
)
retry_count = sa.Column(sa.Integer, default=0, nullable=False, server_default="0")
__table_args__ = (
sa.Index("ix_sync_event_created_at", "created_at"),
@ -3750,7 +3765,7 @@ class SyncEvent(Base, ModelMixin):
return res.rowcount > 0
@classmethod
def get_dead_letter(cls, older_than: Arrow) -> [SyncEvent]:
def get_dead_letter(cls, older_than: Arrow, max_retries: int) -> [SyncEvent]:
return (
SyncEvent.filter(
(
@ -3763,6 +3778,7 @@ class SyncEvent(Base, ModelMixin):
& (SyncEvent.created_at < older_than)
)
)
& (SyncEvent.retry_count < max_retries)
)
.order_by(SyncEvent.id)
.limit(100)

View File

@ -1,7 +1,13 @@
from app.onboarding.base import onboarding_bp
from flask import render_template
from flask import render_template, url_for, redirect
@onboarding_bp.route("/", methods=["GET"])
def index():
return render_template("onboarding/index.html")
# Do the redirect to ensure cookies are set because they are SameSite=lax/strict
return redirect(url_for("onboarding.setup"))
@onboarding_bp.route("/setup", methods=["GET"])
def setup():
return render_template("onboarding/setup.html")

View File

@ -2,6 +2,7 @@ import requests
from requests import RequestException
from app import config
from app.db import Session
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
from app.log import LOG
@ -29,10 +30,11 @@ def execute_subscription_webhook(user: User):
LOG.i("Sent request to subscription update webhook successfully")
else:
LOG.i(
f"Request to webhook failed with statue {response.status_code}: {response.text}"
f"Request to webhook failed with status {response.status_code}: {response.text}"
)
except RequestException as e:
LOG.error(f"Subscription request exception: {e}")
event = UserPlanChanged(plan_end_time=sl_subscription_end)
EventDispatcher.send_event(user, EventContent(user_plan_change=event))
Session.commit()

71
app/app/user_settings.py Normal file
View File

@ -0,0 +1,71 @@
from typing import Optional
from app.db import Session
from app.log import LOG
from app.models import User, SLDomain, CustomDomain, Mailbox
class CannotSetAlias(Exception):
def __init__(self, msg: str):
self.msg = msg
class CannotSetMailbox(Exception):
def __init__(self, msg: str):
self.msg = msg
def set_default_alias_domain(user: User, domain_name: Optional[str]):
if not domain_name:
LOG.i(f"User {user} has set no domain as default domain")
user.default_alias_public_domain_id = None
user.default_alias_custom_domain_id = None
Session.flush()
return
sl_domain: SLDomain = SLDomain.get_by(domain=domain_name)
if sl_domain:
if sl_domain.hidden:
LOG.i(f"User {user} has tried to set up a hidden domain as default domain")
raise CannotSetAlias("Domain does not exist")
if sl_domain.premium_only and not user.is_premium():
LOG.i(f"User {user} has tried to set up a premium domain as default domain")
raise CannotSetAlias("You cannot use this domain")
LOG.i(f"User {user} has set public {sl_domain} as default domain")
user.default_alias_public_domain_id = sl_domain.id
user.default_alias_custom_domain_id = None
Session.flush()
return
custom_domain = CustomDomain.get_by(domain=domain_name)
if not custom_domain:
LOG.i(
f"User {user} has tried to set up an non existing domain as default domain"
)
raise CannotSetAlias("Domain does not exist or it hasn't been verified")
if custom_domain.user_id != user.id or not custom_domain.verified:
LOG.i(
f"User {user} has tried to set domain {custom_domain} as default domain that does not belong to the user or that is not verified"
)
raise CannotSetAlias("Domain does not exist or it hasn't been verified")
LOG.i(f"User {user} has set custom {custom_domain} as default domain")
user.default_alias_public_domain_id = None
user.default_alias_custom_domain_id = custom_domain.id
Session.flush()
def set_default_mailbox(user: User, mailbox_id: int) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
raise CannotSetMailbox("Invalid mailbox")
if not mailbox.verified:
raise CannotSetMailbox("This is mailbox is not verified")
if mailbox.id == user.default_mailbox_id:
return mailbox
LOG.i(f"User {user} has set mailbox {mailbox} as his default one")
user.default_mailbox_id = mailbox.id
Session.commit()
return mailbox

View File

@ -262,7 +262,8 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
Session.commit()
except IntegrityError:
# No need to manually rollback, as IntegrityError already rolls back
# If the tx has been rolled back, the connection is borked. Force close to try to get a new one and start fresh
Session.close()
LOG.info(
f"Contact with email {contact_email} for alias_id {alias_id} already existed, fetching from DB"
)

View File

@ -2,12 +2,15 @@ import argparse
from enum import Enum
from sys import argv, exit
from app.config import DB_URI
from app.config import EVENT_LISTENER_DB_URI
from app.log import LOG
from events import event_debugger
from events.runner import Runner
from events.event_source import DeadLetterEventSource, PostgresEventSource
from events.event_sink import ConsoleEventSink, HttpEventSink
_DEFAULT_MAX_RETRIES = 100
class Mode(Enum):
DEAD_LETTER = "dead_letter"
@ -23,13 +26,13 @@ class Mode(Enum):
raise ValueError(f"Invalid mode: {value}")
def main(mode: Mode, dry_run: bool):
def main(mode: Mode, dry_run: bool, max_retries: int):
if mode == Mode.DEAD_LETTER:
LOG.i("Using DeadLetterEventSource")
source = DeadLetterEventSource()
source = DeadLetterEventSource(max_retries)
elif mode == Mode.LISTENER:
LOG.i("Using PostgresEventSource")
source = PostgresEventSource(DB_URI)
source = PostgresEventSource(EVENT_LISTENER_DB_URI)
else:
raise ValueError(f"Invalid mode: {mode}")
@ -44,21 +47,67 @@ def main(mode: Mode, dry_run: bool):
runner.run()
def debug_event(event_id: str):
LOG.i(f"Debugging event {event_id}")
try:
event_id_int = int(event_id)
except ValueError:
raise ValueError(f"Invalid event id: {event_id}")
event_debugger.debug_event(event_id_int)
def run_event(event_id: str, delete_on_success: bool):
LOG.i(f"Running event {event_id}")
try:
event_id_int = int(event_id)
except ValueError:
raise ValueError(f"Invalid event id: {event_id}")
event_debugger.run_event(event_id_int, delete_on_success)
def args():
parser = argparse.ArgumentParser(description="Run event listener")
parser.add_argument(
"mode",
help="Mode to run",
choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value],
subparsers = parser.add_subparsers(dest="command")
listener_parser = subparsers.add_parser(Mode.LISTENER.value)
listener_parser.add_argument(
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
)
parser.add_argument("--dry-run", help="Dry run mode", action="store_true")
listener_parser.add_argument("--dry-run", action="store_true")
dead_letter_parser = subparsers.add_parser(Mode.DEAD_LETTER.value)
dead_letter_parser.add_argument(
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
)
dead_letter_parser.add_argument("--dry-run", action="store_true")
debug_parser = subparsers.add_parser("debug")
debug_parser.add_argument("event_id", help="ID of the event to debug")
run_parser = subparsers.add_parser("run")
run_parser.add_argument("event_id", help="ID of the event to run")
run_parser.add_argument("--delete-on-success", action="store_true")
return parser.parse_args()
if __name__ == "__main__":
if len(argv) < 2:
print("Invalid usage. Pass 'listener' or 'dead_letter' as argument")
print("Invalid usage. Pass a valid subcommand as argument")
exit(1)
args = args()
main(Mode.from_str(args.mode), args.dry_run)
if args.command in [Mode.LISTENER.value, Mode.DEAD_LETTER.value]:
main(
mode=Mode.from_str(args.command),
dry_run=args.dry_run,
max_retries=args.max_retries,
)
elif args.command == "debug":
debug_event(args.event_id)
elif args.command == "run":
run_event(args.event_id, args.delete_on_success)
else:
print("Invalid command")
exit(1)

View File

@ -0,0 +1,43 @@
from app.events.generated import event_pb2
from app.models import SyncEvent
from events.event_sink import HttpEventSink
def debug_event(event_id: int):
event = SyncEvent.get_by(id=event_id)
if not event:
print("Event not found")
return
print(f"Info for event {event_id}")
print(f"- Created at: {event.created_at}")
print(f"- Updated at: {event.updated_at}")
print(f"- Taken time: {event.taken_time}")
print(f"- Retry count: {event.retry_count}")
print()
print("Event contents")
event_contents = event.content
parsed = event_pb2.Event.FromString(event_contents)
print(f"- UserID: {parsed.user_id}")
print(f"- ExternalUserID: {parsed.external_user_id}")
print(f"- PartnerID: {parsed.partner_id}")
content = parsed.content
print(f"Content: {content}")
def run_event(event_id: int, delete_on_success: bool = True):
event = SyncEvent.get_by(id=event_id)
if not event:
print("Event not found")
return
print(f"Processing event {event_id}")
sink = HttpEventSink()
res = sink.process(event)
if res:
print(f"Processed event {event_id}")
if delete_on_success:
SyncEvent.delete(event_id, commit=True)

View File

@ -4,6 +4,8 @@ import psycopg2
import select
from abc import ABC, abstractmethod
from app.db import Session
from app.log import LOG
from app.models import SyncEvent
from app.events.event_dispatcher import NOTIFICATION_CHANNEL
@ -44,6 +46,7 @@ class PostgresEventSource(EventSource):
cursor = self.__connection.cursor()
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
LOG.info("Starting to listen to events")
while True:
if select.select([self.__connection], [], [], 5) != ([], [], []):
self.__connection.poll()
@ -66,6 +69,7 @@ class PostgresEventSource(EventSource):
LOG.info(f"Could not find event with id={notify.payload}")
except Exception as e:
LOG.warn(f"Error getting event: {e}")
Session.close() # Ensure we get a new connection and we don't leave a dangling tx
def __connect(self):
self.__connection = psycopg2.connect(self.__connection_string)
@ -76,6 +80,9 @@ class PostgresEventSource(EventSource):
class DeadLetterEventSource(EventSource):
def __init__(self, max_retries: int):
self.__max_retries = max_retries
@newrelic.agent.background_task()
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
while True:
@ -83,7 +90,9 @@ class DeadLetterEventSource(EventSource):
threshold = arrow.utcnow().shift(
minutes=-_DEAD_LETTER_THRESHOLD_MINUTES
)
events = SyncEvent.get_dead_letter(older_than=threshold)
events = SyncEvent.get_dead_letter(
older_than=threshold, max_retries=self.__max_retries
)
if events:
LOG.info(f"Got {len(events)} dead letter events")
if events:
@ -92,7 +101,8 @@ class DeadLetterEventSource(EventSource):
)
for event in events:
on_event(event)
else:
Session.close() # Ensure that we have a new connection and we don't have a dangling tx with a lock
if not events:
LOG.debug("No dead letter events")
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
except Exception as e:

View File

@ -2,6 +2,7 @@ import arrow
import newrelic.agent
from app.log import LOG
from app.db import Session
from app.models import SyncEvent
from events.event_sink import EventSink
from events.event_source import EventSource
@ -37,6 +38,9 @@ class Runner:
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
else:
event.retry_count = event.retry_count + 1
Session.commit()
except Exception as e:
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)

View File

@ -14,6 +14,7 @@ from app.email_utils import (
send_email,
render,
)
from app.events.event_dispatcher import PostgresDispatcher
from app.import_utils import handle_batch_import
from app.jobs.event_jobs import send_alias_creation_events_for_user
from app.jobs.export_user_data_job import ExportUserDataJob
@ -276,7 +277,9 @@ SimpleLogin team.
user = User.get(user_id)
if user and user.activated:
LOG.d(f"Sending alias creation events for {user}")
send_alias_creation_events_for_user(user)
send_alias_creation_events_for_user(
user, dispatcher=PostgresDispatcher.get()
)
else:
LOG.e("Unknown job name %s", job.name)

View File

@ -745,8 +745,6 @@ bullish
bullpen
bullring
bullseye
bullwhip
bully
bunch
bundle
bungee
@ -1149,7 +1147,6 @@ coherence
coherent
cohesive
coil
coke
cola
cold
coleslaw
@ -1674,8 +1671,6 @@ delta
deluge
delusion
deluxe
demanding
demeaning
demeanor
demise
democracy
@ -1897,9 +1892,6 @@ divisible
divisibly
division
divisive
divorcee
dizziness
dizzy
doable
docile
dock
@ -1913,7 +1905,6 @@ dole
dollar
dollhouse
dollop
dolly
dolphin
domain
domelike
@ -2027,7 +2018,6 @@ duh
duke
dumping
dumpling
dumpster
duo
dupe
duplex
@ -2036,14 +2026,12 @@ duplicity
durable
durably
duration
duress
during
dusk
dust
dutiful
duty
duvet
dwarf
dweeb
dwelled
dweller
@ -3782,10 +3770,6 @@ makeshift
making
malformed
malt
mama
mammal
mammary
mammogram
manager
managing
manatee
@ -3798,7 +3782,6 @@ mangle
mango
mangy
manhandle
manhole
manhood
manhunt
manicotti
@ -3813,7 +3796,6 @@ manmade
manned
mannish
manor
manpower
mantis
mantra
manual
@ -3850,7 +3832,6 @@ mashed
mashing
massager
masses
massive
mastiff
matador
matchbook
@ -3863,15 +3844,11 @@ maternal
maternity
math
mating
matriarch
matrimony
matrix
matron
matted
matter
maturely
maturing
maturity
mauve
maverick
maximize
@ -3891,9 +3868,6 @@ modify
modular
modulator
module
moisten
moistness
moisture
molar
molasses
mold
@ -3946,11 +3920,7 @@ morality
morally
morbidity
morbidly
morphine
morphing
morse
mortality
mortally
mortician
mortified
mortify
@ -3976,7 +3946,6 @@ mournful
mouse
mousiness
moustache
mousy
mouth
movable
move
@ -3985,7 +3954,6 @@ moving
mower
mowing
much
muck
mud
mug
mulberry
@ -4002,7 +3970,6 @@ mumbling
mumbo
mummified
mummify
mummy
mumps
munchkin
mundane
@ -4798,7 +4765,6 @@ princess
print
prior
prism
prison
prissy
pristine
privacy
@ -4822,8 +4788,6 @@ prodigal
prodigy
produce
product
profane
profanity
professed
professor
profile
@ -5992,10 +5956,6 @@ slit
sliver
slobbery
slogan
sloped
sloping
sloppily
sloppy
slot
slouching
slouchy
@ -6011,7 +5971,6 @@ smartness
smasher
smashing
smashup
smell
smelting
smile
smilingly
@ -6021,11 +5980,6 @@ smith
smitten
smock
smog
smoked
smokeless
smokiness
smoking
smoky
smolder
smooth
smother
@ -6047,7 +6001,6 @@ sneer
sneeze
sneezing
snide
sniff
snippet
snipping
snitch
@ -6203,7 +6156,6 @@ squiggle
squiggly
squint
squire
squirt
squishier
squishy
stability
@ -6323,7 +6275,6 @@ stoning
stony
stood
stooge
stool
stoop
stoplight
stoppable
@ -6458,12 +6409,9 @@ subwoofer
subzero
succulent
such
suction
sudden
sudoku
suds
sufferer
suffering
suffice
suffix
suffocate
@ -6515,7 +6463,6 @@ surplus
surprise
surreal
surrender
surrogate
surround
survey
survival
@ -6528,7 +6475,6 @@ suspend
suspense
sustained
sustainer
swab
swaddling
swagger
swampland
@ -6536,7 +6482,6 @@ swan
swapping
swarm
sway
swear
sweat
sweep
swell
@ -6605,9 +6550,6 @@ talcum
talisman
tall
talon
tamale
tameness
tamer
tamper
tank
tanned
@ -6647,7 +6589,6 @@ thaw
theater
theatrics
thee
theft
theme
theology
theorize
@ -6752,7 +6693,6 @@ trade
trading
tradition
traffic
tragedy
trailing
trailside
train
@ -6772,7 +6712,6 @@ trapped
trapper
trapping
traps
trash
travel
traverse
travesty

View File

@ -0,0 +1,28 @@
"""add retry count to sync event
Revision ID: 56d08955fcab
Revises: d608b8e48082
Create Date: 2024-07-19 08:21:19.979973
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '56d08955fcab'
down_revision = 'd608b8e48082'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('sync_event', sa.Column('retry_count', sa.Integer(), server_default='0', nullable=False, default=0))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('sync_event', 'retry_count')
# ### end Alembic commands ###

View File

@ -0,0 +1,42 @@
"""empty message
Revision ID: 1c14339aae90
Revises: 56d08955fcab
Create Date: 2024-07-30 11:46:32.460221
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1c14339aae90'
down_revision = '56d08955fcab'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('mailbox_activation',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False),
sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
sa.Column('mailbox_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=32), nullable=False),
sa.Column('tries', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['mailbox_id'], ['mailbox.id'], ondelete='cascade'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_mailbox_activation_code'), 'mailbox_activation', ['code'], unique=False)
op.create_index(op.f('ix_mailbox_activation_mailbox_id'), 'mailbox_activation', ['mailbox_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_mailbox_activation_mailbox_id'), table_name='mailbox_activation')
op.drop_index(op.f('ix_mailbox_activation_code'), table_name='mailbox_activation')
op.drop_table('mailbox_activation')
# ### end Alembic commands ###

View File

@ -12,10 +12,10 @@ docker run -p 25432:5432 --name ${container_name} -e POSTGRES_PASSWORD=postgres
sleep 3
# upgrade the DB to the latest stage and
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl poetry run alembic upgrade head
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl rye run alembic upgrade head
# generate the migration script.
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl poetry run alembic revision --autogenerate $@
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl rye run alembic revision --autogenerate $@
# remove the db
docker rm -f ${container_name}

View File

@ -45,6 +45,7 @@ from app.admin_model import (
DailyMetricAdmin,
MetricAdmin,
InvalidMailboxDomainAdmin,
EmailSearchAdmin,
)
from app.api.base import api_bp
from app.auth.base import auth_bp
@ -200,7 +201,7 @@ def create_app() -> Flask:
"username": "admin",
"password": FLASK_PROFILER_PASSWORD,
},
"ignore": ["^/static/.*", "/git", "/exception"],
"ignore": ["^/static/.*", "/git", "/exception", "/health"],
}
flask_profiler.init_app(app)
@ -218,6 +219,10 @@ def create_app() -> Flask:
def cleanup(resp_or_exc):
Session.remove()
@app.route("/health", methods=["GET"])
def healthcheck():
return "success", 200
return app
@ -282,7 +287,9 @@ def set_index_page(app):
and not request.path.startswith("/_debug_toolbar")
and not request.path.startswith("/git")
and not request.path.startswith("/favicon.ico")
and not request.path.startswith("/health")
):
start_time = g.start_time or time.time()
LOG.d(
"%s %s %s %s %s, takes %s",
request.remote_addr,
@ -290,7 +297,7 @@ def set_index_page(app):
request.path,
request.args,
res.status_code,
time.time() - g.start_time,
time.time() - start_time,
)
return res
@ -780,6 +787,7 @@ def init_admin(app):
admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session))
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(CouponAdmin(Coupon, Session))
admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session))
admin.add_view(CustomDomainAdmin(CustomDomain, Session))

View File

@ -0,0 +1,251 @@
{% extends 'admin/master.html' %}
{% macro show_user(user) -%}
<h4>User {{ user.email }} with ID {{ user.id }}.</h4>
<table class="table">
<thead>
<tr>
<th scope="col">User ID</th>
<th scope="col">Email</th>
<th scope="col">Paid</th>
<th>Subscription</th>
<th>Created At</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ user.id }}</td>
<td>{{ user.email }}</td>
<td>{{ "yes" if user.is_paid() else No }}</td>
<td>{{ user.get_active_subscription() }}</td>
<td>{{ user.created_at }}</td>
</tr>
</tbody>
</table>
{%- endmacro %}
{% macro list_mailboxes(mbox_count, mboxes) %}
<h4>
{{ mbox_count }} Mailboxes found.
{% if mbox_count>10 %}Showing only the first 10.{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th>Mailbox ID</th>
<th>Email</th>
<th>Verified</th>
<th>Created At</th>
</tr>
</thead>
<tbody>
{% for mailbox in mboxes %}
<tr>
<td>{{ mailbox.id }}</td>
<td>{{ mailbox.email }}</td>
<td>{{ "Yes" if mailbox.verified else "No" }}</td>
<td>{{ mailbox.created_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro list_alias(alias_count, aliases) %}
<h4>
{{ alias_count }} Aliases found.
{% if alias_count>10 %}Showing only the first 10.{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th>Alias ID</th>
<th>Email</th>
<th>Verified</th>
<th>Created At</th>
</tr>
</thead>
<tbody>
{% for alias in aliases %}
<tr>
<td>{{ alias.id }}</td>
<td>{{ alias.email }}</td>
<td>{{ "Yes" if alias.verified else "No" }}</td>
<td>
{{ alias.created_at }}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro show_deleted_alias(deleted_alias) -%}
<h4>
Deleted Alias {{ deleted_alias.email }} with ID {{ deleted_alias.id }}.
</h4>
<table class="table">
<thead>
<tr>
<th scope="col">
Deleted Alias ID
</th>
<th scope="col">
Email
</th>
<th scope="col">
Deleted At
</th>
<th scope="col">
Reason
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
{{ deleted_alias.id }}
</td>
<td>
{{ deleted_alias.email }}
</td>
<td>
{{ deleted_alias.created_at }}
</td>
<td>
{{ deleted_alias.reason }}
</td>
</tr>
</tbody>
</table>
{%- endmacro %}
{% macro show_domain_deleted_alias(dom_deleted_alias) -%}
<h4>
Domain Deleted Alias {{ dom_deleted_alias.email }} with ID {{ dom_deleted_alias.id }} for domain {{ dom_deleted_alias.domain.domain }}
</h4>
<table class="table">
<thead>
<tr>
<th scope="col">
Deleted Alias ID
</th>
<th scope="col">
Email
</th>
<th scope="col">
Domain
</th>
<th scope="col">
Domain ID
</th>
<th scope="col">
Domain owner user ID
</th>
<th scope="col">
Domain owner user email
</th>
<th scope="col">
Deleted At
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
{{ dom_deleted_alias.id }}
</td>
<td>
{{ dom_deleted_alias.email }}
</td>
<td>
{{ dom_deleted_alias.domain.domain }}
</td>
<td>
{{ dom_deleted_alias.domain.id }}
</td>
<td>
{{ dom_deleted_alias.domain.user_id }}
</td>
<td>
{{ dom_deleted_alias.created_at }}
</td>
</tr>
</tbody>
</table>
{{ show_user(data.domain_deleted_alias.domain.user) }}
{%- endmacro %}
{% block body %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<form method="post">
<div class="form-group">
<label for="email">
Email to search:
</label>
<input type="text"
class="form-control"
name="email"
value="{{ email or '' }}"/>
</div>
<button type="submit" class="btn btn-primary">
Submit
</button>
</form>
</div>
{% if no_match %}
<div class="border border-dark border-2 mt-1 mb-2 p-3 alert alert-warning"
role="alert">
No user, alias or mailbox found for {{ email }}
</div>
{% endif %}
{% if data.alias %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
Found Alias {{ data.alias.email }}
</h3>
{{ list_alias(1,[data.alias]) }}
{{ show_user(data.alias.user) }}
{{ list_mailboxes(helper.mailbox_count(data.alias.user), helper.mailbox_list(data.alias.user) ) }}
</div>
{% endif %}
{% if data.user %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
Found User {{ data.user.email }}
</h3>
{{ show_user(data.user) }}
{{ list_mailboxes(helper.mailbox_count(data.user), helper.mailbox_list(data.user) ) }}
{{ list_alias(helper.alias_count(data.user),helper.alias_list(data.user)) }}
</div>
{% endif %}
{% if data.mailbox %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">
Found Mailbox {{ data.mailbox.email }}
</h3>
{{ list_mailboxes(1, [data.mailbox] ) }}
{{ show_user(data.mailbox.user) }}
</div>
{% endif %}
{% if data.deleted_alias %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">
Found DeletedAlias {{ data.deleted_alias.email }}
</h3>
{{ show_deleted_alias(data.deleted_alias) }}
</div>
{% endif %}
{% if data.domain_deleted_alias %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">
Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}
</h3>
{{ show_domain_deleted_alias(data.domain_deleted_alias) }}
</div>
{% endif %}
{% endblock %}

View File

@ -18,7 +18,7 @@
<br />
For generic questions, i.e. not related to your account, we recommend to post the question on
our
<a href="https://www.reddit.com/r/Simplelogin/">Reddit</a> or <a href="https://forum.simplelogin.io/">our official forum</a>
<a href="https://www.reddit.com/r/Simplelogin/">Reddit</a> or <a href="https://github.com/simple-login/app/discussions">forum</a>
where our community can help answer the question
and other people with the same question can find the answer there.
</div>

View File

@ -43,8 +43,7 @@ Note, if you are a paying Proton Mail user, you automatically receive the premiu
{% endcall %}
{% call text() %}
For any question or feedback, please join our <a href="https://forum.simplelogin.io/">official forum</a>.
If you want to request a feature, please submit it on our <a href="https://github.com/simple-login/app/discussions">GitHub repo</a>.
For any question or if you want to request a feature, please submit it on our <a href="https://github.com/simple-login/app/discussions">forum</a>.
You can also join our
<a href="https://www.reddit.com/r/Simplelogin/">Reddit</a>
or follow our

View File

@ -27,8 +27,7 @@ Firefox: https://addons.mozilla.org/firefox/addon/simplelogin/
Edge: https://microsoftedge.microsoft.com/addons/detail/simpleloginreceive-sen/diacfpipniklenphgljfkmhinphjlfff
Android: https://play.google.com/store/apps/details?id=io.simplelogin.android
iOS: https://apps.apple.com/app/id1494359858
Github repo: https://github.com/simple-login/app/discussions
Official forum: https://forum.simplelogin.io/
Forum: https://github.com/simple-login/app/discussions
Reddit: https://www.reddit.com/r/Simplelogin/
Twitter: https://twitter.com/simple_login

View File

@ -71,10 +71,8 @@ Please note that you can't create more than {{ MAX_NB_EMAIL_FREE_PLAN }} aliases
{% endif %}
{% call text() %}
For any question or feedback,
please join our <a href="https://forum.simplelogin.io/">official forum</a>.
If you want to request a feature,
please submit it on our <a href="https://github.com/simple-login/app/discussions">GitHub repo</a>.
For any question or if you want to request a feature,
please submit it on our <a href="https://github.com/simple-login/app/discussions">forum</a>.
You can also join our
<a href="https://www.reddit.com/r/Simplelogin/">Reddit</a>
or follow our

View File

@ -26,8 +26,6 @@ No worries: all aliases you create during this period will continue to work norm
At any time, you can reach out to us by simply replying to this email.
For any question or feedback, please join our official forum at https://forum.simplelogin.io/
If you want to request a feature, please submit it on our GitHub repo at https://github.com/simple-login/app/discussions
For any question or if you want to request a feature, please submit it on our forum at https://github.com/simple-login/app/discussions
You can also join our Reddit at https://www.reddit.com/r/Simplelogin/ follow our Twitter at https://twitter.com/simplelogin

View File

@ -4,8 +4,13 @@
{{ render_text("Hi") }}
{{ render_text("You have added <b>"+ mailbox_email +"</b> as an additional mailbox.") }}
{{ render_text("To confirm, please click on the button below.") }}
{{ render_button("Confirm mailbox", link) }}
{% if link %}
{{ render_text("To confirm, please click on the button below.") }}
{{ render_button("Confirm mailbox", link) }}
{% else %}
{{ render_text("Please enter <b>"+code+"</b> as your verification code") }}
{% endif %}
{{ render_text("This email will only be valid for the next 15 minutes.") }}
{{ render_text('Thanks,
<br />

View File

@ -5,9 +5,13 @@ Hi
You have added {{mailbox_email}} as an additional mailbox.
{% if link %}
To confirm, please click on this link:
{{link}}
{% else %}
Please enter {{ code }} as your verification code for this mailbox
{% endif %}
This link will only be valid during the next 15 minutes.
{% endblock %}

View File

@ -93,7 +93,7 @@
</li>
<li>
<a class="list-group-item text-white footer-item "
href="https://forum.simplelogin.io">Forum</a>
href="https://github.com/simple-login/app/discussions">Forum</a>
</li>
</ul>
</div>

View File

@ -91,7 +91,7 @@
</a>
</div>
<div class="dropdown-item">
<a href="https://forum.simplelogin.io"
<a href="https://github.com/simple-login/app/discussions"
target="_blank"
rel="noopener noreferrer">
Forum

View File

@ -107,7 +107,7 @@
</a>
</div>
<div class="dropdown-item">
<a href="https://forum.simplelogin.io/"
<a href="https://github.com/simple-login/app/discussions"
target="_blank"
rel="noopener noreferrer">
Forum

View File

@ -19,7 +19,10 @@
<div class="mt-8 text-center">
{% if current_user != None and current_user.is_authenticated %}
<h2 class="text-black-50" style="font-size:2rem">Performing the extension setup...</h2>
<h2 class="text-black-50" style="font-size:2rem">
Automatically performing extension setup.
If the setup doesn't start in a couple seconds click <a onclick="sendSetupMessage()" class="text-primary">here</a>
</h2>
{% else %}
<a class="mx-6 p-4 text-decoration-none"
style="background:black;
@ -41,6 +44,10 @@
{% if current_user != None and current_user.is_authenticated %}
<script type="text/javascript">
function sendSetupMessage(){
const data = { tag: "PERFORM_EXTENSION_SETUP" };
window.postMessage(data, "/");
}
let counterIterations = 5;
let extensionSetupIntervalId = setInterval(function() {
counterIterations--;
@ -48,9 +55,7 @@
clearInterval(extensionSetupIntervalId);
return;
}
const data = { tag: "PERFORM_EXTENSION_SETUP" };
window.postMessage(data, "/");
sendSetupMessage()
}, 300); // Send it many times, in case the extension had not registered the listener yet
</script>
{% endif %}

View File

@ -28,7 +28,7 @@ def test_create_mailbox(flask_client):
)
assert r.status_code == 400
assert r.json == {"error": "gmail.com invalid"}
assert r.json == {"error": "Invalid email"}
def test_create_mailbox_fail_for_free_user(flask_client):
@ -42,7 +42,7 @@ def test_create_mailbox_fail_for_free_user(flask_client):
)
assert r.status_code == 400
assert r.json == {"error": "Only premium plan can add additional mailbox"}
assert r.json == {"error": "Only available for paid plans"}
def test_delete_mailbox(flask_client):

View File

@ -44,6 +44,9 @@ def test_update_settings_alias_generator(flask_client):
def test_update_settings_random_alias_default_domain(flask_client):
user = login(flask_client)
custom_domain = CustomDomain.create(
domain=random_domain(), verified=True, user_id=user.id, flush=True
)
assert user.default_random_alias_domain() == "sl.local"
r = flask_client.patch(
@ -57,6 +60,12 @@ def test_update_settings_random_alias_default_domain(flask_client):
assert r.status_code == 200
assert user.default_random_alias_domain() == "d1.test"
r = flask_client.patch(
"/api/setting", json={"random_alias_default_domain": custom_domain.domain}
)
assert r.status_code == 200
assert user.default_random_alias_domain() == custom_domain.domain
def test_update_settings_sender_format(flask_client):
user = login(flask_client)

View File

@ -23,7 +23,7 @@ _MAX_PER_MINUTE = 3
_ENDPOINT,
methods=["GET"],
)
@limiter.limit(f"{_MAX_PER_MINUTE}/minute")
@limiter.limit(f"{_MAX_PER_MINUTE}/hour")
def rate_limited_endpoint_1():
return "Working", HTTPStatus.OK

View File

@ -0,0 +1,304 @@
from typing import Optional
import arrow
import pytest
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.models import Mailbox, MailboxActivation, User, Job
from tests.utils import create_new_user, random_email
user: Optional[User] = None
def setup_module():
global user
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
user.trial_end = None
user.lifetime = True
Session.commit()
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
def test_free_user_cannot_add_mailbox():
user.lifetime = False
email = random_email()
try:
with pytest.raises(mailbox_utils.OnlyPaidError):
mailbox_utils.create_mailbox(user, email)
finally:
user.lifetime = True
def test_invalid_email():
user.lifetime = True
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, "invalid")
def test_already_used():
user.lifetime = True
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, user.email)
@mail_sender.store_emails_test_decorator
def test_create_mailbox():
email = random_email()
mailbox_utils.create_mailbox(user, email)
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert not mailbox.verified
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is not None
assert activation.tries == 0
assert len(activation.code) > 6
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(activation.code) > 0
assert mail_sent.envelope_to == email
@mail_sender.store_emails_test_decorator
def test_create_mailbox_verified():
email = random_email()
output = mailbox_utils.create_mailbox(user, email, verified=True)
assert output.mailbox is not None
assert output.mailbox.verified
assert output.activation is None
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert mailbox.verified
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is None
assert 0 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_create_mailbox_with_digits():
email = random_email()
output = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_link=False
)
assert output.activation is not None
assert output.activation.tries == 0
assert len(output.activation.code) == 6
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert not mailbox.verified
assert output.mailbox.id == mailbox.id
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is not None
assert output.activation.mailbox_id == activation.mailbox_id
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(output.activation.code) > 0
assert mail_contents.find(config.URL) == -1
assert mail_sent.envelope_to == email
@mail_sender.store_emails_test_decorator
def test_create_mailbox_without_verification_email():
email = random_email()
output = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_email=False
)
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert not mailbox.verified
assert mailbox.id == output.mailbox.id
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is not None
assert activation.tries == 0
assert len(activation.code) == 6
assert activation.code == output.activation.code
assert 0 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_send_verification_email():
email = random_email()
mailbox_utils.create_mailbox(user, email, use_digit_codes=True, send_link=False)
mailbox = Mailbox.get_by(email=email)
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
mail_sender.purge_stored_emails()
mailbox_utils.send_verification_email(user, mailbox, activation, send_link=False)
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(activation.code) > 0
assert mail_contents.find(config.URL) == -1
assert mail_sent.envelope_to == email
@mail_sender.store_emails_test_decorator
def test_send_verification_email_with_link():
email = random_email()
mailbox_utils.create_mailbox(user, email, use_digit_codes=True, send_link=False)
mailbox = Mailbox.get_by(email=email)
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
mail_sender.purge_stored_emails()
mailbox_utils.send_verification_email(user, mailbox, activation, send_link=True)
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(activation.code) > 0
assert mail_contents.find(config.URL) > -1
assert mail_sent.envelope_to == email
def test_delete_other_user_mailbox():
other = create_new_user()
mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
def test_delete_default_mailbox():
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(
user, user.default_mailbox_id, transfer_mailbox_id=None
)
def test_transfer_to_same_mailbox():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_link=False
).mailbox
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=mailbox.id)
def test_transfer_to_other_users_mailbox():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_link=False
).mailbox
other = create_new_user()
other_mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=other_mailbox.id
)
def test_delete_with_no_transfer():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_link=False
).mailbox
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] is None
def test_delete_with_transfer():
mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_link=False
).mailbox
transfer_mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_link=False
).mailbox
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=transfer_mailbox.id
)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] == transfer_mailbox.id
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] is None
def test_verify_non_existing_mailbox():
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.verify_mailbox_code(user, 999999999, "9999999")
def test_verify_already_verified_mailbox():
mailbox = Mailbox.create(
user_id=user.id, email=random_email(), verified=True, commit=True
)
mbox = mailbox_utils.verify_mailbox_code(user, mailbox.id, "9999999")
assert mbox.id == mailbox.id
def test_verify_other_users_mailbox():
other = create_new_user()
mailbox = Mailbox.create(
user_id=other.id, email=random_email(), verified=False, commit=True
)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.verify_mailbox_code(user, mailbox.id, "9999999")
@mail_sender.store_emails_test_decorator
def test_verify_fail():
output = mailbox_utils.create_mailbox(user, random_email())
for i in range(mailbox_utils.MAX_ACTIVATION_TRIES - 1):
try:
mailbox_utils.verify_mailbox_code(
user, output.mailbox.id, output.activation.code + "nop"
)
assert False, f"test {i}"
except mailbox_utils.CannotVerifyError:
activation = MailboxActivation.get_by(mailbox_id=output.mailbox.id)
assert activation.tries == i + 1
@mail_sender.store_emails_test_decorator
def test_verify_too_may():
output = mailbox_utils.create_mailbox(user, random_email())
output.activation.tries = mailbox_utils.MAX_ACTIVATION_TRIES
Session.commit()
with pytest.raises(mailbox_utils.CannotVerifyError):
mailbox_utils.verify_mailbox_code(
user, output.mailbox.id, output.activation.code
)
@mail_sender.store_emails_test_decorator
def test_verify_too_old_code():
output = mailbox_utils.create_mailbox(user, random_email())
output.activation.created_at = arrow.now().shift(minutes=-30)
Session.commit()
with pytest.raises(mailbox_utils.CannotVerifyError):
mailbox_utils.verify_mailbox_code(
user, output.mailbox.id, output.activation.code
)
@mail_sender.store_emails_test_decorator
def test_verify_ok():
output = mailbox_utils.create_mailbox(user, random_email())
mailbox_utils.verify_mailbox_code(user, output.mailbox.id, output.activation.code)
activation = MailboxActivation.get_by(mailbox_id=output.mailbox.id)
assert activation is None
mailbox = Mailbox.get(id=output.mailbox.id)
assert mailbox.verified

View File

@ -365,12 +365,22 @@ def test_sync_event_dead_letter():
commit=True,
)
# create event with too many retries
max_retries = 5
e5 = SyncEvent.create(
content=b"content",
retry_count=max_retries + 1,
created_at=arrow.now(),
commit=True,
)
# get dead letter events
dead_letter_events = SyncEvent.get_dead_letter(
older_than=arrow.now().shift(minutes=-10)
older_than=arrow.now().shift(minutes=-10), max_retries=max_retries
)
assert len(dead_letter_events) == 2
assert e1 in dead_letter_events
assert e2 in dead_letter_events
assert e3 not in dead_letter_events
assert e4 not in dead_letter_events
assert e5 not in dead_letter_events

View File

View File

@ -0,0 +1,128 @@
import pytest
from app import user_settings
from app.db import Session
from app.models import User, CustomDomain, SLDomain
from tests.utils import random_token, create_new_user
user_id: int = 0
custom_domain_name: str = ""
sl_domain_name: str = ""
def setup_module():
global user_id, custom_domain_name, sl_domain_name
user = create_new_user()
user.trial_end = None
user_id = user.id
custom_domain_name = CustomDomain.create(
user_id=user_id,
catch_all=True,
domain=random_token() + ".com",
verified=True,
flush=True,
).domain
sl_domain_name = SLDomain.create(
domain=random_token() + ".com",
premium_only=False,
flush=True,
order=5,
hidden=False,
).domain
def test_set_default_no_domain():
user = User.get(user_id)
user.default_alias_public_domain_id = SLDomain.get_by(domain=sl_domain_name).id
user.default_alias_private_domain_id = CustomDomain.get_by(
domain=custom_domain_name
).id
Session.flush()
user_settings.set_default_alias_domain(user, None)
assert user.default_alias_public_domain_id is None
assert user.default_alias_custom_domain_id is None
def test_set_premium_sl_domain_with_non_premium_user():
user = User.get(user_id)
user.lifetime = False
domain = SLDomain.get_by(domain=sl_domain_name)
domain.premium_only = True
Session.flush()
with pytest.raises(user_settings.CannotSetAlias):
user_settings.set_default_alias_domain(user, sl_domain_name)
def test_set_hidden_sl_domain():
user = User.get(user_id)
domain = SLDomain.get_by(domain=sl_domain_name)
domain.hidden = True
domain.premium_only = False
Session.flush()
with pytest.raises(user_settings.CannotSetAlias):
user_settings.set_default_alias_domain(user, sl_domain_name)
def test_set_sl_domain():
user = User.get(user_id)
user.lifetime = False
domain = SLDomain.get_by(domain=sl_domain_name)
domain.hidden = False
domain.premium_only = False
Session.flush()
user_settings.set_default_alias_domain(user, sl_domain_name)
assert user.default_alias_public_domain_id == domain.id
assert user.default_alias_custom_domain_id is None
def test_set_sl_premium_domain():
user = User.get(user_id)
user.lifetime = True
domain = SLDomain.get_by(domain=sl_domain_name)
domain.hidden = False
domain.premium_only = True
Session.flush()
user_settings.set_default_alias_domain(user, sl_domain_name)
assert user.default_alias_public_domain_id == domain.id
assert user.default_alias_custom_domain_id is None
def test_set_other_user_custom_domain():
user = User.get(user_id)
user.lifetime = True
other_user_domain_name = CustomDomain.create(
user_id=create_new_user().id,
catch_all=True,
domain=random_token() + ".com",
verified=True,
).domain
Session.flush()
with pytest.raises(user_settings.CannotSetAlias):
user_settings.set_default_alias_domain(user, other_user_domain_name)
def test_set_unverified_custom_domain():
user = User.get(user_id)
user.lifetime = True
domain = CustomDomain.get_by(domain=custom_domain_name)
domain.verified = False
Session.flush()
with pytest.raises(user_settings.CannotSetAlias):
user_settings.set_default_alias_domain(user, custom_domain_name)
def test_set_custom_domain():
user = User.get(user_id)
user.lifetime = True
domain = CustomDomain.get_by(domain=custom_domain_name)
domain.verified = True
Session.flush()
user_settings.set_default_alias_domain(user, custom_domain_name)
assert user.default_alias_public_domain_id is None
assert user.default_alias_custom_domain_id == domain.id
def test_set_invalid_custom_domain():
user = User.get(user_id)
with pytest.raises(user_settings.CannotSetAlias):
user_settings.set_default_alias_domain(user, "invalid_nop" + random_token())

View File

@ -0,0 +1,61 @@
from typing import Optional
import pytest
from app import mailbox_utils, user_settings, config
from app.db import Session
from app.models import User
from tests.utils import random_email, create_new_user
user: Optional[User] = None
def setup_module():
global user
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
user.trial_end = None
user.lifetime = True
Session.commit()
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False # noqa: F821
def test_set_default_mailbox():
other = create_new_user()
output = mailbox_utils.create_mailbox(
other,
random_email(),
use_digit_codes=True,
send_link=False,
)
output.mailbox.verified = True
Session.commit()
user_settings.set_default_mailbox(other, output.mailbox.id)
other = User.get(other.id)
assert other.default_mailbox_id == output.mailbox.id
def test_cannot_set_unverified():
output = mailbox_utils.create_mailbox(
user,
random_email(),
use_digit_codes=True,
send_link=False,
)
with pytest.raises(user_settings.CannotSetMailbox):
user_settings.set_default_mailbox(user, output.mailbox.id)
def test_cannot_default_other_user_mailbox():
other = create_new_user()
mailbox = mailbox_utils.create_mailbox(
other,
random_email(),
use_digit_codes=True,
send_link=False,
).mailbox
with pytest.raises(user_settings.CannotSetMailbox):
user_settings.set_default_mailbox(user, mailbox.id)