Compare commits

..

71 Commits

Author SHA1 Message Date
0ea33ca5f8 4.44.0 2024-05-23 12:00:07 +01:00
4e178ad676 4.43.0 2024-05-09 12:00:07 +01:00
24ba25ab6a 4.42.2 2024-04-10 17:23:11 +01:00
78184eeae4 4.42.1 2024-03-26 12:00:08 +00:00
c111fbe8e1 4.42.0 2024-03-19 12:00:09 +00:00
d5981588e4 4.41.2 2024-03-15 12:00:08 +00:00
6af1c2ccf4 Merge pull request 'Correct docker package name' (#2) from fix-package-name-in-gitea-actions into main
Reviewed-on: #2
2024-03-14 15:47:01 +00:00
76664f6e4c Correct docker package name 2024-03-14 15:46:44 +00:00
f7125618c4 4.41.0 2024-03-14 12:00:08 +00:00
050cef0e4e 4.40.2 2024-03-07 12:00:08 +00:00
0d557ef875 4.40.1 2024-03-05 12:00:09 +00:00
6e56ea4489 Merge pull request 'Replace Drone with Gitea Actions' (#1) from gitea-actions into main
Reviewed-on: #1
2024-03-04 13:42:58 +00:00
def0de643b Remove Drone 2024-03-04 13:38:57 +00:00
9e7cb2c7dd Add Gitea Actions 2024-03-04 13:38:52 +00:00
f1110506c0 4.39.3 2024-02-27 12:00:07 +00:00
f5bce7d7ff 4.39.2 2024-02-23 12:00:07 +00:00
75f45d9365 4.39.1 2024-02-20 12:00:07 +00:00
ead425e0c2 4.38.3 2024-02-14 12:00:07 +00:00
6c910d62c5 4.38.2 2024-02-06 12:00:07 +00:00
99ffd1ec0c 4.38.0 2024-02-03 16:55:23 +00:00
eda940f8b2 4.37.2 2024-01-27 12:00:07 +00:00
1dad582523 4.37.1 2024-01-25 12:00:08 +00:00
e516266a27 4.37.0 2024-01-18 12:00:07 +00:00
850fc95477 4.36.8 2023-12-28 12:00:07 +00:00
d172825900 4.36.7 2023-12-21 12:00:09 +00:00
026865e5bf 4.36.6 2023-12-17 14:56:57 +00:00
add94ef2a2 4.36.5 2023-11-30 12:00:09 +00:00
1081400948 4.36.4 2023-11-22 12:00:09 +00:00
5776128905 4.36.3 2023-11-08 12:00:06 +00:00
d661860f4c 4.35.6 2023-11-07 12:00:06 +00:00
0a52e32972 4.35.3 2023-10-05 12:00:06 +01:00
703dcbd0eb 4.35.2 2023-10-03 12:00:06 +01:00
ce7ed69547 4.35.1 2023-10-02 12:00:06 +01:00
4f5564df16 4.35.0 2023-09-29 12:00:06 +01:00
2fee569131 4.34.4 2023-08-31 12:00:06 +01:00
7ea45d6f5d 4.34.3 2023-08-29 20:20:00 +01:00
6d24db50bd 4.34.2 2023-08-25 12:00:05 +01:00
88f270c6a1 4.34.1 2023-08-09 12:00:05 +01:00
0962b1cf29 Update .drone.yml 2023-08-06 17:56:31 +00:00
6051d72691 4.33.3 2023-08-06 17:51:04 +01:00
c31a75a9ef Update README.md 2023-08-06 16:04:57 +00:00
ef289385ff Update README.md 2023-08-06 16:04:47 +00:00
9b12a2ad33 Update README.md 2023-08-06 16:04:41 +00:00
8eb19d88f3 Remove provenance [CI SKIP] 2023-08-06 16:01:04 +00:00
e36e9d3077 4.32.4 2023-08-02 16:49:54 +01:00
b2430cbc5b 4.32.1 2023-07-12 11:00:04 +00:00
1258115397 4.32.0 2023-07-11 11:00:05 +00:00
38c134d903 4.31.0 2023-06-30 11:00:06 +00:00
cd77e4cc2d 4.30.1 2023-06-28 11:00:03 +00:00
87aedf3207 4.30.0 2023-06-27 11:00:04 +00:00
3523c9fc15 4.29.4 2023-06-07 11:00:05 +00:00
a6f4995cb5 4.29.3 2023-06-01 11:00:05 +00:00
727f61a35e 4.28.2 2023-05-16 11:00:09 +00:00
ce5124605a 4.28.1 2023-05-10 11:00:05 +00:00
2c82b03f8d 4.27.0 2023-04-25 11:00:05 +00:00
1b7a6223ac 4.26.1 2023-04-20 11:00:06 +00:00
75331c62a4 4.25.1 2023-04-15 11:00:05 +00:00
3f68a3e640 4.24.0 2023-04-11 11:00:05 +00:00
8ee4f9462e 4.23.0 2023-03-24 12:00:07 +00:00
822855d584 4.22.5 2023-03-14 12:00:06 +00:00
1a6a7e079b Update '.drone.yml' 2023-03-08 18:32:53 +00:00
5210cb6515 4.22.4 2023-03-08 12:00:06 +00:00
b643f0644b 4.22.3 2023-03-01 12:00:06 +00:00
5d093db4f6 4.22.2 2023-02-16 12:00:05 +00:00
0b16fcac67 Update 'README.md' 2023-02-10 13:00:46 +00:00
a0d294da53 Update 'README.md' 2023-01-27 16:29:12 +00:00
c3f755aede Update '.drone.yml' 2023-01-27 16:26:22 +00:00
0aea62c222 4.22.0 2023-01-17 12:00:04 +00:00
92f4ad2237 4.21.3 2022-12-30 16:47:07 +00:00
20da343c54 4.21.3 2022-12-30 16:23:27 +00:00
02776e8478 add drone 2022-12-30 15:35:10 +00:00
8 changed files with 15 additions and 109 deletions

View File

@ -583,7 +583,3 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None) EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
# We want it disabled by default, so only skip if defined
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ

View File

@ -34,9 +34,6 @@ class EventDispatcher:
dispatcher: Dispatcher = PostgresDispatcher.get(), dispatcher: Dispatcher = PostgresDispatcher.get(),
skip_if_webhook_missing: bool = True, skip_if_webhook_missing: bool = True,
): ):
if config.EVENT_WEBHOOK_DISABLE:
return
if not config.EVENT_WEBHOOK and skip_if_webhook_missing: if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
return return

