This commit is contained in:
2022-12-30 16:23:27 +00:00
parent 02776e8478
commit 20da343c54
1304 changed files with 870224 additions and 0 deletions

View File

681
app/tests/api/test_alias.py Normal file
View File

@ -0,0 +1,681 @@
import arrow
from flask import url_for
# Need to import directly from config to allow modification from the tests
from app import config
from app.db import Session
from app.email_utils import is_reverse_alias
from app.models import User, Alias, Contact, EmailLog, Mailbox
from tests.api.utils import get_new_user_and_api_key
from tests.utils import login, random_domain
def test_get_aliases_error_without_pagination(flask_client):
user, api_key = get_new_user_and_api_key()
r = flask_client.get(
url_for("api.get_aliases"), headers={"Authentication": api_key.code}
)
assert r.status_code == 400
assert r.json["error"]
def test_get_aliases_with_pagination(flask_client):
user, api_key = get_new_user_and_api_key()
# create more aliases than config.PAGE_LIMIT
for _ in range(config.PAGE_LIMIT + 1):
Alias.create_new_random(user)
Session.commit()
# get aliases on the 1st page, should return config.PAGE_LIMIT aliases
r = flask_client.get(
url_for("api.get_aliases", page_id=0), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert len(r.json["aliases"]) == config.PAGE_LIMIT
# assert returned field
for a in r.json["aliases"]:
assert "id" in a
assert "email" in a
assert "creation_date" in a
assert "creation_timestamp" in a
assert "nb_forward" in a
assert "nb_block" in a
assert "nb_reply" in a
assert "enabled" in a
assert "note" in a
# get aliases on the 2nd page, should return 2 aliases
# as the total number of aliases is config.PAGE_LIMIT +2
# 1 alias is created when user is created
r = flask_client.get(
url_for("api.get_aliases", page_id=1), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert len(r.json["aliases"]) == 2
def test_get_aliases_query(flask_client):
user, api_key = get_new_user_and_api_key()
# create more aliases than config.PAGE_LIMIT
Alias.create_new(user, "prefix1")
Alias.create_new(user, "prefix2")
Session.commit()
# get aliases without query, should return 3 aliases as one alias is created when user is created
r = flask_client.get(
url_for("api.get_aliases", page_id=0), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert len(r.json["aliases"]) == 3
# get aliases with "prefix1" query, should return 1 alias
r = flask_client.get(
url_for("api.get_aliases", page_id=0),
headers={"Authentication": api_key.code},
json={"query": "prefix1"},
)
assert r.status_code == 200
assert len(r.json["aliases"]) == 1
def test_get_aliases_v2(flask_client):
user = login(flask_client)
a0 = Alias.create_new(user, "prefix0")
a1 = Alias.create_new(user, "prefix1")
Session.commit()
# << Aliases have no activity >>
r = flask_client.get("/api/v2/aliases?page_id=0")
assert r.status_code == 200
r0 = r.json["aliases"][0]
assert "name" in r0
# make sure a1 is returned before a0
assert r0["email"].startswith("prefix1")
assert "id" in r0["mailbox"]
assert "email" in r0["mailbox"]
assert r0["mailboxes"]
for mailbox in r0["mailboxes"]:
assert "id" in mailbox
assert "email" in mailbox
assert "support_pgp" in r0
assert not r0["support_pgp"]
assert "disable_pgp" in r0
assert not r0["disable_pgp"]
# << Alias has some activities >>
c0 = Contact.create(
user_id=user.id,
alias_id=a0.id,
website_email="c0@example.com",
reply_email="re0@SL",
commit=True,
)
EmailLog.create(
contact_id=c0.id, user_id=user.id, alias_id=c0.alias_id, commit=True
)
# a1 has more recent activity
c1 = Contact.create(
user_id=user.id,
alias_id=a1.id,
website_email="c1@example.com",
reply_email="re1@SL",
commit=True,
)
EmailLog.create(
contact_id=c1.id, user_id=user.id, alias_id=c1.alias_id, commit=True
)
r = flask_client.get("/api/v2/aliases?page_id=0")
assert r.status_code == 200
r0 = r.json["aliases"][0]
assert r0["latest_activity"]["action"] == "forward"
assert "timestamp" in r0["latest_activity"]
assert r0["latest_activity"]["contact"]["email"] == "c1@example.com"
assert "name" in r0["latest_activity"]["contact"]
assert "reverse_alias" in r0["latest_activity"]["contact"]
assert "pinned" in r0
def test_get_pinned_aliases_v2(flask_client):
user = login(flask_client)
a0 = Alias.create_new(user, "prefix0")
a0.pinned = True
Session.commit()
r = flask_client.get("/api/v2/aliases?page_id=0")
assert r.status_code == 200
# the default alias (created when user is created) and a0 are returned
assert len(r.json["aliases"]) == 2
r = flask_client.get("/api/v2/aliases?page_id=0&pinned=true")
assert r.status_code == 200
# only a0 is returned
assert len(r.json["aliases"]) == 1
assert r.json["aliases"][0]["id"] == a0.id
def test_get_disabled_aliases_v2(flask_client):
user = login(flask_client)
a0 = Alias.create_new(user, "prefix0")
a0.enabled = False
Session.commit()
r = flask_client.get("/api/v2/aliases?page_id=0")
assert r.status_code == 200
# the default alias (created when user is created) and a0 are returned
assert len(r.json["aliases"]) == 2
r = flask_client.get("/api/v2/aliases?page_id=0&disabled=true")
assert r.status_code == 200
# only a0 is returned
assert len(r.json["aliases"]) == 1
assert r.json["aliases"][0]["id"] == a0.id
def test_get_enabled_aliases_v2(flask_client):
user = login(flask_client)
a0 = Alias.create_new(user, "prefix0")
a0.enabled = False
Session.commit()
r = flask_client.get("/api/v2/aliases?page_id=0")
assert r.status_code == 200
# the default alias (created when user is created) and a0 are returned
assert len(r.json["aliases"]) == 2
r = flask_client.get("/api/v2/aliases?page_id=0&enabled=true")
assert r.status_code == 200
# only the first alias is returned
assert len(r.json["aliases"]) == 1
assert r.json["aliases"][0]["id"] != a0.id
def test_delete_alias(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.delete(
url_for("api.delete_alias", alias_id=alias.id),
)
assert r.status_code == 200
assert r.json == {"deleted": True}
def test_toggle_alias(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.post(
url_for("api.toggle_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
assert r.json == {"enabled": False}
def test_alias_activities(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
# create some alias log
contact = Contact.create(
website_email="marketing@example.com",
reply_email="reply@a.b",
alias_id=alias.id,
user_id=alias.user_id,
)
Session.commit()
for _ in range(int(config.PAGE_LIMIT / 2)):
EmailLog.create(
contact_id=contact.id,
is_reply=True,
user_id=contact.user_id,
alias_id=contact.alias_id,
)
for _ in range(int(config.PAGE_LIMIT / 2) + 2):
EmailLog.create(
contact_id=contact.id,
blocked=True,
user_id=contact.user_id,
alias_id=contact.alias_id,
)
r = flask_client.get(
url_for("api.get_alias_activities", alias_id=alias.id, page_id=0),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
assert len(r.json["activities"]) == config.PAGE_LIMIT
for ac in r.json["activities"]:
assert ac["from"]
assert ac["to"]
assert ac["timestamp"]
assert ac["action"]
assert ac["reverse_alias"]
assert ac["reverse_alias_address"]
# second page, should return 1 or 2 results only
r = flask_client.get(
url_for("api.get_alias_activities", alias_id=alias.id, page_id=1),
headers={"Authentication": api_key.code},
)
assert len(r.json["activities"]) < 3
def test_update_alias(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"note": "test note"},
)
assert r.status_code == 200
def test_update_alias_mailbox(flask_client):
user, api_key = get_new_user_and_api_key()
mb = Mailbox.create(user_id=user.id, email="ab@cd.com", verified=True)
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"mailbox_id": mb.id},
)
assert r.status_code == 200
# fail when update with non-existing mailbox
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"mailbox_id": -1},
)
assert r.status_code == 400
def test_update_alias_name(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"name": "Test Name"},
)
assert r.status_code == 200
alias = Alias.get(alias.id)
assert alias.name == "Test Name"
# update name with linebreak
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"name": "Test \nName"},
)
assert r.status_code == 200
alias = Alias.get(alias.id)
assert alias.name == "Test Name"
def test_update_alias_mailboxes(flask_client):
user, api_key = get_new_user_and_api_key()
mb1 = Mailbox.create(user_id=user.id, email="ab1@cd.com", verified=True)
mb2 = Mailbox.create(user_id=user.id, email="ab2@cd.com", verified=True)
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"mailbox_ids": [mb1.id, mb2.id]},
)
assert r.status_code == 200
alias = Alias.get(alias.id)
assert alias.mailbox
assert len(alias._mailboxes) == 1
# fail when update with empty mailboxes
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"mailbox_ids": []},
)
assert r.status_code == 400
def test_update_disable_pgp(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
assert not alias.disable_pgp
r = flask_client.put(
url_for("api.update_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"disable_pgp": True},
)
assert r.status_code == 200
alias = Alias.get(alias.id)
assert alias.disable_pgp
def test_update_pinned(flask_client):
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
assert not alias.pinned
r = flask_client.patch(
url_for("api.update_alias", alias_id=alias.id),
json={"pinned": True},
)
assert r.status_code == 200
assert alias.pinned
def test_alias_contacts(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
# create some alias log
for i in range(config.PAGE_LIMIT + 1):
contact = Contact.create(
website_email=f"marketing-{i}@example.com",
reply_email=f"reply-{i}@a.b",
alias_id=alias.id,
user_id=alias.user_id,
)
Session.commit()
EmailLog.create(
contact_id=contact.id,
is_reply=True,
user_id=contact.user_id,
alias_id=contact.alias_id,
)
Session.commit()
r = flask_client.get(f"/api/aliases/{alias.id}/contacts?page_id=0")
assert r.status_code == 200
assert len(r.json["contacts"]) == config.PAGE_LIMIT
for ac in r.json["contacts"]:
assert ac["creation_date"]
assert ac["creation_timestamp"]
assert ac["last_email_sent_date"]
assert ac["last_email_sent_timestamp"]
assert ac["contact"]
assert ac["reverse_alias"]
assert ac["reverse_alias_address"]
assert "block_forward" in ac
# second page, should return 1 result only
r = flask_client.get(f"/api/aliases/{alias.id}/contacts?page_id=1")
assert len(r.json["contacts"]) == 1
def test_create_contact_route(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": "First Last <first@example.com>"},
)
assert r.status_code == 201
assert r.json["contact"] == "first@example.com"
assert "creation_date" in r.json
assert "creation_timestamp" in r.json
assert r.json["last_email_sent_date"] is None
assert r.json["last_email_sent_timestamp"] is None
assert r.json["reverse_alias"]
assert r.json["reverse_alias_address"]
assert r.json["existed"] is False
# re-add a contact, should return 200
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": "First2 Last2 <first@example.com>"},
)
assert r.status_code == 200
assert r.json["existed"]
def test_create_contact_route_invalid_alias(flask_client):
user, api_key = get_new_user_and_api_key()
other_user, other_api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(other_user)
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": "First Last <first@example.com>"},
)
assert r.status_code == 403
def test_create_contact_route_free_users(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
# On trial, should be ok
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial but allow via flags for older free users
user.trial_end = arrow.now()
user.flags = 0
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial and disallow for new free users. Config should allow it
user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# Set the global config to disable free users from create contacts
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = True
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 403
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False
def test_create_contact_route_empty_contact_address(flask_client):
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
json={"contact": ""},
)
assert r.status_code == 400
assert r.json["error"] == "Empty address is not a valid email address"
def test_create_contact_route_invalid_contact_email(flask_client):
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
json={"contact": "@gmail.com"},
)
assert r.status_code == 400
assert r.json["error"] == "@gmail.com is not a valid email address"
def test_delete_contact(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
alias_id=alias.id,
website_email="contact@example.com",
reply_email="reply+random@sl.io",
user_id=alias.user_id,
)
Session.commit()
r = flask_client.delete(
url_for("api.delete_contact", contact_id=contact.id),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
assert r.json == {"deleted": True}
def test_get_alias(flask_client):
user, api_key = get_new_user_and_api_key()
# create more aliases than config.PAGE_LIMIT
alias = Alias.create_new_random(user)
Session.commit()
# get aliases on the 1st page, should return config.PAGE_LIMIT aliases
r = flask_client.get(
url_for("api.get_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
# assert returned field
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
assert "note" in res
assert "pinned" in res
def test_is_reverse_alias(flask_client):
assert is_reverse_alias("ra+abcd@sl.local")
assert is_reverse_alias("reply+abcd@sl.local")
assert not is_reverse_alias("ra+abcd@test.org")
assert not is_reverse_alias("reply+abcd@test.org")
assert not is_reverse_alias("abcd@test.org")
def test_toggle_contact(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
alias_id=alias.id,
website_email="contact@example.com",
reply_email="reply+random@sl.io",
user_id=alias.user_id,
)
Session.commit()
r = flask_client.post(f"/api/contacts/{contact.id}/toggle")
assert r.status_code == 200
assert r.json == {"block_forward": True}
def test_get_aliases_disabled_account(flask_client):
user, api_key = get_new_user_and_api_key()
r = flask_client.get(
"/api/v2/aliases?page_id=0",
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
user.disabled = True
Session.commit()
r = flask_client.get(
"/api/v2/aliases?page_id=0",
headers={"Authentication": api_key.code},
)
assert r.status_code == 403

View File

@ -0,0 +1,127 @@
from flask import url_for
from app.db import Session
from app.models import AliasUsedOn, Alias
from tests.api.utils import get_new_user_and_api_key
from tests.utils import login
def test_different_scenarios_v4(flask_client):
user, api_key = get_new_user_and_api_key()
# <<< without hostname >>>
r = flask_client.get(
"/api/v4/alias/options", headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert r.json["can_create"]
assert r.json["suffixes"]
assert r.json["prefix_suggestion"] == "" # no hostname => no suggestion
# <<< with hostname >>>
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["prefix_suggestion"] == "test"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
Session.commit()
AliasUsedOn.create(
alias_id=alias.id, hostname="www.test.com", user_id=alias.user_id
)
Session.commit()
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"
def test_different_scenarios_v4_2(flask_client):
user, api_key = get_new_user_and_api_key()
# <<< without hostname >>>
r = flask_client.get(
url_for("api.options_v4"), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert r.json["can_create"]
assert r.json["suffixes"]
assert r.json["prefix_suggestion"] == "" # no hostname => no suggestion
for (suffix, signed_suffix) in r.json["suffixes"]:
assert signed_suffix.startswith(suffix)
# <<< with hostname >>>
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["prefix_suggestion"] == "test"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
Session.commit()
AliasUsedOn.create(
alias_id=alias.id, hostname="www.test.com", user_id=alias.user_id
)
Session.commit()
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"
def test_different_scenarios_v5(flask_client):
user = login(flask_client)
# <<< without hostname >>>
r = flask_client.get("/api/v5/alias/options")
assert r.status_code == 200
assert r.json["can_create"]
assert r.json["suffixes"]
assert r.json["prefix_suggestion"] == "" # no hostname => no suggestion
for suffix_payload in r.json["suffixes"]:
suffix, signed_suffix = (
suffix_payload["suffix"],
suffix_payload["signed_suffix"],
)
assert signed_suffix.startswith(suffix)
assert "is_custom" in suffix_payload
assert "is_premium" in suffix_payload
# <<< with hostname >>>
r = flask_client.get("/api/v5/alias/options?hostname=www.test.com")
assert r.json["prefix_suggestion"] == "test"
# <<< with hostname with 2 parts TLD, for example wwww.numberoneshoes.co.nz >>>
r = flask_client.get("/api/v5/alias/options?hostname=wwww.numberoneshoes.co.nz")
assert r.json["prefix_suggestion"] == "numberoneshoes"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
Session.commit()
AliasUsedOn.create(
alias_id=alias.id, hostname="www.test.com", user_id=alias.user_id
)
Session.commit()
r = flask_client.get(url_for("api.options_v4", hostname="www.test.com"))
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"

201
app/tests/api/test_apple.py Normal file

File diff suppressed because one or more lines are too long

272
app/tests/api/test_auth.py Normal file
View File

@ -0,0 +1,272 @@
import pytest
import unicodedata
from flask import url_for
from app import config
from app.db import Session
from app.models import User, AccountActivation
from tests.utils import random_email
PASSWORD_1 = "Aurélie"
PASSWORD_2 = unicodedata.normalize("NFKD", PASSWORD_1)
assert PASSWORD_1 != PASSWORD_2
def setup_module():
config.SKIP_MX_LOOKUP_ON_CHECK = True
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
@pytest.mark.parametrize("mfa", (True, False), ids=("MFA", "no MFA"))
def test_auth_login_success(flask_client, mfa: bool):
email = random_email()
User.create(
email=email,
password=PASSWORD_1,
name="Test User",
activated=True,
enable_otp=mfa,
)
Session.commit()
r = flask_client.post(
"/api/auth/login",
json={
"email": email,
"password": PASSWORD_2,
"device": "Test Device",
},
)
assert r.status_code == 200
assert r.json["name"] == "Test User"
assert r.json["email"]
if mfa:
assert r.json["api_key"] is None
assert r.json["mfa_enabled"]
assert r.json["mfa_key"]
else:
assert r.json["api_key"]
assert not r.json["mfa_enabled"]
assert r.json["mfa_key"] is None
def test_auth_login_device_exist(flask_client):
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.auth_login"),
json={
"email": email,
"password": "password",
"device": "Test Device",
},
)
assert r.status_code == 200
api_key = r.json["api_key"]
assert not r.json["mfa_enabled"]
assert r.json["mfa_key"] is None
assert r.json["name"] == "Test User"
# same device, should return same api_key
r = flask_client.post(
url_for("api.auth_login"),
json={
"email": email,
"password": "password",
"device": "Test Device",
},
)
assert r.json["api_key"] == api_key
def test_auth_register_success(flask_client):
email = random_email()
assert AccountActivation.first() is None
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "password"},
)
assert r.status_code == 200
assert r.json["msg"]
# make sure an activation code is created
act_code = AccountActivation.first()
assert act_code
assert len(act_code.code) == 6
assert act_code.tries == 3
def test_auth_register_too_short_password(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "short"},
)
assert r.status_code == 400
assert r.json["error"] == "password too short"
def test_auth_register_too_long_password(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "0123456789" * 11},
)
assert r.status_code == 400
assert r.json["error"] == "password too long"
def test_auth_activate_success(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "password"},
)
assert r.status_code == 200
assert r.json["msg"]
# get the activation code
act_code = AccountActivation.first()
assert act_code
assert len(act_code.code) == 6
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": email, "code": act_code.code},
)
assert r.status_code == 200
def test_auth_activate_wrong_email(flask_client):
r = flask_client.post(
url_for("api.auth_activate"), json={"email": "abcd@gmail.com", "code": "123456"}
)
assert r.status_code == 400
def test_auth_activate_user_already_activated(flask_client):
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.auth_activate"), json={"email": email, "code": "123456"}
)
assert r.status_code == 400
def test_auth_activate_wrong_code(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "password"},
)
assert r.status_code == 200
assert r.json["msg"]
# get the activation code
act_code = AccountActivation.first()
assert act_code
assert len(act_code.code) == 6
assert act_code.tries == 3
# make sure to create a wrong code
wrong_code = act_code.code + "123"
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": email, "code": wrong_code},
)
assert r.status_code == 400
# make sure the nb tries decrements
act_code = AccountActivation.first()
assert act_code.tries == 2
def test_auth_activate_too_many_wrong_code(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": email, "password": "password"},
)
assert r.status_code == 200
assert r.json["msg"]
# get the activation code
act_code = AccountActivation.first()
assert act_code
assert len(act_code.code) == 6
assert act_code.tries == 3
# make sure to create a wrong code
wrong_code = act_code.code + "123"
for _ in range(2):
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": email, "code": wrong_code},
)
assert r.status_code == 400
# the activation code is deleted
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": email, "code": wrong_code},
)
assert r.status_code == 410
# make sure the nb tries decrements
assert AccountActivation.first() is None
def test_auth_reactivate_success(flask_client):
email = random_email()
User.create(email=email, password="password", name="Test User")
Session.commit()
r = flask_client.post(url_for("api.auth_reactivate"), json={"email": email})
assert r.status_code == 200
# make sure an activation code is created
act_code = AccountActivation.first()
assert act_code
assert len(act_code.code) == 6
assert act_code.tries == 3
def test_auth_login_forgot_password(flask_client):
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.forgot_password"),
json={"email": email},
)
assert r.status_code == 200
# No such email, still return 200
r = flask_client.post(
url_for("api.forgot_password"),
json={"email": random_email()},
)
assert r.status_code == 200

