This commit is contained in:
MrMeeb 2024-02-27 12:00:07 +00:00
parent f5bce7d7ff
commit f1110506c0
4 changed files with 125 additions and 21 deletions

View File

@ -7,7 +7,7 @@ from app.config import URL, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET
from app.db import Session from app.db import Session
from app.log import LOG from app.log import LOG
from app.models import User, File, SocialAuth from app.models import User, File, SocialAuth
from app.utils import random_string, sanitize_email from app.utils import random_string, sanitize_email, sanitize_next_url
from .login_utils import after_login from .login_utils import after_login
_authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth" _authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth"
@ -29,7 +29,7 @@ def google_login():
# to avoid flask-login displaying the login error message # to avoid flask-login displaying the login error message
session.pop("_flashes", None) session.pop("_flashes", None)
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
# Google does not allow to append param to redirect_url # Google does not allow to append param to redirect_url
# we need to pass the next url by session # we need to pass the next url by session

View File

@ -236,15 +236,16 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
Session.commit() Session.commit()
else: else:
try: try:
contact_email_for_reply = (
contact_email if is_valid_email(contact_email) else ""
)
contact = Contact.create( contact = Contact.create(
user_id=alias.user_id, user_id=alias.user_id,
alias_id=alias.id, alias_id=alias.id,
website_email=contact_email, website_email=contact_email,
name=contact_name, name=contact_name,
mail_from=mail_from, mail_from=mail_from,
reply_email=generate_reply_email(contact_email, alias) reply_email=generate_reply_email(contact_email_for_reply, alias),
if is_valid_email(contact_email)
else NOREPLY,
automatic_created=True, automatic_created=True,
) )
if not contact_email: if not contact_email:

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
import argparse
import time
from app import config
from app.email_utils import generate_reply_email
from app.email_validation import is_valid_email
from app.models import Alias
from app.db import Session
parser = argparse.ArgumentParser(
prog=f"Replace {config.NOREPLY}",
description=f"Replace {config.NOREPLY} from contacts reply email",
)
args = parser.parse_args()
el_query = "SELECT id, alias_id, website_email from contact where id>=:last_id AND reply_email=:reply_email ORDER BY id ASC LIMIT :step"
update_query = "UPDATE contact SET reply_email=:reply_email WHERE id=:contact_id "
updated = 0
start_time = time.time()
step = 100
last_id = 0
print(f"Replacing contacts with reply_email={config.NOREPLY}")
while True:
rows = Session.execute(
el_query, {"last_id": last_id, "reply_email": config.NOREPLY, "step": step}
)
loop_updated = 0
for row in rows:
contact_id = row[0]
alias_id = row[1]
last_id = contact_id
website_email = row[2]
contact_email_for_reply = website_email if is_valid_email(website_email) else ""
alias = Alias.get(alias_id)
if alias is None:
print(f"CANNOT find alias {alias_id} in database for contact {contact_id}")
reply_email = generate_reply_email(contact_email_for_reply, alias)
print(
f"Replacing contact {contact_id} with {website_email} reply_email for {reply_email}"
)
Session.execute(
update_query, {"contact_id": row[0], "reply_email": reply_email}
)
Session.commit()
updated += 1
loop_updated += 1
elapsed = time.time() - start_time
print(f"\rContact {last_id} done")
if loop_updated == 0:
break
print("")

View File

