- forms.py → forms/ Package (8 Domänen: destinataere, land, finanzen, foerderung, dokumente, veranstaltung, system, geschichte) - admin.py → admin/ Package (7 Domänen, alle 22 @admin.register dekoriert) - views.py (8845 Zeilen) → views/ Package (11 Domänen: dashboard, destinataere, land, paechter, finanzen, foerderung, dokumente, unterstuetzungen, veranstaltung, geschichte, system) - __init__.py in jedem Package re-exportiert alle Symbole für Rückwärtskompatibilität - urls.py bleibt unverändert (funktioniert durch Re-Exports) - Django system check: 0 Fehler, alle URL-Auflösungen funktionieren Keine funktionalen Änderungen – reine Strukturverbesserung für Vision 2026. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2140 lines
76 KiB
Python
2140 lines
76 KiB
Python
# views/system.py
|
||
# Phase 0: Vision 2026 – Code-Refactoring
|
||
|
||
import csv
|
||
import io
|
||
import json
|
||
import os
|
||
import time
|
||
from datetime import datetime, timedelta, date
|
||
from decimal import Decimal
|
||
|
||
import qrcode
|
||
import qrcode.image.svg
|
||
import requests
|
||
from django.conf import settings
|
||
from django.contrib import messages
|
||
from django.contrib.auth.decorators import login_required
|
||
from django.core.paginator import Paginator
|
||
from django.db.models import (Avg, Count, DecimalField, F, IntegerField, Q,
|
||
Sum, Value)
|
||
from django.db.models.functions import Cast, Coalesce, NullIf, Replace
|
||
from django.http import HttpResponse, JsonResponse
|
||
from django.shortcuts import get_object_or_404, redirect, render
|
||
from django.urls import reverse
|
||
from django.utils import timezone
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from django_otp.decorators import otp_required
|
||
from django_otp.plugins.otp_totp.models import TOTPDevice
|
||
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
|
||
from django_otp.util import random_hex
|
||
from rest_framework.decorators import api_view
|
||
from rest_framework.response import Response
|
||
|
||
from stiftung.models import (AppConfiguration, AuditLog, BackupJob, BankTransaction,
|
||
BriefVorlage, CSVImport, Destinataer,
|
||
DestinataerEmailEingang, DestinataerNotiz,
|
||
DestinataerUnterstuetzung,
|
||
DokumentLink, Foerderung, GeschichteBild, GeschichteSeite,
|
||
Land, LandAbrechnung, LandVerpachtung, Paechter, Person,
|
||
Rentmeister, StiftungsKalenderEintrag, StiftungsKonto,
|
||
UnterstuetzungWiederkehrend, Veranstaltung,
|
||
Veranstaltungsteilnehmer, Verwaltungskosten,
|
||
VierteljahresNachweis)
|
||
from stiftung.forms import (
|
||
DestinataerForm, DestinataerUnterstuetzungForm, DestinataerNotizForm,
|
||
FoerderungForm, GeschichteBildForm, GeschichteSeiteForm,
|
||
LandForm, LandVerpachtungForm, LandAbrechnungForm,
|
||
PaechterForm, DokumentLinkForm,
|
||
RentmeisterForm, StiftungsKontoForm, VerwaltungskostenForm,
|
||
BankTransactionForm, BankImportForm,
|
||
UnterstuetzungForm, UnterstuetzungWiederkehrendForm,
|
||
UnterstuetzungMarkAsPaidForm, VierteljahresNachweisForm,
|
||
UserCreationForm, UserUpdateForm, PasswordChangeForm, UserPermissionForm,
|
||
TwoFactorSetupForm, TwoFactorVerifyForm, TwoFactorDisableForm,
|
||
BackupTokenRegenerateForm, PersonForm,
|
||
VeranstaltungForm, VeranstaltungsteilnehmerForm,
|
||
)
|
||
|
||
|
||
def get_pdf_generator():
    """Return the project's PDF generator, or a stand-in if it cannot be imported.

    The generator is imported lazily so that a missing PDF dependency does not
    break importing this module. When the import raises ``ImportError`` a
    ``MockPDFGenerator`` instance is returned instead: it reports itself as
    unavailable and answers export requests with an explanatory HTML page.

    Returns:
        The imported ``pdf_generator`` object, or a ``MockPDFGenerator``.
    """
    try:
        # NOTE(review): package-relative import -- confirm it still resolves to
        # the intended module from this file's location in the package.
        from .utils.pdf_generator import pdf_generator

        return pdf_generator
    except ImportError as e:
        # Capture the text now: the name ``e`` is unbound once this block ends,
        # but the mock below needs the message later.
        error_message = str(e)

        class MockPDFGenerator:
            """Fallback generator used while PDF dependencies are missing."""

            def is_available(self):
                """Report that PDF export cannot be used."""
                return False

            def export_data_list_pdf(self, *args, **kwargs):
                """Return an HTML response explaining why no PDF was produced."""
                import html

                from django.http import HttpResponse

                # The exception text is interpolated into markup, so escape it.
                safe_error = html.escape(error_message)
                error_html = f"""
                <!DOCTYPE html>
                <html>
                <head><title>PDF Not Available</title></head>
                <body>
                    <h1>PDF Export Not Available</h1>
                    <p>PDF generation requires additional system dependencies that are not installed.</p>
                    <p>Error: {safe_error}</p>
                    <p>Please install WeasyPrint dependencies or use CSV export instead.</p>
                </body>
                </html>
                """
                response = HttpResponse(error_html, content_type="text/html")
                response["Content-Disposition"] = (
                    'inline; filename="pdf_not_available.html"'
                )
                return response

        return MockPDFGenerator()
|
||
|
||
|
||
class GrampsClient:
    """Lightweight client for the Gramps Web API.

    Requests go through one ``requests.Session``. If a bearer token is given
    it is sent with every request; if a username and password are given, a
    request answered with HTTP 401 triggers one login attempt followed by
    exactly one retry of the original request.

    The public methods never raise for request failures: they return the
    decoded JSON on success and ``{"error": "<message>"}`` otherwise.
    """

    # Seconds to wait for any single HTTP request.
    _TIMEOUT = 10

    # Login endpoints probed in this order, with the payload encoding to use.
    _LOGIN_ENDPOINTS = (
        ("/api/auth/login", "json"),
        ("/auth/login", "json"),
        ("/api/token", "form"),
        ("/login", "form"),
        ("/token", "form"),
        ("/api/login", "json"),
    )

    # Keys under which a login response may carry the token, in priority order.
    _TOKEN_KEYS = ("access_token", "token", "access", "jwt")

    def __init__(
        self, base_url: str, token: str = "", username: str = "", password: str = ""
    ):
        """Store the connection settings and prepare the HTTP session.

        Args:
            base_url: Root URL of the Gramps Web instance; a trailing slash
                is removed.
            token: Optional bearer token sent as ``Authorization`` header.
            username: Optional login name used to re-authenticate on 401.
            password: Optional password used together with ``username``.
        """
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        if token:
            self.session.headers.update({"Authorization": f"Bearer {token}"})
        self.username = username
        self.password = password
        self._cached_token = token

    def search_people(self, query: str, limit: int = 5):
        """Search people via ``/api/people/`` using ``q`` and ``limit``.

        Returns:
            The decoded JSON response, or ``{"error": message}`` on failure.
        """
        return self._get_json("/api/people/", params={"q": query, "limit": limit})

    def get_person(self, handle_or_id: str):
        """Fetch one person via ``/api/people/<handle_or_id>``.

        Returns:
            The decoded JSON response, or ``{"error": message}`` on failure.
        """
        return self._get_json(f"/api/people/{handle_or_id}")

    def _get_json(self, path, params=None, _may_login=True):
        """GET ``path`` and decode JSON, logging in and retrying once on 401.

        ``_may_login`` is cleared for the retry so that a 401 which persists
        after a seemingly successful login cannot cause endless recursion.
        """
        try:
            r = self.session.get(
                f"{self.base_url}{path}", params=params, timeout=self._TIMEOUT
            )
            r.raise_for_status()
            return r.json()
        except Exception as e:
            if (
                _may_login
                and self.username
                and self.password
                and self._is_unauthorized(e)
                and self._login()
            ):
                return self._get_json(path, params=params, _may_login=False)
            return {"error": str(e)}

    @staticmethod
    def _is_unauthorized(exc) -> bool:
        """Tell whether ``exc`` represents an HTTP 401 response."""
        response = getattr(exc, "response", None)
        status = getattr(response, "status_code", None)
        if status is not None:
            return status == 401
        # No response attached to the exception: fall back to the message text.
        return "401" in str(exc)

    def _login(self) -> bool:
        """Try to authenticate the session with username and password.

        Each known login endpoint is tried in turn. A JSON body carrying a
        token installs that token as bearer header; a response that sets a
        cookie is accepted as a session login. If no endpoint succeeds, HTTP
        Basic auth is tried against ``/api/people/``.

        Returns:
            True if one of the strategies appears to have succeeded,
            False otherwise (including on any request error).
        """
        credentials = {"username": self.username, "password": self.password}
        try:
            for path, mode in self._LOGIN_ENDPOINTS:
                body = {"json": credentials} if mode == "json" else {"data": credentials}
                r = self.session.post(
                    f"{self.base_url}{path}",
                    timeout=self._TIMEOUT,
                    allow_redirects=False,
                    **body,
                )

                # Success with token body
                if r.status_code in (200, 201) and "application/json" in r.headers.get(
                    "Content-Type", ""
                ):
                    try:
                        data = r.json()
                    except ValueError:
                        # Mislabelled body: keep probing the other endpoints.
                        data = None
                    token = None
                    if isinstance(data, dict):
                        token = next(
                            (data[key] for key in self._TOKEN_KEYS if data.get(key)),
                            None,
                        )
                    if token:
                        self._cached_token = token
                        self.session.headers.update(
                            {"Authorization": f"Bearer {token}"}
                        )
                        return True

                # Success via session cookie and redirect
                header_names = {name.lower() for name in r.headers}
                if r.status_code in (200, 302) and "set-cookie" in header_names:
                    return True

            # Basic Auth fallback (some setups protect API with Basic)
            previous_auth = self.session.auth
            try:
                self.session.auth = (self.username, self.password)
                r = self.session.get(
                    f"{self.base_url}/api/people/?limit=1", timeout=self._TIMEOUT
                )
                if r.status_code == 200:
                    return True
            except Exception:
                # Best effort: a failing probe simply means "not logged in".
                pass
            # Do not keep sending credentials the server did not accept.
            self.session.auth = previous_auth
            return False
        except Exception:
            return False
