4.65.4
Some checks failed
Build-Release-Image / Build-Image (linux/arm64) (push) Failing after 12m30s
Build-Release-Image / Build-Image (linux/amd64) (push) Has been cancelled
Build-Release-Image / Merge-Images (push) Has been cancelled
Build-Release-Image / Create-Release (push) Has been cancelled
Build-Release-Image / Notify (push) Has been cancelled

This commit is contained in:
MrMeeb 2025-02-11 12:00:08 +00:00
parent 2904d04a2c
commit 0fa4b1b7ee
9 changed files with 96 additions and 21 deletions

View File

@ -48,6 +48,7 @@ from app.models import (
CustomDomain,
)
from app.newsletter_utils import send_newsletter_to_user, send_newsletter_to_address
from app.proton.proton_unlink import perform_proton_account_unlink
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
@ -125,7 +126,7 @@ class SLAdminIndexView(AdminIndexView):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect(url_for("auth.login", next=request.url))
return redirect("/admin/email_search")
return redirect(url_for("admin.email_search.index"))
class UserAdmin(SLModelView):
@ -917,7 +918,7 @@ class EmailSearchAdmin(BaseView):
@expose("/", methods=["GET", "POST"])
def index(self):
search = EmailSearchResult()
email = request.args.get("email")
email = request.args.get("query")
if email is not None and len(email) > 0:
email = email.strip()
search = EmailSearchResult.from_request_email(email)
@ -929,6 +930,37 @@ class EmailSearchAdmin(BaseView):
helper=EmailSearchHelpers,
)
@expose("/partner_unlink", methods=["POST"])
def delete_partner_link(self):
user_id = request.form.get("user_id")
if not user_id:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index"))
try:
user_id = int(user_id)
except ValueError:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
user = User.get(user_id)
if user is None:
flash("User not found", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
external_user_id = perform_proton_account_unlink(user, skip_check=True)
if not external_user_id:
flash("User unlinked", "success")
return redirect(url_for("admin.email_search.index", query=user_id))
AdminAuditLog.create(
admin_user_id=user.id,
model=User.__class__.__name__,
model_id=user.id,
action=AuditLogActionEnum.unlink_user.value,
data={"external_user_id": external_user_id},
)
Session.commit()
return redirect(url_for("admin.email_search.index", query=user_id))
class CustomDomainWithValidationData:
def __init__(self, domain: CustomDomain):

View File

@ -62,8 +62,17 @@ def new_custom_alias_v2():
if not data:
return jsonify(error="request body cannot be empty"), 400
alias_prefix = data.get("alias_prefix", "").strip().lower().replace(" ", "")
signed_suffix = data.get("signed_suffix", "").strip()
alias_prefix = data.get("alias_prefix", "")
if not isinstance(alias_prefix, str) or not alias_prefix:
return jsonify(error="invalid value for alias_prefix"), 400
alias_prefix = alias_prefix.strip().lower().replace(" ", "")
signed_suffix = data.get("signed_suffix", "")
if not isinstance(signed_suffix, str) or not signed_suffix:
return jsonify(error="invalid value for signed_suffix"), 400
signed_suffix = signed_suffix.strip()
note = data.get("note")
alias_prefix = convert_to_id(alias_prefix)

View File

@ -44,7 +44,7 @@ class RequestIdFilter(logging.Filter):
from flask import g, has_request_context
request_id = ""
if has_request_context():
if has_request_context() and hasattr(g, "request_id"):
ctx_request_id = getattr(g, "request_id")
if ctx_request_id:
request_id = f"{ctx_request_id} - "

View File

@ -247,7 +247,7 @@ def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
message=f"Verify mailbox {mailbox_id} ({mailbox.email})",
)
if Mailbox.get_by(email=mailbox.new_email, user_id=user.id):
raise MailboxError("That addres is already in use")
raise MailboxError("That address is already in use")
else:
LOG.i(
@ -354,6 +354,7 @@ def request_mailbox_email_change(
if email_ownership_verified:
mailbox.email = new_email
mailbox.new_email = None
mailbox.verified = True
else:
mailbox.new_email = new_email
emit_user_audit_log(

View File

@ -238,6 +238,7 @@ class AuditLogActionEnum(EnumE):
disable_user = 9
enable_user = 10
stop_trial = 11
unlink_user = 12
class Phase(EnumE):

View File

@ -13,9 +13,11 @@ def can_unlink_proton_account(user: User) -> bool:
return (user.flags & User.FLAG_CREATED_FROM_PARTNER) == 0
def perform_proton_account_unlink(current_user: User) -> bool:
if not can_unlink_proton_account(current_user):
return False
def perform_proton_account_unlink(
current_user: User, skip_check: bool = False
) -> None | str:
if not skip_check and not can_unlink_proton_account(current_user):
return None
proton_partner = get_proton_partner()
partner_user = PartnerUser.get_by(
user_id=current_user.id, partner_id=proton_partner.id
@ -31,6 +33,7 @@ def perform_proton_account_unlink(current_user: User) -> bool:
partner_user.user, EventContent(user_unlinked=UserUnlinked())
)
PartnerUser.delete(partner_user.id)
external_user_id = partner_user.external_user_id
Session.commit()
agent.record_custom_event("AccountUnlinked", {"partner": proton_partner.name})
return True
return external_user_id

View File

@ -446,10 +446,10 @@ def init_admin(app):
admin = Admin(name="SimpleLogin", template_mode="bootstrap4")
admin.init_app(app, index_view=SLAdminIndexView())
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="admin.email_search"))
admin.add_view(
CustomDomainSearchAdmin(
name="Custom domain search", endpoint="custom_domain_search"
name="Custom domain search", endpoint="admin.custom_domain_search"
)
)
admin.add_view(UserAdmin(User, Session))

View File

@ -22,7 +22,7 @@
<tr>
<td>{{ user.id }}</td>
<td>
<a href="?email={{ user.email }}">{{ user.email }}</a>
<a href="?query={{ user.email }}">{{ user.email }}</a>
</td>
{% if user.activated %}
@ -43,8 +43,16 @@
<td>{{ user.updated_at }}</td>
{% if pu %}
<td>
<a href="?email={{ pu.partner_email }}">{{ pu.partner_email }}</a>
<td class="flex">
<a href="?query={{ pu.partner_email }}">{{ pu.partner_email }}</a>
<form class="d-inline"
action="{{ url_for("admin.email_search.delete_partner_link") }}"
method="POST">
<input type="hidden" name="user_id" value="{{ user.id }}">
<button type="submit"
onclick="return confirm('Are you sure you would like to unlink the user?');"
class="btn btn-danger d-inline">Unlink</button>
</form>
</td>
{% else %}
<td>No</td>
@ -72,7 +80,7 @@
<tr>
<td>{{ mailbox.id }}</td>
<td>
<a href="?email={{ mailbox.email }}">{{ mailbox.email }}</a>
<a href="?query={{ mailbox.email }}">{{ mailbox.email }}</a>
</td>
<td>{{ "Yes" if mailbox.verified else "No" }}</td>
<td>{{ mailbox.created_at }}</td>
@ -101,7 +109,7 @@
<tr>
<td>{{ alias.id }}</td>
<td>
<a href="?email={{ alias.email }}">{{ alias.email }}</a>
<a href="?query={{ alias.email }}">{{ alias.email }}</a>
</td>
<td>{{ "Yes" if alias.enabled else "No" }}</td>
<td>{{ alias.created_at }}</td>
@ -181,7 +189,7 @@
<td>{{ entry.user_id }}</td>
<td>{{ entry.alias_id }}</td>
<td>
<a href="?email={{ entry.alias_email }}">{{ entry.alias_email }}</a>
<a href="?query={{ entry.alias_email }}">{{ entry.alias_email }}</a>
</td>
<td>{{ entry.action }}</td>
<td>{{ entry.message }}</td>
@ -207,7 +215,7 @@
<tr>
<td>
<a href="?email={{ entry.user_email }}">{{ entry.user_email }}</a>
<a href="?query={{ entry.user_email }}">{{ entry.user_email }}</a>
</td>
<td>{{ entry.action }}</td>
<td>{{ entry.message }}</td>
@ -222,10 +230,10 @@
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<form method="get">
<div class="form-group">
<label for="email">Email to search:</label>
<label for="email">UserID or Email to search:</label>
<input type="text"
class="form-control"
name="email"
name="query"
value="{{ email or '' }}" />
</div>
<button type="submit" class="btn btn-primary">Submit</button>

View File

@ -641,3 +641,24 @@ def test_change_mailbox_verified_email_clears_pending_email(flask_client):
assert out.activation is None
assert out.mailbox.email == new_email
assert out.mailbox.new_email is None
def test_change_mailbox_verified_email_sets_mailbox_as_verified(flask_client):
user = create_new_user()
domain = f"{random_string(10)}.com"
mail = f"mail_1@{domain}"
mbox1 = Mailbox.create(
email=mail,
new_email=f"oldpending_{mail}",
user_id=user.id,
verified=False,
flush=True,
)
new_email = f"new_{mail}"
out = request_mailbox_email_change(
user, mbox1, new_email, email_ownership_verified=True
)
assert out.activation is None
assert out.mailbox.email == new_email
assert out.mailbox.new_email is None
assert out.mailbox.verified is True