Compare commits

...

3 Commits

Author SHA1 Message Date
e47e5a5255 4.64.3
Some checks failed
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m8s
Build-Release-Image / Build-Image (linux/arm64) (push) Failing after 15m37s
Build-Release-Image / Merge-Images (push) Has been skipped
Build-Release-Image / Create-Release (push) Has been skipped
Build-Release-Image / Notify (push) Has been skipped
2025-01-27 12:00:07 +00:00
ed37325b32 4.64.1
Some checks failed
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m58s
Build-Release-Image / Build-Image (linux/arm64) (push) Failing after 17m22s
Build-Release-Image / Merge-Images (push) Has been skipped
Build-Release-Image / Create-Release (push) Has been skipped
Build-Release-Image / Notify (push) Has been skipped
2025-01-24 12:00:07 +00:00
dd6005ffdf 4.64.0
Some checks failed
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m25s
Build-Release-Image / Build-Image (linux/arm64) (push) Failing after 14m51s
Build-Release-Image / Merge-Images (push) Has been skipped
Build-Release-Image / Create-Release (push) Has been skipped
Build-Release-Image / Notify (push) Has been skipped
2025-01-21 12:00:08 +00:00
34 changed files with 3334 additions and 339 deletions

View File

@ -15,12 +15,11 @@ jobs:
- name: Check out repo - name: Check out repo
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: "Install rye" - name: Install uv
id: setup-rye uses: astral-sh/setup-uv@v5
uses: eifinger/setup-rye@v4
with: with:
version: '0.43.0' # Install a specific version of uv.
checksum: 'ca702c3d93fd6ec76a1a0efaaa605e10736ee79a0674d241aad1bc0fe26f7d80' version: "0.5.21"
enable-cache: true enable-cache: true
- name: Install OS dependencies - name: Install OS dependencies
@ -28,13 +27,18 @@ jobs:
sudo apt update sudo apt update
sudo apt install -y libre2-dev libpq-dev sudo apt install -y libre2-dev libpq-dev
- name: "Set up Python"
uses: actions/setup-python@v5
with:
python-version-file: "pyproject.toml"
- name: Install dependencies - name: Install dependencies
if: steps.setup-rye.outputs.cache-hit != 'true' if: steps.setup-uv.outputs.cache-hit != 'true'
run: rye sync --no-lock run: uv sync --locked --all-extras
- name: Check formatting & linting - name: Check formatting & linting
run: | run: |
rye run pre-commit run --all-files uv run pre-commit run --all-files
test: test:
@ -70,23 +74,27 @@ jobs:
- name: Check out repo - name: Check out repo
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: Install rye - name: Install uv
id: setup-rye uses: astral-sh/setup-uv@v5
uses: eifinger/setup-rye@v4
with: with:
version: '0.43.0' # Install a specific version of uv.
checksum: 'ca702c3d93fd6ec76a1a0efaaa605e10736ee79a0674d241aad1bc0fe26f7d80' version: "0.5.21"
enable-cache: true enable-cache: true
cache-prefix: 'rye-cache'
- name: Install OS dependencies - name: Install OS dependencies
run: | run: |
sudo apt update sudo apt update
sudo apt install -y libre2-dev libpq-dev sudo apt install -y libre2-dev libpq-dev
- name: "Set up Python"
uses: actions/setup-python@v5
with:
python-version-file: "pyproject.toml"
- name: Install dependencies - name: Install dependencies
if: steps.setup-rye.outputs.cache-hit != 'true' if: steps.setup-uv.outputs.cache-hit != 'true'
run: rye sync --no-lock run: uv sync --locked --all-extras
- name: Start Redis v6 - name: Start Redis v6
uses: superchargejs/redis-github-action@1.1.0 uses: superchargejs/redis-github-action@1.1.0
@ -95,8 +103,7 @@ jobs:
- name: Run db migration - name: Run db migration
run: | run: |
rye install alembic CONFIG=tests/test.env uv run alembic upgrade head
CONFIG=tests/test.env rye run alembic upgrade head
- name: Prepare version file - name: Prepare version file
run: | run: |
@ -105,7 +112,7 @@ jobs:
- name: Test with pytest - name: Test with pytest
run: | run: |
rye run pytest uv run pytest
env: env:
GITHUB_ACTIONS_TEST: true GITHUB_ACTIONS_TEST: true

View File

@ -20,7 +20,7 @@ SimpleLogin backend consists of 2 main components:
## Install dependencies ## Install dependencies
The project requires: The project requires:
- Python 3.10 and rye to manage dependencies - Python 3.10 and uv to manage dependencies
- Node v10 for front-end. - Node v10 for front-end.
- Postgres 13+ - Postgres 13+
@ -28,7 +28,7 @@ First, install all dependencies by running the following command.
Feel free to use `virtualenv` or similar tools to isolate development environment. Feel free to use `virtualenv` or similar tools to isolate development environment.
```bash ```bash
rye sync uv sync
``` ```
On Mac, sometimes you might need to install some other packages via `brew`: On Mac, sometimes you might need to install some other packages via `brew`:
@ -55,7 +55,7 @@ brew install -s re2 pybind11
We use pre-commit to run all our linting and static analysis checks. Please run We use pre-commit to run all our linting and static analysis checks. Please run
```bash ```bash
rye run pre-commit install uv run pre-commit install
``` ```
To install it in your development environment. To install it in your development environment.
@ -160,25 +160,25 @@ Here are the small sum-ups of the directory structures and their roles:
The code is formatted using [ruff](https://github.com/astral-sh/ruff), to format the code, simply run The code is formatted using [ruff](https://github.com/astral-sh/ruff), to format the code, simply run
``` ```
rye run ruff format . uv run ruff format .
``` ```
The code is also checked with `flake8`, make sure to run `flake8` before creating the pull request by The code is also checked with `flake8`, make sure to run `flake8` before creating the pull request by
```bash ```bash
rye run flake8 uv run flake8
``` ```
For HTML templates, we use `djlint`. Before creating a pull request, please run For HTML templates, we use `djlint`. Before creating a pull request, please run
```bash ```bash
rye run djlint --check templates uv run djlint --check templates
``` ```
If some files aren't properly formatted, you can format all files with If some files aren't properly formatted, you can format all files with
```bash ```bash
rye run djlint --reformat . uv run djlint --reformat .
``` ```
## Test sending email ## Test sending email
@ -236,11 +236,18 @@ There are several ways to setup Python and manage the project dependencies on Ma
# we haven't managed to make python 3.12 work # we haven't managed to make python 3.12 work
brew install python3.10 brew install python3.10
# Install rye using the official installation script, found on https://rye.astral.sh/guide/installation/ # make sure to update the PATH so python, pip point to Python3
# for us it can be done by adding "export PATH=/opt/homebrew/opt/python@3.10/libexec/bin:$PATH" to .zprofile
# Install the dependencies # Although pipx is the recommended way to install uv,
rye sync # install pipx via brew will automatically install python 3.12
# and uv will then use python 3.12
# so we recommend using uv this way instead
curl -sSL https://install.python-uv.org | python3 -
uv install
# activate the virtualenv and you should be good to go! # activate the virtualenv and you should be good to go!
source .venv/bin/activate source .venv/bin/activate
```
```

View File

@ -6,8 +6,8 @@ RUN cd /code/static && npm ci
FROM --platform=linux/amd64 ubuntu:22.04 FROM --platform=linux/amd64 ubuntu:22.04
ARG RYE_VERSION="0.43.0" ARG UV_VERSION="0.5.21"
ARG RYE_HASH="ca702c3d93fd6ec76a1a0efaaa605e10736ee79a0674d241aad1bc0fe26f7d80" ARG UV_HASH="e108c300eafae22ad8e6d94519605530f18f8762eb58d2b98a617edfb5d088fc"
# Keeps Python from generating .pyc files in the container # Keeps Python from generating .pyc files in the container
ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONDONTWRITEBYTECODE=1
@ -18,18 +18,20 @@ ENV PYTHONUNBUFFERED=1
WORKDIR /code WORKDIR /code
# Copy dependency files # Copy dependency files
COPY pyproject.toml requirements.lock requirements-dev.lock .python-version ./ COPY pyproject.toml uv.lock .python-version ./
# Install deps # Install deps
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev build-essential pkg-config cmake ninja-build bash clang \ && apt-get install -y curl netcat-traditional gcc python3-dev gnupg git libre2-dev build-essential pkg-config cmake ninja-build bash clang \
&& curl -sSL "https://github.com/astral-sh/rye/releases/download/${RYE_VERSION}/rye-x86_64-linux.gz" > rye.gz \ && curl -sSL "https://github.com/astral-sh/uv/releases/download/${UV_VERSION}/uv-x86_64-unknown-linux-gnu.tar.gz" > uv.tar.gz \
&& echo "${RYE_HASH} rye.gz" | sha256sum -c - \ && echo "${UV_HASH} uv.tar.gz" | sha256sum -c - \
&& gunzip rye.gz \ && tar xf uv.tar.gz -C /tmp/ \
&& chmod +x rye \ && mv /tmp/uv-x86_64-unknown-linux-gnu/uv /usr/bin/uv \
&& mv rye /usr/bin/rye \ && mv /tmp/uv-x86_64-unknown-linux-gnu/uvx /usr/bin/uvx \
&& rye toolchain fetch `cat .python-version` \ && rm -rf /tmp/uv* \
&& rye sync --no-lock --no-dev \ && rm -f uv.tar.gz \
&& uv python install `cat .python-version` \
&& uv sync --locked \
&& apt-get autoremove -y \ && apt-get autoremove -y \
&& apt-get purge -y curl netcat-traditional build-essential pkg-config cmake ninja-build python3-dev clang\ && apt-get purge -y curl netcat-traditional build-essential pkg-config cmake ninja-build python3-dev clang\
&& apt-get autoremove -y \ && apt-get autoremove -y \

View File

@ -3,7 +3,6 @@ from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
import arrow
import sqlalchemy.exc import sqlalchemy.exc
from arrow import Arrow from arrow import Arrow
from newrelic import agent from newrelic import agent
@ -60,19 +59,21 @@ class LinkResult:
strategy: str strategy: str
def send_user_plan_changed_event(partner_user: PartnerUser) -> Optional[int]: def send_user_plan_changed_event(
partner_user: PartnerUser,
) -> UserPlanChanged:
subscription_end = partner_user.user.get_active_subscription_end( subscription_end = partner_user.user.get_active_subscription_end(
include_partner_subscription=False include_partner_subscription=False
) )
end_timestamp = None
if partner_user.user.lifetime: if partner_user.user.lifetime:
end_timestamp = arrow.get("2038-01-01").timestamp event = UserPlanChanged(lifetime=True)
elif subscription_end: elif subscription_end:
end_timestamp = subscription_end.timestamp event = UserPlanChanged(plan_end_time=subscription_end.timestamp)
event = UserPlanChanged(plan_end_time=end_timestamp) else:
event = UserPlanChanged(plan_end_time=None)
EventDispatcher.send_event(partner_user.user, EventContent(user_plan_change=event)) EventDispatcher.send_event(partner_user.user, EventContent(user_plan_change=event))
Session.flush() Session.flush()
return end_timestamp return event
def set_plan_for_partner_user(partner_user: PartnerUser, plan: SLPlan): def set_plan_for_partner_user(partner_user: PartnerUser, plan: SLPlan):
@ -194,6 +195,7 @@ class NewUserStrategy(ClientMergeStrategy):
strategy=self.__class__.__name__, strategy=self.__class__.__name__,
) )
except (UniqueViolation, sqlalchemy.exc.IntegrityError) as e: except (UniqueViolation, sqlalchemy.exc.IntegrityError) as e:
Session.rollback()
LOG.debug(f"Got the duplicate user error: {e}") LOG.debug(f"Got the duplicate user error: {e}")
return self.create_missing_link(canonical_email) return self.create_missing_link(canonical_email)

