4.64.1
Some checks failed
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m58s
Build-Release-Image / Build-Image (linux/arm64) (push) Failing after 17m22s
Build-Release-Image / Merge-Images (push) Has been skipped
Build-Release-Image / Create-Release (push) Has been skipped
Build-Release-Image / Notify (push) Has been skipped

This commit is contained in:
MrMeeb 2025-01-24 12:00:07 +00:00
parent dd6005ffdf
commit ed37325b32
17 changed files with 728 additions and 271 deletions

View File

@ -3,7 +3,6 @@ from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
import arrow
import sqlalchemy.exc import sqlalchemy.exc
from arrow import Arrow from arrow import Arrow
from newrelic import agent from newrelic import agent
@ -60,19 +59,21 @@ class LinkResult:
strategy: str strategy: str
def send_user_plan_changed_event(partner_user: PartnerUser) -> Optional[int]: def send_user_plan_changed_event(
partner_user: PartnerUser,
) -> UserPlanChanged:
subscription_end = partner_user.user.get_active_subscription_end( subscription_end = partner_user.user.get_active_subscription_end(
include_partner_subscription=False include_partner_subscription=False
) )
end_timestamp = None
if partner_user.user.lifetime: if partner_user.user.lifetime:
end_timestamp = arrow.get("2038-01-01").timestamp event = UserPlanChanged(lifetime=True)
elif subscription_end: elif subscription_end:
end_timestamp = subscription_end.timestamp event = UserPlanChanged(plan_end_time=subscription_end.timestamp)
event = UserPlanChanged(plan_end_time=end_timestamp) else:
event = UserPlanChanged(plan_end_time=None)
EventDispatcher.send_event(partner_user.user, EventContent(user_plan_change=event)) EventDispatcher.send_event(partner_user.user, EventContent(user_plan_change=event))
Session.flush() Session.flush()
return end_timestamp return event
def set_plan_for_partner_user(partner_user: PartnerUser, plan: SLPlan): def set_plan_for_partner_user(partner_user: PartnerUser, plan: SLPlan):
@ -194,6 +195,7 @@ class NewUserStrategy(ClientMergeStrategy):
strategy=self.__class__.__name__, strategy=self.__class__.__name__,
) )
except (UniqueViolation, sqlalchemy.exc.IntegrityError) as e: except (UniqueViolation, sqlalchemy.exc.IntegrityError) as e:
Session.rollback()
LOG.debug(f"Got the duplicate user error: {e}") LOG.debug(f"Got the duplicate user error: {e}")
return self.create_missing_link(canonical_email) return self.create_missing_link(canonical_email)

View File

@ -6,12 +6,7 @@ from flask import request
from app import mailbox_utils from app import mailbox_utils
from app.api.base import api_bp, require_api_auth from app.api.base import api_bp, require_api_auth
from app.dashboard.views.mailbox_detail import verify_mailbox_change
from app.db import Session from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
)
from app.models import Mailbox from app.models import Mailbox
from app.utils import sanitize_email from app.utils import sanitize_email
@ -122,20 +117,10 @@ def update_mailbox(mailbox_id):
if "email" in data: if "email" in data:
new_email = sanitize_email(data.get("email")) new_email = sanitize_email(data.get("email"))
if mailbox_already_used(new_email, user):
return jsonify(error=f"{new_email} already used"), 400
elif not email_can_be_used_as_mailbox(new_email):
return (
jsonify(
error=f"{new_email} cannot be used. Please note a mailbox cannot "
f"be a disposable email address"
),
400,
)
try: try:
verify_mailbox_change(user, mailbox, new_email) mailbox_utils.request_mailbox_email_change(user, mailbox, new_email)
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
except SMTPRecipientsRefused: except SMTPRecipientsRefused:
return jsonify(error=f"Incorrect mailbox, please recheck {new_email}"), 400 return jsonify(error=f"Incorrect mailbox, please recheck {new_email}"), 400
else: else:
@ -145,7 +130,7 @@ def update_mailbox(mailbox_id):
if "cancel_email_change" in data: if "cancel_email_change" in data:
cancel_email_change = data.get("cancel_email_change") cancel_email_change = data.get("cancel_email_change")
if cancel_email_change: if cancel_email_change:
mailbox.new_email = None mailbox_utils.cancel_email_change(mailbox.id, user)
changed = True changed = True
if changed: if changed:

127
app/app/coupon_utils.py Normal file
View File

