4.55.0
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m43s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 4m10s
Build-Release-Image / Merge-Images (push) Successful in 27s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s

This commit is contained in:
2024-10-18 12:00:06 +01:00
parent 798b58529c
commit da6e56c4eb
58 changed files with 2002 additions and 637 deletions

View File

@ -0,0 +1,54 @@
import arrow
from app.db import Session
from app.models import SyncEvent
from events.event_source import DeadLetterEventSource, _DEAD_LETTER_THRESHOLD_MINUTES
class EventCounter:
def __init__(self):
self.processed_events = 0
def on_event(self, event: SyncEvent):
self.processed_events += 1
def setup_function(func):
Session.query(SyncEvent).delete()
def test_dead_letter_does_not_take_untaken_events():
source = DeadLetterEventSource(1)
counter = EventCounter()
threshold_time = arrow.utcnow().shift(minutes=-(_DEAD_LETTER_THRESHOLD_MINUTES) + 1)
SyncEvent.create(
content="test".encode("utf-8"), created_at=threshold_time, flush=True
)
SyncEvent.create(
content="test".encode("utf-8"), taken_time=threshold_time, flush=True
)
events_processed = source.execute_loop(on_event=counter.on_event)
assert len(events_processed) == 0
assert counter.processed_events == 0
def test_dead_letter_takes_untaken_events_created_older_than_threshold():
source = DeadLetterEventSource(1)
counter = EventCounter()
old_create = arrow.utcnow().shift(minutes=-_DEAD_LETTER_THRESHOLD_MINUTES - 1)
SyncEvent.create(content="test".encode("utf-8"), created_at=old_create, flush=True)
events_processed = source.execute_loop(on_event=counter.on_event)
assert len(events_processed) == 1
assert events_processed[0].taken_time > old_create
assert counter.processed_events == 1
def test_dead_letter_takes_taken_events_created_older_than_threshold():
source = DeadLetterEventSource(1)
counter = EventCounter()
old_taken = arrow.utcnow().shift(minutes=-_DEAD_LETTER_THRESHOLD_MINUTES - 1)
SyncEvent.create(content="test".encode("utf-8"), taken_time=old_taken, flush=True)
events_processed = source.execute_loop(on_event=counter.on_event)
assert len(events_processed) == 1
assert events_processed[0].taken_time > old_taken
assert counter.processed_events == 1

View File

