diff --git a/app/app/auth/views/google.py b/app/app/auth/views/google.py index 25f45d3..82ab682 100644 --- a/app/app/auth/views/google.py +++ b/app/app/auth/views/google.py @@ -7,7 +7,7 @@ from app.config import URL, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET from app.db import Session from app.log import LOG from app.models import User, File, SocialAuth -from app.utils import random_string, sanitize_email +from app.utils import random_string, sanitize_email, sanitize_next_url from .login_utils import after_login _authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth" @@ -29,7 +29,7 @@ def google_login(): # to avoid flask-login displaying the login error message session.pop("_flashes", None) - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) # Google does not allow to append param to redirect_url # we need to pass the next url by session diff --git a/app/email_handler.py b/app/email_handler.py index 0ac8e18..efa00f9 100644 --- a/app/email_handler.py +++ b/app/email_handler.py @@ -236,15 +236,16 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con Session.commit() else: try: + contact_email_for_reply = ( + contact_email if is_valid_email(contact_email) else "" + ) contact = Contact.create( user_id=alias.user_id, alias_id=alias.id, website_email=contact_email, name=contact_name, mail_from=mail_from, - reply_email=generate_reply_email(contact_email, alias) - if is_valid_email(contact_email) - else NOREPLY, + reply_email=generate_reply_email(contact_email_for_reply, alias), automatic_created=True, ) if not contact_email: diff --git a/app/oneshot/replace_noreply_in_cotnacts.py b/app/oneshot/replace_noreply_in_cotnacts.py new file mode 100644 index 0000000..13e667a --- /dev/null +++ b/app/oneshot/replace_noreply_in_cotnacts.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +import argparse +import time + + +from app import config +from app.email_utils import generate_reply_email +from app.email_validation import is_valid_email +from app.models import Alias +from app.db import Session + +parser = argparse.ArgumentParser( + prog=f"Replace {config.NOREPLY}", + description=f"Replace {config.NOREPLY} from contacts reply email", +) +args = parser.parse_args() + +el_query = "SELECT id, alias_id, website_email from contact where id>=:last_id AND reply_email=:reply_email ORDER BY id ASC LIMIT :step" +update_query = "UPDATE contact SET reply_email=:reply_email WHERE id=:contact_id " +updated = 0 +start_time = time.time() +step = 100 +last_id = 0 +print(f"Replacing contacts with reply_email={config.NOREPLY}") +while True: + rows = Session.execute( + el_query, {"last_id": last_id, "reply_email": config.NOREPLY, "step": step} + ) + loop_updated = 0 + for row in rows: + contact_id = row[0] + alias_id = row[1] + last_id = contact_id + website_email = row[2] + contact_email_for_reply = website_email if is_valid_email(website_email) else "" + alias = Alias.get(alias_id) + if alias is None: + print(f"CANNOT find alias {alias_id} in database for contact {contact_id}") + reply_email = generate_reply_email(contact_email_for_reply, alias) + print( + f"Replacing contact {contact_id} with {website_email} reply_email for {reply_email}" + ) + Session.execute( + update_query, {"contact_id": row[0], "reply_email": reply_email} + ) + Session.commit() + updated += 1 + loop_updated += 1 + elapsed = time.time() - start_time + print(f"\rContact {last_id} done") + if loop_updated == 0: + break +print("") diff --git a/app/tests/auth/test_login.py b/app/tests/auth/test_login.py index fa00df8..915e0b6 100644 --- a/app/tests/auth/test_login.py +++ b/app/tests/auth/test_login.py @@ -6,16 +6,27 @@ from tests.utils import create_new_user def test_unactivated_user_login(flask_client): - user = create_new_user() - user.activated = False - Session.commit() + """ + Test function for logging in with an unactivated user. + Steps: + 1. Creates a new user. + 2. Sets the user's activated status to False. + 3. Sends a POST request to the login route with user credentials. + 4. Checks the response status code and content for expected messages. + """ + user = create_new_user() # Creating a new user + user.activated = False # Setting the user's activated status to False + Session.commit() # Committing the session changes + + # Sending a POST request to the login route with user credentials and following redirects r = flask_client.post( url_for("auth.login"), data={"email": user.email, "password": "password"}, follow_redirects=True, ) + # Asserting the response status code and content for expected messages assert r.status_code == 200 assert ( b"Please check your inbox for the activation email. You can also have this email re-sent" @@ -24,59 +35,98 @@ def test_unactivated_user_login(flask_client): def test_non_canonical_login(flask_client): - email = f"pre.{random_string(10)}@gmail.com" - name = f"NAME-{random_string(10)}" - user = create_new_user(email, name) - Session.commit() + """ + Test function for logging in with a non-canonical email. + Steps: + 1. Creates a new user with a non-canonical email. + 2. Sends a POST request to the login route with user credentials. + 3. Checks the response status code and content for expected messages. + 4. Checks the canonicalization of the email. + 5. Logs out the user. + 6. Sends a POST request to the login route with the canonicalized email. + 7. Checks the response status code and content for expected messages. + """ + email = f"pre.{random_string(10)}@gmail.com" # Generating a non-canonical email + name = f"NAME-{random_string(10)}" # Generating a random name + user = create_new_user( + email, name + ) # Creating a new user with the generated email and name + Session.commit() # Committing the session changes + + # Sending a POST request to the login route with user credentials and following redirects r = flask_client.post( url_for("auth.login"), data={"email": user.email, "password": "password"}, follow_redirects=True, ) + # Asserting the response status code and content for expected messages assert r.status_code == 200 assert name.encode("utf-8") in r.data + # Canonicalizing the email canonical_email = canonicalize_email(email) - assert canonical_email != email + assert ( + canonical_email != email + ) # Checking if the canonical email is different from the original email - flask_client.get(url_for("auth.logout")) + flask_client.get(url_for("auth.logout")) # Logging out the user + # Sending a POST request to the login route with the canonicalized email and following redirects r = flask_client.post( url_for("auth.login"), data={"email": canonical_email, "password": "password"}, follow_redirects=True, ) + # Asserting the response status code and content for expected messages assert r.status_code == 200 assert name.encode("utf-8") not in r.data def test_canonical_login_with_non_canonical_email(flask_client): - suffix = f"{random_string(10)}@gmail.com" - canonical_email = f"pre{suffix}" - non_canonical_email = f"pre.{suffix}" - name = f"NAME-{random_string(10)}" - create_new_user(canonical_email, name) - Session.commit() + """ + Test function for logging in with a canonical email and a non-canonical email. + Steps: + 1. Generates canonical and non-canonical email addresses. + 2. Creates a new user with the canonical email. + 3. Sends a POST request to the login route with the non-canonical email. + 4. Checks the response status code and content for expected messages. + 5. Logs out the user. + 6. Sends a POST request to the login route with the canonical email. + 7. Checks the response status code and content for expected messages. + """ + suffix = f"{random_string(10)}@gmail.com" # Generating a random suffix for emails + canonical_email = f"pre{suffix}" # Generating a canonical email + non_canonical_email = f"pre.{suffix}" # Generating a non-canonical email + name = f"NAME-{random_string(10)}" # Generating a random name + create_new_user( + canonical_email, name + ) # Creating a new user with the canonical email + Session.commit() # Committing the session changes + + # Sending a POST request to the login route with the non-canonical email and following redirects r = flask_client.post( url_for("auth.login"), data={"email": non_canonical_email, "password": "password"}, follow_redirects=True, ) + # Asserting the response status code and content for expected messages assert r.status_code == 200 assert name.encode("utf-8") in r.data - flask_client.get(url_for("auth.logout")) + flask_client.get(url_for("auth.logout")) # Logging out the user + # Sending a POST request to the login route with the canonical email and following redirects r = flask_client.post( url_for("auth.login"), data={"email": canonical_email, "password": "password"}, follow_redirects=True, ) + # Asserting the response status code and content for expected messages assert r.status_code == 200 assert name.encode("utf-8") in r.data