Compare commits

...

3 Commits

Author SHA1 Message Date
87aedf3207 4.30.0 2023-06-27 11:00:04 +00:00
3523c9fc15 4.29.4 2023-06-07 11:00:05 +00:00
a6f4995cb5 4.29.3 2023-06-01 11:00:05 +00:00
25 changed files with 669 additions and 81 deletions

View File

@ -23,10 +23,10 @@ COPY poetry.lock pyproject.toml ./
# Install and setup poetry # Install and setup poetry
RUN pip install -U pip \ RUN pip install -U pip \
&& apt-get update \ && apt-get update \
&& apt install -y curl netcat gcc python3-dev gnupg git libre2-dev \ && apt install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev \
&& curl -sSL https://install.python-poetry.org | python3 - \ && curl -sSL https://install.python-poetry.org | python3 - \
# Remove curl and netcat from the image # Remove curl and netcat from the image
&& apt-get purge -y curl netcat \ && apt-get purge -y curl netcat-traditional \
# Run poetry # Run poetry
&& poetry config virtualenvs.create false \ && poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root \ && poetry install --no-interaction --no-ansi --no-root \

View File

@ -6,7 +6,7 @@ from typing import Optional
import itsdangerous import itsdangerous
from app import config from app import config
from app.log import LOG from app.log import LOG
from app.models import User, AliasOptions from app.models import User, AliasOptions, SLDomain
signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET) signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET)
@ -105,10 +105,7 @@ def get_alias_suffixes(
for custom_domain in user_custom_domains: for custom_domain in user_custom_domains:
if custom_domain.random_prefix_generation: if custom_domain.random_prefix_generation:
suffix = ( suffix = (
"." f".{user.get_random_alias_suffix(custom_domain)}@{custom_domain.domain}"
+ user.get_random_alias_suffix(custom_domain)
+ "@"
+ custom_domain.domain
) )
alias_suffix = AliasSuffix( alias_suffix = AliasSuffix(
is_custom=True, is_custom=True,
@ -123,7 +120,7 @@ def get_alias_suffixes(
else: else:
alias_suffixes.append(alias_suffix) alias_suffixes.append(alias_suffix)
suffix = "@" + custom_domain.domain suffix = f"@{custom_domain.domain}"
alias_suffix = AliasSuffix( alias_suffix = AliasSuffix(
is_custom=True, is_custom=True,
suffix=suffix, suffix=suffix,
@ -144,16 +141,13 @@ def get_alias_suffixes(
alias_suffixes.append(alias_suffix) alias_suffixes.append(alias_suffix)
# then SimpleLogin domain # then SimpleLogin domain
for sl_domain in user.get_sl_domains(alias_options=alias_options): sl_domains = user.get_sl_domains(alias_options=alias_options)
suffix = ( default_domain_found = False
( for sl_domain in sl_domains:
"" prefix = (
if config.DISABLE_ALIAS_SUFFIX "" if config.DISABLE_ALIAS_SUFFIX else f".{user.get_random_alias_suffix()}"
else "." + user.get_random_alias_suffix()
)
+ "@"
+ sl_domain.domain
) )
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix( alias_suffix = AliasSuffix(
is_custom=False, is_custom=False,
suffix=suffix, suffix=suffix,
@ -162,11 +156,36 @@ def get_alias_suffixes(
domain=sl_domain.domain, domain=sl_domain.domain,
mx_verified=True, mx_verified=True,
) )
# No default or this is not the default
# put the default domain to top if (
if user.default_alias_public_domain_id == sl_domain.id: user.default_alias_public_domain_id is None
alias_suffixes.insert(0, alias_suffix) or user.default_alias_public_domain_id != sl_domain.id
else: ):
alias_suffixes.append(alias_suffix) alias_suffixes.append(alias_suffix)
else:
default_domain_found = True
alias_suffixes.insert(0, alias_suffix)
if not default_domain_found:
domain_conditions = {"id": user.default_alias_public_domain_id, "hidden": False}
if not user.is_premium():
domain_conditions["premium_only"] = False
sl_domain = SLDomain.get_by(**domain_conditions)
if sl_domain:
prefix = (
""
if config.DISABLE_ALIAS_SUFFIX
else f".{user.get_random_alias_suffix()}"
)
suffix = f"{prefix}@{sl_domain.domain}"
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
signed_suffix=signer.sign(suffix).decode(),
is_premium=sl_domain.premium_only,
domain=sl_domain.domain,
mx_verified=True,
)
alias_suffixes.insert(0, alias_suffix)
return alias_suffixes return alias_suffixes

View File

@ -57,6 +57,8 @@ def get_user_if_alias_would_auto_create(
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain( domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user address, notify_user=notify_user
) )
if DomainDeletedAlias.get_by(email=address):
return None
if domain_and_rule: if domain_and_rule:
return domain_and_rule[0].user return domain_and_rule[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory( directory = check_if_alias_can_be_auto_created_for_a_directory(

View File

@ -9,6 +9,7 @@ from requests import RequestException
from app.api.base import api_bp, require_api_auth from app.api.base import api_bp, require_api_auth
from app.config import APPLE_API_SECRET, MACAPP_APPLE_API_SECRET from app.config import APPLE_API_SECRET, MACAPP_APPLE_API_SECRET
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session from app.db import Session
from app.log import LOG from app.log import LOG
from app.models import PlanEnum, AppleSubscription from app.models import PlanEnum, AppleSubscription
@ -50,6 +51,7 @@ def apple_process_payment():
apple_sub = verify_receipt(receipt_data, user, password) apple_sub = verify_receipt(receipt_data, user, password)
if apple_sub: if apple_sub:
execute_subscription_webhook(user)
return jsonify(ok=True), 200 return jsonify(ok=True), 200
return jsonify(error="Processing failed"), 400 return jsonify(error="Processing failed"), 400
@ -282,6 +284,7 @@ def apple_update_notification():
apple_sub.plan = plan apple_sub.plan = plan
apple_sub.product_id = transaction["product_id"] apple_sub.product_id = transaction["product_id"]
Session.commit() Session.commit()
execute_subscription_webhook(user)
return jsonify(ok=True), 200 return jsonify(ok=True), 200
else: else:
LOG.w( LOG.w(
@ -554,6 +557,7 @@ def verify_receipt(receipt_data, user, password) -> Optional[AppleSubscription]:
product_id=latest_transaction["product_id"], product_id=latest_transaction["product_id"],
) )
execute_subscription_webhook(user)
Session.commit() Session.commit()
return apple_sub return apple_sub

View File

@ -532,3 +532,5 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT:
SKIP_MX_LOOKUP_ON_CHECK = False SKIP_MX_LOOKUP_ON_CHECK = False
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
SUBSCRIPTION_CHANGE_WEBHOOK = os.environ.get("SUBSCRIPTION_CHANGE_WEBHOOK", None)

View File

@ -1,3 +1,7 @@
import base64
import binascii
import json
import arrow import arrow
from flask import render_template, request, redirect, url_for, flash from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
@ -180,7 +184,9 @@ def mailbox_route():
def send_verification_email(user, mailbox): def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET) s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode() encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = ( verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}" URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
) )
@ -205,22 +211,34 @@ def send_verification_email(user, mailbox):
@dashboard_bp.route("/mailbox_verify") @dashboard_bp.route("/mailbox_verify")
def mailbox_verify(): def mailbox_verify():
s = TimestampSigner(MAILBOX_SECRET) s = TimestampSigner(MAILBOX_SECRET)
mailbox_id = request.args.get("mailbox_id") mailbox_verify_request = request.args.get("mailbox_id")
try: try:
r_id = int(s.unsign(mailbox_id, max_age=900)) mailbox_raw_data = s.unsign(mailbox_verify_request, max_age=900)
except Exception: except Exception:
flash("Invalid link. Please delete and re-add your mailbox", "error") flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route")) return redirect(url_for("dashboard.mailbox_route"))
else: try:
mailbox = Mailbox.get(r_id) decoded_data = base64.urlsafe_b64decode(mailbox_raw_data)
if not mailbox: except binascii.Error:
flash("Invalid link", "error") flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route")) return redirect(url_for("dashboard.mailbox_route"))
mailbox_data = json.loads(decoded_data)
if not isinstance(mailbox_data, list) or len(mailbox_data) != 2:
flash("Invalid link. Please delete and re-add your mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_id = mailbox_data[0]
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox_email = mailbox_data[1]
if mailbox_email != mailbox.email:
flash("Invalid link", "error")
return redirect(url_for("dashboard.mailbox_route"))
mailbox.verified = True mailbox.verified = True
Session.commit() Session.commit()
LOG.d("Mailbox %s is verified", mailbox) LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox) return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)

View File

@ -206,7 +206,7 @@ def setting():
current_profile_file is not None current_profile_file is not None
and current_profile_file.user_id == current_user.id and current_profile_file.user_id == current_user.id
): ):
s3.delete(current_user.path) s3.delete(current_profile_file.path)
file_path = random_string(30) file_path = random_string(30)
file = File.create(user_id=current_user.id, path=file_path) file = File.create(user_id=current_user.id, path=file_path)

View File

@ -20,6 +20,7 @@ X_SPAM_STATUS = "X-Spam-Status"
LIST_UNSUBSCRIBE = "List-Unsubscribe" LIST_UNSUBSCRIBE = "List-Unsubscribe"
LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post" LIST_UNSUBSCRIBE_POST = "List-Unsubscribe-Post"
RETURN_PATH = "Return-Path" RETURN_PATH = "Return-Path"
AUTHENTICATION_RESULTS = "Authentication-Results"
# headers used to DKIM sign in order of preference # headers used to DKIM sign in order of preference
DKIM_HEADERS = [ DKIM_HEADERS = [
@ -32,6 +33,7 @@ DKIM_HEADERS = [
SL_DIRECTION = "X-SimpleLogin-Type" SL_DIRECTION = "X-SimpleLogin-Type"
SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID" SL_EMAIL_LOG_ID = "X-SimpleLogin-EmailLog-ID"
SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From" SL_ENVELOPE_FROM = "X-SimpleLogin-Envelope-From"
SL_ORIGINAL_FROM = "X-SimpleLogin-Original-From"
SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To" SL_ENVELOPE_TO = "X-SimpleLogin-Envelope-To"
SL_CLIENT_IP = "X-SimpleLogin-Client-IP" SL_CLIENT_IP = "X-SimpleLogin-Client-IP"

View File

@ -9,6 +9,7 @@ from app.handler.unsubscribe_encoder import (
UnsubscribeData, UnsubscribeData,
UnsubscribeOriginalData, UnsubscribeOriginalData,
) )
from app.log import LOG
from app.models import Alias, Contact, UnsubscribeBehaviourEnum from app.models import Alias, Contact, UnsubscribeBehaviourEnum
@ -30,6 +31,7 @@ class UnsubscribeGenerator:
""" """
unsubscribe_data = message[headers.LIST_UNSUBSCRIBE] unsubscribe_data = message[headers.LIST_UNSUBSCRIBE]
if not unsubscribe_data: if not unsubscribe_data:
LOG.info("Email has no unsubscribe header")
return message return message
raw_methods = [method.strip() for method in unsubscribe_data.split(",")] raw_methods = [method.strip() for method in unsubscribe_data.split(",")]
mailto_unsubs = None mailto_unsubs = None
@ -44,7 +46,9 @@ class UnsubscribeGenerator:
if url_data.scheme == "mailto": if url_data.scheme == "mailto":
query_data = urllib.parse.parse_qs(url_data.query) query_data = urllib.parse.parse_qs(url_data.query)
mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0]) mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0])
LOG.debug(f"Unsub is mailto to {mailto_unsubs}")
else: else:
LOG.debug(f"Unsub has {url_data.scheme} scheme")
other_unsubs.append(method) other_unsubs.append(method)
# If there are non mailto unsubscribe methods, use those in the header # If there are non mailto unsubscribe methods, use those in the header
if other_unsubs: if other_unsubs:
@ -56,18 +60,19 @@ class UnsubscribeGenerator:
add_or_replace_header( add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click" message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
) )
LOG.debug(f"Adding click unsub methods to header {other_unsubs}")
return message return message
if not mailto_unsubs: elif not mailto_unsubs:
LOG.debug("No unsubs. Deleting all unsub headers")
message = delete_header(message, headers.LIST_UNSUBSCRIBE) message = delete_header(message, headers.LIST_UNSUBSCRIBE)
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST) message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
return message return message
return self._add_unsubscribe_header( unsub_data = UnsubscribeData(
message, UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeData( UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
),
) )
LOG.debug(f"Adding unsub data {unsub_data}")
return self._add_unsubscribe_header(message, unsub_data)
def _add_unsubscribe_header( def _add_unsubscribe_header(
self, message: Message, unsub: UnsubscribeData self, message: Message, unsub: UnsubscribeData

View File

@ -41,7 +41,7 @@ from app.models import (
class ExportUserDataJob: class ExportUserDataJob:
REMOVE_FIELDS = { REMOVE_FIELDS = {
"User": ("otp_secret",), "User": ("otp_secret", "password"),
"Alias": ("ts_vector", "transfer_token", "hibp_last_check"), "Alias": ("ts_vector", "transfer_token", "hibp_last_check"),
"CustomDomain": ("ownership_txt_token",), "CustomDomain": ("ownership_txt_token",),
} }

View File

@ -32,6 +32,7 @@ class SendRequest:
rcpt_options: Dict = {} rcpt_options: Dict = {}
is_forward: bool = False is_forward: bool = False
ignore_smtp_errors: bool = False ignore_smtp_errors: bool = False
retries: int = 0
def to_bytes(self) -> bytes: def to_bytes(self) -> bytes:
if not config.SAVE_UNSENT_DIR: if not config.SAVE_UNSENT_DIR:
@ -67,6 +68,30 @@ class SendRequest:
is_forward=decoded_data["is_forward"], is_forward=decoded_data["is_forward"],
) )
def save_request_to_unsent_dir(self, prefix: str = "DeliveryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
self.save_request_to_file(file_path)
@staticmethod
def save_request_to_failed_dir(self, prefix: str = "DeliveryRetryFail"):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
dir_name = os.path.join(config.SAVE_UNSENT_DIR, "failed")
if not os.path.isdir(dir_name):
os.makedirs(dir_name)
file_path = os.path.join(dir_name, file_name)
self.save_request_to_file(file_path)
def save_request_to_file(self, file_path: str):
file_contents = self.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
class MailSender: class MailSender:
def __init__(self): def __init__(self):
@ -171,21 +196,9 @@ class MailSender:
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{config.POSTFIX_PORT}" f"Could not send message to smtp server {config.POSTFIX_SERVER}:{config.POSTFIX_PORT}"
) )
if config.SAVE_UNSENT_DIR: if config.SAVE_UNSENT_DIR:
self._save_request_to_unsent_dir(send_request) send_request.save_request_to_unsent_dir()
return False return False
def _save_request_to_unsent_dir(
self, send_request: SendRequest, prefix: str = "DeliveryFail"
):
file_name = (
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
)
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
file_contents = send_request.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
mail_sender = MailSender() mail_sender = MailSender()
@ -219,6 +232,7 @@ def load_unsent_mails_from_fs_and_resend():
LOG.i(f"Trying to re-deliver email {filename}") LOG.i(f"Trying to re-deliver email {filename}")
try: try:
send_request = SendRequest.load_from_file(full_file_path) send_request = SendRequest.load_from_file(full_file_path)
send_request.retries += 1
except Exception as e: except Exception as e:
LOG.e(f"Cannot load {filename}. Error {e}") LOG.e(f"Cannot load {filename}. Error {e}")
continue continue
@ -230,6 +244,11 @@ def load_unsent_mails_from_fs_and_resend():
"DeliverUnsentEmail", {"delivered": "true"} "DeliverUnsentEmail", {"delivered": "true"}
) )
else: else:
if send_request.retries > 2:
os.unlink(full_file_path)
send_request.save_request_to_failed_dir()
else:
send_request.save_request_to_file(full_file_path)
newrelic.agent.record_custom_event( newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": "false"} "DeliverUnsentEmail", {"delivered": "false"}
) )