@ -0,0 +1,127 @@
from typing import Optional
import arrow
from sqlalchemy import or_, update, and_
from app.config import ADMIN_EMAIL
from app.db import Session
from app.email_utils import send_email
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
from app.log import LOG
from app.models import User, ManualSubscription, Coupon, LifetimeCoupon
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
class CouponUserCannotRedeemError(Exception):
pass
def redeem_coupon(coupon_code: str, user: User) -> Optional[Coupon]:
if user.lifetime:
LOG.i(f"User {user} is a lifetime SL user. Cannot redeem coupons")
raise CouponUserCannotRedeemError()
sub = user.get_active_subscription()
if sub and not isinstance(sub, ManualSubscription):
LOG.i(
f"User {user} has an active subscription that is not manual. Cannot redeem coupon {coupon_code}"
)
raise CouponUserCannotRedeemError()
coupon = Coupon.get_by(code=coupon_code)
if not coupon:
LOG.i(f"User is trying to redeem coupon {coupon_code} that does not exist")
return None
now = arrow.utcnow()
stmt = (
update(Coupon)
.where(
and_(
Coupon.code == coupon_code,
Coupon.used == False, # noqa: E712
or_(
Coupon.expires_date == None, # noqa: E711
Coupon.expires_date > now,
),
)
)
.values(used=True, used_by_user_id=user.id, updated_at=now)
)
res = Session.execute(stmt)
if res.rowcount == 0:
LOG.i(f"Coupon {coupon.id} could not be redeemed. It's expired or invalid.")
return None
LOG.i(
f"Redeemed normal coupon {coupon.id} for {coupon.nb_year} years by user {user}"
)
if sub:
# renew existing subscription
if sub.end_at > arrow.now():
sub.end_at = sub.end_at.shift(years=coupon.nb_year)
else:
sub.end_at = arrow.now().shift(years=coupon.nb_year, days=1)
else:
sub = ManualSubscription.create(
user_id=user.id,
end_at=arrow.now().shift(years=coupon.nb_year, days=1),
comment="using coupon code",
is_giveaway=coupon.is_giveaway,
commit=True,
)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.Upgrade,
message=f"User {user} redeemed coupon {coupon.id} for {coupon.nb_year} years",
)
EventDispatcher.send_event(
user=user,
content=EventContent(
user_plan_change=UserPlanChanged(plan_end_time=sub.end_at.timestamp)
),
)
Session.commit()
return coupon
def redeem_lifetime_coupon(coupon_code: str, user: User) -> Optional[Coupon]:
coupon: LifetimeCoupon = LifetimeCoupon.get_by(code=coupon_code)
if not coupon:
return None
stmt = (
update(LifetimeCoupon)
.where(
and_(
LifetimeCoupon.code == coupon_code,
LifetimeCoupon.nb_used > 0,
)
)
.values(nb_used=LifetimeCoupon.nb_used - 1)
)
res = Session.execute(stmt)
if res.rowcount == 0:
LOG.i("Coupon could not be redeemed")
return None
user.lifetime = True
user.lifetime_coupon_id = coupon.id
if coupon.paid:
user.paid_lifetime = True
EventDispatcher.send_event(
user=user,
content=EventContent(user_plan_change=UserPlanChanged(lifetime=True)),
)
Session.commit()
# notify admin
send_email(
ADMIN_EMAIL,
subject=f"User {user} used lifetime coupon({coupon.comment}). Coupon nb_used: {coupon.nb_used}",
plaintext="",
html="",
)
return coupon

View File

