Nur mails mit ics anängen werden als gelesen markiert
This commit is contained in:
@@ -0,0 +1,210 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ics_mail_importer_env_v2.py
|
||||
---------------------------
|
||||
Sucht in einem mailbox.org IMAP-Postfach nach E-Mails mit .ics-Anhängen
|
||||
und importiert die enthaltenen Termine automatisch in einen CalDAV-Kalender.
|
||||
|
||||
Wichtige Eigenschaften:
|
||||
- Konfiguration über .env-Datei mit python-dotenv
|
||||
- Duplikat-Schutz über Hash der ICS-Anhänge
|
||||
- E-Mails werden beim Abrufen nicht automatisch als gelesen markiert
|
||||
- Nur E-Mails mit erfolgreich verarbeitetem ICS-Anhang werden optional als gelesen markiert
|
||||
"""
|
||||
|
||||
import imaplib
|
||||
import email
|
||||
import logging
|
||||
import hashlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import caldav
|
||||
from icalendar import Calendar
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
load_dotenv(BASE_DIR / ".env")
|
||||
|
||||
LOG_FILE = BASE_DIR / "ics_importer.log"
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
|
||||
)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SEEN_FILE = BASE_DIR / "imported_uids.txt"
|
||||
|
||||
|
||||
def load_seen_uids():
|
||||
if not SEEN_FILE.exists():
|
||||
return set()
|
||||
return set(SEEN_FILE.read_text().splitlines())
|
||||
|
||||
|
||||
def save_uid(uid: str):
|
||||
with SEEN_FILE.open("a") as f:
|
||||
f.write(uid + "\n")
|
||||
|
||||
|
||||
def env_bool(name: str, default: str = "false") -> bool:
|
||||
return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def import_ics_to_caldav(ics_bytes: bytes, uid_hash: str) -> int:
|
||||
caldav_url = os.getenv("CALDAV_URL")
|
||||
username = os.getenv("CALDAV_USERNAME")
|
||||
password = os.getenv("CALDAV_PASSWORD")
|
||||
|
||||
if not caldav_url or not username or not password:
|
||||
raise RuntimeError(
|
||||
"CALDAV_URL, CALDAV_USERNAME oder CALDAV_PASSWORD fehlt in .env"
|
||||
)
|
||||
|
||||
client = caldav.DAVClient(url=caldav_url, username=username, password=password)
|
||||
calendar = client.calendar(url=caldav_url)
|
||||
cal = Calendar.from_ical(ics_bytes)
|
||||
|
||||
imported = 0
|
||||
for component in cal.walk():
|
||||
if component.name != "VEVENT":
|
||||
continue
|
||||
|
||||
single_cal = Calendar()
|
||||
single_cal.add("prodid", "-//ics_mail_importer//DE")
|
||||
single_cal.add("version", "2.0")
|
||||
single_cal.add_component(component)
|
||||
|
||||
uid = str(component.get("uid", uid_hash + f"-{imported}"))
|
||||
try:
|
||||
calendar.add_event(single_cal.to_ical().decode("utf-8"))
|
||||
log.info(
|
||||
f" ✓ Termin importiert: {component.get('summary', '(kein Titel)')} [{uid[:20]}]"
|
||||
)
|
||||
imported += 1
|
||||
except Exception as e:
|
||||
log.warning(f" ✗ Fehler beim Import von {uid[:20]}: {e}")
|
||||
|
||||
return imported
|
||||
|
||||
|
||||
def process_mailbox():
|
||||
host = os.getenv("IMAP_HOST", "imap.mailbox.org")
|
||||
port = int(os.getenv("IMAP_PORT", "993"))
|
||||
username = os.getenv("IMAP_USERNAME")
|
||||
password = os.getenv("IMAP_PASSWORD")
|
||||
folder = os.getenv("IMAP_FOLDER", "INBOX")
|
||||
|
||||
if not username or not password:
|
||||
raise RuntimeError("IMAP_USERNAME oder IMAP_PASSWORD fehlt in .env")
|
||||
|
||||
unseen_only = env_bool("IMAP_UNSEEN_ONLY", "true")
|
||||
mark_as_read = env_bool("IMAP_MARK_AS_READ", "false")
|
||||
|
||||
log.info(f"Verbinde mit IMAP {host}:{port} als {username} ...")
|
||||
conn = imaplib.IMAP4_SSL(host, port)
|
||||
conn.login(username, password)
|
||||
|
||||
conn.select(folder)
|
||||
criteria = "UNSEEN" if unseen_only else "ALL"
|
||||
typ, data = conn.uid("search", None, criteria)
|
||||
if typ != "OK":
|
||||
raise RuntimeError("IMAP-Suche fehlgeschlagen")
|
||||
|
||||
message_uids = data[0].split()
|
||||
log.info(f"{len(message_uids)} Nachricht(en) zur Prüfung gefunden.")
|
||||
|
||||
imported_hashes = load_seen_uids()
|
||||
total_imported = 0
|
||||
processed_mail_count = 0
|
||||
|
||||
for msg_uid in message_uids:
|
||||
typ, fetched = conn.uid("fetch", msg_uid, "(BODY.PEEK[])")
|
||||
if typ != "OK" or not fetched or not fetched[0]:
|
||||
log.warning(f"Konnte Nachricht UID {msg_uid.decode()} nicht abrufen.")
|
||||
continue
|
||||
|
||||
raw = fetched[0][1]
|
||||
msg = email.message_from_bytes(raw)
|
||||
|
||||
mail_had_ics = False
|
||||
mail_had_successful_import = False
|
||||
|
||||
subject = msg.get("Subject", "(ohne Betreff)")
|
||||
log.info(f"Prüfe Mail UID {msg_uid.decode()} – Betreff: {subject}")
|
||||
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
filename = part.get_filename() or ""
|
||||
is_ics = (
|
||||
content_type in ("text/calendar", "application/ics")
|
||||
or filename.lower().endswith(".ics")
|
||||
)
|
||||
if not is_ics:
|
||||
continue
|
||||
|
||||
ics_bytes = part.get_payload(decode=True)
|
||||
if not ics_bytes:
|
||||
continue
|
||||
|
||||
mail_had_ics = True
|
||||
uid_hash = hashlib.sha256(ics_bytes).hexdigest()
|
||||
|
||||
if uid_hash in imported_hashes:
|
||||
log.info(
|
||||
f" Überspringe bereits importierten ICS-Anhang {filename!r} ({uid_hash[:12]}…)"
|
||||
)
|
||||
continue
|
||||
|
||||
log.info(f" ICS-Anhang gefunden: {filename!r} ({uid_hash[:12]}…)")
|
||||
|
||||
try:
|
||||
imported_count = import_ics_to_caldav(ics_bytes, uid_hash)
|
||||
if imported_count > 0:
|
||||
save_uid(uid_hash)
|
||||
imported_hashes.add(uid_hash)
|
||||
total_imported += imported_count
|
||||
mail_had_successful_import = True
|
||||
else:
|
||||
log.warning(
|
||||
f" Kein VEVENT importiert für Anhang {filename!r}; Mail bleibt ungelesen."
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(
|
||||
f" Fehler beim Verarbeiten von {filename!r}: {e}; Mail bleibt ungelesen."
|
||||
)
|
||||
|
||||
if mark_as_read and mail_had_successful_import:
|
||||
store_typ, _ = conn.uid("store", msg_uid, "+FLAGS", "(\\Seen)")
|
||||
if store_typ == "OK":
|
||||
processed_mail_count += 1
|
||||
log.info(
|
||||
f"Mail UID {msg_uid.decode()} erfolgreich verarbeitet und als gelesen markiert."
|
||||
)
|
||||
else:
|
||||
log.warning(
|
||||
f"Mail UID {msg_uid.decode()} wurde importiert, konnte aber nicht als gelesen markiert werden."
|
||||
)
|
||||
elif mark_as_read and mail_had_ics and not mail_had_successful_import:
|
||||
log.info(
|
||||
f"Mail UID {msg_uid.decode()} hatte ICS-Anhang, aber keinen erfolgreichen Import; bleibt ungelesen."
|
||||
)
|
||||
|
||||
conn.close()
|
||||
conn.logout()
|
||||
|
||||
log.info(f"Fertig. {total_imported} Termin(e) importiert.")
|
||||
log.info(f"{processed_mail_count} Mail(s) als gelesen markiert.")
|
||||
|
||||
|
||||
def main():
|
||||
log.info("=" * 60)
|
||||
log.info(f"ICS-Importer gestartet ({datetime.now(timezone.utc).isoformat()})")
|
||||
process_mailbox()
|
||||
log.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user