View File

@ -0,0 +1,46 @@
import pyotp
from flask import url_for
from itsdangerous import Signer
from app.config import FLASK_SECRET
from tests.utils import create_new_user
def test_auth_mfa_success(flask_client):
user = create_new_user()
user.enable_otp = True
user.otp_secret = "base32secret3232"
totp = pyotp.TOTP(user.otp_secret)
s = Signer(FLASK_SECRET)
mfa_key = s.sign(str(user.id))
r = flask_client.post(
url_for("api.auth_mfa"),
json={"mfa_token": totp.now(), "mfa_key": mfa_key, "device": "Test Device"},
)
assert r.status_code == 200
assert r.json["api_key"]
assert r.json["email"]
assert r.json["name"] == "Test User"
def test_auth_wrong_mfa_key(flask_client):
user = create_new_user()
user.enable_otp = True
user.otp_secret = "base32secret3232"
totp = pyotp.TOTP(user.otp_secret)
r = flask_client.post(
url_for("api.auth_mfa"),
json={
"mfa_token": totp.now(),
"mfa_key": "wrong mfa key",
"device": "Test Device",
},
)
assert r.status_code == 400
assert r.json["error"]

View File

@ -0,0 +1,116 @@
from app.alias_utils import delete_alias
from app.models import CustomDomain, Alias, Mailbox
from tests.utils import login
def test_get_custom_domains(flask_client):
user = login(flask_client)
CustomDomain.create(user_id=user.id, domain="test1.org", verified=True, commit=True)
CustomDomain.create(
user_id=user.id, domain="test2.org", verified=False, commit=True
)
r = flask_client.get(
"/api/custom_domains",
)
assert r.status_code == 200
assert len(r.json["custom_domains"]) == 2
for domain in r.json["custom_domains"]:
assert domain["domain_name"]
assert domain["id"]
assert domain["nb_alias"] == 0
assert "is_verified" in domain
assert "catch_all" in domain
assert "name" in domain
assert "random_prefix_generation" in domain
assert domain["creation_date"]
assert domain["creation_timestamp"]
assert domain["mailboxes"]
for mailbox in domain["mailboxes"]:
assert "id" in mailbox
assert "email" in mailbox
def test_update_custom_domains(flask_client):
user = login(flask_client)
d1 = CustomDomain.create(
user_id=user.id, domain="test1.org", verified=True, commit=True
)
# test update catch all
assert d1.catch_all is False
r = flask_client.patch(f"/api/custom_domains/{d1.id}", json={"catch_all": True})
assert r.status_code == 200
assert d1.catch_all is True
# make sure the full domain json is returned
cd_json = r.json["custom_domain"]
assert cd_json["domain_name"]
assert cd_json["id"] == d1.id
assert cd_json["nb_alias"] == 0
assert "is_verified" in cd_json
assert "catch_all" in cd_json
assert "name" in cd_json
assert "random_prefix_generation" in cd_json
assert cd_json["creation_date"]
assert cd_json["creation_timestamp"]
assert cd_json["mailboxes"]
for mailbox in cd_json["mailboxes"]:
assert "id" in mailbox
assert "email" in mailbox
# test update random_prefix_generation
assert d1.random_prefix_generation is False
r = flask_client.patch(
f"/api/custom_domains/{d1.id}", json={"random_prefix_generation": True}
)
assert r.status_code == 200
assert d1.random_prefix_generation is True
# test update name
assert d1.name is None
r = flask_client.patch(f"/api/custom_domains/{d1.id}", json={"name": "test name"})
assert r.status_code == 200
assert d1.name == "test name"
# test update mailboxes
assert d1.mailboxes == [user.default_mailbox]
mb = Mailbox.create(
user_id=user.id, email="test@example.org", verified=True, commit=True
)
r = flask_client.patch(
f"/api/custom_domains/{d1.id}", json={"mailbox_ids": [mb.id]}
)
assert r.status_code == 200
assert d1.mailboxes == [mb]
def test_get_custom_domain_trash(flask_client):
user = login(flask_client)
cd = CustomDomain.create(
user_id=user.id, domain="test1.org", verified=True, commit=True
)
alias = Alias.create(
user_id=user.id,
email="first@test1.org",
custom_domain_id=cd.id,
mailbox_id=user.default_mailbox_id,
commit=True,
)
delete_alias(alias, user)
r = flask_client.get(
f"/api/custom_domains/{cd.id}/trash",
)
for deleted_alias in r.json["aliases"]:
assert deleted_alias["alias"]
assert deleted_alias["deletion_timestamp"] > 0

