4.47.2
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m33s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m39s
Build-Release-Image / Merge-Images (push) Successful in 12s
Build-Release-Image / Create-Release (push) Successful in 8s
Build-Release-Image / Notify (push) Successful in 5s
All checks were successful
Build-Release-Image / Build-Image (linux/arm64) (push) Successful in 3m33s
Build-Release-Image / Build-Image (linux/amd64) (push) Successful in 3m39s
Build-Release-Image / Merge-Images (push) Successful in 12s
Build-Release-Image / Create-Release (push) Successful in 8s
Build-Release-Image / Notify (push) Successful in 5s
This commit is contained in:
@ -4,6 +4,8 @@ import psycopg2
|
||||
import select
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from app.db import Session
|
||||
from app.log import LOG
|
||||
from app.models import SyncEvent
|
||||
from app.events.event_dispatcher import NOTIFICATION_CHANNEL
|
||||
@ -66,6 +68,7 @@ class PostgresEventSource(EventSource):
|
||||
LOG.info(f"Could not find event with id={notify.payload}")
|
||||
except Exception as e:
|
||||
LOG.warn(f"Error getting event: {e}")
|
||||
Session.close() # Ensure we get a new connection and we don't leave a dangling tx
|
||||
|
||||
def __connect(self):
|
||||
self.__connection = psycopg2.connect(self.__connection_string)
|
||||
@ -76,6 +79,9 @@ class PostgresEventSource(EventSource):
|
||||
|
||||
|
||||
class DeadLetterEventSource(EventSource):
|
||||
def __init__(self, max_retries: int):
|
||||
self.__max_retries = max_retries
|
||||
|
||||
@newrelic.agent.background_task()
|
||||
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
||||
while True:
|
||||
@ -83,7 +89,9 @@ class DeadLetterEventSource(EventSource):
|
||||
threshold = arrow.utcnow().shift(
|
||||
minutes=-_DEAD_LETTER_THRESHOLD_MINUTES
|
||||
)
|
||||
events = SyncEvent.get_dead_letter(older_than=threshold)
|
||||
events = SyncEvent.get_dead_letter(
|
||||
older_than=threshold, max_retries=self.__max_retries
|
||||
)
|
||||
if events:
|
||||
LOG.info(f"Got {len(events)} dead letter events")
|
||||
if events:
|
||||
@ -92,7 +100,8 @@ class DeadLetterEventSource(EventSource):
|
||||
)
|
||||
for event in events:
|
||||
on_event(event)
|
||||
else:
|
||||
Session.close() # Ensure that we have a new connection and we don't have a dangling tx with a lock
|
||||
if not events:
|
||||
LOG.debug("No dead letter events")
|
||||
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
||||
except Exception as e:
|
||||
|
@ -2,6 +2,7 @@ import arrow
|
||||
import newrelic.agent
|
||||
|
||||
from app.log import LOG
|
||||
from app.db import Session
|
||||
from app.models import SyncEvent
|
||||
from events.event_sink import EventSink
|
||||
from events.event_source import EventSource
|
||||
@ -37,6 +38,9 @@ class Runner:
|
||||
"Custom/sync_event_elapsed_time",
|
||||
time_between_taken_and_created.total_seconds(),
|
||||
)
|
||||
else:
|
||||
event.retry_count = event.retry_count + 1
|
||||
Session.commit()
|
||||
except Exception as e:
|
||||
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
|
||||
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
|
||||
|
Reference in New Issue
Block a user