View File

@ -445,7 +445,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
random_alias_suffix = sa.Column( random_alias_suffix = sa.Column(
sa.Integer, sa.Integer,
nullable=False, nullable=False,
default=AliasSuffixEnum.random_string.value, default=AliasSuffixEnum.word.value,
server_default=str(AliasSuffixEnum.random_string.value), server_default=str(AliasSuffixEnum.random_string.value),
) )
@ -514,9 +514,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
server_default=BlockBehaviourEnum.return_2xx.name, server_default=BlockBehaviourEnum.return_2xx.name,
) )
# to keep existing behavior, the server default is TRUE whereas for new user, the default value is FALSE
include_header_email_header = sa.Column( include_header_email_header = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="1" sa.Boolean, default=True, nullable=False, server_default="1"
) )
# bitwise flags. Allow for future expansion # bitwise flags. Allow for future expansion
@ -673,6 +672,22 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return None return None
def get_active_subscription_end(
self, include_partner_subscription: bool = True
) -> Optional[arrow.Arrow]:
sub = self.get_active_subscription(
include_partner_subscription=include_partner_subscription
)
if isinstance(sub, Subscription):
return arrow.get(sub.next_bill_date)
if isinstance(sub, AppleSubscription):
return sub.expires_date
if isinstance(sub, ManualSubscription):
return sub.end_at
if isinstance(sub, CoinbaseSubscription):
return sub.end_at
return None
# region Billing # region Billing
def lifetime_or_active_subscription( def lifetime_or_active_subscription(
self, include_partner_subscription: bool = True self, include_partner_subscription: bool = True
@ -1000,6 +1015,10 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if not self.is_premium(): if not self.is_premium():
conditions.append(SLDomain.premium_only == False) # noqa: E712 conditions.append(SLDomain.premium_only == False) # noqa: E712
partner_domain_cond = [] # noqa:E711 partner_domain_cond = [] # noqa:E711
if self.default_alias_public_domain_id is not None:
partner_domain_cond.append(
SLDomain.id == self.default_alias_public_domain_id
)
if alias_options.show_partner_domains is not None: if alias_options.show_partner_domains is not None:
partner_user = PartnerUser.filter_by( partner_user = PartnerUser.filter_by(
user_id=self.id, partner_id=alias_options.show_partner_domains.id user_id=self.id, partner_id=alias_options.show_partner_domains.id

View File

@ -0,0 +1,33 @@
import requests
from requests import RequestException
from app import config
from app.log import LOG
from app.models import User
def execute_subscription_webhook(user: User):
webhook_url = config.SUBSCRIPTION_CHANGE_WEBHOOK
if webhook_url is None:
return
subscription_end = user.get_active_subscription_end(
include_partner_subscription=False
)
sl_subscription_end = None
if subscription_end:
sl_subscription_end = subscription_end.timestamp
payload = {
"user_id": user.id,
"is_premium": user.is_premium(),
"active_subscription_end": sl_subscription_end,
}
try:
response = requests.post(webhook_url, json=payload, timeout=2)
if response.status_code == 200:
LOG.i("Sent request to subscription update webhook successfully")
else:
LOG.i(
f"Request to webhook failed with statue {response.status_code}: {response.text}"
)
except RequestException as e:
LOG.error(f"Subscription request exception: {e}")

View File

@ -846,22 +846,23 @@ def forward_email_to_mailbox(
f"""Email sent to {alias.email} from an invalid address and cannot be replied""", f"""Email sent to {alias.email} from an invalid address and cannot be replied""",
) )
delete_all_headers_except( headers_to_keep = [
msg, headers.FROM,
[ headers.TO,
headers.FROM, headers.CC,
headers.TO, headers.SUBJECT,
headers.CC, headers.DATE,
headers.SUBJECT, # do not delete original message id
headers.DATE, headers.MESSAGE_ID,
# do not delete original message id # References and In-Reply-To are used for keeping the email thread
headers.MESSAGE_ID, headers.REFERENCES,
# References and In-Reply-To are used for keeping the email thread headers.IN_REPLY_TO,
headers.REFERENCES, headers.LIST_UNSUBSCRIBE,
headers.IN_REPLY_TO, headers.LIST_UNSUBSCRIBE_POST,
] ] + headers.MIME_HEADERS
+ headers.MIME_HEADERS, if user.include_header_email_header:
) headers_to_keep.append(headers.AUTHENTICATION_RESULTS)
delete_all_headers_except(msg, headers_to_keep)
# create PGP email if needed # create PGP email if needed
if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp: if mailbox.pgp_enabled() and user.is_premium() and not alias.disable_pgp:
@ -898,6 +899,7 @@ def forward_email_to_mailbox(
msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id) msg[headers.SL_EMAIL_LOG_ID] = str(email_log.id)
if user.include_header_email_header: if user.include_header_email_header:
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
msg[headers.SL_ORIGINAL_FROM] = contact.website_email
# when an alias isn't in the To: header, there's no way for users to know what alias has received the email # when an alias isn't in the To: header, there's no way for users to know what alias has received the email
msg[headers.SL_ENVELOPE_TO] = alias.email msg[headers.SL_ENVELOPE_TO] = alias.email

View File

@ -79,6 +79,7 @@ from app.config import (
MEM_STORE_URI, MEM_STORE_URI,
) )
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.subscription_webhook import execute_subscription_webhook
from app.db import Session from app.db import Session
from app.developer.base import developer_bp from app.developer.base import developer_bp
from app.discover.base import discover_bp from app.discover.base import discover_bp
@ -491,6 +492,7 @@ def setup_paddle_callback(app: Flask):
# in case user cancels a plan and subscribes a new plan # in case user cancels a plan and subscribes a new plan
sub.cancelled = False sub.cancelled = False
execute_subscription_webhook(user)
LOG.d("User %s upgrades!", user) LOG.d("User %s upgrades!", user)
Session.commit() Session.commit()
@ -509,6 +511,7 @@ def setup_paddle_callback(app: Flask):
).date() ).date()
Session.commit() Session.commit()
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_cancelled": elif request.form.get("alert_name") == "subscription_cancelled":
subscription_id = request.form.get("subscription_id") subscription_id = request.form.get("subscription_id")
@ -538,6 +541,7 @@ def setup_paddle_callback(app: Flask):
end_date=request.form.get("cancellation_effective_date"), end_date=request.form.get("cancellation_effective_date"),
), ),
) )
execute_subscription_webhook(sub.user)
else: else:
# user might have deleted their account # user might have deleted their account
@ -580,6 +584,7 @@ def setup_paddle_callback(app: Flask):
sub.cancelled = False sub.cancelled = False
Session.commit() Session.commit()
execute_subscription_webhook(sub.user)
else: else:
LOG.w( LOG.w(
f"update non-exist subscription {subscription_id}. {request.form}" f"update non-exist subscription {subscription_id}. {request.form}"
@ -596,6 +601,7 @@ def setup_paddle_callback(app: Flask):
Subscription.delete(sub.id) Subscription.delete(sub.id)
Session.commit() Session.commit()
LOG.e("%s requests a refund", user) LOG.e("%s requests a refund", user)
execute_subscription_webhook(sub.user)
elif request.form.get("alert_name") == "subscription_payment_refunded": elif request.form.get("alert_name") == "subscription_payment_refunded":
subscription_id = request.form.get("subscription_id") subscription_id = request.form.get("subscription_id")
@ -629,6 +635,7 @@ def setup_paddle_callback(app: Flask):
LOG.e("Unknown plan_id %s", plan_id) LOG.e("Unknown plan_id %s", plan_id)
else: else:
LOG.w("partial subscription_payment_refunded, not handled") LOG.w("partial subscription_payment_refunded, not handled")
execute_subscription_webhook(sub.user)
return "OK" return "OK"
@ -742,6 +749,7 @@ def handle_coinbase_event(event) -> bool:
coinbase_subscription=coinbase_subscription, coinbase_subscription=coinbase_subscription,
), ),
) )
execute_subscription_webhook(user)
return True return True