@ -1,17 +1,15 @@
import arrow import arrow
from flask import render_template, flash, redirect, url_for, request from flask import render_template, flash, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators from wtforms import StringField, validators
from app import parallel_limiter from app import parallel_limiter
from app.config import PADDLE_VENDOR_ID, PADDLE_COUPON_ID from app.config import PADDLE_VENDOR_ID, PADDLE_COUPON_ID
from app.coupon_utils import redeem_coupon, CouponUserCannotRedeemError
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.db import Session
from app.log import LOG from app.log import LOG
from app.models import ( from app.models import (
ManualSubscription,
Coupon,
Subscription, Subscription,
AppleSubscription, AppleSubscription,
CoinbaseSubscription, CoinbaseSubscription,
@ -58,56 +56,23 @@ def coupon_route():
if coupon_form.validate_on_submit(): if coupon_form.validate_on_submit():
code = coupon_form.code.data code = coupon_form.code.data
try:
coupon: Coupon = Coupon.get_by(code=code) coupon = redeem_coupon(code, current_user)
if coupon and not coupon.used: if coupon:
if coupon.expires_date and coupon.expires_date < arrow.now():
flash(
f"The coupon was expired on {coupon.expires_date.humanize()}",
"error",
)
return redirect(request.url)
updated = (
Session.query(Coupon)
.filter_by(code=code, used=False)
.update({"used_by_user_id": current_user.id, "used": True})
)
if updated != 1:
flash("Coupon is not valid", "error")
return redirect(request.url)
manual_sub: ManualSubscription = ManualSubscription.get_by(
user_id=current_user.id
)
if manual_sub:
# renew existing subscription
if manual_sub.end_at > arrow.now():
manual_sub.end_at = manual_sub.end_at.shift(years=coupon.nb_year)
else:
manual_sub.end_at = arrow.now().shift(years=coupon.nb_year, days=1)
Session.commit()
flash(
f"Your current subscription is extended to {manual_sub.end_at.humanize()}",
"success",
)
else:
ManualSubscription.create(
user_id=current_user.id,
end_at=arrow.now().shift(years=coupon.nb_year, days=1),
comment="using coupon code",
is_giveaway=coupon.is_giveaway,
commit=True,
)
flash( flash(
"Your account has been upgraded to Premium, thanks for your support!", "Your account has been upgraded to Premium, thanks for your support!",
"success", "success",
) )
return redirect(url_for("dashboard.index"))
else: else:
flash(f"Code *{code}* expired or invalid", "warning") flash(
"This coupon cannot be redeemed. It's invalid or has expired",
"warning",
)
except CouponUserCannotRedeemError:
flash(
"You have an active subscription. Please remove it before redeeming a coupon",
"warning",
)
return render_template( return render_template(
"dashboard/coupon.html", "dashboard/coupon.html",

View File

@ -1,16 +1,11 @@
import arrow
from flask import render_template, flash, redirect, url_for from flask import render_template, flash, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators from wtforms import StringField, validators
from app.config import ADMIN_EMAIL from app import parallel_limiter
from app.coupon_utils import redeem_lifetime_coupon
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import send_email
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import UserPlanChanged, EventContent
from app.models import LifetimeCoupon
class CouponForm(FlaskForm): class CouponForm(FlaskForm):
@ -19,6 +14,7 @@ class CouponForm(FlaskForm):
@dashboard_bp.route("/lifetime_licence", methods=["GET", "POST"]) @dashboard_bp.route("/lifetime_licence", methods=["GET", "POST"])
@login_required @login_required
@parallel_limiter.lock()
def lifetime_licence(): def lifetime_licence():
if current_user.lifetime: if current_user.lifetime:
flash("You already have a lifetime licence", "warning") flash("You already have a lifetime licence", "warning")
@ -35,36 +31,12 @@ def lifetime_licence():
if coupon_form.validate_on_submit(): if coupon_form.validate_on_submit():
code = coupon_form.code.data code = coupon_form.code.data
coupon = redeem_lifetime_coupon(code, current_user)
coupon: LifetimeCoupon = LifetimeCoupon.get_by(code=code) if coupon:
if coupon and coupon.nb_used > 0:
coupon.nb_used -= 1
current_user.lifetime = True
current_user.lifetime_coupon_id = coupon.id
if coupon.paid:
current_user.paid_lifetime = True
EventDispatcher.send_event(
user=current_user,
content=EventContent(
user_plan_change=UserPlanChanged(
plan_end_time=arrow.get("2038-01-01").timestamp
)
),
)
Session.commit()
# notify admin
send_email(
ADMIN_EMAIL,
subject=f"User {current_user} used lifetime coupon({coupon.comment}). Coupon nb_used: {coupon.nb_used}",
plaintext="",
html="",
)
flash("You are upgraded to lifetime premium!", "success") flash("You are upgraded to lifetime premium!", "success")
return redirect(url_for("dashboard.index")) return redirect(url_for("dashboard.index"))
else: else:
flash(f"Code *{code}* expired or invalid", "warning") flash("Coupon code expired or invalid", "warning")
return render_template("dashboard/lifetime_licence.html", coupon_form=coupon_form) return render_template("dashboard/lifetime_licence.html", coupon_form=coupon_form)

View File

@ -1,23 +1,23 @@
from smtplib import SMTPRecipientsRefused
from email_validator import validate_email, EmailNotValidError from email_validator import validate_email, EmailNotValidError
from flask import render_template, request, redirect, url_for, flash from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from itsdangerous import TimestampSigner from itsdangerous import TimestampSigner
from wtforms import validators from wtforms import validators
from wtforms.fields.html5 import EmailField from wtforms.fields.simple import StringField
from app import mailbox_utils
from app.config import ENFORCE_SPF, MAILBOX_SECRET from app.config import ENFORCE_SPF, MAILBOX_SECRET
from app.config import URL
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session from app.db import Session
from app.email_utils import email_can_be_used_as_mailbox
from app.email_utils import mailbox_already_used, render, send_email
from app.extensions import limiter from app.extensions import limiter
from app.mailbox_utils import perform_mailbox_email_change, MailboxEmailChangeError from app.mailbox_utils import (
from app.models import Alias, AuthorizedAddress perform_mailbox_email_change,
MailboxEmailChangeError,
MailboxError,
)
from app.models import AuthorizedAddress
from app.models import Mailbox from app.models import Mailbox
from app.pgp_utils import PGPException, load_public_key_and_check from app.pgp_utils import PGPException, load_public_key_and_check
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
@ -25,7 +25,7 @@ from app.utils import sanitize_email, CSRFValidationForm
class ChangeEmailForm(FlaskForm): class ChangeEmailForm(FlaskForm):
email = EmailField( email = StringField(
"email", validators=[validators.DataRequired(), validators.Email()] "email", validators=[validators.DataRequired(), validators.Email()]
) )
@ -56,31 +56,16 @@ def mailbox_detail_route(mailbox_id):
request.form.get("form-name") == "update-email" request.form.get("form-name") == "update-email"
and change_email_form.validate_on_submit() and change_email_form.validate_on_submit()
): ):
new_email = sanitize_email(change_email_form.email.data)
if new_email != mailbox.email and not pending_email:
# check if this email is not already used
if mailbox_already_used(new_email, current_user) or Alias.get_by(
email=new_email
):
flash(f"Email {new_email} already used", "error")
elif not email_can_be_used_as_mailbox(new_email):
flash("You cannot use this email address as your mailbox", "error")
else:
mailbox.new_email = new_email
Session.commit()
try: try:
verify_mailbox_change(current_user, mailbox, new_email) response = mailbox_utils.request_mailbox_email_change(
except SMTPRecipientsRefused: current_user, mailbox, change_email_form.email.data
flash(
f"Incorrect mailbox, please recheck {mailbox.email}",
"error",
) )
else:
flash( flash(
f"You are going to receive an email to confirm {new_email}.", f"You are going to receive an email to confirm {mailbox.email}.",
"success", "success",
) )
except mailbox_utils.MailboxError as e:
flash(e.msg, "error")
return redirect( return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
) )
@ -265,70 +250,44 @@ def mailbox_detail_route(mailbox_id):
return render_template("dashboard/mailbox_detail.html", **locals()) return render_template("dashboard/mailbox_detail.html", **locals())
def verify_mailbox_change(user, mailbox, new_email):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode()
verification_url = (
f"{URL}/dashboard/mailbox/confirm_change?mailbox_id={mailbox_id_signed}"
)
send_email(
new_email,
"Confirm mailbox change on SimpleLogin",
render(
"transactional/verify-mailbox-change.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=new_email,
),
render(
"transactional/verify-mailbox-change.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=new_email,
),
)
@dashboard_bp.route( @dashboard_bp.route(
"/mailbox/<int:mailbox_id>/cancel_email_change", methods=["GET", "POST"] "/mailbox/<int:mailbox_id>/cancel_email_change", methods=["GET", "POST"]
) )
@login_required @login_required
def cancel_mailbox_change_route(mailbox_id): def cancel_mailbox_change_route(mailbox_id):
mailbox = Mailbox.get(mailbox_id) try:
if not mailbox or mailbox.user_id != current_user.id: mailbox_utils.cancel_email_change(mailbox_id, current_user)
flash("You cannot see this page", "warning")
return redirect(url_for("dashboard.index"))
if mailbox.new_email:
mailbox.new_email = None
Session.commit()
flash("Your mailbox change is cancelled", "success") flash("Your mailbox change is cancelled", "success")
return redirect( return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
) )
else: except MailboxError as e:
flash("You have no pending mailbox change", "warning") flash(e.msg, "warning")
return redirect( return redirect(url_for("dashboard.index"))
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
@dashboard_bp.route("/mailbox/confirm_change") @dashboard_bp.route("/mailbox/confirm_change")
def mailbox_confirm_email_change_route(): def mailbox_confirm_email_change_route():
s = TimestampSigner(MAILBOX_SECRET) mailbox_id = request.args.get("mailbox_id")
signed_mailbox_id = request.args.get("mailbox_id")
code = request.args.get("code")
if code:
print("HAS OCO", code)
try: try:
mailbox_id = int(s.unsign(signed_mailbox_id, max_age=900)) mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code)
except Exception: flash("Successfully changed mailbox email", "success")
flash("Invalid link", "error") return redirect(
return redirect(url_for("dashboard.index")) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id)
)
except mailbox_utils.MailboxError as e:
print(e)
flash(f"Cannot verify mailbox: {e.msg}", "error")
return redirect(url_for("dashboard.mailbox_route"))
else:
s = TimestampSigner(MAILBOX_SECRET)
try:
mailbox_id = int(s.unsign(mailbox_id, max_age=900))
res = perform_mailbox_email_change(mailbox_id) res = perform_mailbox_email_change(mailbox_id)
flash(res.message, res.message_category) flash(res.message, res.message_category)
if res.error: if res.error:
if res.error == MailboxEmailChangeError.EmailAlreadyUsed: if res.error == MailboxEmailChangeError.EmailAlreadyUsed:
@ -339,7 +298,9 @@ def mailbox_confirm_email_change_route():
return redirect(url_for("dashboard.index")) return redirect(url_for("dashboard.index"))
else: else:
raise Exception("Unhandled MailboxEmailChangeError") raise Exception("Unhandled MailboxEmailChangeError")
else: except Exception:
return redirect( flash("Invalid link", "error")
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) return redirect(url_for("dashboard.index"))
)
flash("Successfully changed mailbox email", "success")
return redirect(url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id))

