Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
d313c94f77 | |||
39fcf2e48f | |||
41a5a65f51 |
@ -330,7 +330,10 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]:
|
|||||||
|
|
||||||
|
|
||||||
def delete_alias(
|
def delete_alias(
|
||||||
alias: Alias, user: User, reason: AliasDeleteReason = AliasDeleteReason.Unspecified
|
alias: Alias,
|
||||||
|
user: User,
|
||||||
|
reason: AliasDeleteReason = AliasDeleteReason.Unspecified,
|
||||||
|
commit: bool = False,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Delete an alias and add it to either global or domain trash
|
Delete an alias and add it to either global or domain trash
|
||||||
@ -366,6 +369,8 @@ def delete_alias(
|
|||||||
EventDispatcher.send_event(
|
EventDispatcher.send_event(
|
||||||
user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
|
user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
|
||||||
)
|
)
|
||||||
|
if commit:
|
||||||
|
Session.commit()
|
||||||
|
|
||||||
|
|
||||||
def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
|
def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
|
||||||
|
@ -596,12 +596,16 @@ def read_webhook_enabled_user_ids() -> Optional[List[int]]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
ids = []
|
ids = []
|
||||||
for id in user_ids.split(","):
|
for user_id in user_ids.split(","):
|
||||||
try:
|
try:
|
||||||
ids.append(int(id.strip()))
|
ids.append(int(user_id.strip()))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
return ids
|
return ids
|
||||||
|
|
||||||
|
|
||||||
EVENT_WEBHOOK_ENABLED_USER_IDS: Optional[List[int]] = read_webhook_enabled_user_ids()
|
EVENT_WEBHOOK_ENABLED_USER_IDS: Optional[List[int]] = read_webhook_enabled_user_ids()
|
||||||
|
|
||||||
|
# Allow to define a different DB_URI for the event listener, in case we want to skip the connection pool
|
||||||
|
# It defaults to the regular DB_URI in case it's needed
|
||||||
|
EVENT_LISTENER_DB_URI = os.environ.get("EVENT_LISTENER_DB_URI", DB_URI)
|
||||||
|
@ -145,7 +145,7 @@ def index():
|
|||||||
LOG.i(f"User {current_user} requested deletion of alias {alias}")
|
LOG.i(f"User {current_user} requested deletion of alias {alias}")
|
||||||
email = alias.email
|
email = alias.email
|
||||||
alias_utils.delete_alias(
|
alias_utils.delete_alias(
|
||||||
alias, current_user, AliasDeleteReason.ManualAction
|
alias, current_user, AliasDeleteReason.ManualAction, commit=True
|
||||||
)
|
)
|
||||||
flash(f"Alias {email} has been deleted", "success")
|
flash(f"Alias {email} has been deleted", "success")
|
||||||
elif request.form.get("form-name") == "disable-alias":
|
elif request.form.get("form-name") == "disable-alias":
|
||||||
|
@ -2,6 +2,7 @@ import requests
|
|||||||
from requests import RequestException
|
from requests import RequestException
|
||||||
|
|
||||||
from app import config
|
from app import config
|
||||||
|
from app.db import Session
|
||||||
from app.events.event_dispatcher import EventDispatcher
|
from app.events.event_dispatcher import EventDispatcher
|
||||||
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
|
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
@ -29,10 +30,11 @@ def execute_subscription_webhook(user: User):
|
|||||||
LOG.i("Sent request to subscription update webhook successfully")
|
LOG.i("Sent request to subscription update webhook successfully")
|
||||||
else:
|
else:
|
||||||
LOG.i(
|
LOG.i(
|
||||||
f"Request to webhook failed with statue {response.status_code}: {response.text}"
|
f"Request to webhook failed with status {response.status_code}: {response.text}"
|
||||||
)
|
)
|
||||||
except RequestException as e:
|
except RequestException as e:
|
||||||
LOG.error(f"Subscription request exception: {e}")
|
LOG.error(f"Subscription request exception: {e}")
|
||||||
|
|
||||||
event = UserPlanChanged(plan_end_time=sl_subscription_end)
|
event = UserPlanChanged(plan_end_time=sl_subscription_end)
|
||||||
EventDispatcher.send_event(user, EventContent(user_plan_change=event))
|
EventDispatcher.send_event(user, EventContent(user_plan_change=event))
|
||||||
|
Session.commit()
|
||||||
|
@ -2,8 +2,9 @@ import argparse
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from sys import argv, exit
|
from sys import argv, exit
|
||||||
|
|
||||||
from app.config import DB_URI
|
from app.config import EVENT_LISTENER_DB_URI
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
|
from events import event_debugger
|
||||||
from events.runner import Runner
|
from events.runner import Runner
|
||||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||||
from events.event_sink import ConsoleEventSink, HttpEventSink
|
from events.event_sink import ConsoleEventSink, HttpEventSink
|
||||||
@ -31,7 +32,7 @@ def main(mode: Mode, dry_run: bool, max_retries: int):
|
|||||||
source = DeadLetterEventSource(max_retries)
|
source = DeadLetterEventSource(max_retries)
|
||||||
elif mode == Mode.LISTENER:
|
elif mode == Mode.LISTENER:
|
||||||
LOG.i("Using PostgresEventSource")
|
LOG.i("Using PostgresEventSource")
|
||||||
source = PostgresEventSource(DB_URI)
|
source = PostgresEventSource(EVENT_LISTENER_DB_URI)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Invalid mode: {mode}")
|
raise ValueError(f"Invalid mode: {mode}")
|
||||||
|
|
||||||
@ -46,32 +47,67 @@ def main(mode: Mode, dry_run: bool, max_retries: int):
|
|||||||
runner.run()
|
runner.run()
|
||||||
|
|
||||||
|
|
||||||
|
def debug_event(event_id: str):
|
||||||
|
LOG.i(f"Debugging event {event_id}")
|
||||||
|
try:
|
||||||
|
event_id_int = int(event_id)
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError(f"Invalid event id: {event_id}")
|
||||||
|
event_debugger.debug_event(event_id_int)
|
||||||
|
|
||||||
|
|
||||||
|
def run_event(event_id: str, delete_on_success: bool):
|
||||||
|
LOG.i(f"Running event {event_id}")
|
||||||
|
try:
|
||||||
|
event_id_int = int(event_id)
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError(f"Invalid event id: {event_id}")
|
||||||
|
event_debugger.run_event(event_id_int, delete_on_success)
|
||||||
|
|
||||||
|
|
||||||
def args():
|
def args():
|
||||||
parser = argparse.ArgumentParser(description="Run event listener")
|
parser = argparse.ArgumentParser(description="Run event listener")
|
||||||
parser.add_argument(
|
subparsers = parser.add_subparsers(dest="command")
|
||||||
"mode",
|
|
||||||
help="Mode to run",
|
listener_parser = subparsers.add_parser(Mode.LISTENER.value)
|
||||||
choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value],
|
listener_parser.add_argument(
|
||||||
|
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
listener_parser.add_argument("--dry-run", action="store_true")
|
||||||
"max_retries",
|
|
||||||
help="Max retries to consider an event as error and not try to process it again",
|
dead_letter_parser = subparsers.add_parser(Mode.DEAD_LETTER.value)
|
||||||
type=int,
|
dead_letter_parser.add_argument(
|
||||||
nargs="?",
|
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
|
||||||
default=_DEFAULT_MAX_RETRIES,
|
|
||||||
)
|
)
|
||||||
parser.add_argument("--dry-run", help="Dry run mode", action="store_true")
|
dead_letter_parser.add_argument("--dry-run", action="store_true")
|
||||||
|
|
||||||
|
debug_parser = subparsers.add_parser("debug")
|
||||||
|
debug_parser.add_argument("event_id", help="ID of the event to debug")
|
||||||
|
|
||||||
|
run_parser = subparsers.add_parser("run")
|
||||||
|
run_parser.add_argument("event_id", help="ID of the event to run")
|
||||||
|
run_parser.add_argument("--delete-on-success", action="store_true")
|
||||||
|
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(argv) < 2:
|
if len(argv) < 2:
|
||||||
print("Invalid usage. Pass 'listener' or 'dead_letter' as argument")
|
print("Invalid usage. Pass a valid subcommand as argument")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
args = args()
|
args = args()
|
||||||
|
|
||||||
|
if args.command in [Mode.LISTENER.value, Mode.DEAD_LETTER.value]:
|
||||||
main(
|
main(
|
||||||
mode=Mode.from_str(args.mode),
|
mode=Mode.from_str(args.command),
|
||||||
dry_run=args.dry_run,
|
dry_run=args.dry_run,
|
||||||
max_retries=args.max_retries,
|
max_retries=args.max_retries,
|
||||||
)
|
)
|
||||||
|
elif args.command == "debug":
|
||||||
|
debug_event(args.event_id)
|
||||||
|
elif args.command == "run":
|
||||||
|
run_event(args.event_id, args.delete_on_success)
|
||||||
|
else:
|
||||||
|
print("Invalid command")
|
||||||
|
exit(1)
|
||||||
|
43
app/events/event_debugger.py
Normal file
43
app/events/event_debugger.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
from app.events.generated import event_pb2
|
||||||
|
from app.models import SyncEvent
|
||||||
|
from events.event_sink import HttpEventSink
|
||||||
|
|
||||||
|
|
||||||
|
def debug_event(event_id: int):
|
||||||
|
event = SyncEvent.get_by(id=event_id)
|
||||||
|
if not event:
|
||||||
|
print("Event not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Info for event {event_id}")
|
||||||
|
print(f"- Created at: {event.created_at}")
|
||||||
|
print(f"- Updated at: {event.updated_at}")
|
||||||
|
print(f"- Taken time: {event.taken_time}")
|
||||||
|
print(f"- Retry count: {event.retry_count}")
|
||||||
|
|
||||||
|
print()
|
||||||
|
print("Event contents")
|
||||||
|
event_contents = event.content
|
||||||
|
parsed = event_pb2.Event.FromString(event_contents)
|
||||||
|
|
||||||
|
print(f"- UserID: {parsed.user_id}")
|
||||||
|
print(f"- ExternalUserID: {parsed.external_user_id}")
|
||||||
|
print(f"- PartnerID: {parsed.partner_id}")
|
||||||
|
|
||||||
|
content = parsed.content
|
||||||
|
print(f"Content: {content}")
|
||||||
|
|
||||||
|
|
||||||
|
def run_event(event_id: int, delete_on_success: bool = True):
|
||||||
|
event = SyncEvent.get_by(id=event_id)
|
||||||
|
if not event:
|
||||||
|
print("Event not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Processing event {event_id}")
|
||||||
|
sink = HttpEventSink()
|
||||||
|
res = sink.process(event)
|
||||||
|
if res:
|
||||||
|
print(f"Processed event {event_id}")
|
||||||
|
if delete_on_success:
|
||||||
|
SyncEvent.delete(event_id, commit=True)
|
@ -46,6 +46,7 @@ class PostgresEventSource(EventSource):
|
|||||||
cursor = self.__connection.cursor()
|
cursor = self.__connection.cursor()
|
||||||
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
|
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
|
||||||
|
|
||||||
|
LOG.info("Starting to listen to events")
|
||||||
while True:
|
while True:
|
||||||
if select.select([self.__connection], [], [], 5) != ([], [], []):
|
if select.select([self.__connection], [], [], 5) != ([], [], []):
|
||||||
self.__connection.poll()
|
self.__connection.poll()
|
||||||
|
@ -14,6 +14,7 @@ from app.email_utils import (
|
|||||||
send_email,
|
send_email,
|
||||||
render,
|
render,
|
||||||
)
|
)
|
||||||
|
from app.events.event_dispatcher import PostgresDispatcher
|
||||||
from app.import_utils import handle_batch_import
|
from app.import_utils import handle_batch_import
|
||||||
from app.jobs.event_jobs import send_alias_creation_events_for_user
|
from app.jobs.event_jobs import send_alias_creation_events_for_user
|
||||||
from app.jobs.export_user_data_job import ExportUserDataJob
|
from app.jobs.export_user_data_job import ExportUserDataJob
|
||||||
@ -276,7 +277,9 @@ SimpleLogin team.
|
|||||||
user = User.get(user_id)
|
user = User.get(user_id)
|
||||||
if user and user.activated:
|
if user and user.activated:
|
||||||
LOG.d(f"Sending alias creation events for {user}")
|
LOG.d(f"Sending alias creation events for {user}")
|
||||||
send_alias_creation_events_for_user(user)
|
send_alias_creation_events_for_user(
|
||||||
|
user, dispatcher=PostgresDispatcher.get()
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
LOG.e("Unknown job name %s", job.name)
|
LOG.e("Unknown job name %s", job.name)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user