View File

@ -6,12 +6,7 @@ from flask import request
from app import mailbox_utils from app import mailbox_utils
from app.api.base import api_bp, require_api_auth from app.api.base import api_bp, require_api_auth
from app.dashboard.views.mailbox_detail import verify_mailbox_change
from app.db import Session from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
)
from app.models import Mailbox from app.models import Mailbox
from app.utils import sanitize_email from app.utils import sanitize_email
@ -122,20 +117,10 @@ def update_mailbox(mailbox_id):
if "email" in data: if "email" in data:
new_email = sanitize_email(data.get("email")) new_email = sanitize_email(data.get("email"))
if mailbox_already_used(new_email, user):
return jsonify(error=f"{new_email} already used"), 400
elif not email_can_be_used_as_mailbox(new_email):
return (
jsonify(
error=f"{new_email} cannot be used. Please note a mailbox cannot "
f"be a disposable email address"
),
400,
)
try: try:
verify_mailbox_change(user, mailbox, new_email) mailbox_utils.request_mailbox_email_change(user, mailbox, new_email)
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
except SMTPRecipientsRefused: except SMTPRecipientsRefused:
return jsonify(error=f"Incorrect mailbox, please recheck {new_email}"), 400 return jsonify(error=f"Incorrect mailbox, please recheck {new_email}"), 400
else: else:
@ -145,7 +130,7 @@ def update_mailbox(mailbox_id):
if "cancel_email_change" in data: if "cancel_email_change" in data:
cancel_email_change = data.get("cancel_email_change") cancel_email_change = data.get("cancel_email_change")
if cancel_email_change: if cancel_email_change:
mailbox.new_email = None mailbox_utils.cancel_email_change(mailbox.id, user)
changed = True changed = True
if changed: if changed:

View File

@ -144,5 +144,6 @@ def get_available_domains_for_random_alias_v2():
@require_api_auth @require_api_auth
def unlink_proton_account(): def unlink_proton_account():
user = g.user user = g.user
perform_proton_account_unlink(user) if not perform_proton_account_unlink(user):
return jsonify(error="The account cannot be unlinked"), 400
return jsonify({"ok": True}) return jsonify({"ok": True})

127
app/app/coupon_utils.py Normal file
View File

@ -0,0 +1,127 @@
from typing import Optional
import arrow
from sqlalchemy import or_, update, and_
from app.config import ADMIN_EMAIL
from app.db import Session
from app.email_utils import send_email
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
from app.log import LOG
from app.models import User, ManualSubscription, Coupon, LifetimeCoupon
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
class CouponUserCannotRedeemError(Exception):
pass
def redeem_coupon(coupon_code: str, user: User) -> Optional[Coupon]:
if user.lifetime:
LOG.i(f"User {user} is a lifetime SL user. Cannot redeem coupons")
raise CouponUserCannotRedeemError()
sub = user.get_active_subscription()
if sub and not isinstance(sub, ManualSubscription):
LOG.i(
f"User {user} has an active subscription that is not manual. Cannot redeem coupon {coupon_code}"
)
raise CouponUserCannotRedeemError()
coupon = Coupon.get_by(code=coupon_code)
if not coupon:
LOG.i(f"User is trying to redeem coupon {coupon_code} that does not exist")
return None
now = arrow.utcnow()
stmt = (
update(Coupon)
.where(
and_(
Coupon.code == coupon_code,
Coupon.used == False, # noqa: E712
or_(
Coupon.expires_date == None, # noqa: E711
Coupon.expires_date > now,
),
)
)
.values(used=True, used_by_user_id=user.id, updated_at=now)
)
res = Session.execute(stmt)
if res.rowcount == 0:
LOG.i(f"Coupon {coupon.id} could not be redeemed. It's expired or invalid.")
return None
LOG.i(
f"Redeemed normal coupon {coupon.id} for {coupon.nb_year} years by user {user}"
)
if sub:
# renew existing subscription
if sub.end_at > arrow.now():
sub.end_at = sub.end_at.shift(years=coupon.nb_year)
else:
sub.end_at = arrow.now().shift(years=coupon.nb_year, days=1)
else:
sub = ManualSubscription.create(
user_id=user.id,
end_at=arrow.now().shift(years=coupon.nb_year, days=1),
comment="using coupon code",
is_giveaway=coupon.is_giveaway,
commit=True,
)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.Upgrade,
message=f"User {user} redeemed coupon {coupon.id} for {coupon.nb_year} years",
)
EventDispatcher.send_event(
user=user,
content=EventContent(
user_plan_change=UserPlanChanged(plan_end_time=sub.end_at.timestamp)
),
)
Session.commit()
return coupon
def redeem_lifetime_coupon(coupon_code: str, user: User) -> Optional[Coupon]:
coupon: LifetimeCoupon = LifetimeCoupon.get_by(code=coupon_code)
if not coupon:
return None
stmt = (
update(LifetimeCoupon)
.where(
and_(
LifetimeCoupon.code == coupon_code,
LifetimeCoupon.nb_used > 0,
)
)
.values(nb_used=LifetimeCoupon.nb_used - 1)
)
res = Session.execute(stmt)
if res.rowcount == 0:
LOG.i("Coupon could not be redeemed")
return None
user.lifetime = True
user.lifetime_coupon_id = coupon.id
if coupon.paid:
user.paid_lifetime = True
EventDispatcher.send_event(
user=user,
content=EventContent(user_plan_change=UserPlanChanged(lifetime=True)),
)
Session.commit()
# notify admin
send_email(
ADMIN_EMAIL,
subject=f"User {user} used lifetime coupon({coupon.comment}). Coupon nb_used: {coupon.nb_used}",
plaintext="",
html="",
)
return coupon