View File

@ -24,7 +24,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\"(\n\x0fUserPlanChanged\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\"\r\n\x0bUserDeleted\"\\\n\x0c\x41liasCreated\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0c\n\x04note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\x12\x12\n\ncreated_at\x18\x05 \x01(\r\"T\n\x12\x41liasStatusChanged\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\x12\x12\n\ncreated_at\x18\x04 \x01(\r\")\n\x0c\x41liasDeleted\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\"D\n\x10\x41liasCreatedList\x12\x30\n\x06\x65vents\x18\x01 \x03(\x0b\x32 .simplelogin_events.AliasCreated\"\x93\x03\n\x0c\x45ventContent\x12?\n\x10user_plan_change\x18\x01 \x01(\x0b\x32#.simplelogin_events.UserPlanChangedH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x45\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32&.simplelogin_events.AliasStatusChangedH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x12\x41\n\x11\x61lias_create_list\x18\x06 \x01(\x0b\x32$.simplelogin_events.AliasCreatedListH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\":\n\x0fUserPlanChanged\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\x12\x10\n\x08lifetime\x18\x02 \x01(\x08\"\r\n\x0bUserDeleted\"\\\n\x0c\x41liasCreated\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0c\n\x04note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\x12\x12\n\ncreated_at\x18\x05 \x01(\r\"T\n\x12\x41liasStatusChanged\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\x12\x12\n\ncreated_at\x18\x04 \x01(\r\")\n\x0c\x41liasDeleted\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\"D\n\x10\x41liasCreatedList\x12\x30\n\x06\x65vents\x18\x01 \x03(\x0b\x32 .simplelogin_events.AliasCreated\"\x93\x03\n\x0c\x45ventContent\x12?\n\x10user_plan_change\x18\x01 \x01(\x0b\x32#.simplelogin_events.UserPlanChangedH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x45\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32&.simplelogin_events.AliasStatusChangedH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x12\x41\n\x11\x61lias_create_list\x18\x06 \x01(\x0b\x32$.simplelogin_events.AliasCreatedListH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3')
_globals = globals() _globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -32,19 +32,19 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'event_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS: if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None DESCRIPTOR._loaded_options = None
_globals['_USERPLANCHANGED']._serialized_start=35 _globals['_USERPLANCHANGED']._serialized_start=35
_globals['_USERPLANCHANGED']._serialized_end=75 _globals['_USERPLANCHANGED']._serialized_end=93
_globals['_USERDELETED']._serialized_start=77 _globals['_USERDELETED']._serialized_start=95
_globals['_USERDELETED']._serialized_end=90 _globals['_USERDELETED']._serialized_end=108
_globals['_ALIASCREATED']._serialized_start=92 _globals['_ALIASCREATED']._serialized_start=110
_globals['_ALIASCREATED']._serialized_end=184 _globals['_ALIASCREATED']._serialized_end=202
_globals['_ALIASSTATUSCHANGED']._serialized_start=186 _globals['_ALIASSTATUSCHANGED']._serialized_start=204
_globals['_ALIASSTATUSCHANGED']._serialized_end=270 _globals['_ALIASSTATUSCHANGED']._serialized_end=288
_globals['_ALIASDELETED']._serialized_start=272 _globals['_ALIASDELETED']._serialized_start=290
_globals['_ALIASDELETED']._serialized_end=313 _globals['_ALIASDELETED']._serialized_end=331
_globals['_ALIASCREATEDLIST']._serialized_start=315 _globals['_ALIASCREATEDLIST']._serialized_start=333
_globals['_ALIASCREATEDLIST']._serialized_end=383 _globals['_ALIASCREATEDLIST']._serialized_end=401
_globals['_EVENTCONTENT']._serialized_start=386 _globals['_EVENTCONTENT']._serialized_start=404
_globals['_EVENTCONTENT']._serialized_end=789 _globals['_EVENTCONTENT']._serialized_end=807
_globals['_EVENT']._serialized_start=791 _globals['_EVENT']._serialized_start=809
_globals['_EVENT']._serialized_end=912 _globals['_EVENT']._serialized_end=930
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@ -6,10 +6,12 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map
DESCRIPTOR: _descriptor.FileDescriptor DESCRIPTOR: _descriptor.FileDescriptor
class UserPlanChanged(_message.Message): class UserPlanChanged(_message.Message):
__slots__ = ("plan_end_time",) __slots__ = ("plan_end_time", "lifetime")
PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int] PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int]
LIFETIME_FIELD_NUMBER: _ClassVar[int]
plan_end_time: int plan_end_time: int
def __init__(self, plan_end_time: _Optional[int] = ...) -> None: ... lifetime: bool
def __init__(self, plan_end_time: _Optional[int] = ..., lifetime: bool = ...) -> None: ...
class UserDeleted(_message.Message): class UserDeleted(_message.Message):
__slots__ = () __slots__ = ()

