""" Celery-Tasks für die automatische Verarbeitung von Destinatär-E-Mails. Workflow: 1. `poll_destinataer_emails` läuft alle 15 Minuten (Celery Beat) 2. Er liest ungelesene E-Mails aus dem IMAP-Postfach (paperless@vhtv-stiftung.de) 3. Für jede E-Mail: a) Absender wird mit Destinatär-Datenbank abgeglichen (E-Mail-Feld) b) Ein DestinataerEmailEingang-Datensatz wird angelegt c) Alle Anhänge werden per Paperless-API hochgeladen d) Für jeden Anhang wird ein DokumentLink erstellt 4. Nicht erkannte Absender werden als "unbekannt" markiert (manuelle Nachbearbeitung) Konfiguration (Umgebungsvariablen in .env / compose.yml): IMAP_HOST — IMAP-Server-Adresse (z. B. mail.vhtv-stiftung.de) IMAP_PORT — Port (Standard: 993 für SSL) IMAP_USER — Benutzername (z. B. paperless@vhtv-stiftung.de) IMAP_PASSWORD — Passwort IMAP_FOLDER — Ordner (Standard: INBOX) """ import email import email.utils import imaplib import io import logging import mimetypes from datetime import datetime, timezone as dt_timezone from email.header import decode_header, make_header import time import requests from celery import shared_task from django.conf import settings from django.utils import timezone logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- def _decode_header_value(raw_value: str) -> str: """Dekodiert kodierte E-Mail-Header (z. B. UTF-8 oder Latin-1).""" if not raw_value: return "" try: return str(make_header(decode_header(raw_value))) except Exception: return raw_value def _parse_email_date(date_str: str) -> datetime: """Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurück.""" try: parsed = email.utils.parsedate_to_datetime(date_str) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=dt_timezone.utc) return parsed except Exception: return timezone.now() def _get_email_body(msg) -> str: """Extrahiert den Text-Body aus einer E-Mail (bevorzugt plain text).""" body_parts = [] if msg.is_multipart(): for part in msg.walk(): ctype = part.get_content_type() disposition = str(part.get_content_disposition() or "") if ctype == "text/plain" and "attachment" not in disposition: charset = part.get_content_charset() or "utf-8" try: body_parts.append(part.get_payload(decode=True).decode(charset, errors="replace")) except Exception: pass else: charset = msg.get_content_charset() or "utf-8" try: body_parts.append(msg.get_payload(decode=True).decode(charset, errors="replace")) except Exception: pass return "\n".join(body_parts).strip() def _poll_paperless_task(api_url: str, headers: dict, task_id: str, filename: str, max_wait: int = 120) -> int | None: """ Pollt den Paperless-ngx Task-Status bis das Dokument verarbeitet wurde. Gibt die Dokument-ID zurück, oder None bei Fehler/Timeout. """ task_url = f"{api_url.rstrip('/')}/api/tasks/?task_id={task_id}" waited = 0 interval = 2 while waited < max_wait: try: resp = requests.get(task_url, headers=headers, timeout=30) resp.raise_for_status() tasks = resp.json() if tasks: task = tasks[0] if isinstance(tasks, list) else tasks status = task.get("status", "") if status == "SUCCESS": related = task.get("related_document") if related: # related_document can be a URL like "/api/documents/42/" # or just an ID if isinstance(related, str): parts = related.rstrip("/").split("/") try: return int(parts[-1]) except (ValueError, IndexError): logger.warning("Konnte Dokument-ID nicht aus '%s' extrahieren.", related) return None return int(related) # Some versions use result_id or document_id for key in ("result_id", "document_id", "id"): val = task.get(key) if val is not None: try: return int(val) except (ValueError, TypeError): pass logger.warning("Task %s erfolgreich aber keine Dokument-ID gefunden: %s", task_id, task) return None elif status == "FAILURE": logger.error("Paperless-Task %s fehlgeschlagen für '%s': %s", task_id, filename, task.get("result")) return None except requests.RequestException as exc: logger.warning("Fehler beim Abfragen von Paperless-Task %s: %s", task_id, exc) time.sleep(interval) waited += interval logger.error("Timeout beim Warten auf Paperless-Task %s für '%s' (%ds).", task_id, filename, max_wait) return None def _upload_to_paperless(content: bytes, filename: str, destinataer=None, betreff: str = "") -> int | None: """ Lädt einen Anhang in Paperless-NGX hoch. Gibt die neue Paperless-Dokument-ID zurück, oder None bei Fehler. """ api_url = getattr(settings, "PAPERLESS_API_URL", None) api_token = getattr(settings, "PAPERLESS_API_TOKEN", None) if not api_url or not api_token: logger.warning("Paperless nicht konfiguriert – Anhang '%s' wird nicht hochgeladen.", filename) return None # Tag-ID für Destinatäre ermitteln tag_ids = [] dest_tag_id = getattr(settings, "PAPERLESS_DESTINATAERE_TAG_ID", None) if dest_tag_id: try: tag_ids.append(int(dest_tag_id)) except (ValueError, TypeError): pass # Correspondent: Name des Destinatärs (optional, Paperless sucht/erstellt ihn) correspondent_name = None if destinataer: correspondent_name = f"{destinataer.vorname} {destinataer.nachname}".strip() # Dateiname bereinigen safe_filename = filename or "anhang.pdf" # Mime-Type bestimmen mime_type, _ = mimetypes.guess_type(safe_filename) mime_type = mime_type or "application/octet-stream" upload_url = f"{api_url.rstrip('/')}/api/documents/post_document/" headers = {"Authorization": f"Token {api_token}"} form_data = {} if tag_ids: form_data["tags"] = tag_ids if correspondent_name: form_data["correspondent_name"] = correspondent_name if betreff: form_data["title"] = betreff[:128] files = {"document": (safe_filename, io.BytesIO(content), mime_type)} try: response = requests.post( upload_url, headers=headers, data=form_data, files=files, timeout=300, # 5 Minuten für große Anhänge ) response.raise_for_status() result = response.json() # Paperless-ngx post_document returns a task UUID string. # We need to poll the task status to get the actual document ID. if isinstance(result, str): task_id = result doc_id = _poll_paperless_task(api_url, headers, task_id, safe_filename) elif isinstance(result, int): doc_id = result else: doc_id = result.get("id") logger.info("Anhang '%s' erfolgreich in Paperless hochgeladen (ID: %s).", safe_filename, doc_id) return doc_id except requests.RequestException as exc: logger.error("Fehler beim Hochladen von '%s' in Paperless: %s", safe_filename, exc) return None # --------------------------------------------------------------------------- # Haupttask # --------------------------------------------------------------------------- @shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_destinataer_emails") def poll_destinataer_emails(self, search_all_recent_days=0): """ Liest E-Mails aus dem IMAP-Postfach und verarbeitet sie. Wird durch Celery Beat alle 15 Minuten ausgeführt. Args: search_all_recent_days: Wenn > 0, werden alle E-Mails der letzten N Tage durchsucht (nicht nur ungelesene). Nützlich für manuellen Abruf. """ from stiftung.models import Destinataer, DestinataerEmailEingang, DokumentLink # IMAP-Konfiguration: DB (AppConfiguration) mit Fallback auf env/settings from stiftung.utils.config import get_config imap_host = get_config("imap_host") imap_port = int(get_config("imap_port", 993)) imap_user = get_config("imap_user") imap_password = get_config("imap_password") imap_folder = get_config("imap_folder", "INBOX") imap_use_ssl = get_config("imap_use_ssl", True) if not all([imap_host, imap_user, imap_password]): logger.warning( "IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). " "Task wird übersprungen." ) return {"status": "skipped", "reason": "IMAP not configured"} # Vorab: Destinatär-E-Mail-Index für schnelle Zuordnung # Nur aktive Destinatäre mit gesetzter E-Mail-Adresse destinataer_by_email = { d.email.lower(): d for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="") } processed = 0 errors = 0 try: # IMAP-Verbindung aufbauen (mit Socket-Timeout für große E-Mails) imap_timeout = 120 # Sekunden – genug für große Anhänge if imap_use_ssl: mail = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=imap_timeout) else: mail = imaplib.IMAP4(imap_host, imap_port, timeout=imap_timeout) mail.login(imap_user, imap_password) mail.select(imap_folder) # Nachrichten suchen if search_all_recent_days and search_all_recent_days > 0: from datetime import timedelta since_date = (datetime.now(dt_timezone.utc) - timedelta(days=search_all_recent_days)).strftime("%d-%b-%Y") _, message_ids_raw = mail.search(None, "SINCE", since_date) search_mode = f"ALL seit {since_date}" else: _, message_ids_raw = mail.search(None, "UNSEEN") search_mode = "UNSEEN" message_ids = message_ids_raw[0].split() logger.info("Postfach '%s' (%s): %d Nachricht(en) gefunden.", imap_folder, search_mode, len(message_ids)) for msg_id in message_ids: try: _, msg_data = mail.fetch(msg_id, "(RFC822)") raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) # Absender ermitteln from_raw = msg.get("From", "") absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw) absender_email = absender_email_raw.lower().strip() absender_name = _decode_header_value(absender_name_raw) # Betreff betreff = _decode_header_value(msg.get("Subject", "")) # Eingangsdatum eingangsdatum = _parse_email_date(msg.get("Date", "")) # E-Mail-Text email_text = _get_email_body(msg) # Destinatär zuordnen destinataer = destinataer_by_email.get(absender_email) status = "zugewiesen" if destinataer else "unbekannt" # Prüfen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via # Datum + Absender + Betreff) already_exists = DestinataerEmailEingang.objects.filter( absender_email=absender_email, eingangsdatum=eingangsdatum, betreff=betreff[:500], ).exists() if already_exists: logger.debug( "E-Mail von %s am %s bereits vorhanden – wird übersprungen.", absender_email, eingangsdatum, ) # Als gelesen markieren mail.store(msg_id, "+FLAGS", "\\Seen") continue # Datensatz anlegen eingang = DestinataerEmailEingang( destinataer=destinataer, absender_email=absender_email, absender_name=absender_name, betreff=betreff[:500], eingangsdatum=eingangsdatum, email_text=email_text, status=status, ) # Anhänge verarbeiten paperless_ids = [] if msg.is_multipart(): for part in msg.walk(): disposition = str(part.get_content_disposition() or "") if "attachment" in disposition: filename = _decode_header_value(part.get_filename() or "") content = part.get_payload(decode=True) if not content: logger.warning( "Anhang '%s' hat keinen Inhalt (möglicherweise zu groß oder beschädigt) – wird übersprungen.", filename, ) continue doc_id = _upload_to_paperless( content=content, filename=filename, destinataer=destinataer, betreff=betreff, ) if doc_id: paperless_ids.append(doc_id) # DokumentLink anlegen DokumentLink.objects.create( paperless_document_id=doc_id, kontext="verwendungsnachweis", titel=f"{betreff[:100]} – {filename}" if filename else betreff[:200], beschreibung=( f"Automatisch importiert aus E-Mail-Eingang.\n" f"Absender: {absender_name} <{absender_email}>\n" f"Datum: {eingangsdatum.strftime('%d.%m.%Y %H:%M')}" ), destinataer_id=destinataer.pk if destinataer else None, ) eingang.paperless_dokument_ids = paperless_ids if paperless_ids: eingang.status = "verarbeitet" if destinataer else "unbekannt" eingang.save() # Als gelesen markieren mail.store(msg_id, "+FLAGS", "\\Seen") processed += 1 logger.info( "E-Mail verarbeitet: von=%s, Destinatär=%s, Anhänge=%d", absender_email, str(destinataer) if destinataer else "unbekannt", len(paperless_ids), ) except Exception as exc: errors += 1 logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc) # Nicht als gelesen markieren – wird beim nächsten Lauf erneut versucht mail.close() mail.logout() except imaplib.IMAP4.error as exc: logger.error("IMAP-Fehler: %s", exc) raise self.retry(exc=exc) except Exception as exc: logger.exception("Unerwarteter Fehler im poll_destinataer_emails-Task: %s", exc) raise self.retry(exc=exc) result = {"status": "done", "processed": processed, "errors": errors} logger.info("poll_destinataer_emails abgeschlossen: %s", result) return result