View File

@ -239,6 +239,8 @@ def unlink_proton_account():
flash("Invalid request", "warning") flash("Invalid request", "warning")
return redirect(url_for("dashboard.setting")) return redirect(url_for("dashboard.setting"))
perform_proton_account_unlink(current_user) if not perform_proton_account_unlink(current_user):
flash("Your Proton account has been unlinked", "success") flash("Account cannot be unlinked", "warning")
else:
flash("Your Proton account has been unlinked", "success")
return redirect(url_for("dashboard.setting")) return redirect(url_for("dashboard.setting"))

View File

@ -1,17 +1,15 @@
import arrow import arrow
from flask import render_template, flash, redirect, url_for, request from flask import render_template, flash, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators from wtforms import StringField, validators
from app import parallel_limiter from app import parallel_limiter
from app.config import PADDLE_VENDOR_ID, PADDLE_COUPON_ID from app.config import PADDLE_VENDOR_ID, PADDLE_COUPON_ID
from app.coupon_utils import redeem_coupon, CouponUserCannotRedeemError
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.db import Session
from app.log import LOG from app.log import LOG
from app.models import ( from app.models import (
ManualSubscription,
Coupon,
Subscription, Subscription,
AppleSubscription, AppleSubscription,
CoinbaseSubscription, CoinbaseSubscription,
@ -58,56 +56,23 @@ def coupon_route():
if coupon_form.validate_on_submit(): if coupon_form.validate_on_submit():
code = coupon_form.code.data code = coupon_form.code.data
try:
coupon: Coupon = Coupon.get_by(code=code) coupon = redeem_coupon(code, current_user)
if coupon and not coupon.used: if coupon:
if coupon.expires_date and coupon.expires_date < arrow.now():
flash(
f"The coupon was expired on {coupon.expires_date.humanize()}",
"error",
)
return redirect(request.url)
updated = (
Session.query(Coupon)
.filter_by(code=code, used=False)
.update({"used_by_user_id": current_user.id, "used": True})
)
if updated != 1:
flash("Coupon is not valid", "error")
return redirect(request.url)
manual_sub: ManualSubscription = ManualSubscription.get_by(
user_id=current_user.id
)
if manual_sub:
# renew existing subscription
if manual_sub.end_at > arrow.now():
manual_sub.end_at = manual_sub.end_at.shift(years=coupon.nb_year)
else:
manual_sub.end_at = arrow.now().shift(years=coupon.nb_year, days=1)
Session.commit()
flash(
f"Your current subscription is extended to {manual_sub.end_at.humanize()}",
"success",
)
else:
ManualSubscription.create(
user_id=current_user.id,
end_at=arrow.now().shift(years=coupon.nb_year, days=1),
comment="using coupon code",
is_giveaway=coupon.is_giveaway,
commit=True,
)
flash( flash(
"Your account has been upgraded to Premium, thanks for your support!", "Your account has been upgraded to Premium, thanks for your support!",
"success", "success",
) )
else:
return redirect(url_for("dashboard.index")) flash(
"This coupon cannot be redeemed. It's invalid or has expired",
else: "warning",
flash(f"Code *{code}* expired or invalid", "warning") )
except CouponUserCannotRedeemError:
flash(
"You have an active subscription. Please remove it before redeeming a coupon",
"warning",
)
return render_template( return render_template(
"dashboard/coupon.html", "dashboard/coupon.html",

View File

@ -1,16 +1,11 @@
import arrow
from flask import render_template, flash, redirect, url_for from flask import render_template, flash, redirect, url_for
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators from wtforms import StringField, validators
from app.config import ADMIN_EMAIL from app import parallel_limiter
from app.coupon_utils import redeem_lifetime_coupon
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import send_email
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import UserPlanChanged, EventContent
from app.models import LifetimeCoupon
class CouponForm(FlaskForm): class CouponForm(FlaskForm):
@ -19,6 +14,7 @@ class CouponForm(FlaskForm):
@dashboard_bp.route("/lifetime_licence", methods=["GET", "POST"]) @dashboard_bp.route("/lifetime_licence", methods=["GET", "POST"])
@login_required @login_required
@parallel_limiter.lock()
def lifetime_licence(): def lifetime_licence():
if current_user.lifetime: if current_user.lifetime:
flash("You already have a lifetime licence", "warning") flash("You already have a lifetime licence", "warning")
@ -35,36 +31,12 @@ def lifetime_licence():
if coupon_form.validate_on_submit(): if coupon_form.validate_on_submit():
code = coupon_form.code.data code = coupon_form.code.data
coupon = redeem_lifetime_coupon(code, current_user)
coupon: LifetimeCoupon = LifetimeCoupon.get_by(code=code) if coupon:
if coupon and coupon.nb_used > 0:
coupon.nb_used -= 1
current_user.lifetime = True
current_user.lifetime_coupon_id = coupon.id
if coupon.paid:
current_user.paid_lifetime = True
EventDispatcher.send_event(
user=current_user,
content=EventContent(
user_plan_change=UserPlanChanged(
plan_end_time=arrow.get("2038-01-01").timestamp
)
),
)
Session.commit()
# notify admin
send_email(
ADMIN_EMAIL,
subject=f"User {current_user} used lifetime coupon({coupon.comment}). Coupon nb_used: {coupon.nb_used}",
plaintext="",
html="",
)
flash("You are upgraded to lifetime premium!", "success") flash("You are upgraded to lifetime premium!", "success")
return redirect(url_for("dashboard.index")) return redirect(url_for("dashboard.index"))
else: else:
flash(f"Code *{code}* expired or invalid", "warning") flash("Coupon code expired or invalid", "warning")
return render_template("dashboard/lifetime_licence.html", coupon_form=coupon_form) return render_template("dashboard/lifetime_licence.html", coupon_form=coupon_form)

View File

@ -1,23 +1,23 @@
from smtplib import SMTPRecipientsRefused
from email_validator import validate_email, EmailNotValidError from email_validator import validate_email, EmailNotValidError
from flask import render_template, request, redirect, url_for, flash from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from itsdangerous import TimestampSigner from itsdangerous import TimestampSigner
from wtforms import validators from wtforms import validators
from wtforms.fields.html5 import EmailField from wtforms.fields.simple import StringField
from app import mailbox_utils
from app.config import ENFORCE_SPF, MAILBOX_SECRET from app.config import ENFORCE_SPF, MAILBOX_SECRET
from app.config import URL
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session from app.db import Session
from app.email_utils import email_can_be_used_as_mailbox
from app.email_utils import mailbox_already_used, render, send_email
from app.extensions import limiter from app.extensions import limiter
from app.mailbox_utils import perform_mailbox_email_change, MailboxEmailChangeError from app.mailbox_utils import (
from app.models import Alias, AuthorizedAddress perform_mailbox_email_change,
MailboxEmailChangeError,
MailboxError,
)
from app.models import AuthorizedAddress
from app.models import Mailbox from app.models import Mailbox
from app.pgp_utils import PGPException, load_public_key_and_check from app.pgp_utils import PGPException, load_public_key_and_check
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
@ -25,7 +25,7 @@ from app.utils import sanitize_email, CSRFValidationForm
class ChangeEmailForm(FlaskForm): class ChangeEmailForm(FlaskForm):
email = EmailField( email = StringField(
"email", validators=[validators.DataRequired(), validators.Email()] "email", validators=[validators.DataRequired(), validators.Email()]
) )
@ -56,34 +56,19 @@ def mailbox_detail_route(mailbox_id):
request.form.get("form-name") == "update-email" request.form.get("form-name") == "update-email"
and change_email_form.validate_on_submit() and change_email_form.validate_on_submit()
): ):
new_email = sanitize_email(change_email_form.email.data) try:
if new_email != mailbox.email and not pending_email: response = mailbox_utils.request_mailbox_email_change(
# check if this email is not already used current_user, mailbox, change_email_form.email.data
if mailbox_already_used(new_email, current_user) or Alias.get_by( )
email=new_email flash(
): f"You are going to receive an email to confirm {mailbox.email}.",
flash(f"Email {new_email} already used", "error") "success",
elif not email_can_be_used_as_mailbox(new_email): )
flash("You cannot use this email address as your mailbox", "error") except mailbox_utils.MailboxError as e:
else: flash(e.msg, "error")
mailbox.new_email = new_email return redirect(
Session.commit() url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
try:
verify_mailbox_change(current_user, mailbox, new_email)
except SMTPRecipientsRefused:
flash(
f"Incorrect mailbox, please recheck {mailbox.email}",
"error",
)
else:
flash(
f"You are going to receive an email to confirm {new_email}.",
"success",
)
return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
elif request.form.get("form-name") == "force-spf": elif request.form.get("form-name") == "force-spf":
if not ENFORCE_SPF: if not ENFORCE_SPF:
flash("SPF enforcement globally not enabled", "error") flash("SPF enforcement globally not enabled", "error")
@ -265,81 +250,57 @@ def mailbox_detail_route(mailbox_id):
return render_template("dashboard/mailbox_detail.html", **locals()) return render_template("dashboard/mailbox_detail.html", **locals())
def verify_mailbox_change(user, mailbox, new_email):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_id_signed = s.sign(str(mailbox.id)).decode()
verification_url = (
f"{URL}/dashboard/mailbox/confirm_change?mailbox_id={mailbox_id_signed}"
)
send_email(
new_email,
"Confirm mailbox change on SimpleLogin",
render(
"transactional/verify-mailbox-change.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=new_email,
),
render(
"transactional/verify-mailbox-change.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=new_email,
),
)
@dashboard_bp.route( @dashboard_bp.route(
"/mailbox/<int:mailbox_id>/cancel_email_change", methods=["GET", "POST"] "/mailbox/<int:mailbox_id>/cancel_email_change", methods=["GET", "POST"]
) )
@login_required @login_required
def cancel_mailbox_change_route(mailbox_id): def cancel_mailbox_change_route(mailbox_id):
mailbox = Mailbox.get(mailbox_id) try:
if not mailbox or mailbox.user_id != current_user.id: mailbox_utils.cancel_email_change(mailbox_id, current_user)
flash("You cannot see this page", "warning")
return redirect(url_for("dashboard.index"))
if mailbox.new_email:
mailbox.new_email = None
Session.commit()
flash("Your mailbox change is cancelled", "success") flash("Your mailbox change is cancelled", "success")
return redirect( return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
) )
else: except MailboxError as e:
flash("You have no pending mailbox change", "warning") flash(e.msg, "warning")
return redirect( return redirect(url_for("dashboard.index"))
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
@dashboard_bp.route("/mailbox/confirm_change") @dashboard_bp.route("/mailbox/confirm_change")
def mailbox_confirm_email_change_route(): def mailbox_confirm_email_change_route():
s = TimestampSigner(MAILBOX_SECRET) mailbox_id = request.args.get("mailbox_id")
signed_mailbox_id = request.args.get("mailbox_id")
try: code = request.args.get("code")
mailbox_id = int(s.unsign(signed_mailbox_id, max_age=900)) if code:
except Exception: print("HAS OCO", code)
flash("Invalid link", "error") try:
return redirect(url_for("dashboard.index")) mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code)
flash("Successfully changed mailbox email", "success")
res = perform_mailbox_email_change(mailbox_id)
flash(res.message, res.message_category)
if res.error:
if res.error == MailboxEmailChangeError.EmailAlreadyUsed:
return redirect( return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id)
) )
elif res.error == MailboxEmailChangeError.InvalidId: except mailbox_utils.MailboxError as e:
return redirect(url_for("dashboard.index")) print(e)
else: flash(f"Cannot verify mailbox: {e.msg}", "error")
raise Exception("Unhandled MailboxEmailChangeError") return redirect(url_for("dashboard.mailbox_route"))
else: else:
return redirect( s = TimestampSigner(MAILBOX_SECRET)
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) try:
) mailbox_id = int(s.unsign(mailbox_id, max_age=900))
res = perform_mailbox_email_change(mailbox_id)
flash(res.message, res.message_category)
if res.error:
if res.error == MailboxEmailChangeError.EmailAlreadyUsed:
return redirect(
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id)
)
elif res.error == MailboxEmailChangeError.InvalidId:
return redirect(url_for("dashboard.index"))
else:
raise Exception("Unhandled MailboxEmailChangeError")
except Exception:
flash("Invalid link", "error")
return redirect(url_for("dashboard.index"))
flash("Successfully changed mailbox email", "success")
return redirect(url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id))

