Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
b2430cbc5b | |||
1258115397 | |||
38c134d903 | |||
cd77e4cc2d |
@ -169,6 +169,12 @@ For HTML templates, we use `djlint`. Before creating a pull request, please run
|
||||
poetry run djlint --check templates
|
||||
```
|
||||
|
||||
If some files aren't properly formatted, you can format all files with
|
||||
|
||||
```bash
|
||||
poetry run djlint --reformat .
|
||||
```
|
||||
|
||||
## Test sending email
|
||||
|
||||
[swaks](http://www.jetmore.org/john/code/swaks/) is used for sending test emails to the `email_handler`.
|
||||
|
@ -534,3 +534,4 @@ SKIP_MX_LOOKUP_ON_CHECK = False
|
||||
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
|
||||
|
||||
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
|
||||
MAX_API_KEYS = int(os.environ.get("MAX_API_KEYS", 30))
|
||||
|
@ -3,9 +3,11 @@ from flask_login import login_required, current_user
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, validators
|
||||
|
||||
from app import config
|
||||
from app.dashboard.base import dashboard_bp
|
||||
from app.dashboard.views.enter_sudo import sudo_required
|
||||
from app.db import Session
|
||||
from app.extensions import limiter
|
||||
from app.models import ApiKey
|
||||
from app.utils import CSRFValidationForm
|
||||
|
||||
@ -14,9 +16,32 @@ class NewApiKeyForm(FlaskForm):
|
||||
name = StringField("Name", validators=[validators.DataRequired()])
|
||||
|
||||
|
||||
def clean_up_unused_or_old_api_keys(user_id: int):
|
||||
total_keys = ApiKey.filter_by(user_id=user_id).count()
|
||||
# Remove oldest unused
|
||||
for api_key in (
|
||||
ApiKey.filter_by(user_id=user_id, last_used=None)
|
||||
.order_by(ApiKey.created_at.asc())
|
||||
.all()
|
||||
):
|
||||
Session.delete(api_key)
|
||||
total_keys -= 1
|
||||
if total_keys <= config.MAX_API_KEYS:
|
||||
return
|
||||
# Clean up oldest used
|
||||
for api_key in (
|
||||
ApiKey.filter_by(user_id=user_id).order_by(ApiKey.last_used.asc()).all()
|
||||
):
|
||||
Session.delete(api_key)
|
||||
total_keys -= 1
|
||||
if total_keys <= config.MAX_API_KEYS:
|
||||
return
|
||||
|
||||
|
||||
@dashboard_bp.route("/api_key", methods=["GET", "POST"])
|
||||
@login_required
|
||||
@sudo_required
|
||||
@limiter.limit("10/hour")
|
||||
def api_key():
|
||||
api_keys = (
|
||||
ApiKey.filter(ApiKey.user_id == current_user.id)
|
||||
@ -50,6 +75,7 @@ def api_key():
|
||||
|
||||
elif request.form.get("form-name") == "create":
|
||||
if new_api_key_form.validate():
|
||||
clean_up_unused_or_old_api_keys(current_user.id)
|
||||
new_api_key = ApiKey.create(
|
||||
name=new_api_key_form.name.data, user_id=current_user.id
|
||||
)
|
||||
|
@ -951,6 +951,8 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
|
||||
for part in msg.get_payload():
|
||||
if isinstance(part, Message):
|
||||
new_parts.append(add_header(part, text_header, html_header))
|
||||
elif isinstance(part, str):
|
||||
new_parts.append(MIMEText(part))
|
||||
else:
|
||||
new_parts.append(part)
|
||||
clone_msg = copy(msg)
|
||||
@ -959,7 +961,14 @@ def add_header(msg: Message, text_header, html_header=None) -> Message:
|
||||
|
||||
elif content_type in ("multipart/mixed", "multipart/signed"):
|
||||
new_parts = []
|
||||
parts = list(msg.get_payload())
|
||||
payload = msg.get_payload()
|
||||
if isinstance(payload, str):
|
||||
# The message is badly formatted inject as new
|
||||
new_parts = [MIMEText(text_header, "plain"), MIMEText(payload, "plain")]
|
||||
clone_msg = copy(msg)
|
||||
clone_msg.set_payload(new_parts)
|
||||
return clone_msg
|
||||
parts = list(payload)
|
||||
LOG.d("only add header for the first part for %s", content_type)
|
||||
for ix, part in enumerate(parts):
|
||||
if ix == 0:
|
||||
|
@ -74,8 +74,8 @@ class UnsubscribeEncoder:
|
||||
)
|
||||
signed_data = cls._get_signer().sign(serialized_data).decode("utf-8")
|
||||
encoded_request = f"{UNSUB_PREFIX}.{signed_data}"
|
||||
if len(encoded_request) > 256:
|
||||
LOG.e("Encoded request is longer than 256 chars")
|
||||
if len(encoded_request) > 512:
|
||||
LOG.w("Encoded request is longer than 512 chars")
|
||||
return encoded_request
|
||||
|
||||
@staticmethod
|
||||
|
@ -64,8 +64,8 @@ class UnsubscribeGenerator:
|
||||
return message
|
||||
elif not mailto_unsubs:
|
||||
LOG.debug("No unsubs. Deleting all unsub headers")
|
||||
message = delete_header(message, headers.LIST_UNSUBSCRIBE)
|
||||
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
|
||||
delete_header(message, headers.LIST_UNSUBSCRIBE)
|
||||
delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
|
||||
return message
|
||||
unsub_data = UnsubscribeData(
|
||||
UnsubscribeAction.OriginalUnsubscribeMailto,
|
||||
|
@ -46,6 +46,7 @@ class SendRequest:
|
||||
"mail_options": self.mail_options,
|
||||
"rcpt_options": self.rcpt_options,
|
||||
"is_forward": self.is_forward,
|
||||
"retries": self.retries,
|
||||
}
|
||||
return json.dumps(data).encode("utf-8")
|
||||
|
||||
@ -66,6 +67,7 @@ class SendRequest:
|
||||
mail_options=decoded_data["mail_options"],
|
||||
rcpt_options=decoded_data["rcpt_options"],
|
||||
is_forward=decoded_data["is_forward"],
|
||||
retries=decoded_data.get("retries", 1),
|
||||
)
|
||||
|
||||
def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"):
|
||||
|
@ -899,7 +899,11 @@ def forward_email_to_mailbox(
|
||||
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
|
||||
if user.include_header_email_header:
|
||||
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
|
||||
msg[headers.SL_ORIGINAL_FROM] = contact.website_email
|
||||
if contact.name:
|
||||
original_from = f"{contact.name} <{contact.website_email}>"
|
||||
else:
|
||||
original_from = contact.website_email
|
||||
msg[headers.SL_ORIGINAL_FROM] = original_from
|
||||
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email
|
||||
msg[headers.SL_ENVELOPE_TO] = alias.email
|
||||
|
||||
|
@ -9,10 +9,13 @@
|
||||
<h1 class="card-title">Create new account</h1>
|
||||
<div class="form-group">
|
||||
<label class="form-label">Email address</label>
|
||||
{{ form.email(class="form-control", type="email") }}
|
||||
{{ form.email(class="form-control", type="email", placeholder="YourName@protonmail.com") }}
|
||||
<div class="small-text alert alert-info" style="margin-top: 1px">
|
||||
Emails sent to your alias will be forwarded to this email address.
|
||||
<br>
|
||||
It can't be a disposable or forwarding email address.
|
||||
<br>
|
||||
We recommend using a <a href="https://proton.me/mail" target="_blank">Proton Mail</a> address
|
||||
</div>
|
||||
{{ render_field_errors(form.email) }}
|
||||
</div>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<div class="page-single">
|
||||
<div class="container">
|
||||
<div class="row">
|
||||
<div class="col mx-auto" style="max-width: 28rem">
|
||||
<div class="col mx-auto" style="max-width: 32rem">
|
||||
<div class="text-center mb-6">
|
||||
<a href="{{ LANDING_PAGE_URL }}">
|
||||
<img src="/static/logo.svg"
|
||||
|
@ -1,10 +1,13 @@
|
||||
from time import time
|
||||
|
||||
import arrow
|
||||
from flask import url_for
|
||||
|
||||
from app import config
|
||||
from app.dashboard.views.api_key import clean_up_unused_or_old_api_keys
|
||||
from app.db import Session
|
||||
from app.models import User, ApiKey
|
||||
from tests.utils import login
|
||||
from tests.utils import login, create_new_user
|
||||
|
||||
|
||||
def test_api_key_page_requires_password(flask_client):
|
||||
@ -87,3 +90,26 @@ def test_delete_all_api_keys(flask_client):
|
||||
assert (
|
||||
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
|
||||
) # assert that user 2 still has 1 API key
|
||||
|
||||
|
||||
def test_cleanup_api_keys():
|
||||
user = create_new_user()
|
||||
ApiKey.create(
|
||||
user_id=user.id, name="used", last_used=arrow.utcnow().shift(days=-3), times=1
|
||||
)
|
||||
ApiKey.create(
|
||||
user_id=user.id, name="keep 1", last_used=arrow.utcnow().shift(days=-2), times=1
|
||||
)
|
||||
ApiKey.create(
|
||||
user_id=user.id, name="keep 2", last_used=arrow.utcnow().shift(days=-1), times=1
|
||||
)
|
||||
ApiKey.create(user_id=user.id, name="not used", last_used=None, times=1)
|
||||
Session.flush()
|
||||
old_max_api_keys = config.MAX_API_KEYS
|
||||
config.MAX_API_KEYS = 2
|
||||
clean_up_unused_or_old_api_keys(user.id)
|
||||
keys = ApiKey.filter_by(user_id=user.id).all()
|
||||
assert len(keys) == 2
|
||||
assert keys[0].name.find("keep") == 0
|
||||
assert keys[1].name.find("keep") == 0
|
||||
config.MAX_API_KEYS = old_max_api_keys
|
||||
|
21
app/tests/example_emls/add_header_multipart.eml
Normal file
21
app/tests/example_emls/add_header_multipart.eml
Normal file
@ -0,0 +1,21 @@
|
||||
Sender: somebody@somewhere.net
|
||||
Content-Type: multipart/mixed; boundary="----=_Part_3946_1099248058.1688752298149"
|
||||
|
||||
--0c916c9b5fe3c925d7bafeb988bb6794
|
||||
Content-Type: text/plain; charset="UTF-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
notification test
|
||||
|
||||
--0c916c9b5fe3c925d7bafeb988bb6794
|
||||
Content-Type: text/html; charset="UTF-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
<html><head><meta http-equiv=3D"Content-Type" content=3D"text/html; charset=
|
||||
=3DUTF-8"><meta http-equiv=3D"X-UA-Compatible" content=3D"IE=3Dedge"><meta =
|
||||
name=3D"format-detection" content=3D"telephone=3Dno"><meta name=3D"viewport=
|
||||
" content=3D"width=3Ddevice-width, initial-scale=3D1.0">
|
||||
|
||||
--0c916c9b5fe3c925d7bafeb988bb6794--
|
||||
|
||||
|
27
app/tests/example_emls/email_to_pgp_encrypt.eml
Normal file
27
app/tests/example_emls/email_to_pgp_encrypt.eml
Normal file
@ -0,0 +1,27 @@
|
||||
From: {{sender_address}}
|
||||
To: {{recipient_address}}
|
||||
Subject: Test subject
|
||||
Content-Type: multipart/alternative; boundary="MLF8fvg556fdhFDH7=_?:
|
||||
|
||||
--MLF8fvg556fdhFDH7=_?:
|
||||
Content-Type: text/plain;
|
||||
charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
*************************************************************************
|
||||
|
||||
This five-part limited series, based on the brilliant graphic novel by Me
|
||||
|
||||
--MLF8fvg556fdhFDH7=_?:
|
||||
Content-Type: text/html;
|
||||
charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
--MLF8fvg556fdhFDH7=_?:
|
||||
Content-Type: text/plain;
|
||||
charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
*************************************************************************
|
||||
*************************************************************************
|
||||
|
||||
|
33
app/tests/handler/test_encrypt_pgp.py
Normal file
33
app/tests/handler/test_encrypt_pgp.py
Normal file
@ -0,0 +1,33 @@
|
||||
from aiosmtpd.smtp import Envelope
|
||||
|
||||
import email_handler
|
||||
from app.config import get_abs_path
|
||||
from app.db import Session
|
||||
from app.pgp_utils import load_public_key
|
||||
from tests.utils import create_new_user, load_eml_file, random_email
|
||||
|
||||
from app.models import Alias
|
||||
|
||||
|
||||
def test_encrypt_with_pgp():
|
||||
user = create_new_user()
|
||||
pgp_public_key = open(get_abs_path("local_data/public-pgp.asc")).read()
|
||||
mailbox = user.default_mailbox
|
||||
mailbox.pgp_public_key = pgp_public_key
|
||||
mailbox.generic_subject = True
|
||||
mailbox.pgp_finger_print = load_public_key(pgp_public_key)
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.flush()
|
||||
sender_address = random_email()
|
||||
msg = load_eml_file(
|
||||
"email_to_pgp_encrypt.eml",
|
||||
{
|
||||
"sender_address": sender_address,
|
||||
"recipient_address": alias.email,
|
||||
},
|
||||
)
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = sender_address
|
||||
envelope.rcpt_tos = [alias.email]
|
||||
result = email_handler.MailHandler()._handle(envelope, msg)
|
||||
assert result is not None
|
@ -5,6 +5,7 @@ from app.db import Session
|
||||
from app.email import headers, status
|
||||
from app.mail_sender import mail_sender
|
||||
from app.models import Alias
|
||||
from app.utils import random_string
|
||||
from tests.utils import create_new_user, load_eml_file, random_email
|
||||
|
||||
|
||||
@ -37,3 +38,37 @@ def test_original_headers_from_preserved():
|
||||
request.msg[headers.AUTHENTICATION_RESULTS]
|
||||
== msg[headers.AUTHENTICATION_RESULTS]
|
||||
)
|
||||
|
||||
|
||||
@mail_sender.store_emails_test_decorator
|
||||
def test_original_headers_from_with_name_preserved():
|
||||
user = create_new_user()
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.flush()
|
||||
assert user.include_header_email_header
|
||||
original_sender_address = random_email()
|
||||
name = random_string(10)
|
||||
msg = load_eml_file(
|
||||
"replacement_on_forward_phase.eml",
|
||||
{
|
||||
"sender_address": f"{name} <{original_sender_address}>",
|
||||
"recipient_address": alias.email,
|
||||
"cc_address": random_email(),
|
||||
},
|
||||
)
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = f"env.{original_sender_address}"
|
||||
envelope.rcpt_tos = [alias.email]
|
||||
result = email_handler.MailHandler()._handle(envelope, msg)
|
||||
assert result == status.E200
|
||||
send_requests = mail_sender.get_stored_emails()
|
||||
assert len(send_requests) == 1
|
||||
request = send_requests[0]
|
||||
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
|
||||
assert (
|
||||
request.msg[headers.SL_ORIGINAL_FROM] == f"{name} <{original_sender_address}>"
|
||||
)
|
||||
assert (
|
||||
request.msg[headers.AUTHENTICATION_RESULTS]
|
||||
== msg[headers.AUTHENTICATION_RESULTS]
|
||||
)
|
||||
|
@ -810,7 +810,7 @@ def test_add_header_multipart_with_invalid_part():
|
||||
if i < 2:
|
||||
assert part.get_payload().index("INJECT") > -1
|
||||
else:
|
||||
assert part == "invalid"
|
||||
assert part.get_payload() == "invalid"
|
||||
|
||||
|
||||
def test_sl_formataddr():
|
||||
@ -822,3 +822,10 @@ def test_sl_formataddr():
|
||||
# test that the same name-address can't be handled by the built-in formataddr
|
||||
with pytest.raises(UnicodeEncodeError):
|
||||
formataddr(("é", "è@ç.à"))
|
||||
|
||||
|
||||
def test_add_header_to_invalid_multipart():
|
||||
msg = load_eml_file("add_header_multipart.eml")
|
||||
msg = add_header(msg, "test", "test")
|
||||
data = msg.as_string()
|
||||
assert data != ""
|
||||
|
Reference in New Issue
Block a user