View File

@ -60,21 +60,7 @@ def create_mailbox(
f"User {user} has tried to create mailbox with {email} but is not premium" f"User {user} has tried to create mailbox with {email} but is not premium"
) )
raise OnlyPaidError() raise OnlyPaidError()
if not is_valid_email(email): check_email_for_mailbox(email, user)
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not valid email"
)
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is already used"
)
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is invalid"
)
raise MailboxError("Invalid email")
new_mailbox: Mailbox = Mailbox.create( new_mailbox: Mailbox = Mailbox.create(
email=email, user_id=user.id, verified=verified, commit=True email=email, user_id=user.id, verified=verified, commit=True
) )
@ -106,6 +92,24 @@ def create_mailbox(
return output return output
def check_email_for_mailbox(email, user):
if not is_valid_email(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not valid email"
)
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is already used"
)
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is invalid"
)
raise MailboxError("Invalid email")
def delete_mailbox( def delete_mailbox(
user: User, user: User,
mailbox_id: int, mailbox_id: int,
@ -183,7 +187,7 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user" f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user"
) )
raise MailboxError("Invalid mailbox") raise MailboxError("Invalid mailbox")
if mailbox.verified: if mailbox.verified and not mailbox.new_email:
LOG.i( LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because it's already verified" f"User {user} failed to verify mailbox {mailbox_id} because it's already verified"
) )
@ -220,6 +224,19 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
activation.tries = activation.tries + 1 activation.tries = activation.tries + 1
Session.commit() Session.commit()
raise CannotVerifyError("Invalid activation code") raise CannotVerifyError("Invalid activation code")
if mailbox.new_email:
LOG.i(
f"User {user} has verified mailbox email change from {mailbox.email} to {mailbox.new_email}"
)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.UpdateMailbox,
message=f"Change mailbox email for mailbox {mailbox_id} (old={mailbox.email} | new={mailbox.new_email})",
)
mailbox.email = mailbox.new_email
mailbox.new_email = None
mailbox.verified = True
elif not mailbox.verified:
LOG.i(f"User {user} has verified mailbox {mailbox_id}") LOG.i(f"User {user} has verified mailbox {mailbox_id}")
mailbox.verified = True mailbox.verified = True
emit_user_audit_log( emit_user_audit_log(
@ -227,6 +244,14 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
action=UserAuditLogAction.VerifyMailbox, action=UserAuditLogAction.VerifyMailbox,
message=f"Verify mailbox {mailbox_id} ({mailbox.email})", message=f"Verify mailbox {mailbox_id} ({mailbox.email})",
) )
if Mailbox.get_by(email=mailbox.new_email, user_id=user.id):
raise MailboxError("That addres is already in use")
else:
LOG.i(
"User {user} alread has mailbox {mailbox} verified and no pending email change"
)
clear_activation_codes_for_mailbox(mailbox) clear_activation_codes_for_mailbox(mailbox)
return mailbox return mailbox
@ -251,7 +276,10 @@ def generate_activation_code(
def send_verification_email( def send_verification_email(
user: User, mailbox: Mailbox, activation: MailboxActivation, send_link: bool = True user: User,
mailbox: Mailbox,
activation: MailboxActivation,
send_link: bool = True,
): ):
LOG.i( LOG.i(
f"Sending mailbox verification email to {mailbox.email} with send link={send_link}" f"Sending mailbox verification email to {mailbox.email} with send link={send_link}"
@ -286,6 +314,72 @@ def send_verification_email(
) )
def send_change_email(user: User, mailbox: Mailbox, activation: MailboxActivation):
verification_url = f"{config.URL}/dashboard/mailbox/confirm_change?mailbox_id={mailbox.id}&code={activation.code}"
send_email(
mailbox.new_email,
"Confirm mailbox change on SimpleLogin",
render(
"transactional/verify-mailbox-change.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=mailbox.new_email,
),
render(
"transactional/verify-mailbox-change.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=mailbox.new_email,
),
)
def request_mailbox_email_change(
user: User,
mailbox: Mailbox,
new_email: str,
email_ownership_verified: bool = False,
send_email: bool = True,
use_digit_codes: bool = False,
) -> CreateMailboxOutput:
new_email = sanitize_email(new_email)
if new_email == mailbox.email:
raise MailboxError("Same email")
check_email_for_mailbox(new_email, user)
if email_ownership_verified:
mailbox.email = new_email
else:
mailbox.new_email = new_email
emit_user_audit_log(
user=user,
action=UserAuditLogAction.UpdateMailbox,
message=f"Updated mailbox {mailbox.id} email ({new_email}) pre-verified({email_ownership_verified}",
)
Session.commit()
if email_ownership_verified:
LOG.i(f"User {user} as created a pre-verified mailbox with {new_email}")
return CreateMailboxOutput(mailbox=mailbox, activation=None)
LOG.i(f"User {user} has updated mailbox email with {new_email}")
activation = generate_activation_code(mailbox, use_digit_code=use_digit_codes)
output = CreateMailboxOutput(mailbox=mailbox, activation=activation)
if not send_email:
LOG.i(f"Skipping sending validation email for mailbox {mailbox}")
return output
send_change_email(
user,
mailbox,
activation=activation,
)
return output
class MailboxEmailChangeError(Enum): class MailboxEmailChangeError(Enum):
InvalidId = 1 InvalidId = 1
EmailAlreadyUsed = 2 EmailAlreadyUsed = 2
@ -337,6 +431,23 @@ def perform_mailbox_email_change(mailbox_id: int) -> MailboxEmailChangeResult:
) )
def cancel_email_change(mailbox_id: int, user: User):
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
LOG.i(
f"User {user} has tried to cancel a mailbox an unknown mailbox {mailbox_id}"
)
raise MailboxError("Invalid mailbox")
if mailbox.user.id != user.id:
LOG.i(
f"User {user} has tried to cancel a mailbox {mailbox} owned by another user"
)
raise MailboxError("Invalid mailbox")
mailbox.new_email = None
LOG.i(f"User {mailbox.user} has cancelled mailbox email change")
clear_activation_codes_for_mailbox(mailbox)
def __get_alias_mailbox_from_email( def __get_alias_mailbox_from_email(
email_address: str, alias: Alias email_address: str, alias: Alias
) -> Optional[Mailbox]: ) -> Optional[Mailbox]:

View File

@ -590,15 +590,25 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
contact.alias contact.alias
) # In case the Session was closed in the get_or_create we re-fetch the alias ) # In case the Session was closed in the get_or_create we re-fetch the alias
reply_to_contact = None reply_to_contact = []
if msg[headers.REPLY_TO]: if msg[headers.REPLY_TO]:
reply_to = get_header_unicode(msg[headers.REPLY_TO]) reply_to_header_contents = get_header_unicode(msg[headers.REPLY_TO])
LOG.d("Create or get contact for reply_to_header:%s", reply_to) if reply_to_header_contents:
# ignore when reply-to = alias LOG.d(
if reply_to == alias.email: "Create or get contact for reply_to_header:%s", reply_to_header_contents
)
for reply_to in [
reply_to.strip()
for reply_to in reply_to_header_contents.split(",")
if reply_to.strip()
]:
reply_to_name, reply_to_email = parse_full_address(reply_to)
if reply_to_email == alias.email:
LOG.i("Reply-to same as alias %s", alias) LOG.i("Reply-to same as alias %s", alias)
else: else:
reply_to_contact = get_or_create_reply_to_contact(reply_to, alias, msg) reply_to_contact.append(
get_or_create_reply_to_contact(reply_to_email, alias, msg)
)
if alias.user.delete_on is not None: if alias.user.delete_on is not None:
LOG.d(f"user {user} is pending to be deleted. Do not forward") LOG.d(f"user {user} is pending to be deleted. Do not forward")
@ -701,7 +711,7 @@ def forward_email_to_mailbox(
envelope, envelope,
mailbox, mailbox,
user, user,
reply_to_contact: Optional[Contact], reply_to_contacts: list[Contact],
) -> (bool, str): ) -> (bool, str):
LOG.d("Forward %s -> %s -> %s", contact, alias, mailbox) LOG.d("Forward %s -> %s -> %s", contact, alias, mailbox)
@ -884,11 +894,13 @@ def forward_email_to_mailbox(
add_or_replace_header(msg, "From", new_from_header) add_or_replace_header(msg, "From", new_from_header)
LOG.d("From header, new:%s, old:%s", new_from_header, old_from_header) LOG.d("From header, new:%s, old:%s", new_from_header, old_from_header)
if reply_to_contact: if len(reply_to_contacts) > 0:
reply_to_header = msg[headers.REPLY_TO] original_reply_to = get_header_unicode(msg[headers.REPLY_TO])
new_reply_to_header = reply_to_contact.new_addr() new_reply_to_header = ", ".join(
[reply_to_contact.new_addr() for reply_to_contact in reply_to_contacts][:5]
)
add_or_replace_header(msg, "Reply-To", new_reply_to_header) add_or_replace_header(msg, "Reply-To", new_reply_to_header)
LOG.d("Reply-To header, new:%s, old:%s", new_reply_to_header, reply_to_header) LOG.d("Reply-To header, new:%s, old:%s", new_reply_to_header, original_reply_to)
# replace CC & To emails by reverse-alias for all emails that are not alias # replace CC & To emails by reverse-alias for all emails that are not alias
try: try:

View File

@ -2,7 +2,6 @@
import argparse import argparse
import time import time
import arrow
from sqlalchemy import func from sqlalchemy import func
from app.account_linking import send_user_plan_changed_event from app.account_linking import send_user_plan_changed_event
@ -38,9 +37,9 @@ for batch_start in range(pu_id_start, max_pu_id, step):
) )
).all() ).all()
for partner_user in partner_users: for partner_user in partner_users:
subscription_end = send_user_plan_changed_event(partner_user) event = send_user_plan_changed_event(partner_user)
if subscription_end is not None: if event is not None:
if subscription_end > arrow.get("2038-01-01").timestamp: if event.lifetime:
with_lifetime += 1 with_lifetime += 1
else: else:
with_premium += 1 with_premium += 1

