4.55.1
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m28s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m31s
Build-Release-Image / Merge-Images (push) Successful in 16s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 4s

This commit is contained in:
MrMeeb 2024-10-19 12:00:05 +01:00
parent da6e56c4eb
commit bc48198bb1
11 changed files with 105 additions and 23 deletions

View File

@ -115,7 +115,7 @@ class SLAdminIndexView(AdminIndexView):
if not current_user.is_authenticated or not current_user.is_admin: if not current_user.is_authenticated or not current_user.is_admin:
return redirect(url_for("auth.login", next=request.url)) return redirect(url_for("auth.login", next=request.url))
return redirect("/admin/user") return redirect("/admin/email_search")
class UserAdmin(SLModelView): class UserAdmin(SLModelView):
@ -743,13 +743,17 @@ class EmailSearchResult:
mailbox: List[Mailbox] = [] mailbox: List[Mailbox] = []
mailbox_count: int = 0 mailbox_count: int = 0
deleted_alias: Optional[DeletedAlias] = None deleted_alias: Optional[DeletedAlias] = None
deleted_custom_alias: Optional[DomainDeletedAlias] = None deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
domain_deleted_alias: Optional[DomainDeletedAlias] = None
domain_deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
user: Optional[User] = None user: Optional[User] = None
user_audit_log: Optional[List[UserAuditLog]] = None user_audit_log: Optional[List[UserAuditLog]] = None
query: str
@staticmethod @staticmethod
def from_email(email: str) -> EmailSearchResult: def from_email(email: str) -> EmailSearchResult:
output = EmailSearchResult() output = EmailSearchResult()
output.query = email
alias = Alias.get_by(email=email) alias = Alias.get_by(email=email)
if alias: if alias:
output.alias = alias output.alias = alias
@ -768,6 +772,15 @@ class EmailSearchResult:
.all() .all()
) )
output.no_match = False output.no_match = False
user_audit_log = (
UserAuditLog.filter_by(user_email=email)
.order_by(UserAuditLog.created_at.desc())
.all()
)
if user_audit_log:
output.user_audit_log = user_audit_log
output.no_match = False
mailboxes = ( mailboxes = (
Mailbox.filter_by(email=email).order_by(Mailbox.id.desc()).limit(10).all() Mailbox.filter_by(email=email).order_by(Mailbox.id.desc()).limit(10).all()
) )
@ -778,10 +791,20 @@ class EmailSearchResult:
deleted_alias = DeletedAlias.get_by(email=email) deleted_alias = DeletedAlias.get_by(email=email)
if deleted_alias: if deleted_alias:
output.deleted_alias = deleted_alias output.deleted_alias = deleted_alias
output.deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False output.no_match = False
domain_deleted_alias = DomainDeletedAlias.get_by(email=email) domain_deleted_alias = DomainDeletedAlias.get_by(email=email)
if domain_deleted_alias: if domain_deleted_alias:
output.domain_deleted_alias = domain_deleted_alias output.domain_deleted_alias = domain_deleted_alias
output.domain_deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=domain_deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False output.no_match = False
return output return output

View File

@ -23,6 +23,7 @@ from app.events.auth_event import LoginEvent, RegisterEvent
from app.extensions import limiter from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import User, ApiKey, SocialAuth, AccountActivation from app.models import User, ApiKey, SocialAuth, AccountActivation
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import sanitize_email, canonicalize_email from app.utils import sanitize_email, canonicalize_email
@ -187,6 +188,11 @@ def auth_activate():
LOG.d("activate user %s", user) LOG.d("activate user %s", user)
user.activated = True user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ActivateUser,
message=f"User has been activated: {user.email}",
)
AccountActivation.delete(account_activation.id) AccountActivation.delete(account_activation.id)
Session.commit() Session.commit()

View File

@ -7,6 +7,7 @@ from app.db import Session
from app.extensions import limiter from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import ActivationCode from app.models import ActivationCode
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import sanitize_next_url from app.utils import sanitize_next_url
@ -47,6 +48,11 @@ def activate():
user = activation_code.user user = activation_code.user
user.activated = True user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ActivateUser,
message=f"User has been activated: {user.email}",
)
login_user(user) login_user(user)
# activation code is to be used only once # activation code is to be used only once

View File

