Generalize email system with invoice workflow and Stiftungsgeschichte category

- Rename DestinataerEmailEingang → EmailEingang with category support
  (destinataer, rechnung, land_pacht, stiftungsgeschichte, allgemein)
- Add invoice capture workflow: create Verwaltungskosten from email,
  link DMS documents as invoice attachments, track payment status
- Add Stiftungsgeschichte email category with auto-detection patterns
  (Ahnenforschung, Genealogie, Chronik, etc.) and DMS integration
- Update poll_emails task with category detection and DMS context mapping
- Show available history documents in Geschichte editor sidebar
- Consolidate DMS views, remove legacy dokument templates
- Update all detail/form templates for DMS document linking
- Add deploy.sh script and streamline compose.yml

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
SysAdmin Agent
2026-03-12 10:17:14 +00:00
parent f4fc512ad3
commit e6f4c5ba1b
44 changed files with 1076 additions and 3428 deletions

View File

@@ -1,20 +1,20 @@
"""
Celery-Tasks für die automatische Verarbeitung von Destinatär-E-Mails.
Celery-Tasks fuer die automatische Verarbeitung eingehender E-Mails.
Workflow:
1. `poll_destinataer_emails` läuft alle 15 Minuten (Celery Beat)
2. Er liest ungelesene E-Mails aus dem IMAP-Postfach (paperless@vhtv-stiftung.de)
3. Für jede E-Mail:
a) Absender wird mit Destinatär-Datenbank abgeglichen (E-Mail-Feld)
b) Ein DestinataerEmailEingang-Datensatz wird angelegt
c) Alle Anhänge werden per Paperless-API hochgeladen
d) Für jeden Anhang wird ein DokumentLink erstellt
1. `poll_emails` laeuft alle 15 Minuten (Celery Beat)
2. Er liest ungelesene E-Mails aus dem IMAP-Postfach
3. Fuer jede E-Mail:
a) Absender wird mit Destinataer-Datenbank abgeglichen (E-Mail-Feld)
b) Betreff/Body wird auf Rechnungs-Keywords geprueft
c) Ein EmailEingang-Datensatz wird angelegt (mit Kategorie)
d) Alle Anhaenge werden als DokumentDatei im Django-DMS gespeichert
4. Nicht erkannte Absender werden als "unbekannt" markiert (manuelle Nachbearbeitung)
Konfiguration (Umgebungsvariablen in .env / compose.yml):
IMAP_HOST — IMAP-Server-Adresse (z. B. mail.vhtv-stiftung.de)
IMAP_PORT — Port (Standard: 993 für SSL)
IMAP_USER — Benutzername (z. B. paperless@vhtv-stiftung.de)
IMAP_PORT — Port (Standard: 993 fuer SSL)
IMAP_USER — Benutzername
IMAP_PASSWORD — Passwort
IMAP_FOLDER — Ordner (Standard: INBOX)
"""
@@ -22,22 +22,39 @@ Konfiguration (Umgebungsvariablen in .env / compose.yml):
import email
import email.utils
import imaplib
import io
import logging
import mimetypes
import re
from datetime import datetime, timezone as dt_timezone
from email.header import decode_header, make_header
import time
import requests
from celery import shared_task
from django.conf import settings
from django.utils import timezone
logger = logging.getLogger(__name__)
# Patterns fuer Rechnungserkennung im Betreff/Body
RECHNUNG_PATTERNS = [
re.compile(r"\brechnung\b", re.IGNORECASE),
re.compile(r"\binvoice\b", re.IGNORECASE),
re.compile(r"\brechnungs[-\s]?nr\.?\s*[:\s]?\s*\d+", re.IGNORECASE),
re.compile(r"\bRE[-/]\d{4,}", re.IGNORECASE), # RE-2024001, RE/20240315
]
GESCHICHTE_PATTERNS = [
re.compile(r"\bstiftungsgeschichte\b", re.IGNORECASE),
re.compile(r"\bahnenforschung\b", re.IGNORECASE),
re.compile(r"\bgenealogie\b", re.IGNORECASE),
re.compile(r"\bstammbaum\b", re.IGNORECASE),
re.compile(r"\bhistorisch", re.IGNORECASE),
re.compile(r"\bchronik\b", re.IGNORECASE),
re.compile(r"\barchiv\b", re.IGNORECASE),
re.compile(r"\bfamiliengeschichte\b", re.IGNORECASE),
re.compile(r"\burkunde\b", re.IGNORECASE),
]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
@@ -54,7 +71,7 @@ def _decode_header_value(raw_value: str) -> str:
def _parse_email_date(date_str: str) -> datetime:
"""Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurück."""
"""Parst das E-Mail-Datum und gibt ein timezone-aware datetime zurueck."""
try:
parsed = email.utils.parsedate_to_datetime(date_str)
if parsed.tzinfo is None:
@@ -86,148 +103,88 @@ def _get_email_body(msg) -> str:
return "\n".join(body_parts).strip()
def _poll_paperless_task(api_url: str, headers: dict, task_id: str, filename: str, max_wait: int = 120) -> int | None:
def _detect_kategorie(betreff: str, email_text: str, has_destinataer: bool) -> str:
"""
Pollt den Paperless-ngx Task-Status bis das Dokument verarbeitet wurde.
Gibt die Dokument-ID zurück, oder None bei Fehler/Timeout.
Erkennt die Kategorie einer Email anhand von Betreff und Body.
Gibt 'destinataer', 'rechnung', 'stiftungsgeschichte', oder 'allgemein' zurueck.
"""
task_url = f"{api_url.rstrip('/')}/api/tasks/?task_id={task_id}"
waited = 0
interval = 2
while waited < max_wait:
try:
resp = requests.get(task_url, headers=headers, timeout=30)
resp.raise_for_status()
tasks = resp.json()
if tasks:
task = tasks[0] if isinstance(tasks, list) else tasks
status = task.get("status", "")
if status == "SUCCESS":
related = task.get("related_document")
if related:
# related_document can be a URL like "/api/documents/42/"
# or just an ID
if isinstance(related, str):
parts = related.rstrip("/").split("/")
try:
return int(parts[-1])
except (ValueError, IndexError):
logger.warning("Konnte Dokument-ID nicht aus '%s' extrahieren.", related)
return None
return int(related)
# Some versions use result_id or document_id
for key in ("result_id", "document_id", "id"):
val = task.get(key)
if val is not None:
try:
return int(val)
except (ValueError, TypeError):
pass
logger.warning("Task %s erfolgreich aber keine Dokument-ID gefunden: %s", task_id, task)
return None
elif status == "FAILURE":
logger.error("Paperless-Task %s fehlgeschlagen für '%s': %s", task_id, filename, task.get("result"))
return None
except requests.RequestException as exc:
logger.warning("Fehler beim Abfragen von Paperless-Task %s: %s", task_id, exc)
time.sleep(interval)
waited += interval
logger.error("Timeout beim Warten auf Paperless-Task %s für '%s' (%ds).", task_id, filename, max_wait)
return None
if has_destinataer:
return "destinataer"
text_to_check = f"{betreff}\n{email_text[:2000]}"
# Rechnungserkennung via Patterns
for pattern in RECHNUNG_PATTERNS:
if pattern.search(text_to_check):
return "rechnung"
# Stiftungsgeschichte-Erkennung
for pattern in GESCHICHTE_PATTERNS:
if pattern.search(text_to_check):
return "stiftungsgeschichte"
return "allgemein"
def _upload_to_paperless(content: bytes, filename: str, destinataer=None, betreff: str = "") -> int | None:
def _save_to_dms(content: bytes, filename: str, destinataer=None, betreff: str = "", kontext: str = "korrespondenz"):
"""
Lädt einen Anhang in Paperless-NGX hoch.
Speichert einen E-Mail-Anhang direkt als DokumentDatei im Django-DMS.
Gibt die neue Paperless-Dokument-ID zurück, oder None bei Fehler.
Gibt das DokumentDatei-Objekt zurueck, oder None bei Fehler.
"""
api_url = getattr(settings, "PAPERLESS_API_URL", None)
api_token = getattr(settings, "PAPERLESS_API_TOKEN", None)
from stiftung.models import DokumentDatei
from django.core.files.base import ContentFile
if not api_url or not api_token:
logger.warning("Paperless nicht konfiguriert Anhang '%s' wird nicht hochgeladen.", filename)
return None
# Tag-ID für Destinatäre ermitteln
tag_ids = []
dest_tag_id = getattr(settings, "PAPERLESS_DESTINATAERE_TAG_ID", None)
if dest_tag_id:
try:
tag_ids.append(int(dest_tag_id))
except (ValueError, TypeError):
pass
# Correspondent: Name des Destinatärs (optional, Paperless sucht/erstellt ihn)
correspondent_name = None
if destinataer:
correspondent_name = f"{destinataer.vorname} {destinataer.nachname}".strip()
# Dateiname bereinigen
safe_filename = filename or "anhang.pdf"
# Mime-Type bestimmen
safe_filename = filename or "anhang.bin"
mime_type, _ = mimetypes.guess_type(safe_filename)
mime_type = mime_type or "application/octet-stream"
upload_url = f"{api_url.rstrip('/')}/api/documents/post_document/"
headers = {"Authorization": f"Token {api_token}"}
form_data = {}
if tag_ids:
form_data["tags"] = tag_ids
if correspondent_name:
form_data["correspondent_name"] = correspondent_name
if betreff:
form_data["title"] = betreff[:128]
files = {"document": (safe_filename, io.BytesIO(content), mime_type)}
titel = f"{betreff[:100]} {safe_filename}" if betreff else safe_filename
beschreibung = ""
if destinataer:
beschreibung = (
f"Automatisch importiert aus E-Mail-Eingang.\n"
f"Absender: {destinataer.vorname} {destinataer.nachname} <{destinataer.email}>"
)
try:
response = requests.post(
upload_url,
headers=headers,
data=form_data,
files=files,
timeout=300, # 5 Minuten für große Anhänge
doc = DokumentDatei(
titel=titel[:255],
beschreibung=beschreibung,
kontext=kontext,
dateiname_original=safe_filename,
dateityp=mime_type,
dateigroesse=len(content),
destinataer=destinataer,
)
response.raise_for_status()
result = response.json()
# Paperless-ngx post_document returns a task UUID string.
# We need to poll the task status to get the actual document ID.
if isinstance(result, str):
task_id = result
doc_id = _poll_paperless_task(api_url, headers, task_id, safe_filename)
elif isinstance(result, int):
doc_id = result
else:
doc_id = result.get("id")
logger.info("Anhang '%s' erfolgreich in Paperless hochgeladen (ID: %s).", safe_filename, doc_id)
return doc_id
except requests.RequestException as exc:
logger.error("Fehler beim Hochladen von '%s' in Paperless: %s", safe_filename, exc)
doc.datei.save(safe_filename, ContentFile(content), save=False)
doc.save()
logger.info("Anhang '%s' als DokumentDatei gespeichert (ID: %s).", safe_filename, doc.pk)
return doc
except Exception as exc:
logger.error("Fehler beim Speichern von '%s' im DMS: %s", safe_filename, exc)
return None
# ---------------------------------------------------------------------------
# Haupttask
# ---------------------------------------------------------------------------
@shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_destinataer_emails")
def poll_destinataer_emails(self, search_all_recent_days=0):
@shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_emails")
def poll_emails(self, search_all_recent_days=0):
"""
Liest E-Mails aus dem IMAP-Postfach und verarbeitet sie.
Wird durch Celery Beat alle 15 Minuten ausgeführt.
Wird durch Celery Beat alle 15 Minuten ausgefuehrt.
Erkennt automatisch Destinataer-Emails, Rechnungen und allgemeine Post.
Args:
search_all_recent_days: Wenn > 0, werden alle E-Mails der letzten N Tage
durchsucht (nicht nur ungelesene). Nützlich für manuellen Abruf.
durchsucht (nicht nur ungelesene). Nuetzlich fuer manuellen Abruf.
"""
from stiftung.models import Destinataer, DestinataerEmailEingang, DokumentLink
from stiftung.models import Destinataer, EmailEingang
# IMAP-Konfiguration: DB (AppConfiguration) mit Fallback auf env/settings
from stiftung.utils.config import get_config
@@ -242,12 +199,12 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
if not all([imap_host, imap_user, imap_password]):
logger.warning(
"IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). "
"Task wird übersprungen."
"Task wird uebersprungen."
)
return {"status": "skipped", "reason": "IMAP not configured"}
# Vorab: Destinatär-E-Mail-Index für schnelle Zuordnung
# Nur aktive Destinatäre mit gesetzter E-Mail-Adresse
# Vorab: Destinataer-E-Mail-Index fuer schnelle Zuordnung
# Nur aktive Destinataere mit gesetzter E-Mail-Adresse
destinataer_by_email = {
d.email.lower(): d
for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="")
@@ -257,8 +214,8 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
errors = 0
try:
# IMAP-Verbindung aufbauen (mit Socket-Timeout für große E-Mails)
imap_timeout = 120 # Sekunden genug für große Anhänge
# IMAP-Verbindung aufbauen (mit Socket-Timeout fuer grosse E-Mails)
imap_timeout = 120 # Sekunden genug fuer grosse Anhaenge
if imap_use_ssl:
mail = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=imap_timeout)
else:
@@ -289,7 +246,7 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
# Absender ermitteln
from_raw = msg.get("From", "")
absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw)
absender_email = absender_email_raw.lower().strip()
absender_email_addr = absender_email_raw.lower().strip()
absender_name = _decode_header_value(absender_name_raw)
# Betreff
@@ -301,30 +258,48 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
# E-Mail-Text
email_text = _get_email_body(msg)
# Destinatär zuordnen
destinataer = destinataer_by_email.get(absender_email)
status = "zugewiesen" if destinataer else "unbekannt"
# Destinataer zuordnen
destinataer = destinataer_by_email.get(absender_email_addr)
# Prüfen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via
# Kategorie erkennen
kategorie = _detect_kategorie(betreff, email_text, has_destinataer=bool(destinataer))
# Status basierend auf Kategorie
if destinataer:
status = "zugewiesen"
elif kategorie == "rechnung":
status = "neu" # Muss manuell als Rechnung erfasst werden
else:
status = "unbekannt"
# DMS-Kontext fuer Anhaenge basierend auf Kategorie
dms_kontext_map = {
"rechnung": "rechnung",
"stiftungsgeschichte": "stiftungsgeschichte",
}
dms_kontext = dms_kontext_map.get(kategorie, "korrespondenz")
# Pruefen ob diese E-Mail bereits verarbeitet wurde (Duplikat-Check via
# Datum + Absender + Betreff)
already_exists = DestinataerEmailEingang.objects.filter(
absender_email=absender_email,
already_exists = EmailEingang.objects.filter(
absender_email=absender_email_addr,
eingangsdatum=eingangsdatum,
betreff=betreff[:500],
).exists()
if already_exists:
logger.debug(
"E-Mail von %s am %s bereits vorhanden wird übersprungen.",
absender_email, eingangsdatum,
"E-Mail von %s am %s bereits vorhanden wird uebersprungen.",
absender_email_addr, eingangsdatum,
)
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
continue
# Datensatz anlegen
eingang = DestinataerEmailEingang(
eingang = EmailEingang(
kategorie=kategorie,
destinataer=destinataer,
absender_email=absender_email,
absender_email=absender_email_addr,
absender_name=absender_name,
betreff=betreff[:500],
eingangsdatum=eingangsdatum,
@@ -332,8 +307,8 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
status=status,
)
# Anhänge verarbeiten
paperless_ids = []
# Anhaenge verarbeiten und als DokumentDatei im DMS speichern
dms_dokumente = []
if msg.is_multipart():
for part in msg.walk():
disposition = str(part.get_content_disposition() or "")
@@ -342,51 +317,42 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
content = part.get_payload(decode=True)
if not content:
logger.warning(
"Anhang '%s' hat keinen Inhalt (möglicherweise zu groß oder beschädigt) wird übersprungen.",
"Anhang '%s' hat keinen Inhalt wird uebersprungen.",
filename,
)
continue
doc_id = _upload_to_paperless(
doc = _save_to_dms(
content=content,
filename=filename,
destinataer=destinataer,
betreff=betreff,
kontext=dms_kontext,
)
if doc_id:
paperless_ids.append(doc_id)
# DokumentLink anlegen
DokumentLink.objects.create(
paperless_document_id=doc_id,
kontext="verwendungsnachweis",
titel=f"{betreff[:100]} {filename}" if filename else betreff[:200],
beschreibung=(
f"Automatisch importiert aus E-Mail-Eingang.\n"
f"Absender: {absender_name} <{absender_email}>\n"
f"Datum: {eingangsdatum.strftime('%d.%m.%Y %H:%M')}"
),
destinataer_id=destinataer.pk if destinataer else None,
)
if doc:
dms_dokumente.append(doc)
eingang.paperless_dokument_ids = paperless_ids
if paperless_ids:
eingang.status = "verarbeitet" if destinataer else "unbekannt"
if dms_dokumente:
eingang.status = "verarbeitet" if destinataer else status
eingang.save()
if dms_dokumente:
eingang.dokument_dateien.set(dms_dokumente)
# Als gelesen markieren
mail.store(msg_id, "+FLAGS", "\\Seen")
processed += 1
logger.info(
"E-Mail verarbeitet: von=%s, Destinatär=%s, Anhänge=%d",
absender_email,
str(destinataer) if destinataer else "unbekannt",
len(paperless_ids),
"E-Mail verarbeitet: von=%s, Kategorie=%s, Destinataer=%s, Anhaenge=%d",
absender_email_addr,
kategorie,
str(destinataer) if destinataer else "",
len(dms_dokumente),
)
except Exception as exc:
errors += 1
logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc)
# Nicht als gelesen markieren wird beim nächsten Lauf erneut versucht
# Nicht als gelesen markieren wird beim naechsten Lauf erneut versucht
mail.close()
mail.logout()
@@ -395,9 +361,13 @@ def poll_destinataer_emails(self, search_all_recent_days=0):
logger.error("IMAP-Fehler: %s", exc)
raise self.retry(exc=exc)
except Exception as exc:
logger.exception("Unerwarteter Fehler im poll_destinataer_emails-Task: %s", exc)
logger.exception("Unerwarteter Fehler im poll_emails-Task: %s", exc)
raise self.retry(exc=exc)
result = {"status": "done", "processed": processed, "errors": errors}
logger.info("poll_destinataer_emails abgeschlossen: %s", result)
logger.info("poll_emails abgeschlossen: %s", result)
return result
# Backward-compatible alias for existing Celery Beat schedules
poll_destinataer_emails = poll_emails