View File

@ -4,6 +4,7 @@ package simplelogin_events;
message UserPlanChanged { message UserPlanChanged {
uint32 plan_end_time = 1; uint32 plan_end_time = 1;
bool lifetime = 2;
} }
message UserDeleted { message UserDeleted {

View File

@ -1,10 +1,11 @@
from flask import url_for from flask import url_for
from app.models import Coupon
from app.models import Coupon, LifetimeCoupon
from app.utils import random_string from app.utils import random_string
from tests.utils import login from tests.utils import login
def test_use_coupon(flask_client): def test_redeem_coupon_without_subscription(flask_client):
user = login(flask_client) user = login(flask_client)
code = random_string(10) code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True) Coupon.create(code=code, nb_year=1, commit=True)
@ -14,7 +15,22 @@ def test_use_coupon(flask_client):
data={"code": code}, data={"code": code},
) )
assert r.status_code == 302 assert r.status_code == 200
coupon = Coupon.get_by(code=code) coupon = Coupon.get_by(code=code)
assert coupon.used assert coupon.used
assert coupon.used_by_user_id == user.id assert coupon.used_by_user_id == user.id
def test_redeem_lifetime_coupon(flask_client):
login(flask_client)
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1, commit=True)
r = flask_client.post(
url_for("dashboard.lifetime_licence"),
data={"code": code},
)
assert r.status_code == 302
coupon = LifetimeCoupon.get_by(code=code)
assert coupon.nb_used == 0

View File

