diff --git a/app/app/config.py b/app/app/config.py index 2af562b..325b475 100644 --- a/app/app/config.py +++ b/app/app/config.py @@ -583,3 +583,7 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None) STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None) + +# We want it disabled by default, so only skip if defined +EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ +EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ diff --git a/app/app/events/event_dispatcher.py b/app/app/events/event_dispatcher.py index c62bf42..af30d33 100644 --- a/app/app/events/event_dispatcher.py +++ b/app/app/events/event_dispatcher.py @@ -34,6 +34,9 @@ class EventDispatcher: dispatcher: Dispatcher = PostgresDispatcher.get(), skip_if_webhook_missing: bool = True, ): + if config.EVENT_WEBHOOK_DISABLE: + return + if not config.EVENT_WEBHOOK and skip_if_webhook_missing: return diff --git a/app/app/models.py b/app/app/models.py index 9783312..fe5d557 100644 --- a/app/app/models.py +++ b/app/app/models.py @@ -3699,7 +3699,10 @@ class SyncEvent(Base, ModelMixin): AND taken_time IS NULL """ args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id} + res = Session.execute(sql, args) + Session.commit() + return res.rowcount > 0 @classmethod diff --git a/app/event_listener.py b/app/event_listener.py index 440f106..93e23ed 100644 --- a/app/event_listener.py +++ b/app/event_listener.py @@ -3,9 +3,10 @@ from enum import Enum from sys import argv, exit from app.config import DB_URI +from app.log import LOG from events.runner import Runner from events.event_source import DeadLetterEventSource, PostgresEventSource -from events.event_sink import ConsoleEventSink +from events.event_sink import ConsoleEventSink, HttpEventSink class Mode(Enum): @@ -24,16 +25,20 @@ class Mode(Enum): def main(mode: Mode, dry_run: bool): if mode == Mode.DEAD_LETTER: + LOG.i("Using DeadLetterEventSource") source = DeadLetterEventSource() elif mode == Mode.LISTENER: + LOG.i("Using PostgresEventSource") source = PostgresEventSource(DB_URI) else: raise ValueError(f"Invalid mode: {mode}") if dry_run: + LOG.i("Starting with ConsoleEventSink") sink = ConsoleEventSink() else: - sink = ConsoleEventSink() + LOG.i("Starting with HttpEventSink") + sink = HttpEventSink() runner = Runner(source=source, sink=sink) runner.run() diff --git a/app/events/event_sink.py b/app/events/event_sink.py index ce5ae98..bc00d1f 100644 --- a/app/events/event_sink.py +++ b/app/events/event_sink.py @@ -1,19 +1,42 @@ +import requests + from abc import ABC, abstractmethod +from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL from app.log import LOG from app.models import SyncEvent class EventSink(ABC): @abstractmethod - def process(self, event: SyncEvent): + def process(self, event: SyncEvent) -> bool: pass class HttpEventSink(EventSink): - def process(self, event: SyncEvent): - pass + def process(self, event: SyncEvent) -> bool: + if not EVENT_WEBHOOK: + LOG.warning("Skipping sending event because there is no webhook configured") + return False + + LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}") + + res = requests.post( + url=EVENT_WEBHOOK, + data=event.content, + headers={"Content-Type": "application/x-protobuf"}, + verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL, + ) + if res.status_code != 200: + LOG.warning( + f"Failed to send event to webhook: {res.status_code} {res.text}" + ) + return False + else: + LOG.info(f"Event {event.id} sent successfully to webhook") + return True class ConsoleEventSink(EventSink): - def process(self, event: SyncEvent): + def process(self, event: SyncEvent) -> bool: LOG.info(f"Handling event {event.id}") + return True diff --git a/app/events/event_source.py b/app/events/event_source.py index f23ea3f..f4f8937 100644 --- a/app/events/event_source.py +++ b/app/events/event_source.py @@ -13,6 +13,8 @@ from typing import Callable, NoReturn _DEAD_LETTER_THRESHOLD_MINUTES = 10 _DEAD_LETTER_INTERVAL_SECONDS = 30 +_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5 + class EventSource(ABC): @abstractmethod @@ -22,9 +24,19 @@ class EventSource(ABC): class PostgresEventSource(EventSource): def __init__(self, connection_string: str): - self.__connection = psycopg2.connect(connection_string) + self.__connection_string = connection_string + self.__connect() def run(self, on_event: Callable[[SyncEvent], NoReturn]): + while True: + try: + self.__listen(on_event) + except Exception as e: + LOG.warn(f"Error listening to events: {e}") + sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS) + self.__connect() + + def __listen(self, on_event: Callable[[SyncEvent], NoReturn]): self.__connection.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT ) @@ -44,12 +56,24 @@ class PostgresEventSource(EventSource): webhook_id = int(notify.payload) event = SyncEvent.get_by(id=webhook_id) if event is not None: - on_event(event) + if event.mark_as_taken(): + on_event(event) + else: + LOG.info( + f"Event {event.id} was handled by another runner" + ) else: LOG.info(f"Could not find event with id={notify.payload}") except Exception as e: LOG.warn(f"Error getting event: {e}") + def __connect(self): + self.__connection = psycopg2.connect(self.__connection_string) + + from app.db import Session + + Session.close() + class DeadLetterEventSource(EventSource): @newrelic.agent.background_task() @@ -73,3 +97,4 @@ class DeadLetterEventSource(EventSource): sleep(_DEAD_LETTER_INTERVAL_SECONDS) except Exception as e: LOG.warn(f"Error getting dead letter event: {e}") + sleep(_DEAD_LETTER_INTERVAL_SECONDS) diff --git a/app/events/runner.py b/app/events/runner.py index 7d0b4b7..d6f9c2e 100644 --- a/app/events/runner.py +++ b/app/events/runner.py @@ -18,11 +18,10 @@ class Runner: @newrelic.agent.background_task() def __on_event(self, event: SyncEvent): try: - can_process = event.mark_as_taken() - if can_process: - event_created_at = event.created_at - start_time = arrow.now() - self.__sink.process(event) + event_created_at = event.created_at + start_time = arrow.now() + success = self.__sink.process(event) + if success: event_id = event.id SyncEvent.delete(event.id, commit=True) LOG.info(f"Marked {event_id} as done") @@ -38,8 +37,6 @@ class Runner: "Custom/sync_event_elapsed_time", time_between_taken_and_created.total_seconds(), ) - else: - LOG.info(f"{event.id} was handled by another runner") except Exception as e: LOG.warn(f"Exception processing event [id={event.id}]: {e}") newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1) diff --git a/app/monitoring.py b/app/monitoring.py index 29c88f1..5234b96 100644 --- a/app/monitoring.py +++ b/app/monitoring.py @@ -4,6 +4,7 @@ import subprocess from time import sleep from typing import List, Dict +import arrow import newrelic.agent from app.db import Session @@ -93,11 +94,44 @@ def log_nb_db_connection(): newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection) +@newrelic.agent.background_task() +def log_pending_to_process_events(): + r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;") + events_pending = list(r)[0][0] + + LOG.d("number of events pending to process %s", events_pending) + newrelic.agent.record_custom_metric( + "Custom/sync_events_pending_to_process", events_pending + ) + + +@newrelic.agent.background_task() +def log_events_pending_dead_letter(): + since = arrow.now().shift(minutes=-10).datetime + r = Session.execute( + """ + SELECT COUNT(*) + FROM sync_event + WHERE (taken_time IS NOT NULL AND taken_time < :since) + OR (taken_time IS NULL AND created_at < :since) + """, + {"since": since}, + ) + events_pending = list(r)[0][0] + + LOG.d("number of events pending dead letter %s", events_pending) + newrelic.agent.record_custom_metric( + "Custom/sync_events_pending_dead_letter", events_pending + ) + + if __name__ == "__main__": exporter = MetricExporter(get_newrelic_license()) while True: log_postfix_metrics() log_nb_db_connection() + log_pending_to_process_events() + log_events_pending_dead_letter() Session.close() exporter.run()