View File

@ -41,7 +41,7 @@ from app.models import (
PartnerSubscription, PartnerSubscription,
UnsubscribeBehaviourEnum, UnsubscribeBehaviourEnum,
) )
from app.proton.utils import get_proton_partner from app.proton.utils import get_proton_partner, can_unlink_proton_account
from app.utils import ( from app.utils import (
random_string, random_string,
CSRFValidationForm, CSRFValidationForm,
@ -323,4 +323,5 @@ def setting():
ALIAS_RAND_SUFFIX_LENGTH=ALIAS_RANDOM_SUFFIX_LENGTH, ALIAS_RAND_SUFFIX_LENGTH=ALIAS_RANDOM_SUFFIX_LENGTH,
connect_with_proton=CONNECT_WITH_PROTON, connect_with_proton=CONNECT_WITH_PROTON,
proton_linked_account=proton_linked_account, proton_linked_account=proton_linked_account,
can_unlink_proton_account=can_unlink_proton_account(current_user),
) )

View File

@ -24,7 +24,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\"(\n\x0fUserPlanChanged\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\"\r\n\x0bUserDeleted\"\\\n\x0c\x41liasCreated\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0c\n\x04note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\x12\x12\n\ncreated_at\x18\x05 \x01(\r\"T\n\x12\x41liasStatusChanged\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\x12\x12\n\ncreated_at\x18\x04 \x01(\r\")\n\x0c\x41liasDeleted\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\"D\n\x10\x41liasCreatedList\x12\x30\n\x06\x65vents\x18\x01 \x03(\x0b\x32 .simplelogin_events.AliasCreated\"\x93\x03\n\x0c\x45ventContent\x12?\n\x10user_plan_change\x18\x01 \x01(\x0b\x32#.simplelogin_events.UserPlanChangedH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x45\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32&.simplelogin_events.AliasStatusChangedH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x12\x41\n\x11\x61lias_create_list\x18\x06 \x01(\x0b\x32$.simplelogin_events.AliasCreatedListH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\":\n\x0fUserPlanChanged\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\x12\x10\n\x08lifetime\x18\x02 \x01(\x08\"\r\n\x0bUserDeleted\"\\\n\x0c\x41liasCreated\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0c\n\x04note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\x12\x12\n\ncreated_at\x18\x05 \x01(\r\"T\n\x12\x41liasStatusChanged\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\x12\x12\n\ncreated_at\x18\x04 \x01(\r\")\n\x0c\x41liasDeleted\x12\n\n\x02id\x18\x01 \x01(\r\x12\r\n\x05\x65mail\x18\x02 \x01(\t\"D\n\x10\x41liasCreatedList\x12\x30\n\x06\x65vents\x18\x01 \x03(\x0b\x32 .simplelogin_events.AliasCreated\"\x93\x03\n\x0c\x45ventContent\x12?\n\x10user_plan_change\x18\x01 \x01(\x0b\x32#.simplelogin_events.UserPlanChangedH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x45\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32&.simplelogin_events.AliasStatusChangedH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x12\x41\n\x11\x61lias_create_list\x18\x06 \x01(\x0b\x32$.simplelogin_events.AliasCreatedListH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3')
_globals = globals() _globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -32,19 +32,19 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'event_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS: if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None DESCRIPTOR._loaded_options = None
_globals['_USERPLANCHANGED']._serialized_start=35 _globals['_USERPLANCHANGED']._serialized_start=35
_globals['_USERPLANCHANGED']._serialized_end=75 _globals['_USERPLANCHANGED']._serialized_end=93
_globals['_USERDELETED']._serialized_start=77 _globals['_USERDELETED']._serialized_start=95
_globals['_USERDELETED']._serialized_end=90 _globals['_USERDELETED']._serialized_end=108
_globals['_ALIASCREATED']._serialized_start=92 _globals['_ALIASCREATED']._serialized_start=110
_globals['_ALIASCREATED']._serialized_end=184 _globals['_ALIASCREATED']._serialized_end=202
_globals['_ALIASSTATUSCHANGED']._serialized_start=186 _globals['_ALIASSTATUSCHANGED']._serialized_start=204
_globals['_ALIASSTATUSCHANGED']._serialized_end=270 _globals['_ALIASSTATUSCHANGED']._serialized_end=288
_globals['_ALIASDELETED']._serialized_start=272 _globals['_ALIASDELETED']._serialized_start=290
_globals['_ALIASDELETED']._serialized_end=313 _globals['_ALIASDELETED']._serialized_end=331
_globals['_ALIASCREATEDLIST']._serialized_start=315 _globals['_ALIASCREATEDLIST']._serialized_start=333
_globals['_ALIASCREATEDLIST']._serialized_end=383 _globals['_ALIASCREATEDLIST']._serialized_end=401
_globals['_EVENTCONTENT']._serialized_start=386 _globals['_EVENTCONTENT']._serialized_start=404
_globals['_EVENTCONTENT']._serialized_end=789 _globals['_EVENTCONTENT']._serialized_end=807
_globals['_EVENT']._serialized_start=791 _globals['_EVENT']._serialized_start=809
_globals['_EVENT']._serialized_end=912 _globals['_EVENT']._serialized_end=930
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@ -6,10 +6,12 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map
DESCRIPTOR: _descriptor.FileDescriptor DESCRIPTOR: _descriptor.FileDescriptor
class UserPlanChanged(_message.Message): class UserPlanChanged(_message.Message):
__slots__ = ("plan_end_time",) __slots__ = ("plan_end_time", "lifetime")
PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int] PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int]
LIFETIME_FIELD_NUMBER: _ClassVar[int]
plan_end_time: int plan_end_time: int
def __init__(self, plan_end_time: _Optional[int] = ...) -> None: ... lifetime: bool
def __init__(self, plan_end_time: _Optional[int] = ..., lifetime: bool = ...) -> None: ...
class UserDeleted(_message.Message): class UserDeleted(_message.Message):
__slots__ = () __slots__ = ()