View File

@ -684,7 +684,8 @@
SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b> SimpleLogin forwards emails to your mailbox from the <b>reverse-alias</b> and not from the <b>original</b>
sender address. sender address.
<br /> <br />
If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>. If this option is enabled, the original sender addresses is stored in the email header <b>X-SimpleLogin-Envelope-From</b>
and the original From header is stored in <b>X-SimpleLogin-Original-From<b>.
You can choose to display this header in your email client. You can choose to display this header in your email client.
<br /> <br />
As email headers aren't encrypted, your mailbox service can know the sender address via this header. As email headers aren't encrypted, your mailbox service can know the sender address via this header.

View File

@ -286,6 +286,7 @@
}, },
async mounted() { async mounted() {
Object.freeze(Object.prototype);
let that = this; let that = this;
let res = await fetch(`/api/notifications?page=${that.page}`, { let res = await fetch(`/api/notifications?page=${that.page}`, {
method: "GET", method: "GET",

View File

@ -17,7 +17,7 @@ def test_get_setting(flask_client):
"notification": True, "notification": True,
"random_alias_default_domain": "sl.local", "random_alias_default_domain": "sl.local",
"sender_format": "AT", "sender_format": "AT",
"random_alias_suffix": "random_string", "random_alias_suffix": "word",
} }
@ -95,11 +95,13 @@ def test_get_setting_domains_v2(flask_client):
def test_update_settings_random_alias_suffix(flask_client): def test_update_settings_random_alias_suffix(flask_client):
user = login(flask_client) user = login(flask_client)
# default random_alias_suffix is random_string # default random_alias_suffix is random_string
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value assert user.random_alias_suffix == AliasSuffixEnum.word.value
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"}) r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"})
assert r.status_code == 400 assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "word"}) r = flask_client.patch(
"/api/setting", json={"random_alias_suffix": "random_string"}
)
assert r.status_code == 200 assert r.status_code == 200
assert user.random_alias_suffix == AliasSuffixEnum.word.value assert user.random_alias_suffix == AliasSuffixEnum.random_string.value

