4.44.3
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m53s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m28s
Build-Release-Image / Merge-Images (push) Successful in 26s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s
All checks were successful
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 2m53s
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m28s
Build-Release-Image / Merge-Images (push) Successful in 26s
Build-Release-Image / Create-Release (push) Successful in 10s
Build-Release-Image / Notify (push) Successful in 3s
This commit is contained in:
parent
b0a170dcb4
commit
f447611d6f
@ -583,3 +583,7 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
|
||||
# Keep a copy of transactional emails when the variable is defined.
STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ

# URL that sync events are POSTed to; None disables the HTTP sink by default.
EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)

# We want it disabled by default, so only skip if defined
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
# Presence of the variable (any value) turns the whole event pipeline off.
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
|
@ -34,6 +34,9 @@ class EventDispatcher:
|
||||
dispatcher: Dispatcher = PostgresDispatcher.get(),
|
||||
skip_if_webhook_missing: bool = True,
|
||||
):
|
||||
if config.EVENT_WEBHOOK_DISABLE:
|
||||
return
|
||||
|
||||
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
|
||||
return
|
||||
|
||||
|
@ -3699,7 +3699,10 @@ class SyncEvent(Base, ModelMixin):
|
||||
AND taken_time IS NULL
|
||||
"""
|
||||
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
|
||||
|
||||
res = Session.execute(sql, args)
|
||||
Session.commit()
|
||||
|
||||
return res.rowcount > 0
|
||||
|
||||
@classmethod
|
||||
|
@ -3,9 +3,10 @@ from enum import Enum
|
||||
from sys import argv, exit
|
||||
|
||||
from app.config import DB_URI
|
||||
from app.log import LOG
|
||||
from events.runner import Runner
|
||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||
from events.event_sink import ConsoleEventSink
|
||||
from events.event_sink import ConsoleEventSink, HttpEventSink
|
||||
|
||||
|
||||
class Mode(Enum):
|
||||
@ -24,16 +25,20 @@ class Mode(Enum):
|
||||
|
||||
def main(mode: Mode, dry_run: bool):
    """Build the event source selected by *mode* and pump it into a sink.

    Args:
        mode: Mode.DEAD_LETTER polls for stale, unprocessed events;
            Mode.LISTENER subscribes to live Postgres notifications.
        dry_run: when True, events are only logged to the console instead of
            being POSTed to the configured webhook.

    Raises:
        ValueError: if *mode* is not a recognized Mode member.
    """
    if mode == Mode.DEAD_LETTER:
        LOG.i("Using DeadLetterEventSource")
        source = DeadLetterEventSource()
    elif mode == Mode.LISTENER:
        LOG.i("Using PostgresEventSource")
        source = PostgresEventSource(DB_URI)
    else:
        raise ValueError(f"Invalid mode: {mode}")

    if dry_run:
        LOG.i("Starting with ConsoleEventSink")
        sink = ConsoleEventSink()
    else:
        # NOTE: the flattened diff showed a leftover `sink = ConsoleEventSink()`
        # here; the post-change behavior is to use the HTTP sink.
        LOG.i("Starting with HttpEventSink")
        sink = HttpEventSink()

    runner = Runner(source=source, sink=sink)
    runner.run()
||||
|
@ -1,19 +1,42 @@
|
||||
import requests
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL
|
||||
from app.log import LOG
|
||||
from app.models import SyncEvent
|
||||
|
||||
|
||||
class EventSink(ABC):
    """Interface for event consumers fed by a Runner."""

    @abstractmethod
    def process(self, event: SyncEvent) -> bool:
        """Handle one event; return True when it was processed successfully."""
        pass
|
||||
|
||||
class HttpEventSink(EventSink):
    """Delivers events to the webhook configured via EVENT_WEBHOOK."""

    def process(self, event: SyncEvent) -> bool:
        """POST the event's protobuf payload to the webhook.

        Returns True only when the webhook answered with HTTP 200; a missing
        webhook URL or any non-200 response yields False so the event is not
        deleted and can be retried.
        """
        if not EVENT_WEBHOOK:
            LOG.warning("Skipping sending event because there is no webhook configured")
            return False

        LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}")

        res = requests.post(
            url=EVENT_WEBHOOK,
            data=event.content,
            headers={"Content-Type": "application/x-protobuf"},
            # SSL verification is disabled only when explicitly requested.
            verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL,
        )
        if res.status_code != 200:
            LOG.warning(
                f"Failed to send event to webhook: {res.status_code} {res.text}"
            )
            return False
        else:
            LOG.info(f"Event {event.id} sent successfully to webhook")
            return True
||||
|
||||
|
||||
class ConsoleEventSink(EventSink):
    """Dry-run sink: logs the event id and reports success."""

    def process(self, event: SyncEvent) -> bool:
        LOG.info(f"Handling event {event.id}")
        return True
|
@ -13,6 +13,8 @@ from typing import Callable, NoReturn
|
||||
_DEAD_LETTER_THRESHOLD_MINUTES = 10
|
||||
_DEAD_LETTER_INTERVAL_SECONDS = 30
|
||||
|
||||
_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5
|
||||
|
||||
|
||||
class EventSource(ABC):
|
||||
@abstractmethod
|
||||
@ -22,9 +24,19 @@ class EventSource(ABC):
|
||||
|
||||
class PostgresEventSource(EventSource):
|
||||
def __init__(self, connection_string: str):
    # Keep the DSN so __connect() can re-establish the connection after
    # a database failure (see run()'s reconnect loop).
    self.__connection_string = connection_string
    self.__connect()
||||
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
    """Listen for events forever, reconnecting after any failure.

    Never returns under normal operation: any exception from the LISTEN
    loop is logged, then the connection is re-opened after a short pause.
    """
    while True:
        try:
            self.__listen(on_event)
        except Exception as e:
            LOG.warn(f"Error listening to events: {e}")
            sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
            self.__connect()
|
||||
def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
|
||||
self.__connection.set_isolation_level(
|
||||
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
|
||||
)
|
||||
@ -44,12 +56,24 @@ class PostgresEventSource(EventSource):
|
||||
webhook_id = int(notify.payload)
|
||||
event = SyncEvent.get_by(id=webhook_id)
|
||||
if event is not None:
|
||||
on_event(event)
|
||||
if event.mark_as_taken():
|
||||
on_event(event)
|
||||
else:
|
||||
LOG.info(
|
||||
f"Event {event.id} was handled by another runner"
|
||||
)
|
||||
else:
|
||||
LOG.info(f"Could not find event with id={notify.payload}")
|
||||
except Exception as e:
|
||||
LOG.warn(f"Error getting event: {e}")
|
||||
|
||||
def __connect(self):
    """(Re)open the raw psycopg2 connection used for LISTEN/NOTIFY.

    Also closes the SQLAlchemy session so it is forced to check out a
    fresh connection after a database restart/failover.
    """
    self.__connection = psycopg2.connect(self.__connection_string)

    # Imported here to avoid a module-level import cycle — TODO confirm.
    from app.db import Session

    Session.close()
|
||||
|
||||
class DeadLetterEventSource(EventSource):
|
||||
@newrelic.agent.background_task()
|
||||
@ -73,3 +97,4 @@ class DeadLetterEventSource(EventSource):
|
||||
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
||||
except Exception as e:
|
||||
LOG.warn(f"Error getting dead letter event: {e}")
|
||||
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
||||
|
@ -18,11 +18,10 @@ class Runner:
|
||||
@newrelic.agent.background_task()
|
||||
def __on_event(self, event: SyncEvent):
|
||||
try:
|
||||
can_process = event.mark_as_taken()
|
||||
if can_process:
|
||||
event_created_at = event.created_at
|
||||
start_time = arrow.now()
|
||||
self.__sink.process(event)
|
||||
event_created_at = event.created_at
|
||||
start_time = arrow.now()
|
||||
success = self.__sink.process(event)
|
||||
if success:
|
||||
event_id = event.id
|
||||
SyncEvent.delete(event.id, commit=True)
|
||||
LOG.info(f"Marked {event_id} as done")
|
||||
@ -38,8 +37,6 @@ class Runner:
|
||||
"Custom/sync_event_elapsed_time",
|
||||
time_between_taken_and_created.total_seconds(),
|
||||
)
|
||||
else:
|
||||
LOG.info(f"{event.id} was handled by another runner")
|
||||
except Exception as e:
|
||||
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
|
||||
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
|
||||
|
@ -4,6 +4,7 @@ import subprocess
|
||||
from time import sleep
|
||||
from typing import List, Dict
|
||||
|
||||
import arrow
|
||||
import newrelic.agent
|
||||
|
||||
from app.db import Session
|
||||
@ -93,11 +94,44 @@ def log_nb_db_connection():
|
||||
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
|
||||
|
||||
|
||||
@newrelic.agent.background_task()
def log_pending_to_process_events():
    """Record (as a New Relic metric) the number of sync events not yet taken."""
    r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;")
    events_pending = list(r)[0][0]

    LOG.d("number of events pending to process %s", events_pending)
    newrelic.agent.record_custom_metric(
        "Custom/sync_events_pending_to_process", events_pending
    )
||||
|
||||
|
||||
@newrelic.agent.background_task()
def log_events_pending_dead_letter():
    """Record how many sync events look stuck.

    An event counts as stuck when it was taken more than 10 minutes ago and
    never finished, or was created more than 10 minutes ago and never taken.
    """
    since = arrow.now().shift(minutes=-10).datetime
    r = Session.execute(
        """
        SELECT COUNT(*)
        FROM sync_event
        WHERE (taken_time IS NOT NULL AND taken_time < :since)
           OR (taken_time IS NULL AND created_at < :since)
        """,
        {"since": since},
    )
    events_pending = list(r)[0][0]

    LOG.d("number of events pending dead letter %s", events_pending)
    newrelic.agent.record_custom_metric(
        "Custom/sync_events_pending_dead_letter", events_pending
    )
||||
|
||||
|
||||
if __name__ == "__main__":
    exporter = MetricExporter(get_newrelic_license())
    while True:
        log_postfix_metrics()
        log_nb_db_connection()
        log_pending_to_process_events()
        log_events_pending_dead_letter()
        # Return the session's connection to the pool between iterations.
        Session.close()

        # NOTE(review): exporter.run() appears inside the loop in the
        # flattened source — presumably it harvests/sleeps per iteration;
        # confirm it is not meant to be the loop itself.
        exporter.run()
||||
|
Loading…
x
Reference in New Issue
Block a user