Files
2026-05-22 09:14:31 +02:00

246 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Unit-Tests für mailmerge.services.retention.
Decken ab:
- Stichtag-Berechnung (inkl. Edge Cases days < 1)
- QuerySet-Logik (welche Jobs sind Kandidaten?)
- Bulk-Cleanup mit Dry-Run
- File-Löschung wird ausgelöst
- Wartende/laufende Jobs werden NICHT gelöscht
- finished_at-Fallback auf created_at
"""
from __future__ import annotations
from datetime import timedelta
from unittest.mock import patch
import pytest
from django.core.files.base import ContentFile
from django.utils import timezone
from mailmerge.models import JobLogEntry, LetterTemplate, MailMergeJob
from mailmerge.services.retention import (
CleanupResult,
cleanup_expired_jobs,
expired_jobs_queryset,
get_cutoff,
)
# Module-level pytest mark: applies the `django_db` marker to every test
# in this file, since all of them create and query model rows.
pytestmark = pytest.mark.django_db
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def template(user):
    """Provide a stored ``LetterTemplate`` created by the ``user`` fixture.

    The template carries a dummy DOCX payload and one placeholder
    (``vorname``); the retention tests only need it as the parent object
    of the jobs they create.
    """
    docx_stub = ContentFile(b"dummy-docx", name="tpl.docx")
    template_fields = {
        "name": "Retention-Testvorlage",
        "file": docx_stub,
        "placeholders": ["vorname"],
        "created_by": user,
    }
    return LetterTemplate.objects.create(**template_fields)
def _make_job(template, user, *, status, finished_offset_days=None,
              created_offset_days=0, with_files=False):
    """Create a ``MailMergeJob`` whose timestamps are shifted into the past.

    Args:
        template: Template the job refers to.
        user: Stored as the job's ``created_by``.
        status: Status the job is created with.
        finished_offset_days: A positive number means "finished that many
            days ago". ``None`` leaves ``finished_at`` NULL (used by the
            fallback test).
        created_offset_days: How many days before now ``created_at`` is
            set to.
        with_files: When true, a result PDF is attached in addition to the
            recipients CSV.

    Returns:
        The job, reloaded from the database after back-dating.

    Ordering matters: files MUST be attached before the timestamps are
    back-dated, because ``FileField.save(save=True)`` triggers an
    ``instance.save()`` that would overwrite the values written via
    ``.update()`` with the current in-memory ones.
    """
    reference = timezone.now()
    csv_payload = ContentFile(b"vorname\nAnna\n", name="r.csv")
    job = MailMergeJob.objects.create(
        template=template,
        recipients_csv=csv_payload,
        status=status,
        created_by=user,
    )

    if with_files:
        # save=False so that no extra instance.save() disturbs the
        # timestamps; the file is still written to storage.
        job.result_pdf.save("out.pdf", ContentFile(b"%PDF-1.4"), save=False)
        job.save(update_fields=["result_pdf"])

    # Only now back-date the timestamps, through `.update()`, so that the
    # model's save() path (and its automatic timestamp handling) is
    # bypassed and no additional .save() is triggered.
    backdated = {
        "created_at": reference - timedelta(days=created_offset_days),
        "finished_at": None,
    }
    if finished_offset_days is not None:
        backdated["finished_at"] = reference - timedelta(
            days=finished_offset_days
        )
    MailMergeJob.objects.filter(pk=job.pk).update(**backdated)

    job.refresh_from_db()
    return job
# ---------------------------------------------------------------------------
# get_cutoff
# ---------------------------------------------------------------------------
class TestGetCutoff:
    """Checks for the cutoff timestamp returned by ``get_cutoff``."""

    def test_uses_settings_default(self, settings):
        """Without an argument, ``JOB_RETENTION_DAYS`` determines the cutoff."""
        settings.JOB_RETENTION_DAYS = 30
        cutoff = get_cutoff()
        age = timezone.now() - cutoff
        # Tolerance, because a few milliseconds pass between the two calls.
        expected, slack = timedelta(days=30), timedelta(hours=1)
        assert expected - slack < age < expected + slack

    def test_explicit_overrides_settings(self, settings):
        """An explicit ``retention_days`` wins over the settings value."""
        settings.JOB_RETENTION_DAYS = 30
        cutoff = get_cutoff(retention_days=7)
        age = timezone.now() - cutoff
        expected, slack = timedelta(days=7), timedelta(hours=1)
        assert expected - slack < age < expected + slack

    def test_zero_raises(self):
        """A retention of zero days is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            get_cutoff(retention_days=0)

    def test_negative_raises(self):
        """A negative retention is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            get_cutoff(retention_days=-5)
# ---------------------------------------------------------------------------
# expired_jobs_queryset
# ---------------------------------------------------------------------------
class TestExpiredJobsQueryset:
    """Which jobs does ``expired_jobs_queryset`` select for a 30-day cutoff?"""

    @staticmethod
    def _count_candidates():
        """Return how many jobs the queryset yields for a cutoff 30 days ago."""
        thirty_days_ago = timezone.now() - timedelta(days=30)
        return expired_jobs_queryset(thirty_days_ago).count()

    def test_done_job_older_than_cutoff_is_candidate(self, template, user):
        """A DONE job finished 40 days ago is selected."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=40,
        )
        assert self._count_candidates() == 1

    def test_done_job_younger_than_cutoff_is_not_candidate(self, template, user):
        """A DONE job finished 10 days ago is not selected."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=10,
        )
        assert self._count_candidates() == 0

    def test_failed_jobs_are_candidates(self, template, user):
        """A FAILED job finished 40 days ago is selected as well."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.FAILED,
            finished_offset_days=40,
        )
        assert self._count_candidates() == 1

    def test_running_jobs_are_never_candidates(self, template, user):
        """A RUNNING job is not selected even when created 999 days ago."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.RUNNING,
            finished_offset_days=None,
            created_offset_days=999,
        )
        assert self._count_candidates() == 0

    def test_pending_jobs_are_never_candidates(self, template, user):
        """A PENDING job is not selected even when created 999 days ago."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.PENDING,
            finished_offset_days=None,
            created_offset_days=999,
        )
        assert self._count_candidates() == 0

    def test_finished_at_null_falls_back_to_created_at(self, template, user):
        """A job without finished_at but with created_at before the cutoff must match.

        Guards against orphaned 'DONE' jobs that for some reason lack
        finished_at (e.g. a crash between save() and setting finished_at).
        """
        _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=None,
            created_offset_days=40,
        )
        assert self._count_candidates() == 1
# ---------------------------------------------------------------------------
# cleanup_expired_jobs
# ---------------------------------------------------------------------------
class TestCleanupHappyPath:
    """``cleanup_expired_jobs`` with a 30-day retention and no dry run."""

    @staticmethod
    def _expired_done_job(template, user, **extra):
        """Create a DONE job finished 40 days ago, i.e. past the retention."""
        return _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=40,
            **extra,
        )

    def test_deletes_expired_done_job(self, template, user):
        """An expired DONE job is removed from the database."""
        self._expired_done_job(template, user)
        outcome = cleanup_expired_jobs(retention_days=30)
        assert outcome.deleted_jobs == 1
        assert MailMergeJob.objects.count() == 0

    def test_keeps_recent_job(self, template, user):
        """A DONE job finished 10 days ago is left alone."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=10,
        )
        outcome = cleanup_expired_jobs(retention_days=30)
        assert outcome.deleted_jobs == 0
        assert MailMergeJob.objects.count() == 1

    def test_deletes_associated_files(self, template, user):
        """Both files of an expired job are reported as deleted."""
        job = self._expired_done_job(template, user, with_files=True)
        # Input CSV + output PDF = 2 files
        assert job.recipients_csv.name
        assert job.result_pdf.name
        outcome = cleanup_expired_jobs(retention_days=30)
        assert outcome.deleted_files == 2

    def test_cascade_removes_log_entries(self, template, user):
        """Log entries attached to an expired job are gone after cleanup."""
        job = self._expired_done_job(template, user)
        for message in ("x", "y"):
            JobLogEntry.objects.create(job=job, level="info", message=message)
        assert JobLogEntry.objects.count() == 2
        cleanup_expired_jobs(retention_days=30)
        assert JobLogEntry.objects.count() == 0

    def test_returns_cleanup_result(self, template, user):
        """The return value is a ``CleanupResult`` describing the run."""
        self._expired_done_job(template, user)
        outcome = cleanup_expired_jobs(retention_days=30)
        assert isinstance(outcome, CleanupResult)
        assert outcome.dry_run is False
        assert outcome.candidates == 1
class TestCleanupDryRun:
    """``cleanup_expired_jobs(dry_run=True)`` reports but does not delete."""

    def test_dry_run_deletes_nothing(self, template, user):
        """An expired job is counted as candidate yet remains in the database."""
        _make_job(
            template, user,
            status=MailMergeJob.Status.DONE,
            finished_offset_days=40,
        )
        report = cleanup_expired_jobs(retention_days=30, dry_run=True)
        assert report.dry_run is True
        assert report.candidates == 1
        assert report.deleted_jobs == 0
        assert MailMergeJob.objects.count() == 1

    def test_dry_run_counts_candidates(self, template, user):
        """Three expired jobs yield three candidates."""
        expired_job_spec = {
            "status": MailMergeJob.Status.DONE,
            "finished_offset_days": 40,
        }
        for _ in range(3):
            _make_job(template, user, **expired_job_spec)
        report = cleanup_expired_jobs(retention_days=30, dry_run=True)
        assert report.candidates == 3
class TestCleanupMixed:
    """Cleanup against a mix of expired, recent and unfinished jobs."""

    def test_only_expired_terminal_jobs_are_touched(self, template, user):
        """Only the expired DONE and FAILED jobs are deleted; four remain."""
        states = MailMergeJob.Status

        # finished 40 days ago: 1x DONE, 1x FAILED -> get deleted
        # finished 10 days ago: 1x DONE, 1x FAILED -> stay
        for finished_days_ago in (40, 10):
            for terminal_state in (states.DONE, states.FAILED):
                _make_job(
                    template, user,
                    status=terminal_state,
                    finished_offset_days=finished_days_ago,
                )

        # 1x running, 1x pending (older than the cutoff) -> stay in any case
        for unfinished_state in (states.RUNNING, states.PENDING):
            _make_job(
                template, user,
                status=unfinished_state,
                finished_offset_days=None,
                created_offset_days=999,
            )

        outcome = cleanup_expired_jobs(retention_days=30)
        assert outcome.deleted_jobs == 2
        assert MailMergeJob.objects.count() == 4