@ -0,0 +1,21 @@
X-SimpleLogin-Client-IP: 54.39.200.130
Received-SPF: Softfail (mailfrom) identity=mailfrom; client-ip=34.59.200.130;
helo=relay.somewhere.net; envelope-from=everwaste@gmail.com;
receiver=<UNKNOWN>
Received: from relay.somewhere.net (relay.somewhere.net [34.59.200.130])
(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
(No client certificate requested)
by mx1.sldev.ovh (Postfix) with ESMTPS id 6D8C13F069
for <wehrman_mannequin@sldev.ovh>; Thu, 17 Mar 2022 16:50:20 +0000 (UTC)
Date: Thu, 17 Mar 2022 16:50:18 +0000
To: {{ alias_email }}
From: somewhere@rainbow.com
Reply-To: 666-Mail Test <a1@mailcstest.com>, 777-Mail Test <a2@mailcstest.com>,
888-Mail Test <a3@mailcstest.com>, - 5 at mailcstest.com" <a4@simplelogin.co>
Subject: test Thu, 17 Mar 2022 16:50:18 +0000
Message-Id: <20220317165018.000191@somewhere-5488dd4b6b-7crp6>
X-Mailer: swaks v20201014.0 jetmore.org/john/code/swaks/
X-Rspamd-Queue-Id: 6D8C13F069
X-Rspamd-Server: staging1
This is a test mailing

View File

@ -0,0 +1,33 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.email import status, headers
from app.email_utils import get_header_unicode, parse_full_address
from app.mail_sender import mail_sender
from app.models import Alias, Contact
from tests.utils import create_new_user, load_eml_file
@mail_sender.store_emails_test_decorator
def test_multi_reply_to():
user = create_new_user()
alias = Alias.create_new_random(user)
envelope = Envelope()
envelope.mail_from = "env.somewhere"
envelope.rcpt_tos = [alias.email]
msg = load_eml_file("multi_reply_to.eml", {"alias_email": alias.email})
alias_id = alias.id
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
sent_emails = mail_sender.get_stored_emails()
assert 1 == len(sent_emails)
msg = sent_emails[0].msg
reply_to = get_header_unicode(msg[headers.REPLY_TO])
entries = reply_to.split(",")
assert 4 == len(entries)
for entry in entries:
dummy, email = parse_full_address(entry)
contact = Contact.get_by(reply_email=email)
assert contact is not None
assert contact.alias_id == alias_id

View File

@ -0,0 +1,159 @@
import arrow
import pytest
from app.coupon_utils import (
redeem_coupon,
CouponUserCannotRedeemError,
redeem_lifetime_coupon,
)
from app.models import (
Coupon,
Subscription,
ManualSubscription,
AppleSubscription,
CoinbaseSubscription,
LifetimeCoupon,
User,
)
from tests.utils import create_new_user, random_string
def test_use_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True)
coupon = redeem_coupon(code, user)
assert coupon
coupon = Coupon.get_by(code=code)
assert coupon
assert coupon.used
assert coupon.used_by_user_id == user.id
sub = user.get_active_subscription()
assert isinstance(sub, ManualSubscription)
left = sub.end_at - arrow.utcnow()
assert left.days > 364
def test_use_coupon_extend_manual_sub():
user = create_new_user()
initial_end = arrow.now().shift(days=15)
ManualSubscription.create(
user_id=user.id,
end_at=initial_end,
flush=True,
)
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True)
coupon = redeem_coupon(code, user)
assert coupon
coupon = Coupon.get_by(code=code)
assert coupon
assert coupon.used
assert coupon.used_by_user_id == user.id
sub = user.get_active_subscription()
assert isinstance(sub, ManualSubscription)
left = sub.end_at - initial_end
assert left.days > 364
def test_coupon_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_string(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_string(10),
plan="yearly",
product_id="",
flush=True,
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_expired_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(
code=code, nb_year=1, commit=True, expires_date=arrow.utcnow().shift(days=-1)
)
coupon = redeem_coupon(code, user)
assert coupon is None
def test_used_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True, used=True)
coupon = redeem_coupon(code, user)
assert coupon is None
# Lifetime
def test_lifetime_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1)
coupon = redeem_lifetime_coupon(code, user)
assert coupon
user = User.get(user.id)
assert user.lifetime
assert not user.paid_lifetime
def test_lifetime_paid_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1, paid=True)
coupon = redeem_lifetime_coupon(code, user)
assert coupon
user = User.get(user.id)
assert user.lifetime
assert user.paid_lifetime
def test_used_lifetime_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=0, paid=True)
coupon = redeem_lifetime_coupon(code, user)
assert coupon is None
user = User.get(user.id)
assert not user.lifetime
assert not user.paid_lifetime

View File

@ -1,3 +1,4 @@
import re
from typing import Optional from typing import Optional
import arrow import arrow
@ -6,7 +7,11 @@ import pytest
from app import mailbox_utils, config from app import mailbox_utils, config
from app.db import Session from app.db import Session
from app.mail_sender import mail_sender from app.mail_sender import mail_sender
from app.mailbox_utils import MailboxEmailChangeError, get_mailbox_for_reply_phase from app.mailbox_utils import (
MailboxEmailChangeError,
get_mailbox_for_reply_phase,
request_mailbox_email_change,
)
from app.models import ( from app.models import (
Mailbox, Mailbox,
MailboxActivation, MailboxActivation,
@ -361,6 +366,24 @@ def test_verify_ok():
assert mailbox.verified assert mailbox.verified
@mail_sender.store_emails_test_decorator
def test_verify_ok_for_mailbox_email_change():
out_create = mailbox_utils.create_mailbox(user, random_email(), verified=True)
mailbox_id = out_create.mailbox.id
new_email = f"new{out_create.mailbox.email}"
out_change = mailbox_utils.request_mailbox_email_change(
user, out_create.mailbox, new_email
)
assert out_change.activation.code is not None
mailbox_utils.verify_mailbox_code(user, mailbox_id, out_change.activation.code)
activation = MailboxActivation.get_by(mailbox_id=out_create.mailbox.id)
assert activation is None
mailbox = Mailbox.get(id=out_create.mailbox.id)
assert mailbox.verified
assert mailbox.email == new_email
assert mailbox.new_email is None
# perform_mailbox_email_change # perform_mailbox_email_change
def test_perform_mailbox_email_change_invalid_id(): def test_perform_mailbox_email_change_invalid_id():
res = mailbox_utils.perform_mailbox_email_change(99999) res = mailbox_utils.perform_mailbox_email_change(99999)
@ -507,3 +530,71 @@ def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias) mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb is None assert mb is None
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is None
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(out.activation.code) > 0
assert mail_sent.envelope_to == mail2
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address_without_verification_email(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, send_email=False)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is None
assert 0 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address_with_code(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, use_digit_codes=True)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is not None
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(out.activation.code) > 0
assert mail_sent.envelope_to == mail2
def test_change_mailbox_verified_address(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, email_ownership_verified=True)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.email == mail2
assert out.activation is None
assert 0 == len(mail_sender.get_stored_emails())