""" Unit-Tests für mailmerge.services.retention. Decken ab: - Stichtag-Berechnung (inkl. Edge Cases days < 1) - QuerySet-Logik (welche Jobs sind Kandidaten?) - Bulk-Cleanup mit Dry-Run - File-Löschung wird ausgelöst - Wartende/laufende Jobs werden NICHT gelöscht - finished_at-Fallback auf created_at """ from __future__ import annotations from datetime import timedelta from unittest.mock import patch import pytest from django.core.files.base import ContentFile from django.utils import timezone from mailmerge.models import JobLogEntry, LetterTemplate, MailMergeJob from mailmerge.services.retention import ( CleanupResult, cleanup_expired_jobs, expired_jobs_queryset, get_cutoff, ) pytestmark = pytest.mark.django_db # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def template(user): return LetterTemplate.objects.create( name="Retention-Testvorlage", file=ContentFile(b"dummy-docx", name="tpl.docx"), placeholders=["vorname"], created_by=user, ) def _make_job(template, user, *, status, finished_offset_days=None, created_offset_days=0, with_files=False): """Hilfsfunktion zum Anlegen eines Jobs mit künstlichem Zeitstempel. finished_offset_days: positive Zahl = vor X Tagen abgeschlossen. None = finished_at bleibt NULL (Fallback-Test). Reihenfolge wichtig: Files MUSS vor dem Zurückdatieren der Timestamps angehängt werden, weil `FileField.save(save=True)` ein `instance.save()` triggert, das die per `.update()` gesetzten Datumswerte wieder mit den aktuellen Memory-Werten überschreibt. """ now = timezone.now() job = MailMergeJob.objects.create( template=template, recipients_csv=ContentFile(b"vorname\nAnna\n", name="r.csv"), status=status, created_by=user, ) if with_files: # save=False, damit kein zusätzliches instance.save() die # Zeitstempel verbiegt. Die Datei landet trotzdem auf dem Storage. job.result_pdf.save("out.pdf", ContentFile(b"%PDF-1.4"), save=False) job.save(update_fields=["result_pdf"]) # Jetzt erst die Timestamps zurückdatieren — über `.update()`, damit # auto_now_add nicht greift und kein zusätzliches .save() ausgelöst wird. new_created = now - timedelta(days=created_offset_days) new_finished = ( now - timedelta(days=finished_offset_days) if finished_offset_days is not None else None ) MailMergeJob.objects.filter(pk=job.pk).update( created_at=new_created, finished_at=new_finished, ) job.refresh_from_db() return job # --------------------------------------------------------------------------- # get_cutoff # --------------------------------------------------------------------------- class TestGetCutoff: def test_uses_settings_default(self, settings): settings.JOB_RETENTION_DAYS = 30 cutoff = get_cutoff() delta = timezone.now() - cutoff # Toleranz, weil ein paar Millisekunden zwischen den Aufrufen liegen assert timedelta(days=29, hours=23) < delta < timedelta(days=30, hours=1) def test_explicit_overrides_settings(self, settings): settings.JOB_RETENTION_DAYS = 30 cutoff = get_cutoff(retention_days=7) delta = timezone.now() - cutoff assert timedelta(days=6, hours=23) < delta < timedelta(days=7, hours=1) def test_zero_raises(self): with pytest.raises(ValueError): get_cutoff(retention_days=0) def test_negative_raises(self): with pytest.raises(ValueError): get_cutoff(retention_days=-5) # --------------------------------------------------------------------------- # expired_jobs_queryset # --------------------------------------------------------------------------- class TestExpiredJobsQueryset: def test_done_job_older_than_cutoff_is_candidate(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 1 def test_done_job_younger_than_cutoff_is_not_candidate(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=10) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 0 def test_failed_jobs_are_candidates(self, template, user): _make_job(template, user, status=MailMergeJob.Status.FAILED, finished_offset_days=40) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 1 def test_running_jobs_are_never_candidates(self, template, user): _make_job(template, user, status=MailMergeJob.Status.RUNNING, finished_offset_days=None, created_offset_days=999) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 0 def test_pending_jobs_are_never_candidates(self, template, user): _make_job(template, user, status=MailMergeJob.Status.PENDING, finished_offset_days=None, created_offset_days=999) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 0 def test_finished_at_null_falls_back_to_created_at(self, template, user): """Job ohne finished_at, aber created_at > cutoff → muss matchen. Schützt vor verwaisten 'DONE'-Jobs, bei denen aus irgendeinem Grund finished_at fehlt (z.B. Crash zwischen save() und finished_at). """ _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=None, created_offset_days=40) cutoff = timezone.now() - timedelta(days=30) assert expired_jobs_queryset(cutoff).count() == 1 # --------------------------------------------------------------------------- # cleanup_expired_jobs # --------------------------------------------------------------------------- class TestCleanupHappyPath: def test_deletes_expired_done_job(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) result = cleanup_expired_jobs(retention_days=30) assert result.deleted_jobs == 1 assert MailMergeJob.objects.count() == 0 def test_keeps_recent_job(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=10) result = cleanup_expired_jobs(retention_days=30) assert result.deleted_jobs == 0 assert MailMergeJob.objects.count() == 1 def test_deletes_associated_files(self, template, user): job = _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40, with_files=True) # Eingangs-CSV + Output-PDF = 2 Dateien assert job.recipients_csv.name assert job.result_pdf.name result = cleanup_expired_jobs(retention_days=30) assert result.deleted_files == 2 def test_cascade_removes_log_entries(self, template, user): job = _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) JobLogEntry.objects.create(job=job, level="info", message="x") JobLogEntry.objects.create(job=job, level="info", message="y") assert JobLogEntry.objects.count() == 2 cleanup_expired_jobs(retention_days=30) assert JobLogEntry.objects.count() == 0 def test_returns_cleanup_result(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) result = cleanup_expired_jobs(retention_days=30) assert isinstance(result, CleanupResult) assert result.dry_run is False assert result.candidates == 1 class TestCleanupDryRun: def test_dry_run_deletes_nothing(self, template, user): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) result = cleanup_expired_jobs(retention_days=30, dry_run=True) assert result.dry_run is True assert result.candidates == 1 assert result.deleted_jobs == 0 assert MailMergeJob.objects.count() == 1 def test_dry_run_counts_candidates(self, template, user): for _ in range(3): _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) result = cleanup_expired_jobs(retention_days=30, dry_run=True) assert result.candidates == 3 class TestCleanupMixed: def test_only_expired_terminal_jobs_are_touched(self, template, user): # 1× expired DONE, 1× expired FAILED → werden gelöscht _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=40) _make_job(template, user, status=MailMergeJob.Status.FAILED, finished_offset_days=40) # 1× recent DONE, 1× recent FAILED → bleiben _make_job(template, user, status=MailMergeJob.Status.DONE, finished_offset_days=10) _make_job(template, user, status=MailMergeJob.Status.FAILED, finished_offset_days=10) # 1× running, 1× pending (älter als cutoff) → bleiben in jedem Fall _make_job(template, user, status=MailMergeJob.Status.RUNNING, finished_offset_days=None, created_offset_days=999) _make_job(template, user, status=MailMergeJob.Status.PENDING, finished_offset_days=None, created_offset_days=999) result = cleanup_expired_jobs(retention_days=30) assert result.deleted_jobs == 2 assert MailMergeJob.objects.count() == 4