View File

@ -3699,10 +3699,7 @@ class SyncEvent(Base, ModelMixin):
AND taken_time IS NULL AND taken_time IS NULL
""" """
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id} args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
res = Session.execute(sql, args) res = Session.execute(sql, args)
Session.commit()
return res.rowcount > 0 return res.rowcount > 0
@classmethod @classmethod

View File

@ -3,10 +3,9 @@ from enum import Enum
from sys import argv, exit from sys import argv, exit
from app.config import DB_URI from app.config import DB_URI
from app.log import LOG
from events.runner import Runner from events.runner import Runner
from events.event_source import DeadLetterEventSource, PostgresEventSource from events.event_source import DeadLetterEventSource, PostgresEventSource
from events.event_sink import ConsoleEventSink, HttpEventSink from events.event_sink import ConsoleEventSink
class Mode(Enum): class Mode(Enum):
@ -25,20 +24,16 @@ class Mode(Enum):
def main(mode: Mode, dry_run: bool): def main(mode: Mode, dry_run: bool):
if mode == Mode.DEAD_LETTER: if mode == Mode.DEAD_LETTER:
LOG.i("Using DeadLetterEventSource")
source = DeadLetterEventSource() source = DeadLetterEventSource()
elif mode == Mode.LISTENER: elif mode == Mode.LISTENER:
LOG.i("Using PostgresEventSource")
source = PostgresEventSource(DB_URI) source = PostgresEventSource(DB_URI)
else: else:
raise ValueError(f"Invalid mode: {mode}") raise ValueError(f"Invalid mode: {mode}")
if dry_run: if dry_run:
LOG.i("Starting with ConsoleEventSink")
sink = ConsoleEventSink() sink = ConsoleEventSink()
else: else:
LOG.i("Starting with HttpEventSink") sink = ConsoleEventSink()
sink = HttpEventSink()
runner = Runner(source=source, sink=sink) runner = Runner(source=source, sink=sink)
runner.run() runner.run()

View File

@ -1,42 +1,19 @@
import requests
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL
from app.log import LOG from app.log import LOG
from app.models import SyncEvent from app.models import SyncEvent
class EventSink(ABC): class EventSink(ABC):
@abstractmethod @abstractmethod
def process(self, event: SyncEvent) -> bool: def process(self, event: SyncEvent):
pass pass
class HttpEventSink(EventSink): class HttpEventSink(EventSink):
def process(self, event: SyncEvent) -> bool: def process(self, event: SyncEvent):
if not EVENT_WEBHOOK: pass
LOG.warning("Skipping sending event because there is no webhook configured")
return False
LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}")
res = requests.post(
url=EVENT_WEBHOOK,
data=event.content,
headers={"Content-Type": "application/x-protobuf"},
verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL,
)
if res.status_code != 200:
LOG.warning(
f"Failed to send event to webhook: {res.status_code} {res.text}"
)
return False
else:
LOG.info(f"Event {event.id} sent successfully to webhook")
return True
class ConsoleEventSink(EventSink): class ConsoleEventSink(EventSink):
def process(self, event: SyncEvent) -> bool: def process(self, event: SyncEvent):
LOG.info(f"Handling event {event.id}") LOG.info(f"Handling event {event.id}")
return True

View File