@ -9,6 +9,7 @@ from app.auth.views.login_utils import after_login
from app.db import Session from app.db import Session
from app.extensions import limiter from app.extensions import limiter
from app.models import ResetPasswordCode from app.models import ResetPasswordCode
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
class ResetPasswordForm(FlaskForm): class ResetPasswordForm(FlaskForm):
@ -59,6 +60,11 @@ def reset_password():
# this can be served to activate user too # this can be served to activate user too
user.activated = True user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ResetPassword,
message="User has reset their password",
)
# remove all reset password codes # remove all reset password codes
ResetPasswordCode.filter_by(user_id=user.id).delete() ResetPasswordCode.filter_by(user_id=user.id).delete()

View File

@ -616,6 +616,15 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if "alternative_id" not in kwargs: if "alternative_id" not in kwargs:
user.alternative_id = str(uuid.uuid4()) user.alternative_id = str(uuid.uuid4())
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
trail = ". Created from partner" if from_partner else ""
emit_user_audit_log(
user=user,
action=UserAuditLogAction.CreateUser,
message=f"Created user {email}{trail}",
)
# If the user is created from partner, do not notify # If the user is created from partner, do not notify
# nor give a trial # nor give a trial
if from_partner: if from_partner:

View File

@ -4,6 +4,10 @@ from app.models import User, UserAuditLog
class UserAuditLogAction(Enum): class UserAuditLogAction(Enum):
CreateUser = "create_user"
ActivateUser = "activate_user"
ResetPassword = "reset_password"
Upgrade = "upgrade" Upgrade = "upgrade"
SubscriptionExtended = "subscription_extended" SubscriptionExtended = "subscription_extended"
SubscriptionCancelled = "subscription_cancelled" SubscriptionCancelled = "subscription_cancelled"

View File

@ -442,10 +442,10 @@ def init_admin(app):
admin = Admin(name="SimpleLogin", template_mode="bootstrap4") admin = Admin(name="SimpleLogin", template_mode="bootstrap4")
admin.init_app(app, index_view=SLAdminIndexView()) admin.init_app(app, index_view=SLAdminIndexView())
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(UserAdmin(User, Session)) admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session)) admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session)) admin.add_view(MailboxAdmin(Mailbox, Session))
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(CouponAdmin(Coupon, Session)) admin.add_view(CouponAdmin(Coupon, Session))
admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session)) admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session))
admin.add_view(CustomDomainAdmin(CustomDomain, Session)) admin.add_view(CustomDomainAdmin(CustomDomain, Session))

View File

@ -239,6 +239,11 @@
{{ show_user(data.user) }} {{ show_user(data.user) }}
{{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }} {{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }}
{{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }} {{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }}
</div>
{% endif %}
{% if data.user_audit_log %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Audit log entries for user {{ data.query }}</h3>
{{ list_user_audit_log(data.user_audit_log) }} {{ list_user_audit_log(data.user_audit_log) }}
</div> </div>
{% endif %} {% endif %}
@ -260,6 +265,7 @@
<div class="border border-dark mt-1 mb-2 p-3"> <div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DeletedAlias {{ data.deleted_alias.email }}</h3> <h3 class="mb-3">Found DeletedAlias {{ data.deleted_alias.email }}</h3>
{{ show_deleted_alias(data.deleted_alias) }} {{ show_deleted_alias(data.deleted_alias) }}
{{ list_alias_audit_log(data.deleted_alias_audit_log) }}
</div> </div>
{% endif %} {% endif %}
{% if data.domain_deleted_alias %} {% if data.domain_deleted_alias %}
@ -267,6 +273,7 @@
<div class="border border-dark mt-1 mb-2 p-3"> <div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}</h3> <h3 class="mb-3">Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}</h3>
{{ show_domain_deleted_alias(data.domain_deleted_alias) }} {{ show_domain_deleted_alias(data.domain_deleted_alias) }}
{{ list_alias_audit_log(data.domain_deleted_alias_audit_log) }}
</div> </div>
{% endif %} {% endif %}
{% endblock %} {% endblock %}

View File