|
||
|
||
|
||
def get_gramps_client() -> GrampsClient:
    """Build a ``GrampsClient`` from the ``GRAMPS_*`` Django settings.

    Every setting that is not defined falls back to an empty string.
    """
    setting_names = (
        "GRAMPS_URL",
        "GRAMPS_API_TOKEN",
        "GRAMPS_USERNAME",
        "GRAMPS_PASSWORD",
    )
    base_url, token, username, password = (
        getattr(settings, name, "") for name in setting_names
    )
    return GrampsClient(base_url, token, username, password)
|
||
|
||
|
||
@api_view(["GET"])
def gramps_debug_api(_request):
    """Report the configured Gramps URL and whether credentials are set.

    Only the presence of username and password is reported, never their
    values.
    """
    # NOTE(review): no login decorator here -- confirm the DRF default
    # permission classes restrict who may read this.

    def configured(name):
        return getattr(settings, name, "")

    payload = {
        "GRAMPS_URL": configured("GRAMPS_URL"),
        "has_username": bool(configured("GRAMPS_USERNAME")),
        "has_password": bool(configured("GRAMPS_PASSWORD")),
    }
    return Response(payload)
|
||
|
||
|
||
from stiftung.models import DestinataerNotiz, DestinataerUnterstuetzung
|
||
|
||
from stiftung.forms import (DestinataerForm, DestinataerNotizForm,
|
||
DestinataerUnterstuetzungForm, DokumentLinkForm,
|
||
FoerderungForm, LandForm, PaechterForm, PersonForm,
|
||
UnterstuetzungForm, UnterstuetzungMarkAsPaidForm, VierteljahresNachweisForm)
|
||
|
||
@login_required
def csv_import_list(request):
    """Render the paginated history of CSV imports, newest first.

    Shows 20 imports per page; the page is selected with the ``page`` query
    parameter.
    """
    newest_first = CSVImport.objects.all().order_by("-started_at")
    current_page = Paginator(newest_first, 20).get_page(request.GET.get("page"))

    return render(
        request,
        "stiftung/csv_import_list.html",
        {
            "page_obj": current_page,
            "import_types": CSVImport.IMPORT_TYPE_CHOICES,
            "status_choices": CSVImport.STATUS_CHOICES,
        },
    )
|
||
|
||
|
||
@login_required
def csv_import_create(request):
    """Show the CSV import form and process an uploaded file.

    GET renders the form. POST validates the upload, records a ``CSVImport``
    row with status ``processing``, runs the importer matching
    ``import_type`` and stores its counters and final status on the record.
    The user is redirected to the import list after a processed upload and
    back to the form after a rejected one.

    If processing raises, the import record is marked ``failed`` (best
    effort) so that it does not stay in ``processing`` forever.
    """
    if request.method != "POST":
        context = {
            "import_types": CSVImport.IMPORT_TYPE_CHOICES,
        }
        return render(request, "stiftung/csv_import_form.html", context)

    import_type = request.POST.get("import_type")
    csv_file = request.FILES.get("csv_file")

    if not csv_file or not import_type:
        messages.error(
            request, "Bitte wählen Sie einen Import-Typ und eine CSV-Datei aus."
        )
        return redirect("stiftung:csv_import_create")

    # Compare case-insensitively so that "EXPORT.CSV" is accepted as well.
    if not csv_file.name.lower().endswith(".csv"):
        messages.error(request, "Bitte wählen Sie eine gültige CSV-Datei aus.")
        return redirect("stiftung:csv_import_create")

    # Importer for each supported import type.
    processors = {
        "destinataere": process_destinataere_csv,
        "paechter": process_paechter_csv,
        "personen": process_personen_csv,
        "laendereien": process_laendereien_csv,
    }

    csv_import = None
    try:
        # Create import record
        csv_import = CSVImport.objects.create(
            import_type=import_type,
            filename=csv_file.name,
            file_size=csv_file.size,
            created_by=(
                request.user.username
                if request.user.is_authenticated
                else "Unknown"
            ),
            status="processing",
        )

        processor = processors.get(import_type)
        if processor is None:
            messages.error(request, "Unbekannter Import-Typ.")
            csv_import.status = "failed"
            csv_import.save()
            return redirect("stiftung:csv_import_create")

        # Process the CSV file
        result = processor(csv_file, csv_import)

        # Update import record
        csv_import.total_rows = result["total_rows"]
        csv_import.imported_rows = result["imported_rows"]
        csv_import.failed_rows = result["failed_rows"]
        csv_import.error_log = result["error_log"]
        csv_import.status = result["status"]
        csv_import.completed_at = timezone.now()
        csv_import.save()

        if result["status"] == "completed":
            messages.success(
                request,
                f'CSV-Import erfolgreich! {result["imported_rows"]} Datensätze importiert.',
            )
        elif result["status"] == "partial":
            messages.warning(
                request,
                f'CSV-Import teilweise erfolgreich. {result["imported_rows"]} importiert, {result["failed_rows"]} fehlgeschlagen.',
            )
        else:
            messages.error(
                request, f'CSV-Import fehlgeschlagen. {result["error_log"]}'
            )

        return redirect("stiftung:csv_import_list")

    except Exception as e:
        if csv_import is not None:
            # Close the record so it is not left in "processing". This is
            # best effort: a second failure here must not hide the original
            # error from the user.
            try:
                csv_import.status = "failed"
                csv_import.error_log = str(e)
                csv_import.completed_at = timezone.now()
                csv_import.save()
            except Exception:
                pass
        messages.error(request, f"Fehler beim CSV-Import: {str(e)}")
        return redirect("stiftung:csv_import_create")
|
||
|
||
|
||
def process_personen_csv(csv_file, csv_import):
    """Import ``Person`` records from an uploaded CSV file.

    The file is decoded as UTF-8 (a leading byte-order mark is dropped) and
    read as semicolon-separated if the header line contains a semicolon,
    otherwise as comma-separated. A row updates the person with the same
    first and last name (case-insensitive) or creates a new one.

    Args:
        csv_file: Object whose ``read()`` returns the file content as bytes.
        csv_import: Import record of this run; not used by this function.

    Returns:
        dict with ``total_rows``, ``imported_rows``, ``failed_rows``,
        ``error_log`` (newline-joined messages or None) and ``status``
        (``completed``, ``partial`` or ``failed``).
    """
    # "utf-8-sig" strips a BOM, which would otherwise become part of the
    # first column name and make that column unreadable.
    decoded_file = csv_file.read().decode("utf-8-sig")
    # Handle both comma and semicolon separated files
    delimiter = ";" if ";" in decoded_file.split("\n", 1)[0] else ","
    csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=delimiter)

    true_values = {"true", "wahr", "ja", "1"}
    date_formats = ("%d.%m.%Y", "%Y-%m-%d")

    def text(row, column, default=""):
        """Return the stripped cell, or ``default`` if the cell is absent."""
        value = row.get(column)
        # DictReader yields None for cells missing from a short row.
        if value is None:
            value = default
        return value.strip()

    def parse_date(raw):
        """Parse a German or ISO date; return None if neither format fits."""
        for date_format in date_formats:
            try:
                return datetime.strptime(raw, date_format).date()
            except ValueError:
                continue
        return None

    total_rows = 0
    imported_rows = 0
    failed_rows = 0
    error_log = []

    # Start at 2 because row 1 is header
    for row_num, row in enumerate(csv_data, start=2):
        total_rows += 1

        try:
            # Map CSV columns to model fields
            person_data = {
                "vorname": text(row, "Vorname"),
                "nachname": text(row, "Nachname"),
                "familienzweig": text(row, "Familienzweig", "hauptzweig"),
                "email": text(row, "E-Mail") or None,
                "telefon": text(row, "Telefon") or None,
                "iban": text(row, "IBAN") or None,
                "adresse": text(row, "Adresse") or None,
                "notizen": text(row, "Notizen") or None,
                "aktiv": text(row, "Aktiv", "true").lower() in true_values,
            }

            # Handle date fields
            geburtsdatum = text(row, "Geburtsdatum")
            if geburtsdatum:
                person_data["geburtsdatum"] = parse_date(geburtsdatum)

            # Validate required fields
            if not person_data["vorname"] or not person_data["nachname"]:
                error_log.append(
                    f"Zeile {row_num}: Vorname und Nachname sind erforderlich"
                )
                failed_rows += 1
                continue

            # Check if person already exists
            existing_person = Person.objects.filter(
                vorname__iexact=person_data["vorname"],
                nachname__iexact=person_data["nachname"],
            ).first()

            if existing_person:
                # Update existing person; None means "no value in the CSV"
                # and must not overwrite stored data.
                for field, value in person_data.items():
                    if value is not None:
                        setattr(existing_person, field, value)
                existing_person.save()
            else:
                # Create new person
                Person.objects.create(**person_data)

            imported_rows += 1

        except Exception as e:
            # One bad row must not abort the whole import.
            error_log.append(f"Zeile {row_num}: {str(e)}")
            failed_rows += 1

    # Determine status
    if failed_rows == 0:
        status = "completed"
    elif imported_rows > 0:
        status = "partial"
    else:
        status = "failed"

    return {
        "total_rows": total_rows,
        "imported_rows": imported_rows,
        "failed_rows": failed_rows,
        "error_log": "\n".join(error_log) if error_log else None,
        "status": status,
    }
