Compare commits
71 Commits
f447611d6f
...
4.44.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ea33ca5f8 | |||
| 4e178ad676 | |||
| 24ba25ab6a | |||
| 78184eeae4 | |||
| c111fbe8e1 | |||
| d5981588e4 | |||
| 6af1c2ccf4 | |||
| 76664f6e4c | |||
| f7125618c4 | |||
| 050cef0e4e | |||
| 0d557ef875 | |||
| 6e56ea4489 | |||
| def0de643b | |||
| 9e7cb2c7dd | |||
| f1110506c0 | |||
| f5bce7d7ff | |||
| 75f45d9365 | |||
| ead425e0c2 | |||
| 6c910d62c5 | |||
| 99ffd1ec0c | |||
| eda940f8b2 | |||
| 1dad582523 | |||
| e516266a27 | |||
| 850fc95477 | |||
| d172825900 | |||
| 026865e5bf | |||
| add94ef2a2 | |||
| 1081400948 | |||
| 5776128905 | |||
| d661860f4c | |||
| 0a52e32972 | |||
| 703dcbd0eb | |||
| ce7ed69547 | |||
| 4f5564df16 | |||
| 2fee569131 | |||
| 7ea45d6f5d | |||
| 6d24db50bd | |||
| 88f270c6a1 | |||
| 0962b1cf29 | |||
| 6051d72691 | |||
| c31a75a9ef | |||
| ef289385ff | |||
| 9b12a2ad33 | |||
| 8eb19d88f3 | |||
| e36e9d3077 | |||
| b2430cbc5b | |||
| 1258115397 | |||
| 38c134d903 | |||
| cd77e4cc2d | |||
| 87aedf3207 | |||
| 3523c9fc15 | |||
| a6f4995cb5 | |||
| 727f61a35e | |||
| ce5124605a | |||
| 2c82b03f8d | |||
| 1b7a6223ac | |||
| 75331c62a4 | |||
| 3f68a3e640 | |||
| 8ee4f9462e | |||
| 822855d584 | |||
| 1a6a7e079b | |||
| 5210cb6515 | |||
| b643f0644b | |||
| 5d093db4f6 | |||
| 0b16fcac67 | |||
| a0d294da53 | |||
| c3f755aede | |||
| 0aea62c222 | |||
| 92f4ad2237 | |||
| 20da343c54 | |||
| 02776e8478 |
@ -583,7 +583,3 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
|
|||||||
STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
|
STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
|
||||||
|
|
||||||
EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
|
EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
|
||||||
|
|
||||||
# We want it disabled by default, so only skip if defined
|
|
||||||
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
|
|
||||||
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
|
|
||||||
|
|||||||
@ -34,9 +34,6 @@ class EventDispatcher:
|
|||||||
dispatcher: Dispatcher = PostgresDispatcher.get(),
|
dispatcher: Dispatcher = PostgresDispatcher.get(),
|
||||||
skip_if_webhook_missing: bool = True,
|
skip_if_webhook_missing: bool = True,
|
||||||
):
|
):
|
||||||
if config.EVENT_WEBHOOK_DISABLE:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
|
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@ -3699,10 +3699,7 @@ class SyncEvent(Base, ModelMixin):
|
|||||||
AND taken_time IS NULL
|
AND taken_time IS NULL
|
||||||
"""
|
"""
|
||||||
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
|
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
|
||||||
|
|
||||||
res = Session.execute(sql, args)
|
res = Session.execute(sql, args)
|
||||||
Session.commit()
|
|
||||||
|
|
||||||
return res.rowcount > 0
|
return res.rowcount > 0
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@ -3,10 +3,9 @@ from enum import Enum
|
|||||||
from sys import argv, exit
|
from sys import argv, exit
|
||||||
|
|
||||||
from app.config import DB_URI
|
from app.config import DB_URI
|
||||||
from app.log import LOG
|
|
||||||
from events.runner import Runner
|
from events.runner import Runner
|
||||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||||
from events.event_sink import ConsoleEventSink, HttpEventSink
|
from events.event_sink import ConsoleEventSink
|
||||||
|
|
||||||
|
|
||||||
class Mode(Enum):
|
class Mode(Enum):
|
||||||
@ -25,20 +24,16 @@ class Mode(Enum):
|
|||||||
|
|
||||||
def main(mode: Mode, dry_run: bool):
|
def main(mode: Mode, dry_run: bool):
|
||||||
if mode == Mode.DEAD_LETTER:
|
if mode == Mode.DEAD_LETTER:
|
||||||
LOG.i("Using DeadLetterEventSource")
|
|
||||||
source = DeadLetterEventSource()
|
source = DeadLetterEventSource()
|
||||||
elif mode == Mode.LISTENER:
|
elif mode == Mode.LISTENER:
|
||||||
LOG.i("Using PostgresEventSource")
|
|
||||||
source = PostgresEventSource(DB_URI)
|
source = PostgresEventSource(DB_URI)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Invalid mode: {mode}")
|
raise ValueError(f"Invalid mode: {mode}")
|
||||||
|
|
||||||
if dry_run:
|
if dry_run:
|
||||||
LOG.i("Starting with ConsoleEventSink")
|
|
||||||
sink = ConsoleEventSink()
|
sink = ConsoleEventSink()
|
||||||
else:
|
else:
|
||||||
LOG.i("Starting with HttpEventSink")
|
sink = ConsoleEventSink()
|
||||||
sink = HttpEventSink()
|
|
||||||
|
|
||||||
runner = Runner(source=source, sink=sink)
|
runner = Runner(source=source, sink=sink)
|
||||||
runner.run()
|
runner.run()
|
||||||
|
|||||||
@ -1,42 +1,19 @@
|
|||||||
import requests
|
|
||||||
|
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL
|
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
from app.models import SyncEvent
|
from app.models import SyncEvent
|
||||||
|
|
||||||
|
|
||||||
class EventSink(ABC):
|
class EventSink(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def process(self, event: SyncEvent) -> bool:
|
def process(self, event: SyncEvent):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class HttpEventSink(EventSink):
|
class HttpEventSink(EventSink):
|
||||||
def process(self, event: SyncEvent) -> bool:
|
def process(self, event: SyncEvent):
|
||||||
if not EVENT_WEBHOOK:
|
pass
|
||||||
LOG.warning("Skipping sending event because there is no webhook configured")
|
|
||||||
return False
|
|
||||||
|
|
||||||
LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}")
|
|
||||||
|
|
||||||
res = requests.post(
|
|
||||||
url=EVENT_WEBHOOK,
|
|
||||||
data=event.content,
|
|
||||||
headers={"Content-Type": "application/x-protobuf"},
|
|
||||||
verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL,
|
|
||||||
)
|
|
||||||
if res.status_code != 200:
|
|
||||||
LOG.warning(
|
|
||||||
f"Failed to send event to webhook: {res.status_code} {res.text}"
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
LOG.info(f"Event {event.id} sent successfully to webhook")
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class ConsoleEventSink(EventSink):
|
class ConsoleEventSink(EventSink):
|
||||||
def process(self, event: SyncEvent) -> bool:
|
def process(self, event: SyncEvent):
|
||||||
LOG.info(f"Handling event {event.id}")
|
LOG.info(f"Handling event {event.id}")
|
||||||
return True
|
|
||||||
|
|||||||
@ -13,8 +13,6 @@ from typing import Callable, NoReturn
|
|||||||
_DEAD_LETTER_THRESHOLD_MINUTES = 10
|
_DEAD_LETTER_THRESHOLD_MINUTES = 10
|
||||||
_DEAD_LETTER_INTERVAL_SECONDS = 30
|
_DEAD_LETTER_INTERVAL_SECONDS = 30
|
||||||
|
|
||||||
_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5
|
|
||||||
|
|
||||||
|
|
||||||
class EventSource(ABC):
|
class EventSource(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
@ -24,19 +22,9 @@ class EventSource(ABC):
|
|||||||
|
|
||||||
class PostgresEventSource(EventSource):
|
class PostgresEventSource(EventSource):
|
||||||
def __init__(self, connection_string: str):
|
def __init__(self, connection_string: str):
|
||||||
self.__connection_string = connection_string
|
self.__connection = psycopg2.connect(connection_string)
|
||||||
self.__connect()
|
|
||||||
|
|
||||||
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
self.__listen(on_event)
|
|
||||||
except Exception as e:
|
|
||||||
LOG.warn(f"Error listening to events: {e}")
|
|
||||||
sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
|
|
||||||
self.__connect()
|
|
||||||
|
|
||||||
def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
|
|
||||||
self.__connection.set_isolation_level(
|
self.__connection.set_isolation_level(
|
||||||
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
|
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
|
||||||
)
|
)
|
||||||
@ -56,24 +44,12 @@ class PostgresEventSource(EventSource):
|
|||||||
webhook_id = int(notify.payload)
|
webhook_id = int(notify.payload)
|
||||||
event = SyncEvent.get_by(id=webhook_id)
|
event = SyncEvent.get_by(id=webhook_id)
|
||||||
if event is not None:
|
if event is not None:
|
||||||
if event.mark_as_taken():
|
on_event(event)
|
||||||
on_event(event)
|
|
||||||
else:
|
|
||||||
LOG.info(
|
|
||||||
f"Event {event.id} was handled by another runner"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
LOG.info(f"Could not find event with id={notify.payload}")
|
LOG.info(f"Could not find event with id={notify.payload}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.warn(f"Error getting event: {e}")
|
LOG.warn(f"Error getting event: {e}")
|
||||||
|
|
||||||
def __connect(self):
|
|
||||||
self.__connection = psycopg2.connect(self.__connection_string)
|
|
||||||
|
|
||||||
from app.db import Session
|
|
||||||
|
|
||||||
Session.close()
|
|
||||||
|
|
||||||
|
|
||||||
class DeadLetterEventSource(EventSource):
|
class DeadLetterEventSource(EventSource):
|
||||||
@newrelic.agent.background_task()
|
@newrelic.agent.background_task()
|
||||||
@ -97,4 +73,3 @@ class DeadLetterEventSource(EventSource):
|
|||||||
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.warn(f"Error getting dead letter event: {e}")
|
LOG.warn(f"Error getting dead letter event: {e}")
|
||||||
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
|
||||||
|
|||||||
@ -18,10 +18,11 @@ class Runner:
|
|||||||
@newrelic.agent.background_task()
|
@newrelic.agent.background_task()
|
||||||
def __on_event(self, event: SyncEvent):
|
def __on_event(self, event: SyncEvent):
|
||||||
try:
|
try:
|
||||||
event_created_at = event.created_at
|
can_process = event.mark_as_taken()
|
||||||
start_time = arrow.now()
|
if can_process:
|
||||||
success = self.__sink.process(event)
|
event_created_at = event.created_at
|
||||||
if success:
|
start_time = arrow.now()
|
||||||
|
self.__sink.process(event)
|
||||||
event_id = event.id
|
event_id = event.id
|
||||||
SyncEvent.delete(event.id, commit=True)
|
SyncEvent.delete(event.id, commit=True)
|
||||||
LOG.info(f"Marked {event_id} as done")
|
LOG.info(f"Marked {event_id} as done")
|
||||||
@ -37,6 +38,8 @@ class Runner:
|
|||||||
"Custom/sync_event_elapsed_time",
|
"Custom/sync_event_elapsed_time",
|
||||||
time_between_taken_and_created.total_seconds(),
|
time_between_taken_and_created.total_seconds(),
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
LOG.info(f"{event.id} was handled by another runner")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
|
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
|
||||||
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
|
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
|
||||||
|
|||||||
@ -4,7 +4,6 @@ import subprocess
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
|
|
||||||
import arrow
|
|
||||||
import newrelic.agent
|
import newrelic.agent
|
||||||
|
|
||||||
from app.db import Session
|
from app.db import Session
|
||||||
@ -94,44 +93,11 @@ def log_nb_db_connection():
|
|||||||
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
|
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
|
||||||
|
|
||||||
|
|
||||||
@newrelic.agent.background_task()
|
|
||||||
def log_pending_to_process_events():
|
|
||||||
r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;")
|
|
||||||
events_pending = list(r)[0][0]
|
|
||||||
|
|
||||||
LOG.d("number of events pending to process %s", events_pending)
|
|
||||||
newrelic.agent.record_custom_metric(
|
|
||||||
"Custom/sync_events_pending_to_process", events_pending
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@newrelic.agent.background_task()
|
|
||||||
def log_events_pending_dead_letter():
|
|
||||||
since = arrow.now().shift(minutes=-10).datetime
|
|
||||||
r = Session.execute(
|
|
||||||
"""
|
|
||||||
SELECT COUNT(*)
|
|
||||||
FROM sync_event
|
|
||||||
WHERE (taken_time IS NOT NULL AND taken_time < :since)
|
|
||||||
OR (taken_time IS NULL AND created_at < :since)
|
|
||||||
""",
|
|
||||||
{"since": since},
|
|
||||||
)
|
|
||||||
events_pending = list(r)[0][0]
|
|
||||||
|
|
||||||
LOG.d("number of events pending dead letter %s", events_pending)
|
|
||||||
newrelic.agent.record_custom_metric(
|
|
||||||
"Custom/sync_events_pending_dead_letter", events_pending
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
exporter = MetricExporter(get_newrelic_license())
|
exporter = MetricExporter(get_newrelic_license())
|
||||||
while True:
|
while True:
|
||||||
log_postfix_metrics()
|
log_postfix_metrics()
|
||||||
log_nb_db_connection()
|
log_nb_db_connection()
|
||||||
log_pending_to_process_events()
|
|
||||||
log_events_pending_dead_letter()
|
|
||||||
Session.close()
|
Session.close()
|
||||||
|
|
||||||
exporter.run()
|
exporter.run()
|
||||||
|
|||||||
Reference in New Issue
Block a user