#!/usr/bin/env python3 """ ics_mail_importer_env_v5.py --------------------------- ICS-Importer für mailbox.org / CalDAV. Verbesserungen in v4: - Liest Mails per BODY.PEEK[] ohne automatisches Setzen von \Seen - Markiert nur verarbeitete ICS-Mails optional als gelesen - Behandelt mailbox.org / OX-Konflikte vom Typ CAL-4121 (neuere Version existiert) als nicht-fatale Konflikte statt als Importfehler - Protokolliert Konflikte klar als "bereits neuer vorhanden" """ import imaplib import email import logging import hashlib import os from pathlib import Path from datetime import datetime, timezone from dotenv import load_dotenv import caldav from icalendar import Calendar BASE_DIR = Path(__file__).resolve().parent load_dotenv(BASE_DIR / ".env") LOG_FILE = BASE_DIR / "ics_importer.log" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()], ) log = logging.getLogger(__name__) SEEN_FILE = BASE_DIR / "imported_uids.txt" def load_seen_uids(): if not SEEN_FILE.exists(): return set() return set(SEEN_FILE.read_text().splitlines()) def save_uid(uid: str): with SEEN_FILE.open("a") as f: f.write(uid + "\n") def env_bool(name: str, default: str = "false") -> bool: return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"} def is_newer_version_conflict(exc: Exception) -> bool: message = str(exc or "").lower() patterns = [ "412 precondition failed", "cal-4121", "newer version of the appointment already exists", "concurrent modification", "client sequence", "actual sequence", ] matches = sum(1 for p in patterns if p in message) return matches >= 2 def import_ics_to_caldav(ics_bytes: bytes, uid_hash: str): caldav_url = os.getenv("CALDAV_URL") username = os.getenv("CALDAV_USERNAME") password = os.getenv("CALDAV_PASSWORD") if not caldav_url or not username or not password: raise RuntimeError( "CALDAV_URL, CALDAV_USERNAME oder CALDAV_PASSWORD fehlt in .env" ) client = caldav.DAVClient(url=caldav_url, username=username, password=password) calendar = client.calendar(url=caldav_url) cal = Calendar.from_ical(ics_bytes) imported = 0 conflicts = 0 for component in cal.walk(): if component.name != "VEVENT": continue single_cal = Calendar() single_cal.add("prodid", "-//ics_mail_importer//DE") single_cal.add("version", "2.0") single_cal.add_component(component) uid = str(component.get("uid", uid_hash + f"-{imported + conflicts}")) summary = component.get("summary", "(kein Titel)") sequence = component.get("sequence", "(keine sequence)") try: calendar.add_event(single_cal.to_ical().decode("utf-8")) log.info(f" ✓ Termin importiert: {summary} [{uid[:20]}]") imported += 1 except Exception as e: if is_newer_version_conflict(e): log.info( f" ↷ Konflikt ignoriert: Neuere Version existiert bereits: {summary} " f"[{uid[:20]}], SEQUENCE={sequence}" ) conflicts += 1 else: log.warning(f" ✗ Fehler beim Import von {uid[:20]}: {e}") return imported, conflicts def process_mailbox(): host = os.getenv("IMAP_HOST", "imap.mailbox.org") port = int(os.getenv("IMAP_PORT", "993")) username = os.getenv("IMAP_USERNAME") password = os.getenv("IMAP_PASSWORD") folder = os.getenv("IMAP_FOLDER", "INBOX") if not username or not password: raise RuntimeError("IMAP_USERNAME oder IMAP_PASSWORD fehlt in .env") unseen_only = env_bool("IMAP_UNSEEN_ONLY", "true") mark_as_read = env_bool("IMAP_MARK_AS_READ", "false") log.info(f"Verbinde mit IMAP {host}:{port} als {username} ...") conn = imaplib.IMAP4_SSL(host, port) conn.login(username, password) conn.select(folder) criteria = "UNSEEN" if unseen_only else "ALL" typ, data = conn.uid("search", None, criteria) if typ != "OK": raise RuntimeError("IMAP-Suche fehlgeschlagen") message_uids = data[0].split() log.info(f"{len(message_uids)} Nachricht(en) zur Prüfung gefunden.") imported_hashes = load_seen_uids() total_imported = 0 total_conflicts = 0 processed_mail_count = 0 for msg_uid in message_uids: typ, fetched = conn.uid("fetch", msg_uid, "(BODY.PEEK[])") if typ != "OK" or not fetched or not fetched[0]: log.warning(f"Konnte Nachricht UID {msg_uid.decode()} nicht abrufen.") continue raw = fetched[0][1] msg = email.message_from_bytes(raw) mail_had_ics = False mail_processed = False subject = msg.get("Subject", "(ohne Betreff)") log.info(f"Prüfe Mail UID {msg_uid.decode()} – Betreff: {subject}") for part in msg.walk(): content_type = part.get_content_type() filename = part.get_filename() or "" is_ics = content_type in ("text/calendar", "application/ics") or filename.lower().endswith(".ics") if not is_ics: continue ics_bytes = part.get_payload(decode=True) if not ics_bytes: continue mail_had_ics = True uid_hash = hashlib.sha256(ics_bytes).hexdigest() if uid_hash in imported_hashes: log.info(f" Überspringe bereits verarbeiteten ICS-Anhang {filename!r} ({uid_hash[:12]}…)") mail_processed = True continue log.info(f" ICS-Anhang gefunden: {filename!r} ({uid_hash[:12]}…)") try: imported_count, conflict_count = import_ics_to_caldav(ics_bytes, uid_hash) total_imported += imported_count total_conflicts += conflict_count if imported_count > 0 or conflict_count > 0: save_uid(uid_hash) imported_hashes.add(uid_hash) mail_processed = True else: log.warning( f" Kein VEVENT importiert oder als Konflikt erkannt für Anhang {filename!r}; Mail bleibt ungelesen." ) except Exception as e: log.warning( f" Fehler beim Verarbeiten von {filename!r}: {e}; Mail bleibt ungelesen." ) if mark_as_read and mail_processed: store_typ, _ = conn.uid("store", msg_uid, "+FLAGS", "(\\Seen)") if store_typ == "OK": processed_mail_count += 1 log.info(f"Mail UID {msg_uid.decode()} erfolgreich verarbeitet und als gelesen markiert.") else: log.warning( f"Mail UID {msg_uid.decode()} wurde verarbeitet, konnte aber nicht als gelesen markiert werden." ) elif mark_as_read and mail_had_ics and not mail_processed: log.info(f"Mail UID {msg_uid.decode()} hatte ICS-Anhang, aber keine erfolgreiche Verarbeitung; bleibt ungelesen.") conn.close() conn.logout() log.info(f"Fertig. {total_imported} Termin(e) importiert.") log.info(f"{total_conflicts} Konflikt(e) als bereits aktueller Serverstand erkannt.") log.info(f"{processed_mail_count} Mail(s) als gelesen markiert.") def main(): log.info("=" * 60) log.info(f"ICS-Importer gestartet ({datetime.now(timezone.utc).isoformat()})") process_mailbox() log.info("=" * 60) if __name__ == "__main__": main()