View File

@ -33,8 +33,11 @@ from app.models import (
SLDomain, SLDomain,
Hibp, Hibp,
AliasHibp, AliasHibp,
PartnerUser,
PartnerSubscription,
) )
from app.pgp_utils import load_public_key from app.pgp_utils import load_public_key
from app.proton.utils import get_proton_partner
def fake_data(): def fake_data():
@ -269,3 +272,27 @@ def fake_data():
CustomDomain.create( CustomDomain.create(
user_id=user.id, domain="old.com", verified=True, ownership_verified=True user_id=user.id, domain="old.com", verified=True, ownership_verified=True
) )
# Create a user
proton_partner = get_proton_partner()
user = User.create(
email="test@proton.me",
name="Proton test",
password="password",
activated=True,
is_admin=False,
intro_shown=True,
from_partner=True,
flush=True,
)
pu = PartnerUser.create(
user_id=user.id,
partner_id=proton_partner.id,
partner_email="test@proton.me",
external_user_id="DUMMY",
flush=True,
)
PartnerSubscription.create(
partner_user_id=pu.id, end_at=arrow.now().shift(years=1, days=1)
)
Session.commit()

View File

@ -60,21 +60,7 @@ def create_mailbox(
f"User {user} has tried to create mailbox with {email} but is not premium" f"User {user} has tried to create mailbox with {email} but is not premium"
) )
raise OnlyPaidError() raise OnlyPaidError()
if not is_valid_email(email): check_email_for_mailbox(email, user)
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not valid email"
)
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is already used"
)
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is invalid"
)
raise MailboxError("Invalid email")
new_mailbox: Mailbox = Mailbox.create( new_mailbox: Mailbox = Mailbox.create(
email=email, user_id=user.id, verified=verified, commit=True email=email, user_id=user.id, verified=verified, commit=True
) )
@ -106,6 +92,24 @@ def create_mailbox(
return output return output
def check_email_for_mailbox(email, user):
if not is_valid_email(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but is not valid email"
)
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is already used"
)
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
LOG.i(
f"User {user} has tried to create mailbox with {email} but email is invalid"
)
raise MailboxError("Invalid email")
def delete_mailbox( def delete_mailbox(
user: User, user: User,
mailbox_id: int, mailbox_id: int,
@ -183,7 +187,7 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user" f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user"
) )
raise MailboxError("Invalid mailbox") raise MailboxError("Invalid mailbox")
if mailbox.verified: if mailbox.verified and not mailbox.new_email:
LOG.i( LOG.i(
f"User {user} failed to verify mailbox {mailbox_id} because it's already verified" f"User {user} failed to verify mailbox {mailbox_id} because it's already verified"
) )
@ -220,13 +224,34 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
activation.tries = activation.tries + 1 activation.tries = activation.tries + 1
Session.commit() Session.commit()
raise CannotVerifyError("Invalid activation code") raise CannotVerifyError("Invalid activation code")
LOG.i(f"User {user} has verified mailbox {mailbox_id}") if mailbox.new_email:
mailbox.verified = True LOG.i(
emit_user_audit_log( f"User {user} has verified mailbox email change from {mailbox.email} to {mailbox.new_email}"
user=user, )
action=UserAuditLogAction.VerifyMailbox, emit_user_audit_log(
message=f"Verify mailbox {mailbox_id} ({mailbox.email})", user=user,
) action=UserAuditLogAction.UpdateMailbox,
message=f"Change mailbox email for mailbox {mailbox_id} (old={mailbox.email} | new={mailbox.new_email})",
)
mailbox.email = mailbox.new_email
mailbox.new_email = None
mailbox.verified = True
elif not mailbox.verified:
LOG.i(f"User {user} has verified mailbox {mailbox_id}")
mailbox.verified = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.VerifyMailbox,
message=f"Verify mailbox {mailbox_id} ({mailbox.email})",
)
if Mailbox.get_by(email=mailbox.new_email, user_id=user.id):
raise MailboxError("That addres is already in use")
else:
LOG.i(
"User {user} alread has mailbox {mailbox} verified and no pending email change"
)
clear_activation_codes_for_mailbox(mailbox) clear_activation_codes_for_mailbox(mailbox)
return mailbox return mailbox
@ -251,7 +276,10 @@ def generate_activation_code(
def send_verification_email( def send_verification_email(
user: User, mailbox: Mailbox, activation: MailboxActivation, send_link: bool = True user: User,
mailbox: Mailbox,
activation: MailboxActivation,
send_link: bool = True,
): ):
LOG.i( LOG.i(
f"Sending mailbox verification email to {mailbox.email} with send link={send_link}" f"Sending mailbox verification email to {mailbox.email} with send link={send_link}"
@ -286,6 +314,72 @@ def send_verification_email(
) )
def send_change_email(user: User, mailbox: Mailbox, activation: MailboxActivation):
verification_url = f"{config.URL}/dashboard/mailbox/confirm_change?mailbox_id={mailbox.id}&code={activation.code}"
send_email(
mailbox.new_email,
"Confirm mailbox change on SimpleLogin",
render(
"transactional/verify-mailbox-change.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=mailbox.new_email,
),
render(
"transactional/verify-mailbox-change.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
mailbox_new_email=mailbox.new_email,
),
)
def request_mailbox_email_change(
user: User,
mailbox: Mailbox,
new_email: str,
email_ownership_verified: bool = False,
send_email: bool = True,
use_digit_codes: bool = False,
) -> CreateMailboxOutput:
new_email = sanitize_email(new_email)
if new_email == mailbox.email:
raise MailboxError("Same email")
check_email_for_mailbox(new_email, user)
if email_ownership_verified:
mailbox.email = new_email
else:
mailbox.new_email = new_email
emit_user_audit_log(
user=user,
action=UserAuditLogAction.UpdateMailbox,
message=f"Updated mailbox {mailbox.id} email ({new_email}) pre-verified({email_ownership_verified}",
)
Session.commit()
if email_ownership_verified:
LOG.i(f"User {user} as created a pre-verified mailbox with {new_email}")
return CreateMailboxOutput(mailbox=mailbox, activation=None)
LOG.i(f"User {user} has updated mailbox email with {new_email}")
activation = generate_activation_code(mailbox, use_digit_code=use_digit_codes)
output = CreateMailboxOutput(mailbox=mailbox, activation=activation)
if not send_email:
LOG.i(f"Skipping sending validation email for mailbox {mailbox}")
return output
send_change_email(
user,
mailbox,
activation=activation,
)
return output
class MailboxEmailChangeError(Enum): class MailboxEmailChangeError(Enum):
InvalidId = 1 InvalidId = 1
EmailAlreadyUsed = 2 EmailAlreadyUsed = 2
@ -337,6 +431,23 @@ def perform_mailbox_email_change(mailbox_id: int) -> MailboxEmailChangeResult:
) )
def cancel_email_change(mailbox_id: int, user: User):
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
LOG.i(
f"User {user} has tried to cancel a mailbox an unknown mailbox {mailbox_id}"
)
raise MailboxError("Invalid mailbox")
if mailbox.user.id != user.id:
LOG.i(
f"User {user} has tried to cancel a mailbox {mailbox} owned by another user"
)
raise MailboxError("Invalid mailbox")
mailbox.new_email = None
LOG.i(f"User {mailbox.user} has cancelled mailbox email change")
clear_activation_codes_for_mailbox(mailbox)
def __get_alias_mailbox_from_email( def __get_alias_mailbox_from_email(
email_address: str, alias: Alias email_address: str, alias: Alias
) -> Optional[Mailbox]: ) -> Optional[Mailbox]:

