Compare commits

...

2 Commits

SHA1 Message Date
0f73a14926 4.44.3 2024-05-24 12:00:06 +01:00
0ea33ca5f8 4.44.0 2024-05-23 12:00:07 +01:00
28 changed files with 797 additions and 18 deletions

View File

@@ -46,7 +46,8 @@ class SLModelView(sqla.ModelView):
     def inaccessible_callback(self, name, **kwargs):
         # redirect to login page if user doesn't have access
-        return redirect(url_for("auth.login", next=request.url))
+        flash("You don't have access to the admin page", "error")
+        return redirect(url_for("dashboard.index", next=request.url))
 
     def on_model_change(self, form, model, is_created):
         changes = {}

View File

@@ -25,6 +25,8 @@ from app.email_utils import (
     render,
 )
 from app.errors import AliasInTrashError
+from app.events.event_dispatcher import EventDispatcher
+from app.events.generated.event_pb2 import AliasDeleted, AliasStatusChange, EventContent
 from app.log import LOG
 from app.models import (
     Alias,
@@ -334,6 +336,10 @@ def delete_alias(alias: Alias, user: User):
     Alias.filter(Alias.id == alias.id).delete()
     Session.commit()
 
+    EventDispatcher.send_event(
+        user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
+    )
+
 
 def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
     """
@@ -459,3 +465,15 @@ def transfer_alias(alias, new_user, new_mailboxes: [Mailbox]):
     alias.pinned = False
     Session.commit()
+
+
+def change_alias_status(alias: Alias, enabled: bool, commit: bool = False):
+    alias.enabled = enabled
+
+    event = AliasStatusChange(
+        alias_id=alias.id, alias_email=alias.email, enabled=enabled
+    )
+    EventDispatcher.send_event(alias.user, EventContent(alias_status_change=event))
+
+    if commit:
+        Session.commit()
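
For reference, a minimal sketch of how a call site migrates to the new helper (hypothetical snippet; `alias` stands for any loaded Alias row):

    from app import alias_utils
    from app.db import Session

    # before: "alias.enabled = False" followed by Session.commit(),
    # which flipped the flag but emitted no event
    # after: the helper flips the flag and also dispatches an
    # AliasStatusChange event for the partner webhook queue
    alias_utils.change_alias_status(alias, enabled=False)
    Session.commit()  # or pass commit=True to the helper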

View File

@@ -184,7 +184,7 @@ def toggle_alias(alias_id):
     if not alias or alias.user_id != user.id:
         return jsonify(error="Forbidden"), 403
 
-    alias.enabled = not alias.enabled
+    alias_utils.change_alias_status(alias, enabled=not alias.enabled)
     Session.commit()
 
     return jsonify(enabled=alias.enabled), 200

View File

@@ -429,7 +429,7 @@ except Exception:
     HIBP_SCAN_INTERVAL_DAYS = 7
 HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or []
 HIBP_MAX_ALIAS_CHECK = 10_000
-HIBP_RPM = 100
+HIBP_RPM = int(os.environ.get("HIBP_API_RPM", 100))
 HIBP_SKIP_PARTNER_ALIAS = os.environ.get("HIBP_SKIP_PARTNER_ALIAS")
 
 KEEP_OLD_DATA_DAYS = 30
@@ -581,3 +581,9 @@ UPCLOUD_PASSWORD = os.environ.get("UPCLOUD_PASSWORD", None)
 UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
 
 STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
+
+EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
+
+# We want it disabled by default, so only skip if defined
+EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
+EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
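
Note that the last two settings are presence-based flags: defining the variable at all, even as an empty string, turns the behavior on, which is what the "only skip if defined" comment refers to. A tiny illustration of that semantics (generic Python, not code from this diff):

    import os

    os.environ["EVENT_WEBHOOK_DISABLE"] = ""      # defined, but empty
    assert "EVENT_WEBHOOK_DISABLE" in os.environ  # the flag is still on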

View File

@@ -146,7 +146,7 @@ def index():
                 alias_utils.delete_alias(alias, current_user)
                 flash(f"Alias {email} has been deleted", "success")
             elif request.form.get("form-name") == "disable-alias":
-                alias.enabled = False
+                alias_utils.change_alias_status(alias, enabled=False)
                 Session.commit()
                 flash(f"Alias {alias.email} has been disabled", "success")

View File

@@ -8,6 +8,7 @@ from app.db import Session
 from flask import redirect, url_for, flash, request, render_template
 from flask_login import login_required, current_user
 
+from app import alias_utils
 from app.dashboard.base import dashboard_bp
 from app.handler.unsubscribe_encoder import UnsubscribeAction
 from app.handler.unsubscribe_handler import UnsubscribeHandler
@@ -31,7 +32,7 @@ def unsubscribe(alias_id):
     # automatic unsubscribe, according to https://tools.ietf.org/html/rfc8058
     if request.method == "POST":
-        alias.enabled = False
+        alias_utils.change_alias_status(alias, False)
         flash(f"Alias {alias.email} has been blocked", "success")
 
         Session.commit()

View File

@@ -0,0 +1,66 @@
from abc import ABC, abstractmethod
from app import config
from app.db import Session
from app.errors import ProtonPartnerNotSetUp
from app.events.generated import event_pb2
from app.models import User, PartnerUser, SyncEvent
from app.proton.utils import get_proton_partner
from typing import Optional
NOTIFICATION_CHANNEL = "simplelogin_sync_events"
class Dispatcher(ABC):
@abstractmethod
def send(self, event: bytes):
pass
class PostgresDispatcher(Dispatcher):
def send(self, event: bytes):
instance = SyncEvent.create(content=event, flush=True)
Session.execute(f"NOTIFY {NOTIFICATION_CHANNEL}, '{instance.id}';")
@staticmethod
def get():
return PostgresDispatcher()
class EventDispatcher:
@staticmethod
def send_event(
user: User,
content: event_pb2.EventContent,
dispatcher: Dispatcher = PostgresDispatcher.get(),
skip_if_webhook_missing: bool = True,
):
if config.EVENT_WEBHOOK_DISABLE:
return
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
return
partner_user = EventDispatcher.__partner_user(user.id)
if not partner_user:
return
event = event_pb2.Event(
user_id=user.id,
external_user_id=partner_user.external_user_id,
partner_id=partner_user.partner_id,
content=content,
)
serialized = event.SerializeToString()
dispatcher.send(serialized)
@staticmethod
def __partner_user(user_id: int) -> Optional[PartnerUser]:
# Check if the current user has a partner_id
try:
proton_partner_id = get_proton_partner().id
except ProtonPartnerNotSetUp:
return None
# It has. Retrieve the information for the PartnerUser
return PartnerUser.get_by(user_id=user_id, partner_id=proton_partner_id)
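
The dispatcher is pluggable: anything implementing Dispatcher.send(bytes) can replace the default PostgresDispatcher, which is how the tests at the bottom of this diff capture events in memory. A minimal usage sketch (LoggingDispatcher is hypothetical; `user` is any User linked to the Proton partner):

    from app.events.event_dispatcher import Dispatcher, EventDispatcher
    from app.events.generated.event_pb2 import EventContent, UserDeleted

    class LoggingDispatcher(Dispatcher):
        def send(self, event: bytes):
            # inspect the serialized protobuf instead of persisting it
            print(f"would dispatch {len(event)} bytes")

    EventDispatcher.send_event(
        user,
        EventContent(user_deleted=UserDeleted()),
        dispatcher=LoggingDispatcher(),
        skip_if_webhook_missing=False,
    )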

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: event.proto
# Protobuf Python Version: 5.26.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\"\'\n\x0eUserPlanChange\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\"\r\n\x0bUserDeleted\"Z\n\x0c\x41liasCreated\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\x12\x12\n\nalias_note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\"K\n\x11\x41liasStatusChange\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\"5\n\x0c\x41liasDeleted\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\"\xce\x02\n\x0c\x45ventContent\x12>\n\x10user_plan_change\x18\x01 \x01(\x0b\x32\".simplelogin_events.UserPlanChangeH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x44\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32%.simplelogin_events.AliasStatusChangeH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'event_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_USERPLANCHANGE']._serialized_start=35
_globals['_USERPLANCHANGE']._serialized_end=74
_globals['_USERDELETED']._serialized_start=76
_globals['_USERDELETED']._serialized_end=89
_globals['_ALIASCREATED']._serialized_start=91
_globals['_ALIASCREATED']._serialized_end=181
_globals['_ALIASSTATUSCHANGE']._serialized_start=183
_globals['_ALIASSTATUSCHANGE']._serialized_end=258
_globals['_ALIASDELETED']._serialized_start=260
_globals['_ALIASDELETED']._serialized_end=313
_globals['_EVENTCONTENT']._serialized_start=316
_globals['_EVENTCONTENT']._serialized_end=650
_globals['_EVENT']._serialized_start=652
_globals['_EVENT']._serialized_end=773
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,71 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class UserPlanChange(_message.Message):
__slots__ = ("plan_end_time",)
PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int]
plan_end_time: int
def __init__(self, plan_end_time: _Optional[int] = ...) -> None: ...
class UserDeleted(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...
class AliasCreated(_message.Message):
__slots__ = ("alias_id", "alias_email", "alias_note", "enabled")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
ALIAS_NOTE_FIELD_NUMBER: _ClassVar[int]
ENABLED_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
alias_note: str
enabled: bool
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., alias_note: _Optional[str] = ..., enabled: bool = ...) -> None: ...
class AliasStatusChange(_message.Message):
__slots__ = ("alias_id", "alias_email", "enabled")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
ENABLED_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
enabled: bool
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., enabled: bool = ...) -> None: ...
class AliasDeleted(_message.Message):
__slots__ = ("alias_id", "alias_email")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ...) -> None: ...
class EventContent(_message.Message):
__slots__ = ("user_plan_change", "user_deleted", "alias_created", "alias_status_change", "alias_deleted")
USER_PLAN_CHANGE_FIELD_NUMBER: _ClassVar[int]
USER_DELETED_FIELD_NUMBER: _ClassVar[int]
ALIAS_CREATED_FIELD_NUMBER: _ClassVar[int]
ALIAS_STATUS_CHANGE_FIELD_NUMBER: _ClassVar[int]
ALIAS_DELETED_FIELD_NUMBER: _ClassVar[int]
user_plan_change: UserPlanChange
user_deleted: UserDeleted
alias_created: AliasCreated
alias_status_change: AliasStatusChange
alias_deleted: AliasDeleted
def __init__(self, user_plan_change: _Optional[_Union[UserPlanChange, _Mapping]] = ..., user_deleted: _Optional[_Union[UserDeleted, _Mapping]] = ..., alias_created: _Optional[_Union[AliasCreated, _Mapping]] = ..., alias_status_change: _Optional[_Union[AliasStatusChange, _Mapping]] = ..., alias_deleted: _Optional[_Union[AliasDeleted, _Mapping]] = ...) -> None: ...
class Event(_message.Message):
__slots__ = ("user_id", "external_user_id", "partner_id", "content")
USER_ID_FIELD_NUMBER: _ClassVar[int]
EXTERNAL_USER_ID_FIELD_NUMBER: _ClassVar[int]
PARTNER_ID_FIELD_NUMBER: _ClassVar[int]
CONTENT_FIELD_NUMBER: _ClassVar[int]
user_id: int
external_user_id: str
partner_id: int
content: EventContent
def __init__(self, user_id: _Optional[int] = ..., external_user_id: _Optional[str] = ..., partner_id: _Optional[int] = ..., content: _Optional[_Union[EventContent, _Mapping]] = ...) -> None: ...

View File

@@ -5,6 +5,7 @@ from typing import Optional
 from aiosmtpd.smtp import Envelope
 
 from app import config
+from app import alias_utils
 from app.db import Session
 from app.email import headers, status
 from app.email_utils import (
@@ -101,7 +102,7 @@ class UnsubscribeHandler:
             mailbox.email, alias
         ):
             return status.E509
-        alias.enabled = False
+        alias_utils.change_alias_status(alias, enabled=False)
         Session.commit()
 
         enable_alias_url = config.URL + f"/dashboard/?highlight_alias_id={alias.id}"
         for mailbox in alias.mailboxes:
View File

@@ -657,6 +657,21 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
         return user
 
+    @classmethod
+    def delete(cls, obj_id, commit=False):
+        # Internal import to avoid global import cycles
+        from app.events.event_dispatcher import EventDispatcher
+        from app.events.generated.event_pb2 import UserDeleted, EventContent
+
+        user: User = cls.get(obj_id)
+        EventDispatcher.send_event(user, EventContent(user_deleted=UserDeleted()))
+
+        res = super(User, cls).delete(obj_id)
+        if commit:
+            Session.commit()
+
+        return res
+
     def get_active_subscription(
         self, include_partner_subscription: bool = True
     ) -> Optional[
@@ -1619,6 +1634,18 @@ class Alias(Base, ModelMixin):
         Session.add(new_alias)
         DailyMetric.get_or_create_today_metric().nb_alias += 1
 
+        # Internal import to avoid global import cycles
+        from app.events.event_dispatcher import EventDispatcher
+        from app.events.generated.event_pb2 import AliasCreated, EventContent
+
+        event = AliasCreated(
+            alias_id=new_alias.id,
+            alias_email=new_alias.email,
+            alias_note=new_alias.note,
+            enabled=True,
+        )
+        EventDispatcher.send_event(user, EventContent(alias_created=event))
+
         if commit:
             Session.commit()
@@ -3648,3 +3675,52 @@ class ApiToCookieToken(Base, ModelMixin):
         code = secrets.token_urlsafe(32)
 
         return super().create(code=code, **kwargs)
+
+
+class SyncEvent(Base, ModelMixin):
+    """This model holds the events that need to be sent to the webhook"""
+
+    __tablename__ = "sync_event"
+
+    content = sa.Column(sa.LargeBinary, unique=False, nullable=False)
+    taken_time = sa.Column(
+        ArrowType, default=None, nullable=True, server_default=None, index=True
+    )
+
+    __table_args__ = (
+        sa.Index("ix_sync_event_created_at", "created_at"),
+        sa.Index("ix_sync_event_taken_time", "taken_time"),
+    )
+
+    def mark_as_taken(self) -> bool:
+        sql = """
+            UPDATE sync_event
+            SET taken_time = :taken_time
+            WHERE id = :sync_event_id
+              AND taken_time IS NULL
+        """
+        args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
+
+        res = Session.execute(sql, args)
+        Session.commit()
+
+        return res.rowcount > 0
+
+    @classmethod
+    def get_dead_letter(cls, older_than: Arrow) -> [SyncEvent]:
+        return (
+            SyncEvent.filter(
+                (
+                    (
+                        SyncEvent.taken_time.isnot(None)
+                        & (SyncEvent.taken_time < older_than)
+                    )
+                    | (
+                        SyncEvent.taken_time.is_(None)
+                        & (SyncEvent.created_at < older_than)
+                    )
+                )
+            )
+            .order_by(SyncEvent.id)
+            .limit(100)
+            .all()
+        )
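
The conditional UPDATE in mark_as_taken acts as a compare-and-set: when several listeners react to the same NOTIFY, only the first UPDATE still finds taken_time IS NULL, so exactly one caller sees rowcount > 0. A sketch of the intended consumer pattern (process_event and notified_id are stand-ins, not names from this diff):

    event = SyncEvent.get_by(id=notified_id)
    if event is not None and event.mark_as_taken():
        process_event(event)  # this runner won the claim
    # otherwise another runner already took the event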

View File

@@ -2,6 +2,8 @@ import requests
 from requests import RequestException
 
 from app import config
+from app.events.event_dispatcher import EventDispatcher
+from app.events.generated.event_pb2 import EventContent, UserPlanChange
 from app.log import LOG
 from app.models import User
 
@@ -31,3 +33,6 @@ def execute_subscription_webhook(user: User):
         )
     except RequestException as e:
         LOG.error(f"Subscription request exception: {e}")
+
+    event = UserPlanChange(plan_end_time=sl_subscription_end)
+    EventDispatcher.send_event(user, EventContent(user_plan_change=event))

View File

@@ -53,7 +53,7 @@ from flanker.addresslib.address import EmailAddress
 from sqlalchemy.exc import IntegrityError
 
 from app import pgp_utils, s3, config
-from app.alias_utils import try_auto_create
+from app.alias_utils import try_auto_create, change_alias_status
 from app.config import (
     EMAIL_DOMAIN,
     URL,
@@ -1585,7 +1585,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
         LOG.w(
             f"Disable alias {alias} because {reason}. {alias.mailboxes} {alias.user}. Last contact {contact}"
         )
-        alias.enabled = False
+        change_alias_status(alias, enabled=False)
 
         Notification.create(
             user_id=user.id,

app/event_listener.py (new file, 64 lines)
View File

@@ -0,0 +1,64 @@
import argparse
from enum import Enum
from sys import argv, exit
from app.config import DB_URI
from app.log import LOG
from events.runner import Runner
from events.event_source import DeadLetterEventSource, PostgresEventSource
from events.event_sink import ConsoleEventSink, HttpEventSink
class Mode(Enum):
DEAD_LETTER = "dead_letter"
LISTENER = "listener"
@staticmethod
def from_str(value: str):
if value == Mode.DEAD_LETTER.value:
return Mode.DEAD_LETTER
elif value == Mode.LISTENER.value:
return Mode.LISTENER
else:
raise ValueError(f"Invalid mode: {value}")
def main(mode: Mode, dry_run: bool):
if mode == Mode.DEAD_LETTER:
LOG.i("Using DeadLetterEventSource")
source = DeadLetterEventSource()
elif mode == Mode.LISTENER:
LOG.i("Using PostgresEventSource")
source = PostgresEventSource(DB_URI)
else:
raise ValueError(f"Invalid mode: {mode}")
if dry_run:
LOG.i("Starting with ConsoleEventSink")
sink = ConsoleEventSink()
else:
LOG.i("Starting with HttpEventSink")
sink = HttpEventSink()
runner = Runner(source=source, sink=sink)
runner.run()
def args():
parser = argparse.ArgumentParser(description="Run event listener")
parser.add_argument(
"mode",
help="Mode to run",
choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value],
)
parser.add_argument("--dry-run", help="Dry run mode", action="store_true")
return parser.parse_args()
if __name__ == "__main__":
if len(argv) < 2:
print("Invalid usage. Pass 'listener' or 'dead_letter' as argument")
exit(1)
args = args()
main(Mode.from_str(args.mode), args.dry_run)
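
Going by the argparse definition above, the listener would presumably be launched like this (invocations inferred from the code, not documented in this diff):

    python event_listener.py listener               # stream events via LISTEN/NOTIFY
    python event_listener.py dead_letter --dry-run  # reprocess stuck events, console sink only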

app/events/__init__.py (new, empty file)
View File

app/events/event_sink.py (new file, 42 lines)
View File

@@ -0,0 +1,42 @@
import requests
from abc import ABC, abstractmethod
from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL
from app.log import LOG
from app.models import SyncEvent
class EventSink(ABC):
@abstractmethod
def process(self, event: SyncEvent) -> bool:
pass
class HttpEventSink(EventSink):
def process(self, event: SyncEvent) -> bool:
if not EVENT_WEBHOOK:
LOG.warning("Skipping sending event because there is no webhook configured")
return False
LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}")
res = requests.post(
url=EVENT_WEBHOOK,
data=event.content,
headers={"Content-Type": "application/x-protobuf"},
verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL,
)
if res.status_code != 200:
LOG.warning(
f"Failed to send event to webhook: {res.status_code} {res.text}"
)
return False
else:
LOG.info(f"Event {event.id} sent successfully to webhook")
return True
class ConsoleEventSink(EventSink):
def process(self, event: SyncEvent) -> bool:
LOG.info(f"Handling event {event.id}")
return True
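
On the receiving end, the webhook gets the raw SyncEvent.content bytes with Content-Type application/x-protobuf, i.e. a serialized simplelogin_events.Event. A minimal consumer sketch (hypothetical handler, assuming the receiver has the same generated event_pb2 module):

    from app.events.generated import event_pb2

    def handle_webhook_body(body: bytes) -> None:
        event = event_pb2.Event()
        event.ParseFromString(body)
        # the oneof tells us which payload was set
        kind = event.content.WhichOneof("content")
        if kind == "alias_deleted":
            print(event.user_id, event.content.alias_deleted.alias_email)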

app/events/event_source.py (new file, 100 lines)
View File

@@ -0,0 +1,100 @@
import arrow
import newrelic.agent
import psycopg2
import select
from abc import ABC, abstractmethod
from app.log import LOG
from app.models import SyncEvent
from app.events.event_dispatcher import NOTIFICATION_CHANNEL
from time import sleep
from typing import Callable, NoReturn
_DEAD_LETTER_THRESHOLD_MINUTES = 10
_DEAD_LETTER_INTERVAL_SECONDS = 30
_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5
class EventSource(ABC):
@abstractmethod
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
pass
class PostgresEventSource(EventSource):
def __init__(self, connection_string: str):
self.__connection_string = connection_string
self.__connect()
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
while True:
try:
self.__listen(on_event)
except Exception as e:
LOG.warn(f"Error listening to events: {e}")
sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
self.__connect()
def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
self.__connection.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
)
cursor = self.__connection.cursor()
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
while True:
if select.select([self.__connection], [], [], 5) != ([], [], []):
self.__connection.poll()
while self.__connection.notifies:
notify = self.__connection.notifies.pop(0)
LOG.debug(
f"Got NOTIFY: pid={notify.pid} channel={notify.channel} payload={notify.payload}"
)
try:
webhook_id = int(notify.payload)
event = SyncEvent.get_by(id=webhook_id)
if event is not None:
if event.mark_as_taken():
on_event(event)
else:
LOG.info(
f"Event {event.id} was handled by another runner"
)
else:
LOG.info(f"Could not find event with id={notify.payload}")
except Exception as e:
LOG.warn(f"Error getting event: {e}")
def __connect(self):
self.__connection = psycopg2.connect(self.__connection_string)
from app.db import Session
Session.close()
class DeadLetterEventSource(EventSource):
@newrelic.agent.background_task()
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
while True:
try:
threshold = arrow.utcnow().shift(
minutes=-_DEAD_LETTER_THRESHOLD_MINUTES
)
events = SyncEvent.get_dead_letter(older_than=threshold)
if events:
LOG.info(f"Got {len(events)} dead letter events")
if events:
newrelic.agent.record_custom_metric(
"Custom/dead_letter_events_to_process", len(events)
)
for event in events:
on_event(event)
else:
LOG.debug("No dead letter events")
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
except Exception as e:
LOG.warn(f"Error getting dead letter event: {e}")
sleep(_DEAD_LETTER_INTERVAL_SECONDS)

app/events/runner.py (new file, 42 lines)
View File

@@ -0,0 +1,42 @@
import arrow
import newrelic.agent
from app.log import LOG
from app.models import SyncEvent
from events.event_sink import EventSink
from events.event_source import EventSource
class Runner:
def __init__(self, source: EventSource, sink: EventSink):
self.__source = source
self.__sink = sink
def run(self):
self.__source.run(self.__on_event)
@newrelic.agent.background_task()
def __on_event(self, event: SyncEvent):
try:
event_created_at = event.created_at
start_time = arrow.now()
success = self.__sink.process(event)
if success:
event_id = event.id
SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done")
end_time = arrow.now() - start_time
time_between_taken_and_created = start_time - event_created_at
newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1)
newrelic.agent.record_custom_metric(
"Custom/sync_event_process_time", end_time.total_seconds()
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
except Exception as e:
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)

View File

@@ -0,0 +1,38 @@
"""Create sync_event table
Revision ID: 06a9a7133445
Revises: fa2f19bb4e5a
Create Date: 2024-05-17 13:11:20.402259
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '06a9a7133445'
down_revision = 'fa2f19bb4e5a'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('sync_event',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False),
sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
sa.Column('content', sa.LargeBinary(), nullable=False),
sa.Column('taken_time', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_sync_event_created_at'), 'sync_event', ['created_at'], unique=False)
op.create_index(op.f('ix_sync_event_taken_time'), 'sync_event', ['taken_time'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('sync_event')
# ### end Alembic commands ###

View File

@@ -4,6 +4,7 @@ import subprocess
 from time import sleep
 from typing import List, Dict
 
+import arrow
 import newrelic.agent
 
 from app.db import Session
@@ -93,11 +94,44 @@ def log_nb_db_connection():
     newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
 
 
+@newrelic.agent.background_task()
+def log_pending_to_process_events():
+    r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;")
+    events_pending = list(r)[0][0]
+
+    LOG.d("number of events pending to process %s", events_pending)
+    newrelic.agent.record_custom_metric(
+        "Custom/sync_events_pending_to_process", events_pending
+    )
+
+
+@newrelic.agent.background_task()
+def log_events_pending_dead_letter():
+    since = arrow.now().shift(minutes=-10).datetime
+    r = Session.execute(
+        """
+        SELECT COUNT(*)
+        FROM sync_event
+        WHERE (taken_time IS NOT NULL AND taken_time < :since)
+           OR (taken_time IS NULL AND created_at < :since)
+        """,
+        {"since": since},
+    )
+    events_pending = list(r)[0][0]
+
+    LOG.d("number of events pending dead letter %s", events_pending)
+    newrelic.agent.record_custom_metric(
+        "Custom/sync_events_pending_dead_letter", events_pending
+    )
+
+
 if __name__ == "__main__":
     exporter = MetricExporter(get_newrelic_license())
     while True:
         log_postfix_metrics()
         log_nb_db_connection()
+        log_pending_to_process_events()
+        log_events_pending_dead_letter()
         Session.close()
 
         exporter.run()

app/proto/event.proto (new file, 45 lines)
View File

@@ -0,0 +1,45 @@
syntax = "proto3";
package simplelogin_events;
message UserPlanChange {
uint32 plan_end_time = 1;
}
message UserDeleted {
}
message AliasCreated {
uint32 alias_id = 1;
string alias_email = 2;
string alias_note = 3;
bool enabled = 4;
}
message AliasStatusChange {
uint32 alias_id = 1;
string alias_email = 2;
bool enabled = 3;
}
message AliasDeleted {
uint32 alias_id = 1;
string alias_email = 2;
}
message EventContent {
oneof content {
UserPlanChange user_plan_change = 1;
UserDeleted user_deleted = 2;
AliasCreated alias_created = 3;
AliasStatusChange alias_status_change = 4;
AliasDeleted alias_deleted = 5;
}
}
message Event {
uint32 user_id = 1;
string external_user_id = 2;
uint32 partner_id = 3;
EventContent content = 4;
}
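
A short round-trip sketch with the classes generated from this schema (placeholder values; note that populating one oneof member clears the others):

    from app.events.generated import event_pb2

    content = event_pb2.EventContent(user_deleted=event_pb2.UserDeleted())
    content.alias_status_change.alias_id = 42  # switches the oneof
    assert content.WhichOneof("content") == "alias_status_change"

    event = event_pb2.Event(
        user_id=1, external_user_id="ext-id", partner_id=1, content=content
    )
    data = event.SerializeToString()

    decoded = event_pb2.Event()
    decoded.ParseFromString(data)
    assert decoded.content.WhichOneof("content") == "alias_status_change"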

View File

@@ -14,13 +14,14 @@ exclude = '''
     | build
     | dist
     | migrations # migrations/ is generated by alembic
+    | app/events/generated
   )/
 )
 '''
 
 [tool.ruff]
 ignore-init-module-imports = true
-exclude = [".venv", "migrations"]
+exclude = [".venv", "migrations", "app/events/generated"]
 
 [tool.djlint]
 indent = 2

View File

@@ -0,0 +1,24 @@
#!/bin/bash
set -euxo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" || exit 1; pwd -P)"
REPO_ROOT=$(echo "${SCRIPT_DIR}" | sed 's:scripts::g')
DEST_DIR="${REPO_ROOT}/app/events/generated"
PROTOC=${PROTOC:-"protoc"}
if ! eval "${PROTOC} --version" &> /dev/null ; then
echo "Cannot find $PROTOC"
exit 1
fi
rm -rf "${DEST_DIR}"
mkdir -p "${DEST_DIR}"
pushd $REPO_ROOT || exit 1
eval "${PROTOC} --proto_path=proto --python_out=\"${DEST_DIR}\" --pyi_out=\"${DEST_DIR}\" proto/event.proto"
popd || exit 1

View File

@@ -48,15 +48,16 @@
             {#            SIWSL#}
             {#          </a>#}
             {#        </li>#}
-            {#        {% if current_user.should_show_app_page() %}#}
-            {#          <li class="nav-item">#}
-            {#            <a href="{{ url_for('dashboard.app_route') }}"#}
-            {#               class="nav-link {{ 'active' if active_page == 'app' }}">#}
-            {#              <i class="fe fe-grid"></i>#}
-            {#              Apps#}
-            {#            </a>#}
-            {#          </li>#}
-            {#        {% endif %}#}
+            {% if current_user.should_show_app_page() %}
+              <li class="nav-item">
+                <a href="{{ url_for('dashboard.app_route') }}"
+                   class="nav-link {{ 'active' if active_page == 'app' }}">
+                  <i class="fe fe-grid"></i>
+                  Apps
+                </a>
+              </li>
+            {% endif %}
+
             <li class="nav-item">
               <a href="{{ url_for('dashboard.setting') }}"
                  class="nav-link {{ 'active' if active_page == 'setting' }}">

View File

@@ -0,0 +1,56 @@
from app.events.event_dispatcher import EventDispatcher, Dispatcher
from app.events.generated.event_pb2 import EventContent, UserDeleted
from app.models import PartnerUser, User
from app.proton.utils import get_proton_partner
from tests.utils import create_new_user, random_token
from typing import Tuple
class OnMemoryDispatcher(Dispatcher):
def __init__(self):
self.memory = []
def send(self, event: bytes):
self.memory.append(event)
def _create_unlinked_user() -> User:
return create_new_user()
def _create_linked_user() -> Tuple[User, PartnerUser]:
user = _create_unlinked_user()
partner_user = PartnerUser.create(
partner_id=get_proton_partner().id,
user_id=user.id,
external_user_id=random_token(10),
flush=True,
)
return user, partner_user
def test_event_dispatcher_stores_events():
dispatcher = OnMemoryDispatcher()
(user, partner) = _create_linked_user()
content = EventContent(user_deleted=UserDeleted())
EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
assert len(dispatcher.memory) == 1
content = EventContent(user_deleted=UserDeleted())
EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
assert len(dispatcher.memory) == 2
def test_event_dispatcher_does_not_send_event_if_user_not_linked():
dispatcher = OnMemoryDispatcher()
user = _create_unlinked_user()
content = EventContent(user_deleted=UserDeleted())
EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
assert len(dispatcher.memory) == 0
content = EventContent(user_deleted=UserDeleted())
EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
assert len(dispatcher.memory) == 0

View File

@@ -17,6 +17,7 @@ from app.models import (
     Subscription,
     PlanEnum,
     PADDLE_SUBSCRIPTION_GRACE_DAYS,
+    SyncEvent,
 )
 
 from tests.utils import login, create_new_user, random_token
@@ -325,3 +326,51 @@ def test_user_can_send_receive():
     user.disabled = False
     user.delete_on = arrow.now()
     assert not user.can_send_or_receive()
+
+
+def test_sync_event_dead_letter():
+    # remove all SyncEvents before the test
+    all_events = SyncEvent.all()
+    for event in all_events:
+        SyncEvent.delete(event.id, commit=True)
+
+    # create an expired not taken event
+    e1 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=None,
+        commit=True,
+    )
+
+    # create an expired taken event (but too long ago)
+    e2 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=arrow.now().shift(minutes=-14),
+        commit=True,
+    )
+
+    # create an expired taken event (but recently)
+    e3 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=arrow.now().shift(minutes=-1),
+        commit=True,
+    )
+
+    # create a normal event
+    e4 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now(),
+        commit=True,
+    )
+
+    # get dead letter events
+    dead_letter_events = SyncEvent.get_dead_letter(
+        older_than=arrow.now().shift(minutes=-10)
+    )
+    assert len(dead_letter_events) == 2
+    assert e1 in dead_letter_events
+    assert e2 in dead_letter_events
+    assert e3 not in dead_letter_events
+    assert e4 not in dead_letter_events