@ -1,7 +1,9 @@
import arrow
from app import config, alias_utils
from app.db import Session
from app.events.event_dispatcher import GlobalDispatcher
from app.models import Alias
from app.models import Alias, SyncEvent
from tests.utils import random_token
from .event_test_utils import (
OnMemoryDispatcher,
@ -26,6 +28,33 @@ def setup_function(func):
on_memory_dispatcher.clear()
def test_event_taken_updates():
event = SyncEvent.create(content="test".encode("utf-8"), flush=True)
assert event.taken_time is None
assert event.mark_as_taken()
assert event.taken_time is not None
def test_event_mark_as_taken_does_nothing_for_taken_events():
now = arrow.utcnow()
event = SyncEvent.create(content="test".encode("utf-8"), taken_time=now, flush=True)
assert not event.mark_as_taken()
def test_event_mark_as_taken_does_nothing_for_not_before_events():
now = arrow.utcnow()
event = SyncEvent.create(content="test".encode("utf-8"), taken_time=now, flush=True)
older_than = now.shift(minutes=-1)
assert not event.mark_as_taken(allow_taken_older_than=older_than)
def test_event_mark_as_taken_works_for_before_events():
now = arrow.utcnow()
event = SyncEvent.create(content="test".encode("utf-8"), taken_time=now, flush=True)
older_than = now.shift(minutes=+1)
assert event.mark_as_taken(allow_taken_older_than=older_than)
def test_fire_event_on_alias_creation():
(user, pu) = _create_linked_user()
alias = Alias.create_new_random(user)
@ -79,7 +108,7 @@ def test_fire_event_on_alias_status_change():
alias = Alias.create_new_random(user)
Session.flush()
on_memory_dispatcher.clear()
alias_utils.change_alias_status(alias, True)
alias_utils.change_alias_status(alias, enabled=True)
assert len(on_memory_dispatcher.memory) == 1
event_data = on_memory_dispatcher.memory[0]
event_content = _get_event_from_string(event_data, user, pu)

View File

@ -0,0 +1,109 @@
import arrow
from app import config
from app.events.event_dispatcher import GlobalDispatcher
from app.events.generated.event_pb2 import UserPlanChanged
from app.models import (
Subscription,
AppleSubscription,
CoinbaseSubscription,
ManualSubscription,
User,
PartnerUser,
)
from .event_test_utils import (
OnMemoryDispatcher,
_create_linked_user,
_get_event_from_string,
)
from tests.utils import random_token
from app.subscription_webhook import execute_subscription_webhook
on_memory_dispatcher = OnMemoryDispatcher()
def setup_module():
GlobalDispatcher.set_dispatcher(on_memory_dispatcher)
config.EVENT_WEBHOOK = "http://test"
def teardown_module():
GlobalDispatcher.set_dispatcher(None)
config.EVENT_WEBHOOK = None
def setup_function(func):
on_memory_dispatcher.clear()
def check_event(user: User, pu: PartnerUser) -> UserPlanChanged:
assert len(on_memory_dispatcher.memory) == 1
event_data = on_memory_dispatcher.memory[0]
event_content = _get_event_from_string(event_data, user, pu)
assert event_content.user_plan_change is not None
plan_change = event_content.user_plan_change
return plan_change
def test_webhook_with_trial():
(user, pu) = _create_linked_user()
execute_subscription_webhook(user)
assert check_event(user, pu).plan_end_time == 0
def test_webhook_with_subscription():
(user, pu) = _create_linked_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_token(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
execute_subscription_webhook(user)
assert check_event(user, pu).plan_end_time == end_at.timestamp
def test_webhook_with_apple_subscription():
(user, pu) = _create_linked_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_token(10),
plan="yearly",
product_id="",
flush=True,
)
execute_subscription_webhook(user)
assert check_event(user, pu).plan_end_time == end_at.timestamp
def test_webhook_with_coinbase_subscription():
(user, pu) = _create_linked_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert check_event(user, pu).plan_end_time == end_at.timestamp
def test_webhook_with_manual_subscription():
(user, pu) = _create_linked_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
ManualSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert check_event(user, pu).plan_end_time == end_at.timestamp

View File

@ -1,3 +1,5 @@
from typing import List
import pytest
from arrow import Arrow
@ -16,8 +18,9 @@ from app.account_linking import (
)
from app.db import Session
from app.errors import AccountAlreadyLinkedToAnotherPartnerException
from app.models import Partner, PartnerUser, User
from app.models import Partner, PartnerUser, User, UserAuditLog
from app.proton.utils import get_proton_partner
from app.user_audit_log_utils import UserAuditLogAction
from app.utils import random_string, canonicalize_email
from tests.utils import random_email
@ -91,6 +94,11 @@ def test_login_case_from_partner():
)
assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_login_case_from_partner_with_uppercase_email():
partner = get_proton_partner()
@ -125,6 +133,11 @@ def test_login_case_from_web():
assert 0 == (res.user.flags & User.FLAG_CREATED_FROM_PARTNER)
assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_get_strategy_existing_sl_user():
email = random_email()
@ -205,6 +218,10 @@ def test_link_account_with_proton_account_same_address(flask_client):
)
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_link_account_with_proton_account_different_address(flask_client):
@ -229,6 +246,11 @@ def test_link_account_with_proton_account_different_address(flask_client):
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_link_account_with_proton_account_same_address_but_linked_to_other_user(
flask_client,
@ -248,22 +270,54 @@ def test_link_account_with_proton_account_same_address_but_linked_to_other_user(
partner_user_id, email=random_email()
) # User already linked with the proton account
# START Ensure sl_user_2 has a partner_user with the right data
partner_user = PartnerUser.get_by(
partner_id=get_proton_partner().id, user_id=sl_user_2.id
)
assert partner_user is not None
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
assert partner_user.partner_email == sl_user_2.email
assert partner_user.user_id == sl_user_2.id
# END Ensure sl_user_2 has a partner_user with the right data
# Proceed to link sl_user_1
res = process_link_case(link_request, sl_user_1, get_proton_partner())
# Check that the result is linking sl_user_1
assert res.user.id == sl_user_1.id
assert res.user.email == partner_email
assert res.strategy == "Link"
# Ensure partner_user for sl_user_1 exists
partner_user = PartnerUser.get_by(
partner_id=get_proton_partner().id, user_id=sl_user_1.id
)
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
# Ensure partner_user for sl_user_2 does not exist anymore
partner_user = PartnerUser.get_by(
partner_id=get_proton_partner().id, user_id=sl_user_2.id
)
assert partner_user is None
# Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id
).all()
assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id
).all()
assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_link_account_with_proton_account_different_address_and_linked_to_other_user(
flask_client,
@ -300,6 +354,22 @@ def test_link_account_with_proton_account_different_address_and_linked_to_other_
)
assert partner_user_2 is None
# Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id
).all()
assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id
).all()
assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_cannot_create_instance_of_base_strategy():
with pytest.raises(Exception):

