Compare commits

...

6 Commits

Author SHA1 Message Date
b2430cbc5b 4.32.1 2023-07-12 11:00:04 +00:00
1258115397 4.32.0 2023-07-11 11:00:05 +00:00
38c134d903 4.31.0 2023-06-30 11:00:06 +00:00
cd77e4cc2d 4.30.1 2023-06-28 11:00:03 +00:00
87aedf3207 4.30.0 2023-06-27 11:00:04 +00:00
3523c9fc15 4.29.4 2023-06-07 11:00:05 +00:00
30 changed files with 496 additions and 57 deletions

View File

@ -169,6 +169,12 @@ For HTML templates, we use `djlint`. Before creating a pull request, please run
poetry run djlint --check templates poetry run djlint --check templates
``` ```
If some files aren't properly formatted, you can format all files with
```bash
poetry run djlint --reformat .
```
## Test sending email ## Test sending email
[swaks](http://www.jetmore.org/john/code/swaks/) is used for sending test emails to the `email_handler`. [swaks](http://www.jetmore.org/john/code/swaks/) is used for sending test emails to the `email_handler`.

View File

@ -23,10 +23,10 @@ COPY poetry.lock pyproject.toml ./
# Install and setup poetry # Install and setup poetry
RUN pip install -U pip \ RUN pip install -U pip \
&& apt-get update \ && apt-get update \
&& apt install -y curl netcat gcc python3-dev gnupg git libre2-dev \ && apt install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev \
&& curl -sSL https://install.python-poetry.org | python3 - \ && curl -sSL https://install.python-poetry.org | python3 - \
# Remove curl and netcat from the image # Remove curl and netcat from the image
&& apt-get purge -y curl netcat \ && apt-get purge -y curl netcat-traditional \
# Run poetry # Run poetry
&& poetry config virtualenvs.create false \ && poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root \ && poetry install --no-interaction --no-ansi --no-root \

View File

@ -162,8 +162,6 @@ def get_alias_suffixes(
or user.default_alias_public_domain_id != sl_domain.id or user.default_alias_public_domain_id != sl_domain.id
): ):
alias_suffixes.append(alias_suffix) alias_suffixes.append(alias_suffix)
# If no default domain mark it as found
default_domain_found = user.default_alias_public_domain_id is None
else: else:
default_domain_found = True default_domain_found = True
alias_suffixes.insert(0, alias_suffix) alias_suffixes.insert(0, alias_suffix)

View File