@ -6,16 +6,27 @@ from tests.utils import create_new_user
def test_unactivated_user_login(flask_client): def test_unactivated_user_login(flask_client):
user = create_new_user() """
user.activated = False Test function for logging in with an unactivated user.
Session.commit()
Steps:
1. Creates a new user.
2. Sets the user's activated status to False.
3. Sends a POST request to the login route with user credentials.
4. Checks the response status code and content for expected messages.
"""
user = create_new_user() # Creating a new user
user.activated = False # Setting the user's activated status to False
Session.commit() # Committing the session changes
# Sending a POST request to the login route with user credentials and following redirects
r = flask_client.post( r = flask_client.post(
url_for("auth.login"), url_for("auth.login"),
data={"email": user.email, "password": "password"}, data={"email": user.email, "password": "password"},
follow_redirects=True, follow_redirects=True,
) )
# Asserting the response status code and content for expected messages
assert r.status_code == 200 assert r.status_code == 200
assert ( assert (
b"Please check your inbox for the activation email. You can also have this email re-sent" b"Please check your inbox for the activation email. You can also have this email re-sent"
@ -24,59 +35,98 @@ def test_unactivated_user_login(flask_client):
def test_non_canonical_login(flask_client): def test_non_canonical_login(flask_client):
email = f"pre.{random_string(10)}@gmail.com" """
name = f"NAME-{random_string(10)}" Test function for logging in with a non-canonical email.
user = create_new_user(email, name)
Session.commit()
Steps:
1. Creates a new user with a non-canonical email.
2. Sends a POST request to the login route with user credentials.
3. Checks the response status code and content for expected messages.
4. Checks the canonicalization of the email.
5. Logs out the user.
6. Sends a POST request to the login route with the canonicalized email.
7. Checks the response status code and content for expected messages.
"""
email = f"pre.{random_string(10)}@gmail.com" # Generating a non-canonical email
name = f"NAME-{random_string(10)}" # Generating a random name
user = create_new_user(
email, name
) # Creating a new user with the generated email and name
Session.commit() # Committing the session changes
# Sending a POST request to the login route with user credentials and following redirects
r = flask_client.post( r = flask_client.post(
url_for("auth.login"), url_for("auth.login"),
data={"email": user.email, "password": "password"}, data={"email": user.email, "password": "password"},
follow_redirects=True, follow_redirects=True,
) )
# Asserting the response status code and content for expected messages
assert r.status_code == 200 assert r.status_code == 200
assert name.encode("utf-8") in r.data assert name.encode("utf-8") in r.data
# Canonicalizing the email
canonical_email = canonicalize_email(email) canonical_email = canonicalize_email(email)
assert canonical_email != email assert (
canonical_email != email
) # Checking if the canonical email is different from the original email
flask_client.get(url_for("auth.logout")) flask_client.get(url_for("auth.logout")) # Logging out the user
# Sending a POST request to the login route with the canonicalized email and following redirects
r = flask_client.post( r = flask_client.post(
url_for("auth.login"), url_for("auth.login"),
data={"email": canonical_email, "password": "password"}, data={"email": canonical_email, "password": "password"},
follow_redirects=True, follow_redirects=True,
) )
# Asserting the response status code and content for expected messages
assert r.status_code == 200 assert r.status_code == 200
assert name.encode("utf-8") not in r.data assert name.encode("utf-8") not in r.data
def test_canonical_login_with_non_canonical_email(flask_client): def test_canonical_login_with_non_canonical_email(flask_client):
suffix = f"{random_string(10)}@gmail.com" """
canonical_email = f"pre{suffix}" Test function for logging in with a canonical email and a non-canonical email.
non_canonical_email = f"pre.{suffix}"
name = f"NAME-{random_string(10)}"
create_new_user(canonical_email, name)
Session.commit()
Steps:
1. Generates canonical and non-canonical email addresses.
2. Creates a new user with the canonical email.
3. Sends a POST request to the login route with the non-canonical email.
4. Checks the response status code and content for expected messages.
5. Logs out the user.
6. Sends a POST request to the login route with the canonical email.
7. Checks the response status code and content for expected messages.
"""
suffix = f"{random_string(10)}@gmail.com" # Generating a random suffix for emails
canonical_email = f"pre{suffix}" # Generating a canonical email
non_canonical_email = f"pre.{suffix}" # Generating a non-canonical email
name = f"NAME-{random_string(10)}" # Generating a random name
create_new_user(
canonical_email, name
) # Creating a new user with the canonical email
Session.commit() # Committing the session changes
# Sending a POST request to the login route with the non-canonical email and following redirects
r = flask_client.post( r = flask_client.post(
url_for("auth.login"), url_for("auth.login"),
data={"email": non_canonical_email, "password": "password"}, data={"email": non_canonical_email, "password": "password"},
follow_redirects=True, follow_redirects=True,
) )
# Asserting the response status code and content for expected messages
assert r.status_code == 200 assert r.status_code == 200
assert name.encode("utf-8") in r.data assert name.encode("utf-8") in r.data
flask_client.get(url_for("auth.logout")) flask_client.get(url_for("auth.logout")) # Logging out the user
# Sending a POST request to the login route with the canonical email and following redirects
r = flask_client.post( r = flask_client.post(
url_for("auth.login"), url_for("auth.login"),
data={"email": canonical_email, "password": "password"}, data={"email": canonical_email, "password": "password"},
follow_redirects=True, follow_redirects=True,
) )
# Asserting the response status code and content for expected messages
assert r.status_code == 200 assert r.status_code == 200
assert name.encode("utf-8") in r.data assert name.encode("utf-8") in r.data