View File

@ -0,0 +1,95 @@
import random
from app.alias_audit_log_utils import emit_alias_audit_log, AliasAuditLogAction
from app.alias_utils import delete_alias, transfer_alias
from app.models import Alias, AliasAuditLog, AliasDeleteReason
from app.utils import random_string
from tests.utils import create_new_user, random_email
def test_emit_alias_audit_log_for_random_data():
user = create_new_user()
alias = Alias.create(
user_id=user.id,
email=random_email(),
mailbox_id=user.default_mailbox_id,
)
random_user_id = random.randint(1000, 2000)
message = random_string()
action = AliasAuditLogAction.ChangeAliasStatus
emit_alias_audit_log(
alias=alias,
user_id=random_user_id,
action=action,
message=message,
commit=True,
)
logs_for_alias = AliasAuditLog.filter_by(alias_id=alias.id).all()
assert len(logs_for_alias) == 2
last_log = logs_for_alias[-1]
assert last_log.alias_id == alias.id
assert last_log.alias_email == alias.email
assert last_log.user_id == random_user_id
assert last_log.action == action.value
assert last_log.message == message
def test_emit_alias_audit_log_on_alias_creation():
user = create_new_user()
alias = Alias.create(
user_id=user.id,
email=random_email(),
mailbox_id=user.default_mailbox_id,
)
log_for_alias = AliasAuditLog.filter_by(alias_id=alias.id).all()
assert len(log_for_alias) == 1
assert log_for_alias[0].alias_id == alias.id
assert log_for_alias[0].alias_email == alias.email
assert log_for_alias[0].user_id == user.id
assert log_for_alias[0].action == AliasAuditLogAction.CreateAlias.value
def test_alias_audit_log_exists_after_alias_deletion():
user = create_new_user()
alias = Alias.create(
user_id=user.id,
email=random_email(),
mailbox_id=user.default_mailbox_id,
)
alias_id = alias.id
emit_alias_audit_log(alias, AliasAuditLogAction.UpdateAlias, "")
emit_alias_audit_log(alias, AliasAuditLogAction.UpdateAlias, "")
delete_alias(alias, user, AliasDeleteReason.ManualAction, commit=True)
db_alias = Alias.get_by(id=alias_id)
assert db_alias is None
logs_for_alias = AliasAuditLog.filter_by(alias_id=alias.id).all()
assert len(logs_for_alias) == 4
assert logs_for_alias[0].action == AliasAuditLogAction.CreateAlias.value
assert logs_for_alias[1].action == AliasAuditLogAction.UpdateAlias.value
assert logs_for_alias[2].action == AliasAuditLogAction.UpdateAlias.value
assert logs_for_alias[3].action == AliasAuditLogAction.DeleteAlias.value
def test_alias_audit_log_for_transfer():
original_user = create_new_user()
new_user = create_new_user()
alias = Alias.create(
user_id=original_user.id,
email=random_email(),
mailbox_id=original_user.default_mailbox_id,
)
transfer_alias(alias, new_user, [new_user.default_mailbox])
logs_for_alias = AliasAuditLog.filter_by(alias_id=alias.id).all()
assert len(logs_for_alias) == 3
assert logs_for_alias[0].action == AliasAuditLogAction.CreateAlias.value
assert logs_for_alias[1].action == AliasAuditLogAction.TransferredAlias.value
assert logs_for_alias[1].user_id == original_user.id
assert logs_for_alias[2].action == AliasAuditLogAction.AcceptTransferAlias.value
assert logs_for_alias[2].user_id == new_user.id

View File

