#!/usr/bin/env python3 """ ics_mail_importer_env_v6.py --------------------------- ICS-Importer für mailbox.org / CalDAV. Verbesserungen in v6: - CAL-4121-Konflikte werden als "bereits aktueller Serverstand" behandelt - Timeout-Fehler beim CalDAV-Upload werden mit Retries und Backoff erneut versucht - Netzwerk-/Timeout-Probleme werden getrennt von fachlichen Importfehlern protokolliert - Mails werden nur bei erfolgreicher Verarbeitung oder erkanntem Konflikt als verarbeitet markiert Optionale .env-Werte: - CALDAV_TIMEOUT=90 - CALDAV_RETRIES=3 - CALDAV_RETRY_BACKOFF=5 """ import imaplib import email import logging import hashlib import os import time from pathlib import Path from datetime import datetime, timezone from dotenv import load_dotenv import caldav from icalendar import Calendar BASE_DIR = Path(__file__).resolve().parent load_dotenv(BASE_DIR / ".env") LOG_FILE = BASE_DIR / "ics_importer.log" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()], ) log = logging.getLogger(__name__) SEEN_FILE = BASE_DIR / "imported_uids.txt" def load_seen_uids(): if not SEEN_FILE.exists(): return set() return set(SEEN_FILE.read_text().splitlines()) def save_uid(uid: str): with SEEN_FILE.open("a") as f: f.write(uid + "\n") def env_bool(name: str, default: str = "false") -> bool: return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"} def env_int(name: str, default: int) -> int: value = os.getenv(name) if value is None or not value.strip(): return default return int(value.strip()) def exception_text(exc: Exception) -> str: return str(exc or "") def is_newer_version_conflict(exc: Exception) -> bool: message = exception_text(exc).lower() patterns = [ "412 precondition failed", "cal-4121", "newer version of the appointment already exists", "concurrent modification", "actual sequence", "client sequence", ] return sum(1 for pattern in patterns if pattern in message) >= 2 def is_timeout_error(exc: Exception) -> bool: message = exception_text(exc).lower() timeout_patterns = [ "read timed out", "timeout", "timed out", "read timeout", ] return any(pattern in message for pattern in timeout_patterns) def add_event_with_retries(calendar, ical_text: str, summary: str, uid: str, sequence, timeout: int, retries: int, backoff: int): last_exception = None for attempt in range(1, retries + 1): try: calendar.add_event(ical_text) log.info(f" ✓ Termin importiert: {summary} [{uid[:20]}]") return "imported" except Exception as e: last_exception = e exc_name = type(e).__name__ exc_message = exception_text(e) log.info(f" Exception-Typ beim Import: {exc_name}") if is_newer_version_conflict(e): log.info( f" ↷ Konflikt ignoriert: Neuere Version existiert bereits: {summary} " f"[{uid[:20]}], SEQUENCE={sequence}" ) return "conflict" if is_timeout_error(e): if attempt < retries: wait_seconds = backoff * attempt log.warning( f" ⏳ Timeout beim Import von {uid[:20]} (Versuch {attempt}/{retries}, Timeout={timeout}s). " f"Neuer Versuch in {wait_seconds}s ..." ) time.sleep(wait_seconds) continue log.warning( f" ✗ Timeout beim Import von {uid[:20]} nach {retries} Versuchen: {exc_message}" ) return "timeout" log.warning(f" ✗ Fehler beim Import von {uid[:20]}: {exc_message}") return "error" if last_exception is not None: raise last_exception return "error" def import_ics_to_caldav(ics_bytes: bytes, uid_hash: str): caldav_url = os.getenv("CALDAV_URL") username = os.getenv("CALDAV_USERNAME") password = os.getenv("CALDAV_PASSWORD") timeout = env_int("CALDAV_TIMEOUT", 90) retries = env_int("CALDAV_RETRIES", 3) backoff = env_int("CALDAV_RETRY_BACKOFF", 5) if not caldav_url or not username or not password: raise RuntimeError( "CALDAV_URL, CALDAV_USERNAME oder CALDAV_PASSWORD fehlt in .env" ) client = caldav.DAVClient(url=caldav_url, username=username, password=password, timeout=timeout) calendar = client.calendar(url=caldav_url) cal = Calendar.from_ical(ics_bytes) imported = 0 conflicts = 0 timeouts = 0 errors = 0 for component in cal.walk(): if component.name != "VEVENT": continue single_cal = Calendar() single_cal.add("prodid", "-//ics_mail_importer//DE") single_cal.add("version", "2.0") single_cal.add_component(component) uid = str(component.get("uid", uid_hash + f"-{imported + conflicts + timeouts + errors}")) summary = str(component.get("summary", "(kein Titel)")) sequence = component.get("sequence", "(keine sequence)") result = add_event_with_retries( calendar, single_cal.to_ical().decode("utf-8"), summary, uid, sequence, timeout, retries, backoff, ) if result == "imported": imported += 1 elif result == "conflict": conflicts += 1 elif result == "timeout": timeouts += 1 else: errors += 1 return imported, conflicts, timeouts, errors def process_mailbox(): host = os.getenv("IMAP_HOST", "imap.mailbox.org") port = int(os.getenv("IMAP_PORT", "993")) username = os.getenv("IMAP_USERNAME") password = os.getenv("IMAP_PASSWORD") folder = os.getenv("IMAP_FOLDER", "INBOX") if not username or not password: raise RuntimeError("IMAP_USERNAME oder IMAP_PASSWORD fehlt in .env") unseen_only = env_bool("IMAP_UNSEEN_ONLY", "true") mark_as_read = env_bool("IMAP_MARK_AS_READ", "false") log.info(f"Verbinde mit IMAP {host}:{port} als {username} ...") conn = imaplib.IMAP4_SSL(host, port) conn.login(username, password) conn.select(folder) criteria = "UNSEEN" if unseen_only else "ALL" typ, data = conn.uid("search", None, criteria) if typ != "OK": raise RuntimeError("IMAP-Suche fehlgeschlagen") message_uids = data[0].split() log.info(f"{len(message_uids)} Nachricht(en) zur Prüfung gefunden.") imported_hashes = load_seen_uids() total_imported = 0 total_conflicts = 0 total_timeouts = 0 total_errors = 0 processed_mail_count = 0 for msg_uid in message_uids: typ, fetched = conn.uid("fetch", msg_uid, "(BODY.PEEK[])") if typ != "OK" or not fetched or not fetched[0]: log.warning(f"Konnte Nachricht UID {msg_uid.decode()} nicht abrufen.") continue raw = fetched[0][1] msg = email.message_from_bytes(raw) mail_had_ics = False mail_processed = False subject = msg.get("Subject", "(ohne Betreff)") log.info(f"Prüfe Mail UID {msg_uid.decode()} – Betreff: {subject}") for part in msg.walk(): content_type = part.get_content_type() filename = part.get_filename() or "" is_ics = content_type in ("text/calendar", "application/ics") or filename.lower().endswith(".ics") if not is_ics: continue ics_bytes = part.get_payload(decode=True) if not ics_bytes: continue mail_had_ics = True uid_hash = hashlib.sha256(ics_bytes).hexdigest() if uid_hash in imported_hashes: log.info(f" Überspringe bereits verarbeiteten ICS-Anhang {filename!r} ({uid_hash[:12]}…)") mail_processed = True continue log.info(f" ICS-Anhang gefunden: {filename!r} ({uid_hash[:12]}…)") try: imported_count, conflict_count, timeout_count, error_count = import_ics_to_caldav(ics_bytes, uid_hash) total_imported += imported_count total_conflicts += conflict_count total_timeouts += timeout_count total_errors += error_count if imported_count > 0 or conflict_count > 0: save_uid(uid_hash) imported_hashes.add(uid_hash) mail_processed = True else: log.warning( f" Kein VEVENT erfolgreich verarbeitet für Anhang {filename!r}; Mail bleibt ungelesen." ) except Exception as e: log.warning( f" Fehler beim Verarbeiten von {filename!r}: {e}; Mail bleibt ungelesen." ) if mark_as_read and mail_processed: store_typ, _ = conn.uid("store", msg_uid, "+FLAGS", "(\\Seen)") if store_typ == "OK": processed_mail_count += 1 log.info(f"Mail UID {msg_uid.decode()} erfolgreich verarbeitet und als gelesen markiert.") else: log.warning( f"Mail UID {msg_uid.decode()} wurde verarbeitet, konnte aber nicht als gelesen markiert werden." ) elif mark_as_read and mail_had_ics and not mail_processed: log.info(f"Mail UID {msg_uid.decode()} hatte ICS-Anhang, aber keine erfolgreiche Verarbeitung; bleibt ungelesen.") conn.close() conn.logout() log.info(f"Fertig. {total_imported} Termin(e) importiert.") log.info(f"{total_conflicts} Konflikt(e) als bereits aktueller Serverstand erkannt.") log.info(f"{total_timeouts} Timeout-Fall/Fälle beim CalDAV-Upload.") log.info(f"{total_errors} sonstige Fehler beim CalDAV-Upload.") log.info(f"{processed_mail_count} Mail(s) als gelesen markiert.") def main(): log.info("=" * 60) log.info(f"ICS-Importer gestartet ({datetime.now(timezone.utc).isoformat()})") process_mailbox() log.info("=" * 60) if __name__ == "__main__": main()