View File

@ -1,9 +1,10 @@
from newrelic import agent
from typing import Optional from typing import Optional
from newrelic import agent
from app.db import Session from app.db import Session
from app.log import LOG
from app.errors import ProtonPartnerNotSetUp from app.errors import ProtonPartnerNotSetUp
from app.log import LOG
from app.models import Partner, PartnerUser, User from app.models import Partner, PartnerUser, User
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
@ -26,7 +27,13 @@ def is_proton_partner(partner: Partner) -> bool:
return partner.name == PROTON_PARTNER_NAME return partner.name == PROTON_PARTNER_NAME
def perform_proton_account_unlink(current_user: User): def can_unlink_proton_account(user: User) -> bool:
return (user.flags & User.FLAG_CREATED_FROM_PARTNER) == 0
def perform_proton_account_unlink(current_user: User) -> bool:
if not can_unlink_proton_account(current_user):
return False
proton_partner = get_proton_partner() proton_partner = get_proton_partner()
partner_user = PartnerUser.get_by( partner_user = PartnerUser.get_by(
user_id=current_user.id, partner_id=proton_partner.id user_id=current_user.id, partner_id=proton_partner.id
@ -41,3 +48,4 @@ def perform_proton_account_unlink(current_user: User):
PartnerUser.delete(partner_user.id) PartnerUser.delete(partner_user.id)
Session.commit() Session.commit()
agent.record_custom_event("AccountUnlinked", {"partner": proton_partner.name}) agent.record_custom_event("AccountUnlinked", {"partner": proton_partner.name})
return True

View File

@ -47,11 +47,6 @@ from typing import List, Tuple, Optional
import newrelic.agent import newrelic.agent
from aiosmtpd.controller import Controller from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope from aiosmtpd.smtp import Envelope
from email_validator import validate_email, EmailNotValidError
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from sqlalchemy.exc import IntegrityError
from app import pgp_utils, s3, config, contact_utils from app import pgp_utils, s3, config, contact_utils
from app.alias_utils import ( from app.alias_utils import (
try_auto_create, try_auto_create,
@ -174,8 +169,12 @@ from app.pgp_utils import (
load_public_key_and_check, load_public_key_and_check,
) )
from app.utils import sanitize_email from app.utils import sanitize_email
from email_validator import validate_email, EmailNotValidError
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from init_app import load_pgp_public_keys from init_app import load_pgp_public_keys
from server import create_light_app from server import create_light_app
from sqlalchemy.exc import IntegrityError
def get_or_create_contact( def get_or_create_contact(
@ -590,15 +589,27 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
contact.alias contact.alias
) # In case the Session was closed in the get_or_create we re-fetch the alias ) # In case the Session was closed in the get_or_create we re-fetch the alias
reply_to_contact = None reply_to_contact = []
if msg[headers.REPLY_TO]: if msg[headers.REPLY_TO]:
reply_to = get_header_unicode(msg[headers.REPLY_TO]) reply_to_header_contents = get_header_unicode(msg[headers.REPLY_TO])
LOG.d("Create or get contact for reply_to_header:%s", reply_to) if reply_to_header_contents:
# ignore when reply-to = alias LOG.d(
if reply_to == alias.email: "Create or get contact for reply_to_header:%s", reply_to_header_contents
LOG.i("Reply-to same as alias %s", alias) )
else: for reply_to in [
reply_to_contact = get_or_create_reply_to_contact(reply_to, alias, msg) reply_to.strip()
for reply_to in reply_to_header_contents.split(",")
if reply_to.strip()
]:
reply_to_name, reply_to_email = parse_full_address(reply_to)
if reply_to_email == alias.email:
LOG.i("Reply-to same as alias %s", alias)
else:
reply_contact = get_or_create_reply_to_contact(
reply_to_email, alias, msg
)
if reply_contact:
reply_to_contact.append(reply_contact)
if alias.user.delete_on is not None: if alias.user.delete_on is not None:
LOG.d(f"user {user} is pending to be deleted. Do not forward") LOG.d(f"user {user} is pending to be deleted. Do not forward")
@ -701,7 +712,7 @@ def forward_email_to_mailbox(
envelope, envelope,
mailbox, mailbox,
user, user,
reply_to_contact: Optional[Contact], reply_to_contacts: list[Contact],
) -> (bool, str): ) -> (bool, str):
LOG.d("Forward %s -> %s -> %s", contact, alias, mailbox) LOG.d("Forward %s -> %s -> %s", contact, alias, mailbox)
@ -884,11 +895,13 @@ def forward_email_to_mailbox(
add_or_replace_header(msg, "From", new_from_header) add_or_replace_header(msg, "From", new_from_header)
LOG.d("From header, new:%s, old:%s", new_from_header, old_from_header) LOG.d("From header, new:%s, old:%s", new_from_header, old_from_header)
if reply_to_contact: if len(reply_to_contacts) > 0:
reply_to_header = msg[headers.REPLY_TO] original_reply_to = get_header_unicode(msg[headers.REPLY_TO])
new_reply_to_header = reply_to_contact.new_addr() new_reply_to_header = ", ".join(
[reply_to_contact.new_addr() for reply_to_contact in reply_to_contacts][:5]
)
add_or_replace_header(msg, "Reply-To", new_reply_to_header) add_or_replace_header(msg, "Reply-To", new_reply_to_header)
LOG.d("Reply-To header, new:%s, old:%s", new_reply_to_header, reply_to_header) LOG.d("Reply-To header, new:%s, old:%s", new_reply_to_header, original_reply_to)
# replace CC & To emails by reverse-alias for all emails that are not alias # replace CC & To emails by reverse-alias for all emails that are not alias
try: try:

View File

@ -56,14 +56,15 @@ def add_sl_domains():
Session.commit() Session.commit()
def add_proton_partner(): def add_proton_partner() -> Partner:
proton_partner = Partner.get_by(name=PROTON_PARTNER_NAME) proton_partner = Partner.get_by(name=PROTON_PARTNER_NAME)
if not proton_partner: if not proton_partner:
Partner.create( proton_partner = Partner.create(
name=PROTON_PARTNER_NAME, name=PROTON_PARTNER_NAME,
contact_email="simplelogin@protonmail.com", contact_email="simplelogin@protonmail.com",
) )
Session.commit() Session.commit()
return proton_partner
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,7 +2,6 @@
import argparse import argparse
import time import time
import arrow
from sqlalchemy import func from sqlalchemy import func
from app.account_linking import send_user_plan_changed_event from app.account_linking import send_user_plan_changed_event
@ -38,9 +37,9 @@ for batch_start in range(pu_id_start, max_pu_id, step):
) )
).all() ).all()
for partner_user in partner_users: for partner_user in partner_users:
subscription_end = send_user_plan_changed_event(partner_user) event = send_user_plan_changed_event(partner_user)
if subscription_end is not None: if event is not None:
if subscription_end > arrow.get("2038-01-01").timestamp: if event.lifetime:
with_lifetime += 1 with_lifetime += 1
else: else:
with_premium += 1 with_premium += 1