@ -0,0 +1,70 @@
from typing import Tuple
from app.alias_audit_log_utils import AliasAuditLogAction
from app.alias_mailbox_utils import (
set_mailboxes_for_alias,
CannotSetMailboxesForAliasCause,
)
from app.models import Alias, Mailbox, User, AliasMailbox, AliasAuditLog
from tests.utils import create_new_user, random_email
def setup() -> Tuple[User, Alias]:
user = create_new_user()
alias = Alias.create(
user_id=user.id,
email=random_email(),
mailbox_id=user.default_mailbox_id,
commit=True,
)
return user, alias
def test_set_mailboxes_for_alias_empty_list():
user, alias = setup()
err = set_mailboxes_for_alias(user.id, alias, [])
assert err is CannotSetMailboxesForAliasCause.EmptyMailboxes
def test_set_mailboxes_for_alias_mailbox_for_other_user():
user, alias = setup()
another_user = create_new_user()
err = set_mailboxes_for_alias(user.id, alias, [another_user.default_mailbox_id])
assert err is CannotSetMailboxesForAliasCause.Forbidden
def test_set_mailboxes_for_alias_mailbox_not_exists():
user, alias = setup()
err = set_mailboxes_for_alias(user.id, alias, [9999999])
assert err is CannotSetMailboxesForAliasCause.Forbidden
def test_set_mailboxes_for_alias_mailbox_success():
user, alias = setup()
mb1 = Mailbox.create(
user_id=user.id,
email=random_email(),
verified=True,
)
mb2 = Mailbox.create(
user_id=user.id,
email=random_email(),
verified=True,
commit=True,
)
err = set_mailboxes_for_alias(user.id, alias, [mb1.id, mb2.id])
assert err is None
db_alias = Alias.get_by(id=alias.id)
assert db_alias is not None
assert db_alias.mailbox_id == mb1.id
alias_mailboxes = AliasMailbox.filter_by(alias_id=alias.id).all()
assert len(alias_mailboxes) == 1
assert alias_mailboxes[0].mailbox_id == mb2.id
audit_logs = AliasAuditLog.filter_by(alias_id=alias.id).all()
assert len(audit_logs) == 2
assert audit_logs[0].action == AliasAuditLogAction.CreateAlias.value
assert audit_logs[1].action == AliasAuditLogAction.ChangedMailboxes.value
assert audit_logs[1].message == f"{mb1.id} ({mb1.email}),{mb2.id} ({mb2.email})"

View File

@ -6,7 +6,9 @@ import pytest
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.models import Mailbox, MailboxActivation, User, Job
from app.mailbox_utils import MailboxEmailChangeError
from app.models import Mailbox, MailboxActivation, User, Job, UserAuditLog
from app.user_audit_log_utils import UserAuditLogAction
from tests.utils import create_new_user, random_email
@ -218,7 +220,11 @@ def test_delete_with_transfer():
user, random_email(), use_digit_codes=True, send_link=False
).mailbox
transfer_mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_link=False
user,
random_email(),
use_digit_codes=True,
send_link=False,
verified=True,
).mailbox
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=transfer_mailbox.id
@ -236,6 +242,28 @@ def test_delete_with_transfer():
assert job.payload["transfer_mailbox_id"] is None
def test_cannot_delete_with_transfer_to_unverified_mailbox():
mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_link=False
).mailbox
transfer_mailbox = mailbox_utils.create_mailbox(
user,
random_email(),
use_digit_codes=True,
send_link=False,
verified=False,
).mailbox
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=transfer_mailbox.id
)
# Verify mailbox still exists
db_mailbox = Mailbox.get_by(id=mailbox.id)
assert db_mailbox is not None
def test_verify_non_existing_mailbox():
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.verify_mailbox_code(user, 999999999, "9999999")
@ -302,3 +330,74 @@ def test_verify_ok():
assert activation is None
mailbox = Mailbox.get(id=output.mailbox.id)
assert mailbox.verified
# perform_mailbox_email_change
def test_perform_mailbox_email_change_invalid_id():
res = mailbox_utils.perform_mailbox_email_change(99999)
assert res.error == MailboxEmailChangeError.InvalidId
assert res.message_category == "error"
def test_perform_mailbox_email_change_valid_id_not_new_email():
user = create_new_user()
mb = Mailbox.create(
user_id=user.id,
email=random_email(),
new_email=None,
verified=True,
commit=True,
)
res = mailbox_utils.perform_mailbox_email_change(mb.id)
assert res.error == MailboxEmailChangeError.InvalidId
assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count()
assert audit_log_entries == 0
def test_perform_mailbox_email_change_valid_id_email_already_used():
user = create_new_user()
new_email = random_email()
# Create mailbox with that email
Mailbox.create(
user_id=user.id,
email=new_email,
verified=True,
)
mb_to_change = Mailbox.create(
user_id=user.id,
email=random_email(),
new_email=new_email,
verified=True,
commit=True,
)
res = mailbox_utils.perform_mailbox_email_change(mb_to_change.id)
assert res.error == MailboxEmailChangeError.EmailAlreadyUsed
assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count()
assert audit_log_entries == 0
def test_perform_mailbox_email_change_success():
user = create_new_user()
new_email = random_email()
mb = Mailbox.create(
user_id=user.id,
email=random_email(),
new_email=new_email,
verified=True,
commit=True,
)
res = mailbox_utils.perform_mailbox_email_change(mb.id)
assert res.error is None
assert res.message_category == "success"
db_mailbox = Mailbox.get_by(id=mb.id)
assert db_mailbox is not None
assert db_mailbox.verified is True
assert db_mailbox.email == new_email
assert db_mailbox.new_email is None
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).all()
assert len(audit_log_entries) == 1
assert audit_log_entries[0].action == UserAuditLogAction.UpdateMailbox.value

