Compare commits

...

4 Commits

Author SHA1 Message Date
f1110506c0 4.39.3 2024-02-27 12:00:07 +00:00
f5bce7d7ff 4.39.2 2024-02-23 12:00:07 +00:00
75f45d9365 4.39.1 2024-02-20 12:00:07 +00:00
ead425e0c2 4.38.3 2024-02-14 12:00:07 +00:00
21 changed files with 336 additions and 77 deletions

View File

@ -168,6 +168,8 @@ class NewUserStrategy(ClientMergeStrategy):
class ExistingUnlinkedUserStrategy(ClientMergeStrategy):
def process(self) -> LinkResult:
# IF it was scheduled to be deleted. Unschedule it.
self.user.delete_on = None
partner_user = ensure_partner_user_exists_for_user(
self.link_request, self.user, self.partner
)
@ -246,6 +248,8 @@ def link_user(
) -> LinkResult:
# Sanitize email just in case
link_request.email = sanitize_email(link_request.email)
# If it was scheduled to be deleted. Unschedule it.
current_user.delete_on = None
partner_user = ensure_partner_user_exists_for_user(
link_request, current_user, partner
)

View File

@ -33,6 +33,9 @@ def authorize_request() -> Optional[Tuple[str, int]]:
if g.user.disabled:
return jsonify(error="Disabled account"), 403
if not g.user.is_active():
return jsonify(error="Account does not exist"), 401
g.api_key = api_key
return None

View File

@ -201,10 +201,10 @@ def get_alias_infos_with_pagination_v3(
q = q.order_by(Alias.pinned.desc())
q = q.order_by(latest_activity.desc())
q = list(q.limit(page_limit).offset(page_id * page_size))
q = q.limit(page_limit).offset(page_id * page_size)
ret = []
for alias, contact, email_log, nb_reply, nb_blocked, nb_forward in q:
for alias, contact, email_log, nb_reply, nb_blocked, nb_forward in list(q):
ret.append(
AliasInfo(
alias=alias,
@ -358,7 +358,6 @@ def construct_alias_query(user: User):
else_=0,
)
).label("nb_forward"),
func.max(EmailLog.created_at).label("latest_email_log_created_at"),
)
.join(EmailLog, Alias.id == EmailLog.alias_id, isouter=True)
.filter(Alias.user_id == user.id)
@ -366,14 +365,6 @@ def construct_alias_query(user: User):
.subquery()
)
alias_contact_subquery = (
Session.query(Alias.id, func.max(Contact.id).label("max_contact_id"))
.join(Contact, Alias.id == Contact.alias_id, isouter=True)
.filter(Alias.user_id == user.id)
.group_by(Alias.id)
.subquery()
)
return (
Session.query(
Alias,
@ -385,23 +376,7 @@ def construct_alias_query(user: User):
)
.options(joinedload(Alias.hibp_breaches))
.options(joinedload(Alias.custom_domain))
.join(Contact, Alias.id == Contact.alias_id, isouter=True)
.join(EmailLog, Contact.id == EmailLog.contact_id, isouter=True)
.join(EmailLog, Alias.last_email_log_id == EmailLog.id, isouter=True)
.join(Contact, EmailLog.contact_id == Contact.id, isouter=True)
.filter(Alias.id == alias_activity_subquery.c.id)
.filter(Alias.id == alias_contact_subquery.c.id)
.filter(
or_(
EmailLog.created_at
== alias_activity_subquery.c.latest_email_log_created_at,
and_(
# no email log yet for this alias
alias_activity_subquery.c.latest_email_log_created_at.is_(None),
# to make sure only 1 contact is returned in this case
or_(
Contact.id == alias_contact_subquery.c.max_contact_id,
alias_contact_subquery.c.max_contact_id.is_(None),
),
),
)
)
)

View File