@ -57,6 +57,8 @@ def get_user_if_alias_would_auto_create(
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain( domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user address, notify_user=notify_user
) )
if DomainDeletedAlias.get_by(email=address):
return None
if domain_and_rule: if domain_and_rule:
return domain_and_rule[0].user return domain_and_rule[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory( directory = check_if_alias_can_be_auto_created_for_a_directory(

View File

@ -534,3 +534,4 @@ SKIP_MX_LOOKUP_ON_CHECK = False
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None) SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
MAX_API_KEYS = int(os.environ.get("MAX_API_KEYS", 30))

View File

@ -3,9 +3,11 @@ from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators from wtforms import StringField, validators
from app import config
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session from app.db import Session
from app.extensions import limiter
from app.models import ApiKey from app.models import ApiKey
from app.utils import CSRFValidationForm from app.utils import CSRFValidationForm
@ -14,9 +16,32 @@ class NewApiKeyForm(FlaskForm):
name = StringField("Name", validators=[validators.DataRequired()]) name = StringField("Name", validators=[validators.DataRequired()])
def clean_up_unused_or_old_api_keys(user_id: int):
total_keys = ApiKey.filter_by(user_id=user_id).count()
# Remove oldest unused
for api_key in (
ApiKey.filter_by(user_id=user_id, last_used=None)
.order_by(ApiKey.created_at.asc())
.all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
# Clean up oldest used
for api_key in (
ApiKey.filter_by(user_id=user_id).order_by(ApiKey.last_used.asc()).all()
):
Session.delete(api_key)
total_keys -= 1
if total_keys <= config.MAX_API_KEYS:
return
@dashboard_bp.route("/api_key", methods=["GET", "POST"]) @dashboard_bp.route("/api_key", methods=["GET", "POST"])
@login_required @login_required
@sudo_required @sudo_required
@limiter.limit("10/hour")
def api_key(): def api_key():
api_keys = ( api_keys = (
ApiKey.filter(ApiKey.user_id == current_user.id) ApiKey.filter(ApiKey.user_id == current_user.id)
@ -50,6 +75,7 @@ def api_key():
elif request.form.get("form-name") == "create": elif request.form.get("form-name") == "create":
if new_api_key_form.validate(): if new_api_key_form.validate():
clean_up_unused_or_old_api_keys(current_user.id)
new_api_key = ApiKey.create( new_api_key = ApiKey.create(
name=new_api_key_form.name.data, user_id=current_user.id name=new_api_key_form.name.data, user_id=current_user.id
) )

View File

@ -1,3 +1,7 @@
import base64
import binascii
import json
import arrow import arrow
from flask import render_template, request, redirect, url_for, flash from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
@ -180,7 +184,9 @@ def mailbox_route():
def send_verification_email(user, mailbox): def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET) s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode() encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = ( verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}" URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
) )
@ -205,22 +211,34 @@ def send_verification_email(user, mailbox):
@dashboard_bp.route("/mailbox_verify") @dashboard_bp.route("/mailbox_verify")
def mailbox_verify(): def mailbox_verify():
s = TimestampSigner(MAILBOX_SECRET) s = TimestampSigner(MAILBOX_SECRET)
mailbox_id = request.args.get("mailbox_id") mailbox_verify_request = request.args.get("mailbox_id")
try: try:
r_id = int(s.unsign(mailbox_id, max_age=900)) mailbox_raw_data = s.unsign(mailbox_verify_request, max_age=900)
except Exception: except Exception:
flash("Invalid link. Please delete and re-add your mailbox", "error") flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route")) return redirect(url_for("dashboard.mailbox_route"))
else: try:
mailbox = Mailbox.get(r_id) decoded_data = base64.urlsafe_b64decode(mailbox_raw_data)
if not mailbox: except binascii.Error:
flash("Invalid link", "error") flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route")) return redirect(url_for("dashboard.mailbox_route"))
mailbox_data = json.loads(decoded_data)
if not isinstance(mailbox_data, list) or len(mailbox_data) != 2:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_id = mailbox_data[0]
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_email = mailbox_data[1]
if mailbox_email != mailbox.email:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox.verified = True mailbox.verified = True
Session.commit() Session.commit()
LOG.d("Mailbox %s is verified", mailbox) LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox) return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)

View File

@ -20,6 +20,7 @@ X_SPAM_STATUS = "X-Spam-Status"
LIST_UNSUBSCRIBE = "List-Unsubscribe" LIST_UNSUBSCRIBE = "List-Unsubscribe"
LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post" LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post"
RETURN_PATH = "Return-Path" RETURN_PATH = "Return-Path"
AUTHENTICATION_RESULTS = "Authentication-Results"
# headers used to DKIM sign in order of preference # headers used to DKIM sign in order of preference
DKIM_HEADERS = [ DKIM_HEADERS = [
@ -32,6 +33,7 @@ DKIM_HEADERS = [
SL_DIRECTION = "X-SimpleLogin-Type" SL_DIRECTION = "X-SimpleLogin-Type"
SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID" SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID"
SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From" SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From"
SL_ORIGINAL_FROM = "X-SimpleLogin-Original-From"
SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To" SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To"
SL_CLIENT_IP = "X-SimpleLogin-Client-IP" SL_CLIENT_IP = "X-SimpleLogin-Client-IP"

View File

@ -951,6 +951,8 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
for part in msg.get_payload(): for part in msg.get_payload():
if isinstance(part, Message): if isinstance(part, Message):
new_parts.append(add_header(part, text_header, html_header)) new_parts.append(add_header(part, text_header, html_header))
elif isinstance(part, str):
new_parts.append(MIMEText(part))
else: else:
new_parts.append(part) new_parts.append(part)
clone_msg = copy(msg) clone_msg = copy(msg)
@ -959,7 +961,14 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
elif content_type in ("multipart/mixed", "multipart/signed"): elif content_type in ("multipart/mixed", "multipart/signed"):
new_parts = [] new_parts = []
parts = list(msg.get_payload()) payload = msg.get_payload()
if isinstance(payload, str):
# The message is badly formatted inject as new
new_parts = [MIMEText(text_header, "plain"), MIMEText(payload, "plain")]
clone_msg = copy(msg)
clone_msg.set_payload(new_parts)
return clone_msg
parts = list(payload)
LOG.d("only add header for the first part for %s", content_type) LOG.d("only add header for the first part for %s", content_type)
for ix, part in enumerate(parts): for ix, part in enumerate(parts):
if ix == 0: if ix == 0:

View File

@ -74,8 +74,8 @@ class UnsubscribeEncoder:
) )
signed_data = cls._get_signer().sign(serialized_data).decode("utf-8") signed_data = cls._get_signer().sign(serialized_data).decode("utf-8")
encoded_request = f"{UNSUB_PREFIX}.{signed_data}" encoded_request = f"{UNSUB_PREFIX}.{signed_data}"
if len(encoded_request) > 256: if len(encoded_request) > 512:
LOG.e("Encoded request is longer than 256 chars") LOG.w("Encoded request is longer than 512 chars")
return encoded_request return encoded_request
@staticmethod @staticmethod

View File

@ -9,6 +9,7 @@ from app.handler.unsubscribe_encoder import (
UnsubscribeData, UnsubscribeData,
UnsubscribeOriginalData, UnsubscribeOriginalData,
) )
from app.log import LOG
from app.models import Alias, Contact, UnsubscribeBehaviourEnum from app.models import Alias, Contact, UnsubscribeBehaviourEnum
@ -30,6 +31,7 @@ class UnsubscribeGenerator:
""" """
unsubscribe_data = message[headers.LIST_UNSUBSCRIBE] unsubscribe_data = message[headers.LIST_UNSUBSCRIBE]
if not unsubscribe_data: if not unsubscribe_data:
LOG.info("Email has no unsubscribe header")
return message return message
raw_methods = [method.strip() for method in unsubscribe_data.split(",")] raw_methods = [method.strip() for method in unsubscribe_data.split(",")]
mailto_unsubs = None mailto_unsubs = None
@ -44,7 +46,9 @@ class UnsubscribeGenerator:
if url_data.scheme == "mailto": if url_data.scheme == "mailto":
query_data = urllib.parse.parse_qs(url_data.query) query_data = urllib.parse.parse_qs(url_data.query)
mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0]) mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0])
LOG.debug(f"Unsub is mailto to {mailto_unsubs}")
else: else:
LOG.debug(f"Unsub has {url_data.scheme} scheme")
other_unsubs.append(method) other_unsubs.append(method)
# If there are non mailto unsubscribe methods, use those in the header # If there are non mailto unsubscribe methods, use those in the header
if other_unsubs: if other_unsubs:
@ -56,18 +60,19 @@ class UnsubscribeGenerator:
add_or_replace_header( add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click" message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
) )
LOG.debug(f"Adding click unsub methods to header {other_unsubs}")
return message return message
if not mailto_unsubs: elif not mailto_unsubs:
message = delete_header(message, headers.LIST_UNSUBSCRIBE) LOG.debug("No unsubs. Deleting all unsub headers")
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST) delete_header(message, headers.LIST_UNSUBSCRIBE)
delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
return message return message
return self._add_unsubscribe_header( unsub_data = UnsubscribeData(
message, UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeData( UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
),
) )
LOG.debug(f"Adding unsub data {unsub_data}")
return self._add_unsubscribe_header(message, unsub_data)
def _add_unsubscribe_header( def _add_unsubscribe_header(
self, message: Message, unsub: UnsubscribeData self, message: Message, unsub: UnsubscribeData

View File

@ -46,6 +46,7 @@ class SendRequest:
"mail_options": self.mail_options, "mail_options": self.mail_options,
"rcpt_options": self.rcpt_options, "rcpt_options": self.rcpt_options,
"is_forward": self.is_forward, "is_forward": self.is_forward,
"retries": self.retries,
} }
return json.dumps(data).encode("utf-8") return json.dumps(data).encode("utf-8")
@ -66,6 +67,7 @@ class SendRequest:
mail_options=decoded_data["mail_options"], mail_options=decoded_data["mail_options"],
rcpt_options=decoded_data["rcpt_options"], rcpt_options=decoded_data["rcpt_options"],
is_forward=decoded_data["is_forward"], is_forward=decoded_data["is_forward"],
retries=decoded_data.get("retries", 1),
) )
def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"): def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"):

View File

@ -445,7 +445,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
random_alias_suffix = sa.Column( random_alias_suffix = sa.Column(
sa.Integer, sa.Integer,
nullable=False, nullable=False,
default=AliasSuffixEnum.random_string.value, default=AliasSuffixEnum.word.value,
server_default=str(AliasSuffixEnum.random_string.value), server_default=str(AliasSuffixEnum.random_string.value),
) )
@ -514,9 +514,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
server_default=BlockBehaviourEnum.return_2xx.name, server_default=BlockBehaviourEnum.return_2xx.name,
) )
# to keep existing behavior, the server default is TRUE whereas for new user, the default value is FALSE
include_header_email_header = sa.Column( include_header_email_header = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="1" sa.Boolean, default=True, nullable=False, server_default="1"
) )
# bitwise flags. Allow for future expansion # bitwise flags. Allow for future expansion
@ -1016,6 +1015,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if not self.is_premium(): if not self.is_premium():
conditions.append(SLDomain.premium_only == False) # noqa: E712 conditions.append(SLDomain.premium_only == False) # noqa: E712
partner_domain_cond = [] # noqa:E711 partner_domain_cond = [] # noqa:E711
if self.default_alias_public_domain_id is not None:
partner_domain_cond.append(
SLDomain.id == self.default_alias_public_domain_id
)
if alias_options.show_partner_domains is not None: if alias_options.show_partner_domains is not None:
partner_user = PartnerUser.filter_by( partner_user = PartnerUser.filter_by(
user_id=self.id, partner_id=alias_options.show_partner_domains.id user_id=self.id, partner_id=alias_options.show_partner_domains.id

View File

@ -846,22 +846,23 @@ def forward_email_to_mailbox(
f"""Email sent to {alias.email} from an invalid address and cannot be replied""", f"""Email sent to {alias.email} from an invalid address and cannot be replied""",
) )
delete_all_headers_except( headers_to_keep = [
msg, headers.FROM,
[ headers.TO,
headers.FROM, headers.CC,
headers.TO, headers.SUBJECT,
headers.CC, headers.DATE,
headers.SUBJECT, # do not delete original message id
headers.DATE, headers.MESSAGE_ID,
# do not delete original message id # References and In-Reply-To are used for keeping the email thread
headers.MESSAGE_ID, headers.REFERENCES,
# References and In-Reply-To are used for keeping the email thread headers.IN_REPLY_TO,
headers.REFERENCES, headers.LIST_UNSUBSCRIBE,
headers.IN_REPLY_TO, headers.LIST_UNSUBSCRIBE_POST,
] ] + headers.MIME_HEADERS
+ headers.MIME_HEADERS, if user.include_header_email_header:
) headers_to_keep.append(headers.AUTHENTICATION_RESULTS)
delete_all_headers_except(msg, headers_to_keep)
# create PGP email if needed # create PGP email if needed
if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp: if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp:
@ -898,6 +899,11 @@ def forward_email_to_mailbox(
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id) msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
if user.include_header_email_header: if user.include_header_email_header:
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
if contact.name:
original_from = f"{contact.name} <{contact.website_email}>"
else:
original_from = contact.website_email
msg[headers.SL_ORIGINAL_FROM] = original_from
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email # when an alias isn't in the To: header, there's no way for users to know what alias has received the email
msg[headers.SL_ENVELOPE_TO] = alias.email msg[headers.SL_ENVELOPE_TO] = alias.email

View File

@ -9,10 +9,13 @@
<h1 class="card-title">Create new account</h1> <h1 class="card-title">Create new account</h1>
<div class="form-group"> <div class="form-group">
<label class="form-label">Email address</label> <label class="form-label">Email address</label>
{{ form.email(class="form-control", type="email") }} {{ form.email(class="form-control", type="email", placeholder="YourName@protonmail.com") }}
<div class="small-text alert alert-info" style="margin-top: 1px"> <div class="small-text alert alert-info" style="margin-top: 1px">
Emails sent to your alias will be forwarded to this email address. Emails sent to your alias will be forwarded to this email address.
<br>
It can't be a disposable or forwarding email address. It can't be a disposable or forwarding email address.
<br>
We recommend using a <a href="https://proton.me/mail" target="_blank">Proton Mail</a> address
</div> </div>
{{ render_field_errors(form.email) }} {{ render_field_errors(form.email) }}
</div> </div>

View File

@ -684,7 +684,8 @@
SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b> SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b>
sender address. sender address.
<br /> <br />
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>. If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>
and the original From header is stored in <b>X-SimpleLogin-Original-From<b>.
You can choose to display this header in your email client. You can choose to display this header in your email client.
<br /> <br />
As email headers aren't encrypted, your mailbox service can know the sender address via this header. As email headers aren't encrypted, your mailbox service can know the sender address via this header.

View File

@ -286,6 +286,7 @@
}, },
async mounted() { async mounted() {
Object.freeze(Object.prototype);
let that = this; let that = this;
let res = await fetch(`/api/notifications?page=${that.page}`, { let res = await fetch(`/api/notifications?page=${that.page}`, {
method: "GET", method: "GET",

View File

@ -5,7 +5,7 @@
<div class="page-single"> <div class="page-single">
<div class="container"> <div class="container">
<div class="row"> <div class="row">
<div class="col mx-auto" style="max-width: 28rem"> <div class="col mx-auto" style="max-width: 32rem">
<div class="text-center mb-6"> <div class="text-center mb-6">
<a href="{{ LANDING_PAGE_URL }}"> <a href="{{ LANDING_PAGE_URL }}">
<img src="/static/logo.svg" <img src="/static/logo.svg"

View File

@ -17,7 +17,7 @@ def test_get_setting(flask_client):
"notification": True, "notification": True,
"random_alias_default_domain": "sl.local", "random_alias_default_domain": "sl.local",
"sender_format": "AT", "sender_format": "AT",
"random_alias_suffix": "random_string", "random_alias_suffix": "word",
} }
@ -95,11 +95,13 @@ def test_get_setting_domains_v2(flask_client):
def test_update_settings_random_alias_suffix(flask_client): def test_update_settings_random_alias_suffix(flask_client):
user = login(flask_client) user = login(flask_client)
# default random_alias_suffix is random_string # default random_alias_suffix is random_string
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value assert user.random_alias_suffix == AliasSuffixEnum.word.value
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"}) r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"})
assert r.status_code == 400 assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "word"}) r = flask_client.patch(
"/api/setting", json={"random_alias_suffix": "random_string"}
)
assert r.status_code == 200 assert r.status_code == 200
assert user.random_alias_suffix == AliasSuffixEnum.word.value assert user.random_alias_suffix == AliasSuffixEnum.random_string.value

View File

@ -1,10 +1,13 @@
from time import time from time import time
import arrow
from flask import url_for from flask import url_for
from app import config
from app.dashboard.views.api_key import clean_up_unused_or_old_api_keys
from app.db import Session from app.db import Session
from app.models import User, ApiKey from app.models import User, ApiKey
from tests.utils import login from tests.utils import login, create_new_user
def test_api_key_page_requires_password(flask_client): def test_api_key_page_requires_password(flask_client):
@ -87,3 +90,26 @@ def test_delete_all_api_keys(flask_client):
assert ( assert (
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1 ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
) # assert that user 2 still has 1 API key ) # assert that user 2 still has 1 API key
def test_cleanup_api_keys():
user = create_new_user()
ApiKey.create(
user_id=user.id, name="used", last_used=arrow.utcnow().shift(days=-3), times=1
)
ApiKey.create(
user_id=user.id, name="keep 1", last_used=arrow.utcnow().shift(days=-2), times=1
)
ApiKey.create(
user_id=user.id, name="keep 2", last_used=arrow.utcnow().shift(days=-1), times=1
)
ApiKey.create(user_id=user.id, name="not used", last_used=None, times=1)
Session.flush()
old_max_api_keys = config.MAX_API_KEYS
config.MAX_API_KEYS = 2
clean_up_unused_or_old_api_keys(user.id)
keys = ApiKey.filter_by(user_id=user.id).all()
assert len(keys) == 2
assert keys[0].name.find("keep") == 0
assert keys[1].name.find("keep") == 0
config.MAX_API_KEYS = old_max_api_keys

View File

@ -316,6 +316,10 @@ def test_add_alias_in_global_trash(flask_client):
def test_add_alias_in_custom_domain_trash(flask_client): def test_add_alias_in_custom_domain_trash(flask_client):
user = login(flask_client) user = login(flask_client)
for deleted_domain in DomainDeletedAlias.all():
Session.delete(deleted_domain)
Session.flush()
domain = random_domain() domain = random_domain()
custom_domain = CustomDomain.create( custom_domain = CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True user_id=user.id, domain=domain, ownership_verified=True, commit=True

View File

@ -0,0 +1,21 @@
Sender: somebody@somewhere.net
Content-Type: multipart/mixed; boundary="----=_Part_3946_1099248058.1688752298149"
--0c916c9b5fe3c925d7bafeb988bb6794
Content-Type: text/plain; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
notification test
--0c916c9b5fe3c925d7bafeb988bb6794
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
<html><head><meta http-equiv=3D"Content-Type" content=3D"text/html; charset=
=3DUTF-8"><meta http-equiv=3D"X-UA-Compatible" content=3D"IE=3Dedge"><meta =
name=3D"format-detection" content=3D"telephone=3Dno"><meta name=3D"viewport=
" content=3D"width=3Ddevice-width, initial-scale=3D1.0">
--0c916c9b5fe3c925d7bafeb988bb6794--

View File

@ -0,0 +1,27 @@
From: {{sender_address}}
To: {{recipient_address}}
Subject: Test subject
Content-Type: multipart/alternative; boundary="MLF8fvg556fdhFDH7=_?:
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/plain;
charset="utf-8"
Content-Transfer-Encoding: quoted-printable
*************************************************************************
This five-part limited series, based on the brilliant graphic novel by Me
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/html;
charset="utf-8"
Content-Transfer-Encoding: 8bit
--MLF8fvg556fdhFDH7=_?:
Content-Type: text/plain;
charset="utf-8"
Content-Transfer-Encoding: quoted-printable
*************************************************************************
*************************************************************************

View File

@ -0,0 +1,65 @@
Received: by mail-ed1-f49.google.com with SMTP id ej4so13657316edb.7
for <gmail@simplemail.fplante.fr>; Mon, 27 Jun 2022 08:48:15 -0700 (PDT)
X-Gm-Message-State: AJIora8exR9DGeRFoKAtjzwLtUpH5hqx6Zt3tm8n4gUQQivGQ3fELjUV
yT7RQIfeW9Kv2atuOcgtmGYVU4iQ8VBeLmK1xvOYL4XpXfrT7ZrJNQ==
Authentication-Results: mx.google.com;
dkim=pass header.i=@matera.eu header.s=fnt header.b=XahYMey7;
dkim=pass header.i=@sendgrid.info header.s=smtpapi header.b="QOCS/yjt";
spf=pass (google.com: domain of bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu designates 168.245.4.42 as permitted sender) smtp.mailfrom="bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu";
dmarc=pass (p=NONE sp=NONE dis=NONE) header.from=matera.eu
Received: from out.frontapp.com (unknown)
by geopod-ismtpd-3-0 (SG)
with ESMTP id d2gM2N7PT7W8d2-UEC4ESA
for <csyndic.quartz@gmail.com>;
Mon, 27 Jun 2022 15:48:11.014 +0000 (UTC)
Content-Type: multipart/alternative;
boundary="----sinikael-?=_1-16563448907660.10629093370416887"
In-Reply-To:
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
References:
<imported@frontapp.com_t:AssembléeGénérale2022-06-25T16:32:03+02:006b3cdade-982b-47cd-8114-6a037dfb7d60>
<imported@frontapp.com_f924cce139940c9935621f067d46443597394f34>
<imported@frontapp.com_t:Appeldefonds2022-06-26T10:04:55+02:00d89f5e23-6d98-4f01-95fa-b7c7544b7aa9>
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
<af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
From: {{ sender_address }}
To: {{ recipient_address }}
CC: {{ cc_address }}
Subject: Something
Message-ID: <af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
X-Mailer: Front (1.0; +https://frontapp.com;
+msgid=af07e94a66ece6564ae30a2aaac7a34c@frontapp.com)
X-Feedback-ID: 14445963:SG
X-SG-EID:
=?us-ascii?Q?XtlxQDg5i3HqMzQY2Upg19JPZBVl1RybInUUL2yta9uBoIU4KU1FMJ5DjWrz6g?=
=?us-ascii?Q?fJUK5Qmneg2uc46gwp5BdHdp6Foaq5gg3xJriv3?=
=?us-ascii?Q?9OA=2FWRifeylU9O+ngdNbOKXoeJAkROmp2mCgw9x?=
=?us-ascii?Q?uud+EclOT9mYVtbZsydOLLm6Y2PPswQl8lnmiku?=
=?us-ascii?Q?DAhkG15HTz2FbWGWNDFb7VrSsN5ddjAscr6sIHw?=
=?us-ascii?Q?S48R5fnXmfhPbmlCgqFjr0FGphfuBdNAt6z6w8a?=
=?us-ascii?Q?o9u1EYDIX7zWHZ+Tr3eyw=3D=3D?=
X-SG-ID:
=?us-ascii?Q?N2C25iY2uzGMFz6rgvQsb8raWjw0ZPf1VmjsCkspi=2FI9PhcvqXQTpKqqyZkvBe?=
=?us-ascii?Q?+2RscnQ4WPkA+BN1vYgz1rezTVIqgp+rlWrKk8o?=
=?us-ascii?Q?HoB5dzpX6HKWtWCVRi10zwlDN1+pJnySoIUrlaT?=
=?us-ascii?Q?PA2aqQKmMQbjTl0CUAFryR8hhHcxdS0cQowZSd7?=
=?us-ascii?Q?XNjJWLvCGF7ODwg=2FKr+4yRE8UvULS2nrdO2wWyQ?=
=?us-ascii?Q?AiFHdPdZsRlgNomEo=3D?=
X-Spamd-Result: default: False [-2.00 / 13.00];
ARC_ALLOW(-1.00)[google.com:s=arc-20160816:i=1];
MIME_GOOD(-0.10)[multipart/alternative,text/plain];
REPLYTO_ADDR_EQ_FROM(0.00)[];
FORGED_RECIPIENTS_FORWARDING(0.00)[];
NEURAL_HAM(-0.00)[-0.981];
FREEMAIL_TO(0.00)[gmail.com];
RCVD_TLS_LAST(0.00)[];
FREEMAIL_ENVFROM(0.00)[gmail.com];
MIME_TRACE(0.00)[0:+,1:+,2:~];
RWL_MAILSPIKE_POSSIBLE(0.00)[209.85.208.49:from]
------sinikael-?=_1-16563448907660.10629093370416887
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: quoted-printable
From {{ sender_address }} To {{ recipient_address }}
------sinikael-?=_1-16563448907660.10629093370416887--

View File

@ -0,0 +1,33 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.config import get_abs_path
from app.db import Session
from app.pgp_utils import load_public_key
from tests.utils import create_new_user, load_eml_file, random_email
from app.models import Alias
def test_encrypt_with_pgp():
user = create_new_user()
pgp_public_key = open(get_abs_path("local_data/public-pgp.asc")).read()
mailbox = user.default_mailbox
mailbox.pgp_public_key = pgp_public_key
mailbox.generic_subject = True
mailbox.pgp_finger_print = load_public_key(pgp_public_key)
alias = Alias.create_new_random(user)
Session.flush()
sender_address = random_email()
msg = load_eml_file(
"email_to_pgp_encrypt.eml",
{
"sender_address": sender_address,
"recipient_address": alias.email,
},
)
envelope = Envelope()
envelope.mail_from = sender_address
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result is not None

View File

@ -0,0 +1,74 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.db import Session
from app.email import headers, status
from app.mail_sender import mail_sender
from app.models import Alias
from app.utils import random_string
from tests.utils import create_new_user, load_eml_file, random_email
@mail_sender.store_emails_test_decorator
def test_original_headers_from_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": original_sender_address,
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert request.msg[headers.SL_ORIGINAL_FROM] == original_sender_address
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)
@mail_sender.store_emails_test_decorator
def test_original_headers_from_with_name_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
name = random_string(10)
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": f"{name} <{original_sender_address}>",
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert (
request.msg[headers.SL_ORIGINAL_FROM] == f"{name} <{original_sender_address}>"
)
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)

View File

@ -131,3 +131,22 @@ def test_suffixes_are_valid():
if len(match.groups()) >= 1: if len(match.groups()) >= 1:
has_prefix += 1 has_prefix += 1
assert has_prefix > 0 assert has_prefix > 0
def test_get_default_domain_is_only_shown_once():
user = create_new_user()
default_domain = SLDomain.filter_by(hidden=False).order_by(SLDomain.order).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
found_default = False
found_domains = set()
for suffix in suffixes:
assert suffix.domain not in found_domains
found_domains.add(suffix.domain)
if default_domain.domain == suffix.domain:
found_default = True
assert found_default

View File

@ -16,6 +16,7 @@ from app.models import (
Directory, Directory,
DirectoryMailbox, DirectoryMailbox,
User, User,
DomainDeletedAlias,
) )
from tests.utils import create_new_user, random_domain, random_token from tests.utils import create_new_user, random_domain, random_token
@ -83,6 +84,11 @@ def get_auto_create_alias_tests(user: User) -> List:
regex="ok-.*", regex="ok-.*",
flush=True, flush=True,
) )
deleted_alias = f"deletedalias@{catchall.domain}"
Session.add(
DomainDeletedAlias(email=deleted_alias, domain_id=catchall.id, user_id=user.id)
)
Session.flush()
dir_name = random_token() dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True) directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create( DirectoryMailbox.create(
@ -101,6 +107,7 @@ def get_auto_create_alias_tests(user: User) -> List:
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
(deleted_alias, False),
] ]

View File

@ -128,3 +128,74 @@ def test_get_premium_with_partner_domains():
assert [d.domain for d in domains] == user.available_sl_domains( assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options alias_options=options
) )
def test_get_partner_and_free_default_domain():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 3
assert domains[0].domain == "premium_partner"
assert domains[1].domain == "free_partner"
assert domains[2].domain == "free_non_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_premium_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False, premium_only=True).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_hidden_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = SLDomain.filter_by(hidden=True).first().id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)

View File

@ -810,7 +810,7 @@ def test_add_header_multipart_with_invalid_part():
if i < 2: if i < 2:
assert part.get_payload().index("INJECT") > -1 assert part.get_payload().index("INJECT") > -1
else: else:
assert part == "invalid" assert part.get_payload() == "invalid"
def test_sl_formataddr(): def test_sl_formataddr():
@ -822,3 +822,10 @@ def test_sl_formataddr():
# test that the same name-address can't be handled by the built-in formataddr # test that the same name-address can't be handled by the built-in formataddr
with pytest.raises(UnicodeEncodeError): with pytest.raises(UnicodeEncodeError):
formataddr(("é", "è@ç.à")) formataddr(("é", "è@ç.à"))
def test_add_header_to_invalid_multipart():
msg = load_eml_file("add_header_multipart.eml")
msg = add_header(msg, "test", "test")
data = msg.as_string()
assert data != ""