View File

@ -316,6 +316,10 @@ def test_add_alias_in_global_trash(flask_client):
def test_add_alias_in_custom_domain_trash(flask_client): def test_add_alias_in_custom_domain_trash(flask_client):
user = login(flask_client) user = login(flask_client)
for deleted_domain in DomainDeletedAlias.all():
Session.delete(deleted_domain)
Session.flush()
domain = random_domain() domain = random_domain()
custom_domain = CustomDomain.create( custom_domain = CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True user_id=user.id, domain=domain, ownership_verified=True, commit=True

View File

@ -0,0 +1,65 @@
Received: by mail-ed1-f49.google.com with SMTP id ej4so13657316edb.7
for <gmail@simplemail.fplante.fr>; Mon, 27 Jun 2022 08:48:15 -0700 (PDT)
X-Gm-Message-State: AJIora8exR9DGeRFoKAtjzwLtUpH5hqx6Zt3tm8n4gUQQivGQ3fELjUV
yT7RQIfeW9Kv2atuOcgtmGYVU4iQ8VBeLmK1xvOYL4XpXfrT7ZrJNQ==
Authentication-Results: mx.google.com;
dkim=pass header.i=@matera.eu header.s=fnt header.b=XahYMey7;
dkim=pass header.i=@sendgrid.info header.s=smtpapi header.b="QOCS/yjt";
spf=pass (google.com: domain of bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu designates 168.245.4.42 as permitted sender) smtp.mailfrom="bounces+14445963-ab4e-csyndic.quartz=gmail.com@front-mail.matera.eu";
dmarc=pass (p=NONE sp=NONE dis=NONE) header.from=matera.eu
Received: from out.frontapp.com (unknown)
by geopod-ismtpd-3-0 (SG)
with ESMTP id d2gM2N7PT7W8d2-UEC4ESA
for <csyndic.quartz@gmail.com>;
Mon, 27 Jun 2022 15:48:11.014 +0000 (UTC)
Content-Type: multipart/alternative;
boundary="----sinikael-?=_1-16563448907660.10629093370416887"
In-Reply-To:
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
References:
<imported@frontapp.com_t:AssembléeGénérale2022-06-25T16:32:03+02:006b3cdade-982b-47cd-8114-6a037dfb7d60>
<imported@frontapp.com_f924cce139940c9935621f067d46443597394f34>
<imported@frontapp.com_t:Appeldefonds2022-06-26T10:04:55+02:00d89f5e23-6d98-4f01-95fa-b7c7544b7aa9>
<imported@frontapp.com_81c5208b4cff8b0633f167fda4e6e8e8f63b7a9b>
<af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
From: {{ sender_address }}
To: {{ recipient_address }}
CC: {{ cc_address }}
Subject: Something
Message-ID: <af07e94a66ece6564ae30a2aaac7a34c@frontapp.com>
X-Mailer: Front (1.0; +https://frontapp.com;
+msgid=af07e94a66ece6564ae30a2aaac7a34c@frontapp.com)
X-Feedback-ID: 14445963:SG
X-SG-EID:
=?us-ascii?Q?XtlxQDg5i3HqMzQY2Upg19JPZBVl1RybInUUL2yta9uBoIU4KU1FMJ5DjWrz6g?=
=?us-ascii?Q?fJUK5Qmneg2uc46gwp5BdHdp6Foaq5gg3xJriv3?=
=?us-ascii?Q?9OA=2FWRifeylU9O+ngdNbOKXoeJAkROmp2mCgw9x?=
=?us-ascii?Q?uud+EclOT9mYVtbZsydOLLm6Y2PPswQl8lnmiku?=
=?us-ascii?Q?DAhkG15HTz2FbWGWNDFb7VrSsN5ddjAscr6sIHw?=
=?us-ascii?Q?S48R5fnXmfhPbmlCgqFjr0FGphfuBdNAt6z6w8a?=
=?us-ascii?Q?o9u1EYDIX7zWHZ+Tr3eyw=3D=3D?=
X-SG-ID:
=?us-ascii?Q?N2C25iY2uzGMFz6rgvQsb8raWjw0ZPf1VmjsCkspi=2FI9PhcvqXQTpKqqyZkvBe?=
=?us-ascii?Q?+2RscnQ4WPkA+BN1vYgz1rezTVIqgp+rlWrKk8o?=
=?us-ascii?Q?HoB5dzpX6HKWtWCVRi10zwlDN1+pJnySoIUrlaT?=
=?us-ascii?Q?PA2aqQKmMQbjTl0CUAFryR8hhHcxdS0cQowZSd7?=
=?us-ascii?Q?XNjJWLvCGF7ODwg=2FKr+4yRE8UvULS2nrdO2wWyQ?=
=?us-ascii?Q?AiFHdPdZsRlgNomEo=3D?=
X-Spamd-Result: default: False [-2.00 / 13.00];
ARC_ALLOW(-1.00)[google.com:s=arc-20160816:i=1];
MIME_GOOD(-0.10)[multipart/alternative,text/plain];
REPLYTO_ADDR_EQ_FROM(0.00)[];
FORGED_RECIPIENTS_FORWARDING(0.00)[];
NEURAL_HAM(-0.00)[-0.981];
FREEMAIL_TO(0.00)[gmail.com];
RCVD_TLS_LAST(0.00)[];
FREEMAIL_ENVFROM(0.00)[gmail.com];
MIME_TRACE(0.00)[0:+,1:+,2:~];
RWL_MAILSPIKE_POSSIBLE(0.00)[209.85.208.49:from]
------sinikael-?=_1-16563448907660.10629093370416887
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: quoted-printable
From {{ sender_address }} To {{ recipient_address }}
------sinikael-?=_1-16563448907660.10629093370416887--

View File

@ -0,0 +1,39 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.db import Session
from app.email import headers, status
from app.mail_sender import mail_sender
from app.models import Alias
from tests.utils import create_new_user, load_eml_file, random_email
@mail_sender.store_emails_test_decorator
def test_original_headers_from_preserved():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.flush()
assert user.include_header_email_header
original_sender_address = random_email()
msg = load_eml_file(
"replacement_on_forward_phase.eml",
{
"sender_address": original_sender_address,
"recipient_address": alias.email,
"cc_address": random_email(),
},
)
envelope = Envelope()
envelope.mail_from = f"env.{original_sender_address}"
envelope.rcpt_tos = [alias.email]
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
send_requests = mail_sender.get_stored_emails()
assert len(send_requests) == 1
request = send_requests[0]
assert request.msg[headers.SL_ENVELOPE_FROM] == envelope.mail_from
assert request.msg[headers.SL_ORIGINAL_FROM] == original_sender_address
assert (
request.msg[headers.AUTHENTICATION_RESULTS]
== msg[headers.AUTHENTICATION_RESULTS]
)

View File

@ -0,0 +1,152 @@
import re
from app.alias_suffix import get_alias_suffixes
from app.db import Session
from app.models import SLDomain, PartnerUser, AliasOptions, CustomDomain
from app.proton.utils import get_proton_partner
from init_app import add_sl_domains
from tests.utils import create_new_user, random_token
def setup_module():
Session.query(SLDomain).delete()
SLDomain.create(
domain="hidden", premium_only=False, flush=True, order=5, hidden=True
)
SLDomain.create(domain="free_non_partner", premium_only=False, flush=True, order=4)
SLDomain.create(
domain="premium_non_partner", premium_only=True, flush=True, order=3
)
SLDomain.create(
domain="free_partner",
premium_only=False,
flush=True,
partner_id=get_proton_partner().id,
order=2,
)
SLDomain.create(
domain="premium_partner",
premium_only=True,
flush=True,
partner_id=get_proton_partner().id,
order=1,
)
Session.commit()
def teardown_module():
Session.query(SLDomain).delete()
add_sl_domains()
def test_get_default_domain_even_if_is_not_allowed():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=False, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
assert suffixes[0].domain == default_domain.domain
def test_get_default_domain_hidden():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(
hidden=True, partner_id=None, premium_only=False
).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.hidden
assert suffixes[0].domain != default_domain.domain
def test_get_default_domain_is_premium_for_free_user():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.trial_end = None
default_domain = SLDomain.filter_by(partner_id=None, premium_only=True).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
for suffix in suffixes:
domain = SLDomain.get_by(domain=suffix.domain)
assert not domain.premium_only
assert suffixes[0].domain != default_domain.domain
def test_suffixes_are_valid():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
CustomDomain.create(
user_id=user.id, domain=f"{random_token(10)}.com", verified=True
)
user.trial_end = None
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
alias_suffixes = get_alias_suffixes(user, alias_options=options)
valid_re = re.compile(r"^(\.[\w_]+)?@[\.\w]+$")
has_prefix = 0
for suffix in alias_suffixes:
match = valid_re.match(suffix.suffix)
assert match is not None
if len(match.groups()) >= 1:
has_prefix += 1
assert has_prefix > 0
def test_get_default_domain_is_only_shown_once():
user = create_new_user()
default_domain = SLDomain.filter_by(hidden=False).order_by(SLDomain.order).first()
user.default_alias_public_domain_id = default_domain.id
Session.flush()
options = AliasOptions(
show_sl_domains=True, show_partner_domains=get_proton_partner()
)
suffixes = get_alias_suffixes(user, alias_options=options)
found_default = False
found_domains = set()
for suffix in suffixes:
assert suffix.domain not in found_domains
found_domains.add(suffix.domain)
if default_domain.domain == suffix.domain:
found_default = True
assert found_default

View File

@ -16,6 +16,7 @@ from app.models import (
Directory, Directory,
DirectoryMailbox, DirectoryMailbox,
User, User,
DomainDeletedAlias,
) )
from tests.utils import create_new_user, random_domain, random_token from tests.utils import create_new_user, random_domain, random_token
@ -83,6 +84,11 @@ def get_auto_create_alias_tests(user: User) -> List:
regex="ok-.*", regex="ok-.*",
flush=True, flush=True,
) )
deleted_alias = f"deletedalias@{catchall.domain}"
Session.add(
DomainDeletedAlias(email=deleted_alias, domain_id=catchall.id, user_id=user.id)
)
Session.flush()
dir_name = random_token() dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True) directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create( DirectoryMailbox.create(
@ -101,6 +107,7 @@ def get_auto_create_alias_tests(user: User) -> List:
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True), (f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
(deleted_alias, False),
] ]

