Compare commits

...

18 Commits

Author SHA1 Message Date
88f270c6a1 4.34.1 2023-08-09 12:00:05 +01:00
0962b1cf29 Update .drone.yml 2023-08-06 17:56:31 +00:00
6051d72691 4.33.3 2023-08-06 17:51:04 +01:00
c31a75a9ef Update README.md 2023-08-06 16:04:57 +00:00
ef289385ff Update README.md 2023-08-06 16:04:47 +00:00
9b12a2ad33 Update README.md 2023-08-06 16:04:41 +00:00
8eb19d88f3 Remove provenance [CI SKIP] 2023-08-06 16:01:04 +00:00
e36e9d3077 4.32.4 2023-08-02 16:49:54 +01:00
b2430cbc5b 4.32.1 2023-07-12 11:00:04 +00:00
1258115397 4.32.0 2023-07-11 11:00:05 +00:00
38c134d903 4.31.0 2023-06-30 11:00:06 +00:00
cd77e4cc2d 4.30.1 2023-06-28 11:00:03 +00:00
87aedf3207 4.30.0 2023-06-27 11:00:04 +00:00
3523c9fc15 4.29.4 2023-06-07 11:00:05 +00:00
a6f4995cb5 4.29.3 2023-06-01 11:00:05 +00:00
727f61a35e 4.28.2 2023-05-16 11:00:09 +00:00
ce5124605a 4.28.1 2023-05-10 11:00:05 +00:00
2c82b03f8d 4.27.0 2023-04-25 11:00:05 +00:00
81 changed files with 2679 additions and 957 deletions

View File

@ -17,6 +17,7 @@ steps:
image: thegeeklab/drone-docker-buildx
privileged: true
settings:
provenance: false
dockerfile: app/Dockerfile
context: app
registry: git.mrmeeb.stream
@ -35,6 +36,7 @@ steps:
status:
- success
- failure
- killed
settings:
webhook:
from_secret: slack_webhook

View File

