Compare commits

...

7 Commits

Author SHA1 Message Date
1258115397 4.32.0 2023-07-11 11:00:05 +00:00
38c134d903 4.31.0 2023-06-30 11:00:06 +00:00
cd77e4cc2d 4.30.1 2023-06-28 11:00:03 +00:00
87aedf3207 4.30.0 2023-06-27 11:00:04 +00:00
3523c9fc15 4.29.4 2023-06-07 11:00:05 +00:00
a6f4995cb5 4.29.3 2023-06-01 11:00:05 +00:00
727f61a35e 4.28.2 2023-05-16 11:00:09 +00:00
35 changed files with 910 additions and 139 deletions

View File

@ -169,6 +169,12 @@ For HTML templates, we use `djlint`. Before creating a pull request, please run
poetry run djlint --check templates
```
If some files aren't properly formatted, you can format all files with
```bash
poetry run djlint --reformat .
```
## Test sending email
[swaks](http://www.jetmore.org/john/code/swaks/) is used for sending test emails to the `email_handler`.

View File

@ -23,10 +23,10 @@ COPY poetry.lock pyproject.toml ./
# Install and setup poetry
RUN pip install -U pip \
&& apt-get update \
&& apt install -y curl netcat gcc python3-dev gnupg git libre2-dev \
&& apt install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev \
&& curl -sSL https://install.python-poetry.org | python3 - \
# Remove curl and netcat from the image
&& apt-get purge -y curl netcat \
&& apt-get purge -y curl netcat-traditional \
# Run poetry
&& poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root \

View File

@ -6,7 +6,7 @@ from typing import Optional
import itsdangerous
from app import config
from app.log import LOG
from app.models import User, AliasOptions
from app.models import User, AliasOptions, SLDomain
signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET)
@ -105,10 +105,7 @@ def get_alias_suffixes(
for custom_domain in user_custom_domains:
if custom_domain.random_prefix_generation:
suffix = (
"."
+ user.get_random_alias_suffix(custom_domain)
+ "@"
+ custom_domain.domain
f".{user.get_random_alias_suffix(custom_domain)}@{custom_domain.domain}"
)
alias_suffix = AliasSuffix(
is_custom=True,
@ -123,7 +120,7 @@ def get_alias_suffixes(
else:
alias_suffixes.append(alias_suffix)
suffix = "@" + custom_domain.domain
suffix = f"@{custom_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=True,
suffix=suffix,
@ -144,16 +141,13 @@ def get_alias_suffixes(
alias_suffixes.append(alias_suffix)
# then SimpleLogin domain
for sl_domain in user.get_sl_domains(alias_options=alias_options):
suffix = (
(
""
if config.DISABLE_ALIAS_SUFFIX
else "." + user.get_random_alias_suffix()
)
+ "@"
+ sl_domain.domain
sl_domains = user.get_sl_domains(alias_options=alias_options)
default_domain_found = False
for sl_domain in sl_domains:
prefix = (
"" if config.DISABLE_ALIAS_SUFFIX else f".{user.get_random_alias_suffix()}"
)
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
@ -162,11 +156,36 @@ def get_alias_suffixes(
domain=sl_domain.domain,
mx_verified=True,
)
# put the default domain to top
if user.default_alias_public_domain_id == sl_domain.id:
alias_suffixes.insert(0, alias_suffix)
else:
# No default or this is not the default
if (
user.default_alias_public_domain_id is None
or user.default_alias_public_domain_id != sl_domain.id
):
alias_suffixes.append(alias_suffix)
else:
default_domain_found = True
alias_suffixes.insert(0, alias_suffix)
if not default_domain_found:
domain_conditions = {"id": user.default_alias_public_domain_id, "hidden": False}
if not user.is_premium():
domain_conditions["premium_only"] = False
sl_domain = SLDomain.get_by(**domain_conditions)
if sl_domain:
prefix = (
""
if config.DISABLE_ALIAS_SUFFIX
else f".{user.get_random_alias_suffix()}"
)
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
signed_suffix=signer.sign(suffix).decode(),
is_premium=sl_domain.premium_only,
domain=sl_domain.domain,
mx_verified=True,
)
alias_suffixes.insert(0, alias_suffix)
return alias_suffixes

View File

@ -57,6 +57,8 @@ def get_user_if_alias_would_auto_create(
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user
)
if DomainDeletedAlias.get_by(email=address):
return None
if domain_and_rule:
return domain_and_rule[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory(

View File

@ -9,6 +9,7 @@ from requests import RequestException
from app.api.base import api_bp, require_api_auth
from app.config import APPLE_API_SECRET, MACAPP_APPLE_API_SECRET
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session
from app.log import LOG
from app.models import PlanEnum, AppleSubscription
@ -50,6 +51,7 @@ def apple_process_payment():
apple_sub = verify_receipt(receipt_data, user, password)
if apple_sub:
execute_subscription_webhook(user)
return jsonify(ok=True), 200
return jsonify(error="Processing failed"), 400
@ -282,6 +284,7 @@ def apple_update_notification():
apple_sub.plan = plan
apple_sub.product_id = transaction["product_id"]
Session.commit()
execute_subscription_webhook(user)
return jsonify(ok=True), 200
else:
LOG.w(
@ -554,6 +557,7 @@ def verify_receipt(receipt_data, user, password) -> Optional[AppleSubscription]:
product_id=latest_transaction["product_id"],
)
execute_subscription_webhook(user)
Session.commit()
return apple_sub

View File

@ -1,4 +1,4 @@
from flask import request, render_template, redirect, url_for, flash, g
from flask import request, render_template, flash, g
from flask_wtf import FlaskForm
from wtforms import StringField, validators
@ -16,7 +16,7 @@ class ForgotPasswordForm(FlaskForm):
@auth_bp.route("/forgot_password", methods=["GET", "POST"])
@limiter.limit(
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
"10/hour", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
)
def forgot_password():
form = ForgotPasswordForm(request.form)
@ -37,6 +37,5 @@ def forgot_password():
if user:
LOG.d("Send forgot password email to %s", user)
send_reset_password_email(user)
return redirect(url_for("auth.forgot_password"))
return render_template("auth/forgot_password.html", form=form)

View File

@ -60,8 +60,8 @@ def reset_password():
# this can be served to activate user too
user.activated = True
# remove the reset password code
ResetPasswordCode.delete(reset_password_code.id)
# remove all reset password codes
ResetPasswordCode.filter_by(user_id=user.id).delete()
# change the alternative_id to log user out on other browsers
user.alternative_id = str(uuid.uuid4())

View File

@ -532,3 +532,6 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT:
SKIP_MX_LOOKUP_ON_CHECK = False
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
MAX_API_KEYS = int(os.environ.get("MAX_API_KEYS", 30))

View File

@ -3,9 +3,11 @@ from flask_login import login_required, current_user
from flask_wtf import FlaskForm
from wtforms import StringField, validators
from app import config
from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session
from app.extensions import limiter
from app.models import ApiKey
from app.utils import CSRFValidationForm
@ -14,9 +16,32 @@ class NewApiKeyForm(FlaskForm):
name = StringField("Name", validators=[validators.DataRequired()])
def clean_up_unused_or_old_api_keys(user_id: int):
total_keys = ApiKey.filter_by(user_id=user_id).count()
# Remove oldest unused
for api_key in (
ApiKey.filter_by(user_id=user_id, last_used=None)
.order_by(ApiKey.created_at.asc())
.all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
# Clean up oldest used
for api_key in (
ApiKey.filter_by(user_id=user_id).order_by(ApiKey.last_used.asc()).all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
@dashboard_bp.route("/api_key", methods=["GET", "POST"])
@login_required
@sudo_required
@limiter.limit("10/hour")
def api_key():
api_keys = (
ApiKey.filter(ApiKey.user_id == current_user.id)
@ -50,6 +75,7 @@ def api_key():
elif request.form.get("form-name") == "create":
if new_api_key_form.validate():
clean_up_unused_or_old_api_keys(current_user.id)
new_api_key = ApiKey.create(
name=new_api_key_form.name.data, user_id=current_user.id
)

View File

@ -1,3 +1,7 @@
import base64
import binascii
import json
import arrow
from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
@ -180,7 +184,9 @@ def mailbox_route():
def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode()
encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
)
@ -205,22 +211,34 @@ def send_verification_email(user, mailbox):
@dashboard_bp.route("/mailbox_verify")
def mailbox_verify():
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id = request.args.get("mailbox_id")
mailbox_verify_request = request.args.get("mailbox_id")
try:
r_id = int(s.unsign(mailbox_id, max_age=900))
mailbox_raw_data = s.unsign(mailbox_verify_request, max_age=900)
except Exception:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
else:
mailbox = Mailbox.get(r_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
try:
decoded_data = base64.urlsafe_b64decode(mailbox_raw_data)
except binascii.Error:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_data = json.loads(decoded_data)
if not isinstance(mailbox_data, list) or len(mailbox_data) != 2:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_id = mailbox_data[0]
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_email = mailbox_data[1]
if mailbox_email != mailbox.email:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox.verified = True
Session.commit()
mailbox.verified = True
Session.commit()
LOG.d("Mailbox %s is verified", mailbox)
LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)

View File

@ -198,6 +198,16 @@ def setting():
)
return redirect(url_for("dashboard.setting"))
if current_user.profile_picture_id is not None:
current_profile_file = File.get_by(
id=current_user.profile_picture_id
)
if (
current_profile_file is not None
and current_profile_file.user_id == current_user.id
):
s3.delete(current_profile_file.path)
file_path = random_string(30)
file = File.create(user_id=current_user.id, path=file_path)
@ -451,8 +461,13 @@ def send_change_email_confirmation(user: User, email_change: EmailChange):
@dashboard_bp.route("/resend_email_change", methods=["GET", "POST"])
@limiter.limit("5/hour")
@login_required
def resend_email_change():
form = CSRFValidationForm()
if not form.validate():
flash("Invalid request. Please try again", "warning")
return redirect(url_for("dashboard.setting"))
email_change = EmailChange.get_by(user_id=current_user.id)
if email_change:
# extend email change expiration
@ -472,6 +487,10 @@ def resend_email_change():
@dashboard_bp.route("/cancel_email_change", methods=["GET", "POST"])
@login_required
def cancel_email_change():
form = CSRFValidationForm()
if not form.validate():
flash("Invalid request. Please try again", "warning")
return redirect(url_for("dashboard.setting"))
email_change = EmailChange.get_by(user_id=current_user.id)
if email_change:
EmailChange.delete(email_change.id)

View File

@ -20,6 +20,7 @@ X_SPAM_STATUS = "X-Spam-Status"
LIST_UNSUBSCRIBE = "List-Unsubscribe"
LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post"
RETURN_PATH = "Return-Path"
AUTHENTICATION_RESULTS = "Authentication-Results"
# headers used to DKIM sign in order of preference
DKIM_HEADERS = [
@ -32,6 +33,7 @@ DKIM_HEADERS = [
SL_DIRECTION = "X-SimpleLogin-Type"
SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID"
SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From"
SL_ORIGINAL_FROM = "X-SimpleLogin-Original-From"
SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To"
SL_CLIENT_IP = "X-SimpleLogin-Client-IP"

View File

@ -74,8 +74,8 @@ class UnsubscribeEncoder:
)
signed_data = cls._get_signer().sign(serialized_data).decode("utf-8")
encoded_request = f"{UNSUB_PREFIX}.{signed_data}"
if len(encoded_request) > 256:
LOG.e("Encoded request is longer than 256 chars")
if len(encoded_request) > 512:
LOG.w("Encoded request is longer than 512 chars")
return encoded_request
@staticmethod

View File

@ -9,6 +9,7 @@ from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeOriginalData,
)
from app.log import LOG
from app.models import Alias, Contact, UnsubscribeBehaviourEnum
@ -30,6 +31,7 @@ class UnsubscribeGenerator:
"""
unsubscribe_data = message[headers.LIST_UNSUBSCRIBE]
if not unsubscribe_data:
LOG.info("Email has no unsubscribe header")
return message
raw_methods = [method.strip() for method in unsubscribe_data.split(",")]
mailto_unsubs = None
@ -44,7 +46,9 @@ class UnsubscribeGenerator:
if url_data.scheme == "mailto":
query_data = urllib.parse.parse_qs(url_data.query)
mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0])
LOG.debug(f"Unsub is mailto to {mailto_unsubs}")
else:
LOG.debug(f"Unsub has {url_data.scheme} scheme")
other_unsubs.append(method)
# If there are non mailto unsubscribe methods, use those in the header
if other_unsubs:
@ -56,18 +60,19 @@ class UnsubscribeGenerator:
add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
LOG.debug(f"Adding click unsub methods to header {other_unsubs}")
return message
if not mailto_unsubs:
message = delete_header(message, headers.LIST_UNSUBSCRIBE)
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
elif not mailto_unsubs:
LOG.debug("No unsubs. Deleting all unsub headers")
delete_header(message, headers.LIST_UNSUBSCRIBE)
delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
return message
return self._add_unsubscribe_header(
message,
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
),
unsub_data = UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
)
LOG.debug(f"Adding unsub data {unsub_data}")
return self._add_unsubscribe_header(message, unsub_data)
def _add_unsubscribe_header(
self, message: Message, unsub: UnsubscribeData

View File

@ -41,7 +41,7 @@ from app.models import (
class ExportUserDataJob:
REMOVE_FIELDS = {
"User": ("otp_secret",),
"User": ("otp_secret", "password"),
"Alias": ("ts_vector", "transfer_token", "hibp_last_check"),
"CustomDomain": ("ownership_txt_token",),
}

View File

@ -32,6 +32,7 @@ class SendRequest:
rcpt_options: Dict = {}
is_forward: bool = False
ignore_smtp_errors: bool = False
retries: int = 0
def to_bytes(self) -> bytes:
if not config.SAVE_UNSENT_DIR:
@ -45,6 +46,7 @@ class SendRequest:
"mail_options": self.mail_options,
"rcpt_options": self.rcpt_options,
"is_forward": self.is_forward,
"retries": self.retries,
}
return json.dumps(data).encode("utf-8")
@ -65,8 +67,33 @@ class SendRequest:
mail_options=decoded_data["mail_options"],
rcpt_options=decoded_data["rcpt_options"],
is_forward=decoded_data["is_forward"],
retries=decoded_data.get("retries", 1),
)
def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
self.save_request_to_file(file_path)
@staticmethod
def save_request_to_failed_dir(self, prefix: str = "DeliveryRetryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
dir_name = os.path.join(config.SAVE_UNSENT_DIR, "failed")
if not os.path.isdir(dir_name):
os.makedirs(dir_name)
file_path = os.path.join(dir_name, file_name)
self.save_request_to_file(file_path)
def save_request_to_file(self, file_path: str):
file_contents = self.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
class MailSender:
def __init__(self):
@ -171,21 +198,9 @@ class MailSender:
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{config.POSTFIX_PORT}"
)
if config.SAVE_UNSENT_DIR:
self._save_request_to_unsent_dir(send_request)
send_request.save_request_to_unsent_dir()
return False
def _save_request_to_unsent_dir(
self, send_request: SendRequest, prefix: str = "DeliveryFail"
):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
file_contents = send_request.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
mail_sender = MailSender()
@ -219,6 +234,7 @@ def load_unsent_mails_from_fs_and_resend():
LOG.i(f"Trying to re-deliver email {filename}")
try:
send_request = SendRequest.load_from_file(full_file_path)
send_request.retries += 1
except Exception as e:
LOG.e(f"Cannot load {filename}. Error {e}")
continue
@ -230,6 +246,11 @@ def load_unsent_mails_from_fs_and_resend():
"DeliverUnsentEmail", {"delivered": "true"}
)
else:
if send_request.retries > 2:
os.unlink(full_file_path)
send_request.save_request_to_failed_dir()
else:
send_request.save_request_to_file(full_file_path)
newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": "false"}
)

View File

@ -445,7 +445,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
random_alias_suffix = sa.Column(
sa.Integer,
nullable=False,
default=AliasSuffixEnum.random_string.value,
default=AliasSuffixEnum.word.value,
server_default=str(AliasSuffixEnum.random_string.value),
)
@ -514,9 +514,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
server_default=BlockBehaviourEnum.return_2xx.name,
)
# to keep existing behavior, the server default is TRUE whereas for new user, the default value is FALSE
include_header_email_header = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="1"
sa.Boolean, default=True, nullable=False, server_default="1"
)
# bitwise flags. Allow for future expansion
@ -580,19 +579,6 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Session.flush()
user.default_mailbox_id = mb.id
# create a first alias mail to show user how to use when they login
alias = Alias.create_new(
user,
prefix="simplelogin-newsletter",
mailbox_id=mb.id,
note="This is your first alias. It's used to receive SimpleLogin communications "
"like new features announcements, newsletters.",
)
Session.flush()
user.newsletter_alias_id = alias.id
Session.flush()
# generate an alternative_id if needed
if "alternative_id" not in kwargs:
user.alternative_id = str(uuid.uuid4())
@ -611,6 +597,19 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Session.flush()
return user
# create a first alias mail to show user how to use when they login
alias = Alias.create_new(
user,
prefix="simplelogin-newsletter",
mailbox_id=mb.id,
note="This is your first alias. It's used to receive SimpleLogin communications "
"like new features announcements, newsletters.",
)
Session.flush()
user.newsletter_alias_id = alias.id
Session.flush()
if config.DISABLE_ONBOARDING:
LOG.d("Disable onboarding emails")
return user
@ -636,7 +635,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return user
def get_active_subscription(
self,
self, include_partner_subscription: bool = True
) -> Optional[
Union[
Subscription
@ -664,19 +663,40 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if coinbase_subscription and coinbase_subscription.is_active():
return coinbase_subscription
partner_sub: PartnerSubscription = PartnerSubscription.find_by_user_id(self.id)
if partner_sub and partner_sub.is_active():
return partner_sub
if include_partner_subscription:
partner_sub: PartnerSubscription = PartnerSubscription.find_by_user_id(
self.id
)
if partner_sub and partner_sub.is_active():
return partner_sub
return None
def get_active_subscription_end(
self, include_partner_subscription: bool = True
) -> Optional[arrow.Arrow]:
sub = self.get_active_subscription(
include_partner_subscription=include_partner_subscription
)
if isinstance(sub, Subscription):
return arrow.get(sub.next_bill_date)
if isinstance(sub, AppleSubscription):
return sub.expires_date
if isinstance(sub, ManualSubscription):
return sub.end_at
if isinstance(sub, CoinbaseSubscription):
return sub.end_at
return None
# region Billing
def lifetime_or_active_subscription(self) -> bool:
def lifetime_or_active_subscription(
self, include_partner_subscription: bool = True
) -> bool:
"""True if user has lifetime licence or active subscription"""
if self.lifetime:
return True
return self.get_active_subscription() is not None
return self.get_active_subscription(include_partner_subscription) is not None
def is_paid(self) -> bool:
"""same as _lifetime_or_active_subscription but not include free manual subscription"""
@ -705,14 +725,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return True
def is_premium(self) -> bool:
def is_premium(self, include_partner_subscription: bool = True) -> bool:
"""
user is premium if they:
- have a lifetime deal or
- in trial period or
- active subscription
"""
if self.lifetime_or_active_subscription():
if self.lifetime_or_active_subscription(include_partner_subscription):
return True
if self.trial_end and arrow.now() < self.trial_end:
@ -995,6 +1015,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if not self.is_premium():
conditions.append(SLDomain.premium_only == False) # noqa: E712
partner_domain_cond = [] # noqa:E711
if self.default_alias_public_domain_id is not None:
partner_domain_cond.append(
SLDomain.id == self.default_alias_public_domain_id
)
if alias_options.show_partner_domains is not None:
partner_user = PartnerUser.filter_by(
user_id=self.id, partner_id=alias_options.show_partner_domains.id

View File

@ -0,0 +1,33 @@
import requests
from requests import RequestException
from app import config
from app.log import LOG
from app.models import User
def execute_subscription_webhook(user: User):
webhook_url = config.SUBSCRIPTION_CHANGE_WEBHOOK
if webhook_url is None:
return
subscription_end = user.get_active_subscription_end(
include_partner_subscription=False
)
sl_subscription_end = None
if subscription_end:
sl_subscription_end = subscription_end.timestamp
payload = {
"user_id": user.id,
"is_premium": user.is_premium(),
"active_subscription_end": sl_subscription_end,
}
try:
response = requests.post(webhook_url, json=payload, timeout=2)
if response.status_code == 200:
LOG.i("Sent request to subscription update webhook successfully")
else:
LOG.i(
f"Request to webhook failed with statue {response.status_code}: {response.text}"
)
except RequestException as e:
LOG.error(f"Subscription request exception: {e}")

View File

@ -846,22 +846,23 @@ def forward_email_to_mailbox(
f"""Email sent to {alias.email} from an invalid address and cannot be replied""",
)
delete_all_headers_except(
msg,
[
headers.FROM,
headers.TO,
headers.CC,
headers.SUBJECT,
headers.DATE,
# do not delete original message id
headers.MESSAGE_ID,
# References and In-Reply-To are used for keeping the email thread
headers.REFERENCES,
headers.IN_REPLY_TO,
]
+ headers.MIME_HEADERS,
)
headers_to_keep = [
headers.FROM,
headers.TO,
headers.CC,
headers.SUBJECT,
headers.DATE,
# do not delete original message id
headers.MESSAGE_ID,
# References and In-Reply-To are used for keeping the email thread
headers.REFERENCES,
headers.IN_REPLY_TO,
headers.LIST_UNSUBSCRIBE,
headers.LIST_UNSUBSCRIBE_POST,
] + headers.MIME_HEADERS
if user.include_header_email_header:
headers_to_keep.append(headers.AUTHENTICATION_RESULTS)
delete_all_headers_except(msg, headers_to_keep)
# create PGP email if needed
if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp:
@ -898,6 +899,11 @@ def forward_email_to_mailbox(
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
if user.include_header_email_header:
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
if contact.name:
original_from = f"{contact.name} <{contact.website_email}>"
else:
original_from = contact.website_email
msg[headers.SL_ORIGINAL_FROM] = original_from
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email
msg[headers.SL_ENVELOPE_TO] = alias.email

View File

@ -79,6 +79,7 @@ from app.config import (
MEM_STORE_URI,
)
from app.dashboard.base import dashboard_bp
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session
from app.developer.base import developer_bp
from app.discover.base import discover_bp
@ -491,6 +492,7 @@ def setup_paddle_callback(app: Flask):
# in case user cancels a plan and subscribes a new plan
sub.cancelled = False
execute_subscription_webhook(user)
LOG.d("User %s upgrades!", user)
Session.commit()
@ -509,6 +511,7 @@ def setup_paddle_callback(app: Flask):
).date()
Session.commit()
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_cancelled":
subscription_id = request.form.get("subscription_id")
@ -538,6 +541,7 @@ def setup_paddle_callback(app: Flask):
end_date=request.form.get("cancellation_effective_date"),
),
)
execute_subscription_webhook(sub.user)
else:
# user might have deleted their account
@ -580,6 +584,7 @@ def setup_paddle_callback(app: Flask):
sub.cancelled = False
Session.commit()
execute_subscription_webhook(sub.user)
else:
LOG.w(
f"update non-exist subscription {subscription_id}. {request.form}"
@ -596,6 +601,7 @@ def setup_paddle_callback(app: Flask):
Subscription.delete(sub.id)
Session.commit()
LOG.e("%s requests a refund", user)
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_payment_refunded":
subscription_id = request.form.get("subscription_id")
@ -629,6 +635,7 @@ def setup_paddle_callback(app: Flask):
LOG.e("Unknown plan_id %s", plan_id)
else:
LOG.w("partial subscription_payment_refunded, not handled")
execute_subscription_webhook(sub.user)
return "OK"
@ -742,6 +749,7 @@ def handle_coinbase_event(event) -> bool:
coinbase_subscription=coinbase_subscription,
),
)
execute_subscription_webhook(user)
return True

View File

@ -9,10 +9,13 @@
<h1 class="card-title">Create new account</h1>
<div class="form-group">
<label class="form-label">Email address</label>
{{ form.email(class="form-control", type="email") }}
{{ form.email(class="form-control", type="email", placeholder="YourName@protonmail.com") }}
<div class="small-text alert alert-info" style="margin-top: 1px">
Emails sent to your alias will be forwarded to this email address.
<br>
It can't be a disposable or forwarding email address.
<br>
We recommend using a <a href="https://proton.me/mail" target="_blank">Proton Mail</a> address
</div>
{{ render_field_errors(form.email) }}
</div>

View File

@ -181,10 +181,10 @@
<!-- END change name & profile picture -->
<!-- Change email -->
<div class="card">
<form method="post" enctype="multipart/form-data">
<input type="hidden" name="form-name" value="update-email">
{{ change_email_form.csrf_token }}
<div class="card-body">
<div class="card-body">
<form method="post" enctype="multipart/form-data">
<input type="hidden" name="form-name" value="update-email">
{{ change_email_form.csrf_token }}
<div class="card-title">Account Email</div>
<div class="mb-3">
This email address is used to log in to SimpleLogin.
@ -199,26 +199,30 @@
<!-- Not allow user to change email if there's a pending change -->
{{ change_email_form.email(class="form-control", value=current_user.email, readonly=pending_email != None) }}
{{ render_field_errors(change_email_form.email) }}
{% if pending_email %}
<div class="mt-2">
<span class="text-danger">Pending email change: {{ pending_email }}</span>
<a href="{{ url_for('dashboard.resend_email_change') }}"
class="btn btn-secondary btn-sm">
Resend
confirmation email
</a>
<a href="{{ url_for('dashboard.cancel_email_change') }}"
class="btn btn-secondary btn-sm">
Cancel email
change
</a>
</div>
{% endif %}
</div>
<button class="btn btn-outline-primary">Change Email</button>
</div>
</form>
</form>
{% if pending_email %}
<div class="mt-2">
<span class="text-danger float-left">Pending email change: {{ pending_email }}</span>
<form method="POST"
action="{{ url_for('dashboard.resend_email_change') }}"
class="float-left ml-2">
{{ change_email_form.csrf_token }}
<a onclick="this.closest('form').submit()"
class="btn btn-secondary btn-sm">Resend confirmation email</a>
</form>
<form method="POST"
action="{{ url_for('dashboard.cancel_email_change') }}"
class="float-left ml-2">
{{ change_email_form.csrf_token }}
<a onclick="this.closest('form').submit()"
class="btn btn-secondary btn-sm">Cancel email change</a>
</form>
</div>
{% endif %}
</div>
</div>
<!-- END Change email -->
<!-- Connect with Proton -->
@ -265,11 +269,15 @@
<div class="card" id="change_password">
<div class="card-body">
<div class="card-title">Password</div>
<div class="mb-3">You will receive an email containing instructions on how to change your password.</div>
<div class="mb-3">
You will receive an email containing instructions on how to change your password.
</div>
<form method="post">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="change-password">
<button class="btn btn-outline-primary">Change password</button>
<button class="btn btn-outline-primary">
Change password
</button>
</form>
</div>
</div>
@ -676,7 +684,8 @@
SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b>
sender address.
<br />
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>.
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>
and the original From header is stored in <b>X-SimpleLogin-Original-From<b>.
You can choose to display this header in your email client.
<br />
As email headers aren't encrypted, your mailbox service can know the sender address via this header.

View File

@ -286,6 +286,7 @@
},
async mounted() {
Object.freeze(Object.prototype);
let that = this;
let res = await fetch(`/api/notifications?page=${that.page}`, {
method: "GET",

View File

@ -5,7 +5,7 @@
<div class="page-single">
<div class="container">
<div class="row">
<div class="col mx-auto" style="max-width: 28rem">
<div class="col mx-auto" style="max-width: 32rem">
<div class="text-center mb-6">
<a href="{{ LANDING_PAGE_URL }}">
<img src="/static/logo.svg"

View File

@ -17,7 +17,7 @@ def test_get_setting(flask_client):
"notification": True,
"random_alias_default_domain": "sl.local",
"sender_format": "AT",
"random_alias_suffix": "random_string",
"random_alias_suffix": "word",
}
@ -95,11 +95,13 @@ def test_get_setting_domains_v2(flask_client):
def test_update_settings_random_alias_suffix(flask_client):
user = login(flask_client)
# default random_alias_suffix is random_string
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value
assert user.random_alias_suffix == AliasSuffixEnum.word.value
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"})
assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "word"})
r = flask_client.patch(
"/api/setting", json={"random_alias_suffix": "random_string"}
)
assert r.status_code == 200
assert user.random_alias_suffix == AliasSuffixEnum.word.value
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value

View File

@ -0,0 +1,26 @@
from flask import url_for
from app.db import Session
from app.models import User, ResetPasswordCode
from tests.utils import create_new_user, random_token
def test_successful_reset_password(flask_client):
user = create_new_user()
original_pass_hash = user.password
user_id = user.id
reset_code = random_token()
ResetPasswordCode.create(user_id=user.id, code=reset_code)
ResetPasswordCode.create(user_id=user.id, code=random_token())
Session.commit()
r = flask_client.post(
url_for("auth.reset_password", code=reset_code),
data={"password": "1231idsfjaads"},
)
assert r.status_code == 302
assert ResetPasswordCode.get_by(user_id=user_id) is None
user = User.get(user_id)
assert user.password != original_pass_hash

View File

@ -1,10 +1,13 @@
from time import time
import arrow
from flask import url_for
from app import config
from app.dashboard.views.api_key import clean_up_unused_or_old_api_keys
from app.db import Session
from app.models import User, ApiKey
from tests.utils import login
from tests.utils import login, create_new_user
def test_api_key_page_requires_password(flask_client):
@ -87,3 +90,26 @@ def test_delete_all_api_keys(flask_client):
assert (
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
) # assert that user 2 still has 1 API key
def test_cleanup_api_keys():
user = create_new_user()
ApiKey.create(
user_id=user.id, name="used", last_used=arrow.utcnow().shift(days=-3), times=1
)
ApiKey.create(
user_id=user.id, name="keep 1", last_used=arrow.utcnow().shift(days=-2), times=1
)
ApiKey.create(
user_id=user.id, name="keep 2", last_used=arrow.utcnow().shift(days=-1), times=1
)
ApiKey.create(user_id=user.id, name="not used", last_used=None, times=1)
Session.flush()
old_max_api_keys = config.MAX_API_KEYS
config.MAX_API_KEYS = 2
clean_up_unused_or_old_api_keys(user.id)
keys = ApiKey.filter_by(user_id=user.id).all()
assert len(keys) == 2
assert keys[0].name.find("keep") == 0
assert keys[1].name.find("keep") == 0
config.MAX_API_KEYS = old_max_api_keys

View File

@ -316,6 +316,10 @@ def test_add_alias_in_global_trash(flask_client):
def test_add_alias_in_custom_domain_trash(flask_client):
user = login(flask_client)
for deleted_domain in DomainDeletedAlias.all():
Session.delete(deleted_domain)
Session.flush()
domain = random_domain()
custom_domain = CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True

View File

@ -0,0 +1,65 @@
Received: by mail-ed1-f49.google.com with SMTP id ej4so13657316edb.7
for <gmail@simplemail.fplante.fr>; Mon, 27 Jun 2022 08:48:15 -0700 (PDT)
X-Gm-Message-State: AJIora8exR9DGeRFoKAtjzwLtUpH5hqx6Zt3tm8n4gUQQivGQ3fELjUV
yT7RQIfeW9Kv2atuOcgtmGYVU4iQ8VBeLmK1xvOYL4XpXfrT7ZrJNQ==
Authentication-Results: mx.google.com;
dkim=pass header.i=@matera.eu header.s=fnt header.b=XahYMey7;
dkim=pass header.i=@sendgrid.info header.s=smtpapi header.b="QOCS/yjt";
spf=pass (google.com: domain of bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu designates 168.245.4.42 as permitted sender) smtp.mailfrom="bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu";
dmarc=pass (p=NONE sp=NONE dis=NONE) header.from=matera.eu
Received: from out.frontapp.com (unknown)
by geopod-ismtpd-3-0 (SG)
with ESMTP id d2gM2N7PT7W8d2-UEC4ESA
for <csyndic.quartz@gmail.com>;
Mon, 27 Jun 2022 15:48:11.014 +0000 (UTC)
Content-Type: multipart/alternative;
boundary="----sinikael-?=_1-16563448907660.10629093370416887"
In-Reply-To:
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
References:
<imported@frontapp.com_t:AssembléeGénérale2022-06-25T16:32:03+02:006b3cdade-982b-47cd-8114-6a037dfb7d60>
<imported@frontapp.com_f924cce139940c9935621f067d46443597394f34>
<imported@frontapp.com_t:Appeldefonds2022-06-26T10:04:55+02:00d89f5e23-6d98-4f01-95fa-b7c7544b7aa9>
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
<af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
From: {{ sender_address }}
To: {{ recipient_address }}
CC: {{ cc_address }}
Subject: Something
Message-ID: <af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
X-Mailer: Front (1.0; +https://frontapp.com;
+msgid=af07e94a66ece6564ae30a2aaac7a34c@frontapp.com)
X-Feedback-ID: 14445963:SG
X-SG-EID:
=?us-ascii?Q?XtlxQDg5i3HqMzQY2Upg19JPZBVl1RybInUUL2yta9uBoIU4KU1FMJ5DjWrz6g?=
=?us-ascii?Q?fJUK5Qmneg2uc46gwp5BdHdp6Foaq5gg3xJriv3?=
=?us-ascii?Q?9OA=2FWRifeylU9O+ngdNbOKXoeJAkROmp2mCgw9x?=
=?us-ascii?Q?uud+EclOT9mYVtbZsydOLLm6Y2PPswQl8lnmiku?=
=?us-ascii?Q?DAhkG15HTz2FbWGWNDFb7VrSsN5ddjAscr6sIHw?=
=?us-ascii?Q?S48R5fnXmfhPbmlCgqFjr0FGphfuBdNAt6z6w8a?=
=?us-ascii?Q?o9u1EYDIX7zWHZ+Tr3eyw=3D=3D?=
X-SG-ID:
=?us-ascii?Q?N2C25iY2uzGMFz6rgvQsb8raWjw0ZPf1VmjsCkspi=2FI9PhcvqXQTpKqqyZkvBe?=
=?us-ascii?Q?+2RscnQ4WPkA+BN1vYgz1rezTVIqgp+rlWrKk8o?=
=?us-ascii?Q?HoB5dzpX6HKWtWCVRi10zwlDN1+pJnySoIUrlaT?=
=?us-ascii?Q?PA2aqQKmMQbjTl0CUAFryR8hhHcxdS0cQowZSd7?=
=?us-ascii?Q?XNjJWLvCGF7ODwg=2FKr+4yRE8UvULS2nrdO2wWyQ?=
=?us-ascii?Q?AiFHdPdZsRlgNomEo=3D?=
X-Spamd-Result: default: False [-2.00 / 13.00];
ARC_ALLOW(-1.00)[google.com:s=arc-20160816:i=1];
MIME_GOOD(-0.10)[multipart/alternative,text/plain];
REPLYTO_ADDR_EQ_FROM(0.00)[];
FORGED_RECIPIENTS_FORWARDING(0.00)[];
NEURAL_HAM(-0.00)[-0.981];
FREEMAIL_TO(0.00)[gmail.com];
RCVD_TLS_LAST(0.00)[];
FREEMAIL_ENVFROM(0.00)[gmail.com];
MIME_TRACE(0.00)[0:+,1:+,2:~];
RWL_MAILSPIKE_POSSIBLE(0.00)[209.85.208.49:from]
------sinikael-?=_1-16563448907660.10629093370416887
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: quoted-printable
From {{ sender_address }} To {{ recipient_address }}
------sinikael-?=_1-16563448907660.10629093370416887--

View File

@ -0,0 +1,74 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.db import Session
from app.email import headers, status
from app.mail_sender import mail_sender
from app.models import Alias
from app.utils import random_string
from tests.utils import create_new_user, load_eml_file, random_email
@mail_sender.store_emails_test_decorator
def test_original_headers_from_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": original_sender_address,
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert request.msg[headers.SL_ORIGINAL_FROM] == original_sender_address
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)
@mail_sender.store_emails_test_decorator
def test_original_headers_from_with_name_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
name = random_string(10)
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": f"{name} <{original_sender_address}>",
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert (
request.msg[headers.SL_ORIGINAL_FROM] == f"{name} <{original_sender_address}>"
)
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)

View File

@ -1,7 +1,9 @@
import arrow
from app import config
from app.db import Session
from app.models import User, Job
from tests.utils import random_email
from app.models import User, Job, PartnerSubscription, PartnerUser, ManualSubscription
from app.proton.utils import get_proton_partner
from tests.utils import random_email, random_token
def test_create_from_partner(flask_client):
@ -11,6 +13,7 @@ def test_create_from_partner(flask_client):
)
assert user.notification is False
assert user.trial_end is None
assert user.newsletter_alias_id is None
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_SEND_PROTON_WELCOME_1
@ -23,3 +26,23 @@ def test_user_created_by_partner(flask_client):
regular_user = User.create(email=random_email())
assert regular_user.created_by_partner is False
def test_user_is_premium(flask_client):
user = User.create(email=random_email(), from_partner=True)
assert not user.is_premium()
partner_user = PartnerUser.create(
user_id=user.id,
partner_id=get_proton_partner().id,
partner_email=user.email,
external_user_id=random_token(),
flush=True,
)
ps = PartnerSubscription.create(
partner_user_id=partner_user.id, end_at=arrow.now().shift(years=1), flush=True
)
assert user.is_premium()
assert not user.is_premium(include_partner_subscription=False)
ManualSubscription.create(user_id=user.id, end_at=ps.end_at)
assert user.is_premium()
assert user.is_premium(include_partner_subscription=False)

View File

@ -0,0 +1,152 @@
import re
from app.alias_suffix import get_alias_suffixes
from app.db import Session
from app.models import SLDomain, PartnerUser, AliasOptions, CustomDomain
from app.proton.utils import get_proton_partner
from init_app import add_sl_domains
from tests.utils import create_new_user, random_token
def setup_module():
Session.query(SLDomain).delete()
SLDomain.create(
domain="hidden", premium_only=False, flush=True, order=5, hidden=True
)
SLDomain.create(domain="free_non_partner", premium_only=False, flush=True, order=4)
SLDomain.create(
domain="premium_non_partner", premium_only=True, flush=True, order=3
)
SLDomain.create(
domain="free_partner",
premium_only=False,
flush=True,
partner_id=get_proton_partner().id,
order=2,
)
SLDomain.create(
domain="premium_partner",
premium_only=True,
flush=True,
partner_id=get_proton_partner().id,
order=1,
)
Session.commit()
def teardown_module():
Session.query(SLDomain).delete()
add_sl_domains()
def test_get_default_domain_even_if_is_not_allowed():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=False, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
assert suffixes[0].domain == default_domain.domain
def test_get_default_domain_hidden():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=True, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.hidden
assert suffixes[0].domain != default_domain.domain
def test_get_default_domain_is_premium_for_free_user():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(partner_id=None, premium_only=True).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.premium_only
assert suffixes[0].domain != default_domain.domain
def test_suffixes_are_valid():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
CustomDomain.create(
user_id=user.id, domain=f"{random_token(10)}.com", verified=True
)
user.trial_end = None
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
alias_suffixes = get_alias_suffixes(user, alias_options=options)
valid_re = re.compile(r"^(\.[\w_]+)?@[\.\w]+$")
has_prefix = 0
for suffix in alias_suffixes:
match = valid_re.match(suffix.suffix)
assert match is not None
if len(match.groups()) >= 1:
has_prefix += 1
assert has_prefix > 0
def test_get_default_domain_is_only_shown_once():
user = create_new_user()
default_domain = SLDomain.filter_by(hidden=False).order_by(SLDomain.order).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
found_default = False
found_domains = set()
for suffix in suffixes:
assert suffix.domain not in found_domains
found_domains.add(suffix.domain)
if default_domain.domain == suffix.domain:
found_default = True
assert found_default

View File

@ -16,6 +16,7 @@ from app.models import (
Directory,
DirectoryMailbox,
User,
DomainDeletedAlias,
)
from tests.utils import create_new_user, random_domain, random_token
@ -83,6 +84,11 @@ def get_auto_create_alias_tests(user: User) -> List:
regex="ok-.*",
flush=True,
)
deleted_alias = f"deletedalias@{catchall.domain}"
Session.add(
DomainDeletedAlias(email=deleted_alias, domain_id=catchall.id, user_id=user.id)
)
Session.flush()
dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create(
@ -101,6 +107,7 @@ def get_auto_create_alias_tests(user: User) -> List:
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
(deleted_alias, False),
]

View File

@ -128,3 +128,74 @@ def test_get_premium_with_partner_domains():
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_partner_and_free_default_domain():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 3
assert domains[0].domain == "premium_partner"
assert domains[1].domain == "free_partner"
assert domains[2].domain == "free_non_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_premium_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False, premium_only=True).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_hidden_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = SLDomain.filter_by(hidden=True).first().id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)

View File

@ -0,0 +1,113 @@
import http.server
import json
import threading
import arrow
from app import config
from app.models import (
Subscription,
AppleSubscription,
CoinbaseSubscription,
ManualSubscription,
)
from tests.utils import create_new_user, random_token
from app.subscription_webhook import execute_subscription_webhook
http_server = None
last_http_request = None
def setup_module():
global http_server
http_server = http.server.ThreadingHTTPServer(("", 0), HTTPTestServer)
print(http_server.server_port)
threading.Thread(target=http_server.serve_forever, daemon=True).start()
config.SUBSCRIPTION_CHANGE_WEBHOOK = f"http://localhost:{http_server.server_port}"
def teardown_module():
global http_server
config.SUBSCRIPTION_CHANGE_WEBHOOK = None
http_server.shutdown()
class HTTPTestServer(http.server.BaseHTTPRequestHandler):
def do_POST(self):
global last_http_request
content_len = int(self.headers.get("Content-Length"))
body_data = self.rfile.read(content_len)
last_http_request = json.loads(body_data)
self.send_response(200)
def test_webhook_with_trial():
user = create_new_user()
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] is None
def test_webhook_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_token(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_token(10),
plan="yearly",
product_id="",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_manual_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
ManualSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp