Files
2026-05-22 09:14:31 +02:00

114 lines
4.0 KiB
Python

"""
Celery-Tasks für die PDF-Erzeugung.
"""
from __future__ import annotations
import csv
import io
import logging
import tempfile
from pathlib import Path
from celery import shared_task
from django.core.files import File
from django.utils import timezone
from .models import JobLogEntry, MailMergeJob
from .services.docx_renderer import docx_to_pdf, render_docx
from .services.pdf_merge import merge_pdfs
from .services.retention import cleanup_expired_jobs
logger = logging.getLogger(__name__)
def _log(job: MailMergeJob, level: str, msg: str) -> None:
    """Record *msg* for *job* both in the database and in the module logger.

    A ``JobLogEntry`` row is created with the raw *level* string, then the
    same message is emitted through ``logger``, prefixed with the job id.

    Args:
        job: The job the message belongs to.
        level: Level name such as ``"info"``, ``"warning"`` or ``"error"``.
            It is upper-cased and looked up on the ``logging`` module; names
            that are not attributes there fall back to ``logging.INFO``.
        msg: The message text (stored and logged unchanged).
    """
    JobLogEntry.objects.create(job=job, level=level, message=msg)
    py_level = getattr(logging, level.upper(), logging.INFO)
    logger.log(py_level, "[job %s] %s", job.id, msg)
def _read_recipients(job: MailMergeJob) -> tuple[list[dict[str, str]], set[str]]:
    """Parse the job's recipient CSV into data rows and its header names.

    The stored file is opened explicitly and closed again once read. The
    ``utf-8-sig`` codec strips a leading UTF-8 BOM if present. Cells missing
    from short rows become ``""`` rather than ``None``, so they presumably
    render as empty text instead of the literal "None" (depends on
    ``render_docx``; NOTE(review): confirm).

    Returns:
        ``(rows, fieldnames)``, where *rows* is one dict per data line keyed
        by header name, and *fieldnames* is the set of header names (empty
        for an empty file).
    """
    with job.recipients_csv.open("rb") as fh:
        raw = fh.read().decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(raw), restval="")
    rows = list(reader)
    return rows, set(reader.fieldnames or [])


@shared_task(bind=True)
def run_mailmerge(self, job_id: str) -> str:
    """Render one letter per CSV recipient and merge them into a single PDF.

    Steps:
    1. Mark the job RUNNING.
    2. Parse the recipient CSV.
    3. Render every row into the job's DOCX template and convert each letter
       to PDF.
    4. Merge all PDFs and store the result in ``job.result_pdf``.

    Progress is persisted in ``total_rows`` and ``processed_rows`` (the latter
    after every row). Template placeholders missing from the CSV header only
    produce a warning log entry; they do not fail the job.

    Args:
        self: The bound Celery task; only ``self.request.id`` is used.
        job_id: Primary key of the ``MailMergeJob`` to process.

    Returns:
        The job id as a string on success.

    Raises:
        MailMergeJob.DoesNotExist: If no job with *job_id* exists.
        ValueError: If the CSV contains no data rows (after the job has been
            marked FAILED).
        Exception: Any other processing error is re-raised after the job has
            been marked FAILED and the message stored in ``error_message``.
    """
    job = MailMergeJob.objects.select_related("template").get(pk=job_id)
    job.status = MailMergeJob.Status.RUNNING
    job.started_at = timezone.now()
    job.save(update_fields=["status", "started_at"])
    _log(job, "info", f"Job gestartet (task={self.request.id}).")
    try:
        rows, csv_fields = _read_recipients(job)
        job.total_rows = len(rows)
        job.save(update_fields=["total_rows"])
        _log(job, "info", f"{len(rows)} Empfänger gefunden.")
        if not rows:
            raise ValueError("CSV enthält keine Datenzeilen.")

        # Compare the CSV columns with the template placeholders (warning only).
        placeholders = set(job.template.placeholders or [])
        missing = placeholders - csv_fields
        if missing:
            _log(job, "warning",
                 f"CSV fehlen Spalten: {', '.join(sorted(missing))}")

        # Render each letter, then merge all of them into one PDF.
        with tempfile.TemporaryDirectory(prefix="mailmerge-") as tmpdir:
            tmp = Path(tmpdir)
            pdf_dir = tmp / "pdf"
            template_path = Path(job.template.file.path)
            pdfs: list[Path] = []
            for idx, row in enumerate(rows, start=1):
                docx_out = tmp / f"letter_{idx:05d}.docx"
                render_docx(template_path, row, docx_out)
                pdfs.append(docx_to_pdf(docx_out, pdf_dir))
                job.processed_rows = idx
                job.save(update_fields=["processed_rows"])
            merged = tmp / f"serienbrief_{job.id}.pdf"
            merge_pdfs(pdfs, merged)
            # Save to storage while the temp directory still exists.
            with merged.open("rb") as f:
                job.result_pdf.save(merged.name, File(f), save=False)
        job.status = MailMergeJob.Status.DONE
        job.finished_at = timezone.now()
        job.save()
    except Exception as exc:  # noqa: BLE001
        job.status = MailMergeJob.Status.FAILED
        # Some exceptions stringify to ""; keep at least the exception type.
        job.error_message = str(exc) or type(exc).__name__
        job.finished_at = timezone.now()
        job.save()
        _log(job, "error", f"Job fehlgeschlagen: {job.error_message}")
        raise
    # Kept outside the try: a failure while writing this log entry must not
    # flip a job that was already saved as DONE back to FAILED.
    _log(job, "info", "Job erfolgreich abgeschlossen.")
    return str(job.id)
# ---------------------------------------------------------------------------
# Retention-Cleanup
# ---------------------------------------------------------------------------
@shared_task(name="mailmerge.cleanup_expired_jobs")
def cleanup_expired_jobs_task(retention_days: int | None = None,
                              dry_run: bool = False) -> dict:
    """Periodic task: delete expired jobs including their files.

    Invoked by the beat scheduler. Delegates to ``cleanup_expired_jobs`` and
    returns its outcome as a plain dict, because the Celery result backend
    does not serialize dataclass instances out of the box.

    Args:
        retention_days: Forwarded unchanged to ``cleanup_expired_jobs``.
        dry_run: Forwarded unchanged to ``cleanup_expired_jobs``.

    Returns:
        A dict with the keys ``cutoff`` (ISO-8601 string), ``dry_run``,
        ``candidates``, ``deleted_jobs``, ``deleted_files`` and ``errors``,
        copied from the cleanup result.
    """
    outcome = cleanup_expired_jobs(retention_days=retention_days,
                                   dry_run=dry_run)
    summary = dict(
        cutoff=outcome.cutoff.isoformat(),
        dry_run=outcome.dry_run,
        candidates=outcome.candidates,
        deleted_jobs=outcome.deleted_jobs,
        deleted_files=outcome.deleted_files,
        errors=outcome.errors,
    )
    logger.info("Retention-Cleanup-Task abgeschlossen: %s", summary)
    return summary