View File

@ -128,3 +128,74 @@ def test_get_premium_with_partner_domains():
assert [d.domain for d in domains] == user.available_sl_domains( assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options alias_options=options
) )
def test_get_partner_and_free_default_domain():
user = create_new_user()
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 3
assert domains[0].domain == "premium_partner"
assert domains[1].domain == "free_partner"
assert domains[2].domain == "free_non_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_premium_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = (
SLDomain.filter_by(partner_id=None, hidden=False, premium_only=True).first().id
)
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)
def test_get_free_partner_and_hidden_default_domain():
user = create_new_user()
user.trial_end = None
PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
user.default_alias_public_domain_id = SLDomain.filter_by(hidden=True).first().id
Session.flush()
options = AliasOptions(
show_sl_domains=False, show_partner_domains=get_proton_partner()
)
domains = user.get_sl_domains(alias_options=options)
assert len(domains) == 1
assert domains[0].domain == "free_partner"
assert [d.domain for d in domains] == user.available_sl_domains(
alias_options=options
)

View File

@ -0,0 +1,113 @@
import http.server
import json
import threading
import arrow
from app import config
from app.models import (
Subscription,
AppleSubscription,
CoinbaseSubscription,
ManualSubscription,
)
from tests.utils import create_new_user, random_token
from app.subscription_webhook import execute_subscription_webhook
http_server = None
last_http_request = None
def setup_module():
global http_server
http_server = http.server.ThreadingHTTPServer(("", 0), HTTPTestServer)
print(http_server.server_port)
threading.Thread(target=http_server.serve_forever, daemon=True).start()
config.SUBSCRIPTION_CHANGE_WEBHOOK = f"http://localhost:{http_server.server_port}"
def teardown_module():
global http_server
config.SUBSCRIPTION_CHANGE_WEBHOOK = None
http_server.shutdown()
class HTTPTestServer(http.server.BaseHTTPRequestHandler):
def do_POST(self):
global last_http_request
content_len = int(self.headers.get("Content-Length"))
body_data = self.rfile.read(content_len)
last_http_request = json.loads(body_data)
self.send_response(200)
def test_webhook_with_trial():
user = create_new_user()
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] is None
def test_webhook_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_token(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_token(10),
plan="yearly",
product_id="",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_manual_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
ManualSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp