#!/usr/bin/env python3 """ ics_mail_importer_env_v2.py --------------------------- Sucht in einem mailbox.org IMAP-Postfach nach E-Mails mit .ics-Anhängen und importiert die enthaltenen Termine automatisch in einen CalDAV-Kalender. Wichtige Eigenschaften: - Konfiguration über .env-Datei mit python-dotenv - Duplikat-Schutz über Hash der ICS-Anhänge - E-Mails werden beim Abrufen nicht automatisch als gelesen markiert - Nur E-Mails mit erfolgreich verarbeitetem ICS-Anhang werden optional als gelesen markiert """ import imaplib import email import logging import hashlib import os from pathlib import Path from datetime import datetime, timezone from dotenv import load_dotenv import caldav from icalendar import Calendar BASE_DIR = Path(__file__).resolve().parent load_dotenv(BASE_DIR / ".env") LOG_FILE = BASE_DIR / "ics_importer.log" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()], ) log = logging.getLogger(__name__) SEEN_FILE = BASE_DIR / "imported_uids.txt" def load_seen_uids(): if not SEEN_FILE.exists(): return set() return set(SEEN_FILE.read_text().splitlines()) def save_uid(uid: str): with SEEN_FILE.open("a") as f: f.write(uid + "\n") def env_bool(name: str, default: str = "false") -> bool: return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"} def import_ics_to_caldav(ics_bytes: bytes, uid_hash: str) -> int: caldav_url = os.getenv("CALDAV_URL") username = os.getenv("CALDAV_USERNAME") password = os.getenv("CALDAV_PASSWORD") if not caldav_url or not username or not password: raise RuntimeError( "CALDAV_URL, CALDAV_USERNAME oder CALDAV_PASSWORD fehlt in .env" ) client = caldav.DAVClient(url=caldav_url, username=username, password=password) calendar = client.calendar(url=caldav_url) cal = Calendar.from_ical(ics_bytes) imported = 0 for component in cal.walk(): if component.name != "VEVENT": continue single_cal = Calendar() single_cal.add("prodid", "-//ics_mail_importer//DE") single_cal.add("version", "2.0") single_cal.add_component(component) uid = str(component.get("uid", uid_hash + f"-{imported}")) try: calendar.add_event(single_cal.to_ical().decode("utf-8")) log.info( f" ✓ Termin importiert: {component.get('summary', '(kein Titel)')} [{uid[:20]}]" ) imported += 1 except Exception as e: log.warning(f" ✗ Fehler beim Import von {uid[:20]}: {e}") return imported def process_mailbox(): host = os.getenv("IMAP_HOST", "imap.mailbox.org") port = int(os.getenv("IMAP_PORT", "993")) username = os.getenv("IMAP_USERNAME") password = os.getenv("IMAP_PASSWORD") folder = os.getenv("IMAP_FOLDER", "INBOX") if not username or not password: raise RuntimeError("IMAP_USERNAME oder IMAP_PASSWORD fehlt in .env") unseen_only = env_bool("IMAP_UNSEEN_ONLY", "true") mark_as_read = env_bool("IMAP_MARK_AS_READ", "false") log.info(f"Verbinde mit IMAP {host}:{port} als {username} ...") conn = imaplib.IMAP4_SSL(host, port) conn.login(username, password) conn.select(folder) criteria = "UNSEEN" if unseen_only else "ALL" typ, data = conn.uid("search", None, criteria) if typ != "OK": raise RuntimeError("IMAP-Suche fehlgeschlagen") message_uids = data[0].split() log.info(f"{len(message_uids)} Nachricht(en) zur Prüfung gefunden.") imported_hashes = load_seen_uids() total_imported = 0 processed_mail_count = 0 for msg_uid in message_uids: typ, fetched = conn.uid("fetch", msg_uid, "(BODY.PEEK[])") if typ != "OK" or not fetched or not fetched[0]: log.warning(f"Konnte Nachricht UID {msg_uid.decode()} nicht abrufen.") continue raw = fetched[0][1] msg = email.message_from_bytes(raw) mail_had_ics = False mail_had_successful_import = False subject = msg.get("Subject", "(ohne Betreff)") log.info(f"Prüfe Mail UID {msg_uid.decode()} – Betreff: {subject}") for part in msg.walk(): content_type = part.get_content_type() filename = part.get_filename() or "" is_ics = ( content_type in ("text/calendar", "application/ics") or filename.lower().endswith(".ics") ) if not is_ics: continue ics_bytes = part.get_payload(decode=True) if not ics_bytes: continue mail_had_ics = True uid_hash = hashlib.sha256(ics_bytes).hexdigest() if uid_hash in imported_hashes: log.info( f" Überspringe bereits importierten ICS-Anhang {filename!r} ({uid_hash[:12]}…)" ) continue log.info(f" ICS-Anhang gefunden: {filename!r} ({uid_hash[:12]}…)") try: imported_count = import_ics_to_caldav(ics_bytes, uid_hash) if imported_count > 0: save_uid(uid_hash) imported_hashes.add(uid_hash) total_imported += imported_count mail_had_successful_import = True else: log.warning( f" Kein VEVENT importiert für Anhang {filename!r}; Mail bleibt ungelesen." ) except Exception as e: log.warning( f" Fehler beim Verarbeiten von {filename!r}: {e}; Mail bleibt ungelesen." ) if mark_as_read and mail_had_successful_import: store_typ, _ = conn.uid("store", msg_uid, "+FLAGS", "(\\Seen)") if store_typ == "OK": processed_mail_count += 1 log.info( f"Mail UID {msg_uid.decode()} erfolgreich verarbeitet und als gelesen markiert." ) else: log.warning( f"Mail UID {msg_uid.decode()} wurde importiert, konnte aber nicht als gelesen markiert werden." ) elif mark_as_read and mail_had_ics and not mail_had_successful_import: log.info( f"Mail UID {msg_uid.decode()} hatte ICS-Anhang, aber keinen erfolgreichen Import; bleibt ungelesen." ) conn.close() conn.logout() log.info(f"Fertig. {total_imported} Termin(e) importiert.") log.info(f"{processed_mail_count} Mail(s) als gelesen markiert.") def main(): log.info("=" * 60) log.info(f"ICS-Importer gestartet ({datetime.now(timezone.utc).isoformat()})") process_mailbox() log.info("=" * 60) if __name__ == "__main__": main()