@ -94,10 +94,12 @@ def test_login_case_from_partner():
) )
assert res.user.activated is True assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all() audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1 assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_login_case_from_partner_with_uppercase_email(): def test_login_case_from_partner_with_uppercase_email():
@ -133,7 +135,10 @@ def test_login_case_from_web():
assert 0 == (res.user.flags & User.FLAG_CREATED_FROM_PARTNER) assert 0 == (res.user.flags & User.FLAG_CREATED_FROM_PARTNER)
assert res.user.activated is True assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all() audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1 assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -218,7 +223,10 @@ def test_link_account_with_proton_account_same_address(flask_client):
) )
assert partner_user.partner_id == get_proton_partner().id assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all() audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1 assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -246,7 +254,10 @@ def test_link_account_with_proton_account_different_address(flask_client):
assert partner_user.partner_id == get_proton_partner().id assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all() audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1 assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -304,19 +315,19 @@ def test_link_account_with_proton_account_same_address_but_linked_to_other_user(
# Ensure audit logs for sl_user_1 show the link action # Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by( sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id user_id=sl_user_1.id,
action=UserAuditLogAction.LinkAccount.value,
).all() ).all()
assert len(sl_user_1_audit_logs) == 1 assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action # Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by( sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id user_id=sl_user_2.id,
action=UserAuditLogAction.UnlinkAccount.value,
).all() ).all()
assert len(sl_user_2_audit_logs) == 1 assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_link_account_with_proton_account_different_address_and_linked_to_other_user( def test_link_account_with_proton_account_different_address_and_linked_to_other_user(
@ -356,19 +367,19 @@ def test_link_account_with_proton_account_different_address_and_linked_to_other_
# Ensure audit logs for sl_user_1 show the link action # Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by( sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id user_id=sl_user_1.id,
action=UserAuditLogAction.LinkAccount.value,
).all() ).all()
assert len(sl_user_1_audit_logs) == 1 assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action # Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by( sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id user_id=sl_user_2.id,
action=UserAuditLogAction.UnlinkAccount.value,
).all() ).all()
assert len(sl_user_2_audit_logs) == 1 assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_cannot_create_instance_of_base_strategy(): def test_cannot_create_instance_of_base_strategy():

View File

@ -351,7 +351,9 @@ def test_perform_mailbox_email_change_valid_id_not_new_email():
res = mailbox_utils.perform_mailbox_email_change(mb.id) res = mailbox_utils.perform_mailbox_email_change(mb.id)
assert res.error == MailboxEmailChangeError.InvalidId assert res.error == MailboxEmailChangeError.InvalidId
assert res.message_category == "error" assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count() audit_log_entries = UserAuditLog.filter_by(
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 0 assert audit_log_entries == 0
@ -374,7 +376,9 @@ def test_perform_mailbox_email_change_valid_id_email_already_used():
res = mailbox_utils.perform_mailbox_email_change(mb_to_change.id) res = mailbox_utils.perform_mailbox_email_change(mb_to_change.id)
assert res.error == MailboxEmailChangeError.EmailAlreadyUsed assert res.error == MailboxEmailChangeError.EmailAlreadyUsed
assert res.message_category == "error" assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count() audit_log_entries = UserAuditLog.filter_by(
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 0 assert audit_log_entries == 0
@ -398,6 +402,7 @@ def test_perform_mailbox_email_change_success():
assert db_mailbox.email == new_email assert db_mailbox.email == new_email
assert db_mailbox.new_email is None assert db_mailbox.new_email is None
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).all() audit_log_entries = UserAuditLog.filter_by(
assert len(audit_log_entries) == 1 user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
assert audit_log_entries[0].action == UserAuditLogAction.UpdateMailbox.value ).count()
assert audit_log_entries == 1

View File

@ -27,7 +27,9 @@ def test_emit_alias_audit_log_for_random_data():
commit=True, commit=True,
) )
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all() logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=user.id, action=action.value
).all()
assert len(logs_for_user) == 1 assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email assert logs_for_user[0].user_email == user.email
@ -41,7 +43,10 @@ def test_emit_audit_log_on_mailbox_creation():
user=user, email=random_email(), verified=True user=user, email=random_email(), verified=True
) )
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all() logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=user.id,
action=UserAuditLogAction.CreateMailbox.value,
).all()
assert len(logs_for_user) == 1 assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email assert logs_for_user[0].user_email == user.email