feat: Email-Eingangsverarbeitung für Destinatäre implementieren
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
CI/CD Pipeline / deploy (push) Has been cancelled
Code Quality / quality (push) Has been cancelled

Neues System zur automatischen Verarbeitung eingehender E-Mails von
Destinatären. IMAP-Polling alle 15 Minuten via Celery Beat, automatische
Zuordnung zu Destinatären anhand der E-Mail-Adresse, Upload von Anhängen
zu Paperless-NGX.

Umfasst:
- DestinataerEmailEingang Model mit Status-Tracking
- Celery Task für IMAP-Polling und Paperless-Integration
- Web-UI (Liste + Detail) mit Such- und Filterfunktion
- Admin-Interface mit Bulk-Actions
- Agent-Dokumentation (SysAdmin, RentmeisterAI)
- Dev-Environment Modernisierung (docker compose v2)

Reviewed by: SysAdmin (STI-15), RentmeisterAI (STI-16)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Stiftung CEO Agent
2026-03-09 21:11:22 +00:00
parent 6c8ddbb4f0
commit 4b21f553c3
16 changed files with 1554 additions and 49 deletions

324
app/stiftung/tasks.py Normal file
View File

@@ -0,0 +1,324 @@
"""
Celery-Tasks für die automatische Verarbeitung von Destinatär-E-Mails.
Workflow:
1. `poll_destinataer_emails` läuft alle 15 Minuten (Celery Beat)
2. Er liest ungelesene E-Mails aus dem IMAP-Postfach (paperless@vhtv-stiftung.de)
3. Für jede E-Mail:
a) Absender wird mit Destinatär-Datenbank abgeglichen (E-Mail-Feld)
b) Ein DestinataerEmailEingang-Datensatz wird angelegt
c) Alle Anhänge werden per Paperless-API hochgeladen
d) Für jeden Anhang wird ein DokumentLink erstellt
4. Nicht erkannte Absender werden als "unbekannt" markiert (manuelle Nachbearbeitung)
Konfiguration (Umgebungsvariablen in .env / compose.yml):
IMAP_HOST — IMAP-Server-Adresse (z. B. mail.vhtv-stiftung.de)
IMAP_PORT — Port (Standard: 993 für SSL)
IMAP_USER — Benutzername (z. B. paperless@vhtv-stiftung.de)
IMAP_PASSWORD — Passwort
IMAP_FOLDER — Ordner (Standard: INBOX)
"""
import email
import email.utils
import imaplib
import io
import logging
import mimetypes
from datetime import datetime, timezone as dt_timezone
from email.header import decode_header, make_header
import requests
from celery import shared_task
from django.conf import settings
from django.utils import timezone
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _decode_header_value(raw_value: str) -> str:
"""Dekodiert kodierte E-Mail-Header (z. B. UTF-8 oder Latin-1)."""
if not raw_value:
return ""
try:
return str(make_header(decode_header(raw_value)))
except Exception:
return raw_value
def _parse_email_date(date_str: str) -> datetime:
"""Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurück."""
try:
parsed = email.utils.parsedate_to_datetime(date_str)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt_timezone.utc)
return parsed
except Exception:
return timezone.now()
def _get_email_body(msg) -> str:
"""Extrahiert den Text-Body aus einer E-Mail (bevorzugt plain text)."""
body_parts = []
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
disposition = str(part.get_content_disposition() or "")
if ctype == "text/plain" and "attachment" not in disposition:
charset = part.get_content_charset() or "utf-8"
try:
body_parts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
except Exception:
pass
else:
charset = msg.get_content_charset() or "utf-8"
try:
body_parts.append(msg.get_payload(decode=True).decode(charset, errors="replace"))
except Exception:
pass
return "\n".join(body_parts).strip()
def _upload_to_paperless(content: bytes, filename: str, destinataer=None, betreff: str = "") -> int | None:
"""
Lädt einen Anhang in Paperless-NGX hoch.
Gibt die neue Paperless-Dokument-ID zurück, oder None bei Fehler.
"""
api_url = getattr(settings, "PAPERLESS_API_URL", None)
api_token = getattr(settings, "PAPERLESS_API_TOKEN", None)
if not api_url or not api_token:
logger.warning("Paperless nicht konfiguriert Anhang '%s' wird nicht hochgeladen.", filename)
return None
# Tag-ID für Destinatäre ermitteln
tag_ids = []
dest_tag_id = getattr(settings, "PAPERLESS_DESTINATAERE_TAG_ID", None)
if dest_tag_id:
try:
tag_ids.append(int(dest_tag_id))
except (ValueError, TypeError):
pass
# Correspondent: Name des Destinatärs (optional, Paperless sucht/erstellt ihn)
correspondent_name = None
if destinataer:
correspondent_name = f"{destinataer.vorname} {destinataer.nachname}".strip()
# Dateiname bereinigen
safe_filename = filename or "anhang.pdf"
# Mime-Type bestimmen
mime_type, _ = mimetypes.guess_type(safe_filename)
mime_type = mime_type or "application/octet-stream"
upload_url = f"{api_url.rstrip('/')}/api/documents/post_document/"
headers = {"Authorization": f"Token {api_token}"}
form_data = {}
if tag_ids:
form_data["tags"] = tag_ids
if correspondent_name:
form_data["correspondent_name"] = correspondent_name
if betreff:
form_data["title"] = betreff[:128]
files = {"document": (safe_filename, io.BytesIO(content), mime_type)}
try:
response = requests.post(
upload_url,
headers=headers,
data=form_data,
files=files,
timeout=60,
)
response.raise_for_status()
# Paperless gibt die neue Dokument-ID zurück (als Integer oder UUID-String)
result = response.json()
doc_id = result if isinstance(result, int) else result.get("id")
logger.info("Anhang '%s' erfolgreich in Paperless hochgeladen (ID: %s).", safe_filename, doc_id)
return doc_id
except requests.RequestException as exc:
logger.error("Fehler beim Hochladen von '%s' in Paperless: %s", safe_filename, exc)
return None
# ---------------------------------------------------------------------------
# Haupttask
# ---------------------------------------------------------------------------
@shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_destinataer_emails")
def poll_destinataer_emails(self):
"""
Liest ungelesene E-Mails aus dem IMAP-Postfach und verarbeitet sie.
Wird durch Celery Beat alle 15 Minuten ausgeführt.
"""
from stiftung.models import Destinataer, DestinataerEmailEingang, DokumentLink
# IMAP-Konfiguration aus Settings
imap_host = getattr(settings, "IMAP_HOST", None)
imap_port = int(getattr(settings, "IMAP_PORT", 993))
imap_user = getattr(settings, "IMAP_USER", None)
imap_password = getattr(settings, "IMAP_PASSWORD", None)
imap_folder = getattr(settings, "IMAP_FOLDER", "INBOX")
imap_use_ssl = getattr(settings, "IMAP_USE_SSL", True)
if not all([imap_host, imap_user, imap_password]):
logger.warning(
"IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). "
"Task wird übersprungen."
)
return {"status": "skipped", "reason": "IMAP not configured"}
# Vorab: Destinatär-E-Mail-Index für schnelle Zuordnung
# Nur aktive Destinatäre mit gesetzter E-Mail-Adresse
destinataer_by_email = {
d.email.lower(): d
for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="")
}
processed = 0
errors = 0
try:
# IMAP-Verbindung aufbauen
if imap_use_ssl:
mail = imaplib.IMAP4_SSL(imap_host, imap_port)
else:
mail = imaplib.IMAP4(imap_host, imap_port)
mail.login(imap_user, imap_password)
mail.select(imap_folder)
# Ungelesene Nachrichten suchen
_, message_ids_raw = mail.search(None, "UNSEEN")
message_ids = message_ids_raw[0].split()
logger.info("Postfach '%s': %d ungelesene Nachricht(en) gefunden.", imap_folder, len(message_ids))
for msg_id in message_ids:
try:
_, msg_data = mail.fetch(msg_id, "(RFC822)")
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Absender ermitteln
from_raw = msg.get("From", "")
absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw)
absender_email = absender_email_raw.lower().strip()
absender_name = _decode_header_value(absender_name_raw)
# Betreff
betreff = _decode_header_value(msg.get("Subject", ""))
# Eingangsdatum
eingangsdatum = _parse_email_date(msg.get("Date", ""))
# E-Mail-Text
email_text = _get_email_body(msg)
# Destinatär zuordnen
destinataer = destinataer_by_email.get(absender_email)
status = "zugewiesen" if destinataer else "unbekannt"
# Prüfen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via
# Datum + Absender + Betreff)
already_exists = DestinataerEmailEingang.objects.filter(
absender_email=absender_email,
eingangsdatum=eingangsdatum,
betreff=betreff[:500],
).exists()
if already_exists:
logger.debug(
"E-Mail von %s am %s bereits vorhanden wird übersprungen.",
absender_email, eingangsdatum,
)
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
continue
# Datensatz anlegen
eingang = DestinataerEmailEingang(
destinataer=destinataer,
absender_email=absender_email,
absender_name=absender_name,
betreff=betreff[:500],
eingangsdatum=eingangsdatum,
email_text=email_text,
status=status,
)
# Anhänge verarbeiten
paperless_ids = []
if msg.is_multipart():
for part in msg.walk():
disposition = str(part.get_content_disposition() or "")
if "attachment" in disposition:
filename = _decode_header_value(part.get_filename() or "")
content = part.get_payload(decode=True)
if not content:
continue
doc_id = _upload_to_paperless(
content=content,
filename=filename,
destinataer=destinataer,
betreff=betreff,
)
if doc_id:
paperless_ids.append(doc_id)
# DokumentLink anlegen
DokumentLink.objects.create(
paperless_document_id=doc_id,
kontext="verwendungsnachweis",
titel=f"{betreff[:100]} {filename}" if filename else betreff[:200],
beschreibung=(
f"Automatisch importiert aus E-Mail-Eingang.\n"
f"Absender: {absender_name} <{absender_email}>\n"
f"Datum: {eingangsdatum.strftime('%d.%m.%Y %H:%M')}"
),
destinataer_id=destinataer.pk if destinataer else None,
)
eingang.paperless_dokument_ids = paperless_ids
if paperless_ids:
eingang.status = "verarbeitet" if destinataer else "unbekannt"
eingang.save()
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
processed += 1
logger.info(
"E-Mail verarbeitet: von=%s, Destinatär=%s, Anhänge=%d",
absender_email,
str(destinataer) if destinataer else "unbekannt",
len(paperless_ids),
)
except Exception as exc:
errors += 1
logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc)
# Nicht als gelesen markieren wird beim nächsten Lauf erneut versucht
mail.close()
mail.logout()
except imaplib.IMAP4.error as exc:
logger.error("IMAP-Fehler: %s", exc)
raise self.retry(exc=exc)
except Exception as exc:
logger.exception("Unerwarteter Fehler im poll_destinataer_emails-Task: %s", exc)
raise self.retry(exc=exc)
result = {"status": "done", "processed": processed, "errors": errors}
logger.info("poll_destinataer_emails abgeschlossen: %s", result)
return result