View File

@ -4,6 +4,7 @@ package simplelogin_events;
message UserPlanChanged { message UserPlanChanged {
uint32 plan_end_time = 1; uint32 plan_end_time = 1;
bool lifetime = 2;
} }
message UserDeleted { message UserDeleted {

View File

@ -36,12 +36,12 @@ dependencies = [
"watchtower ~= 0.8.0", "watchtower ~= 0.8.0",
"sqlalchemy-utils == 0.36.8", "sqlalchemy-utils == 0.36.8",
"jwcrypto ~= 0.8", "jwcrypto ~= 0.8",
"yacron>=0.19.0", "yacron~=0.11.2",
"flask-debugtoolbar ~= 0.11.0", "flask-debugtoolbar ~= 0.11.0",
"requests_oauthlib ~= 1.3.0", "requests_oauthlib ~= 1.3.0",
"pyopenssl ~= 19.1.0", "pyopenssl ~= 19.1.0",
"aiosmtpd ~= 1.2", "aiosmtpd ~= 1.2",
"dnspython == 2.6.1", "dnspython==2.0.0",
"coloredlogs ~= 14.0", "coloredlogs ~= 14.0",
"pycryptodome ~= 3.9.8", "pycryptodome ~= 3.9.8",
"phpserialize ~= 1.3", "phpserialize ~= 1.3",
@ -71,11 +71,12 @@ dependencies = [
"MarkupSafe~=1.1.1", "MarkupSafe~=1.1.1",
"cryptography ~= 37.0.1", "cryptography ~= 37.0.1",
"SQLAlchemy ~= 1.3.24", "SQLAlchemy ~= 1.3.24",
"redis ~= 4.5.3", "redis==4.6.0",
"newrelic-telemetry-sdk ~= 0.5.0", "newrelic-telemetry-sdk ~= 0.5.0",
"aiospamc == 0.10", "aiospamc == 0.10",
"itsdangerous ~= 1.1.0", "itsdangerous ~= 1.1.0",
"werkzeug ~= 1.0.1", "werkzeug ~= 1.0.1",
"alembic ~= 1.4.3",
] ]
[tool.black] [tool.black]
@ -128,7 +129,7 @@ dev-dependencies = [
"pytest-cov ~= 3.0.0", "pytest-cov ~= 3.0.0",
"pre-commit ~= 2.17.0", "pre-commit ~= 2.17.0",
"black ~= 22.1.0", "black ~= 22.1.0",
"djlint ~= 1.3.0", "djlint==1.34.1",
"pylint ~= 2.14.4", "pylint ~= 2.14.4",
"ruff ~= 0.1.5", "ruff ~= 0.1.5",
] ]

View File

@ -12,10 +12,10 @@ docker run -p 25432:5432 --name ${container_name} -e POSTGRES_PASSWORD=postgres
sleep 3 sleep 3
# upgrade the DB to the latest stage and # upgrade the DB to the latest stage and
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl rye run alembic upgrade head env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl uv run alembic upgrade head
# generate the migration script. # generate the migration script.
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl rye run alembic revision --autogenerate $@ env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl uv run alembic revision --autogenerate $@
# remove the db # remove the db
docker rm -f ${container_name} docker rm -f ${container_name}

View File

@ -3,5 +3,5 @@
export DB_URI=postgresql://myuser:mypassword@localhost:15432/simplelogin export DB_URI=postgresql://myuser:mypassword@localhost:15432/simplelogin
echo 'drop schema public cascade; create schema public;' | psql $DB_URI echo 'drop schema public cascade; create schema public;' | psql $DB_URI
rye run alembic upgrade head uv run alembic upgrade head
rye run flask dummy-data uv run flask dummy-data

View File

@ -3,4 +3,4 @@
export DB_URI=postgresql://myuser:mypassword@localhost:15432/test export DB_URI=postgresql://myuser:mypassword@localhost:15432/test
echo 'drop schema public cascade; create schema public;' | psql $DB_URI echo 'drop schema public cascade; create schema public;' | psql $DB_URI
rye run alembic upgrade head uv run alembic upgrade head

View File

@ -10,10 +10,10 @@ docker run -d --name sl-test-db -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test
sleep 3 sleep 3
# migrate the DB to the latest version # migrate the DB to the latest version
CONFIG=tests/test.env rye run alembic upgrade head CONFIG=tests/test.env uv run alembic upgrade head
# run test # run test
rye run pytest -c pytest.ci.ini uv run pytest -c pytest.ci.ini
# Delete the test DB # Delete the test DB
docker rm -f sl-test-db docker rm -f sl-test-db

View File

@ -8,7 +8,6 @@ import flask_limiter
import flask_profiler import flask_profiler
import newrelic.agent import newrelic.agent
import sentry_sdk import sentry_sdk
from flask import ( from flask import (
Flask, Flask,
redirect, redirect,
@ -498,9 +497,9 @@ def register_custom_commands(app):
from init_app import add_sl_domains, add_proton_partner from init_app import add_sl_domains, add_proton_partner
LOG.w("reset db, add fake data") LOG.w("reset db, add fake data")
add_proton_partner()
fake_data() fake_data()
add_sl_domains() add_sl_domains()
add_proton_partner()
@app.cli.command("send-newsletter") @app.cli.command("send-newsletter")
@click.option("-n", "--newsletter_id", type=int, help="Newsletter ID to be sent") @click.option("-n", "--newsletter_id", type=int, help="Newsletter ID to be sent")

View File

@ -144,7 +144,7 @@
</div> </div>
<!-- END change name & profile picture --> <!-- END change name & profile picture -->
<!-- Connect with Proton --> <!-- Connect with Proton -->
{% if connect_with_proton %} {% if connect_with_proton and can_unlink_proton_account %}
<div class="card" id="connect-with-proton"> <div class="card" id="connect-with-proton">
<div class="card-body"> <div class="card-body">

View File

@ -1,10 +1,11 @@
from flask import url_for from flask import url_for
from app.models import Coupon
from app.models import Coupon, LifetimeCoupon
from app.utils import random_string from app.utils import random_string
from tests.utils import login from tests.utils import login
def test_use_coupon(flask_client): def test_redeem_coupon_without_subscription(flask_client):
user = login(flask_client) user = login(flask_client)
code = random_string(10) code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True) Coupon.create(code=code, nb_year=1, commit=True)
@ -14,7 +15,22 @@ def test_use_coupon(flask_client):
data={"code": code}, data={"code": code},
) )
assert r.status_code == 302 assert r.status_code == 200
coupon = Coupon.get_by(code=code) coupon = Coupon.get_by(code=code)
assert coupon.used assert coupon.used
assert coupon.used_by_user_id == user.id assert coupon.used_by_user_id == user.id
def test_redeem_lifetime_coupon(flask_client):
login(flask_client)
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1, commit=True)
r = flask_client.post(
url_for("dashboard.lifetime_licence"),
data={"code": code},
)
assert r.status_code == 302
coupon = LifetimeCoupon.get_by(code=code)
assert coupon.nb_used == 0

