Files
stiftung-management-system/app/stiftung/tasks.py
SysAdmin Agent e0b377014c
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
CI/CD Pipeline / deploy (push) Has been cancelled
Code Quality / quality (push) Has been cancelled
v4.1.0: DMS email documents, category-specific Nachweis linking, version system
- Save cover email body as DMS document with new 'email' context type
- Show email body separately from attachments in email detail view
- Add per-category DMS document assignment in quarterly confirmation
  (Studiennachweis, Einkommenssituation, Vermögenssituation)
- Add VERSION file and context processor for automatic version display
- Add MCP server, agent system, import/export, and new migrations
- Update compose files and production environment template

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 18:48:52 +00:00

421 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery-Tasks fuer die automatische Verarbeitung eingehender E-Mails.
Workflow:
1. `poll_emails` laeuft alle 15 Minuten (Celery Beat)
2. Er liest ungelesene E-Mails aus dem IMAP-Postfach
3. Fuer jede E-Mail:
a) Absender wird mit Destinataer-Datenbank abgeglichen (E-Mail-Feld)
b) Betreff/Body wird auf Rechnungs-Keywords geprueft
c) Ein EmailEingang-Datensatz wird angelegt (mit Kategorie)
d) Alle Anhaenge werden als DokumentDatei im Django-DMS gespeichert
4. Nicht erkannte Absender werden als "unbekannt" markiert (manuelle Nachbearbeitung)
Konfiguration (Umgebungsvariablen in .env / compose.yml):
IMAP_HOST — IMAP-Server-Adresse (z. B. mail.vhtv-stiftung.de)
IMAP_PORT — Port (Standard: 993 fuer SSL)
IMAP_USER — Benutzername
IMAP_PASSWORD — Passwort
IMAP_FOLDER — Ordner (Standard: INBOX)
"""
import email
import email.utils
import imaplib
import logging
import mimetypes
import re
from datetime import datetime, timezone as dt_timezone
from email.header import decode_header, make_header
from celery import shared_task
from django.utils import timezone
logger = logging.getLogger(__name__)
# Patterns fuer Rechnungserkennung im Betreff/Body
RECHNUNG_PATTERNS = [
re.compile(r"\brechnung\b", re.IGNORECASE),
re.compile(r"\binvoice\b", re.IGNORECASE),
re.compile(r"\brechnungs[-\s]?nr\.?\s*[:\s]?\s*\d+", re.IGNORECASE),
re.compile(r"\bRE[-/]\d{4,}", re.IGNORECASE), # RE-2024001, RE/20240315
]
GESCHICHTE_PATTERNS = [
re.compile(r"\bstiftungsgeschichte\b", re.IGNORECASE),
re.compile(r"\bahnenforschung\b", re.IGNORECASE),
re.compile(r"\bgenealogie\b", re.IGNORECASE),
re.compile(r"\bstammbaum\b", re.IGNORECASE),
re.compile(r"\bhistorisch", re.IGNORECASE),
re.compile(r"\bchronik\b", re.IGNORECASE),
re.compile(r"\barchiv\b", re.IGNORECASE),
re.compile(r"\bfamiliengeschichte\b", re.IGNORECASE),
re.compile(r"\burkunde\b", re.IGNORECASE),
]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _decode_header_value(raw_value: str) -> str:
"""Dekodiert kodierte E-Mail-Header (z. B. UTF-8 oder Latin-1)."""
if not raw_value:
return ""
try:
return str(make_header(decode_header(raw_value)))
except Exception:
return raw_value
def _parse_email_date(date_str: str) -> datetime:
"""Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurueck."""
try:
parsed = email.utils.parsedate_to_datetime(date_str)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt_timezone.utc)
return parsed
except Exception:
return timezone.now()
def _get_email_body(msg) -> str:
"""Extrahiert den Text-Body aus einer E-Mail (bevorzugt plain text)."""
body_parts = []
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
disposition = str(part.get_content_disposition() or "")
if ctype == "text/plain" and "attachment" not in disposition:
charset = part.get_content_charset() or "utf-8"
try:
body_parts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
except Exception:
pass
else:
charset = msg.get_content_charset() or "utf-8"
try:
body_parts.append(msg.get_payload(decode=True).decode(charset, errors="replace"))
except Exception:
pass
return "\n".join(body_parts).strip()
def _detect_kategorie(betreff: str, email_text: str, has_destinataer: bool) -> str:
"""
Erkennt die Kategorie einer Email anhand von Betreff und Body.
Gibt 'destinataer', 'rechnung', 'stiftungsgeschichte', oder 'allgemein' zurueck.
"""
if has_destinataer:
return "destinataer"
text_to_check = f"{betreff}\n{email_text[:2000]}"
# Rechnungserkennung via Patterns
for pattern in RECHNUNG_PATTERNS:
if pattern.search(text_to_check):
return "rechnung"
# Stiftungsgeschichte-Erkennung
for pattern in GESCHICHTE_PATTERNS:
if pattern.search(text_to_check):
return "stiftungsgeschichte"
return "allgemein"
def _save_to_dms(content: bytes, filename: str, destinataer=None, betreff: str = "", kontext: str = "korrespondenz"):
"""
Speichert einen E-Mail-Anhang direkt als DokumentDatei im Django-DMS.
Gibt das DokumentDatei-Objekt zurueck, oder None bei Fehler.
"""
from stiftung.models import DokumentDatei
from django.core.files.base import ContentFile
safe_filename = filename or "anhang.bin"
mime_type, _ = mimetypes.guess_type(safe_filename)
mime_type = mime_type or "application/octet-stream"
titel = f"{betreff[:100]} {safe_filename}" if betreff else safe_filename
beschreibung = ""
if destinataer:
beschreibung = (
f"Automatisch importiert aus E-Mail-Eingang.\n"
f"Absender: {destinataer.vorname} {destinataer.nachname} <{destinataer.email}>"
)
try:
doc = DokumentDatei(
titel=titel[:255],
beschreibung=beschreibung,
kontext=kontext,
dateiname_original=safe_filename,
dateityp=mime_type,
dateigroesse=len(content),
destinataer=destinataer,
)
doc.datei.save(safe_filename, ContentFile(content), save=False)
doc.save()
logger.info("Anhang '%s' als DokumentDatei gespeichert (ID: %s).", safe_filename, doc.pk)
return doc
except Exception as exc:
logger.error("Fehler beim Speichern von '%s' im DMS: %s", safe_filename, exc)
return None
# ---------------------------------------------------------------------------
# Haupttask
# ---------------------------------------------------------------------------
@shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_emails")
def poll_emails(self, search_all_recent_days=0):
"""
Liest E-Mails aus dem IMAP-Postfach und verarbeitet sie.
Wird durch Celery Beat alle 15 Minuten ausgefuehrt.
Erkennt automatisch Destinataer-Emails, Rechnungen und allgemeine Post.
Args:
search_all_recent_days: Wenn > 0, werden alle E-Mails der letzten N Tage
durchsucht (nicht nur ungelesene). Nuetzlich fuer manuellen Abruf.
"""
from stiftung.models import Destinataer, EmailEingang
# IMAP-Konfiguration: DB (AppConfiguration) mit Fallback auf env/settings
from stiftung.utils.config import get_config
imap_host = get_config("imap_host")
imap_port = int(get_config("imap_port", 993))
imap_user = get_config("imap_user")
imap_password = get_config("imap_password")
imap_folder = get_config("imap_folder", "INBOX")
imap_use_ssl = get_config("imap_use_ssl", True)
if not all([imap_host, imap_user, imap_password]):
logger.warning(
"IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). "
"Task wird uebersprungen."
)
return {"status": "skipped", "reason": "IMAP not configured"}
# Vorab: Destinataer-E-Mail-Index fuer schnelle Zuordnung
# Nur aktive Destinataere mit gesetzter E-Mail-Adresse
destinataer_by_email = {
d.email.lower(): d
for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="")
}
processed = 0
errors = 0
try:
# IMAP-Verbindung aufbauen (mit Socket-Timeout fuer grosse E-Mails)
imap_timeout = 120 # Sekunden genug fuer grosse Anhaenge
if imap_use_ssl:
mail = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=imap_timeout)
else:
mail = imaplib.IMAP4(imap_host, imap_port, timeout=imap_timeout)
mail.login(imap_user, imap_password)
mail.select(imap_folder)
# Nachrichten suchen
if search_all_recent_days and search_all_recent_days > 0:
from datetime import timedelta
since_date = (datetime.now(dt_timezone.utc) - timedelta(days=search_all_recent_days)).strftime("%d-%b-%Y")
_, message_ids_raw = mail.search(None, "SINCE", since_date)
search_mode = f"ALL seit {since_date}"
else:
_, message_ids_raw = mail.search(None, "UNSEEN")
search_mode = "UNSEEN"
message_ids = message_ids_raw[0].split()
logger.info("Postfach '%s' (%s): %d Nachricht(en) gefunden.", imap_folder, search_mode, len(message_ids))
for msg_id in message_ids:
try:
_, msg_data = mail.fetch(msg_id, "(RFC822)")
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Absender ermitteln
from_raw = msg.get("From", "")
absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw)
absender_email_addr = absender_email_raw.lower().strip()
absender_name = _decode_header_value(absender_name_raw)
# Betreff
betreff = _decode_header_value(msg.get("Subject", ""))
# Eingangsdatum
eingangsdatum = _parse_email_date(msg.get("Date", ""))
# E-Mail-Text
email_text = _get_email_body(msg)
# Destinataer zuordnen
destinataer = destinataer_by_email.get(absender_email_addr)
# Kategorie erkennen
kategorie = _detect_kategorie(betreff, email_text, has_destinataer=bool(destinataer))
# Status basierend auf Kategorie
if destinataer:
status = "zugewiesen"
elif kategorie == "rechnung":
status = "neu" # Muss manuell als Rechnung erfasst werden
else:
status = "unbekannt"
# DMS-Kontext fuer Anhaenge basierend auf Kategorie
dms_kontext_map = {
"rechnung": "rechnung",
"stiftungsgeschichte": "stiftungsgeschichte",
}
dms_kontext = dms_kontext_map.get(kategorie, "korrespondenz")
# Pruefen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via
# Datum + Absender + Betreff)
already_exists = EmailEingang.objects.filter(
absender_email=absender_email_addr,
eingangsdatum=eingangsdatum,
betreff=betreff[:500],
).exists()
if already_exists:
logger.debug(
"E-Mail von %s am %s bereits vorhanden wird uebersprungen.",
absender_email_addr, eingangsdatum,
)
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
continue
# Datensatz anlegen
eingang = EmailEingang(
kategorie=kategorie,
destinataer=destinataer,
absender_email=absender_email_addr,
absender_name=absender_name,
betreff=betreff[:500],
eingangsdatum=eingangsdatum,
email_text=email_text,
status=status,
)
# Anhaenge verarbeiten und als DokumentDatei im DMS speichern
dms_dokumente = []
if msg.is_multipart():
for part in msg.walk():
disposition = str(part.get_content_disposition() or "")
if "attachment" in disposition:
filename = _decode_header_value(part.get_filename() or "")
content = part.get_payload(decode=True)
if not content:
logger.warning(
"Anhang '%s' hat keinen Inhalt wird uebersprungen.",
filename,
)
continue
doc = _save_to_dms(
content=content,
filename=filename,
destinataer=destinataer,
betreff=betreff,
kontext=dms_kontext,
)
if doc:
dms_dokumente.append(doc)
# Cover-Email als eigenes DMS-Dokument speichern
email_body_doc = None
if email_text.strip():
email_filename = f"Email_{eingangsdatum.strftime('%Y%m%d_%H%M')}_{betreff[:50]}.txt"
# Bereinige Dateinamen
email_filename = re.sub(r'[^\w\s\-._]', '', email_filename)
anhang_count = len(dms_dokumente)
anhang_hinweis = (
f"\n\n--- Anhänge: {anhang_count} ---\n"
+ "\n".join(f"{d.dateiname_original or d.titel}" for d in dms_dokumente)
if dms_dokumente else ""
)
email_body_content = (
f"Von: {absender_name} <{absender_email_addr}>\n"
f"Datum: {eingangsdatum.strftime('%d.%m.%Y %H:%M')}\n"
f"Betreff: {betreff}\n"
f"{'=' * 60}\n\n"
f"{email_text}"
f"{anhang_hinweis}"
)
email_body_doc = _save_to_dms(
content=email_body_content.encode("utf-8"),
filename=email_filename,
destinataer=destinataer,
betreff=betreff,
kontext="email",
)
if email_body_doc:
# Beschreibung mit Anhang-Verweis ergaenzen
if dms_dokumente:
email_body_doc.beschreibung = (
f"E-Mail-Nachricht mit {anhang_count} Anhang/Anhängen.\n"
f"Absender: {absender_name} <{absender_email_addr}>"
)
else:
email_body_doc.beschreibung = (
f"E-Mail-Nachricht (ohne Anhänge).\n"
f"Absender: {absender_name} <{absender_email_addr}>"
)
email_body_doc.save(update_fields=["beschreibung"])
# Alle DMS-Dokumente (Email-Body + Anhaenge) verknuepfen
alle_dms_dokumente = []
if email_body_doc:
alle_dms_dokumente.append(email_body_doc)
alle_dms_dokumente.extend(dms_dokumente)
if dms_dokumente:
eingang.status = "verarbeitet" if destinataer else status
eingang.save()
if alle_dms_dokumente:
eingang.dokument_dateien.set(alle_dms_dokumente)
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
processed += 1
logger.info(
"E-Mail verarbeitet: von=%s, Kategorie=%s, Destinataer=%s, Anhaenge=%d",
absender_email_addr,
kategorie,
str(destinataer) if destinataer else "",
len(dms_dokumente),
)
except Exception as exc:
errors += 1
logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc)
# Nicht als gelesen markieren wird beim naechsten Lauf erneut versucht
mail.close()
mail.logout()
except imaplib.IMAP4.error as exc:
logger.error("IMAP-Fehler: %s", exc)
raise self.retry(exc=exc)
except Exception as exc:
logger.exception("Unerwarteter Fehler im poll_emails-Task: %s", exc)
raise self.retry(exc=exc)
result = {"status": "done", "processed": processed, "errors": errors}
logger.info("poll_emails abgeschlossen: %s", result)
return result
# Backward-compatible alias for existing Celery Beat schedules
poll_destinataer_emails = poll_emails