View File

@ -0,0 +1,141 @@
from app.db import Session
from app.import_utils import import_from_csv
from app.models import (
CustomDomain,
Mailbox,
Alias,
BatchImport,
File,
)
from tests.utils_test_alias import alias_export
from tests.utils import login, random_domain, random_token
def test_export(flask_client):
alias_export(flask_client, "api.export_aliases")
def test_import_no_mailboxes_no_domains(flask_client):
# Create user
user = login(flask_client)
# Check start state
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
alias_data = [
"alias,note",
"ebay@my-domain.com,Used on eBay",
'facebook@my-domain.com,"Used on Facebook, Instagram."',
]
file = File.create(path=f"/{random_token()}", commit=True)
batch_import = BatchImport.create(user_id=user.id, file_id=file.id, commit=True)
import_from_csv(batch_import, user, alias_data)
# Should have failed to import anything new because my-domain.com isn't registered
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # +0
def test_import_no_mailboxes(flask_client):
# Create user
user = login(flask_client)
# Check start state
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
domain = random_domain()
# Create domain
CustomDomain.create(user_id=user.id, domain=domain, ownership_verified=True)
Session.commit()
alias_data = [
"alias,note",
f"ebay@{domain},Used on eBay",
f'facebook@{domain},"Used on Facebook, Instagram."',
]
file = File.create(path=f"/{random_token()}", commit=True)
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
import_from_csv(batch_import, user, alias_data)
assert len(Alias.filter_by(user_id=user.id).all()) == 3 # +2
def test_import_no_domains(flask_client):
# Create user
user = login(flask_client)
# Check start state
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
alias_data = [
"alias,note,mailboxes",
"ebay@my-domain.com,Used on eBay,destination@my-destination-domain.com",
'facebook@my-domain.com,"Used on Facebook, Instagram.",destination1@my-destination-domain.com destination2@my-destination-domain.com',
]
file = File.create(path=f"/{random_token()}", commit=True)
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
import_from_csv(batch_import, user, alias_data)
# Should have failed to import anything new because my-domain.com isn't registered
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # +0
def test_import(flask_client):
# Create user
user = login(flask_client)
# Check start state
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
domain1 = random_domain()
domain2 = random_domain()
# Create domains
CustomDomain.create(user_id=user.id, domain=domain1, ownership_verified=True)
CustomDomain.create(user_id=user.id, domain=domain2, ownership_verified=True)
Session.commit()
# Create mailboxes
mailbox1 = Mailbox.create(
user_id=user.id, email=f"destination@{domain2}", verified=True
)
mailbox2 = Mailbox.create(
user_id=user.id, email=f"destination2@{domain2}", verified=True
)
Session.commit()
alias_data = [
"alias,note,mailboxes",
f"ebay@{domain1},Used on eBay,destination@{domain2}",
f'facebook@{domain1},"Used on Facebook, Instagram.",destination@{domain2} destination2@{domain2}',
]
file = File.create(path=f"/{random_token()}", commit=True)
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
import_from_csv(batch_import, user, alias_data)
aliases = Alias.filter_by(user_id=user.id).order_by(Alias.id).all()
assert len(aliases) == 3 # +2
# aliases[0] is the onboarding alias, skip it
# eBay alias
assert aliases[1].email == f"ebay@{domain1}"
assert len(aliases[1].mailboxes) == 1
# First one should be primary
assert aliases[1].mailbox_id == mailbox1.id
# Others are sorted
assert aliases[1].mailboxes[0] == mailbox1
# Facebook alias
assert aliases[2].email == f"facebook@{domain1}"
assert len(aliases[2].mailboxes) == 2
# First one should be primary
assert aliases[2].mailbox_id == mailbox1.id
# Others are sorted
assert aliases[2].mailboxes[0] == mailbox2
assert aliases[2].mailboxes[1] == mailbox1

View File

@ -0,0 +1,191 @@
from flask import url_for
from app.db import Session
from app.models import Mailbox
from tests.utils import login
def test_create_mailbox(flask_client):
login(flask_client)
r = flask_client.post(
"/api/mailboxes",
json={"email": "mailbox@gmail.com"},
)
assert r.status_code == 201
assert r.json["email"] == "mailbox@gmail.com"
assert r.json["verified"] is False
assert r.json["id"] > 0
assert r.json["default"] is False
assert r.json["nb_alias"] == 0
# invalid email address
r = flask_client.post(
"/api/mailboxes",
json={"email": "gmail.com"},
)
assert r.status_code == 400
assert r.json == {"error": "gmail.com invalid"}
def test_create_mailbox_fail_for_free_user(flask_client):
user = login(flask_client)
user.trial_end = None
Session.commit()
r = flask_client.post(
"/api/mailboxes",
json={"email": "mailbox@gmail.com"},
)
assert r.status_code == 400
assert r.json == {"error": "Only premium plan can add additional mailbox"}
def test_delete_mailbox(flask_client):
user = login(flask_client)
# create a mailbox
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
Session.commit()
r = flask_client.delete(
f"/api/mailboxes/{mb.id}",
)
assert r.status_code == 200
def test_delete_default_mailbox(flask_client):
user = login(flask_client)
# assert user cannot delete the default mailbox
r = flask_client.delete(
url_for("api.delete_mailbox", mailbox_id=user.default_mailbox_id),
)
assert r.status_code == 400
def test_set_mailbox_as_default(flask_client):
user = login(flask_client)
mb = Mailbox.create(
user_id=user.id, email="mb@gmail.com", verified=True, commit=True
)
assert user.default_mailbox_id != mb.id
r = flask_client.put(
f"/api/mailboxes/{mb.id}",
json={"default": True},
)
assert r.status_code == 200
assert user.default_mailbox_id == mb.id
# <<< Cannot set an unverified mailbox as default >>>
mb.verified = False
Session.commit()
r = flask_client.put(
f"/api/mailboxes/{mb.id}",
json={"default": True},
)
assert r.status_code == 400
assert r.json == {"error": "Unverified mailbox cannot be used as default mailbox"}
def test_update_mailbox_email(flask_client):
user = login(flask_client)
# create a mailbox
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
Session.commit()
r = flask_client.put(
f"/api/mailboxes/{mb.id}",
json={"email": "new-email@gmail.com"},
)
assert r.status_code == 200
mb = Mailbox.get(mb.id)
assert mb.new_email == "new-email@gmail.com"
def test_cancel_mailbox_email_change(flask_client):
user = login(flask_client)
# create a mailbox
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
Session.commit()
# update mailbox email
r = flask_client.put(
f"/api/mailboxes/{mb.id}",
json={"email": "new-email@gmail.com"},
)
assert r.status_code == 200
mb = Mailbox.get(mb.id)
assert mb.new_email == "new-email@gmail.com"
# cancel mailbox email change
r = flask_client.put(
url_for("api.delete_mailbox", mailbox_id=mb.id),
json={"cancel_email_change": True},
)
assert r.status_code == 200
mb = Mailbox.get(mb.id)
assert mb.new_email is None
def test_get_mailboxes(flask_client):
user = login(flask_client)
Mailbox.create(user_id=user.id, email="m1@example.com", verified=True)
Mailbox.create(user_id=user.id, email="m2@example.com", verified=False)
Session.commit()
r = flask_client.get(
"/api/mailboxes",
)
assert r.status_code == 200
# m2@example.com is not returned as it's not verified
assert len(r.json["mailboxes"]) == 2
for mb in r.json["mailboxes"]:
assert "email" in mb
assert "id" in mb
assert "default" in mb
assert "creation_timestamp" in mb
assert "nb_alias" in mb
assert "verified" in mb
def test_get_mailboxes_v2(flask_client):
user = login(flask_client)
Mailbox.create(user_id=user.id, email="m1@example.com", verified=True)
Mailbox.create(user_id=user.id, email="m2@example.com", verified=False)
Session.commit()
r = flask_client.get(
"/api/v2/mailboxes",
)
assert r.status_code == 200
# 3 mailboxes: the default, m1 and m2
assert len(r.json["mailboxes"]) == 3
for mb in r.json["mailboxes"]:
assert "email" in mb
assert "id" in mb
assert "default" in mb
assert "creation_timestamp" in mb
assert "nb_alias" in mb
assert "verified" in mb

View File

@ -0,0 +1,305 @@
from flask import g
from app import config
from app.alias_suffix import signer
from app.alias_utils import delete_alias
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
from app.db import Session
from app.models import Alias, CustomDomain, Mailbox, AliasUsedOn
from app.utils import random_word
from tests.utils import login, random_domain, random_token
def test_v2(flask_client):
login(flask_client)
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
r = flask_client.post(
"/api/v2/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
new_alias: Alias = Alias.get_by(email=r.json["alias"])
assert len(new_alias.mailboxes) == 1
def test_minimal_payload(flask_client):
user = login(flask_client)
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
new_alias: Alias = Alias.get_by(email=r.json["alias"])
assert len(new_alias.mailboxes) == 1
def test_full_payload(flask_client):
"""Create alias with:
- additional mailbox
- note
- name
- hostname (in URL)
"""
user = login(flask_client)
# create another mailbox
mb = Mailbox.create(user_id=user.id, email="abcd@gmail.com", verified=True)
Session.commit()
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
prefix = random_token()
assert AliasUsedOn.filter(AliasUsedOn.user_id == user.id).count() == 0
r = flask_client.post(
"/api/v3/alias/custom/new?hostname=example.com",
json={
"alias_prefix": prefix,
"signed_suffix": signed_suffix,
"note": "test note",
"mailbox_ids": [user.default_mailbox_id, mb.id],
"name": "your name",
},
)
assert r.status_code == 201
assert r.json["alias"] == f"{prefix}.{word}@{EMAIL_DOMAIN}"
# assert returned field
res = r.json
assert res["note"] == "test note"
assert res["name"] == "your name"
new_alias: Alias = Alias.get_by(email=r.json["alias"])
assert new_alias.note == "test note"
assert len(new_alias.mailboxes) == 2
alias_used_on = AliasUsedOn.filter(AliasUsedOn.user_id == user.id).first()
assert alias_used_on.alias_id == new_alias.id
assert alias_used_on.hostname == "example.com"
def test_custom_domain_alias(flask_client):
user = login(flask_client)
# create a custom domain
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix@{domain}"
def test_wrongly_formatted_payload(flask_client):
login(flask_client)
r = flask_client.post(
"/api/v3/alias/custom/new",
json="string isn't a dict",
)
assert r.status_code == 400
assert r.json == {"error": "request body does not follow the required format"}
def test_mailbox_ids_is_not_an_array(flask_client):
login(flask_client)
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"mailbox_ids": "not an array",
},
)
assert r.status_code == 400
assert r.json == {"error": "mailbox_ids must be an array of id"}
def test_out_of_quota(flask_client):
user = login(flask_client)
user.trial_end = None
Session.commit()
# create MAX_NB_EMAIL_FREE_PLAN custom alias to run out of quota
for _ in range(MAX_NB_EMAIL_FREE_PLAN):
Alias.create_new(user, prefix="test")
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"note": "test note",
"mailbox_ids": [user.default_mailbox_id],
"name": "your name",
},
)
assert r.status_code == 400
assert r.json == {
"error": "You have reached the limitation of a "
"free account with the maximum of 3 aliases, please upgrade your plan to create more aliases"
}
def test_cannot_create_alias_in_trash(flask_client):
user = login(flask_client)
# create a custom domain
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix@{domain}"
# delete alias: it's going to be moved to domain trash
alias = Alias.get_by(email=f"prefix@{domain}")
assert alias.custom_domain_id
delete_alias(alias, user)
# try to create the same alias, will fail as the alias is in trash
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix",
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
assert r.status_code == 409
def test_too_many_requests(flask_client):
config.DISABLE_RATE_LIMIT = False
user = login(flask_client)
# create a custom domain
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
# can't create more than 5 aliases in 1 minute
for i in range(7):
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": f"prefix{i}",
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
# to make flask-limiter work with unit test
# https://github.com/alisaifee/flask-limiter/issues/147#issuecomment-642683820
g._rate_limiting_complete = False
else:
# last request
assert r.status_code == 429
assert r.json == {"error": "Rate limit exceeded"}
def test_invalid_alias_2_consecutive_dots(flask_client):
user = login(flask_client)
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
json={
"alias_prefix": "prefix.", # with the trailing dot, the alias will have 2 consecutive dots
"signed_suffix": signed_suffix,
"mailbox_ids": [user.default_mailbox_id],
},
)
assert r.status_code == 400
assert r.json == {
"error": "2 consecutive dot signs aren't allowed in an email address"
}