@ -7,7 +7,7 @@ from app.config import URL, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET
from app.db import Session
from app.log import LOG
from app.models import User, File, SocialAuth
from app.utils import random_string, sanitize_email
from app.utils import random_string, sanitize_email, sanitize_next_url
from .login_utils import after_login
_authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth"
@ -29,7 +29,7 @@ def google_login():
# to avoid flask-login displaying the login error message
session.pop("_flashes", None)
next_url = request.args.get("next")
next_url = sanitize_next_url(request.args.get("next"))
# Google does not allow to append param to redirect_url
# we need to pass the next url by session

View File

@ -131,7 +131,7 @@ def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> Emai
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
return EmailLog.create(
email_log = EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
@ -142,6 +142,7 @@ def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> Emai
blocked=True,
commit=True,
)
return email_log
def apply_dmarc_policy_for_reply_phase(

View File

@ -727,6 +727,11 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return True
def is_active(self) -> bool:
if self.delete_on is None:
return True
return self.delete_on < arrow.now()
def in_trial(self):
"""return True if user does not have lifetime licence or an active subscription AND is in trial period"""
if self.lifetime_or_active_subscription():
@ -828,6 +833,9 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Whether user can create a new alias. User can't create a new alias if
- has more than 15 aliases in the free plan, *even in the free trial*
"""
if not self.is_active():
return False
if self.disabled:
return False
@ -908,7 +916,11 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
return sub
def verified_custom_domains(self) -> List["CustomDomain"]:
return CustomDomain.filter_by(user_id=self.id, ownership_verified=True).all()
return (
CustomDomain.filter_by(user_id=self.id, ownership_verified=True)
.order_by(CustomDomain.domain.asc())
.all()
)
def mailboxes(self) -> List["Mailbox"]:
"""list of mailbox that user own"""
@ -1496,6 +1508,8 @@ class Alias(Base, ModelMixin):
TSVector(), sa.Computed("to_tsvector('english', note)", persisted=True)
)
last_email_log_id = sa.Column(sa.Integer, default=None, nullable=True)
__table_args__ = (
Index("ix_video___ts_vector__", ts_vector, postgresql_using="gin"),
# index on note column using pg_trgm
@ -2055,6 +2069,20 @@ class EmailLog(Base, ModelMixin):
def get_dashboard_url(self):
return f"{config.URL}/dashboard/refused_email?highlight_id={self.id}"
@classmethod
def create(cls, *args, **kwargs):
commit = kwargs.pop("commit", False)
email_log = super().create(*args, **kwargs)
Session.flush()
if "alias_id" in kwargs:
sql = "UPDATE alias SET last_email_log_id = :el_id WHERE id = :alias_id"
Session.execute(
sql, {"el_id": email_log.id, "alias_id": kwargs["alias_id"]}
)
if commit:
Session.commit()
return email_log
def __repr__(self):
return f"<EmailLog {self.id}>"

View File

@ -140,7 +140,7 @@ def authorize():
Scope=Scope,
)
else: # POST - user allows or denies
if not current_user.is_authenticated or not current_user.is_active:
if not current_user.is_authenticated or not current_user.is_active():
LOG.i(
"Attempt to validate a OAUth allow request by an unauthenticated user"
)

View File

@ -6,7 +6,7 @@ import redis.exceptions
import werkzeug.exceptions
from limits.storage import RedisStorage
from app.log import log
from app.log import LOG
lock_redis: Optional[RedisStorage] = None
@ -22,17 +22,19 @@ def check_bucket_limit(
bucket_seconds: int = 3600,
):
# Calculate current bucket time
bucket_id = int(datetime.utcnow().timestamp()) % bucket_seconds
int_time = int(datetime.utcnow().timestamp())
bucket_id = int_time - (int_time % bucket_seconds)
bucket_lock_name = f"bl:{lock_name}:{bucket_id}"
if not lock_redis:
return
try:
value = lock_redis.incr(bucket_lock_name, bucket_seconds)
if value > max_hits:
LOG.i(f"Rate limit hit for {bucket_lock_name} -> {value}/{max_hits}")
newrelic.agent.record_custom_event(
"BucketRateLimit",
{"lock_name": lock_name, "bucket_seconds": bucket_seconds},
)
raise werkzeug.exceptions.TooManyRequests()
except (redis.exceptions.RedisError, AttributeError):
log.e("Cannot connect to redis")
LOG.e("Cannot connect to redis")

View File

@ -62,6 +62,8 @@ from app.proton.utils import get_proton_partner
from app.utils import sanitize_email
from server import create_light_app
DELETE_GRACE_DAYS = 30
def notify_trial_end():
for user in User.filter(
@ -1126,14 +1128,19 @@ def notify_hibp():
Session.commit()
def clear_users_scheduled_to_be_deleted():
def clear_users_scheduled_to_be_deleted(dry_run=False):
users = User.filter(
and_(User.delete_on.isnot(None), User.delete_on < arrow.now())
and_(
User.delete_on.isnot(None),
User.delete_on <= arrow.now().shift(days=-DELETE_GRACE_DAYS),
)
).all()
for user in users:
LOG.i(
f"Scheduled deletion of user {user} with scheduled delete on {user.delete_on}"
)
if dry_run:
continue
User.delete(user.id)
Session.commit()
@ -1206,4 +1213,4 @@ if __name__ == "__main__":
load_unsent_mails_from_fs_and_resend()
elif args.job == "delete_scheduled_users":
LOG.d("Deleting users scheduled to be deleted")
clear_users_scheduled_to_be_deleted()
clear_users_scheduled_to_be_deleted(dry_run=True)

View File

@ -62,7 +62,7 @@ jobs:
captureStderr: true
- name: SimpleLogin delete users scheduled to be deleted
command: echo disabled_user_deletion #python /code/cron.py -j delete_scheduled_users
command: python /code/cron.py -j delete_scheduled_users
shell: /bin/bash
schedule: "15 11 * * *"
captureStderr: true

View File

@ -236,15 +236,16 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
Session.commit()
else:
try:
contact_email_for_reply = (
contact_email if is_valid_email(contact_email) else ""
)
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
mail_from=mail_from,
reply_email=generate_reply_email(contact_email, alias)
if is_valid_email(contact_email)
else NOREPLY,
reply_email=generate_reply_email(contact_email_for_reply, alias),
automatic_created=True,
)
if not contact_email:
@ -636,6 +637,10 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
user = alias.user
if not user.is_active():
LOG.w(f"User {user} has been soft deleted")
return False, status.E502
if not user.can_send_or_receive():
LOG.i(f"User {user} cannot receive emails")
if should_ignore_bounce(envelope.mail_from):
@ -1055,6 +1060,9 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
if not contact:
LOG.w(f"No contact with {reply_email} as reverse alias")
return False, status.E502
if not contact.user.is_active():
LOG.w(f"User {contact.user} has been soft deleted")
return False, status.E502
alias = contact.alias
alias_address: str = contact.alias.email
@ -1921,6 +1929,9 @@ def handle_bounce(envelope, email_log: EmailLog, msg: Message) -> str:
contact,
alias,
)
if not email_log.user.is_active():
LOG.d(f"User {email_log.user} is not active")
return status.E510
if email_log.is_reply:
content_type = msg.get_content_type().lower()
@ -1982,6 +1993,9 @@ def send_no_reply_response(mail_from: str, msg: Message):
if not mailbox:
LOG.d("Unknown sender. Skipping reply from {}".format(NOREPLY))
return
if not mailbox.user.is_active():
LOG.d(f"User {mailbox.user} is soft-deleted. Skipping sending reply response")
return
send_email_at_most_times(
mailbox.user,
ALERT_TO_NOREPLY,

View File

@ -7460,9 +7460,7 @@ villain
vindicate
vineyard
vintage
violate
violation
violator
violet
violin
viper

View File

@ -0,0 +1,29 @@
"""empty message
Revision ID: 818b0a956205
Revises: 4bc54632d9aa
Create Date: 2024-02-01 10:43:46.253184
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '818b0a956205'
down_revision = '4bc54632d9aa'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('alias', sa.Column('last_email_log_id', sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('alias', 'last_email_log_id')
# ### end Alembic commands ###

View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
import argparse
import time
from sqlalchemy import func
from app.models import Alias
from app.db import Session
parser = argparse.ArgumentParser(
prog="Backfill alias", description="Backfill alias las use"
)
parser.add_argument(
"-s", "--start_alias_id", default=0, type=int, help="Initial alias_id"
)
parser.add_argument("-e", "--end_alias_id", default=0, type=int, help="Last alias_id")
args = parser.parse_args()
alias_id_start = args.start_alias_id
max_alias_id = args.end_alias_id
if max_alias_id == 0:
max_alias_id = Session.query(func.max(Alias.id)).scalar()
print(f"Checking alias {alias_id_start} to {max_alias_id}")
step = 1000
el_query = "SELECT alias_id, MAX(id) from email_log where alias_id>=:start AND alias_id < :end GROUP BY alias_id"
alias_query = "UPDATE alias set last_email_log_id = :el_id where id = :alias_id"
updated = 0
start_time = time.time()
for batch_start in range(alias_id_start, max_alias_id, step):
rows = Session.execute(el_query, {"start": batch_start, "end": batch_start + step})
for row in rows:
Session.execute(alias_query, {"alias_id": row[0], "el_id": row[1]})
Session.commit()
updated += 1
elapsed = time.time() - start_time
time_per_alias = elapsed / (updated + 1)
last_batch_id = batch_start + step
remaining = max_alias_id - last_batch_id
time_remaining = (max_alias_id - last_batch_id) * time_per_alias
hours_remaining = time_remaining / 3600.0
print(
f"\rAlias {batch_start}/{max_alias_id} {updated} {hours_remaining:.2f}hrs remaining"
)
print("")

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
import argparse
import time
from app import config
from app.email_utils import generate_reply_email
from app.email_validation import is_valid_email
from app.models import Alias
from app.db import Session
parser = argparse.ArgumentParser(
prog=f"Replace {config.NOREPLY}",
description=f"Replace {config.NOREPLY} from contacts reply email",
)
args = parser.parse_args()
el_query = "SELECT id, alias_id, website_email from contact where id>=:last_id AND reply_email=:reply_email ORDER BY id ASC LIMIT :step"
update_query = "UPDATE contact SET reply_email=:reply_email WHERE id=:contact_id "
updated = 0
start_time = time.time()
step = 100
last_id = 0
print(f"Replacing contacts with reply_email={config.NOREPLY}")
while True:
rows = Session.execute(
el_query, {"last_id": last_id, "reply_email": config.NOREPLY, "step": step}
)
loop_updated = 0
for row in rows:
contact_id = row[0]
alias_id = row[1]
last_id = contact_id
website_email = row[2]
contact_email_for_reply = website_email if is_valid_email(website_email) else ""
alias = Alias.get(alias_id)
if alias is None:
print(f"CANNOT find alias {alias_id} in database for contact {contact_id}")
reply_email = generate_reply_email(contact_email_for_reply, alias)
print(
f"Replacing contact {contact_id} with {website_email} reply_email for {reply_email}"
)
Session.execute(
update_query, {"contact_id": row[0], "reply_email": reply_email}
)
Session.commit()
updated += 1
loop_updated += 1
elapsed = time.time() - start_time
print(f"\rContact {last_id} done")
if loop_updated == 0:
break
print("")

View File

@ -228,6 +228,8 @@ def load_user(alternative_id):
sentry_sdk.set_user({"email": user.email, "id": user.id})
if user.disabled:
return None
if not user.is_active():
return None
return user

View File

@ -40,14 +40,16 @@ def test_get_notifications(flask_client):
def test_mark_notification_as_read(flask_client):
user, api_key = get_new_user_and_api_key()
Notification.create(id=1, user_id=user.id, message="Test message 1")
notif_id = Notification.create(
user_id=user.id, message="Test message 1", flush=True
).id
Session.commit()
r = flask_client.post(
url_for("api.mark_as_read", notification_id=1),
url_for("api.mark_as_read", notification_id=notif_id),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
notification = Notification.first()
notification = Notification.filter_by(id=notif_id).first()
assert notification.read

View File

@ -1,8 +1,8 @@
from app.api.serializer import get_alias_infos_with_pagination_v3
from app.config import PAGE_LIMIT
from app.db import Session
from app.models import Alias, Mailbox, Contact
from tests.utils import create_new_user
from app.models import Alias, Mailbox, Contact, EmailLog
from tests.utils import create_new_user, random_email
def test_get_alias_infos_with_pagination_v3(flask_client):
@ -155,3 +155,46 @@ def test_get_alias_infos_pinned_alias(flask_client):
# pinned alias isn't included in the search
alias_infos = get_alias_infos_with_pagination_v3(user, query="no match")
assert len(alias_infos) == 0
def test_get_alias_infos_with_no_last_email_log(flask_client):
user = create_new_user()
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == 1
row = alias_infos[0]
assert row.alias.id == user.newsletter_alias_id
assert row.latest_contact is None
assert row.latest_email_log is None
def test_get_alias_infos_with_email_log_no_contact():
user = create_new_user()
contact = Contact.create(
user_id=user.id,
alias_id=user.newsletter_alias_id,
website_email="a@a.com",
reply_email=random_email(),
flush=True,
)
Contact.create(
user_id=user.id,
alias_id=user.newsletter_alias_id,
website_email="unused@a.com",
reply_email=random_email(),
flush=True,
)
EmailLog.create(
user_id=user.id,
alias_id=user.newsletter_alias_id,
contact_id=contact.id,
commit=True,
)
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == 1
row = alias_infos[0]
assert row.alias.id == user.newsletter_alias_id
assert row.latest_contact is not None
assert row.latest_contact.id == contact.id
assert row.latest_email_log is not None
alias = Alias.get(id=user.newsletter_alias_id)
assert row.latest_email_log.id == alias.last_email_log_id

View File

@ -6,16 +6,27 @@ from tests.utils import create_new_user
def test_unactivated_user_login(flask_client):
user = create_new_user()
user.activated = False
Session.commit()
"""
Test function for logging in with an unactivated user.
Steps:
1. Creates a new user.
2. Sets the user's activated status to False.
3. Sends a POST request to the login route with user credentials.
4. Checks the response status code and content for expected messages.
"""
user = create_new_user() # Creating a new user
user.activated = False # Setting the user's activated status to False
Session.commit() # Committing the session changes
# Sending a POST request to the login route with user credentials and following redirects
r = flask_client.post(
url_for("auth.login"),
data={"email": user.email, "password": "password"},
follow_redirects=True,
)
# Asserting the response status code and content for expected messages
assert r.status_code == 200
assert (
b"Please check your inbox for the activation email. You can also have this email re-sent"
@ -24,59 +35,98 @@ def test_unactivated_user_login(flask_client):
def test_non_canonical_login(flask_client):
email = f"pre.{random_string(10)}@gmail.com"
name = f"NAME-{random_string(10)}"
user = create_new_user(email, name)
Session.commit()
"""
Test function for logging in with a non-canonical email.
Steps:
1. Creates a new user with a non-canonical email.
2. Sends a POST request to the login route with user credentials.
3. Checks the response status code and content for expected messages.
4. Checks the canonicalization of the email.
5. Logs out the user.
6. Sends a POST request to the login route with the canonicalized email.
7. Checks the response status code and content for expected messages.
"""
email = f"pre.{random_string(10)}@gmail.com" # Generating a non-canonical email
name = f"NAME-{random_string(10)}" # Generating a random name
user = create_new_user(
email, name
) # Creating a new user with the generated email and name
Session.commit() # Committing the session changes
# Sending a POST request to the login route with user credentials and following redirects
r = flask_client.post(
url_for("auth.login"),
data={"email": user.email, "password": "password"},
follow_redirects=True,
)
# Asserting the response status code and content for expected messages
assert r.status_code == 200
assert name.encode("utf-8") in r.data
# Canonicalizing the email
canonical_email = canonicalize_email(email)
assert canonical_email != email
assert (
canonical_email != email
) # Checking if the canonical email is different from the original email
flask_client.get(url_for("auth.logout"))
flask_client.get(url_for("auth.logout")) # Logging out the user
# Sending a POST request to the login route with the canonicalized email and following redirects
r = flask_client.post(
url_for("auth.login"),
data={"email": canonical_email, "password": "password"},
follow_redirects=True,
)
# Asserting the response status code and content for expected messages
assert r.status_code == 200
assert name.encode("utf-8") not in r.data
def test_canonical_login_with_non_canonical_email(flask_client):
suffix = f"{random_string(10)}@gmail.com"
canonical_email = f"pre{suffix}"
non_canonical_email = f"pre.{suffix}"
name = f"NAME-{random_string(10)}"
create_new_user(canonical_email, name)
Session.commit()
"""
Test function for logging in with a canonical email and a non-canonical email.
Steps:
1. Generates canonical and non-canonical email addresses.
2. Creates a new user with the canonical email.
3. Sends a POST request to the login route with the non-canonical email.
4. Checks the response status code and content for expected messages.
5. Logs out the user.
6. Sends a POST request to the login route with the canonical email.
7. Checks the response status code and content for expected messages.
"""
suffix = f"{random_string(10)}@gmail.com" # Generating a random suffix for emails
canonical_email = f"pre{suffix}" # Generating a canonical email
non_canonical_email = f"pre.{suffix}" # Generating a non-canonical email
name = f"NAME-{random_string(10)}" # Generating a random name
create_new_user(
canonical_email, name
) # Creating a new user with the canonical email
Session.commit() # Committing the session changes
# Sending a POST request to the login route with the non-canonical email and following redirects
r = flask_client.post(
url_for("auth.login"),
data={"email": non_canonical_email, "password": "password"},
follow_redirects=True,
)
# Asserting the response status code and content for expected messages
assert r.status_code == 200
assert name.encode("utf-8") in r.data
flask_client.get(url_for("auth.logout"))
flask_client.get(url_for("auth.logout")) # Logging out the user
# Sending a POST request to the login route with the canonical email and following redirects
r = flask_client.post(
url_for("auth.login"),
data={"email": canonical_email, "password": "password"},
follow_redirects=True,
)
# Asserting the response status code and content for expected messages
assert r.status_code == 200
assert name.encode("utf-8") in r.data

View File

@ -39,15 +39,17 @@ def test_cleanup_tokens(flask_client):
def test_cleanup_users():
u_delete_none_id = create_new_user().id
u_delete_after = create_new_user()
u_delete_after_id = u_delete_after.id
u_delete_before = create_new_user()
u_delete_before_id = u_delete_before.id
u_delete_grace_has_expired = create_new_user()
u_delete_grace_has_expired_id = u_delete_grace_has_expired.id
u_delete_grace_has_not_expired = create_new_user()
u_delete_grace_has_not_expired_id = u_delete_grace_has_not_expired.id
now = arrow.now()
u_delete_after.delete_on = now.shift(minutes=1)
u_delete_before.delete_on = now.shift(minutes=-1)
u_delete_grace_has_expired.delete_on = now.shift(days=-(cron.DELETE_GRACE_DAYS + 1))
u_delete_grace_has_not_expired.delete_on = now.shift(
days=-(cron.DELETE_GRACE_DAYS - 1)
)
Session.flush()
cron.clear_users_scheduled_to_be_deleted()
assert User.get(u_delete_none_id) is not None
assert User.get(u_delete_after_id) is not None
assert User.get(u_delete_before_id) is None
assert User.get(u_delete_grace_has_not_expired_id) is not None
assert User.get(u_delete_grace_has_expired_id) is None

View File

@ -57,6 +57,7 @@ from tests.utils import (
login,
load_eml_file,
create_new_user,
random_email,
random_domain,
random_token,
)
@ -186,13 +187,14 @@ def test_parse_full_address():
def test_send_email_with_rate_control(flask_client):
user = create_new_user()
email = random_email()
for _ in range(MAX_ALERT_24H):
assert send_email_with_rate_control(
user, "test alert type", "abcd@gmail.com", "subject", "plaintext"
user, "test alert type", email, "subject", "plaintext"
)
assert not send_email_with_rate_control(
user, "test alert type", "abcd@gmail.com", "subject", "plaintext"
user, "test alert type", email, "subject", "plaintext"
)