cleanup von abgeschlossenen jobs

This commit is contained in:
2026-05-22 09:14:31 +02:00
parent d61dbd46fd
commit 848e48b0dd
7 changed files with 689 additions and 31 deletions
@@ -0,0 +1,56 @@
"""
Management-Command für den Retention-Cleanup.
Nutzbar für:
- Dry-Run-Audits ("welche Jobs würden gelöscht werden?")
- Ad-hoc-Ausführung außerhalb des Beat-Schedules (z.B. nach DSGVO-Anfrage)
- Smoke-Test in CI
Beispiele:
docker compose exec web python manage.py cleanup_jobs --dry-run
docker compose exec web python manage.py cleanup_jobs --days 7
docker compose exec web python manage.py cleanup_jobs
"""
from __future__ import annotations
from django.core.management.base import BaseCommand, CommandError
from mailmerge.services.retention import cleanup_expired_jobs
class Command(BaseCommand):
help = "Löscht abgelaufene MailMergeJobs (Status DONE/FAILED, älter als Retention-Frist)."
def add_arguments(self, parser):
parser.add_argument(
"--days",
type=int,
default=None,
help="Überschreibt JOB_RETENTION_DAYS aus den Settings.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Nur Kandidaten zählen, nichts löschen.",
)
def handle(self, *args, **options):
days = options["days"]
dry_run = options["dry_run"]
try:
result = cleanup_expired_jobs(retention_days=days, dry_run=dry_run)
except ValueError as exc:
raise CommandError(str(exc)) from exc
self.stdout.write(f"Stichtag : {result.cutoff.isoformat()}")
self.stdout.write(f"Dry-Run : {result.dry_run}")
self.stdout.write(f"Kandidaten : {result.candidates}")
self.stdout.write(f"Gelöschte Jobs : {result.deleted_jobs}")
self.stdout.write(f"Gelöschte Files: {result.deleted_files}")
if result.errors:
self.stdout.write(self.style.WARNING(f"Fehler ({len(result.errors)}):"))
for err in result.errors:
self.stdout.write(self.style.WARNING(f" - {err}"))
elif not dry_run:
self.stdout.write(self.style.SUCCESS("Cleanup erfolgreich."))
@@ -0,0 +1,74 @@
"""
Registriert den Retention-Cleanup als Periodic Task in django_celery_beat.
Standardplan: täglich um 03:15 Uhr (kollidiert nicht mit dem Backup-Service,
der i.d.R. um 03:00 läuft, und liegt in der Wartungsphase).
Die Zeitzone richtet sich nach `settings.CELERY_TIMEZONE` bzw. `TIME_ZONE`.
"""
from __future__ import annotations
from django.db import migrations
CRON_HOUR = "3"
CRON_MINUTE = "15"
TASK_NAME = "Retention-Cleanup: abgelaufene Jobs löschen"
TASK_DOTTED = "mailmerge.cleanup_expired_jobs"
def create_periodic_task(apps, schema_editor):
CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
from django.conf import settings
schedule, _ = CrontabSchedule.objects.get_or_create(
minute=CRON_MINUTE,
hour=CRON_HOUR,
day_of_week="*",
day_of_month="*",
month_of_year="*",
timezone=getattr(settings, "CELERY_TIMEZONE", settings.TIME_ZONE),
)
PeriodicTask.objects.update_or_create(
name=TASK_NAME,
defaults={
"crontab": schedule,
"task": TASK_DOTTED,
"enabled": True,
"description": (
"Entfernt MailMergeJobs im Status DONE/FAILED, deren "
"finished_at älter als JOB_RETENTION_DAYS ist. "
"Inkl. recipients_csv und result_pdf auf dem Storage."
),
},
)
def remove_periodic_task(apps, schema_editor):
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(name=TASK_NAME).delete()
class Migration(migrations.Migration):
"""Achtung: Wir definieren KEINE konkrete Migrationsabhängigkeit zu
django_celery_beat, weil dessen letzte Migration je Paketversion
unterschiedlich heißt. `run_before`/`dependencies` mit `("__latest__")`
gibt es nicht. Stattdessen verlassen wir uns auf die Reihenfolge:
django_celery_beat steht in INSTALLED_APPS und wird mit `migrate` immer
vorab gewandert (Django sortiert nach Abhängigkeiten der App-Initials).
Beim Build wird `migrate` ohnehin sequentiell ausgeführt, daher läuft
diese Migration zuverlässig nach allen django_celery_beat-Migrationen,
sobald `mailmerge.0001_initial` durch ist.
"""
dependencies = [
("mailmerge", "0001_initial"),
]
operations = [
migrations.RunPython(create_periodic_task, remove_periodic_task),
]
+142
View File
@@ -0,0 +1,142 @@
"""
Retention-/Cleanup-Service für abgeschlossene MailMergeJobs.
Designziele:
- DSGVO-Compliance: personenbezogene CSV-Daten und generierte PDFs werden
nach Ablauf der Aufbewahrungsfrist entfernt.
- Service-Logik strikt von Celery getrennt → unit-testbar ohne Broker.
- Bulk-Löschung über das ORM, damit File-Felder ihren `delete()`-Storage-Hook
aufrufen (FileField-Dateien werden mit gelöscht).
- Dry-Run als First-Class-Citizen für Audits.
Was wird gelöscht:
- MailMergeJobs mit `status in {DONE, FAILED}` UND
`finished_at` (oder `created_at` als Fallback) älter als Stichtag.
- Wartende oder laufende Jobs werden **nie** gelöscht.
Was bleibt:
- LetterTemplates (Vorlagen sind keine personenbezogenen Daten).
- JobLogEntries werden über `on_delete=CASCADE` mit dem Job entfernt.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from django.conf import settings
from django.db.models import Q, QuerySet
from django.utils import timezone
from mailmerge.models import MailMergeJob
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class CleanupResult:
"""Ergebnis eines Cleanup-Laufs."""
cutoff: datetime
dry_run: bool
candidates: int
deleted_jobs: int
deleted_files: int
errors: list[str]
# Status-Werte, die als "fertig" gelten und damit für Cleanup in Frage kommen.
TERMINAL_STATUSES = (MailMergeJob.Status.DONE, MailMergeJob.Status.FAILED)
def get_cutoff(retention_days: Optional[int] = None) -> datetime:
"""Berechnet den Stichtag. Jobs, die vor diesem Zeitpunkt fertig wurden,
sind Kandidaten für die Löschung."""
days = retention_days if retention_days is not None else settings.JOB_RETENTION_DAYS
if days < 1:
raise ValueError(
f"retention_days muss >= 1 sein (war: {days}). "
"0 oder negativ würde alle Jobs sofort löschen."
)
return timezone.now() - timedelta(days=days)
def expired_jobs_queryset(cutoff: datetime) -> QuerySet[MailMergeJob]:
"""QuerySet aller Jobs, die für die Löschung in Frage kommen.
Nutzt `finished_at` wenn gesetzt, sonst `created_at` als Fallback
(z.B. falls ein Job ohne sauberen Finish-Zeitstempel hängengeblieben ist).
"""
return MailMergeJob.objects.filter(
Q(status__in=TERMINAL_STATUSES)
& (
Q(finished_at__lt=cutoff)
| Q(finished_at__isnull=True, created_at__lt=cutoff)
)
)
def cleanup_expired_jobs(
retention_days: Optional[int] = None,
dry_run: bool = False,
) -> CleanupResult:
"""Löscht alle abgelaufenen Jobs inkl. ihrer FileField-Dateien.
Args:
retention_days: Optional, überschreibt `settings.JOB_RETENTION_DAYS`.
dry_run: Wenn True, wird nichts gelöscht — nur Kandidaten gezählt.
Returns:
CleanupResult mit Stichtag, Kandidatenzahl, gelöschten Datensätzen
und gelöschten Dateien. Bei Fehlern wird die Exception einzelner
Datei-Löschungen geloggt, der Lauf bricht aber nicht ab.
"""
cutoff = get_cutoff(retention_days)
qs = expired_jobs_queryset(cutoff)
candidates = qs.count()
logger.info(
"Cleanup gestartet: cutoff=%s, dry_run=%s, kandidaten=%d",
cutoff.isoformat(), dry_run, candidates,
)
if dry_run or candidates == 0:
return CleanupResult(
cutoff=cutoff, dry_run=dry_run, candidates=candidates,
deleted_jobs=0, deleted_files=0, errors=[],
)
deleted_files = 0
errors: list[str] = []
# Wichtig: einzeln iterieren, damit FileField.delete() pro Job
# die zugehörigen Dateien vom Storage entfernt.
# `MailMergeJob.objects.filter(...).delete()` würde die Storage-Hooks
# NICHT aufrufen — Files blieben verwaist.
for job in qs.iterator(chunk_size=100):
try:
if job.recipients_csv and job.recipients_csv.name:
job.recipients_csv.delete(save=False)
deleted_files += 1
if job.result_pdf and job.result_pdf.name:
job.result_pdf.delete(save=False)
deleted_files += 1
except OSError as exc:
msg = f"Datei-Löschung für Job {job.id} fehlgeschlagen: {exc}"
logger.warning(msg)
errors.append(msg)
# DB-Löschung in einem Bulk-Statement (CASCADE entfernt JobLogEntry)
deleted_jobs, _ = qs.delete()
logger.info(
"Cleanup beendet: jobs_deleted=%d, files_deleted=%d, errors=%d",
deleted_jobs, deleted_files, len(errors),
)
return CleanupResult(
cutoff=cutoff, dry_run=False, candidates=candidates,
deleted_jobs=deleted_jobs, deleted_files=deleted_files,
errors=errors,
)
+26
View File
@@ -16,6 +16,7 @@ from django.utils import timezone
from .models import JobLogEntry, MailMergeJob
from .services.docx_renderer import docx_to_pdf, render_docx
from .services.pdf_merge import merge_pdfs
from .services.retention import cleanup_expired_jobs
logger = logging.getLogger(__name__)
@@ -85,3 +86,28 @@ def run_mailmerge(self, job_id: str) -> str:
job.save()
_log(job, "error", f"Job fehlgeschlagen: {exc}")
raise
# ---------------------------------------------------------------------------
# Retention-Cleanup
# ---------------------------------------------------------------------------
@shared_task(name="mailmerge.cleanup_expired_jobs")
def cleanup_expired_jobs_task(retention_days: int | None = None,
dry_run: bool = False) -> dict:
"""Periodic Task: löscht abgelaufene Jobs inkl. Files.
Wird vom Beat-Scheduler aufgerufen. Ergebnis als Dict (Celery-Result-
Backend serialisiert Dataclass-Instanzen nicht out-of-the-box).
"""
result = cleanup_expired_jobs(retention_days=retention_days, dry_run=dry_run)
summary = {
"cutoff": result.cutoff.isoformat(),
"dry_run": result.dry_run,
"candidates": result.candidates,
"deleted_jobs": result.deleted_jobs,
"deleted_files": result.deleted_files,
"errors": result.errors,
}
logger.info("Retention-Cleanup-Task abgeschlossen: %s", summary)
return summary
@@ -0,0 +1,245 @@
"""
Unit-Tests für mailmerge.services.retention.
Decken ab:
- Stichtag-Berechnung (inkl. Edge Cases days < 1)
- QuerySet-Logik (welche Jobs sind Kandidaten?)
- Bulk-Cleanup mit Dry-Run
- File-Löschung wird ausgelöst
- Wartende/laufende Jobs werden NICHT gelöscht
- finished_at-Fallback auf created_at
"""
from __future__ import annotations
from datetime import timedelta
from unittest.mock import patch
import pytest
from django.core.files.base import ContentFile
from django.utils import timezone
from mailmerge.models import JobLogEntry, LetterTemplate, MailMergeJob
from mailmerge.services.retention import (
CleanupResult,
cleanup_expired_jobs,
expired_jobs_queryset,
get_cutoff,
)
pytestmark = pytest.mark.django_db
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def template(user):
return LetterTemplate.objects.create(
name="Retention-Testvorlage",
file=ContentFile(b"dummy-docx", name="tpl.docx"),
placeholders=["vorname"],
created_by=user,
)
def _make_job(template, user, *, status, finished_offset_days=None,
created_offset_days=0, with_files=False):
"""Hilfsfunktion zum Anlegen eines Jobs mit künstlichem Zeitstempel.
finished_offset_days: positive Zahl = vor X Tagen abgeschlossen.
None = finished_at bleibt NULL (Fallback-Test).
Reihenfolge wichtig: Files MUSS vor dem Zurückdatieren der Timestamps
angehängt werden, weil `FileField.save(save=True)` ein `instance.save()`
triggert, das die per `.update()` gesetzten Datumswerte wieder mit den
aktuellen Memory-Werten überschreibt.
"""
now = timezone.now()
job = MailMergeJob.objects.create(
template=template,
recipients_csv=ContentFile(b"vorname\nAnna\n", name="r.csv"),
status=status,
created_by=user,
)
if with_files:
# save=False, damit kein zusätzliches instance.save() die
# Zeitstempel verbiegt. Die Datei landet trotzdem auf dem Storage.
job.result_pdf.save("out.pdf", ContentFile(b"%PDF-1.4"), save=False)
job.save(update_fields=["result_pdf"])
# Jetzt erst die Timestamps zurückdatieren — über `.update()`, damit
# auto_now_add nicht greift und kein zusätzliches .save() ausgelöst wird.
new_created = now - timedelta(days=created_offset_days)
new_finished = (
now - timedelta(days=finished_offset_days)
if finished_offset_days is not None else None
)
MailMergeJob.objects.filter(pk=job.pk).update(
created_at=new_created,
finished_at=new_finished,
)
job.refresh_from_db()
return job
# ---------------------------------------------------------------------------
# get_cutoff
# ---------------------------------------------------------------------------
class TestGetCutoff:
def test_uses_settings_default(self, settings):
settings.JOB_RETENTION_DAYS = 30
cutoff = get_cutoff()
delta = timezone.now() - cutoff
# Toleranz, weil ein paar Millisekunden zwischen den Aufrufen liegen
assert timedelta(days=29, hours=23) < delta < timedelta(days=30, hours=1)
def test_explicit_overrides_settings(self, settings):
settings.JOB_RETENTION_DAYS = 30
cutoff = get_cutoff(retention_days=7)
delta = timezone.now() - cutoff
assert timedelta(days=6, hours=23) < delta < timedelta(days=7, hours=1)
def test_zero_raises(self):
with pytest.raises(ValueError):
get_cutoff(retention_days=0)
def test_negative_raises(self):
with pytest.raises(ValueError):
get_cutoff(retention_days=-5)
# ---------------------------------------------------------------------------
# expired_jobs_queryset
# ---------------------------------------------------------------------------
class TestExpiredJobsQueryset:
def test_done_job_older_than_cutoff_is_candidate(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 1
def test_done_job_younger_than_cutoff_is_not_candidate(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=10)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 0
def test_failed_jobs_are_candidates(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.FAILED,
finished_offset_days=40)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 1
def test_running_jobs_are_never_candidates(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.RUNNING,
finished_offset_days=None, created_offset_days=999)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 0
def test_pending_jobs_are_never_candidates(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.PENDING,
finished_offset_days=None, created_offset_days=999)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 0
def test_finished_at_null_falls_back_to_created_at(self, template, user):
"""Job ohne finished_at, aber created_at > cutoff → muss matchen.
Schützt vor verwaisten 'DONE'-Jobs, bei denen aus irgendeinem Grund
finished_at fehlt (z.B. Crash zwischen save() und finished_at).
"""
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=None, created_offset_days=40)
cutoff = timezone.now() - timedelta(days=30)
assert expired_jobs_queryset(cutoff).count() == 1
# ---------------------------------------------------------------------------
# cleanup_expired_jobs
# ---------------------------------------------------------------------------
class TestCleanupHappyPath:
def test_deletes_expired_done_job(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
result = cleanup_expired_jobs(retention_days=30)
assert result.deleted_jobs == 1
assert MailMergeJob.objects.count() == 0
def test_keeps_recent_job(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=10)
result = cleanup_expired_jobs(retention_days=30)
assert result.deleted_jobs == 0
assert MailMergeJob.objects.count() == 1
def test_deletes_associated_files(self, template, user):
job = _make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40, with_files=True)
# Eingangs-CSV + Output-PDF = 2 Dateien
assert job.recipients_csv.name
assert job.result_pdf.name
result = cleanup_expired_jobs(retention_days=30)
assert result.deleted_files == 2
def test_cascade_removes_log_entries(self, template, user):
job = _make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
JobLogEntry.objects.create(job=job, level="info", message="x")
JobLogEntry.objects.create(job=job, level="info", message="y")
assert JobLogEntry.objects.count() == 2
cleanup_expired_jobs(retention_days=30)
assert JobLogEntry.objects.count() == 0
def test_returns_cleanup_result(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
result = cleanup_expired_jobs(retention_days=30)
assert isinstance(result, CleanupResult)
assert result.dry_run is False
assert result.candidates == 1
class TestCleanupDryRun:
def test_dry_run_deletes_nothing(self, template, user):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
result = cleanup_expired_jobs(retention_days=30, dry_run=True)
assert result.dry_run is True
assert result.candidates == 1
assert result.deleted_jobs == 0
assert MailMergeJob.objects.count() == 1
def test_dry_run_counts_candidates(self, template, user):
for _ in range(3):
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
result = cleanup_expired_jobs(retention_days=30, dry_run=True)
assert result.candidates == 3
class TestCleanupMixed:
def test_only_expired_terminal_jobs_are_touched(self, template, user):
# 1× expired DONE, 1× expired FAILED → werden gelöscht
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=40)
_make_job(template, user, status=MailMergeJob.Status.FAILED,
finished_offset_days=40)
# 1× recent DONE, 1× recent FAILED → bleiben
_make_job(template, user, status=MailMergeJob.Status.DONE,
finished_offset_days=10)
_make_job(template, user, status=MailMergeJob.Status.FAILED,
finished_offset_days=10)
# 1× running, 1× pending (älter als cutoff) → bleiben in jedem Fall
_make_job(template, user, status=MailMergeJob.Status.RUNNING,
finished_offset_days=None, created_offset_days=999)
_make_job(template, user, status=MailMergeJob.Status.PENDING,
finished_offset_days=None, created_offset_days=999)
result = cleanup_expired_jobs(retention_days=30)
assert result.deleted_jobs == 2
assert MailMergeJob.objects.count() == 4