View File

@ -0,0 +1,148 @@
import uuid
from flask import url_for, g
from app import config
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
from app.db import Session
from app.models import Alias, CustomDomain, AliasUsedOn
from tests.utils import login, random_domain
def test_with_hostname(flask_client):
login(flask_client)
r = flask_client.post(
url_for("api.new_random_alias", hostname="www.test.com"),
)
assert r.status_code == 201
assert r.json["alias"].endswith("d1.test")
# make sure alias starts with the suggested prefix
assert r.json["alias"].startswith("test")
# assert returned field
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
assert "note" in res
alias_used_on: AliasUsedOn = AliasUsedOn.order_by(AliasUsedOn.id.desc()).first()
assert alias_used_on.hostname == "www.test.com"
assert alias_used_on.alias_id == res["id"]
def test_with_custom_domain(flask_client):
user = login(flask_client)
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
r = flask_client.post(
url_for("api.new_random_alias", hostname="www.test.com"),
)
assert r.status_code == 201
assert r.json["alias"] == f"test@{domain}"
assert Alias.filter_by(user_id=user.id).count() == 2
# call the endpoint again, should return the same alias
r = flask_client.post(
url_for("api.new_random_alias", hostname="www.test.com"),
)
assert r.status_code == 201
assert r.json["alias"] == f"test@{domain}"
# no new alias is created
assert Alias.filter_by(user_id=user.id).count() == 2
def test_without_hostname(flask_client):
login(flask_client)
r = flask_client.post(
url_for("api.new_random_alias"),
)
assert r.status_code == 201
assert r.json["alias"].endswith(EMAIL_DOMAIN)
def test_custom_mode(flask_client):
login(flask_client)
# without note
r = flask_client.post(
url_for("api.new_random_alias", mode="uuid"),
)
assert r.status_code == 201
# extract the uuid part
alias = r.json["alias"]
uuid_part = alias[: len(alias) - len(EMAIL_DOMAIN) - 1]
assert is_valid_uuid(uuid_part)
# with note
r = flask_client.post(
url_for("api.new_random_alias", mode="uuid"),
json={"note": "test note"},
)
assert r.status_code == 201
alias = r.json["alias"]
ge = Alias.get_by(email=alias)
assert ge.note == "test note"
def test_out_of_quota(flask_client):
user = login(flask_client)
user.trial_end = None
Session.commit()
# create MAX_NB_EMAIL_FREE_PLAN random alias to run out of quota
for _ in range(MAX_NB_EMAIL_FREE_PLAN):
Alias.create_new(user, prefix="test1")
r = flask_client.post(
url_for("api.new_random_alias", hostname="www.test.com"),
)
assert r.status_code == 400
assert (
r.json["error"] == "You have reached the limitation of a free account with "
"the maximum of 3 aliases, please upgrade your plan to create more aliases"
)
def test_too_many_requests(flask_client):
config.DISABLE_RATE_LIMIT = False
login(flask_client)
# can't create more than 5 aliases in 1 minute
for _ in range(7):
r = flask_client.post(
url_for("api.new_random_alias", hostname="www.test.com", mode="uuid"),
)
# to make flask-limiter work with unit test
# https://github.com/alisaifee/flask-limiter/issues/147#issuecomment-642683820
g._rate_limiting_complete = False
else:
# last request
assert r.status_code == 429
assert r.json == {"error": "Rate limit exceeded"}
def is_valid_uuid(val):
try:
uuid.UUID(str(val))
return True
except ValueError:
return False

View File

@ -0,0 +1,53 @@
from flask import url_for
from app.db import Session
from app.models import Notification
from tests.api.utils import get_new_user_and_api_key
def test_get_notifications(flask_client):
user, api_key = get_new_user_and_api_key()
# create some notifications
Notification.create(user_id=user.id, message="Test message 1")
Notification.create(user_id=user.id, message="Test message 2")
Session.commit()
r = flask_client.get(
"/api/notifications?page=0",
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
assert r.json["more"] is False
assert len(r.json["notifications"]) == 2
for n in r.json["notifications"]:
assert n["id"] > 0
assert n["message"]
assert "title" in n
assert n["read"] is False
assert n["created_at"]
# no more post at the next page
r = flask_client.get(
url_for("api.get_notifications", page=1),
headers={"Authentication": api_key.code},
)
assert r.json["more"] is False
assert len(r.json["notifications"]) == 0
def test_mark_notification_as_read(flask_client):
user, api_key = get_new_user_and_api_key()
Notification.create(id=1, user_id=user.id, message="Test message 1")
Session.commit()
r = flask_client.post(
url_for("api.mark_as_read", notification_id=1),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
notification = Notification.first()
assert notification.read

View File

@ -0,0 +1,64 @@
import arrow
from app.models import (
PhoneReservation,
PhoneNumber,
PhoneCountry,
PhoneMessage,
)
from tests.utils import login
def test_phone_messages(flask_client):
user = login(flask_client)
country = PhoneCountry.create(name="FR", commit=True)
number = PhoneNumber.create(
country_id=country.id, number="+331234567890", active=True, commit=True
)
reservation = PhoneReservation.create(
number_id=number.id,
user_id=user.id,
start=arrow.now().shift(hours=-1),
end=arrow.now().shift(hours=1),
commit=True,
)
# no messages yet
r = flask_client.post(f"/api/phone/reservations/{reservation.id}")
assert r.status_code == 200
assert r.json == {"ended": False, "messages": []}
# a message arrives
PhoneMessage.create(
number_id=number.id, from_number="from_number", body="body", commit=True
)
r = flask_client.post(f"/api/phone/reservations/{reservation.id}")
assert len(r.json["messages"]) == 1
msg = r.json["messages"][0]
assert msg["body"] == "body"
assert msg["from_number"] == "from_number"
assert "created_at" in msg
assert "id" in msg
# print(json.dumps(r.json, indent=2))
def test_phone_messages_ended_reservation(flask_client):
user = login(flask_client)
country = PhoneCountry.create(name="FR", commit=True)
number = PhoneNumber.create(
country_id=country.id, number="+331234567890", active=True, commit=True
)
reservation = PhoneReservation.create(
number_id=number.id,
user_id=user.id,
start=arrow.now().shift(hours=-2),
end=arrow.now().shift(hours=-1), # reservation is ended
commit=True,
)
r = flask_client.post(f"/api/phone/reservations/{reservation.id}")
assert r.status_code == 200
assert r.json == {"ended": True, "messages": []}

View File

@ -0,0 +1,157 @@
from app.api.serializer import get_alias_infos_with_pagination_v3
from app.config import PAGE_LIMIT
from app.db import Session
from app.models import Alias, Mailbox, Contact
from tests.utils import create_new_user
def test_get_alias_infos_with_pagination_v3(flask_client):
user = create_new_user()
# user has 1 alias that's automatically created when the account is created
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == 1
alias_info = alias_infos[0]
alias = Alias.filter_by(user_id=user.id).first()
assert alias_info.alias == alias
assert alias_info.mailbox == user.default_mailbox
assert alias_info.mailboxes == [user.default_mailbox]
assert alias_info.nb_forward == 0
assert alias_info.nb_blocked == 0
assert alias_info.nb_reply == 0
assert alias_info.latest_email_log is None
assert alias_info.latest_contact is None
def test_get_alias_infos_with_pagination_v3_query_alias_email(flask_client):
"""test the query on the alias email"""
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
alias_infos = get_alias_infos_with_pagination_v3(user, query=alias.email)
assert len(alias_infos) == 1
alias_infos = get_alias_infos_with_pagination_v3(user, query="no match")
assert len(alias_infos) == 0
def test_get_alias_infos_with_pagination_v3_query_alias_mailbox(flask_client):
"""test the query on the alias mailbox email"""
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
alias_infos = get_alias_infos_with_pagination_v3(user, mailbox_id=alias.mailbox_id)
assert len(alias_infos) == 1
def test_get_alias_infos_with_pagination_v3_query_alias_mailboxes(flask_client):
"""test the query on the alias additional mailboxes"""
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
alias._mailboxes.append(mb)
Session.commit()
alias_infos = get_alias_infos_with_pagination_v3(user, mailbox_id=mb.id)
assert len(alias_infos) == 1
alias_infos = get_alias_infos_with_pagination_v3(user, query=alias.email)
assert len(alias_infos) == 1
def test_get_alias_infos_with_pagination_v3_query_alias_note(flask_client):
"""test the query on the alias note"""
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
alias.note = "test note"
Session.commit()
alias_infos = get_alias_infos_with_pagination_v3(user, query="test note")
assert len(alias_infos) == 1
def test_get_alias_infos_with_pagination_v3_query_alias_name(flask_client):
"""test the query on the alias name"""
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
alias.name = "Test Name"
Session.commit()
alias_infos = get_alias_infos_with_pagination_v3(user, query="test name")
assert len(alias_infos) == 1
def test_get_alias_infos_with_pagination_v3_no_duplicate(flask_client):
"""When an alias belongs to multiple mailboxes, make sure get_alias_infos_with_pagination_v3
returns no duplicates
"""
user = create_new_user()
alias = Alias.first()
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
alias._mailboxes.append(mb)
Session.commit()
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == 1
def test_get_alias_infos_with_pagination_v3_no_duplicate_when_empty_contact(
flask_client,
):
"""
Make sure an alias is returned once when it has 2 contacts that have no email log activity
"""
user = create_new_user()
alias = Alias.first()
Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email="rep@sl.local",
)
Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact2@example.com",
reply_email="rep2@sl.local",
)
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == 1
def test_get_alias_infos_pinned_alias(flask_client):
"""Different scenarios with pinned alias"""
user = create_new_user()
# to have 3 pages: 2*PAGE_LIMIT + the alias automatically created for a new account
for _ in range(2 * PAGE_LIMIT):
Alias.create_new_random(user)
first_alias = Alias.filter_by(user_id=user.id).order_by(Alias.id).first()
# should return PAGE_LIMIT alias
alias_infos = get_alias_infos_with_pagination_v3(user)
assert len(alias_infos) == PAGE_LIMIT
# make sure first_alias is not returned as the default order is alias creation date
assert first_alias not in [ai.alias for ai in alias_infos]
# pin the first alias
first_alias.pinned = True
Session.commit()
alias_infos = get_alias_infos_with_pagination_v3(user)
# now first_alias is the first result
assert first_alias == alias_infos[0].alias
# and the page size is still the same
assert len(alias_infos) == PAGE_LIMIT
# pinned alias isn't included in the search
alias_infos = get_alias_infos_with_pagination_v3(user, query="no match")
assert len(alias_infos) == 0

View File

@ -0,0 +1,105 @@
from app.models import (
CustomDomain,
AliasGeneratorEnum,
SenderFormatEnum,
AliasSuffixEnum,
)
from tests.utils import login, random_domain
def test_get_setting(flask_client):
login(flask_client)
r = flask_client.get("/api/setting")
assert r.status_code == 200
assert r.json == {
"alias_generator": "word",
"notification": True,
"random_alias_default_domain": "sl.local",
"sender_format": "AT",
"random_alias_suffix": "random_string",
}
def test_update_settings_notification(flask_client):
user = login(flask_client)
assert user.notification
r = flask_client.patch("/api/setting", json={"notification": False})
assert r.status_code == 200
assert not user.notification
def test_update_settings_alias_generator(flask_client):
user = login(flask_client)
assert user.alias_generator == AliasGeneratorEnum.word.value
r = flask_client.patch("/api/setting", json={"alias_generator": "invalid"})
assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"alias_generator": "uuid"})
assert r.status_code == 200
assert user.alias_generator == AliasGeneratorEnum.uuid.value
def test_update_settings_random_alias_default_domain(flask_client):
user = login(flask_client)
assert user.default_random_alias_domain() == "sl.local"
r = flask_client.patch(
"/api/setting", json={"random_alias_default_domain": "invalid"}
)
assert r.status_code == 400
r = flask_client.patch(
"/api/setting", json={"random_alias_default_domain": "d1.test"}
)
assert r.status_code == 200
assert user.default_random_alias_domain() == "d1.test"
def test_update_settings_sender_format(flask_client):
user = login(flask_client)
assert user.sender_format == SenderFormatEnum.AT.value
r = flask_client.patch("/api/setting", json={"sender_format": "invalid"})
assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"sender_format": "A"})
assert r.status_code == 200
assert user.sender_format == SenderFormatEnum.A.value
r = flask_client.patch("/api/setting", json={"sender_format": "NAME_ONLY"})
assert r.status_code == 200
assert user.sender_format == SenderFormatEnum.NAME_ONLY.value
def test_get_setting_domains(flask_client):
user = login(flask_client)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
r = flask_client.get("/api/setting/domains")
assert r.status_code == 200
def test_get_setting_domains_v2(flask_client):
user = login(flask_client)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
r = flask_client.get("/api/v2/setting/domains")
assert r.status_code == 200
def test_update_settings_random_alias_suffix(flask_client):
user = login(flask_client)
# default random_alias_suffix is random_string
assert user.random_alias_suffix == AliasSuffixEnum.random_string.value
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "invalid"})
assert r.status_code == 400
r = flask_client.patch("/api/setting", json={"random_alias_suffix": "word"})
assert r.status_code == 200
assert user.random_alias_suffix == AliasSuffixEnum.word.value

View File

@ -0,0 +1,34 @@
from random import random
from flask import url_for
from app.api.base import check_sudo_mode_is_active
from app.db import Session
from app.models import ApiKey
from tests.api.utils import get_new_user_and_api_key
def test_enter_sudo_mode(flask_client):
user, api_key = get_new_user_and_api_key()
password = f"passwd-{random()}"
user.set_password(password)
Session.commit()
r = flask_client.patch(
url_for("api.enter_sudo"),
headers={"Authentication": api_key.code},
json={"password": "invalid"},
)
assert r.status_code == 403
assert not check_sudo_mode_is_active(ApiKey.get(id=api_key.id))
r = flask_client.patch(
url_for("api.enter_sudo"),
headers={"Authentication": api_key.code},
json={"password": password},
)
assert r.status_code == 200
assert r.json == {"ok": True}
assert check_sudo_mode_is_active(ApiKey.get(id=api_key.id))

View File

@ -0,0 +1,68 @@
from random import random
from flask import url_for
from app import config
from app.db import Session
from app.models import Job, ApiToCookieToken
from tests.api.utils import get_new_user_and_api_key
def test_delete_without_sudo(flask_client):
user, api_key = get_new_user_and_api_key()
for job in Job.all():
job.delete(job.id)
Session.commit()
r = flask_client.delete(
url_for("api.delete_user"),
headers={"Authentication": api_key.code},
)
assert r.status_code == 440
assert Job.count() == 0
def test_delete_with_sudo(flask_client):
user, api_key = get_new_user_and_api_key()
password = f"passwd-{random()}"
user.set_password(password)
for job in Job.all():
job.delete(job.id)
Session.commit()
r = flask_client.patch(
url_for("api.enter_sudo"),
headers={"Authentication": api_key.code},
json={"password": password},
)
assert r.status_code == 200
r = flask_client.delete(
url_for("api.delete_user"),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
jobs = Job.all()
assert len(jobs) == 1
job = jobs[0]
assert job.name == config.JOB_DELETE_ACCOUNT
assert job.payload == {"user_id": user.id}
def test_get_cookie_token(flask_client):
user, api_key = get_new_user_and_api_key()
r = flask_client.get(
url_for("api.get_api_session_token"),
headers={"Authentication": api_key.code},
)
assert r.status_code == 200
code = r.json["token"]
token = ApiToCookieToken.get_by(code=code)
assert token is not None
assert token.user_id == user.id

View File

@ -0,0 +1,131 @@
from flask import url_for
from app import config
from app.models import User, PartnerUser
from app.proton.utils import get_proton_partner
from tests.api.utils import get_new_user_and_api_key
from tests.utils import login, random_token, random_email
def test_user_in_trial(flask_client):
user, api_key = get_new_user_and_api_key()
r = flask_client.get(
url_for("api.user_info"), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert r.json == {
"is_premium": True,
"name": "Test User",
"email": user.email,
"in_trial": True,
"profile_picture_url": None,
"max_alias_free_plan": config.MAX_NB_EMAIL_FREE_PLAN,
"connected_proton_address": None,
}
def test_user_linked_to_proton(flask_client):
config.CONNECT_WITH_PROTON = True
user, api_key = get_new_user_and_api_key()
partner = get_proton_partner()
partner_email = random_email()
PartnerUser.create(
user_id=user.id,
partner_id=partner.id,
external_user_id=random_token(),
partner_email=partner_email,
commit=True,
)
r = flask_client.get(
url_for("api.user_info"), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert r.json == {
"is_premium": True,
"name": "Test User",
"email": user.email,
"in_trial": True,
"profile_picture_url": None,
"max_alias_free_plan": config.MAX_NB_EMAIL_FREE_PLAN,
"connected_proton_address": partner_email,
}
def test_wrong_api_key(flask_client):
r = flask_client.get(
url_for("api.user_info"), headers={"Authentication": "Invalid code"}
)
assert r.status_code == 401
assert r.json == {"error": "Wrong api key"}
def test_create_api_key(flask_client):
login(flask_client)
# create api key
r = flask_client.post(url_for("api.create_api_key"), json={"device": "Test device"})
assert r.status_code == 201
assert r.json["api_key"]
def test_logout(flask_client):
login(flask_client)
# logout
r = flask_client.get(
url_for("auth.logout"),
follow_redirects=True,
)
assert r.status_code == 200
def test_change_profile_picture(flask_client):
user = login(flask_client)
assert not user.profile_picture_id
# <<< Set the profile picture >>>
img_base64 = """iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="""
r = flask_client.patch(
"/api/user_info",
json={"profile_picture": img_base64},
)
assert r.status_code == 200
assert r.json["profile_picture_url"] is not None
user = User.get(user.id)
assert user.profile_picture_id
# <<< remove the profile picture >>>
r = flask_client.patch(
"/api/user_info",
json={"profile_picture": None},
)
assert r.status_code == 200
assert r.json["profile_picture_url"] is None
user = User.get(user.id)
assert not user.profile_picture_id
def test_change_name(flask_client):
user = login(flask_client)
assert user.name != "new name"
r = flask_client.patch(
"/api/user_info",
json={"name": "new name"},
)
assert r.status_code == 200
assert r.json["name"] == "new name"
assert user.name == "new name"

13
app/tests/api/utils.py Normal file
View File

@ -0,0 +1,13 @@
from typing import Tuple
from app.models import User, ApiKey
from tests.utils import create_new_user
def get_new_user_and_api_key() -> Tuple[User, ApiKey]:
user = create_new_user()
# create api_key
api_key = ApiKey.create(user.id, "for test", commit=True)
return user, api_key