@ -13,8 +13,6 @@ from typing import Callable, NoReturn
_DEAD_LETTER_THRESHOLD_MINUTES = 10 _DEAD_LETTER_THRESHOLD_MINUTES = 10
_DEAD_LETTER_INTERVAL_SECONDS = 30 _DEAD_LETTER_INTERVAL_SECONDS = 30
_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5
class EventSource(ABC): class EventSource(ABC):
@abstractmethod @abstractmethod
@ -24,19 +22,9 @@ class EventSource(ABC):
class PostgresEventSource(EventSource): class PostgresEventSource(EventSource):
def __init__(self, connection_string: str): def __init__(self, connection_string: str):
self.__connection_string = connection_string self.__connection = psycopg2.connect(connection_string)
self.__connect()
def run(self, on_event: Callable[[SyncEvent], NoReturn]): def run(self, on_event: Callable[[SyncEvent], NoReturn]):
while True:
try:
self.__listen(on_event)
except Exception as e:
LOG.warn(f"Error listening to events: {e}")
sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
self.__connect()
def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
self.__connection.set_isolation_level( self.__connection.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
) )
@ -56,24 +44,12 @@ class PostgresEventSource(EventSource):
webhook_id = int(notify.payload) webhook_id = int(notify.payload)
event = SyncEvent.get_by(id=webhook_id) event = SyncEvent.get_by(id=webhook_id)
if event is not None: if event is not None:
if event.mark_as_taken():
on_event(event) on_event(event)
else:
LOG.info(
f"Event {event.id} was handled by another runner"
)
else: else:
LOG.info(f"Could not find event with id={notify.payload}") LOG.info(f"Could not find event with id={notify.payload}")
except Exception as e: except Exception as e:
LOG.warn(f"Error getting event: {e}") LOG.warn(f"Error getting event: {e}")
def __connect(self):
self.__connection = psycopg2.connect(self.__connection_string)
from app.db import Session
Session.close()
class DeadLetterEventSource(EventSource): class DeadLetterEventSource(EventSource):
@newrelic.agent.background_task() @newrelic.agent.background_task()
@ -97,4 +73,3 @@ class DeadLetterEventSource(EventSource):
sleep(_DEAD_LETTER_INTERVAL_SECONDS) sleep(_DEAD_LETTER_INTERVAL_SECONDS)
except Exception as e: except Exception as e:
LOG.warn(f"Error getting dead letter event: {e}") LOG.warn(f"Error getting dead letter event: {e}")
sleep(_DEAD_LETTER_INTERVAL_SECONDS)

View File

@ -18,10 +18,11 @@ class Runner:
@newrelic.agent.background_task() @newrelic.agent.background_task()
def __on_event(self, event: SyncEvent): def __on_event(self, event: SyncEvent):
try: try:
can_process = event.mark_as_taken()
if can_process:
event_created_at = event.created_at event_created_at = event.created_at
start_time = arrow.now() start_time = arrow.now()
success = self.__sink.process(event) self.__sink.process(event)
if success:
event_id = event.id event_id = event.id
SyncEvent.delete(event.id, commit=True) SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done") LOG.info(f"Marked {event_id} as done")
@ -37,6 +38,8 @@ class Runner:
"Custom/sync_event_elapsed_time", "Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(), time_between_taken_and_created.total_seconds(),
) )
else:
LOG.info(f"{event.id} was handled by another runner")
except Exception as e: except Exception as e:
LOG.warn(f"Exception processing event [id={event.id}]: {e}") LOG.warn(f"Exception processing event [id={event.id}]: {e}")
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1) newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)

View File

@ -4,7 +4,6 @@ import subprocess
from time import sleep from time import sleep
from typing import List, Dict from typing import List, Dict
import arrow
import newrelic.agent import newrelic.agent
from app.db import Session from app.db import Session
@ -94,44 +93,11 @@ def log_nb_db_connection():
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection) newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)
@newrelic.agent.background_task()
def log_pending_to_process_events():
r = Session.execute("select count(*) from sync_event WHERE taken_time IS NULL;")
events_pending = list(r)[0][0]
LOG.d("number of events pending to process %s", events_pending)
newrelic.agent.record_custom_metric(
"Custom/sync_events_pending_to_process", events_pending
)
@newrelic.agent.background_task()
def log_events_pending_dead_letter():
since = arrow.now().shift(minutes=-10).datetime
r = Session.execute(
"""
SELECT COUNT(*)
FROM sync_event
WHERE (taken_time IS NOT NULL AND taken_time < :since)
OR (taken_time IS NULL AND created_at < :since)
""",
{"since": since},
)
events_pending = list(r)[0][0]
LOG.d("number of events pending dead letter %s", events_pending)
newrelic.agent.record_custom_metric(
"Custom/sync_events_pending_dead_letter", events_pending
)
if __name__ == "__main__": if __name__ == "__main__":
exporter = MetricExporter(get_newrelic_license()) exporter = MetricExporter(get_newrelic_license())
while True: while True:
log_postfix_metrics() log_postfix_metrics()
log_nb_db_connection() log_nb_db_connection()
log_pending_to_process_events()
log_events_pending_dead_letter()
Session.close() Session.close()
exporter.run() exporter.run()