Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
41a5a65f51 | |||
1d0c7ec4a0 |
@ -3,7 +3,7 @@ import random
|
|||||||
import socket
|
import socket
|
||||||
import string
|
import string
|
||||||
from ast import literal_eval
|
from ast import literal_eval
|
||||||
from typing import Callable, List
|
from typing import Callable, List, Optional
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
@ -588,3 +588,24 @@ EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
|
|||||||
# We want it disabled by default, so only skip if defined
|
# We want it disabled by default, so only skip if defined
|
||||||
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
|
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
|
||||||
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
|
EVENT_WEBHOOK_DISABLE = "EVENT_WEBHOOK_DISABLE" in os.environ
|
||||||
|
|
||||||
|
|
||||||
|
def read_webhook_enabled_user_ids() -> Optional[List[int]]:
|
||||||
|
user_ids = os.environ.get("EVENT_WEBHOOK_ENABLED_USER_IDS", None)
|
||||||
|
if user_ids is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
ids = []
|
||||||
|
for user_id in user_ids.split(","):
|
||||||
|
try:
|
||||||
|
ids.append(int(user_id.strip()))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return ids
|
||||||
|
|
||||||
|
|
||||||
|
EVENT_WEBHOOK_ENABLED_USER_IDS: Optional[List[int]] = read_webhook_enabled_user_ids()
|
||||||
|
|
||||||
|
# Allow to define a different DB_URI for the event listener, in case we want to skip the connection pool
|
||||||
|
# It defaults to the regular DB_URI in case it's needed
|
||||||
|
EVENT_LISTENER_DB_URI = os.environ.get("EVENT_LISTENER_DB_URI", DB_URI)
|
||||||
|
@ -40,6 +40,10 @@ class EventDispatcher:
|
|||||||
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
|
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if config.EVENT_WEBHOOK_ENABLED_USER_IDS is not None:
|
||||||
|
if user.id not in config.EVENT_WEBHOOK_ENABLED_USER_IDS:
|
||||||
|
return
|
||||||
|
|
||||||
partner_user = EventDispatcher.__partner_user(user.id)
|
partner_user = EventDispatcher.__partner_user(user.id)
|
||||||
if not partner_user:
|
if not partner_user:
|
||||||
return
|
return
|
||||||
|
@ -2,7 +2,7 @@ import argparse
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from sys import argv, exit
|
from sys import argv, exit
|
||||||
|
|
||||||
from app.config import DB_URI
|
from app.config import EVENT_LISTENER_DB_URI
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
from events.runner import Runner
|
from events.runner import Runner
|
||||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||||
@ -31,7 +31,7 @@ def main(mode: Mode, dry_run: bool, max_retries: int):
|
|||||||
source = DeadLetterEventSource(max_retries)
|
source = DeadLetterEventSource(max_retries)
|
||||||
elif mode == Mode.LISTENER:
|
elif mode == Mode.LISTENER:
|
||||||
LOG.i("Using PostgresEventSource")
|
LOG.i("Using PostgresEventSource")
|
||||||
source = PostgresEventSource(DB_URI)
|
source = PostgresEventSource(EVENT_LISTENER_DB_URI)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Invalid mode: {mode}")
|
raise ValueError(f"Invalid mode: {mode}")
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ class PostgresEventSource(EventSource):
|
|||||||
cursor = self.__connection.cursor()
|
cursor = self.__connection.cursor()
|
||||||
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
|
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
|
||||||
|
|
||||||
|
LOG.info("Starting to listen to events")
|
||||||
while True:
|
while True:
|
||||||
if select.select([self.__connection], [], [], 5) != ([], [], []):
|
if select.select([self.__connection], [], [], 5) != ([], [], []):
|
||||||
self.__connection.poll()
|
self.__connection.poll()
|
||||||
|
Reference in New Issue
Block a user