|
||
|
||
|
||
def process_destinataere_csv(csv_file, csv_import):
    """Import ``Destinataer`` records from an uploaded CSV file.

    The file is decoded as UTF-8 (a leading byte-order mark is dropped) and
    read as semicolon-separated if the header line contains a semicolon,
    otherwise as comma-separated. A row updates the record with the same
    first and last name (case-insensitive) or creates a new one.

    Args:
        csv_file: Object whose ``read()`` returns the file content as bytes.
        csv_import: Import record of this run; not used by this function.

    Returns:
        dict with ``total_rows``, ``imported_rows``, ``failed_rows``,
        ``error_log`` (newline-joined messages or None) and ``status``
        (``completed``, ``partial`` or ``failed``).
    """
    # "utf-8-sig" strips a BOM, which would otherwise become part of the
    # first column name and make that column unreadable.
    decoded_file = csv_file.read().decode("utf-8-sig")
    # Handle both comma and semicolon separated files
    delimiter = ";" if ";" in decoded_file.split("\n", 1)[0] else ","
    csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=delimiter)

    true_values = {"true", "ja", "yes", "1", "wahr", "x"}
    false_values = {"false", "nein", "no", "0", "falsch", ""}
    date_formats = ("%d.%m.%Y", "%Y-%m-%d")

    # (CSV column, model field, converter); unparseable values are skipped.
    numeric_columns = (
        ("Haushaltsgröße", "haushaltsgroesse", int),
        ("Monatliche_Bezüge", "monatliche_bezuege", float),
        ("Vermögen", "vermoegen", float),
        ("Vierteljährlicher_Betrag", "vierteljaehrlicher_betrag", float),
    )
    # (CSV column, model field); unparseable dates are stored as None.
    date_columns = (
        ("Geburtsdatum", "geburtsdatum"),
        ("Letzter_Studiennachweis", "letzter_studiennachweis"),
    )

    def text(row, column, default=""):
        """Return the stripped cell, or ``default`` if the cell is absent."""
        value = row.get(column)
        # DictReader yields None for cells missing from a short row.
        if value is None:
            value = default
        return value.strip()

    def parse_boolean(value, default=False):
        """Parse boolean values from CSV with multiple accepted formats."""
        if not value:
            return default
        value_str = str(value).strip().lower()
        if value_str in true_values:
            return True
        if value_str in false_values:
            return False
        # If unclear, return default
        return default

    def parse_date(raw):
        """Parse a German or ISO date; return None if neither format fits."""
        for date_format in date_formats:
            try:
                return datetime.strptime(raw, date_format).date()
            except ValueError:
                continue
        return None

    total_rows = 0
    imported_rows = 0
    failed_rows = 0
    error_log = []

    # Start at 2 because row 1 is header
    for row_num, row in enumerate(csv_data, start=2):
        total_rows += 1

        try:
            income_raw = text(row, "Jährliches_Einkommen")

            # Map CSV columns to model fields
            destinataer_data = {
                "vorname": text(row, "Vorname"),
                "nachname": text(row, "Nachname"),
                "familienzweig": text(row, "Familienzweig", "hauptzweig"),
                "email": text(row, "E-Mail") or None,
                "telefon": text(row, "Telefon") or None,
                "iban": text(row, "IBAN") or None,
                "strasse": text(row, "Straße") or None,
                "plz": text(row, "PLZ") or None,
                "ort": text(row, "Ort") or None,
                "berufsgruppe": text(row, "Berufsgruppe", "andere"),
                "ausbildungsstand": text(row, "Ausbildungsstand") or None,
                "institution": text(row, "Institution") or None,
                "projekt_beschreibung": text(row, "Projektbeschreibung") or None,
                # An invalid number here raises and fails the row (see the
                # except below), unlike the optional numeric columns.
                "jaehrliches_einkommen": float(income_raw) if income_raw else None,
                "notizen": text(row, "Notizen") or None,
                "finanzielle_notlage": parse_boolean(
                    row.get("Finanzielle_Notlage"), False
                ),
                "aktiv": parse_boolean(row.get("Aktiv"), True),
                "ist_abkoemmling": parse_boolean(row.get("Ist_Abkömmling"), False),
                "unterstuetzung_bestaetigt": parse_boolean(
                    row.get("Unterstützung_bestätigt"), False
                ),
                "studiennachweis_erforderlich": parse_boolean(
                    row.get("Studiennachweis_erforderlich"), False
                ),
            }

            # Handle numeric fields
            for column, field, convert in numeric_columns:
                raw = text(row, column)
                if not raw:
                    continue
                try:
                    destinataer_data[field] = convert(raw)
                except ValueError:
                    # Deliberate: an unparseable optional number is ignored
                    # and the field keeps its stored/default value.
                    pass

            # Handle date fields
            for column, field in date_columns:
                raw = text(row, column)
                if raw:
                    destinataer_data[field] = parse_date(raw)

            # Validate required fields
            if not destinataer_data["vorname"] or not destinataer_data["nachname"]:
                error_log.append(
                    f"Zeile {row_num}: Vorname und Nachname sind erforderlich"
                )
                failed_rows += 1
                continue

            # Check if destinataer already exists
            existing_destinataer = Destinataer.objects.filter(
                vorname__iexact=destinataer_data["vorname"],
                nachname__iexact=destinataer_data["nachname"],
            ).first()

            if existing_destinataer:
                # Update existing destinataer; None means "no value in the
                # CSV" and must not overwrite stored data.
                for field, value in destinataer_data.items():
                    if value is not None:
                        setattr(existing_destinataer, field, value)
                existing_destinataer.save()
            else:
                # Create new destinataer
                Destinataer.objects.create(**destinataer_data)

            imported_rows += 1

        except Exception as e:
            # One bad row must not abort the whole import.
            error_log.append(f"Zeile {row_num}: {str(e)}")
            failed_rows += 1

    # Determine status
    if failed_rows == 0:
        status = "completed"
    elif imported_rows > 0:
        status = "partial"
    else:
        status = "failed"

    return {
        "total_rows": total_rows,
        "imported_rows": imported_rows,
        "failed_rows": failed_rows,
        "error_log": "\n".join(error_log) if error_log else None,
        "status": status,
    }
|
||
|
||
|
||
def process_paechter_csv(csv_file, csv_import):
    """Import ``Paechter`` (tenant) records from an uploaded CSV file.

    The file is decoded as UTF-8 (a leading byte-order mark is dropped) and
    read as semicolon-separated if the header line contains a semicolon,
    otherwise as comma-separated. Each row is classified as a company
    (``gesellschaft``) or a natural person (``natuerlich``) and then updates
    the record with the same last name and type, or creates a new one.

    Args:
        csv_file: Object whose ``read()`` returns the file content as bytes.
        csv_import: Import record of this run; not used by this function.

    Returns:
        dict with ``total_rows``, ``imported_rows``, ``failed_rows``,
        ``error_log`` (newline-joined messages or None) and ``status``
        (``completed``, ``partial`` or ``failed``).
    """
    import re

    # "utf-8-sig" strips a BOM, which would otherwise become part of the
    # first column name.
    decoded_file = csv_file.read().decode("utf-8-sig")
    # Handle both comma and semicolon separated files
    delimiter = ";" if ";" in decoded_file.split("\n", 1)[0] else ","
    csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=delimiter)

    true_values = {"true", "wahr", "ja", "1"}
    date_formats = ("%d.%m.%Y", "%Y-%m-%d")
    date_columns = (
        ("Geburtsdatum", "geburtsdatum"),
        ("Pachtbeginn_Erste", "pachtbeginn_erste"),
        ("Pachtende_Letzte", "pachtende_letzte"),
    )

    company_titles = {"Gesellschaft", "KG", "GbR", "GmbH"}
    person_titles = {"Herrn", "Frau"}
    # Two-letter legal forms must match a whole word: as substrings they also
    # occur inside ordinary surnames (e.g. "ag" in "Wagner").
    company_words = {"kg", "ag"}
    company_fragments = ("gbr", "gmbh", "ohg", "e.v.", "stiftung", "genossenschaft")
    word_separator = re.compile(r"[^0-9a-zäöüß]+")

    def text(row, column, default=""):
        """Return the stripped cell, or ``default`` if the cell is absent."""
        value = row.get(column)
        # DictReader yields None for cells missing from a short row.
        if value is None:
            value = default
        return value.strip()

    def parse_date(raw):
        """Parse a German or ISO date; return None if neither format fits."""
        for date_format in date_formats:
            try:
                return datetime.strptime(raw, date_format).date()
            except ValueError:
                continue
        return None

    def looks_like_company(name):
        """Guess from a name whether it denotes a company."""
        lowered = name.lower()
        if any(fragment in lowered for fragment in company_fragments):
            return True
        return not company_words.isdisjoint(word_separator.split(lowered))

    total_rows = 0
    imported_rows = 0
    failed_rows = 0
    error_log = []

    # Start at 2 because row 1 is header
    for row_num, row in enumerate(csv_data, start=2):
        total_rows += 1

        try:
            vorname_raw = text(row, "Vorname")
            nachname = text(row, "Nachname")
            personentyp_raw = text(row, "Personentyp")

            # Determine personentyp based on the data
            if personentyp_raw in company_titles:
                personentyp = "gesellschaft"
            elif personentyp_raw in person_titles:
                personentyp = "natuerlich"
            elif looks_like_company(nachname):
                # Fallback: analyze the Nachname to detect companies
                personentyp = "gesellschaft"
            else:
                personentyp = "natuerlich"

            # Handle Vorname - keep original value unless it's 'N/A'
            vorname = "" if vorname_raw == "N/A" else vorname_raw

            experience_raw = text(row, "Berufserfahrung_Jahre")

            paechter_data = {
                "vorname": vorname,
                "nachname": nachname,
                "email": text(row, "E-Mail") or None,
                "telefon": text(row, "Telefon") or None,
                "iban": text(row, "IBAN") or None,
                "strasse": text(row, "Straße") or None,
                "plz": text(row, "PLZ") or None,
                "ort": text(row, "Ort") or None,
                "personentyp": personentyp,
                "pachtnummer": text(row, "Pachtnummer") or None,
                "landwirtschaftliche_ausbildung": text(
                    row, "Landwirtschaftliche_Ausbildung", "false"
                ).lower()
                in true_values,
                # An invalid number here raises and fails the row.
                "berufserfahrung_jahre": (
                    int(experience_raw) if experience_raw else None
                ),
                "spezialisierung": text(row, "Spezialisierung") or None,
                "notizen": text(row, "Notizen") or None,
                "aktiv": text(row, "Aktiv", "true").lower() in true_values,
            }

            # Handle date fields
            for column, field in date_columns:
                raw = text(row, column)
                if raw:
                    paechter_data[field] = parse_date(raw)

            # Handle decimal fields
            pachtzins_raw = text(row, "Pachtzins_Aktuell")
            if pachtzins_raw:
                try:
                    paechter_data["pachtzins_aktuell"] = float(pachtzins_raw)
                except ValueError:
                    paechter_data["pachtzins_aktuell"] = None

            # Validate required fields: only Nachname is mandatory, for
            # companies and natural persons alike.
            if not nachname:
                if personentyp == "gesellschaft":
                    reason = "Nachname ist für Gesellschaften erforderlich"
                else:
                    reason = "Nachname ist für natürliche Personen erforderlich"
                error_log.append(f"Zeile {row_num}: {reason}")
                failed_rows += 1
                continue

            # Check if paechter already exists. Matching uses Nachname and
            # type only, since Vorname can be empty.
            # NOTE(review): two natural persons sharing a surname end up in
            # the same record -- confirm this is intended.
            existing_paechter = Paechter.objects.filter(
                nachname__iexact=nachname,
                personentyp=personentyp,
            ).first()

            if existing_paechter:
                # Update existing paechter; None means "no value in the CSV"
                # and must not overwrite stored data.
                for field, value in paechter_data.items():
                    if value is not None:
                        setattr(existing_paechter, field, value)
                existing_paechter.save()
            else:
                # Create new paechter
                Paechter.objects.create(**paechter_data)

            imported_rows += 1

        except Exception as e:
            # One bad row must not abort the whole import.
            error_log.append(f"Zeile {row_num}: {str(e)}")
            failed_rows += 1

    # Determine status
    if failed_rows == 0:
        status = "completed"
    elif imported_rows > 0:
        status = "partial"
    else:
        status = "failed"

    return {
        "total_rows": total_rows,
        "imported_rows": imported_rows,
        "failed_rows": failed_rows,
        "error_log": "\n".join(error_log) if error_log else None,
        "status": status,
    }
|
||
|
||
|
||
def process_laendereien_csv(csv_file, csv_import):
    """Import ``Land`` records from an uploaded CSV file.

    The file is decoded as UTF-8 (a leading byte-order mark is dropped) and
    read as semicolon-separated if the header line contains a semicolon,
    otherwise as comma-separated. Column names are matched tolerantly: first
    exactly, then case-insensitively, then with ``-`` and spaces treated as
    ``_``. A row updates the record with the same ``lfd_nr`` or creates a
    new one.

    Args:
        csv_file: Object whose ``read()`` returns the file content as bytes.
        csv_import: Import record of this run; not used by this function.

    Returns:
        dict with ``total_rows``, ``imported_rows``, ``failed_rows``,
        ``error_log`` (newline-joined messages or None) and ``status``
        (``completed``, ``partial`` or ``failed``).
    """
    decoded_file = csv_file.read().decode("utf-8-sig")
    # Handle both comma and semicolon separated files
    delimiter = ";" if ";" in decoded_file.split("\n", 1)[0] else ","
    csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=delimiter)

    true_values = {"true", "wahr", "ja", "1"}
    required_fields = ["lfd_nr", "gemeinde", "gemarkung", "flur", "flurstueck"]

    def clean_key(key):
        """Strip BOM residue and surrounding whitespace from a header name."""
        return (key or "").replace("\ufeff", "").strip()

    def sanitize(key):
        """Lower-case a header and unify ``-`` and spaces to ``_``."""
        return key.lower().replace("-", "_").replace(" ", "_")

    def index_row(row):
        """Build the exact, lower-cased and sanitized lookup tables of a row."""
        normalized, lowered, sanitized = {}, {}, {}
        for key, value in row.items():
            if key is None:
                # Cells beyond the last header column: DictReader collects
                # them in a list under the key None. They have no name.
                continue
            name = clean_key(key)
            cell = (value or "").strip()
            normalized[name] = cell
            lowered[name.lower()] = cell
            sanitized[sanitize(name)] = cell
        return normalized, lowered, sanitized

    def get_val(tables, *keys):
        """Return the cell of the first matching header variant, else ''."""
        normalized, lowered, sanitized = tables
        # Try exact keys first, then case-insensitive
        for key in keys:
            if key in normalized:
                return normalized[key]
        for key in keys:
            lower_key = key.lower()
            if lower_key in lowered:
                return lowered[lower_key]
            sanitized_key = sanitize(key)
            if sanitized_key in sanitized:
                return sanitized[sanitized_key]
        return ""

    def parse_float(value):
        """Parse a number in German or English notation; 0 if not parseable."""
        if not value:
            return 0
        text = value
        if "," in text and "." in text:
            # Both separators present: the one further right is the decimal
            # mark, the other one groups thousands ("1.234,56" / "1,234.56").
            if text.rfind(",") > text.rfind("."):
                text = text.replace(".", "").replace(",", ".")
            else:
                text = text.replace(",", "")
        elif "," in text:
            # replace comma decimal if present
            text = text.replace(",", ".")
        elif text.count(".") > 1:
            # Several dots and no comma can only be thousands grouping.
            text = text.replace(".", "")
        try:
            return float(text)
        except ValueError:
            return 0

    total_rows = 0
    imported_rows = 0
    failed_rows = 0
    error_log = []

    # Start at 2 because row 1 is header
    for row_num, row in enumerate(csv_data, start=2):
        total_rows += 1

        try:
            tables = index_row(row)

            # Map CSV columns to model fields (robust to header variants)
            land_data = {
                "lfd_nr": get_val(
                    tables,
                    "Lfd_Nr",
                    "lfd_nr",
                    "LfdNr",
                    "lfdnr",
                    "laufende_nummer",
                    "laufende-nummer",
                ),
                "ew_nummer": get_val(tables, "EW_Nummer", "ew_nummer") or None,
                "amtsgericht": get_val(tables, "Amtsgericht", "amtsgericht"),
                "gemeinde": get_val(tables, "Gemeinde", "gemeinde"),
                "gemarkung": get_val(tables, "Gemarkung", "gemarkung"),
                "flur": get_val(tables, "Flur", "flur"),
                "flurstueck": get_val(
                    tables, "Flurstück", "Flurstueck", "flurstück", "flurstueck"
                ),
                "groesse_qm": parse_float(
                    get_val(tables, "Größe_qm", "Groesse_qm", "groesse_qm", "größe_qm")
                ),
                "gruenland_qm": parse_float(
                    get_val(
                        tables,
                        "Grünland_qm",
                        "Gruenland_qm",
                        "gruenland_qm",
                        "grünland_qm",
                    )
                ),
                "acker_qm": parse_float(get_val(tables, "Acker_qm", "acker_qm")),
                "wald_qm": parse_float(get_val(tables, "Wald_qm", "wald_qm")),
                "sonstiges_qm": parse_float(
                    get_val(tables, "Sonstiges_qm", "sonstiges_qm")
                ),
                "verpachtete_gesamtflaeche": parse_float(
                    get_val(
                        tables,
                        "Verpachtete_Gesamtfläche_qm",
                        "Verpachtete_Gesamtflaeche_qm",
                        "verpachtete_gesamtfläche_qm",
                        "verpachtete_gesamtflaeche_qm",
                    )
                ),
                "verp_flaeche_aktuell": parse_float(
                    get_val(
                        tables,
                        "Verp_Fläche_aktuell_qm",
                        "Verp_Flaeche_aktuell_qm",
                        "verp_flaeche_aktuell_qm",
                        "verp_fläche_aktuell_qm",
                    )
                ),
                # NOTE(review): a missing or empty "Aktiv" column yields
                # False here, while the other importers default to active --
                # confirm this is intended.
                "aktiv": get_val(tables, "Aktiv", "aktiv").lower() in true_values,
                "notizen": get_val(tables, "Notizen", "notizen") or None,
            }

            # Fallback for missing 'Gemeinde' -> set explicit placeholder
            if not land_data["gemeinde"]:
                land_data["gemeinde"] = "FEHLT"

            # Validate required fields
            missing_fields = [
                field for field in required_fields if not land_data[field]
            ]

            if missing_fields:
                # Log header diagnostics on first failure only to help debugging
                if row_num == 2:
                    error_log.append(f"Erkannte Spalten: {list(tables[0].keys())}")
                error_log.append(
                    f"Zeile {row_num}: Fehlende Pflichtfelder: {', '.join(missing_fields)}"
                )
                failed_rows += 1
                continue

            # Check if land already exists
            existing_land = Land.objects.filter(lfd_nr=land_data["lfd_nr"]).first()

            if existing_land:
                # Update existing land; None means "no value in the CSV" and
                # must not overwrite stored data.
                for field, value in land_data.items():
                    if value is not None:
                        setattr(existing_land, field, value)
                existing_land.save()
            else:
                # Create new land
                Land.objects.create(**land_data)

            imported_rows += 1

        except Exception as e:
            # One bad row must not abort the whole import.
            error_log.append(f"Zeile {row_num}: {str(e)}")
            failed_rows += 1

    # Determine status
    if failed_rows == 0:
        status = "completed"
    elif imported_rows > 0:
        status = "partial"
    else:
        status = "failed"

    return {
        "total_rows": total_rows,
        "imported_rows": imported_rows,
        "failed_rows": failed_rows,
        "error_log": "\n".join(error_log) if error_log else None,
        "status": status,
    }
|
||
|
||
|
||
# Person Views
|
||
@api_view(["GET"])
def gramps_search_api(request):
    """Probe endpoint: search people in Gramps Web by ``q`` (last name, first name).

    Reads the ``q`` query parameter and passes it to the Gramps client's
    ``search_people``. Responds with HTTP 400 and an error payload when ``q``
    is missing or empty; otherwise the client's result is the response body.
    """
    query = request.GET.get("q", "")
    if query:
        return Response(get_gramps_client().search_people(query))
    return Response({"error": "Parameter q erforderlich"}, status=400)
|
||
|
||
|
||
# Geschäftsführung Views
|
||
@login_required
def administration(request):
    """Render the administration dashboard.

    The context contains:

    * ``recent_audit`` - the first ten audit log entries in the model's
      default ordering (presumably newest first - confirm on the model).
    * ``stats`` - audit log counters (total, today, last seven days), the
      first five backup jobs and the first completed backup job, both in the
      model's default ordering.
    * ``user_activity`` - the ten usernames with the most audit entries.

    Args:
        request: The incoming HTTP request of a logged-in user.

    Returns:
        The rendered ``stiftung/administration.html`` response.
    """
    from datetime import timedelta

    from django.db.models import Count
    from django.utils import timezone

    from stiftung.models import AuditLog, BackupJob

    recent_audit = AuditLog.objects.all()[:10]

    # Use the current date in the active time zone instead of the naive
    # ``datetime.now().date()``. Both counters go through the ``__date``
    # lookup so the DateTimeField is never compared against a bare date,
    # which Django would coerce to a naive midnight datetime (and warn about
    # when time zone support is active).
    today = timezone.localdate()
    week_start = today - timedelta(days=7)

    stats = {
        "total_logs": AuditLog.objects.count(),
        "logs_today": AuditLog.objects.filter(timestamp__date=today).count(),
        "logs_week": AuditLog.objects.filter(
            timestamp__date__gte=week_start
        ).count(),
        "recent_backups": BackupJob.objects.all()[:5],
        "last_backup": BackupJob.objects.filter(status="completed").first(),
    }

    # Per-user activity summary, most active users first.
    user_activity = (
        AuditLog.objects.values("username")
        .annotate(count=Count("id"))
        .order_by("-count")[:10]
    )

    context = {
        "recent_audit": recent_audit,
        "stats": stats,
        "user_activity": user_activity,
    }

    return render(request, "stiftung/administration.html", context)
|
||
|
||
|
||
@login_required
def audit_log_list(request):
    """List audit log entries with optional filters and pagination.

    Supported query parameters:

    * ``user`` - case-insensitive substring match on the username.
    * ``action`` / ``entity_type`` - exact match.
    * ``date_from`` / ``date_to`` - inclusive date bounds on the timestamp.
      Values that are not valid dates are ignored instead of being passed to
      the ORM, where they would raise a ``ValidationError`` (HTTP 500).
    * ``page`` - page number (50 entries per page).

    The raw filter values are echoed back in the context so the template can
    repopulate its filter form.

    Returns:
        The rendered ``stiftung/audit_log_list.html`` response.
    """
    from django.core.paginator import Paginator
    from django.utils.dateparse import parse_date

    from stiftung.models import AuditLog

    def _parse_date_param(raw):
        """Return the ``date`` encoded in ``raw`` or ``None`` if unusable."""
        if not raw:
            return None
        try:
            # parse_date returns None for malformed input and raises
            # ValueError for well-formed but impossible dates (e.g. Feb 30).
            return parse_date(raw)
        except ValueError:
            return None

    logs = AuditLog.objects.all()

    user_filter = request.GET.get("user")
    if user_filter:
        logs = logs.filter(username__icontains=user_filter)

    action_filter = request.GET.get("action")
    if action_filter:
        logs = logs.filter(action=action_filter)

    entity_filter = request.GET.get("entity_type")
    if entity_filter:
        logs = logs.filter(entity_type=entity_filter)

    date_from = request.GET.get("date_from")
    parsed_from = _parse_date_param(date_from)
    if parsed_from is not None:
        logs = logs.filter(timestamp__date__gte=parsed_from)

    date_to = request.GET.get("date_to")
    parsed_to = _parse_date_param(date_to)
    if parsed_to is not None:
        logs = logs.filter(timestamp__date__lte=parsed_to)

    paginator = Paginator(logs, 50)
    page_obj = paginator.get_page(request.GET.get("page"))

    context = {
        "page_obj": page_obj,
        "action_choices": AuditLog.ACTION_TYPES,
        "entity_choices": AuditLog.ENTITY_TYPES,
        "user_filter": user_filter,
        "action_filter": action_filter,
        "entity_filter": entity_filter,
        "date_from": date_from,
        "date_to": date_to,
    }

    return render(request, "stiftung/audit_log_list.html", context)
|
||
|
||
|
||
@login_required
def backup_management(request):
    """Show the backup history and start new backup jobs.

    On POST a ``BackupJob`` is created for the requested ``backup_type``
    (default ``"full"``), the action is written to the audit log and
    ``run_backup`` is started in a background thread; the user is then
    redirected back to this page. On any other method the paginated list of
    backup jobs (20 per page) is rendered.

    Returns:
        A redirect to ``stiftung:backup_management`` after a POST, otherwise
        the rendered ``stiftung/backup_management.html`` response.
    """
    import threading

    from django.core.paginator import Paginator

    from stiftung.models import BackupJob

    if request.method == "POST":
        backup_type = request.POST.get("backup_type", "full")

        # ``objects.create`` does not run choice validation, so reject values
        # that are not declared on the model before a job row is written.
        # NOTE(review): assumes TYPE_CHOICES is a flat list of (value, label)
        # pairs - confirm on the model.
        valid_types = {choice[0] for choice in BackupJob.TYPE_CHOICES}
        if backup_type not in valid_types:
            messages.error(request, "Ungültiger Backup-Typ.")
            return redirect("stiftung:backup_management")

        backup_job = BackupJob.objects.create(
            backup_type=backup_type, created_by=request.user
        )

        # Record who started the backup.
        from stiftung.audit import log_system_action

        log_system_action(
            request=request,
            action="backup",
            description=f"Backup-Job erstellt: {backup_job.get_backup_type_display()}",
            details={"backup_job_id": str(backup_job.id), "backup_type": backup_type},
        )

        # The backup runs in a plain background thread so the request can
        # return immediately.
        from stiftung.backup_utils import run_backup

        backup_thread = threading.Thread(target=run_backup, args=(str(backup_job.id),))
        backup_thread.start()

        messages.success(
            request,
            f'Backup-Job "{backup_job.get_backup_type_display()}" wurde gestartet.',
        )
        return redirect("stiftung:backup_management")

    paginator = Paginator(BackupJob.objects.all(), 20)
    page_obj = paginator.get_page(request.GET.get("page"))

    context = {
        "page_obj": page_obj,
        "backup_types": BackupJob.TYPE_CHOICES,
    }

    return render(request, "stiftung/backup_management.html", context)
|
||
|
||
|
||
@login_required
def backup_download(request, backup_id):
    """Stream a completed backup archive to the client as an attachment.

    Args:
        request: The incoming HTTP request of a logged-in user.
        backup_id: Primary key of the ``BackupJob`` to download.

    Returns:
        A ``FileResponse`` serving the archive from ``/app/backups``.

    Raises:
        Http404: If no completed job with that id exists, the job has no
            filename, the filename would resolve outside the backup
            directory, or the file is missing on disk.
    """
    import os

    from django.http import FileResponse, Http404

    from stiftung.models import BackupJob

    try:
        backup_job = BackupJob.objects.get(id=backup_id, status="completed")
    except BackupJob.DoesNotExist:
        raise Http404("Backup nicht gefunden oder nicht vollständig")

    filename = backup_job.backup_filename
    if not filename:
        # os.path.join would raise on None and resolve "" to the directory.
        raise Http404("Backup-Datei nicht gefunden")

    # Restore jobs store the uploaded file name, so the value is not fully
    # under our control. Normalise the joined path and make sure an absolute
    # name or ".." segments cannot escape the backup directory.
    backup_root = os.path.abspath("/app/backups")
    backup_path = os.path.abspath(os.path.join(backup_root, filename))
    if os.path.commonpath([backup_root, backup_path]) != backup_root:
        raise Http404("Backup-Datei nicht gefunden")

    # isfile (not exists) so that a directory is never handed to open().
    if not os.path.isfile(backup_path):
        raise Http404("Backup-Datei nicht gefunden")

    from stiftung.audit import log_system_action

    log_system_action(
        request=request,
        action="export",
        description=f"Backup heruntergeladen: {backup_job.backup_filename}",
        details={"backup_job_id": str(backup_job.id)},
    )

    # FileResponse takes ownership of the file object and closes it once the
    # response has been sent.
    return FileResponse(
        open(backup_path, "rb"), as_attachment=True, filename=backup_job.backup_filename
    )
|
||
|
||
|
||
@login_required
def backup_restore(request):
    """Validate an uploaded backup archive and start a restore job.

    Only POST requests do anything; every path ends in a redirect to
    ``stiftung:backup_management``. The upload (``backup_file``) must have a
    ``.tar.gz`` name and pass ``validate_backup_file``. On success a
    ``BackupJob`` with ``operation="restore"`` is created, the action is
    audited and ``run_restore`` is started in a background thread with the
    path of the stored upload.

    The upload is written to a fresh temporary directory. That directory is
    removed again whenever the restore thread was *not* started (validation
    failure or any exception), so failed attempts no longer leave archives
    behind on disk. After a successful hand-off the directory is left in
    place because the restore thread still reads from it.
    """
    if request.method != "POST":
        return redirect("stiftung:backup_management")

    import os
    import shutil
    import tempfile
    import threading

    from stiftung.models import BackupJob

    backup_file = request.FILES.get("backup_file")

    if not backup_file:
        messages.error(request, "Bitte wählen Sie eine Backup-Datei aus.")
        return redirect("stiftung:backup_management")

    if not backup_file.name.endswith(".tar.gz"):
        messages.error(
            request, "Ungültiges Backup-Format. Nur .tar.gz Dateien sind erlaubt."
        )
        return redirect("stiftung:backup_management")

    temp_dir = tempfile.mkdtemp()
    backup_path = os.path.join(temp_dir, backup_file.name)
    handed_off = False  # True once the restore thread owns backup_path

    try:
        with open(backup_path, "wb+") as destination:
            for chunk in backup_file.chunks():
                destination.write(chunk)

        from stiftung.backup_utils import validate_backup_file

        is_valid, message = validate_backup_file(backup_path)
        if not is_valid:
            messages.error(request, f"Ungültiges Backup: {message}")
            return redirect("stiftung:backup_management")

        messages.info(request, f"Backup validiert: {message}")

        restore_job = BackupJob.objects.create(
            operation="restore",
            backup_type="full",
            created_by=request.user,
            backup_filename=backup_file.name,
        )

        from stiftung.audit import log_system_action

        log_system_action(
            request=request,
            action="restore",
            description=f"Wiederherstellung gestartet von: {backup_file.name}",
            details={
                "restore_job_id": str(restore_job.id),
                "filename": backup_file.name,
            },
        )

        from stiftung.backup_utils import run_restore

        restore_thread = threading.Thread(
            target=run_restore, args=(str(restore_job.id), backup_path)
        )
        restore_thread.start()
        handed_off = True

        messages.success(
            request,
            f'Wiederherstellung von "{backup_file.name}" wurde gestartet. '
            "Überwachen Sie den Fortschritt in der Backup-Historie.",
        )
        return redirect("stiftung:backup_management")

    except Exception as e:
        # Top-level boundary of the view: report the problem to the user
        # instead of answering with a 500.
        messages.error(request, f"Fehler beim Verarbeiten der Backup-Datei: {e}")
        return redirect("stiftung:backup_management")

    finally:
        if not handed_off:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
||
|
||
|
||
@login_required
def backup_cancel(request, backup_id):
    """Mark a pending or running backup job as cancelled.

    The job may be cancelled by the user who created it, by staff users, or
    by anyone when the job has no creator recorded. The job's status is set
    to ``"cancelled"`` with a completion timestamp and a note naming the
    cancelling user; the cancellation is then written to the audit log on a
    best-effort basis (an audit failure does not undo the cancellation).

    Diagnostics go through the ``logging`` module instead of ``print`` so
    they honour the project's logging configuration.

    Args:
        request: The incoming HTTP request of a logged-in user.
        backup_id: Primary key of the ``BackupJob`` to cancel.

    Returns:
        Always a redirect to ``stiftung:backup_management``; the outcome is
        reported through the messages framework.
    """
    import logging

    from django.utils import timezone

    from stiftung.models import BackupJob

    logger = logging.getLogger(__name__)

    try:
        logger.debug("Attempting to cancel backup job %s", backup_id)
        backup_job = BackupJob.objects.get(id=backup_id)
        # created_by_id is used throughout so that a dangling user reference
        # never triggers a foreign-key lookup.
        logger.debug(
            "Found backup job %s (status=%s, created_by_id=%s, current user=%s)",
            backup_job.id,
            backup_job.status,
            backup_job.created_by_id,
            request.user.id,
        )

        # Only jobs that have not finished yet can be cancelled.
        if backup_job.status not in ("running", "pending"):
            messages.error(request, "Nur laufende oder wartende Backups können abgebrochen werden.")
            return redirect("stiftung:backup_management")

        is_foreign_job = (
            backup_job.created_by_id is not None
            and backup_job.created_by_id != request.user.id
        )
        if is_foreign_job and not request.user.is_staff:
            messages.error(request, "Sie können nur Ihre eigenen Backup-Jobs abbrechen.")
            return redirect("stiftung:backup_management")

        backup_job.status = "cancelled"
        backup_job.completed_at = timezone.now()
        backup_job.error_message = f"Abgebrochen von {request.user.username}"
        backup_job.save()
        logger.debug("Backup job %s marked as cancelled", backup_job.id)

        # Audit logging is best effort: the job is already cancelled, so a
        # failure here is recorded but must not surface as an error.
        try:
            from stiftung.audit import log_system_action

            log_system_action(
                request=request,
                action="backup_cancel",
                description=f"Backup-Job abgebrochen: {backup_job.get_backup_type_display()}",
                details={"backup_job_id": str(backup_job.id)},
            )
        except Exception:
            logger.exception(
                "Audit logging failed while cancelling backup job %s", backup_job.id
            )

        messages.success(request, "Backup-Job wurde abgebrochen.")

    except BackupJob.DoesNotExist:
        logger.warning("Backup job %s not found", backup_id)
        messages.error(request, "Backup-Job nicht gefunden.")
    except Exception as e:
        # Top-level boundary of the view: log with traceback and tell the user.
        logger.exception("Unexpected error while cancelling backup job %s", backup_id)
        messages.error(request, f"Fehler beim Abbrechen des Backup-Jobs: {e}")

    return redirect("stiftung:backup_management")
|
||
|
||
|
||
# =============================================================================
|
||
# USER MANAGEMENT VIEWS
|
||
# =============================================================================
|
||
|
||
|
||
@login_required
def user_management(request):
    """User management dashboard: searchable, filterable, paginated user list.

    Requires the ``stiftung.manage_users`` permission; users without it are
    redirected to the administration page with an error message.

    Query parameters: ``search`` (substring match on username, e-mail, first
    or last name), ``status`` (``active``, ``inactive`` or ``staff``) and
    ``page`` (20 users per page).
    """
    from django.contrib.auth.models import User
    from django.core.paginator import Paginator
    from django.db.models import Q

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    queryset = User.objects.all().order_by("username")

    search = request.GET.get("search")
    if search:
        matches_any_field = (
            Q(username__icontains=search)
            | Q(email__icontains=search)
            | Q(first_name__icontains=search)
            | Q(last_name__icontains=search)
        )
        queryset = queryset.filter(matches_any_field)

    # Map each supported status value to the lookup it stands for; any other
    # value (or none at all) leaves the queryset unfiltered.
    status_filter = request.GET.get("status")
    status_lookups = {
        "active": {"is_active": True},
        "inactive": {"is_active": False},
        "staff": {"is_staff": True},
    }
    lookup = status_lookups.get(status_filter)
    if lookup is not None:
        queryset = queryset.filter(**lookup)

    page_obj = Paginator(queryset, 20).get_page(request.GET.get("page"))

    # Counters always cover all users, independent of the active filters.
    stats = {
        "total_users": User.objects.count(),
        "active_users": User.objects.filter(is_active=True).count(),
        "staff_users": User.objects.filter(is_staff=True).count(),
        "inactive_users": User.objects.filter(is_active=False).count(),
    }

    return render(
        request,
        "stiftung/user_management.html",
        {
            "page_obj": page_obj,
            "stats": stats,
            "search": search,
            "status_filter": status_filter,
        },
    )
|
||
|
||
|
||
@login_required
def user_create(request):
    """Create a new user account from the project's ``UserCreationForm``.

    Requires the ``stiftung.manage_users`` permission. A valid POST creates
    the user, writes an audit entry and redirects to the new user's detail
    page; otherwise the (possibly bound, invalid) form is rendered.
    """
    from django.contrib.auth.models import User

    from stiftung.forms import UserCreationForm

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    if request.method != "POST":
        form = UserCreationForm()
    else:
        form = UserCreationForm(request.POST)
        if form.is_valid():
            data = form.cleaned_data
            new_user = User.objects.create_user(
                username=data["username"],
                email=data["email"],
                password=data["password1"],
                first_name=data["first_name"],
                last_name=data["last_name"],
                is_active=data["is_active"],
                is_staff=data["is_staff"],
            )

            # Record the creation in the audit trail.
            from stiftung.audit import log_action

            log_action(
                request=request,
                action="create",
                entity_type="user",
                entity_id=str(new_user.pk),
                entity_name=new_user.username,
                description=f'Neuer Benutzer "{new_user.username}" wurde erstellt',
            )

            messages.success(
                request, f'Benutzer "{new_user.username}" wurde erfolgreich erstellt.'
            )
            return redirect("stiftung:user_detail", pk=new_user.pk)

    return render(
        request,
        "stiftung/user_form.html",
        {"form": form, "title": "Neuen Benutzer erstellen"},
    )
|
||
|
||
|
||
@login_required
def user_detail(request, pk):
    """Show one user with their ``stiftung`` permissions and recent activity.

    Requires the ``stiftung.manage_users`` permission. Responds with 404 when
    no user with primary key ``pk`` exists.
    """
    from django.contrib.auth.models import User

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    target = get_object_or_404(User, pk=pk)

    # Keep only the permissions that belong to this app.
    stiftung_permissions = list(
        filter(lambda perm: perm.startswith("stiftung."), target.get_all_permissions())
    )

    from stiftung.models import AuditLog

    # The ten newest audit entries caused by this user.
    recent_activity = AuditLog.objects.filter(user=target).order_by("-timestamp")[:10]

    return render(
        request,
        "stiftung/user_detail.html",
        {
            # Exposed as user_obj so it does not clash with request.user.
            "user_obj": target,
            "stiftung_permissions": stiftung_permissions,
            "recent_activity": recent_activity,
        },
    )
|
||
|
||
|
||
@login_required
def user_edit(request, pk):
    """Edit an existing user through the project's ``UserUpdateForm``.

    Requires the ``stiftung.manage_users`` permission. A valid POST saves the
    form, audits the field changes (if any) and redirects to the user's
    detail page; otherwise the form is rendered.
    """
    from django.contrib.auth.models import User

    from stiftung.forms import UserUpdateForm

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    user = get_object_or_404(User, pk=pk)

    if request.method != "POST":
        form = UserUpdateForm(instance=user)
    else:
        form = UserUpdateForm(request.POST, instance=user)
        if form.is_valid():
            from stiftung.audit import log_action, track_model_changes

            # Load the stored row before saving so the audit diff compares
            # against what is currently in the database.
            before = User.objects.get(pk=user.pk)
            after = form.save()

            diff = track_model_changes(before, after)
            if diff:
                log_action(
                    request=request,
                    action="update",
                    entity_type="user",
                    entity_id=str(after.pk),
                    entity_name=after.username,
                    description=f'Benutzer "{after.username}" wurde aktualisiert',
                    changes=diff,
                )

            messages.success(
                request,
                f'Benutzer "{after.username}" wurde erfolgreich aktualisiert.',
            )
            return redirect("stiftung:user_detail", pk=after.pk)

    return render(
        request,
        "stiftung/user_form.html",
        {
            "form": form,
            "user_obj": user,
            "title": f'Benutzer "{user.username}" bearbeiten',
        },
    )
|
||
|
||
|
||
@login_required
def user_change_password(request, pk):
    """Set a new password for the user with primary key ``pk``.

    Requires the ``stiftung.manage_users`` permission. A valid POST stores
    the new password, writes an audit entry and redirects to the user's
    detail page; otherwise the form is rendered.

    When an administrator changes their *own* password here, the session
    auth hash is refreshed. Without that, Django's session verification
    would log them out on the next request.
    """
    from django.contrib.auth import update_session_auth_hash
    from django.contrib.auth.models import User

    from stiftung.forms import PasswordChangeForm

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    user = get_object_or_404(User, pk=pk)

    if request.method == "POST":
        form = PasswordChangeForm(request.POST)
        if form.is_valid():
            user.set_password(form.cleaned_data["new_password1"])
            user.save()

            # Keep the acting user signed in if they changed their own password.
            if user.pk == request.user.pk:
                update_session_auth_hash(request, user)

            from stiftung.audit import log_action

            log_action(
                request=request,
                action="update",
                entity_type="user",
                entity_id=str(user.pk),
                entity_name=user.username,
                description=f'Passwort für Benutzer "{user.username}" wurde geändert',
            )

            messages.success(
                request,
                f'Passwort für Benutzer "{user.username}" wurde erfolgreich geändert.',
            )
            return redirect("stiftung:user_detail", pk=user.pk)
    else:
        form = PasswordChangeForm()

    context = {
        "form": form,
        "user_obj": user,
        "title": f'Passwort für "{user.username}" ändern',
    }

    return render(request, "stiftung/user_change_password.html", context)
|
||
|
||
|
||
@login_required
def user_permissions(request, pk):
    """Manage the directly assigned ``stiftung`` permissions of one user.

    Requires the ``stiftung.manage_permissions`` permission. On a valid POST
    the checked ``perm_<id>`` fields of ``UserPermissionForm`` are compared
    with the user's current ``stiftung`` permissions; the difference is
    removed/added, audited when anything changed, and the view redirects to
    the user's detail page. Otherwise the form is rendered.
    """
    from django.contrib.auth.models import Permission, User

    from stiftung.forms import UserPermissionForm

    if not request.user.has_perm("stiftung.manage_permissions"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Berechtigungsverwaltung."
        )
        return redirect("stiftung:administration")

    user = get_object_or_404(User, pk=pk)

    if request.method != "POST":
        form = UserPermissionForm(user=user)
    else:
        form = UserPermissionForm(request.POST, user=user)
        if form.is_valid():
            # Checked boxes are named "perm_<permission id>".
            wanted_ids = {
                int(name.replace("perm_", ""))
                for name, checked in form.cleaned_data.items()
                if name.startswith("perm_") and checked
            }

            assigned = user.user_permissions.filter(
                content_type__app_label="stiftung"
            )
            assigned_ids = set(assigned.values_list("id", flat=True))

            # Revoke what is no longer checked, then grant what is new.
            to_remove = assigned_ids - wanted_ids
            if to_remove:
                user.user_permissions.remove(
                    *Permission.objects.filter(id__in=to_remove)
                )

            to_add = wanted_ids - assigned_ids
            if to_add:
                user.user_permissions.add(*Permission.objects.filter(id__in=to_add))

            from stiftung.audit import log_action

            if to_remove or to_add:
                removed_names = Permission.objects.filter(
                    id__in=to_remove
                ).values_list("name", flat=True)
                added_names = Permission.objects.filter(id__in=to_add).values_list(
                    "name", flat=True
                )
                log_action(
                    request=request,
                    action="update",
                    entity_type="user",
                    entity_id=str(user.pk),
                    entity_name=user.username,
                    description=f'Berechtigungen für Benutzer "{user.username}" wurden aktualisiert',
                    changes={
                        "removed_permissions": list(removed_names),
                        "added_permissions": list(added_names),
                    },
                )

            messages.success(
                request,
                f'Berechtigungen für Benutzer "{user.username}" wurden erfolgreich aktualisiert.',
            )
            return redirect("stiftung:user_detail", pk=user.pk)

    return render(
        request,
        "stiftung/user_permissions.html",
        {
            "form": form,
            "user_obj": user,
            "permission_groups": form.get_permission_groups(),
            "title": f'Berechtigungen für "{user.username}"',
        },
    )
|
||
|
||
|
||
@login_required
def user_delete(request, pk):
    """Delete a user after confirmation.

    Requires the ``stiftung.manage_users`` permission. Users cannot delete
    their own account. A GET renders the confirmation page; a POST writes an
    audit entry, deletes the user and redirects to the user list.
    """
    from django.contrib.auth.models import User

    if not request.user.has_perm("stiftung.manage_users"):
        messages.error(
            request, "Sie haben keine Berechtigung für die Benutzerverwaltung."
        )
        return redirect("stiftung:administration")

    target = get_object_or_404(User, pk=pk)

    if target == request.user:
        messages.error(request, "Sie können sich nicht selbst löschen.")
        return redirect("stiftung:user_detail", pk=pk)

    if request.method != "POST":
        return render(
            request,
            "stiftung/user_delete.html",
            {"user_obj": target, "title": f'Benutzer "{target.username}" löschen'},
        )

    # Remember the name: it is still needed after the row is gone.
    username = target.username

    # The audit entry is written first, while the primary key still exists.
    from stiftung.audit import log_action

    log_action(
        request=request,
        action="delete",
        entity_type="user",
        entity_id=str(target.pk),
        entity_name=username,
        description=f'Benutzer "{username}" wurde gelöscht',
    )

    target.delete()

    messages.success(request, f'Benutzer "{username}" wurde erfolgreich gelöscht.')
    return redirect("stiftung:user_management")
|
||
|
||
|
||
# =============================================================================
|
||
# AUTHENTICATION VIEWS
|
||
# =============================================================================
|
||
|
||
|
||
def user_login(request):
    """Log a user in and send them to their redirect target.

    Already authenticated users are redirected to ``stiftung:home``. On a
    valid POST the user is logged in, the login is audited, and the user is
    redirected either to the 2FA verification page (when they have a
    confirmed TOTP device) or directly to the ``next`` target.

    The ``next`` target (query string or POST field) is only honoured when it
    is a local path on this host. A plain ``startswith("/")`` check is not
    enough because protocol-relative URLs such as ``//evil.example`` also
    start with a slash, so the value is additionally validated with
    ``url_has_allowed_host_and_scheme``. Anything else falls back to the
    home page.
    """
    from django.contrib.auth import login
    from django.contrib.auth.forms import AuthenticationForm
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.user.is_authenticated:
        return redirect("stiftung:home")

    if request.method == "POST":
        form = AuthenticationForm(request, data=request.POST)
        if form.is_valid():
            # The form has already authenticated the credentials (with the
            # request available to the backends); reuse that user instead of
            # authenticating a second time.
            user = form.get_user()
            if user is not None:
                login(request, user)

                from stiftung.audit import log_login

                log_login(request, user)

                next_param = request.GET.get("next") or request.POST.get("next")
                is_safe_target = bool(
                    next_param
                    and next_param.startswith("/")
                    and url_has_allowed_host_and_scheme(
                        next_param,
                        allowed_hosts={request.get_host()},
                        require_https=request.is_secure(),
                    )
                )
                if not is_safe_target:
                    next_param = reverse("stiftung:home")

                # Users with a confirmed TOTP device must pass the second
                # factor before they reach their target.
                has_2fa = TOTPDevice.objects.filter(user=user, confirmed=True).exists()
                if has_2fa:
                    from urllib.parse import urlencode

                    verify_url = (
                        reverse("stiftung:two_factor_verify")
                        + "?"
                        + urlencode({"next": next_param})
                    )
                    return redirect(verify_url)

                messages.success(request, f"Willkommen zurück, {user.username}!")
                return redirect(next_param)
            else:
                messages.error(request, "Ungültige Anmeldedaten.")
        else:
            messages.error(request, "Bitte korrigieren Sie die Fehler im Formular.")
    else:
        form = AuthenticationForm()

    context = {"form": form, "next": request.GET.get("next", "")}

    return render(request, "stiftung/login.html", context)
|
||
|
||
|
||
@login_required
def user_logout(request):
    """Log the current user out and redirect to the login page.

    The logout is written to the audit log and the username is captured
    before the session is cleared, because ``request.user`` no longer refers
    to the account afterwards.
    """
    from django.contrib.auth import logout

    from stiftung.audit import log_logout

    departing_user = request.user
    log_logout(request, departing_user)
    username = departing_user.username

    logout(request)

    messages.success(request, f"Sie wurden erfolgreich abgemeldet, {username}.")
    return redirect("stiftung:login")
|
||
|
||
|
||
# ============================================================================
|
||
# APPLICATION SETTINGS VIEWS
|
||
# ============================================================================
|
||
|
||
|
||
@login_required
def app_settings(request):
    """Application settings management interface.

    On POST every ``setting_<key>`` field is applied to the matching active
    ``AppConfiguration`` row, unless that row is marked ``is_system`` or the
    value is unchanged; the view then redirects back to itself with a
    summary message. Otherwise the active settings are rendered grouped by
    category.

    Only the leading ``setting_`` prefix is stripped from a field name. The
    earlier ``str.replace`` removed *every* occurrence, so a key that itself
    contains ``setting_`` could never be updated.
    """
    field_prefix = "setting_"

    if request.method == "POST":
        updated_count = 0
        for field_name, value in request.POST.items():
            if not field_name.startswith(field_prefix):
                continue
            setting_key = field_name[len(field_prefix):]
            try:
                setting = AppConfiguration.objects.get(key=setting_key, is_active=True)
            except AppConfiguration.DoesNotExist:
                # Unknown or inactive keys are ignored.
                continue
            if not setting.is_system and setting.value != value:
                setting.value = value
                setting.save()
                updated_count += 1

        if updated_count > 0:
            messages.success(request, f"Successfully updated {updated_count} settings!")
        else:
            messages.info(request, "No changes were made.")

        return redirect("stiftung:app_settings")

    # Group the settings by category for display. This is only needed when
    # the page is rendered, so it is done after the POST branch (which
    # always redirects).
    categories = {}
    for setting in AppConfiguration.objects.filter(is_active=True).order_by(
        "category", "order", "display_name"
    ):
        categories.setdefault(setting.category, []).append(setting)

    context = {
        "categories": categories,
        "title": "Application Settings",
    }
    return render(request, "stiftung/app_settings.html", context)
|
||
|
||
|
||
# Help Box Views
|
||
@login_required
def edit_help_box(request):
    """Edit or create a help info box, or list all of them.

    Only the user named ``root`` and superusers may use this view. A POST
    creates the ``HelpBox`` for ``page_key`` or updates the existing one and
    redirects to the referring page (falling back to ``stiftung:home``). A
    GET renders the admin overview of all help boxes.
    """
    from stiftung.models import HelpBox

    may_edit = request.user.username == "root" or request.user.is_superuser
    if not may_edit:
        messages.error(
            request, "Sie haben keine Berechtigung, Hilfsboxen zu bearbeiten."
        )
        return redirect("stiftung:home")

    if request.method == "POST":
        # Both outcomes of a POST return to the page the user came from.
        return_to = request.META.get("HTTP_REFERER", "stiftung:home")

        page_key = request.POST.get("page_key")
        title = request.POST.get("title")
        content = request.POST.get("content")
        is_active = request.POST.get("is_active") == "on"

        if not all((page_key, title, content)):
            messages.error(request, "Alle Felder sind erforderlich.")
            return redirect(return_to)

        editor = request.user.username
        help_box, created = HelpBox.objects.get_or_create(
            page_key=page_key,
            defaults={
                "title": title,
                "content": content,
                "is_active": is_active,
                "created_by": editor,
                "updated_by": editor,
            },
        )

        if created:
            messages.success(request, f'Hilfsbox "{title}" wurde erstellt.')
        else:
            # The box already existed: overwrite its editable fields but
            # leave created_by untouched.
            help_box.title = title
            help_box.content = content
            help_box.is_active = is_active
            help_box.updated_by = editor
            help_box.save()
            messages.success(request, f'Hilfsbox "{title}" wurde aktualisiert.')

        return redirect(return_to)

    # GET: admin overview of all help boxes with a few counters.
    help_boxes = HelpBox.objects.all().order_by("page_key", "-updated_at")

    return render(
        request,
        "stiftung/help_boxes_admin.html",
        {
            "help_boxes": help_boxes,
            "active_count": help_boxes.filter(is_active=True).count(),
            "inactive_count": help_boxes.filter(is_active=False).count(),
            "existing_pages": set(help_boxes.values_list("page_key", flat=True)),
            # The pages a help box can be attached to come from the model.
            "available_pages": HelpBox.PAGE_CHOICES,
            "title": "Hilfs-Infoboxen verwalten",
        },
    )
|
||
|
||
|
||
# =============================================================================
|
||
# Two-Factor Authentication Views
|
||
# =============================================================================
|
||
|
||
@login_required
def two_factor_setup(request):
    """Set up or manage TOTP two-factor authentication for the current user.

    * With a confirmed TOTP device the management page is rendered, including
      the number of remaining backup tokens.
    * Otherwise an unconfirmed device named ``default`` is fetched or created.
      A POST with a valid ``token`` confirms it, generates ten 8-character
      backup codes on a new static device and shows them once. An invalid
      token produces an error message and the setup page is shown again.
    """
    active_device = TOTPDevice.objects.filter(user=request.user, confirmed=True).first()
    existing_static = StaticDevice.objects.filter(user=request.user).first()

    if active_device:
        # 2FA is already enabled: show the management options.
        if existing_static:
            remaining_codes = existing_static.token_set.count()
        else:
            remaining_codes = 0
        return render(
            request,
            'stiftung/auth/two_factor_manage.html',
            {
                'has_2fa': True,
                'device': active_device,
                'backup_token_count': remaining_codes,
                'title': 'Zwei-Faktor-Authentifizierung verwalten',
            },
        )

    # 2FA is not enabled yet: work on the pending (unconfirmed) device.
    pending_device, _created = TOTPDevice.objects.get_or_create(
        user=request.user,
        name='default',
        defaults={'confirmed': False},
    )

    if request.method == "POST":
        submitted = request.POST.get('token', '').strip()
        if not pending_device.verify_token(submitted):
            messages.error(request, "Ungültiger Bestätigungscode. Bitte versuchen Sie es erneut.")
        else:
            pending_device.confirmed = True
            pending_device.save()

            # Ten one-time backup codes, each the first 8 characters of a
            # random hex string, stored on a dedicated static device.
            backup_device = StaticDevice.objects.create(
                user=request.user,
                name='backup',
            )
            backup_tokens = [random_hex()[:8] for _ in range(10)]
            for code in backup_tokens:
                StaticToken.objects.create(device=backup_device, token=code)

            messages.success(
                request,
                "Zwei-Faktor-Authentifizierung wurde erfolgreich aktiviert! "
                "Bitte speichern Sie Ihre Backup-Codes sicher."
            )

            return render(
                request,
                'stiftung/auth/backup_tokens.html',
                {'backup_tokens': backup_tokens, 'title': 'Backup-Codes'},
            )

    return render(
        request,
        'stiftung/auth/two_factor_setup.html',
        {
            'device': pending_device,
            # Provisioning URL that the template turns into a QR code.
            'qr_url': pending_device.config_url,
            'title': 'Zwei-Faktor-Authentifizierung einrichten',
        },
    )
|
||
|
||
|
||
@login_required
def two_factor_qr(request):
    """Return the TOTP provisioning QR code of the pending device as a PNG.

    Responds with HTTP 404 when the current user has no unconfirmed TOTP
    device.
    """
    pending_device = TOTPDevice.objects.filter(
        user=request.user, confirmed=False
    ).first()
    if pending_device is None:
        return HttpResponse("Kein Setup-Device gefunden", status=404)

    # Encode the device's provisioning URL.
    code = qrcode.QRCode(version=1, box_size=10, border=5)
    code.add_data(pending_device.config_url)
    code.make(fit=True)
    image = code.make_image(fill_color="black", back_color="white")

    # The response object is file-like, so the image is written straight
    # into it.
    png_response = HttpResponse(content_type="image/png")
    image.save(png_response, "PNG")
    return png_response
|
||
|
||
|
||
@login_required
def two_factor_verify(request):
    """Verify a TOTP or backup code submitted during the login process.

    On POST, the submitted ``otp_token`` is checked first against the user's
    confirmed TOTP devices and then against the user's static (backup)
    devices. On the first match the session flag ``2fa_verified`` is set and
    the user is redirected. If nothing matches, an error message is queued
    and the verification form is rendered again.

    The redirect target comes from the ``next`` query parameter, which is
    user-controlled. It is only followed when it points to this host (and to
    HTTPS if the current request is secure); otherwise the user is sent to
    ``stiftung:home``. This prevents open redirects to external sites.
    """
    # Function-scope import so the block does not depend on a file-level import.
    from django.utils.http import url_has_allowed_host_and_scheme

    next_url = request.GET.get('next', '')

    if request.method == "POST":
        token = request.POST.get('otp_token', '').strip()

        # Empty, external or malformed targets fall back to the home page.
        if url_has_allowed_host_and_scheme(
            next_url,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure(),
        ):
            redirect_target = next_url
        else:
            redirect_target = 'stiftung:home'

        # Querysets are lazy: static devices are only queried when no
        # TOTP device accepted the token.
        candidates = (
            (
                TOTPDevice.objects.filter(user=request.user, confirmed=True),
                "Zwei-Faktor-Authentifizierung erfolgreich.",
            ),
            (
                StaticDevice.objects.filter(user=request.user),
                "Backup-Code erfolgreich verwendet.",
            ),
        )
        for devices, success_message in candidates:
            for device in devices:
                if device.verify_token(token):
                    request.session['2fa_verified'] = True
                    messages.success(request, success_message)
                    return redirect(redirect_target)

        messages.error(request, "Ungültiger Code. Bitte versuchen Sie es erneut.")

    context = {
        'title': 'Zwei-Faktor-Authentifizierung',
        'next': next_url,
    }
    return render(request, 'stiftung/auth/two_factor_verify.html', context)
|
||
|
||
|
||
@login_required
def two_factor_disable(request):
    """Turn off two-factor authentication for the logged-in user.

    A POST must carry the user's current ``password``. When it matches, all
    of the user's TOTP devices and static backup devices are deleted and the
    user is redirected to ``stiftung:home``. When it does not match, an error
    message is queued. Any other request, or a failed password check, renders
    the confirmation page.
    """
    if request.method == "POST":
        supplied_password = request.POST.get('password', '')

        if not request.user.check_password(supplied_password):
            messages.error(request, "Ungültiges Passwort.")
        else:
            # TOTP devices first, then the backup-code devices.
            for device_model in (TOTPDevice, StaticDevice):
                device_model.objects.filter(user=request.user).delete()

            messages.success(
                request,
                "Zwei-Faktor-Authentifizierung wurde erfolgreich deaktiviert."
            )
            return redirect("stiftung:home")

    return render(
        request,
        'stiftung/auth/two_factor_disable.html',
        {'title': 'Zwei-Faktor-Authentifizierung deaktivieren'},
    )
|
||
|
||
|
||
@login_required
def backup_tokens(request):
    """Show the number of remaining backup codes, or regenerate them.

    A POST containing ``regenerate`` and the user's current ``password``
    replaces the user's backup codes: every static device of the user is
    deleted, a new device named ``backup`` is created, and ten fresh
    8-character codes are stored on it. The new codes are rendered once on
    ``backup_tokens.html``. A wrong password queues an error message.

    In all other cases only the number of remaining codes is shown, never
    the codes themselves.
    """
    # Function-scope import so the block does not depend on a file-level import.
    from django.db import transaction

    if request.method == "POST" and 'regenerate' in request.POST:
        password = request.POST.get('password', '')

        if request.user.check_password(password):
            new_codes = []
            # Replace old codes atomically so a failure midway cannot leave
            # the user without any backup codes.
            with transaction.atomic():
                # Delete ALL static devices, not only the first one; the
                # verification view accepts codes from every static device.
                StaticDevice.objects.filter(user=request.user).delete()

                static_device = StaticDevice.objects.create(
                    user=request.user,
                    name='backup'
                )
                for _ in range(10):  # Generate 10 backup codes
                    token_value = random_hex()[:8]  # 8 character backup codes
                    StaticToken.objects.create(
                        device=static_device,
                        token=token_value
                    )
                    new_codes.append(token_value)

            messages.success(
                request,
                "Neue Backup-Codes wurden generiert. Bitte speichern Sie diese sicher."
            )

            context = {
                'backup_tokens': new_codes,
                'title': 'Neue Backup-Codes'
            }
            return render(request, 'stiftung/auth/backup_tokens.html', context)

        messages.error(request, "Ungültiges Passwort.")

    # Show existing tokens (count only for security), across all of the
    # user's static devices.
    token_count = StaticToken.objects.filter(device__user=request.user).count()

    context = {
        'token_count': token_count,
        'has_tokens': token_count > 0,
        'title': 'Backup-Codes'
    }
    return render(request, 'stiftung/auth/backup_tokens_manage.html', context)
|
||
|
||
|
||
# Geschichte (History) Views
|
||
from stiftung.models import GeschichteSeite, GeschichteBild
|
||
from stiftung.forms import GeschichteSeiteForm, GeschichteBildForm
|
||
|
||
|