4.49.3
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m11s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m17s
Build-Release-Image / Merge-Images (push) Successful in 13s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 4s
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m11s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m17s
Build-Release-Image / Merge-Images (push) Successful in 13s
Build-Release-Image / Create-Release (push) Successful in 9s
Build-Release-Image / Notify (push) Successful in 4s
This commit is contained in:
parent
41a5a65f51
commit
39fcf2e48f
@ -4,6 +4,7 @@ from sys import argv, exit
|
||||
|
||||
from app.config import EVENT_LISTENER_DB_URI
|
||||
from app.log import LOG
|
||||
from events import event_debugger
|
||||
from events.runner import Runner
|
||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||
from events.event_sink import ConsoleEventSink, HttpEventSink
|
||||
@ -46,32 +47,67 @@ def main(mode: Mode, dry_run: bool, max_retries: int):
|
||||
runner.run()
|
||||
|
||||
|
||||
def debug_event(event_id: str):
|
||||
LOG.i(f"Debugging event {event_id}")
|
||||
try:
|
||||
event_id_int = int(event_id)
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid event id: {event_id}")
|
||||
event_debugger.debug_event(event_id_int)
|
||||
|
||||
|
||||
def run_event(event_id: str, delete_on_success: bool):
|
||||
LOG.i(f"Running event {event_id}")
|
||||
try:
|
||||
event_id_int = int(event_id)
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid event id: {event_id}")
|
||||
event_debugger.run_event(event_id_int, delete_on_success)
|
||||
|
||||
|
||||
def args():
|
||||
parser = argparse.ArgumentParser(description="Run event listener")
|
||||
parser.add_argument(
|
||||
"mode",
|
||||
help="Mode to run",
|
||||
choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value],
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
listener_parser = subparsers.add_parser(Mode.LISTENER.value)
|
||||
listener_parser.add_argument(
|
||||
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
|
||||
)
|
||||
parser.add_argument(
|
||||
"max_retries",
|
||||
help="Max retries to consider an event as error and not try to process it again",
|
||||
type=int,
|
||||
nargs="?",
|
||||
default=_DEFAULT_MAX_RETRIES,
|
||||
listener_parser.add_argument("--dry-run", action="store_true")
|
||||
|
||||
dead_letter_parser = subparsers.add_parser(Mode.DEAD_LETTER.value)
|
||||
dead_letter_parser.add_argument(
|
||||
"--max-retries", type=int, default=_DEFAULT_MAX_RETRIES
|
||||
)
|
||||
parser.add_argument("--dry-run", help="Dry run mode", action="store_true")
|
||||
dead_letter_parser.add_argument("--dry-run", action="store_true")
|
||||
|
||||
debug_parser = subparsers.add_parser("debug")
|
||||
debug_parser.add_argument("event_id", help="ID of the event to debug")
|
||||
|
||||
run_parser = subparsers.add_parser("run")
|
||||
run_parser.add_argument("event_id", help="ID of the event to run")
|
||||
run_parser.add_argument("--delete-on-success", action="store_true")
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(argv) < 2:
|
||||
print("Invalid usage. Pass 'listener' or 'dead_letter' as argument")
|
||||
print("Invalid usage. Pass a valid subcommand as argument")
|
||||
exit(1)
|
||||
|
||||
args = args()
|
||||
main(
|
||||
mode=Mode.from_str(args.mode),
|
||||
dry_run=args.dry_run,
|
||||
max_retries=args.max_retries,
|
||||
)
|
||||
|
||||
if args.command in [Mode.LISTENER.value, Mode.DEAD_LETTER.value]:
|
||||
main(
|
||||
mode=Mode.from_str(args.command),
|
||||
dry_run=args.dry_run,
|
||||
max_retries=args.max_retries,
|
||||
)
|
||||
elif args.command == "debug":
|
||||
debug_event(args.event_id)
|
||||
elif args.command == "run":
|
||||
run_event(args.event_id, args.delete_on_success)
|
||||
else:
|
||||
print("Invalid command")
|
||||
exit(1)
|
||||
|
43
app/events/event_debugger.py
Normal file
43
app/events/event_debugger.py
Normal file
@ -0,0 +1,43 @@
|
||||
from app.events.generated import event_pb2
|
||||
from app.models import SyncEvent
|
||||
from events.event_sink import HttpEventSink
|
||||
|
||||
|
||||
def debug_event(event_id: int):
|
||||
event = SyncEvent.get_by(id=event_id)
|
||||
if not event:
|
||||
print("Event not found")
|
||||
return
|
||||
|
||||
print(f"Info for event {event_id}")
|
||||
print(f"- Created at: {event.created_at}")
|
||||
print(f"- Updated at: {event.updated_at}")
|
||||
print(f"- Taken time: {event.taken_time}")
|
||||
print(f"- Retry count: {event.retry_count}")
|
||||
|
||||
print()
|
||||
print("Event contents")
|
||||
event_contents = event.content
|
||||
parsed = event_pb2.Event.FromString(event_contents)
|
||||
|
||||
print(f"- UserID: {parsed.user_id}")
|
||||
print(f"- ExternalUserID: {parsed.external_user_id}")
|
||||
print(f"- PartnerID: {parsed.partner_id}")
|
||||
|
||||
content = parsed.content
|
||||
print(f"Content: {content}")
|
||||
|
||||
|
||||
def run_event(event_id: int, delete_on_success: bool = True):
|
||||
event = SyncEvent.get_by(id=event_id)
|
||||
if not event:
|
||||
print("Event not found")
|
||||
return
|
||||
|
||||
print(f"Processing event {event_id}")
|
||||
sink = HttpEventSink()
|
||||
res = sink.process(event)
|
||||
if res:
|
||||
print(f"Processed event {event_id}")
|
||||
if delete_on_success:
|
||||
SyncEvent.delete(event_id, commit=True)
|
@ -14,6 +14,7 @@ from app.email_utils import (
|
||||
send_email,
|
||||
render,
|
||||
)
|
||||
from app.events.event_dispatcher import PostgresDispatcher
|
||||
from app.import_utils import handle_batch_import
|
||||
from app.jobs.event_jobs import send_alias_creation_events_for_user
|
||||
from app.jobs.export_user_data_job import ExportUserDataJob
|
||||
@ -276,7 +277,9 @@ SimpleLogin team.
|
||||
user = User.get(user_id)
|
||||
if user and user.activated:
|
||||
LOG.d(f"Sending alias creation events for {user}")
|
||||
send_alias_creation_events_for_user(user)
|
||||
send_alias_creation_events_for_user(
|
||||
user, dispatcher=PostgresDispatcher.get()
|
||||
)
|
||||
else:
|
||||
LOG.e("Unknown job name %s", job.name)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user