View File

@ -2,7 +2,7 @@ import arrow
from app.db import Session
from app.models import CoinbaseSubscription
from server import handle_coinbase_event
from app.payments.coinbase import handle_coinbase_event
from tests.utils import create_new_user

View File

@ -1,113 +0,0 @@
import http.server
import json
import threading
import arrow
from app import config
from app.models import (
Subscription,
AppleSubscription,
CoinbaseSubscription,
ManualSubscription,
)
from tests.utils import create_new_user, random_token
from app.subscription_webhook import execute_subscription_webhook
http_server = None
last_http_request = None
def setup_module():
global http_server
http_server = http.server.ThreadingHTTPServer(("", 0), HTTPTestServer)
print(http_server.server_port)
threading.Thread(target=http_server.serve_forever, daemon=True).start()
config.SUBSCRIPTION_CHANGE_WEBHOOK = f"http://localhost:{http_server.server_port}"
def teardown_module():
global http_server
config.SUBSCRIPTION_CHANGE_WEBHOOK = None
http_server.shutdown()
class HTTPTestServer(http.server.BaseHTTPRequestHandler):
def do_POST(self):
global last_http_request
content_len = int(self.headers.get("Content-Length"))
body_data = self.rfile.read(content_len)
last_http_request = json.loads(body_data)
self.send_response(200)
def test_webhook_with_trial():
user = create_new_user()
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] is None
def test_webhook_with_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=1).replace(hour=0, minute=0, second=0)
Subscription.create(
user_id=user.id,
cancel_url="",
update_url="",
subscription_id=random_token(10),
event_time=arrow.now(),
next_bill_date=end_at.date(),
plan="yearly",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_apple_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=2).replace(hour=0, minute=0, second=0)
AppleSubscription.create(
user_id=user.id,
receipt_data=arrow.now().date().strftime("%Y-%m-%d"),
expires_date=end_at.date().strftime("%Y-%m-%d"),
original_transaction_id=random_token(10),
plan="yearly",
product_id="",
flush=True,
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_coinbase_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
CoinbaseSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp
def test_webhook_with_manual_subscription():
user = create_new_user()
end_at = arrow.utcnow().shift(days=3).replace(hour=0, minute=0, second=0)
ManualSubscription.create(
user_id=user.id, end_at=end_at.date().strftime("%Y-%m-%d"), flush=True
)
execute_subscription_webhook(user)
assert last_http_request["user_id"] == user.id
assert last_http_request["is_premium"]
assert last_http_request["active_subscription_end"] == end_at.timestamp

View File

@ -0,0 +1,52 @@
from typing import List
from app import config, mailbox_utils
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.models import UserAuditLog
from app.utils import random_string
from tests.utils import create_new_user, random_email
def setup_module():
config.SKIP_MX_LOOKUP_ON_CHECK = True
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
def test_emit_alias_audit_log_for_random_data():
user = create_new_user()
message = random_string()
action = UserAuditLogAction.CreateMailbox
emit_user_audit_log(
user=user,
action=action,
message=message,
commit=True,
)
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all()
assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email
assert logs_for_user[0].action == action.value
assert logs_for_user[0].message == message
def test_emit_audit_log_on_mailbox_creation():
user = create_new_user()
output = mailbox_utils.create_mailbox(
user=user, email=random_email(), verified=True
)
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all()
assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email
assert logs_for_user[0].action == UserAuditLogAction.CreateMailbox.value
assert (
logs_for_user[0].message
== f"Create mailbox {output.mailbox.id} ({output.mailbox.email}). Verified=True"
)