View File

@ -0,0 +1,21 @@
X-SimpleLogin-Client-IP: 54.39.200.130
Received-SPF: Softfail (mailfrom) identity=mailfrom; client-ip=34.59.200.130;
helo=relay.somewhere.net; envelope-from=everwaste@gmail.com;
receiver=<UNKNOWN>
Received: from relay.somewhere.net (relay.somewhere.net [34.59.200.130])
(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
(No client certificate requested)
by mx1.sldev.ovh (Postfix) with ESMTPS id 6D8C13F069
for <wehrman_mannequin@sldev.ovh>; Thu, 17 Mar 2022 16:50:20 +0000 (UTC)
Date: Thu, 17 Mar 2022 16:50:18 +0000
To: {{ alias_email }}
From: somewhere@rainbow.com
Reply-To: 666-Mail Test <a1@mailcstest.com>, 777-Mail Test <a2@mailcstest.com>,
888-Mail Test <a3@mailcstest.com>, - 5 at mailcstest.com" <a4@simplelogin.co>
Subject: test Thu, 17 Mar 2022 16:50:18 +0000
Message-Id: <20220317165018.000191@somewhere-5488dd4b6b-7crp6>
X-Mailer: swaks v20201014.0 jetmore.org/john/code/swaks/
X-Rspamd-Queue-Id: 6D8C13F069
X-Rspamd-Server: staging1
This is a test mailing

View File

@ -0,0 +1,33 @@
from aiosmtpd.smtp import Envelope
import email_handler
from app.email import status, headers
from app.email_utils import get_header_unicode, parse_full_address
from app.mail_sender import mail_sender
from app.models import Alias, Contact
from tests.utils import create_new_user, load_eml_file
@mail_sender.store_emails_test_decorator
def test_multi_reply_to():
user = create_new_user()
alias = Alias.create_new_random(user)
envelope = Envelope()
envelope.mail_from = "env.somewhere"
envelope.rcpt_tos = [alias.email]
msg = load_eml_file("multi_reply_to.eml", {"alias_email": alias.email})
alias_id = alias.id
result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E200
sent_emails = mail_sender.get_stored_emails()
assert 1 == len(sent_emails)
msg = sent_emails[0].msg
reply_to = get_header_unicode(msg[headers.REPLY_TO])
entries = reply_to.split(",")
assert 4 == len(entries)
for entry in entries:
dummy, email = parse_full_address(entry)
contact = Contact.get_by(reply_email=email)
assert contact is not None
assert contact.alias_id == alias_id

View File

@ -0,0 +1,159 @@
import arrow
import pytest
from app.coupon_utils import (
redeem_coupon,
CouponUserCannotRedeemError,
redeem_lifetime_coupon,
)
from app.models import (
Coupon,
Subscription,
ManualSubscription,
AppleSubscription,
CoinbaseSubscription,
LifetimeCoupon,
User,
)
from tests.utils import create_new_user, random_string
def test_use_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True)
coupon = redeem_coupon(code, user)
assert coupon
coupon = Coupon.get_by(code=code)
assert coupon
assert coupon.used
assert coupon.used_by_user_id == user.id
sub = user.get_active_subscription()
assert isinstance(sub, ManualSubscription)
left = sub.end_at - arrow.utcnow()
assert left.days > 364
def test_use_coupon_extend_manual_sub():
user = create_new_user()
initial_end = arrow.now().shift(days=15)
ManualSubscription.create(
user_id=user.id,
end_at=initial_end,
flush=True,
)
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True)
coupon = redeem_coupon(code, user)
assert coupon
coupon = Coupon.get_by(code=code)
assert coupon
assert coupon.used
assert coupon.used_by_user_id == user.id
sub = user.get_active_subscription()
assert isinstance(sub, ManualSubscription)
left = sub.end_at - initial_end
assert left.days > 364
def test_coupon_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_string(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_string(10),
plan="yearly",
product_id="",
flush=True,
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
with pytest.raises(CouponUserCannotRedeemError):
redeem_coupon("", user)
def test_expired_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(
code=code, nb_year=1, commit=True, expires_date=arrow.utcnow().shift(days=-1)
)
coupon = redeem_coupon(code, user)
assert coupon is None
def test_used_coupon():
user = create_new_user()
code = random_string(10)
Coupon.create(code=code, nb_year=1, commit=True, used=True)
coupon = redeem_coupon(code, user)
assert coupon is None
# Lifetime
def test_lifetime_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1)
coupon = redeem_lifetime_coupon(code, user)
assert coupon
user = User.get(user.id)
assert user.lifetime
assert not user.paid_lifetime
def test_lifetime_paid_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=1, paid=True)
coupon = redeem_lifetime_coupon(code, user)
assert coupon
user = User.get(user.id)
assert user.lifetime
assert user.paid_lifetime
def test_used_lifetime_coupon():
user = create_new_user()
code = random_string(10)
LifetimeCoupon.create(code=code, nb_used=0, paid=True)
coupon = redeem_lifetime_coupon(code, user)
assert coupon is None
user = User.get(user.id)
assert not user.lifetime
assert not user.paid_lifetime

View File

@ -1,3 +1,4 @@
import re
from typing import Optional from typing import Optional
import arrow import arrow
@ -6,7 +7,11 @@ import pytest
from app import mailbox_utils, config from app import mailbox_utils, config
from app.db import Session from app.db import Session
from app.mail_sender import mail_sender from app.mail_sender import mail_sender
from app.mailbox_utils import MailboxEmailChangeError, get_mailbox_for_reply_phase from app.mailbox_utils import (
MailboxEmailChangeError,
get_mailbox_for_reply_phase,
request_mailbox_email_change,
)
from app.models import ( from app.models import (
Mailbox, Mailbox,
MailboxActivation, MailboxActivation,
@ -361,6 +366,24 @@ def test_verify_ok():
assert mailbox.verified assert mailbox.verified
@mail_sender.store_emails_test_decorator
def test_verify_ok_for_mailbox_email_change():
out_create = mailbox_utils.create_mailbox(user, random_email(), verified=True)
mailbox_id = out_create.mailbox.id
new_email = f"new{out_create.mailbox.email}"
out_change = mailbox_utils.request_mailbox_email_change(
user, out_create.mailbox, new_email
)
assert out_change.activation.code is not None
mailbox_utils.verify_mailbox_code(user, mailbox_id, out_change.activation.code)
activation = MailboxActivation.get_by(mailbox_id=out_create.mailbox.id)
assert activation is None
mailbox = Mailbox.get(id=out_create.mailbox.id)
assert mailbox.verified
assert mailbox.email == new_email
assert mailbox.new_email is None
# perform_mailbox_email_change # perform_mailbox_email_change
def test_perform_mailbox_email_change_invalid_id(): def test_perform_mailbox_email_change_invalid_id():
res = mailbox_utils.perform_mailbox_email_change(99999) res = mailbox_utils.perform_mailbox_email_change(99999)
@ -507,3 +530,71 @@ def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias) mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb is None assert mb is None
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is None
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(out.activation.code) > 0
assert mail_sent.envelope_to == mail2
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address_without_verification_email(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, send_email=False)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is None
assert 0 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_change_mailbox_address_with_code(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, use_digit_codes=True)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.new_email == mail2
assert out.activation.mailbox_id == changed_mailbox.id
assert re.match("^[0-9]+$", out.activation.code) is not None
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(out.activation.code) > 0
assert mail_sent.envelope_to == mail2
def test_change_mailbox_verified_address(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail1 = f"mail_1@{domain}"
mbox = Mailbox.create(email=mail1, user_id=user.id, verified=True, flush=True)
mail2 = f"mail_2@{domain}"
out = request_mailbox_email_change(user, mbox, mail2, email_ownership_verified=True)
changed_mailbox = Mailbox.get(mbox.id)
assert changed_mailbox.email == mail2
assert out.activation is None
assert 0 == len(mail_sender.get_stored_emails())

2481
app/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff