- Add 120s IMAP socket timeout (was unlimited, could hang on large emails) - Increase Paperless upload timeout from 60s to 300s for large attachments - Increase manual poll UI timeout from 60s to 300s - Show error count in UI when emails fail to process - Log warning when attachment payload is empty/corrupted Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
343 lines
13 KiB
Python
343 lines
13 KiB
Python
"""
|
||
Celery tasks for the automatic processing of Destinatär e-mails.

Workflow:
  1. `poll_destinataer_emails` runs every 15 minutes (Celery Beat)
  2. It reads unread e-mails from the IMAP mailbox (paperless@vhtv-stiftung.de)
  3. For each e-mail:
     a) The sender is matched against the Destinatär database (e-mail field)
     b) A DestinataerEmailEingang record is created
     c) All attachments are uploaded via the Paperless API
     d) A DokumentLink is created for each attachment
  4. Unrecognised senders are marked as "unbekannt" (manual follow-up)

Configuration (environment variables in .env / compose.yml):
  IMAP_HOST     — IMAP server address (e.g. mail.vhtv-stiftung.de)
  IMAP_PORT     — port (default: 993 for SSL)
  IMAP_USER     — user name (e.g. paperless@vhtv-stiftung.de)
  IMAP_PASSWORD — password
  IMAP_FOLDER   — folder (default: INBOX)
|
||
"""
|
||
|
||
import email
|
||
import email.utils
|
||
import imaplib
|
||
import io
|
||
import logging
|
||
import mimetypes
|
||
from datetime import datetime, timezone as dt_timezone
|
||
from email.header import decode_header, make_header
|
||
|
||
import requests
|
||
from celery import shared_task
|
||
from django.conf import settings
|
||
from django.utils import timezone
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Hilfsfunktionen
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _decode_header_value(raw_value: str) -> str:
    """Decode an encoded e-mail header value (e.g. UTF-8 or Latin-1) into text.

    A falsy input (empty string, ``None``) yields ``""``. If decoding fails
    for any reason, the raw value is handed back unchanged.
    """
    if raw_value:
        try:
            chunks = decode_header(raw_value)
            decoded = str(make_header(chunks))
        except Exception:
            # Best effort: a header that cannot be decoded is still more useful
            # in its raw form than not at all.
            decoded = raw_value
        return decoded
    return ""
|
||
|
||
|
||
def _parse_email_date(date_str: str) -> datetime:
    """Parse an e-mail ``Date`` header into a timezone-aware datetime.

    A parsed value without timezone information is tagged as UTC. If the
    header cannot be parsed at all, the current time (``timezone.now()``) is
    returned instead.
    """
    try:
        moment = email.utils.parsedate_to_datetime(date_str)
        # Naive results get UTC attached; aware results are returned as parsed.
        return moment if moment.tzinfo is not None else moment.replace(tzinfo=dt_timezone.utc)
    except Exception:
        return timezone.now()
|
||
|
||
|
||
def _get_email_body(msg) -> str:
|
||
"""Extrahiert den Text-Body aus einer E-Mail (bevorzugt plain text)."""
|
||
body_parts = []
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
ctype = part.get_content_type()
|
||
disposition = str(part.get_content_disposition() or "")
|
||
if ctype == "text/plain" and "attachment" not in disposition:
|
||
charset = part.get_content_charset() or "utf-8"
|
||
try:
|
||
body_parts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
|
||
except Exception:
|
||
pass
|
||
else:
|
||
charset = msg.get_content_charset() or "utf-8"
|
||
try:
|
||
body_parts.append(msg.get_payload(decode=True).decode(charset, errors="replace"))
|
||
except Exception:
|
||
pass
|
||
return "\n".join(body_parts).strip()
|
||
|
||
|
||
def _upload_to_paperless(content: bytes, filename: str, destinataer=None, betreff: str = "") -> int | None:
    """
    Upload one attachment to Paperless-NGX.

    Reads ``PAPERLESS_API_URL``, ``PAPERLESS_API_TOKEN`` and (optionally)
    ``PAPERLESS_DESTINATAERE_TAG_ID`` from the Django settings and posts the
    file to ``<api_url>/api/documents/post_document/``.

    Args:
        content: Raw bytes of the attachment.
        filename: Attachment file name; ``"anhang.pdf"`` is used when empty.
        destinataer: Optional object with ``vorname`` / ``nachname``; when
            given, its full name is sent as ``correspondent_name``.
        betreff: E-mail subject; its first 128 characters are sent as title.

    Returns:
        The Paperless document ID when the response contains one (a bare
        integer or a JSON object with an ``id`` key). ``None`` when Paperless
        is not configured, the request fails, the response is not valid JSON,
        or the response carries no document ID (e.g. a UUID string).
    """
    api_url = getattr(settings, "PAPERLESS_API_URL", None)
    api_token = getattr(settings, "PAPERLESS_API_TOKEN", None)

    if not api_url or not api_token:
        logger.warning("Paperless nicht konfiguriert – Anhang '%s' wird nicht hochgeladen.", filename)
        return None

    # Optional tag that marks the document as belonging to the Destinatäre.
    tag_ids = []
    dest_tag_id = getattr(settings, "PAPERLESS_DESTINATAERE_TAG_ID", None)
    if dest_tag_id:
        try:
            tag_ids.append(int(dest_tag_id))
        except (ValueError, TypeError):
            # Upload without the tag, but make the misconfiguration visible.
            logger.warning("PAPERLESS_DESTINATAERE_TAG_ID ist keine gültige Zahl: %r", dest_tag_id)

    # Correspondent: full name of the Destinatär (optional).
    # NOTE(review): presumably Paperless resolves/creates the correspondent
    # from this name — confirm that the API honours `correspondent_name`.
    correspondent_name = None
    if destinataer:
        correspondent_name = f"{destinataer.vorname} {destinataer.nachname}".strip()

    # Fall back to a generic file name when the attachment has none.
    safe_filename = filename or "anhang.pdf"

    # Guess the MIME type from the file name.
    mime_type, _ = mimetypes.guess_type(safe_filename)
    mime_type = mime_type or "application/octet-stream"

    upload_url = f"{api_url.rstrip('/')}/api/documents/post_document/"
    headers = {"Authorization": f"Token {api_token}"}

    form_data = {}
    if tag_ids:
        form_data["tags"] = tag_ids
    if correspondent_name:
        form_data["correspondent_name"] = correspondent_name
    if betreff:
        form_data["title"] = betreff[:128]

    files = {"document": (safe_filename, io.BytesIO(content), mime_type)}

    try:
        response = requests.post(
            upload_url,
            headers=headers,
            data=form_data,
            files=files,
            timeout=300,  # 5 minutes for large attachments
        )
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error is not a RequestException subclass.
        logger.error("Fehler beim Hochladen von '%s' in Paperless: %s", safe_filename, exc)
        return None

    # The response may be a bare integer, a JSON object, or a UUID string.
    # Calling `.get` on a string would raise AttributeError, escape this
    # function and cause the same mail to be uploaded again on the next poll.
    if isinstance(result, dict):
        doc_id = result.get("id")
    elif isinstance(result, int) and not isinstance(result, bool):
        doc_id = result
    else:
        doc_id = None

    if doc_id is None:
        # NOTE(review): a UUID string presumably identifies an asynchronous
        # task; resolving it to a document ID would need a follow-up query.
        logger.warning(
            "Anhang '%s' wurde von Paperless angenommen, aber die Antwort enthält keine Dokument-ID (Antwort: %r).",
            safe_filename,
            result,
        )
        return None

    logger.info("Anhang '%s' erfolgreich in Paperless hochgeladen (ID: %s).", safe_filename, doc_id)
    return doc_id
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Haupttask
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _close_imap_quietly(mail) -> None:
    """Close the selected mailbox and log out; failures are logged, never raised."""
    for step in (mail.close, mail.logout):
        try:
            step()
        except (imaplib.IMAP4.error, OSError) as exc:
            logger.debug("IMAP-Verbindung konnte nicht sauber geschlossen werden: %s", exc)


@shared_task(bind=True, max_retries=3, default_retry_delay=300, name="stiftung.tasks.poll_destinataer_emails")
def poll_destinataer_emails(self, search_all_recent_days=0):
    """
    Read e-mails from the IMAP mailbox and process them.

    Executed by Celery Beat every 15 minutes.

    For every message found: the sender is matched against the active
    Destinatäre by e-mail address, already-imported messages (same sender,
    date and subject) are skipped, each attachment is uploaded to Paperless,
    a DokumentLink is created per uploaded attachment, and a
    DestinataerEmailEingang record is saved. Successfully handled messages
    are flagged ``\\Seen``; messages that fail stay untouched so the next
    run tries them again.

    Args:
        search_all_recent_days: If > 0, all e-mails of the last N days are
            searched (not only unread ones). Useful for a manual poll.

    Returns:
        ``{"status": "done", "processed": int, "errors": int}``, or
        ``{"status": "skipped", "reason": "IMAP not configured"}`` when host,
        user or password are missing.

    Raises:
        Whatever ``self.retry`` raises, on an IMAP error or any other
        unexpected error outside the per-message handling.
    """
    from stiftung.models import Destinataer, DestinataerEmailEingang, DokumentLink

    # IMAP configuration: DB (AppConfiguration) with fallback to env/settings
    from stiftung.utils.config import get_config

    imap_host = get_config("imap_host")
    imap_port = int(get_config("imap_port", 993))
    imap_user = get_config("imap_user")
    imap_password = get_config("imap_password")
    imap_folder = get_config("imap_folder", "INBOX")
    # NOTE(review): assumes get_config returns a real bool here; a string such
    # as "False" would be truthy — confirm against get_config.
    imap_use_ssl = get_config("imap_use_ssl", True)

    if not all([imap_host, imap_user, imap_password]):
        logger.warning(
            "IMAP nicht konfiguriert (IMAP_HOST, IMAP_USER, IMAP_PASSWORD fehlen). "
            "Task wird übersprungen."
        )
        return {"status": "skipped", "reason": "IMAP not configured"}

    # Build the e-mail -> Destinatär index once for fast matching.
    # Only active Destinatäre with an e-mail address are considered. Keys are
    # normalised like the sender address below (stripped, lower-cased).
    destinataer_by_email = {
        d.email.strip().lower(): d
        for d in Destinataer.objects.filter(aktiv=True, email__isnull=False).exclude(email="")
    }

    processed = 0
    errors = 0
    mail = None  # stays None until a connection exists; used by `finally`

    try:
        # Open the IMAP connection (with a socket timeout for large e-mails)
        imap_timeout = 120  # seconds
        if imap_use_ssl:
            mail = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=imap_timeout)
        else:
            mail = imaplib.IMAP4(imap_host, imap_port, timeout=imap_timeout)

        mail.login(imap_user, imap_password)
        mail.select(imap_folder)

        # Search for messages
        if search_all_recent_days and search_all_recent_days > 0:
            from datetime import timedelta

            since_date = (datetime.now(dt_timezone.utc) - timedelta(days=search_all_recent_days)).strftime("%d-%b-%Y")
            _, message_ids_raw = mail.search(None, "SINCE", since_date)
            search_mode = f"ALL seit {since_date}"
        else:
            _, message_ids_raw = mail.search(None, "UNSEEN")
            search_mode = "UNSEEN"

        # imaplib yields [None] when the server sends no untagged SEARCH
        # response; treat that (and an empty list) as "no messages".
        raw_ids = message_ids_raw[0] if message_ids_raw else None
        message_ids = raw_ids.split() if raw_ids else []

        logger.info("Postfach '%s' (%s): %d Nachricht(en) gefunden.", imap_folder, search_mode, len(message_ids))

        for msg_id in message_ids:
            try:
                _, msg_data = mail.fetch(msg_id, "(RFC822)")
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)

                # Sender
                from_raw = msg.get("From", "")
                absender_name_raw, absender_email_raw = email.utils.parseaddr(from_raw)
                absender_email = absender_email_raw.lower().strip()
                absender_name = _decode_header_value(absender_name_raw)

                # Subject
                betreff = _decode_header_value(msg.get("Subject", ""))

                # Date of receipt
                eingangsdatum = _parse_email_date(msg.get("Date", ""))

                # E-mail text
                email_text = _get_email_body(msg)

                # Match the sender to a Destinatär
                destinataer = destinataer_by_email.get(absender_email)
                status = "zugewiesen" if destinataer else "unbekannt"

                # Duplicate check via date + sender + subject
                already_exists = DestinataerEmailEingang.objects.filter(
                    absender_email=absender_email,
                    eingangsdatum=eingangsdatum,
                    betreff=betreff[:500],
                ).exists()
                if already_exists:
                    logger.debug(
                        "E-Mail von %s am %s bereits vorhanden – wird übersprungen.",
                        absender_email, eingangsdatum,
                    )
                    # Mark as read
                    mail.store(msg_id, "+FLAGS", "\\Seen")
                    continue

                # Build the record (saved after the attachments are handled)
                eingang = DestinataerEmailEingang(
                    destinataer=destinataer,
                    absender_email=absender_email,
                    absender_name=absender_name,
                    betreff=betreff[:500],
                    eingangsdatum=eingangsdatum,
                    email_text=email_text,
                    status=status,
                )

                # Process attachments
                paperless_ids = []
                if msg.is_multipart():
                    for part in msg.walk():
                        disposition = str(part.get_content_disposition() or "")
                        if "attachment" not in disposition:
                            continue

                        filename = _decode_header_value(part.get_filename() or "")
                        content = part.get_payload(decode=True)
                        if not content:
                            logger.warning(
                                "Anhang '%s' hat keinen Inhalt (möglicherweise zu groß oder beschädigt) – wird übersprungen.",
                                filename,
                            )
                            continue

                        doc_id = _upload_to_paperless(
                            content=content,
                            filename=filename,
                            destinataer=destinataer,
                            betreff=betreff,
                        )
                        if doc_id:
                            paperless_ids.append(doc_id)
                            # Create the DokumentLink for this attachment
                            DokumentLink.objects.create(
                                paperless_document_id=doc_id,
                                kontext="verwendungsnachweis",
                                titel=f"{betreff[:100]} – {filename}" if filename else betreff[:200],
                                beschreibung=(
                                    f"Automatisch importiert aus E-Mail-Eingang.\n"
                                    f"Absender: {absender_name} <{absender_email}>\n"
                                    f"Datum: {eingangsdatum.strftime('%d.%m.%Y %H:%M')}"
                                ),
                                destinataer_id=destinataer.pk if destinataer else None,
                            )

                eingang.paperless_dokument_ids = paperless_ids
                if paperless_ids:
                    eingang.status = "verarbeitet" if destinataer else "unbekannt"
                eingang.save()

                # Mark as read
                mail.store(msg_id, "+FLAGS", "\\Seen")
                processed += 1
                logger.info(
                    "E-Mail verarbeitet: von=%s, Destinatär=%s, Anhänge=%d",
                    absender_email,
                    str(destinataer) if destinataer else "unbekannt",
                    len(paperless_ids),
                )

            except Exception as exc:
                errors += 1
                logger.exception("Fehler bei Verarbeitung von Nachricht %s: %s", msg_id, exc)
                # Not marked as read – will be retried on the next run

        # Regular shutdown; a failure here still triggers a retry as before.
        mail.close()
        mail.logout()
        mail = None  # closed cleanly, nothing left for `finally`

    except imaplib.IMAP4.error as exc:
        logger.error("IMAP-Fehler: %s", exc)
        raise self.retry(exc=exc)
    except Exception as exc:
        logger.exception("Unerwarteter Fehler im poll_destinataer_emails-Task: %s", exc)
        raise self.retry(exc=exc)
    finally:
        # Release the connection on error paths too, so a failed run does not
        # leave the IMAP session open.
        if mail is not None:
            _close_imap_quietly(mail)

    result = {"status": "done", "processed": processed, "errors": errors}
    logger.info("poll_destinataer_emails abgeschlossen: %s", result)
    return result
|