4.31.0
This commit is contained in:
parent
cd77e4cc2d
commit
38c134d903
@ -534,3 +534,4 @@ SKIP_MX_LOOKUP_ON_CHECK = False
|
|||||||
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
|
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
|
||||||
|
|
||||||
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
|
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)
|
||||||
|
MAX_API_KEYS = int(os.environ.get("MAX_API_KEYS", 30))
|
||||||
|
@ -3,9 +3,11 @@ from flask_login import login_required, current_user
|
|||||||
from flask_wtf import FlaskForm
|
from flask_wtf import FlaskForm
|
||||||
from wtforms import StringField, validators
|
from wtforms import StringField, validators
|
||||||
|
|
||||||
|
from app import config
|
||||||
from app.dashboard.base import dashboard_bp
|
from app.dashboard.base import dashboard_bp
|
||||||
from app.dashboard.views.enter_sudo import sudo_required
|
from app.dashboard.views.enter_sudo import sudo_required
|
||||||
from app.db import Session
|
from app.db import Session
|
||||||
|
from app.extensions import limiter
|
||||||
from app.models import ApiKey
|
from app.models import ApiKey
|
||||||
from app.utils import CSRFValidationForm
|
from app.utils import CSRFValidationForm
|
||||||
|
|
||||||
@ -14,9 +16,32 @@ class NewApiKeyForm(FlaskForm):
|
|||||||
name = StringField("Name", validators=[validators.DataRequired()])
|
name = StringField("Name", validators=[validators.DataRequired()])
|
||||||
|
|
||||||
|
|
||||||
|
def clean_up_unused_or_old_api_keys(user_id: int):
|
||||||
|
total_keys = ApiKey.filter_by(user_id=user_id).count()
|
||||||
|
# Remove oldest unused
|
||||||
|
for api_key in (
|
||||||
|
ApiKey.filter_by(user_id=user_id, last_used=None)
|
||||||
|
.order_by(ApiKey.created_at.asc())
|
||||||
|
.all()
|
||||||
|
):
|
||||||
|
Session.delete(api_key)
|
||||||
|
total_keys -= 1
|
||||||
|
if total_keys <= config.MAX_API_KEYS:
|
||||||
|
return
|
||||||
|
# Clean up oldest used
|
||||||
|
for api_key in (
|
||||||
|
ApiKey.filter_by(user_id=user_id).order_by(ApiKey.last_used.asc()).all()
|
||||||
|
):
|
||||||
|
Session.delete(api_key)
|
||||||
|
total_keys -= 1
|
||||||
|
if total_keys <= config.MAX_API_KEYS:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
@dashboard_bp.route("/api_key", methods=["GET", "POST"])
|
@dashboard_bp.route("/api_key", methods=["GET", "POST"])
|
||||||
@login_required
|
@login_required
|
||||||
@sudo_required
|
@sudo_required
|
||||||
|
@limiter.limit("10/hour")
|
||||||
def api_key():
|
def api_key():
|
||||||
api_keys = (
|
api_keys = (
|
||||||
ApiKey.filter(ApiKey.user_id == current_user.id)
|
ApiKey.filter(ApiKey.user_id == current_user.id)
|
||||||
@ -50,6 +75,7 @@ def api_key():
|
|||||||
|
|
||||||
elif request.form.get("form-name") == "create":
|
elif request.form.get("form-name") == "create":
|
||||||
if new_api_key_form.validate():
|
if new_api_key_form.validate():
|
||||||
|
clean_up_unused_or_old_api_keys(current_user.id)
|
||||||
new_api_key = ApiKey.create(
|
new_api_key = ApiKey.create(
|
||||||
name=new_api_key_form.name.data, user_id=current_user.id
|
name=new_api_key_form.name.data, user_id=current_user.id
|
||||||
)
|
)
|
||||||
|
@ -899,7 +899,11 @@ def forward_email_to_mailbox(
|
|||||||
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
|
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
|
||||||
if user.include_header_email_header:
|
if user.include_header_email_header:
|
||||||
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
|
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
|
||||||
msg[headers.SL_ORIGINAL_FROM] = contact.website_email
|
if contact.name:
|
||||||
|
original_from = f"{contact.name} <{contact.website_email}>"
|
||||||
|
else:
|
||||||
|
original_from = contact.website_email
|
||||||
|
msg[headers.SL_ORIGINAL_FROM] = original_from
|
||||||
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email
|
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email
|
||||||
msg[headers.SL_ENVELOPE_TO] = alias.email
|
msg[headers.SL_ENVELOPE_TO] = alias.email
|
||||||
|
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
|
import arrow
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
|
||||||
|
from app import config
|
||||||
|
from app.dashboard.views.api_key import clean_up_unused_or_old_api_keys
|
||||||
from app.db import Session
|
from app.db import Session
|
||||||
from app.models import User, ApiKey
|
from app.models import User, ApiKey
|
||||||
from tests.utils import login
|
from tests.utils import login, create_new_user
|
||||||
|
|
||||||
|
|
||||||
def test_api_key_page_requires_password(flask_client):
|
def test_api_key_page_requires_password(flask_client):
|
||||||
@ -87,3 +90,26 @@ def test_delete_all_api_keys(flask_client):
|
|||||||
assert (
|
assert (
|
||||||
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
|
ApiKey.filter(ApiKey.user_id == user_2.id).count() == 1
|
||||||
) # assert that user 2 still has 1 API key
|
) # assert that user 2 still has 1 API key
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_api_keys():
|
||||||
|
user = create_new_user()
|
||||||
|
ApiKey.create(
|
||||||
|
user_id=user.id, name="used", last_used=arrow.utcnow().shift(days=-3), times=1
|
||||||
|
)
|
||||||
|
ApiKey.create(
|
||||||
|
user_id=user.id, name="keep 1", last_used=arrow.utcnow().shift(days=-2), times=1
|
||||||
|
)
|
||||||
|
ApiKey.create(
|
||||||
|
user_id=user.id, name="keep 2", last_used=arrow.utcnow().shift(days=-1), times=1
|
||||||
|
)
|
||||||
|
ApiKey.create(user_id=user.id, name="not used", last_used=None, times=1)
|
||||||
|
Session.flush()
|
||||||
|
old_max_api_keys = config.MAX_API_KEYS
|
||||||
|
config.MAX_API_KEYS = 2
|
||||||
|
clean_up_unused_or_old_api_keys(user.id)
|
||||||
|
keys = ApiKey.filter_by(user_id=user.id).all()
|
||||||
|
assert len(keys) == 2
|
||||||
|
assert keys[0].name.find("keep") == 0
|
||||||
|
assert keys[1].name.find("keep") == 0
|
||||||
|
config.MAX_API_KEYS = old_max_api_keys
|
||||||
|
@ -5,6 +5,7 @@ from app.db import Session
|
|||||||
from app.email import headers, status
|
from app.email import headers, status
|
||||||
from app.mail_sender import mail_sender
|
from app.mail_sender import mail_sender
|
||||||
from app.models import Alias
|
from app.models import Alias
|
||||||
|
from app.utils import random_string
|
||||||
from tests.utils import create_new_user, load_eml_file, random_email
|
from tests.utils import create_new_user, load_eml_file, random_email
|
||||||
|
|
||||||
|
|
||||||
@ -37,3 +38,37 @@ def test_original_headers_from_preserved():
|
|||||||
request.msg[headers.AUTHENTICATION_RESULTS]
|
request.msg[headers.AUTHENTICATION_RESULTS]
|
||||||
== msg[headers.AUTHENTICATION_RESULTS]
|
== msg[headers.AUTHENTICATION_RESULTS]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_original_headers_from_with_name_preserved():
|
||||||
|
user = create_new_user()
|
||||||
|
alias = Alias.create_new_random(user)
|
||||||
|
Session.flush()
|
||||||
|
assert user.include_header_email_header
|
||||||
|
original_sender_address = random_email()
|
||||||
|
name = random_string(10)
|
||||||
|
msg = load_eml_file(
|
||||||
|
"replacement_on_forward_phase.eml",
|
||||||
|
{
|
||||||
|
"sender_address": f"{name} <{original_sender_address}>",
|
||||||
|
"recipient_address": alias.email,
|
||||||
|
"cc_address": random_email(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
envelope = Envelope()
|
||||||
|
envelope.mail_from = f"env.{original_sender_address}"
|
||||||
|
envelope.rcpt_tos = [alias.email]
|
||||||
|
result = email_handler.MailHandler()._handle(envelope, msg)
|
||||||
|
assert result == status.E200
|
||||||
|
send_requests = mail_sender.get_stored_emails()
|
||||||
|
assert len(send_requests) == 1
|
||||||
|
request = send_requests[0]
|
||||||
|
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
|
||||||
|
assert (
|
||||||
|
request.msg[headers.SL_ORIGINAL_FROM] == f"{name} <{original_sender_address}>"
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
request.msg[headers.AUTHENTICATION_RESULTS]
|
||||||
|
== msg[headers.AUTHENTICATION_RESULTS]
|
||||||
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user