@ -1,9 +1,7 @@
# Simple Login
# SimpleLogin
[![Build Status](https://drone.mrmeeb.stream/api/badges/MrMeeb/simple-login/status.svg?ref=refs/heads/main)](https://drone.mrmeeb.stream/MrMeeb/simple-login)
This repo exists to automatically capture any releases of the SaaS edition of SimpleLogin. It checks the simplelogin/app GitHub repo once a day, and builds the latest release automatically if it is newer than the currently built version.
This repo exists to automatically capture any releases of the SaaS edition of SimpleLogin. It checks once a day, and builds the latest one automatically if it is newer than the currentlty built version.
I did this to simplify deployment of my self-hosted SimpleLogin instance. SimpleLogin do not provide an up-to-date version for self-hosting, leaving you with the options of either running a very outdated version with no app support, a beta version, or their `simplelogin/app-ci` version. This last option works well if you use an x86 machine, but I'm running SimpleLogin on an ARM machine. Since I don't want to have to build containers on the machine itself, this repo handles that for me.
This exists to simplify deployment of SimpleLogin in a self-hosted capacity, while also allowing the use of the latest version; SimpleLogin do not provide an up-to-date version for this use.
The image is built for amd64 and arm64 devices.
As a result, this image is built for both amd64 and arm64 devices.

View File

@ -34,7 +34,7 @@ poetry install
On Mac, sometimes you might need to install some other packages via `brew`:
```bash
brew install pkg-config libffi openssl postgresql
brew install pkg-config libffi openssl postgresql@13
```
You also need to install `gpg` tool, on Mac it can be done with:
@ -169,6 +169,12 @@ For HTML templates, we use `djlint`. Before creating a pull request, please run
poetry run djlint --check templates
```
If some files aren't properly formatted, you can format all files with
```bash
poetry run djlint --reformat .
```
## Test sending email
[swaks](http://www.jetmore.org/john/code/swaks/) is used for sending test emails to the `email_handler`.

View File

@ -23,10 +23,10 @@ COPY poetry.lock pyproject.toml ./
# Install and setup poetry
RUN pip install -U pip \
&& apt-get update \
&& apt install -y curl netcat gcc python3-dev gnupg git libre2-dev \
&& apt install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev \
&& curl -sSL https://install.python-poetry.org | python3 - \
# Remove curl and netcat from the image
&& apt-get purge -y curl netcat \
&& apt-get purge -y curl netcat-traditional \
# Run poetry
&& poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root \

View File

@ -207,13 +207,14 @@ def process_login_case(
) -> LinkResult:
# Sanitize email just in case
link_request.email = sanitize_email(link_request.email)
check_alias(link_request.email)
# Try to find a SimpleLogin user registered with that partner user id
partner_user = PartnerUser.get_by(
partner_id=partner.id, external_user_id=link_request.external_user_id
)
if partner_user is None:
# We didn't find any SimpleLogin user registered with that partner user id
# Make sure they aren't using an alias as their link email
check_alias(link_request.email)
# Try to find it using the partner's e-mail address
user = User.get_by(email=link_request.email)
return get_login_strategy(link_request, user, partner).process()

View File

@ -6,7 +6,7 @@ from typing import Optional
import itsdangerous
from app import config
from app.log import LOG
from app.models import User, AliasOptions
from app.models import User, AliasOptions, SLDomain
signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET)
@ -105,10 +105,7 @@ def get_alias_suffixes(
for custom_domain in user_custom_domains:
if custom_domain.random_prefix_generation:
suffix = (
"."
+ user.get_random_alias_suffix(custom_domain)
+ "@"
+ custom_domain.domain
f".{user.get_random_alias_suffix(custom_domain)}@{custom_domain.domain}"
)
alias_suffix = AliasSuffix(
is_custom=True,
@ -123,7 +120,7 @@ def get_alias_suffixes(
else:
alias_suffixes.append(alias_suffix)
suffix = "@" + custom_domain.domain
suffix = f"@{custom_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=True,
suffix=suffix,
@ -144,16 +141,13 @@ def get_alias_suffixes(
alias_suffixes.append(alias_suffix)
# then SimpleLogin domain
for sl_domain in user.get_sl_domains(alias_options=alias_options):
suffix = (
(
""
if config.DISABLE_ALIAS_SUFFIX
else "." + user.get_random_alias_suffix()
)
+ "@"
+ sl_domain.domain
sl_domains = user.get_sl_domains(alias_options=alias_options)
default_domain_found = False
for sl_domain in sl_domains:
prefix = (
"" if config.DISABLE_ALIAS_SUFFIX else f".{user.get_random_alias_suffix()}"
)
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
@ -162,11 +156,36 @@ def get_alias_suffixes(
domain=sl_domain.domain,
mx_verified=True,
)
# put the default domain to top
if user.default_alias_public_domain_id == sl_domain.id:
alias_suffixes.insert(0, alias_suffix)
else:
# No default or this is not the default
if (
user.default_alias_public_domain_id is None
or user.default_alias_public_domain_id != sl_domain.id
):
alias_suffixes.append(alias_suffix)
else:
default_domain_found = True
alias_suffixes.insert(0, alias_suffix)
if not default_domain_found:
domain_conditions = {"id": user.default_alias_public_domain_id, "hidden": False}
if not user.is_premium():
domain_conditions["premium_only"] = False
sl_domain = SLDomain.get_by(**domain_conditions)
if sl_domain:
prefix = (
""
if config.DISABLE_ALIAS_SUFFIX
else f".{user.get_random_alias_suffix()}"
)
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
signed_suffix=signer.sign(suffix).decode(),
is_premium=sl_domain.premium_only,
domain=sl_domain.domain,
mx_verified=True,
)
alias_suffixes.insert(0, alias_suffix)
return alias_suffixes

View File

@ -57,6 +57,8 @@ def get_user_if_alias_would_auto_create(
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user
)
if DomainDeletedAlias.get_by(email=address):
return None
if domain_and_rule:
return domain_and_rule[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory(

View File

@ -9,6 +9,7 @@ from requests import RequestException
from app.api.base import api_bp, require_api_auth
from app.config import APPLE_API_SECRET, MACAPP_APPLE_API_SECRET
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session
from app.log import LOG
from app.models import PlanEnum, AppleSubscription
@ -50,6 +51,7 @@ def apple_process_payment():
apple_sub = verify_receipt(receipt_data, user, password)
if apple_sub:
execute_subscription_webhook(user)
return jsonify(ok=True), 200
return jsonify(error="Processing failed"), 400
@ -282,6 +284,7 @@ def apple_update_notification():
apple_sub.plan = plan
apple_sub.product_id = transaction["product_id"]
Session.commit()
execute_subscription_webhook(user)
return jsonify(ok=True), 200
else:
LOG.w(
@ -554,6 +557,7 @@ def verify_receipt(receipt_data, user, password) -> Optional[AppleSubscription]:
product_id=latest_transaction["product_id"],
)
execute_subscription_webhook(user)
Session.commit()
return apple_sub

View File

@ -13,8 +13,8 @@ from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
is_valid_email,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.utils import sanitize_email

View File

@ -1,4 +1,5 @@
import base64
import dataclasses
from io import BytesIO
from typing import Optional
@ -7,6 +8,7 @@ from flask import jsonify, g, request, make_response
from app import s3, config
from app.api.base import api_bp, require_api_auth
from app.config import SESSION_COOKIE_NAME
from app.dashboard.views.index import get_stats
from app.db import Session
from app.models import ApiKey, File, PartnerUser, User
from app.proton.utils import get_proton_partner
@ -136,3 +138,22 @@ def logout():
response.delete_cookie(SESSION_COOKIE_NAME)
return response
@api_bp.route("/stats")
@require_api_auth
def user_stats():
"""
Return stats
Output as json
- nb_alias
- nb_forward
- nb_reply
- nb_block
"""
user = g.user
stats = get_stats(user)
return jsonify(dataclasses.asdict(stats))

View File

@ -1,4 +1,4 @@
from flask import request, render_template, redirect, url_for, flash, g
from flask import request, render_template, flash, g
from flask_wtf import FlaskForm
from wtforms import StringField, validators
@ -16,7 +16,7 @@ class ForgotPasswordForm(FlaskForm):
@auth_bp.route("/forgot_password", methods=["GET", "POST"])
@limiter.limit(
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
"10/hour", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
)
def forgot_password():
form = ForgotPasswordForm(request.form)
@ -37,6 +37,5 @@ def forgot_password():
if user:
LOG.d("Send forgot password email to %s", user)
send_reset_password_email(user)
return redirect(url_for("auth.forgot_password"))
return render_template("auth/forgot_password.html", form=form)

View File

@ -60,8 +60,8 @@ def reset_password():
# this can be served to activate user too
user.activated = True
# remove the reset password code
ResetPasswordCode.delete(reset_password_code.id)
# remove all reset password codes
ResetPasswordCode.filter_by(user_id=user.id).delete()
# change the alternative_id to log user out on other browsers
user.alternative_id = str(uuid.uuid4())

View File

@ -532,3 +532,10 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT:
SKIP_MX_LOOKUP_ON_CHECK = False
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
MAX_API_KEYS = int(os.environ.get("MAX_API_KEYS", 30))
UPCLOUD_USERNAME = os.environ.get("UPCLOUD_USERNAME", None)
UPCLOUD_PASSWORD = os.environ.get("UPCLOUD_PASSWORD", None)
UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)

View File

@ -13,10 +13,10 @@ from app import config, parallel_limiter
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import (
is_valid_email,
generate_reply_email,
parse_full_address,
)
from app.email_validation import is_valid_email
from app.errors import (
CannotCreateContactForReverseAlias,
ErrContactErrorUpgradeNeeded,

View File

@ -3,9 +3,11 @@ from flask_login import login_required, current_user
from flask_wtf import FlaskForm
from wtforms import StringField, validators
from app import config
from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session
from app.extensions import limiter
from app.models import ApiKey
from app.utils import CSRFValidationForm
@ -14,9 +16,34 @@ class NewApiKeyForm(FlaskForm):
name = StringField("Name", validators=[validators.DataRequired()])
def clean_up_unused_or_old_api_keys(user_id: int):
total_keys = ApiKey.filter_by(user_id=user_id).count()
if total_keys <= config.MAX_API_KEYS:
return
# Remove oldest unused
for api_key in (
ApiKey.filter_by(user_id=user_id, last_used=None)
.order_by(ApiKey.created_at.asc())
.all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
# Clean up oldest used
for api_key in (
ApiKey.filter_by(user_id=user_id).order_by(ApiKey.last_used.asc()).all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
@dashboard_bp.route("/api_key", methods=["GET", "POST"])
@login_required
@sudo_required
@limiter.limit("10/hour")
def api_key():
api_keys = (
ApiKey.filter(ApiKey.user_id == current_user.id)
@ -50,6 +77,7 @@ def api_key():
elif request.form.get("form-name") == "create":
if new_api_key_form.validate():
clean_up_unused_or_old_api_keys(current_user.id)
new_api_key = ApiKey.create(
name=new_api_key_form.name.data, user_id=current_user.id
)

View File

@ -68,9 +68,14 @@ def coupon_route():
)
return redirect(request.url)
coupon.used_by_user_id = current_user.id
coupon.used = True
Session.commit()
updated = (
Session.query(Coupon)
.filter_by(code=code, used=False)
.update({"used_by_user_id": current_user.id, "used": True})
)
if updated != 1:
flash("Coupon is not valid", "error")
return redirect(request.url)
manual_sub: ManualSubscription = ManualSubscription.get_by(
user_id=current_user.id

View File

@ -8,6 +8,7 @@ from wtforms import PasswordField, validators
from app.config import CONNECT_WITH_PROTON
from app.dashboard.base import dashboard_bp
from app.extensions import limiter
from app.log import LOG
from app.models import PartnerUser
from app.proton.utils import get_proton_partner
@ -21,6 +22,7 @@ class LoginForm(FlaskForm):
@dashboard_bp.route("/enter_sudo", methods=["GET", "POST"])
@limiter.limit("3/minute")
@login_required
def enter_sudo():
password_check_form = LoginForm()

View File

@ -1,3 +1,7 @@
import base64
import binascii
import json
import arrow
from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
@ -15,8 +19,8 @@ from app.email_utils import (
mailbox_already_used,
render,
send_email,
is_valid_email,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.utils import CSRFValidationForm
@ -180,7 +184,9 @@ def mailbox_route():
def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode()
encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
)
@ -205,22 +211,34 @@ def send_verification_email(user, mailbox):
@dashboard_bp.route("/mailbox_verify")
def mailbox_verify():
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id = request.args.get("mailbox_id")
mailbox_verify_request = request.args.get("mailbox_id")
try:
r_id = int(s.unsign(mailbox_id, max_age=900))
mailbox_raw_data = s.unsign(mailbox_verify_request, max_age=900)
except Exception:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
else:
mailbox = Mailbox.get(r_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
try:
decoded_data = base64.urlsafe_b64decode(mailbox_raw_data)
except binascii.Error:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_data = json.loads(decoded_data)
if not isinstance(mailbox_data, list) or len(mailbox_data) != 2:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_id = mailbox_data[0]
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_email = mailbox_data[1]
if mailbox_email != mailbox.email:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox.verified = True
Session.commit()
mailbox.verified = True
Session.commit()
LOG.d("Mailbox %s is verified", mailbox)
LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)

View File

@ -30,7 +30,7 @@ class ChangeEmailForm(FlaskForm):
@dashboard_bp.route("/mailbox/<int:mailbox_id>/", methods=["GET", "POST"])
@login_required
def mailbox_detail_route(mailbox_id):
mailbox = Mailbox.get(mailbox_id)
mailbox: Mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != current_user.id:
flash("You cannot see this page", "warning")
return redirect(url_for("dashboard.index"))
@ -144,6 +144,15 @@ def mailbox_detail_route(mailbox_id):
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
if mailbox.is_proton():
flash(
"Enabling PGP for a Proton Mail mailbox is redundant and does not add any security benefit",
"info",
)
return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
mailbox.pgp_public_key = request.form.get("pgp")
try:
mailbox.pgp_finger_print = load_public_key_and_check(

View File

@ -198,6 +198,16 @@ def setting():
)
return redirect(url_for("dashboard.setting"))
if current_user.profile_picture_id is not None:
current_profile_file = File.get_by(
id=current_user.profile_picture_id
)
if (
current_profile_file is not None
and current_profile_file.user_id == current_user.id
):
s3.delete(current_profile_file.path)
file_path = random_string(30)
file = File.create(user_id=current_user.id, path=file_path)
@ -451,8 +461,13 @@ def send_change_email_confirmation(user: User, email_change: EmailChange):
@dashboard_bp.route("/resend_email_change", methods=["GET", "POST"])
@limiter.limit("5/hour")
@login_required
def resend_email_change():
form = CSRFValidationForm()
if not form.validate():
flash("Invalid request. Please try again", "warning")
return redirect(url_for("dashboard.setting"))
email_change = EmailChange.get_by(user_id=current_user.id)
if email_change:
# extend email change expiration
@ -472,6 +487,10 @@ def resend_email_change():
@dashboard_bp.route("/cancel_email_change", methods=["GET", "POST"])
@login_required
def cancel_email_change():
form = CSRFValidationForm()
if not form.validate():
flash("Invalid request. Please try again", "warning")
return redirect(url_for("dashboard.setting"))
email_change = EmailChange.get_by(user_id=current_user.id)
if email_change:
EmailChange.delete(email_change.id)

View File

@ -34,7 +34,7 @@ def get_cname_record(hostname) -> Optional[str]:
def get_mx_domains(hostname) -> [(int, str)]:
"""return list of (priority, domain name).
"""return list of (priority, domain name) sorted by priority (lowest priority first)
domain name ends with a "." at the end.
"""
try:
@ -50,7 +50,7 @@ def get_mx_domains(hostname) -> [(int, str)]:
ret.append((int(parts[0]), parts[1]))
return ret
return sorted(ret, key=lambda prio_domain: prio_domain[0])
_include_spf = "include:"

View File

@ -20,6 +20,7 @@ X_SPAM_STATUS = "X-Spam-Status"
LIST_UNSUBSCRIBE = "List-Unsubscribe"
LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post"
RETURN_PATH = "Return-Path"
AUTHENTICATION_RESULTS = "Authentication-Results"
# headers used to DKIM sign in order of preference
DKIM_HEADERS = [
@ -32,6 +33,7 @@ DKIM_HEADERS = [
SL_DIRECTION = "X-SimpleLogin-Type"
SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID"
SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From"
SL_ORIGINAL_FROM = "X-SimpleLogin-Original-From"
SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To"
SL_CLIENT_IP = "X-SimpleLogin-Client-IP"

View File

@ -828,19 +828,6 @@ def should_add_dkim_signature(domain: str) -> bool:
return False
def is_valid_email(email_address: str) -> bool:
"""
Used to check whether an email address is valid
NOT run MX check.
NOT allow unicode.
"""
try:
validate_email(email_address, check_deliverability=False, allow_smtputf8=False)
return True
except EmailNotValidError:
return False
class EmailEncoding(enum.Enum):
BASE64 = "base64"
QUOTED = "quoted-printable"
@ -951,6 +938,8 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
for part in msg.get_payload():
if isinstance(part, Message):
new_parts.append(add_header(part, text_header, html_header))
elif isinstance(part, str):
new_parts.append(MIMEText(part))
else:
new_parts.append(part)
clone_msg = copy(msg)
@ -959,7 +948,14 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
elif content_type in ("multipart/mixed", "multipart/signed"):
new_parts = []
parts = list(msg.get_payload())
payload = msg.get_payload()
if isinstance(payload, str):
# The message is badly formatted inject as new
new_parts = [MIMEText(text_header, "plain"), MIMEText(payload, "plain")]
clone_msg = copy(msg)
clone_msg.set_payload(new_parts)
return clone_msg
parts = list(payload)
LOG.d("only add header for the first part for %s", content_type)
for ix, part in enumerate(parts):
if ix == 0:
@ -1107,26 +1103,6 @@ def is_reverse_alias(address: str) -> bool:
)
# allow also + and @ that are present in a reply address
_ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.+@"
def normalize_reply_email(reply_email: str) -> str:
"""Handle the case where reply email contains *strange* char that was wrongly generated in the past"""
if not reply_email.isascii():
reply_email = convert_to_id(reply_email)
ret = []
# drop all control characters like shift, separator, etc
for c in reply_email:
if c not in _ALLOWED_CHARS:
ret.append("_")
else:
ret.append(c)
return "".join(ret)
def should_disable(alias: Alias) -> (bool, str):
"""
Return whether an alias should be disabled and if yes, the reason why

View File

@ -0,0 +1,38 @@
from email_validator import (
validate_email,
EmailNotValidError,
)
from app.utils import convert_to_id
# allow also + and @ that are present in a reply address
_ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.+@"
def is_valid_email(email_address: str) -> bool:
"""
Used to check whether an email address is valid
NOT run MX check.
NOT allow unicode.
"""
try:
validate_email(email_address, check_deliverability=False, allow_smtputf8=False)
return True
except EmailNotValidError:
return False
def normalize_reply_email(reply_email: str) -> str:
"""Handle the case where reply email contains *strange* char that was wrongly generated in the past"""
if not reply_email.isascii():
reply_email = convert_to_id(reply_email)
ret = []
# drop all control characters like shift, separator, etc
for c in reply_email:
if c not in _ALLOWED_CHARS:
ret.append("_")
else:
ret.append(c)
return "".join(ret)

View File

@ -84,6 +84,14 @@ class ErrAddressInvalid(SLException):
return f"{self.address} is not a valid email address"
class InvalidContactEmailError(SLException):
def __init__(self, website_email: str): # noqa: F821
self.website_email = website_email
def error_for_user(self) -> str:
return f"Cannot create contact with invalid email {self.website_email}"
class ErrContactAlreadyExists(SLException):
"""raised when a contact already exists"""

View File

@ -74,8 +74,8 @@ class UnsubscribeEncoder:
)
signed_data = cls._get_signer().sign(serialized_data).decode("utf-8")
encoded_request = f"{UNSUB_PREFIX}.{signed_data}"
if len(encoded_request) > 256:
LOG.e("Encoded request is longer than 256 chars")
if len(encoded_request) > 512:
LOG.w("Encoded request is longer than 512 chars")
return encoded_request
@staticmethod

View File

@ -9,6 +9,7 @@ from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeOriginalData,
)
from app.log import LOG
from app.models import Alias, Contact, UnsubscribeBehaviourEnum
@ -30,6 +31,7 @@ class UnsubscribeGenerator:
"""
unsubscribe_data = message[headers.LIST_UNSUBSCRIBE]
if not unsubscribe_data:
LOG.info("Email has no unsubscribe header")
return message
raw_methods = [method.strip() for method in unsubscribe_data.split(",")]
mailto_unsubs = None
@ -44,7 +46,9 @@ class UnsubscribeGenerator:
if url_data.scheme == "mailto":
query_data = urllib.parse.parse_qs(url_data.query)
mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0])
LOG.debug(f"Unsub is mailto to {mailto_unsubs}")
else:
LOG.debug(f"Unsub has {url_data.scheme} scheme")
other_unsubs.append(method)
# If there are non mailto unsubscribe methods, use those in the header
if other_unsubs:
@ -56,18 +60,19 @@ class UnsubscribeGenerator:
add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
LOG.debug(f"Adding click unsub methods to header {other_unsubs}")
return message
if not mailto_unsubs:
message = delete_header(message, headers.LIST_UNSUBSCRIBE)
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
elif not mailto_unsubs:
LOG.debug("No unsubs. Deleting all unsub headers")
delete_header(message, headers.LIST_UNSUBSCRIBE)
delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
return message
return self._add_unsubscribe_header(
message,
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
),
unsub_data = UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
)
LOG.debug(f"Adding unsub data {unsub_data}")
return self._add_unsubscribe_header(message, unsub_data)
def _add_unsubscribe_header(
self, message: Message, unsub: UnsubscribeData

View File

@ -41,7 +41,7 @@ from app.models import (
class ExportUserDataJob:
REMOVE_FIELDS = {
"User": ("otp_secret",),
"User": ("otp_secret", "password"),
"Alias": ("ts_vector", "transfer_token", "hibp_last_check"),
"CustomDomain": ("ownership_txt_token",),
}

View File

@ -32,6 +32,7 @@ class SendRequest:
rcpt_options: Dict = {}
is_forward: bool = False
ignore_smtp_errors: bool = False
retries: int = 0
def to_bytes(self) -> bytes:
if not config.SAVE_UNSENT_DIR:
@ -45,6 +46,7 @@ class SendRequest:
"mail_options": self.mail_options,
"rcpt_options": self.rcpt_options,
"is_forward": self.is_forward,
"retries": self.retries,
}
return json.dumps(data).encode("utf-8")
@ -65,8 +67,33 @@ class SendRequest:
mail_options=decoded_data["mail_options"],
rcpt_options=decoded_data["rcpt_options"],
is_forward=decoded_data["is_forward"],
retries=decoded_data.get("retries", 1),
)
def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
self.save_request_to_file(file_path)
@staticmethod
def save_request_to_failed_dir(self, prefix: str = "DeliveryRetryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
dir_name = os.path.join(config.SAVE_UNSENT_DIR, "failed")
if not os.path.isdir(dir_name):
os.makedirs(dir_name)
file_path = os.path.join(dir_name, file_name)
self.save_request_to_file(file_path)
def save_request_to_file(self, file_path: str):
file_contents = self.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
class MailSender:
def __init__(self):
@ -171,21 +198,9 @@ class MailSender:
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{config.POSTFIX_PORT}"
)
if config.SAVE_UNSENT_DIR:
self._save_request_to_unsent_dir(send_request)
send_request.save_request_to_unsent_dir()
return False
def _save_request_to_unsent_dir(
self, send_request: SendRequest, prefix: str = "DeliveryFail"
):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
file_contents = send_request.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
mail_sender = MailSender()
@ -219,6 +234,7 @@ def load_unsent_mails_from_fs_and_resend():
LOG.i(f"Trying to re-deliver email {filename}")
try:
send_request = SendRequest.load_from_file(full_file_path)
send_request.retries += 1
except Exception as e:
LOG.e(f"Cannot load {filename}. Error {e}")
continue
@ -230,6 +246,11 @@ def load_unsent_mails_from_fs_and_resend():
"DeliverUnsentEmail", {"delivered": "true"}
)
else:
if send_request.retries > 2:
os.unlink(full_file_path)
send_request.save_request_to_failed_dir()
else:
send_request.save_request_to_file(full_file_path)
newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": "false"}
)

View File

@ -30,6 +30,8 @@ from sqlalchemy_utils import ArrowType
from app import config
from app import s3
from app.db import Session
from app.dns_utils import get_mx_domains
from app.errors import (
AliasInTrashError,
DirectoryInTrashError,
@ -341,7 +343,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
sa.Boolean, default=True, nullable=False, server_default="1"
)
activated = sa.Column(sa.Boolean, default=False, nullable=False)
activated = sa.Column(sa.Boolean, default=False, nullable=False, index=True)
# an account can be disabled if having harmful behavior
disabled = sa.Column(sa.Boolean, default=False, nullable=False, server_default="0")
@ -411,7 +413,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
)
referral_id = sa.Column(
sa.ForeignKey("referral.id", ondelete="SET NULL"), nullable=True, default=None
sa.ForeignKey("referral.id", ondelete="SET NULL"),
nullable=True,
default=None,
index=True,
)
referral = orm.relationship("Referral", foreign_keys=[referral_id])
@ -445,7 +450,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
random_alias_suffix = sa.Column(
sa.Integer,
nullable=False,
default=AliasSuffixEnum.random_string.value,
default=AliasSuffixEnum.word.value,
server_default=str(AliasSuffixEnum.random_string.value),
)
@ -514,9 +519,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
server_default=BlockBehaviourEnum.return_2xx.name,
)
# to keep existing behavior, the server default is TRUE whereas for new user, the default value is FALSE
include_header_email_header = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="1"
sa.Boolean, default=True, nullable=False, server_default="1"
)
# bitwise flags. Allow for future expansion
@ -535,6 +539,12 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
nullable=False,
)
__table_args__ = (
sa.Index(
"ix_users_activated_trial_end_lifetime", activated, trial_end, lifetime
),
)
@property
def directory_quota(self):
return min(
@ -569,6 +579,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
@classmethod
def create(cls, email, name="", password=None, from_partner=False, **kwargs):
email = sanitize_email(email)
user: User = super(User, cls).create(email=email, name=name[:100], **kwargs)
if password:
@ -580,19 +591,6 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Session.flush()
user.default_mailbox_id = mb.id
# create a first alias mail to show user how to use when they login
alias = Alias.create_new(
user,
prefix="simplelogin-newsletter",
mailbox_id=mb.id,
note="This is your first alias. It's used to receive SimpleLogin communications "
"like new features announcements, newsletters.",
)
Session.flush()
user.newsletter_alias_id = alias.id
Session.flush()
# generate an alternative_id if needed
if "alternative_id" not in kwargs:
user.alternative_id = str(uuid.uuid4())
@ -611,6 +609,19 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Session.flush()
return user
# create a first alias mail to show user how to use when they login
alias = Alias.create_new(
user,
prefix="simplelogin-newsletter",
mailbox_id=mb.id,
note="This is your first alias. It's used to receive SimpleLogin communications "
"like new features announcements, newsletters.",
)
Session.flush()
user.newsletter_alias_id = alias.id
Session.flush()
if config.DISABLE_ONBOARDING:
LOG.d("Disable onboarding emails")
return user
@ -636,7 +647,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return user
def get_active_subscription(
self,
self, include_partner_subscription: bool = True
) -> Optional[
Union[
Subscription
@ -664,19 +675,40 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if coinbase_subscription and coinbase_subscription.is_active():
return coinbase_subscription
partner_sub: PartnerSubscription = PartnerSubscription.find_by_user_id(self.id)
if partner_sub and partner_sub.is_active():
return partner_sub
if include_partner_subscription:
partner_sub: PartnerSubscription = PartnerSubscription.find_by_user_id(
self.id
)
if partner_sub and partner_sub.is_active():
return partner_sub
return None
def get_active_subscription_end(
self, include_partner_subscription: bool = True
) -> Optional[arrow.Arrow]:
sub = self.get_active_subscription(
include_partner_subscription=include_partner_subscription
)
if isinstance(sub, Subscription):
return arrow.get(sub.next_bill_date)
if isinstance(sub, AppleSubscription):
return sub.expires_date
if isinstance(sub, ManualSubscription):
return sub.end_at
if isinstance(sub, CoinbaseSubscription):
return sub.end_at
return None
# region Billing
def lifetime_or_active_subscription(self) -> bool:
def lifetime_or_active_subscription(
self, include_partner_subscription: bool = True
) -> bool:
"""True if user has lifetime licence or active subscription"""
if self.lifetime:
return True
return self.get_active_subscription() is not None
return self.get_active_subscription(include_partner_subscription) is not None
def is_paid(self) -> bool:
"""same as _lifetime_or_active_subscription but not include free manual subscription"""
@ -705,14 +737,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return True
def is_premium(self) -> bool:
def is_premium(self, include_partner_subscription: bool = True) -> bool:
"""
user is premium if they:
- have a lifetime deal or
- in trial period or
- active subscription
"""
if self.lifetime_or_active_subscription():
if self.lifetime_or_active_subscription(include_partner_subscription):
return True
if self.trial_end and arrow.now() < self.trial_end:
@ -995,6 +1027,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if not self.is_premium():
conditions.append(SLDomain.premium_only == False) # noqa: E712
partner_domain_cond = [] # noqa:E711
if self.default_alias_public_domain_id is not None:
partner_domain_cond.append(
SLDomain.id == self.default_alias_public_domain_id
)
if alias_options.show_partner_domains is not None:
partner_user = PartnerUser.filter_by(
user_id=self.id, partner_id=alias_options.show_partner_domains.id
@ -1421,7 +1457,7 @@ class Alias(Base, ModelMixin):
)
# have I been pwned
hibp_last_check = sa.Column(ArrowType, default=None)
hibp_last_check = sa.Column(ArrowType, default=None, index=True)
hibp_breaches = orm.relationship("Hibp", secondary="alias_hibp")
# to use Postgres full text search. Only applied on "note" column for now
@ -2267,6 +2303,7 @@ class CustomDomain(Base, ModelMixin):
@classmethod
def create(cls, **kwargs):
domain = kwargs.get("domain")
kwargs["domain"] = domain.replace("\n", "")
if DeletedSubdomain.get_by(domain=domain):
raise SubdomainInTrashError
@ -2534,6 +2571,27 @@ class Mailbox(Base, ModelMixin):
+ Alias.filter_by(mailbox_id=self.id).count()
)
def is_proton(self) -> bool:
if (
self.email.endswith("@proton.me")
or self.email.endswith("@protonmail.com")
or self.email.endswith("@protonmail.ch")
or self.email.endswith("@pm.me")
):
return True
from app.email_utils import get_email_local_part
mx_domains: [(int, str)] = get_mx_domains(get_email_local_part(self.email))
# Proton is the first domain
if mx_domains and mx_domains[0][1] in (
"mail.protonmail.ch.",
"mailsec.protonmail.ch.",
):
return True
return False
@classmethod
def delete(cls, obj_id):
mailbox: Mailbox = cls.get(obj_id)
@ -2566,6 +2624,12 @@ class Mailbox(Base, ModelMixin):
return ret
@classmethod
def create(cls, **kw):
if "email" in kw:
kw["email"] = sanitize_email(kw["email"])
return super().create(**kw)
def __repr__(self):
return f"<Mailbox {self.id} {self.email}>"
@ -2904,6 +2968,8 @@ class Monitoring(Base, ModelMixin):
active_queue = sa.Column(sa.Integer, nullable=False)
deferred_queue = sa.Column(sa.Integer, nullable=False)
__table_args__ = (Index("ix_monitoring_created_at", "created_at"),)
class BatchImport(Base, ModelMixin):
__tablename__ = "batch_import"
@ -3029,6 +3095,8 @@ class Bounce(Base, ModelMixin):
email = sa.Column(sa.String(256), nullable=False, index=True)
info = sa.Column(sa.Text, nullable=True)
__table_args__ = (sa.Index("ix_bounce_created_at", "created_at"),)
class TransactionalEmail(Base, ModelMixin):
"""Storing all email addresses that receive transactional emails, including account email and mailboxes.
@ -3038,6 +3106,8 @@ class TransactionalEmail(Base, ModelMixin):
__tablename__ = "transactional_email"
email = sa.Column(sa.String(256), nullable=False, unique=False)
__table_args__ = (sa.Index("ix_transactional_email_created_at", "created_at"),)
class Payout(Base, ModelMixin):
"""Referral payouts"""

View File

@ -0,0 +1,33 @@
import requests
from requests import RequestException
from app import config
from app.log import LOG
from app.models import User
def execute_subscription_webhook(user: User):
webhook_url = config.SUBSCRIPTION_CHANGE_WEBHOOK
if webhook_url is None:
return
subscription_end = user.get_active_subscription_end(
include_partner_subscription=False
)
sl_subscription_end = None
if subscription_end:
sl_subscription_end = subscription_end.timestamp
payload = {
"user_id": user.id,
"is_premium": user.is_premium(),
"active_subscription_end": sl_subscription_end,
}
try:
response = requests.post(webhook_url, json=payload, timeout=2)
if response.status_code == 200:
LOG.i("Sent request to subscription update webhook successfully")
else:
LOG.i(
f"Request to webhook failed with statue {response.status_code}: {response.text}"
)
except RequestException as e:
LOG.error(f"Subscription request exception: {e}")

View File

@ -99,7 +99,7 @@ def sanitize_email(email_address: str, not_lower=False) -> str:
email_address = email_address.strip().replace(" ", "").replace("\n", " ")
if not not_lower:
email_address = email_address.lower()
return email_address
return email_address.replace("\u200f", "")
class NextUrlSanitizer:

View File

@ -22,10 +22,9 @@ from app.email_utils import (
render,
email_can_be_used_as_mailbox,
send_email_with_rate_control,
normalize_reply_email,
is_valid_email,
get_email_domain_part,
)
from app.email_validation import is_valid_email, normalize_reply_email
from app.errors import ProtonPartnerNotSetUp
from app.log import LOG
from app.mail_sender import load_unsent_mails_from_fs_and_resend
@ -66,12 +65,14 @@ from server import create_light_app
def notify_trial_end():
for user in User.filter(
User.activated.is_(True), User.trial_end.isnot(None), User.lifetime.is_(False)
User.activated.is_(True),
User.trial_end.isnot(None),
User.trial_end >= arrow.now().shift(days=2),
User.trial_end < arrow.now().shift(days=3),
User.lifetime.is_(False),
).all():
try:
if user.in_trial() and arrow.now().shift(
days=3
) > user.trial_end >= arrow.now().shift(days=2):
if user.in_trial():
LOG.d("Send trial end email to user %s", user)
send_trial_end_soon_email(user)
# happens if user has been deleted in the meantime
@ -104,7 +105,9 @@ def delete_logs():
def delete_refused_emails():
for refused_email in RefusedEmail.filter_by(deleted=False).all():
for refused_email in (
RefusedEmail.filter_by(deleted=False).order_by(RefusedEmail.id).all()
):
if arrow.now().shift(days=1) > refused_email.delete_at >= arrow.now():
LOG.d("Delete refused email %s", refused_email)
if refused_email.path:
@ -272,7 +275,11 @@ def compute_metric2() -> Metric2:
_24h_ago = now.shift(days=-1)
nb_referred_user_paid = 0
for user in User.filter(User.referral_id.isnot(None)):
for user in (
User.filter(User.referral_id.isnot(None))
.yield_per(500)
.enable_eagerloads(False)
):
if user.is_paid():
nb_referred_user_paid += 1
@ -1020,7 +1027,8 @@ async def check_hibp():
)
.filter(Alias.enabled)
.order_by(Alias.hibp_last_check.asc())
.all()
.yield_per(500)
.enable_eagerloads(False)
):
await queue.put(alias.id)

View File

@ -5,68 +5,64 @@ jobs:
schedule: "0 0 * * *"
captureStderr: true
- name: SimpleLogin Notify Trial Ends
command: python /code/cron.py -j notify_trial_end
shell: /bin/bash
schedule: "0 8 * * *"
captureStderr: true
- name: SimpleLogin Notify Manual Subscription Ends
command: python /code/cron.py -j notify_manual_subscription_end
shell: /bin/bash
schedule: "0 9 * * *"
captureStderr: true
- name: SimpleLogin Notify Premium Ends
command: python /code/cron.py -j notify_premium_end
shell: /bin/bash
schedule: "0 10 * * *"
captureStderr: true
- name: SimpleLogin Delete Logs
command: python /code/cron.py -j delete_logs
shell: /bin/bash
schedule: "0 11 * * *"
captureStderr: true
- name: SimpleLogin Poll Apple Subscriptions
command: python /code/cron.py -j poll_apple_subscription
shell: /bin/bash
schedule: "0 12 * * *"
captureStderr: true
- name: SimpleLogin Sanity Check
command: python /code/cron.py -j sanity_check
shell: /bin/bash
schedule: "0 2 * * *"
captureStderr: true
- name: SimpleLogin Delete Old Monitoring records
command: python /code/cron.py -j delete_old_monitoring
shell: /bin/bash
schedule: "0 14 * * *"
schedule: "15 1 * * *"
captureStderr: true
- name: SimpleLogin Custom Domain check
command: python /code/cron.py -j check_custom_domain
shell: /bin/bash
schedule: "0 15 * * *"
schedule: "15 2 * * *"
captureStderr: true
- name: SimpleLogin HIBP check
command: python /code/cron.py -j check_hibp
shell: /bin/bash
schedule: "0 18 * * *"
schedule: "15 3 * * *"
captureStderr: true
concurrencyPolicy: Forbid
- name: SimpleLogin Notify HIBP breaches
command: python /code/cron.py -j notify_hibp
shell: /bin/bash
schedule: "0 19 * * *"
schedule: "15 4 * * *"
captureStderr: true
concurrencyPolicy: Forbid
- name: SimpleLogin Delete Logs
command: python /code/cron.py -j delete_logs
shell: /bin/bash
schedule: "15 5 * * *"
captureStderr: true
- name: SimpleLogin Poll Apple Subscriptions
command: python /code/cron.py -j poll_apple_subscription
shell: /bin/bash
schedule: "15 6 * * *"
captureStderr: true
- name: SimpleLogin Notify Trial Ends
command: python /code/cron.py -j notify_trial_end
shell: /bin/bash
schedule: "15 8 * * *"
captureStderr: true
- name: SimpleLogin Notify Manual Subscription Ends
command: python /code/cron.py -j notify_manual_subscription_end
shell: /bin/bash
schedule: "15 9 * * *"
captureStderr: true
- name: SimpleLogin Notify Premium Ends
command: python /code/cron.py -j notify_premium_end
shell: /bin/bash
schedule: "15 10 * * *"
captureStderr: true
- name: SimpleLogin send unsent emails
command: python /code/cron.py -j send_undelivered_mails
shell: /bin/bash

View File

@ -15,6 +15,7 @@
- [GET /api/user/cookie_token](#get-apiusercookie_token): Get a one time use token to exchange it for a valid cookie
- [PATCH /api/user_info](#patch-apiuser_info): Update user's information.
- [POST /api/api_key](#post-apiapi_key): Create a new API key.
- [GET /api/stats](#get-apistats): Get user's stats.
- [GET /api/logout](#get-apilogout): Log out.
[Alias endpoints](#alias-endpoints)
@ -226,6 +227,22 @@ Input:
Output: same as GET /api/user_info
#### GET /api/stats
Given the API Key, return stats about the number of aliases, number of emails forwarded/replied/blocked
Input:
- `Authentication` header that contains the api key
Output: if api key is correct, return a json with the following fields:
```json
{"nb_alias": 1, "nb_block": 0, "nb_forward": 0, "nb_reply": 0}
```
If api key is incorrect, return 401.
#### PATCH /api/sudo
Enable sudo mode
@ -694,7 +711,7 @@ Return 200 and `existed=true` if contact is already added.
It can return 403 with an error if the user cannot create reverse alias.
``json
```json
{
"error": "Please upgrade to create a reverse-alias"
}

View File

@ -1,4 +1,4 @@
# SSL, HTTPS, and HSTS
# SSL, HTTPS, HSTS and additional security measures
It's highly recommended to enable SSL/TLS on your server, both for the web app and email server.
@ -58,3 +58,124 @@ Now, reload Nginx:
```bash
sudo systemctl reload nginx
```
## Additional security measures
For additional security, we recommend you take some extra steps.
### Enable Certificate Authority Authorization (CAA)
[Certificate Authority Authorization](https://letsencrypt.org/docs/caa/) is a step you can take to restrict the list of certificate authorities that are allowed to issue certificates for your domains.
Use [SSLMates CAA Record Generator](https://sslmate.com/caa/) to create a **CAA record** with the following configuration:
- `flags`: `0`
- `tag`: `issue`
- `value`: `"letsencrypt.org"`
To verify if the DNS works, the following command
```bash
dig @1.1.1.1 mydomain.com caa
```
should return:
```
mydomain.com. 3600 IN CAA 0 issue "letsencrypt.org"
```
### SMTP MTA Strict Transport Security (MTA-STS)
[MTA-STS](https://datatracker.ietf.org/doc/html/rfc8461) is an extra step you can take to broadcast the ability of your instance to receive and, optionally enforce, TSL-secure SMTP connections to protect email traffic.
Enabling MTA-STS requires you serve a specific file from subdomain `mta-sts.domain.com` on a well-known route.
Create a text file `/var/www/.well-known/mta-sts.txt` with the content:
```txt
version: STSv1
mode: testing
mx: app.mydomain.com
max_age: 86400
```
It is recommended to start with `mode: testing` for starters to get time to review failure reports. Add as many `mx:` domain entries as you have matching **MX records** in your DNS configuration.
Create a **TXT record** for `_mta-sts.mydomain.com.` with the following value:
```txt
v=STSv1; id=UNIX_TIMESTAMP
```
With `UNIX_TIMESTAMP` being the current date/time.
Use the following command to generate the record:
```bash
echo "v=STSv1; id=$(date +%s)"
```
To verify if the DNS works, the following command
```bash
dig @1.1.1.1 _mta-sts.mydomain.com txt
```
should return a result similar to this one:
```
_mta-sts.mydomain.com. 3600 IN TXT "v=STSv1; id=1689416399"
```
Create an additional Nginx configuration in `/etc/nginx/sites-enabled/mta-sts` with the following content:
```
server {
server_name mta-sts.mydomain.com;
root /var/www;
listen 80;
location ^~ /.well-known {}
}
```
Restart Nginx with the following command:
```sh
sudo service nginx restart
```
A correct configuration of MTA-STS, however, requires that the certificate used to host the `mta-sts` subdomain matches that of the subdomain referred to by the **MX record** from the DNS. In other words, both `mta-sts.mydomain.com` and `app.mydomain.com` must share the same certificate.
The easiest way to do this is to _expand_the certificate associated with `app.mydomain.com` to also support the `mta-sts` subdomain using the following command:
```sh
certbot --expand --nginx -d app.mydomain.com,mta-sts.mydomain.com
```
## SMTP TLS Reporting
[TLSRPT](https://datatracker.ietf.org/doc/html/rfc8460) is used by SMTP systems to report failures in establishing TLS-secure sessions as broadcast by the MTA-STS configuration.
Configuring MTA-STS in `mode: testing` as shown in the previous section gives you time to review failures from some SMTP senders.
Create a **TXT record** for `_smtp._tls.mydomain.com.` with the following value:
```txt
v=TSLRPTv1; rua=mailto:YOUR_EMAIL
```
The TLSRPT configuration at the DNS level allows SMTP senders that fail to initiate TLS-secure sessions to send reports to a particular email address. We suggest creating a `tls-reports` alias in SimpleLogin for this purpose.
To verify if the DNS works, the following command
```bash
dig @1.1.1.1 _smtp._tls.mydomain.com txt
```
should return a result similar to this one:
```
_smtp._tls.mydomain.com. 3600 IN TXT "v=TSLRPTv1; rua=mailto:tls-reports@mydomain.com"
```

View File

@ -106,8 +106,6 @@ from app.email_utils import (
get_header_unicode,
generate_reply_email,
is_reverse_alias,
normalize_reply_email,
is_valid_email,
replace,
should_disable,
parse_id_from_bounce,
@ -123,6 +121,7 @@ from app.email_utils import (
generate_verp_email,
sl_formataddr,
)
from app.email_validation import is_valid_email, normalize_reply_email
from app.errors import (
NonReverseAliasInReplyPhase,
VERPTransactional,
@ -262,7 +261,7 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
Session.commit()
except IntegrityError:
LOG.w("Contact %s %s already exist", alias, contact_email)
LOG.w(f"Contact with email {contact_email} for alias {alias} already exist")
Session.rollback()
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
@ -280,6 +279,9 @@ def get_or_create_reply_to_contact(
except ValueError:
return
if len(contact_name) >= Contact.MAX_NAME_LENGTH:
contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH]
if not is_valid_email(contact_address):
LOG.w(
"invalid reply-to address %s. Parse from %s",
@ -348,6 +350,10 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
continue
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
contact_name = full_address.display_name
if len(contact_name) >= Contact.MAX_NAME_LENGTH:
contact_name = contact_name[0 : Contact.MAX_NAME_LENGTH]
if contact:
# update the contact name if needed
if contact.name != full_address.display_name:
@ -355,9 +361,9 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
"Update contact %s name %s to %s",
contact,
contact.name,
full_address.display_name,
contact_name,
)
contact.name = full_address.display_name
contact.name = contact_name
Session.commit()
else:
LOG.d(
@ -372,7 +378,7 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=full_address.display_name,
name=contact_name,
reply_email=generate_reply_email(contact_email, alias),
is_cc=header.lower() == "cc",
automatic_created=True,
@ -541,12 +547,20 @@ def sign_msg(msg: Message) -> Message:
signature.add_header("Content-Disposition", 'attachment; filename="signature.asc"')
try:
signature.set_payload(sign_data(message_to_bytes(msg).replace(b"\n", b"\r\n")))
payload = sign_data(message_to_bytes(msg).replace(b"\n", b"\r\n"))
if not payload:
raise PGPException("Empty signature by gnupg")
signature.set_payload(payload)
except Exception:
LOG.e("Cannot sign, try using pgpy")
signature.set_payload(
sign_data_with_pgpy(message_to_bytes(msg).replace(b"\n", b"\r\n"))
)
payload = sign_data_with_pgpy(message_to_bytes(msg).replace(b"\n", b"\r\n"))
if not payload:
raise PGPException("Empty signature by pgpy")
signature.set_payload(payload)
container.attach(signature)
@ -846,22 +860,23 @@ def forward_email_to_mailbox(
f"""Email sent to {alias.email} from an invalid address and cannot be replied""",
)
delete_all_headers_except(
msg,
[
headers.FROM,
headers.TO,
headers.CC,
headers.SUBJECT,
headers.DATE,
# do not delete original message id
headers.MESSAGE_ID,
# References and In-Reply-To are used for keeping the email thread
headers.REFERENCES,
headers.IN_REPLY_TO,
]
+ headers.MIME_HEADERS,
)
headers_to_keep = [
headers.FROM,
headers.TO,
headers.CC,
headers.SUBJECT,
headers.DATE,
# do not delete original message id
headers.MESSAGE_ID,
# References and In-Reply-To are used for keeping the email thread
headers.REFERENCES,
headers.IN_REPLY_TO,
headers.LIST_UNSUBSCRIBE,
headers.LIST_UNSUBSCRIBE_POST,
] + headers.MIME_HEADERS
if user.include_header_email_header:
headers_to_keep.append(headers.AUTHENTICATION_RESULTS)
delete_all_headers_except(msg, headers_to_keep)
# create PGP email if needed
if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp:
@ -898,6 +913,11 @@ def forward_email_to_mailbox(
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
if user.include_header_email_header:
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
if contact.name:
original_from = f"{contact.name} <{contact.website_email}>"
else:
original_from = contact.website_email
msg[headers.SL_ORIGINAL_FROM] = original_from
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email
msg[headers.SL_ENVELOPE_TO] = alias.email
@ -1024,7 +1044,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
# reply_email must end with EMAIL_DOMAIN or a domain that can be used as reverse alias domain
if not reply_email.endswith(EMAIL_DOMAIN):
sl_domain: SLDomain = SLDomain.get_by(domain=reply_domain)
if sl_domain is None or not sl_domain.use_as_reverse_alias:
if sl_domain is None:
LOG.w(f"Reply email {reply_email} has wrong domain")
return False, status.E501

View File

@ -0,0 +1,42 @@
"""empty message
Revision ID: 01827104004b
Revises: 2634b41f54db
Create Date: 2023-07-28 19:39:28.675490
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '01827104004b'
down_revision = '2634b41f54db'
branch_labels = None
depends_on = None
def upgrade():
with op.get_context().autocommit_block():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(op.f('ix_alias_hibp_last_check'), 'alias', ['hibp_last_check'], unique=False, postgresql_concurrently=True)
op.create_index('ix_bounce_created_at', 'bounce', ['created_at'], unique=False, postgresql_concurrently=True)
op.create_index('ix_monitoring_created_at', 'monitoring', ['created_at'], unique=False, postgresql_concurrently=True)
op.create_index('ix_transactional_email_created_at', 'transactional_email', ['created_at'], unique=False, postgresql_concurrently=True)
op.create_index(op.f('ix_users_activated'), 'users', ['activated'], unique=False, postgresql_concurrently=True)
op.create_index('ix_users_activated_trial_end_lifetime', 'users', ['activated', 'trial_end', 'lifetime'], unique=False, postgresql_concurrently=True)
op.create_index(op.f('ix_users_referral_id'), 'users', ['referral_id'], unique=False, postgresql_concurrently=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_referral_id'), table_name='users')
op.drop_index('ix_users_activated_trial_end_lifetime', table_name='users')
op.drop_index(op.f('ix_users_activated'), table_name='users')
op.drop_index('ix_transactional_email_created_at', table_name='transactional_email')
op.drop_index('ix_monitoring_created_at', table_name='monitoring')
op.drop_index('ix_bounce_created_at', table_name='bounce')
op.drop_index(op.f('ix_alias_hibp_last_check'), table_name='alias')
# ### end Alembic commands ###

0
app/monitor/__init__.py Normal file
View File

21
app/monitor/metric.py Normal file
View File

@ -0,0 +1,21 @@
from dataclasses import dataclass
from typing import List
@dataclass
class UpcloudRecord:
db_role: str
label: str
time: str
value: float
@dataclass
class UpcloudMetric:
metric_name: str
records: List[UpcloudRecord]
@dataclass
class UpcloudMetrics:
metrics: List[UpcloudMetric]

View File

@ -0,0 +1,20 @@
from app.config import UPCLOUD_DB_ID, UPCLOUD_PASSWORD, UPCLOUD_USERNAME
from app.log import LOG
from monitor.newrelic import NewRelicClient
from monitor.upcloud import UpcloudClient
class MetricExporter:
def __init__(self, newrelic_license: str):
self.__upcloud = UpcloudClient(
username=UPCLOUD_USERNAME, password=UPCLOUD_PASSWORD
)
self.__newrelic = NewRelicClient(newrelic_license)
def run(self):
try:
metrics = self.__upcloud.get_metrics(UPCLOUD_DB_ID)
self.__newrelic.send(metrics)
LOG.info("Upcloud metrics sent to NewRelic")
except Exception as e:
LOG.warn(f"Could not export metrics: {e}")

26
app/monitor/newrelic.py Normal file
View File

@ -0,0 +1,26 @@
from monitor.metric import UpcloudMetrics
from newrelic_telemetry_sdk import GaugeMetric, MetricClient
_NEWRELIC_BASE_HOST = "metric-api.eu.newrelic.com"
class NewRelicClient:
def __init__(self, license_key: str):
self.__client = MetricClient(license_key=license_key, host=_NEWRELIC_BASE_HOST)
def send(self, metrics: UpcloudMetrics):
batch = []
for metric in metrics.metrics:
for record in metric.records:
batch.append(
GaugeMetric(
name=f"upcloud.db.{metric.metric_name}",
value=record.value,
tags={"host": record.label, "db_role": record.db_role},
)
)
response = self.__client.send_batch(batch)
response.raise_for_status()

82
app/monitor/upcloud.py Normal file
View File

@ -0,0 +1,82 @@
from app.log import LOG
from monitor.metric import UpcloudMetric, UpcloudMetrics, UpcloudRecord
import base64
import requests
from typing import Any
BASE_URL = "https://api.upcloud.com"
def get_metric(json: Any, metric: str) -> UpcloudMetric:
records = []
if metric in json:
metric_data = json[metric]
data = metric_data["data"]
cols = list(map(lambda x: x["label"], data["cols"][1:]))
latest = data["rows"][-1]
time = latest[0]
for column_idx in range(len(cols)):
value = latest[1 + column_idx]
# If the latest value is None, try to fetch the second to last
if value is None:
value = data["rows"][-2][1 + column_idx]
if value is not None:
label = cols[column_idx]
if "(master)" in label:
db_role = "master"
else:
db_role = "standby"
records.append(
UpcloudRecord(time=time, db_role=db_role, label=label, value=value)
)
else:
LOG.warn(f"Could not get value for metric {metric}")
return UpcloudMetric(metric_name=metric, records=records)
def get_metrics(json: Any) -> UpcloudMetrics:
return UpcloudMetrics(
metrics=[
get_metric(json, "cpu_usage"),
get_metric(json, "disk_usage"),
get_metric(json, "diskio_reads"),
get_metric(json, "diskio_writes"),
get_metric(json, "load_average"),
get_metric(json, "mem_usage"),
get_metric(json, "net_receive"),
get_metric(json, "net_send"),
]
)
class UpcloudClient:
def __init__(self, username: str, password: str):
if not username:
raise Exception("UpcloudClient username must be set")
if not password:
raise Exception("UpcloudClient password must be set")
client = requests.Session()
encoded_auth = base64.b64encode(
f"{username}:{password}".encode("utf-8")
).decode("utf-8")
client.headers = {"Authorization": f"Basic {encoded_auth}"}
self.__client = client
def get_metrics(self, db_uuid: str) -> UpcloudMetrics:
url = f"{BASE_URL}/1.3/database/{db_uuid}/metrics?period=hour"
LOG.d(f"Performing request to {url}")
response = self.__client.get(url)
LOG.d(f"Status code: {response.status_code}")
if response.status_code != 200:
return UpcloudMetrics(metrics=[])
as_json = response.json()
return get_metrics(as_json)

View File

@ -1,3 +1,4 @@
import configparser
import os
import subprocess
from time import sleep
@ -7,6 +8,7 @@ import newrelic.agent
from app.db import Session
from app.log import LOG
from monitor.metric_exporter import MetricExporter
# the number of consecutive fails
# if more than _max_nb_fails, alert
@ -19,6 +21,18 @@ _max_nb_fails = 10
# the maximum number of emails in incoming & active queue
_max_incoming = 50
_NR_CONFIG_FILE_LOCATION_VAR = "NEW_RELIC_CONFIG_FILE"
def get_newrelic_license() -> str:
nr_file = os.environ.get(_NR_CONFIG_FILE_LOCATION_VAR, None)
if nr_file is None:
raise Exception(f"{_NR_CONFIG_FILE_LOCATION_VAR} not defined")
config = configparser.ConfigParser()
config.read(nr_file)
return config["newrelic"]["license_key"]
@newrelic.agent.background_task()
def log_postfix_metrics():
@ -80,10 +94,13 @@ def log_nb_db_connection():
if __name__ == "__main__":
exporter = MetricExporter(get_newrelic_license())
while True:
log_postfix_metrics()
log_nb_db_connection()
Session.close()
exporter.run()
# 1 min
sleep(60)

1013
app/poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -95,13 +95,13 @@ webauthn = "^0.4.7"
pyspf = "^2.0.14"
Flask-Limiter = "^1.4"
memory_profiler = "^0.57.0"
gevent = "^21.12.0"
gevent = "22.10.2"
aiospamc = "^0.6.1"
email_validator = "^1.1.1"
PGPy = "0.5.4"
coinbase-commerce = "^1.0.1"
requests = "^2.25.1"
newrelic = "^7.10.0"
newrelic = "8.8.0"
flanker = "^0.9.11"
pyre2 = "^0.3.6"
tldextract = "^3.1.2"
@ -111,6 +111,7 @@ Deprecated = "^1.2.13"
cryptography = "37.0.1"
SQLAlchemy = "1.3.24"
redis = "^4.5.3"
newrelic-telemetry-sdk = "^0.5.0"
[tool.poetry.dev-dependencies]
pytest = "^7.0.0"

View File

@ -79,6 +79,7 @@ from app.config import (
MEM_STORE_URI,
)
from app.dashboard.base import dashboard_bp
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session
from app.developer.base import developer_bp
from app.discover.base import discover_bp
@ -491,6 +492,7 @@ def setup_paddle_callback(app: Flask):
# in case user cancels a plan and subscribes a new plan
sub.cancelled = False
execute_subscription_webhook(user)
LOG.d("User %s upgrades!", user)
Session.commit()
@ -509,6 +511,7 @@ def setup_paddle_callback(app: Flask):
).date()
Session.commit()
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_cancelled":
subscription_id = request.form.get("subscription_id")
@ -538,6 +541,7 @@ def setup_paddle_callback(app: Flask):
end_date=request.form.get("cancellation_effective_date"),
),
)
execute_subscription_webhook(sub.user)
else:
# user might have deleted their account
@ -580,6 +584,7 @@ def setup_paddle_callback(app: Flask):
sub.cancelled = False
Session.commit()
execute_subscription_webhook(sub.user)
else:
LOG.w(
f"update non-exist subscription {subscription_id}. {request.form}"
@ -596,6 +601,7 @@ def setup_paddle_callback(app: Flask):
Subscription.delete(sub.id)
Session.commit()
LOG.e("%s requests a refund", user)
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_payment_refunded":
subscription_id = request.form.get("subscription_id")
@ -629,6 +635,7 @@ def setup_paddle_callback(app: Flask):
LOG.e("Unknown plan_id %s", plan_id)
else:
LOG.w("partial subscription_payment_refunded, not handled")
execute_subscription_webhook(sub.user)
return "OK"
@ -742,6 +749,7 @@ def handle_coinbase_event(event) -> bool:
coinbase_subscription=coinbase_subscription,
),
)
execute_subscription_webhook(user)
return True

View File

@ -256,17 +256,27 @@ new Vue({
el: '#filter-app',
delimiters: ["[[", "]]"], // necessary to avoid conflict with jinja
data: {
showFilter: false
showFilter: false,
showStats: false
},
methods: {
async toggleFilter() {
let that = this;
that.showFilter = !that.showFilter;
store.set('showFilter', that.showFilter);
},
async toggleStats() {
let that = this;
that.showStats = !that.showStats;
store.set('showStats', that.showStats);
}
},
async mounted() {
if (store.get("showFilter"))
this.showFilter = true;
if (store.get("showStats"))
this.showStats = true;
}
});

16
app/static/package-lock.json generated vendored
View File

@ -69,12 +69,12 @@
"font-awesome": {
"version": "4.7.0",
"resolved": "https://registry.npmjs.org/font-awesome/-/font-awesome-4.7.0.tgz",
"integrity": "sha1-j6jPBBGhoxr9B7BtKQK7n8gVoTM="
"integrity": "sha512-U6kGnykA/6bFmg1M/oT9EkFeIYv7JlX3bozwQJWiiLz6L0w3F5vBVPxHlwyX/vtNq1ckcpRKOB9f2Qal/VtFpg=="
},
"htmx.org": {
"version": "1.6.1",
"resolved": "https://registry.npmjs.org/htmx.org/-/htmx.org-1.6.1.tgz",
"integrity": "sha512-i+1k5ee2eFWaZbomjckyrDjUpa3FMDZWufatUSBmmsjXVksn89nsXvr1KLGIdAajiz+ZSL7TE4U/QaZVd2U2sA=="
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/htmx.org/-/htmx.org-1.7.0.tgz",
"integrity": "sha512-wIQ3yNq7yiLTm+6BhV7Z8qKKTzEQv9xN/I4QsN5FvdGi69SNWTsSMlhH69HPa1rpZ8zSq1A/e7gTbTySxliP8g=="
},
"intro.js": {
"version": "2.9.3",
@ -82,9 +82,9 @@
"integrity": "sha512-hC+EXWnEuJeA3CveGMat3XHePd2iaXNFJIVfvJh2E9IzBMGLTlhWvPIVHAgKlOpO4lNayCxEqzr4N02VmHFr9Q=="
},
"jquery": {
"version": "3.5.1",
"resolved": "https://registry.npmjs.org/jquery/-/jquery-3.5.1.tgz",
"integrity": "sha512-XwIBPqcMn57FxfT+Go5pzySnm4KWkT1Tv7gjrpT1srtf8Weynl6R273VJ5GjkRb51IzMp5nbaPjJXMWeju2MKg=="
"version": "3.6.4",
"resolved": "https://registry.npmjs.org/jquery/-/jquery-3.6.4.tgz",
"integrity": "sha512-v28EW9DWDFpzcD9O5iyJXg3R3+q+mET5JhnjJzQUZMHOv67bpSIHq81GEYpPNZHG+XXHsfSme3nxp/hndKEcsQ=="
},
"multiple-select": {
"version": "1.5.2",
@ -107,7 +107,7 @@
"toastr": {
"version": "2.1.4",
"resolved": "https://registry.npmjs.org/toastr/-/toastr-2.1.4.tgz",
"integrity": "sha1-i0O+ZPudDEFIcURvLbjoyk6V8YE=",
"integrity": "sha512-LIy77F5n+sz4tefMmFOntcJ6HL0Fv3k1TDnNmFZ0bU/GcvIIfy6eG2v7zQmMiYgaalAiUv75ttFrPn5s0gyqlA==",
"requires": {
"jquery": ">=1.12.0"
}

View File

@ -9,10 +9,13 @@
<h1 class="card-title">Create new account</h1>
<div class="form-group">
<label class="form-label">Email address</label>
{{ form.email(class="form-control", type="email") }}
{{ form.email(class="form-control", type="email", placeholder="YourName@protonmail.com") }}
<div class="small-text alert alert-info" style="margin-top: 1px">
Emails sent to your alias will be forwarded to this email address.
<br>
It can't be a disposable or forwarding email address.
<br>
We recommend using a <a href="https://proton.me/mail" target="_blank">Proton Mail</a> address
</div>
{{ render_field_errors(form.email) }}
</div>

View File

@ -23,7 +23,7 @@
<!-- Yandex -->
<meta name="yandex-verification" content="c9e5d4d68bc983a1" />
<meta name="description"
content="Protect your email address with email ALIAS. Create a different email alias for each website. No more phishing, spams."/>
content="Protect your email address with email ALIAS. Create a different email alias for each website. No more phishing, or spam."/>
<link rel="icon" href="/static/favicon.ico" type="image/x-icon" />
<link rel="shortcut icon" type="image/x-icon" href="/static/favicon.ico" />
<link rel="canonical" href="{{ CANONICAL_URL }}" />

View File

@ -31,63 +31,11 @@
{% block title %}Alias{% endblock %}
{% block default_content %}
<!-- Global Stats -->
<div class="row">
<div class="col-12 col-md-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="d-flex align-items-center">
<div class="subheader">Aliases</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">All time</div>
</div>
<div class="h1 m-0">{{ stats.nb_alias }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="d-flex align-items-center">
<div class="subheader">Forwarded</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_forward }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="d-flex align-items-center">
<div class="subheader">Replies/Sent</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_reply }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="d-flex align-items-center">
<div class="subheader">Blocked</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_block }}</div>
</div>
</div>
</div>
</div>
<!-- END Global Stats -->
<!-- Controls: buttons & search -->
<div id="filter-app">
<div class="row mb-3">
<div class="col d-flex">
<div>
<div class="col d-flex flex-wrap justify-content-between">
<div class="mb-1">
<div class="btn-group" role="group">
<form method="post">
{{ csrf_form.csrf_token }}
@ -141,17 +89,86 @@
</div>
</div>
</div>
<div style="margin-left: auto">
<div>
<div class="btn-group">
<a v-if="!showFilter"
@click="toggleFilter()"
class="btn btn-outline-secondary">
<i class="fe fe-chevrons-down"></i> Filters
<a @click="toggleStats()" class="btn btn-outline-secondary">
<span v-if="!showStats">
<i class="fe fe-chevrons-down"></i>
Show stats
</span>
<span v-else>
<i class="fe fe-chevrons-up"></i>
Hide stats
</span>
</a>
</div>
<div class="btn-group">
<a @click="toggleFilter()" class="btn btn-outline-secondary">
<span v-if="!showFilter">
<i class="fe fe-chevrons-down"></i>
Show filters
</span>
<span v-else>
<i class="fe fe-chevrons-up"></i>
Hide filters
</span>
</a>
</div>
</div>
</div>
</div>
<!-- Global Stats -->
<div class="row" v-if="showStats">
<div class="col-12 col-md-6 col-lg-3">
<div class="card mb-3">
<div class="card-body py-3">
<div class="d-flex align-items-center">
<div class="subheader">Aliases</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">All time</div>
</div>
<div class="h1 m-0">{{ stats.nb_alias }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card mb-3">
<div class="card-body py-3">
<div class="d-flex align-items-center">
<div class="subheader">Forwarded</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_forward }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card mb-3">
<div class="card-body py-3">
<div class="d-flex align-items-center">
<div class="subheader">Replies/Sent</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_reply }}</div>
</div>
</div>
</div>
<div class="col-12 col-md-6 col-lg-3">
<div class="card mb-3">
<div class="card-body py-3">
<div class="d-flex align-items-center">
<div class="subheader">Blocked</div>
<div class="text-muted"
style="order: 2; margin-left: auto; font-size: .8rem">Last 14 days</div>
</div>
<div class="h1 m-0">{{ stats.nb_block }}</div>
</div>
</div>
</div>
</div>
<!-- END Global Stats -->
<div class="row mb-2" v-if="showFilter" id="filter-control">
<!-- Filter Control -->
<div class="col d-flex">
@ -223,11 +240,6 @@
<a href="{{ url_for('dashboard.index') }}"
class="btn btn-outline-secondary">Reset</a>
{% endif %}
<a v-if="showFilter"
@click="toggleFilter()"
class="btn btn-outline-secondary">
<i class="fe fe-chevrons-up"></i>
</a>
</div>
</div>
</div>

View File

@ -71,98 +71,110 @@
</form>
</div>
<!-- END Change email -->
{% if mailbox.pgp_finger_print and not mailbox.disable_pgp and current_user.include_sender_in_reverse_alias %}
<!-- Not show PGP option for Proton mailbox -->
{% if mailbox.is_proton() and not mailbox.pgp_enabled() %}
<div class="alert alert-info">
Email headers like <span class="italic">From, To, Subject</span> aren't encrypted by PGP.
Currently, your reverse alias includes the sender address.
You can disable this on <a href="/dashboard/setting#sender-in-ra">Settings</a>.
As an email is always encrypted at rest in Proton Mail, having SimpleLogin also encrypt your email is redundant and does not add any security benefit.
<br>
The PGP option on SimpleLogin is instead useful for when your mailbox provider isn't encrypted by default like Gmail, Outlook, etc.
</div>
{% endif %}
<div class="card">
<div class="card-body">
<div class="card-title">
<div class="d-flex">
Pretty Good Privacy (PGP)
{% if mailbox.pgp_finger_print %}
<div class="{% if mailbox.is_proton() and not mailbox.pgp_enabled() %}
disabled-content{% endif %}">
{% if mailbox.pgp_finger_print and not mailbox.disable_pgp and current_user.include_sender_in_reverse_alias and not mailbox.is_proton() %}
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="toggle-pgp">
<label class="custom-switch cursor" style="padding-left: 1rem" data-toggle="tooltip" {% if mailbox.disable_pgp %}
title="Enable PGP" {% else %} title="Disable PGP" {% endif %}>
<input type="checkbox" class="custom-switch-input" name="pgp-enabled" {{ "" if mailbox.disable_pgp else "checked" }}>
<span class="custom-switch-indicator"></span>
</label>
</form>
{% endif %}
</div>
<div class="small-text mt-1">
By importing your PGP Public Key into SimpleLogin, all emails sent to {{ mailbox.email }} are
<b>encrypted</b> with your key.
<br />
{% if PGP_SIGNER %}All forwarded emails will be signed with <b>{{ PGP_SIGNER }}</b>.{% endif %}
</div>
<div class="alert alert-info">
Email headers like <span class="italic">From, To, Subject</span> aren't encrypted by PGP.
Currently, your reverse alias includes the sender address.
You can disable this on <a href="/dashboard/setting#sender-in-ra">Settings</a>.
</div>
{% if not current_user.is_premium() %}
<div class="alert alert-danger" role="alert">This feature is only available in premium plan.</div>
{% endif %}
<form method="post">
{{ csrf_form.csrf_token }}
<div class="form-group">
<label class="form-label">PGP Public Key</label>
<textarea name="pgp" {% if not current_user.is_premium() %} disabled {% endif %} class="form-control" rows=10 id="pgp-public-key" placeholder="(Drag and drop or paste your pgp public key here)&#10;-----BEGIN PGP PUBLIC KEY BLOCK-----">{{ mailbox.pgp_public_key or "" }}</textarea>
</div>
<input type="hidden" name="form-name" value="pgp">
<button class="btn btn-primary" name="action" {% if not current_user.is_premium() %}
disabled {% endif %} value="save">
Save
</button>
{% if mailbox.pgp_finger_print %}
<button class="btn btn-danger float-right" name="action" value="remove">Remove</button>
{% endif %}
</form>
</div>
</div>
<div class="card" {% if not mailbox.pgp_enabled() %}
disabled {% endif %}>
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="generic-subject">
{% endif %}
<div class="card">
<div class="card-body">
<div class="card-title">
Hide email subject when PGP is enabled
<div class="d-flex">
Pretty Good Privacy (PGP)
{% if mailbox.pgp_finger_print %}
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="toggle-pgp">
<label class="custom-switch cursor" style="padding-left: 1rem" data-toggle="tooltip" {% if mailbox.disable_pgp %}
title="Enable PGP" {% else %} title="Disable PGP" {% endif %}>
<input type="checkbox" class="custom-switch-input" name="pgp-enabled" {{ "" if mailbox.disable_pgp else "checked" }}>
<span class="custom-switch-indicator"></span>
</label>
</form>
{% endif %}
</div>
<div class="small-text mt-1">
When PGP is enabled, you can choose to use a <b>generic</b> subject for the forwarded emails.
The original subject is then added into the email body.
By importing your PGP Public Key into SimpleLogin, all emails sent to {{ mailbox.email }} are
<b>encrypted</b> with your key.
<br />
As PGP does not encrypt the email subject and the email subject might contain sensitive information,
this option will allow a further protection of your email content.
{% if PGP_SIGNER %}All forwarded emails will be signed with <b>{{ PGP_SIGNER }}</b>.{% endif %}
</div>
</div>
<div class="alert alert-info">
As the email is encrypted, a subject like "Email for you"
will probably be rejected by your mailbox since it sounds like a spam.
<br />
Something like "Encrypted Email" would work much better :).
</div>
<div class="form-group">
<label class="form-label">Generic Subject</label>
<input name="generic-subject" {% if not mailbox.pgp_enabled() %}
disabled {% endif %} class="form-control" maxlength="78" placeholder="Generic Subject" value="{{ mailbox.generic_subject or "" }}">
{% if not current_user.is_premium() %}
<div class="alert alert-danger" role="alert">This feature is only available in premium plan.</div>
{% endif %}
<form method="post">
{{ csrf_form.csrf_token }}
<div class="form-group">
<label class="form-label">PGP Public Key</label>
<textarea name="pgp" {% if not current_user.is_premium() %} disabled {% endif %} class="form-control" rows=10 id="pgp-public-key" placeholder="(Drag and drop or paste your pgp public key here)&#10;-----BEGIN PGP PUBLIC KEY BLOCK-----">{{ mailbox.pgp_public_key or "" }}</textarea>
</div>
<button class="btn btn-primary" name="action" {% if not mailbox.pgp_enabled() %}
<input type="hidden" name="form-name" value="pgp">
<button class="btn btn-primary" name="action" {% if not current_user.is_premium() %}
disabled {% endif %} value="save">
Save
</button>
{% if mailbox.generic_subject %}
{% if mailbox.pgp_finger_print %}
<button class="btn btn-danger float-right" name="action" value="remove">Remove</button>
{% endif %}
</div>
</form>
</form>
</div>
</div>
<div class="card" {% if not mailbox.pgp_enabled() %}
disabled {% endif %}>
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="generic-subject">
<div class="card-body">
<div class="card-title">
Hide email subject when PGP is enabled
<div class="small-text mt-1">
When PGP is enabled, you can choose to use a <b>generic</b> subject for the forwarded emails.
The original subject is then added into the email body.
<br />
As PGP does not encrypt the email subject and the email subject might contain sensitive information,
this option will allow a further protection of your email content.
</div>
</div>
<div class="alert alert-info">
As the email is encrypted, a subject like "Email for you"
will probably be rejected by your mailbox since it sounds like a spam.
<br />
Something like "Encrypted Email" would work much better :).
</div>
<div class="form-group">
<label class="form-label">Generic Subject</label>
<input name="generic-subject" {% if not mailbox.pgp_enabled() %}
disabled {% endif %} class="form-control" maxlength="78" placeholder="Generic Subject" value="{{ mailbox.generic_subject or "" }}">
</div>
<button class="btn btn-primary" name="action" {% if not mailbox.pgp_enabled() %}
disabled {% endif %} value="save">
Save
</button>
{% if mailbox.generic_subject %}
<button class="btn btn-danger float-right" name="action" value="remove">Remove</button>
{% endif %}
</div>
</form>
</div>
</div>
<hr />
<h2 class="h4">Advanced Options</h2>

View File

@ -15,7 +15,7 @@
<div class="col">
<h1 class="h3 mb-5">Quarantine & Bounce</h1>
<div class="alert alert-info">
This page shows all emails that are either refused by your mailbox (bounced) or detected as spams/phishing (quarantine) via our
This page shows all emails that are either refused by your mailbox (bounced) or detected as spam/phishing (quarantine) via our
<a href="https://simplelogin.io/docs/getting-started/anti-phishing/"
target="_blank"
rel="noopener noreferrer">anti-phishing program ↗</a>
@ -31,7 +31,7 @@
rel="noopener noreferrer">Setting up filter for SimpleLogin emails ↗</a>
</li>
<li>
If the email is flagged as spams/phishing, this means that the sender explicitly states their emails should respect
If the email is flagged as spam/phishing, this means that the sender explicitly states their emails should respect
<b>DMARC</b> (an email authentication protocol)
and any email that violates this should either be quarantined or rejected. If possible, please contact the sender
so they can update their DMARC setting or fix their SPF/DKIM that cause the DMARC failure.

View File

@ -181,10 +181,10 @@
<!-- END change name & profile picture -->
<!-- Change email -->
<div class="card">
<form method="post" enctype="multipart/form-data">
<input type="hidden" name="form-name" value="update-email">
{{ change_email_form.csrf_token }}
<div class="card-body">
<div class="card-body">
<form method="post" enctype="multipart/form-data">
<input type="hidden" name="form-name" value="update-email">
{{ change_email_form.csrf_token }}
<div class="card-title">Account Email</div>
<div class="mb-3">
This email address is used to log in to SimpleLogin.
@ -199,26 +199,30 @@
<!-- Not allow user to change email if there's a pending change -->
{{ change_email_form.email(class="form-control", value=current_user.email, readonly=pending_email != None) }}
{{ render_field_errors(change_email_form.email) }}
{% if pending_email %}
<div class="mt-2">
<span class="text-danger">Pending email change: {{ pending_email }}</span>
<a href="{{ url_for('dashboard.resend_email_change') }}"
class="btn btn-secondary btn-sm">
Resend
confirmation email
</a>
<a href="{{ url_for('dashboard.cancel_email_change') }}"
class="btn btn-secondary btn-sm">
Cancel email
change
</a>
</div>
{% endif %}
</div>
<button class="btn btn-outline-primary">Change Email</button>
</div>
</form>
</form>
{% if pending_email %}
<div class="mt-2">
<span class="text-danger float-left">Pending email change: {{ pending_email }}</span>
<form method="POST"
action="{{ url_for('dashboard.resend_email_change') }}"
class="float-left ml-2">
{{ change_email_form.csrf_token }}
<a onclick="this.closest('form').submit()"
class="btn btn-secondary btn-sm">Resend confirmation email</a>
</form>
<form method="POST"
action="{{ url_for('dashboard.cancel_email_change') }}"
class="float-left ml-2">
{{ change_email_form.csrf_token }}
<a onclick="this.closest('form').submit()"
class="btn btn-secondary btn-sm">Cancel email change</a>
</form>
</div>
{% endif %}
</div>
</div>
<!-- END Change email -->
<!-- Connect with Proton -->
@ -265,11 +269,15 @@
<div class="card" id="change_password">
<div class="card-body">
<div class="card-title">Password</div>
<div class="mb-3">You will receive an email containing instructions on how to change your password.</div>
<div class="mb-3">
You will receive an email containing instructions on how to change your password.
</div>
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="change-password">
<button class="btn btn-outline-primary">Change password</button>
<button class="btn btn-outline-primary">
Change password
</button>
</form>
</div>
</div>
@ -676,7 +684,8 @@
SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b>
sender address.
<br />
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>.
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>
and the original From header is stored in <b>X-SimpleLogin-Original-From<b>.
You can choose to display this header in your email client.
<br />
As email headers aren't encrypted, your mailbox service can know the sender address via this header.

View File

@ -28,7 +28,7 @@
<form id="supportZendeskForm" method="post" enctype="multipart/form-data">
<div class="mt-4 mb-5">
<label for="issueDescription" class="form-label font-weight-bold">What happened?</label>
<textarea class="form-control" required name="ticket_content" id="issueDescription" rows="3" placeholder="Please provide as much information as possible. For example which alias(es), mailbox(es) ar affected, if this is a persistent issue...">{{- ticket_content or '' -}}</textarea>
<textarea class="form-control" required name="ticket_content" id="issueDescription" rows="3" placeholder="Please provide as much information as possible. For example which alias(es), mailbox(es) are affected, if this is a persistent issue...">{{- ticket_content or '' -}}</textarea>
</div>
<div class="mt-5 font-weight-bold">Attach files to support request</div>
<div class="text-muted">Only images, text and emails are accepted</div>

View File

@ -31,7 +31,7 @@ Please consider the following options:
<a href="{{ disable_alias_link }}">disable the alias</a>
or
<a href="{{ block_sender_link }}">block the sender</a>
if they send too many spams.
if they send too many spam emails.
</li>
</ol>
<br />

View File

@ -12,7 +12,7 @@ Please consider the following options:
2. If this email is spam, it means your alias {{alias}} is now in the hands of a spammer.
You can either disable the alias on {{disable_alias_link}}
or block the sender on {{ block_sender_link }} if they send too many spams.
or block the sender on {{ block_sender_link }} if they send too many spam emails.
Please note that the alias can be automatically disabled if too many emails sent to it are bounced.

View File

@ -286,6 +286,7 @@
},
async mounted() {
Object.freeze(Object.prototype);
let that = this;
let res = await fetch(`/api/notifications?page=${that.page}`, {
method: "GET",

View File

@ -19,7 +19,7 @@
<a href="{{ disable_alias_link }}">disable the alias</a>
or
<a href="{{ block_sender_link }}">block the sender</a>
if they send too many spams.
if they send too many spam emails.
</li>
</ol>
</div>

View File

@ -5,7 +5,7 @@
<div class="page-single">
<div class="container">
<div class="row">
<div class="col mx-auto" style="max-width: 28rem">
<div class="col mx-auto" style="max-width: 32rem">
<div class="text-center mb-6">
<a href="{{ LANDING_PAGE_URL }}">
<img src="/static/logo.svg"

View File

@ -17,7 +17,7 @@ def test_get_setting(flask_client):
"notification": True,
"random_alias_default_domain": "sl.local",
"sender_format": "AT",
"random_alias_suffix": "random_string",
"random_alias_suffix": "word",
}
@ -95,11 +95,13 @@ def test_get_setting_domains_v2(flask_client):
def test_update_settings_random_alias_suffix(flask_client):
user = login(flask_client)
# default random_alias_suffix is random_string
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value
assert user.random_alias_suffix == AliasSuffixEnum.word.value
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"})
assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "word"})
r = flask_client.patch(
"/api/setting", json={"random_alias_suffix": "random_string"}
)
assert r.status_code == 200
assert user.random_alias_suffix == AliasSuffixEnum.word.value
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value

View File

@ -129,3 +129,12 @@ def test_change_name(flask_client):
assert r.json["name"] == "new name"
assert user.name == "new name"
def test_stats(flask_client):
login(flask_client)
r = flask_client.get("/api/stats")
assert r.status_code == 200
assert r.json == {"nb_alias": 1, "nb_block": 0, "nb_forward": 0, "nb_reply": 0}

View File

@ -0,0 +1,26 @@
from flask import url_for
from app.db import Session
from app.models import User, ResetPasswordCode
from tests.utils import create_new_user, random_token
def test_successful_reset_password(flask_client):
user = create_new_user()
original_pass_hash = user.password
user_id = user.id
reset_code = random_token()
ResetPasswordCode.create(user_id=user.id, code=reset_code)
ResetPasswordCode.create(user_id=user.id, code=random_token())
Session.commit()
r = flask_client.post(
url_for("auth.reset_password", code=reset_code),
data={"password": "1231idsfjaads"},
)
assert r.status_code == 302
assert ResetPasswordCode.get_by(user_id=user_id) is None
user = User.get(user_id)
assert user.password != original_pass_hash

View File

@ -1,10 +1,13 @@
from time import time
import arrow
from flask import url_for
from app import config
from app.dashboard.views.api_key import clean_up_unused_or_old_api_keys
from app.db import Session
from app.models import User, ApiKey
from tests.utils import login
from tests.utils import login, create_new_user
def test_api_key_page_requires_password(flask_client):
@ -34,6 +37,17 @@ def test_create_delete_api_key(flask_client):
assert ApiKey.filter(ApiKey.user_id == user.id).count() == 1
assert api_key.name == "for test"
# create second api_key
create_r = flask_client.post(
url_for("dashboard.api_key"),
data={"form-name": "create", "name": "for test 2"},
follow_redirects=True,
)
assert create_r.status_code == 200
api_key_2 = ApiKey.filter_by(user_id=user.id).order_by(ApiKey.id.desc()).first()
assert ApiKey.filter(ApiKey.user_id == user.id).count() == 2
assert api_key_2.name == "for test 2"
# delete api_key
delete_r = flask_client.post(
url_for("dashboard.api_key"),
@ -41,7 +55,7 @@ def test_create_delete_api_key(flask_client):
follow_redirects=True,
)
assert delete_r.status_code == 200
assert ApiKey.count() == nb_api_key
assert ApiKey.count() == nb_api_key + 1
def test_delete_all_api_keys(flask_client):
@ -87,3 +101,26 @@ def test_delete_all_api_keys(flask_client):
assert (
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
) # assert that user 2 still has 1 API key
def test_cleanup_api_keys():
user = create_new_user()
ApiKey.create(
user_id=user.id, name="used", last_used=arrow.utcnow().shift(days=-3), times=1
)
ApiKey.create(
user_id=user.id, name="keep 1", last_used=arrow.utcnow().shift(days=-2), times=1
)
ApiKey.create(
user_id=user.id, name="keep 2", last_used=arrow.utcnow().shift(days=-1), times=1
)
ApiKey.create(user_id=user.id, name="not used", last_used=None, times=1)
Session.flush()
old_max_api_keys = config.MAX_API_KEYS
config.MAX_API_KEYS = 2
clean_up_unused_or_old_api_keys(user.id)
keys = ApiKey.filter_by(user_id=user.id).all()
assert len(keys) == 2
assert keys[0].name.find("keep") == 0
assert keys[1].name.find("keep") == 0
config.MAX_API_KEYS = old_max_api_keys

View File

@ -0,0 +1,20 @@
from flask import url_for
from app.models import Coupon
from app.utils import random_string
from tests.utils import login
def test_use_coupon(flask_client):
user = login(flask_client)
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True)
r = flask_client.post(
url_for("dashboard.coupon_route"),
data={"code": code},
)
assert r.status_code == 302
coupon = Coupon.get_by(code=code)
assert coupon.used
assert coupon.used_by_user_id == user.id

View File

@ -316,6 +316,10 @@ def test_add_alias_in_global_trash(flask_client):
def test_add_alias_in_custom_domain_trash(flask_client):
user = login(flask_client)
for deleted_domain in DomainDeletedAlias.all():
Session.delete(deleted_domain)
Session.flush()
domain = random_domain()
custom_domain = CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True

View File

@ -0,0 +1,21 @@
Sender: somebody@somewhere.net
Content-Type: multipart/mixed; boundary="----=_Part_3946_1099248058.1688752298149"
--0c916c9b5fe3c925d7bafeb988bb6794
Content-Type: text/plain; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
notification test
--0c916c9b5fe3c925d7bafeb988bb6794
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
<html><head><meta http-equiv=3D"Content-Type" content=3D"text/html; charset=
=3DUTF-8"><meta http-equiv=3D"X-UA-Compatible" content=3D"IE=3Dedge"><meta =
name=3D"format-detection" content=3D"telephone=3Dno"><meta name=3D"viewport=
" content=3D"width=3Ddevice-width, initial-scale=3D1.0">
--0c916c9b5fe3c925d7bafeb988bb6794--

View File

@ -0,0 +1,27 @@
From: {{sender_address}}
To: {{recipient_address}}
Subject: Test subject
Content-Type: multipart/alternative; boundary="MLF8fvg556fdhFDH7=_?:
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/plain;
charset="utf-8"
Content-Transfer-Encoding: quoted-printable
*************************************************************************
This five-part limited series, based on the brilliant graphic novel by Me
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/html;
charset="utf-8"
Content-Transfer-Encoding: 8bit
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/plain;
charset="utf-8"
Content-Transfer-Encoding: quoted-printable
*************************************************************************
*************************************************************************

View File

@ -0,0 +1,65 @@
Received: by mail-ed1-f49.google.com with SMTP id ej4so13657316edb.7
for <gmail@simplemail.fplante.fr>; Mon, 27 Jun 2022 08:48:15 -0700 (PDT)
X-Gm-Message-State: AJIora8exR9DGeRFoKAtjzwLtUpH5hqx6Zt3tm8n4gUQQivGQ3fELjUV
yT7RQIfeW9Kv2atuOcgtmGYVU4iQ8VBeLmK1xvOYL4XpXfrT7ZrJNQ==
Authentication-Results: mx.google.com;
dkim=pass header.i=@matera.eu header.s=fnt header.b=XahYMey7;
dkim=pass header.i=@sendgrid.info header.s=smtpapi header.b="QOCS/yjt";
spf=pass (google.com: domain of bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu designates 168.245.4.42 as permitted sender) smtp.mailfrom="bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu";
dmarc=pass (p=NONE sp=NONE dis=NONE) header.from=matera.eu
Received: from out.frontapp.com (unknown)
by geopod-ismtpd-3-0 (SG)
with ESMTP id d2gM2N7PT7W8d2-UEC4ESA
for <csyndic.quartz@gmail.com>;
Mon, 27 Jun 2022 15:48:11.014 +0000 (UTC)
Content-Type: multipart/alternative;
boundary="----sinikael-?=_1-16563448907660.10629093370416887"
In-Reply-To:
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
References:
<imported@frontapp.com_t:AssembléeGénérale2022-06-25T16:32:03+02:006b3cdade-982b-47cd-8114-6a037dfb7d60>
<imported@frontapp.com_f924cce139940c9935621f067d46443597394f34>
<imported@frontapp.com_t:Appeldefonds2022-06-26T10:04:55+02:00d89f5e23-6d98-4f01-95fa-b7c7544b7aa9>
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
<af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
From: {{ sender_address }}
To: {{ recipient_address }}
CC: {{ cc_address }}
Subject: Something
Message-ID: <af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
X-Mailer: Front (1.0; +https://frontapp.com;
+msgid=af07e94a66ece6564ae30a2aaac7a34c@frontapp.com)
X-Feedback-ID: 14445963:SG
X-SG-EID:
=?us-ascii?Q?XtlxQDg5i3HqMzQY2Upg19JPZBVl1RybInUUL2yta9uBoIU4KU1FMJ5DjWrz6g?=
=?us-ascii?Q?fJUK5Qmneg2uc46gwp5BdHdp6Foaq5gg3xJriv3?=
=?us-ascii?Q?9OA=2FWRifeylU9O+ngdNbOKXoeJAkROmp2mCgw9x?=
=?us-ascii?Q?uud+EclOT9mYVtbZsydOLLm6Y2PPswQl8lnmiku?=
=?us-ascii?Q?DAhkG15HTz2FbWGWNDFb7VrSsN5ddjAscr6sIHw?=
=?us-ascii?Q?S48R5fnXmfhPbmlCgqFjr0FGphfuBdNAt6z6w8a?=
=?us-ascii?Q?o9u1EYDIX7zWHZ+Tr3eyw=3D=3D?=
X-SG-ID:
=?us-ascii?Q?N2C25iY2uzGMFz6rgvQsb8raWjw0ZPf1VmjsCkspi=2FI9PhcvqXQTpKqqyZkvBe?=
=?us-ascii?Q?+2RscnQ4WPkA+BN1vYgz1rezTVIqgp+rlWrKk8o?=
=?us-ascii?Q?HoB5dzpX6HKWtWCVRi10zwlDN1+pJnySoIUrlaT?=
=?us-ascii?Q?PA2aqQKmMQbjTl0CUAFryR8hhHcxdS0cQowZSd7?=
=?us-ascii?Q?XNjJWLvCGF7ODwg=2FKr+4yRE8UvULS2nrdO2wWyQ?=
=?us-ascii?Q?AiFHdPdZsRlgNomEo=3D?=
X-Spamd-Result: default: False [-2.00 / 13.00];
ARC_ALLOW(-1.00)[google.com:s=arc-20160816:i=1];
MIME_GOOD(-0.10)[multipart/alternative,text/plain];
REPLYTO_ADDR_EQ_FROM(0.00)[];
FORGED_RECIPIENTS_FORWARDING(0.00)[];
NEURAL_HAM(-0.00)[-0.981];
FREEMAIL_TO(0.00)[gmail.com];
RCVD_TLS_LAST(0.00)[];
FREEMAIL_ENVFROM(0.00)[gmail.com];
MIME_TRACE(0.00)[0:+,1:+,2:~];
RWL_MAILSPIKE_POSSIBLE(0.00)[209.85.208.49:from]
------sinikael-?=_1-16563448907660.10629093370416887
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: quoted-printable
From {{ sender_address }} To {{ recipient_address }}
------sinikael-?=_1-16563448907660.10629093370416887--

View File

@ -0,0 +1,33 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.config import get_abs_path
from app.db import Session
from app.pgp_utils import load_public_key
from tests.utils import create_new_user, load_eml_file, random_email
from app.models import Alias
def test_encrypt_with_pgp():
user = create_new_user()
pgp_public_key = open(get_abs_path("local_data/public-pgp.asc")).read()
mailbox = user.default_mailbox
mailbox.pgp_public_key = pgp_public_key
mailbox.generic_subject = True
mailbox.pgp_finger_print = load_public_key(pgp_public_key)
alias = Alias.create_new_random(user)
Session.flush()
sender_address = random_email()
msg = load_eml_file(
"email_to_pgp_encrypt.eml",
{
"sender_address": sender_address,
"recipient_address": alias.email,
},
)
envelope = Envelope()
envelope.mail_from = sender_address
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result is not None

View File

@ -0,0 +1,74 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.db import Session
from app.email import headers, status
from app.mail_sender import mail_sender
from app.models import Alias
from app.utils import random_string
from tests.utils import create_new_user, load_eml_file, random_email
@mail_sender.store_emails_test_decorator
def test_original_headers_from_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": original_sender_address,
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert request.msg[headers.SL_ORIGINAL_FROM] == original_sender_address
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)
@mail_sender.store_emails_test_decorator
def test_original_headers_from_with_name_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
name = random_string(10)
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": f"{name} <{original_sender_address}>",
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert (
request.msg[headers.SL_ORIGINAL_FROM] == f"{name} <{original_sender_address}>"
)
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)

View File

@ -1,7 +1,9 @@
import arrow
from app import config
from app.db import Session
from app.models import User, Job
from tests.utils import random_email
from app.models import User, Job, PartnerSubscription, PartnerUser, ManualSubscription
from app.proton.utils import get_proton_partner
from tests.utils import random_email, random_token
def test_create_from_partner(flask_client):
@ -11,6 +13,7 @@ def test_create_from_partner(flask_client):
)
assert user.notification is False
assert user.trial_end is None
assert user.newsletter_alias_id is None
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_SEND_PROTON_WELCOME_1
@ -23,3 +26,23 @@ def test_user_created_by_partner(flask_client):
regular_user = User.create(email=random_email())
assert regular_user.created_by_partner is False
def test_user_is_premium(flask_client):
user = User.create(email=random_email(), from_partner=True)
assert not user.is_premium()
partner_user = PartnerUser.create(
user_id=user.id,
partner_id=get_proton_partner().id,
partner_email=user.email,
external_user_id=random_token(),
flush=True,
)
ps = PartnerSubscription.create(
partner_user_id=partner_user.id, end_at=arrow.now().shift(years=1), flush=True
)
assert user.is_premium()
assert not user.is_premium(include_partner_subscription=False)
ManualSubscription.create(user_id=user.id, end_at=ps.end_at)
assert user.is_premium()
assert user.is_premium(include_partner_subscription=False)

View File

View File

@ -0,0 +1,350 @@
from monitor.upcloud import get_metric, get_metrics
from monitor.metric import UpcloudMetrics, UpcloudMetric, UpcloudRecord
import json
MOCK_RESPONSE = """
{
"cpu_usage": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 2.744682398273781, 3.054323473090861],
["2022-01-21T13:11:00Z", 3.0735645433218366, 2.972423595745795],
["2022-01-21T13:11:30Z", 2.61619694060839, 3.1358378052207883],
["2022-01-21T13:12:00Z", 3.275132296130991, 4.196249043309251]
]
},
"hints": { "title": "CPU usage %" }
},
"disk_usage": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 5.654416415900109, 5.58959125727556],
["2022-01-21T13:11:00Z", 5.654416415900109, 5.58959125727556],
["2022-01-21T13:11:30Z", 5.654416415900109, 5.58959125727556]
]
},
"hints": { "title": "Disk space usage %" }
},
"diskio_reads": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 0, 0],
["2022-01-21T13:11:00Z", 0, 0],
["2022-01-21T13:11:30Z", 0, 0]
]
},
"hints": { "title": "Disk iops (reads)" }
},
"diskio_writes": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 3, 2],
["2022-01-21T13:11:00Z", 2, 3],
["2022-01-21T13:11:30Z", 4, 3]
]
},
"hints": { "title": "Disk iops (writes)" }
},
"load_average": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 0.11, 0.11],
["2022-01-21T13:11:00Z", 0.14, 0.1],
["2022-01-21T13:11:30Z", 0.14, 0.09]
]
},
"hints": { "title": "Load average (5 min)" }
},
"mem_usage": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 11.491766148261078, 12.318932883261219],
["2022-01-21T13:11:00Z", 11.511967645759277, 12.304403727425075],
["2022-01-21T13:11:30Z", 11.488581675749048, 12.272260458006759]
]
},
"hints": { "title": "Memory usage %" }
},
"net_receive": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 442, 470],
["2022-01-21T13:11:00Z", 439, 384],
["2022-01-21T13:11:30Z", 466, 458]
]
},
"hints": { "title": "Network receive (bytes/s)" }
},
"net_send": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 672, 581],
["2022-01-21T13:11:00Z", 660, 555],
["2022-01-21T13:11:30Z", 694, 573]
]
},
"hints": { "title": "Network transmit (bytes/s)" }
}
}
"""
def test_get_metrics():
response = json.loads(MOCK_RESPONSE)
metrics = get_metrics(response)
assert metrics == UpcloudMetrics(
metrics=[
UpcloudMetric(
metric_name="cpu_usage",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:12:00Z",
value=3.275132296130991,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:12:00Z",
value=4.196249043309251,
),
],
),
UpcloudMetric(
metric_name="disk_usage",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=5.654416415900109,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=5.58959125727556,
),
],
),
UpcloudMetric(
metric_name="diskio_reads",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=0,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=0,
),
],
),
UpcloudMetric(
metric_name="diskio_writes",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=4,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=3,
),
],
),
UpcloudMetric(
metric_name="load_average",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=0.14,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=0.09,
),
],
),
UpcloudMetric(
metric_name="mem_usage",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=11.488581675749048,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=12.272260458006759,
),
],
),
UpcloudMetric(
metric_name="net_receive",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=466,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=458,
),
],
),
UpcloudMetric(
metric_name="net_send",
records=[
UpcloudRecord(
db_role="master",
label="test-1 " "(master)",
time="2022-01-21T13:11:30Z",
value=694,
),
UpcloudRecord(
db_role="standby",
label="test-2 " "(standby)",
time="2022-01-21T13:11:30Z",
value=573,
),
],
),
]
)
def test_get_metric():
response = json.loads(MOCK_RESPONSE)
metric_name = "cpu_usage"
metric = get_metric(response, metric_name)
assert metric.metric_name == metric_name
assert len(metric.records) == 2
assert metric.records[0].label == "test-1 (master)"
assert metric.records[0].time == "2022-01-21T13:12:00Z"
assert metric.records[0].value == 3.275132296130991
assert metric.records[1].label == "test-2 (standby)"
assert metric.records[1].time == "2022-01-21T13:12:00Z"
assert metric.records[1].value == 4.196249043309251
def test_get_metric_with_none_value():
response_str = """
{
"cpu_usage": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 2.744682398273781, 3.054323473090861],
["2022-01-21T13:11:00Z", 3.0735645433218366, 2.972423595745795],
["2022-01-21T13:11:30Z", null, 3.1358378052207883],
["2022-01-21T13:12:00Z", 3.275132296130991, null]
]
},
"hints": { "title": "CPU usage %" }
}
}
"""
response = json.loads(response_str)
metric = get_metric(response, "cpu_usage")
assert metric.records[0].label == "test-1 (master)"
assert metric.records[0].value == 3.275132296130991
assert metric.records[1].label == "test-2 (standby)"
assert metric.records[1].value == 3.1358378052207883
def test_get_metric_with_none_value_in_last_two_positions():
response_str = """
{
"cpu_usage": {
"data": {
"cols": [
{ "label": "time", "type": "date" },
{ "label": "test-1 (master)", "type": "number" },
{ "label": "test-2 (standby)", "type": "number" }
],
"rows": [
["2022-01-21T13:10:30Z", 2.744682398273781, 3.054323473090861],
["2022-01-21T13:11:00Z", 3.0735645433218366, 2.972423595745795],
["2022-01-21T13:11:30Z", null, null],
["2022-01-21T13:12:00Z", 3.275132296130991, null]
]
},
"hints": { "title": "CPU usage %" }
}
}
"""
response = json.loads(response_str)
metric = get_metric(response, "cpu_usage")
assert len(metric.records) == 1
assert metric.records[0].label == "test-1 (master)"
assert metric.records[0].value == 3.275132296130991

View File

@ -0,0 +1,152 @@
import re
from app.alias_suffix import get_alias_suffixes
from app.db import Session
from app.models import SLDomain, PartnerUser, AliasOptions, CustomDomain
from app.proton.utils import get_proton_partner
from init_app import add_sl_domains
from tests.utils import create_new_user, random_token
def setup_module():
Session.query(SLDomain).delete()
SLDomain.create(
domain="hidden", premium_only=False, flush=True, order=5, hidden=True
)
SLDomain.create(domain="free_non_partner", premium_only=False, flush=True, order=4)
SLDomain.create(
domain="premium_non_partner", premium_only=True, flush=True, order=3
)
SLDomain.create(
domain="free_partner",
premium_only=False,
flush=True,
partner_id=get_proton_partner().id,
order=2,
)
SLDomain.create(
domain="premium_partner",
premium_only=True,
flush=True,
partner_id=get_proton_partner().id,
order=1,
)
Session.commit()
def teardown_module():
Session.query(SLDomain).delete()
add_sl_domains()
def test_get_default_domain_even_if_is_not_allowed():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=False, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
assert suffixes[0].domain == default_domain.domain
def test_get_default_domain_hidden():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=True, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.hidden
assert suffixes[0].domain != default_domain.domain
def test_get_default_domain_is_premium_for_free_user():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(partner_id=None, premium_only=True).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.premium_only
assert suffixes[0].domain != default_domain.domain
def test_suffixes_are_valid():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
CustomDomain.create(
user_id=user.id, domain=f"{random_token(10)}.com", verified=True
)
user.trial_end = None
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
alias_suffixes = get_alias_suffixes(user, alias_options=options)
valid_re = re.compile(r"^(\.[\w_]+)?@[\.\w]+$")
has_prefix = 0
for suffix in alias_suffixes:
match = valid_re.match(suffix.suffix)
assert match is not None
if len(match.groups()) >= 1:
has_prefix += 1
assert has_prefix > 0
def test_get_default_domain_is_only_shown_once():
user = create_new_user()
default_domain = SLDomain.filter_by(hidden=False).order_by(SLDomain.order).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
found_default = False
found_domains = set()
for suffix in suffixes:
assert suffix.domain not in found_domains
found_domains.add(suffix.domain)
if default_domain.domain == suffix.domain:
found_default = True
assert found_default

View File

@ -16,6 +16,7 @@ from app.models import (
Directory,
DirectoryMailbox,
User,
DomainDeletedAlias,
)
from tests.utils import create_new_user, random_domain, random_token
@ -83,6 +84,11 @@ def get_auto_create_alias_tests(user: User) -> List:
regex="ok-.*",
flush=True,
)
deleted_alias = f"deletedalias@{catchall.domain}"
Session.add(
DomainDeletedAlias(email=deleted_alias, domain_id=catchall.id, user_id=user.id)
)
Session.flush()
dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create(
@ -101,6 +107,7 @@ def get_auto_create_alias_tests(user: User) -> List:
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
(deleted_alias, False),
]

View File

@ -128,3 +128,74 @@ def test_get_premium_with_partner_domains():
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_partner_and_free_default_domain():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 3
assert domains[0].domain == "premium_partner"
assert domains[1].domain == "free_partner"
assert domains[2].domain == "free_non_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_premium_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False, premium_only=True).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_hidden_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = SLDomain.filter_by(hidden=True).first().id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)

View File

@ -19,10 +19,8 @@ from app.email_utils import (
copy,
get_spam_from_header,
get_header_from_bounce,
is_valid_email,
add_header,
generate_reply_email,
normalize_reply_email,
get_encoding,
encode_text,
EmailEncoding,
@ -41,6 +39,7 @@ from app.email_utils import (
get_verp_info_from_email,
sl_formataddr,
)
from app.email_validation import is_valid_email, normalize_reply_email
from app.models import (
CustomDomain,
Alias,
@ -810,7 +809,7 @@ def test_add_header_multipart_with_invalid_part():
if i < 2:
assert part.get_payload().index("INJECT") > -1
else:
assert part == "invalid"
assert part.get_payload() == "invalid"
def test_sl_formataddr():
@ -822,3 +821,10 @@ def test_sl_formataddr():
# test that the same name-address can't be handled by the built-in formataddr
with pytest.raises(UnicodeEncodeError):
formataddr(("é", "è@ç.à"))
def test_add_header_to_invalid_multipart():
msg = load_eml_file("add_header_multipart.eml")
msg = add_header(msg, "test", "test")
data = msg.as_string()
assert data != ""

View File

@ -78,20 +78,20 @@ def test_website_send_to(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
)
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@SL>'
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@sl>'
# empty name, ascii website_from, easy case
c1.name = None
c1.website_from = f"First Last <{prefix}@example.com>"
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@SL>'
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@sl>'
# empty name, RFC 2047 website_from
c1.name = None
c1.website_from = f"=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <{prefix}@example.com>"
assert c1.website_send_to() == f'"Nhơn Nguyễn | {prefix} at example.com" <rep@SL>'
assert c1.website_send_to() == f'"Nhơn Nguyễn | {prefix} at example.com" <rep@sl>'
def test_new_addr_default_sender_format(flask_client):
@ -103,16 +103,16 @@ def test_new_addr_default_sender_format(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
commit=True,
)
assert contact.new_addr() == f'"First Last - {prefix} at example.com" <rep@SL>'
assert contact.new_addr() == f'"First Last - {prefix} at example.com" <rep@sl>'
# Make sure email isn't duplicated if sender name equals email
contact.name = f"{prefix}@example.com"
assert contact.new_addr() == f'"{prefix} at example.com" <rep@SL>'
assert contact.new_addr() == f'"{prefix} at example.com" <rep@sl>'
def test_new_addr_a_sender_format(flask_client):
@ -126,12 +126,12 @@ def test_new_addr_a_sender_format(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
commit=True,
)
assert contact.new_addr() == f'"First Last - {prefix}(a)example.com" <rep@SL>'
assert contact.new_addr() == f'"First Last - {prefix}(a)example.com" <rep@sl>'
def test_new_addr_no_name_sender_format(flask_client):
@ -145,12 +145,12 @@ def test_new_addr_no_name_sender_format(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
commit=True,
)
assert contact.new_addr() == "rep@SL"
assert contact.new_addr() == "rep@sl"
def test_new_addr_name_only_sender_format(flask_client):
@ -164,12 +164,12 @@ def test_new_addr_name_only_sender_format(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
commit=True,
)
assert contact.new_addr() == "First Last <rep@SL>"
assert contact.new_addr() == "First Last <rep@sl>"
def test_new_addr_at_only_sender_format(flask_client):
@ -183,12 +183,12 @@ def test_new_addr_at_only_sender_format(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="First Last",
commit=True,
)
assert contact.new_addr() == f'"{prefix} at example.com" <rep@SL>'
assert contact.new_addr() == f'"{prefix} at example.com" <rep@sl>'
def test_new_addr_unicode(flask_client):
@ -200,14 +200,14 @@ def test_new_addr_unicode(flask_client):
user_id=user.id,
alias_id=alias.id,
website_email=f"{random_prefix}@example.com",
reply_email="rep@SL",
reply_email="rep@sl",
name="Nhơn Nguyễn",
commit=True,
)
assert (
contact.new_addr()
== f"=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_{random_prefix}_at_example=2Ecom?= <rep@SL>"
== f"=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_{random_prefix}_at_example=2Ecom?= <rep@sl>"
)
# sanity check

View File

@ -0,0 +1,113 @@
import http.server
import json
import threading
import arrow
from app import config
from app.models import (
Subscription,
AppleSubscription,
CoinbaseSubscription,
ManualSubscription,
)
from tests.utils import create_new_user, random_token
from app.subscription_webhook import execute_subscription_webhook
http_server = None
last_http_request = None
def setup_module():
global http_server
http_server = http.server.ThreadingHTTPServer(("", 0), HTTPTestServer)
print(http_server.server_port)
threading.Thread(target=http_server.serve_forever, daemon=True).start()
config.SUBSCRIPTION_CHANGE_WEBHOOK = f"http://localhost:{http_server.server_port}"
def teardown_module():
global http_server
config.SUBSCRIPTION_CHANGE_WEBHOOK = None
http_server.shutdown()
class HTTPTestServer(http.server.BaseHTTPRequestHandler):
def do_POST(self):
global last_http_request
content_len = int(self.headers.get("Content-Length"))
body_data = self.rfile.read(content_len)
last_http_request = json.loads(body_data)
self.send_response(200)
def test_webhook_with_trial():
user = create_new_user()
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] is None
def test_webhook_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_token(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_token(10),
plan="yearly",
product_id="",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_manual_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
ManualSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp