""" Celery-Tasks fuer die automatische Verarbeitung eingehender E-Mails. Workflow: 1. `poll_emails` laeuft alle 15 Minuten (Celery Beat) 2. Er liest ungelesene E-Mails aus dem IMAP-Postfach 3. Fuer jede E-Mail: a) Absender wird mit Destinataer-Datenbank abgeglichen (E-Mail-Feld) b) Betreff/Body wird auf Rechnungs-Keywords geprueft c) Ein EmailEingang-Datensatz wird angelegt (mit Kategorie) d) Alle Anhaenge werden als DokumentDatei im Django-DMS gespeichert 4. Nicht erkannte Absender werden als "unbekannt" markiert (manuelle Nachbearbeitung) Konfiguration (Umgebungsvariablen in .env / compose.yml): IMAP_HOST — IMAP-Server-Adresse (z. B. mail.vhtv-stiftung.de) IMAP_PORT — Port (Standard: 993 fuer SSL) IMAP_USER — Benutzername IMAP_PASSWORD — Passwort IMAP_FOLDER — Ordner (Standard: INBOX) """ import email import email.utils import imaplib import logging import mimetypes import re from datetime import datetime, timezone as dt_timezone from email.header import decode_header, make_header from celery import shared_task from django.utils import timezone logger = logging.getLogger(__name__) # Patterns fuer Rechnungserkennung im Betreff/Body RECHNUNG_PATTERNS = [ re.compile(r"\brechnung\b", re.IGNORECASE), re.compile(r"\binvoice\b", re.IGNORECASE), re.compile(r"\brechnungs[-\s]?nr\.?\s*[:\s]?\s*\d+", re.IGNORECASE), re.compile(r"\bRE[-/]\d{4,}", re.IGNORECASE), # RE-2024001, RE/20240315 ] GESCHICHTE_PATTERNS = [ re.compile(r"\bstiftungsgeschichte\b", re.IGNORECASE), re.compile(r"\bahnenforschung\b", re.IGNORECASE), re.compile(r"\bgenealogie\b", re.IGNORECASE), re.compile(r"\bstammbaum\b", re.IGNORECASE), re.compile(r"\bhistorisch", re.IGNORECASE), re.compile(r"\bchronik\b", re.IGNORECASE), re.compile(r"\barchiv\b", re.IGNORECASE), re.compile(r"\bfamiliengeschichte\b", re.IGNORECASE), re.compile(r"\burkunde\b", re.IGNORECASE), ] # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- def _decode_header_value(raw_value: str) -> str: """Dekodiert kodierte E-Mail-Header (z. B. UTF-8 oder Latin-1).""" if not raw_value: return "" try: return str(make_header(decode_header(raw_value))) except Exception: return raw_value def _parse_email_date(date_str: str) -> datetime: """Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurueck.""" try: parsed = email.utils.parsedate_to_datetime(date_str) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=dt_timezone.utc) return parsed except Exception: return timezone.now() def _get_email_body(msg) -> str: """Extrahiert den Text-Body aus einer E-Mail (bevorzugt plain text).""" body_parts = [] if msg.is_multipart(): for part in msg.walk(): ctype = part.get_content_type() disposition = str(part.get_content_disposition() or "") if ctype == "text/plain" and "attachment" not in disposition: charset = part.get_content_charset() or "utf-8" try: body_parts.append(part.get_payload(decode=True).decode(charset, errors="replace")) except Exception: pass else: charset = msg.get_content_charset() or "utf-8" try: body_parts.append(msg.get_payload(decode=True).decode(charset, errors="replace")) except Exception: pass return "\n".join(body_parts).strip() def _detect_kategorie(betreff: str, email_text: str, has_destinataer: bool) -> str: """ Erkennt die Kategorie einer Email anhand von Betreff und Body. Gibt 'destinataer', 'rechnung', 'stiftungsgeschichte', oder 'allgemein' zurueck. """ if has_destinataer: return "destinataer" text_to_check = f"{betreff}\n{email_text[:2000]}" # Rechnungserkennung via Patterns for pattern in RECHNUNG_PATTERNS: if pattern.search(text_to_check): return "rechnung" # Stiftungsgeschichte-Erkennung for pattern in GESCHICHTE_PATTERNS: if pattern.search(text_to_check): return "stiftungsgeschichte" return "allgemein" def _save_to_dms(content: bytes, filename: str, destinataer=None, betreff: str = "", kontext: str = "korrespondenz"): """ Speichert einen E-Mail-Anhang direkt als DokumentDatei im Django-DMS. Gibt das DokumentDatei-Objekt zurueck, oder None bei Fehler. """ from stiftung.models import DokumentDatei from django.core.files.base import ContentFile safe_filename = filename or "anhang.bin" mime_type, _ = mimetypes.guess_type(safe_filename) mime_type = mime_type or "application/octet-stream" titel = f"{betreff[:100]} – {safe_filename}" if betreff else safe_filename beschreibung = "" if destinataer: beschreibung = ( f"Automatisch importiert aus E-Mail-Eingang.\n" f"Absender: {destinataer.vorname} {destinataer.nachname} <{destinataer.email}>" ) try: doc = DokumentDatei( titel=titel[:255], beschreibung=beschreibung, kontext=kontext, dateiname_original=safe_filename, dateityp=mime_type, dateigroesse=len(content), destinataer=destinataer, ) doc.datei.save(safe_filename, ContentFile(content), save=False) doc.save() logger.info("Anhang '%s' als DokumentDatei gespeichert (ID: %s).", safe_filename, doc.pk) return doc except Exception as exc: logger.error("Fehler beim Speichern von '%s' im DMS: %s", safe_filename, exc) return None # --------------------------------------------------------------------------- # Haupttask # --------------------------------------------------------------------------- @shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_emails") def poll_emails(self, search_all_recent_days=0): """ Liest E-Mails aus dem IMAP-Postfach und verarbeitet sie. Wird durch Celery Beat alle 15 Minuten ausgefuehrt. Erkennt automatisch Destinataer-Emails, Rechnungen und allgemeine Post. Args: search_all_recent_days: Wenn > 0, werden alle E-Mails der letzten N Tage durchsucht (nicht nur ungelesene). Nuetzlich fuer manuellen Abruf. """ from stiftung.models import Destinataer, EmailEingang # IMAP-Konfiguration: DB (AppConfiguration) mit Fallback auf env/settings from stiftung.utils.config import get_config imap_host = get_config("imap_host") imap_port = int(get_config("imap_port", 993)) imap_user = get_config("imap_user") imap_password = get_config("imap_password") imap_folder = get_config("imap_folder", "INBOX") imap_use_ssl = get_config("imap_use_ssl", True) if not all([imap_host, imap_user, imap_password]): logger.warning( "IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). " "Task wird uebersprungen." ) return {"status": "skipped", "reason": "IMAP not configured"} # Vorab: Destinataer-E-Mail-Index fuer schnelle Zuordnung # Nur aktive Destinataere mit gesetzter E-Mail-Adresse destinataer_by_email = { d.email.lower(): d for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="") } processed = 0 errors = 0 try: # IMAP-Verbindung aufbauen (mit Socket-Timeout fuer grosse E-Mails) imap_timeout = 120 # Sekunden – genug fuer grosse Anhaenge if imap_use_ssl: mail = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=imap_timeout) else: mail = imaplib.IMAP4(imap_host, imap_port, timeout=imap_timeout) mail.login(imap_user, imap_password) mail.select(imap_folder) # Nachrichten suchen if search_all_recent_days and search_all_recent_days > 0: from datetime import timedelta since_date = (datetime.now(dt_timezone.utc) - timedelta(days=search_all_recent_days)).strftime("%d-%b-%Y") _, message_ids_raw = mail.search(None, "SINCE", since_date) search_mode = f"ALL seit {since_date}" else: _, message_ids_raw = mail.search(None, "UNSEEN") search_mode = "UNSEEN" message_ids = message_ids_raw[0].split() logger.info("Postfach '%s' (%s): %d Nachricht(en) gefunden.", imap_folder, search_mode, len(message_ids)) for msg_id in message_ids: try: _, msg_data = mail.fetch(msg_id, "(RFC822)") raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) # Absender ermitteln from_raw = msg.get("From", "") absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw) absender_email_addr = absender_email_raw.lower().strip() absender_name = _decode_header_value(absender_name_raw) # Betreff betreff = _decode_header_value(msg.get("Subject", "")) # Eingangsdatum eingangsdatum = _parse_email_date(msg.get("Date", "")) # E-Mail-Text email_text = _get_email_body(msg) # Destinataer zuordnen destinataer = destinataer_by_email.get(absender_email_addr) # Kategorie erkennen kategorie = _detect_kategorie(betreff, email_text, has_destinataer=bool(destinataer)) # Status basierend auf Kategorie if destinataer: status = "zugewiesen" elif kategorie == "rechnung": status = "neu" # Muss manuell als Rechnung erfasst werden else: status = "unbekannt" # DMS-Kontext fuer Anhaenge basierend auf Kategorie dms_kontext_map = { "rechnung": "rechnung", "stiftungsgeschichte": "stiftungsgeschichte", } dms_kontext = dms_kontext_map.get(kategorie, "korrespondenz") # Pruefen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via # Datum + Absender + Betreff) already_exists = EmailEingang.objects.filter( absender_email=absender_email_addr, eingangsdatum=eingangsdatum, betreff=betreff[:500], ).exists() if already_exists: logger.debug( "E-Mail von %s am %s bereits vorhanden – wird uebersprungen.", absender_email_addr, eingangsdatum, ) # Als gelesen markieren mail.store(msg_id, "+FLAGS", "\\Seen") continue # Datensatz anlegen eingang = EmailEingang( kategorie=kategorie, destinataer=destinataer, absender_email=absender_email_addr, absender_name=absender_name, betreff=betreff[:500], eingangsdatum=eingangsdatum, email_text=email_text, status=status, ) # Anhaenge verarbeiten und als DokumentDatei im DMS speichern dms_dokumente = [] if msg.is_multipart(): for part in msg.walk(): disposition = str(part.get_content_disposition() or "") if "attachment" in disposition: filename = _decode_header_value(part.get_filename() or "") content = part.get_payload(decode=True) if not content: logger.warning( "Anhang '%s' hat keinen Inhalt – wird uebersprungen.", filename, ) continue doc = _save_to_dms( content=content, filename=filename, destinataer=destinataer, betreff=betreff, kontext=dms_kontext, ) if doc: dms_dokumente.append(doc) if dms_dokumente: eingang.status = "verarbeitet" if destinataer else status eingang.save() if dms_dokumente: eingang.dokument_dateien.set(dms_dokumente) # Als gelesen markieren mail.store(msg_id, "+FLAGS", "\\Seen") processed += 1 logger.info( "E-Mail verarbeitet: von=%s, Kategorie=%s, Destinataer=%s, Anhaenge=%d", absender_email_addr, kategorie, str(destinataer) if destinataer else "–", len(dms_dokumente), ) except Exception as exc: errors += 1 logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc) # Nicht als gelesen markieren – wird beim naechsten Lauf erneut versucht mail.close() mail.logout() except imaplib.IMAP4.error as exc: logger.error("IMAP-Fehler: %s", exc) raise self.retry(exc=exc) except Exception as exc: logger.exception("Unerwarteter Fehler im poll_emails-Task: %s", exc) raise self.retry(exc=exc) result = {"status": "done", "processed": processed, "errors": errors} logger.info("poll_emails abgeschlossen: %s", result) return result # Backward-compatible alias for existing Celery Beat schedules poll_destinataer_emails = poll_emails