This commit is contained in:
2024-03-19 12:00:09 +00:00
parent d5981588e4
commit c111fbe8e1
15 changed files with 269 additions and 40 deletions

View File

View File

@ -0,0 +1,35 @@
import tempfile
from io import BytesIO
import arrow
from app import s3, config
from app.models import File, BatchImport
from tasks.cleanup_old_imports import cleanup_old_imports
from tests.utils import random_token, create_new_user
def test_cleanup_old_imports():
BatchImport.filter().delete()
with tempfile.TemporaryDirectory() as tmpdir:
config.UPLOAD_DIR = tmpdir
user = create_new_user()
path = random_token()
s3.upload_from_bytesio(path, BytesIO("data".encode("utf-8")))
file = File.create(path=path, commit=True) # noqa: F821
now = arrow.now()
delete_batch_import_id = BatchImport.create(
user_id=user.id,
file_id=file.id,
created_at=now.shift(minutes=-1),
flush=True,
).id
keep_batch_import_id = BatchImport.create(
user_id=user.id,
file_id=file.id,
created_at=now.shift(minutes=+1),
commit=True,
).id
cleanup_old_imports(now)
assert BatchImport.get(id=delete_batch_import_id) is None
assert BatchImport.get(id=keep_batch_import_id) is not None

View File

@ -0,0 +1,72 @@
import arrow
from app import config
from app.models import Job, JobState
from tasks.cleanup_old_jobs import cleanup_old_jobs
def test_cleanup_old_jobs():
Job.filter().delete()
now = arrow.now()
delete_ids = [
Job.create(
updated_at=now.shift(minutes=-1),
state=JobState.done.value,
name="",
payload="",
flush=True,
).id,
Job.create(
updated_at=now.shift(minutes=-1),
state=JobState.error.value,
name="",
payload="",
flush=True,
).id,
Job.create(
updated_at=now.shift(minutes=-1),
state=JobState.taken.value,
attempts=config.JOB_MAX_ATTEMPTS,
name="",
payload="",
flush=True,
).id,
]
keep_ids = [
Job.create(
updated_at=now.shift(minutes=+1),
state=JobState.done.value,
name="",
payload="",
flush=True,
).id,
Job.create(
updated_at=now.shift(minutes=+1),
state=JobState.error.value,
name="",
payload="",
flush=True,
).id,
Job.create(
updated_at=now.shift(minutes=+1),
state=JobState.taken.value,
attempts=config.JOB_MAX_ATTEMPTS,
name="",
payload="",
flush=True,
).id,
Job.create(
updated_at=now.shift(minutes=-1),
state=JobState.taken.value,
attempts=config.JOB_MAX_ATTEMPTS - 1,
name="",
payload="",
flush=True,
).id,
]
cleanup_old_jobs(now)
for delete_id in delete_ids:
assert Job.get(id=delete_id) is None
for keep_id in keep_ids:
assert Job.get(id=keep_id) is not None

View File

@ -0,0 +1,26 @@
import arrow
from app.models import Notification
from tasks.cleanup_old_notifications import cleanup_old_notifications
from tests.utils import create_new_user
def test_cleanup_old_notifications():
Notification.filter().delete()
user = create_new_user()
now = arrow.now()
delete_id = Notification.create(
user_id=user.id,
created_at=now.shift(minutes=-1),
message="",
flush=True,
).id
keep_id = Notification.create(
user_id=user.id,
created_at=now.shift(minutes=+1),
message="",
flush=True,
).id
cleanup_old_notifications(now)
assert Notification.get(id=delete_id) is None
assert Notification.get(id=keep_id) is not None