# views/system.py # Phase 0: Vision 2026 – Code-Refactoring import csv import io import json import os import time from datetime import datetime, timedelta, date from decimal import Decimal import qrcode import qrcode.image.svg import requests from django.conf import settings from django.contrib import messages from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.db.models import (Avg, Count, DecimalField, F, IntegerField, Q, Sum, Value) from django.db.models.functions import Cast, Coalesce, NullIf, Replace from django.http import HttpResponse, JsonResponse from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse from django.utils import timezone from django.views.decorators.csrf import csrf_exempt from django_otp.decorators import otp_required from django_otp.plugins.otp_totp.models import TOTPDevice from django_otp.plugins.otp_static.models import StaticDevice, StaticToken from django_otp.util import random_hex from rest_framework.decorators import api_view from rest_framework.response import Response from stiftung.models import (AppConfiguration, AuditLog, BackupJob, BankTransaction, BriefVorlage, CSVImport, Destinataer, DestinataerEmailEingang, DestinataerNotiz, DestinataerUnterstuetzung, DokumentLink, Foerderung, GeschichteBild, GeschichteSeite, Land, LandAbrechnung, LandVerpachtung, Paechter, Person, Rentmeister, StiftungsKalenderEintrag, StiftungsKonto, UnterstuetzungWiederkehrend, Veranstaltung, Veranstaltungsteilnehmer, Verwaltungskosten, VierteljahresNachweis) from stiftung.forms import ( DestinataerForm, DestinataerUnterstuetzungForm, DestinataerNotizForm, FoerderungForm, GeschichteBildForm, GeschichteSeiteForm, LandForm, LandVerpachtungForm, LandAbrechnungForm, PaechterForm, DokumentLinkForm, RentmeisterForm, StiftungsKontoForm, VerwaltungskostenForm, BankTransactionForm, BankImportForm, UnterstuetzungForm, UnterstuetzungWiederkehrendForm, UnterstuetzungMarkAsPaidForm, VierteljahresNachweisForm, UserCreationForm, UserUpdateForm, PasswordChangeForm, UserPermissionForm, TwoFactorSetupForm, TwoFactorVerifyForm, TwoFactorDisableForm, BackupTokenRegenerateForm, PersonForm, VeranstaltungForm, VeranstaltungsteilnehmerForm, ) def get_pdf_generator(): """Lazy load PDF generator to handle missing dependencies gracefully""" try: from .utils.pdf_generator import pdf_generator return pdf_generator except ImportError as e: # Store the error message for use in MockPDFGenerator error_message = str(e) # Return a mock generator if dependencies are missing class MockPDFGenerator: def is_available(self): return False def export_data_list_pdf(self, *args, **kwargs): from django.http import HttpResponse error_html = f"""
PDF generation requires additional system dependencies that are not installed.
Error: {error_message}
Please install WeasyPrint dependencies or use CSV export instead.
""" response = HttpResponse(error_html, content_type="text/html") response["Content-Disposition"] = ( 'inline; filename="pdf_not_available.html"' ) return response return MockPDFGenerator() class GrampsClient: """Lightweight client for Gramps Web API.""" def __init__( self, base_url: str, token: str = "", username: str = "", password: str = "" ): self.base_url = base_url.rstrip("/") self.session = requests.Session() if token: self.session.headers.update({"Authorization": f"Bearer {token}"}) self.username = username self.password = password self._cached_token = token def search_people(self, query: str, limit: int = 5): try: r = self.session.get( f"{self.base_url}/api/people/", params={"q": query, "limit": limit}, timeout=10, ) r.raise_for_status() return r.json() except Exception as e: # try login-once if unauthorized and we have credentials if self.username and self.password and "401" in str(e): if self._login(): return self.search_people(query, limit) return {"error": str(e)} def get_person(self, handle_or_id: str): try: r = self.session.get( f"{self.base_url}/api/people/{handle_or_id}", timeout=10 ) r.raise_for_status() return r.json() except Exception as e: if self.username and self.password and "401" in str(e): if self._login(): return self.get_person(handle_or_id) return {"error": str(e)} def _login(self) -> bool: try: # try common endpoints endpoints = [ ( "/api/auth/login", {"username": self.username, "password": self.password}, "json", ), ( "/auth/login", {"username": self.username, "password": self.password}, "json", ), ( "/api/token", {"username": self.username, "password": self.password}, "form", ), ( "/login", {"username": self.username, "password": self.password}, "form", ), ( "/token", {"username": self.username, "password": self.password}, "form", ), ( "/api/login", {"username": self.username, "password": self.password}, "json", ), ] for path, payload, mode in endpoints: url = f"{self.base_url}{path}" if mode == "json": r = self.session.post( url, json=payload, timeout=10, allow_redirects=False ) else: r = self.session.post( url, data=payload, timeout=10, allow_redirects=False ) # Success with token body if r.status_code in (200, 201) and "application/json" in r.headers.get( "Content-Type", "" ): data = r.json() token = ( data.get("access_token") or data.get("token") or data.get("access") or data.get("jwt") ) if token: self._cached_token = token self.session.headers.update( {"Authorization": f"Bearer {token}"} ) return True # Success via session cookie and redirect if r.status_code in (200, 302) and ( "set-cookie" in {k.lower(): v for k, v in r.headers.items()} ): return True # Basic Auth fallback (some setups protect API with Basic) try: self.session.auth = (self.username, self.password) r = self.session.get(f"{self.base_url}/api/people/?limit=1", timeout=10) if r.status_code == 200: return True except Exception: pass return False except Exception: return False def get_gramps_client() -> GrampsClient: return GrampsClient( getattr(settings, "GRAMPS_URL", ""), getattr(settings, "GRAMPS_API_TOKEN", ""), getattr(settings, "GRAMPS_USERNAME", ""), getattr(settings, "GRAMPS_PASSWORD", ""), ) @api_view(["GET"]) def gramps_debug_api(_request): return Response( { "GRAMPS_URL": getattr(settings, "GRAMPS_URL", ""), "has_username": bool(getattr(settings, "GRAMPS_USERNAME", "")), "has_password": bool(getattr(settings, "GRAMPS_PASSWORD", "")), } ) from stiftung.models import DestinataerNotiz, DestinataerUnterstuetzung from stiftung.forms import (DestinataerForm, DestinataerNotizForm, DestinataerUnterstuetzungForm, DokumentLinkForm, FoerderungForm, LandForm, PaechterForm, PersonForm, UnterstuetzungForm, UnterstuetzungMarkAsPaidForm, VierteljahresNachweisForm) @login_required def csv_import_list(request): """List all CSV import operations""" imports = CSVImport.objects.all().order_by("-started_at") paginator = Paginator(imports, 20) page_number = request.GET.get("page") page_obj = paginator.get_page(page_number) context = { "page_obj": page_obj, "import_types": CSVImport.IMPORT_TYPE_CHOICES, "status_choices": CSVImport.STATUS_CHOICES, } return render(request, "stiftung/csv_import_list.html", context) @login_required def csv_import_create(request): """Show CSV import form and handle file upload""" if request.method == "POST": import_type = request.POST.get("import_type") csv_file = request.FILES.get("csv_file") if not csv_file or not import_type: messages.error( request, "Bitte wählen Sie einen Import-Typ und eine CSV-Datei aus." ) return redirect("stiftung:csv_import_create") if not csv_file.name.endswith(".csv"): messages.error(request, "Bitte wählen Sie eine gültige CSV-Datei aus.") return redirect("stiftung:csv_import_create") try: # Create import record csv_import = CSVImport.objects.create( import_type=import_type, filename=csv_file.name, file_size=csv_file.size, created_by=( request.user.username if request.user.is_authenticated else "Unknown" ), status="processing", ) # Process the CSV file if import_type == "destinataere": result = process_destinataere_csv(csv_file, csv_import) elif import_type == "paechter": result = process_paechter_csv(csv_file, csv_import) elif import_type == "personen": result = process_personen_csv(csv_file, csv_import) elif import_type == "laendereien": result = process_laendereien_csv(csv_file, csv_import) else: messages.error(request, "Unbekannter Import-Typ.") csv_import.status = "failed" csv_import.save() return redirect("stiftung:csv_import_create") # Update import record csv_import.total_rows = result["total_rows"] csv_import.imported_rows = result["imported_rows"] csv_import.failed_rows = result["failed_rows"] csv_import.error_log = result["error_log"] csv_import.status = result["status"] csv_import.completed_at = timezone.now() csv_import.save() if result["status"] == "completed": messages.success( request, f'CSV-Import erfolgreich! {result["imported_rows"]} Datensätze importiert.', ) elif result["status"] == "partial": messages.warning( request, f'CSV-Import teilweise erfolgreich. {result["imported_rows"]} importiert, {result["failed_rows"]} fehlgeschlagen.', ) else: messages.error( request, f'CSV-Import fehlgeschlagen. {result["error_log"]}' ) return redirect("stiftung:csv_import_list") except Exception as e: messages.error(request, f"Fehler beim CSV-Import: {str(e)}") return redirect("stiftung:csv_import_create") context = { "import_types": CSVImport.IMPORT_TYPE_CHOICES, } return render(request, "stiftung/csv_import_form.html", context) def process_personen_csv(csv_file, csv_import): """Process CSV file for Personen import""" decoded_file = csv_file.read().decode("utf-8") # Handle both comma and semicolon separated files if ";" in decoded_file.split("\n")[0]: csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=";") else: csv_data = csv.DictReader(io.StringIO(decoded_file)) total_rows = 0 imported_rows = 0 failed_rows = 0 error_log = [] for row_num, row in enumerate( csv_data, start=2 ): # Start at 2 because row 1 is header total_rows += 1 try: # Map CSV columns to model fields person_data = { "vorname": row.get("Vorname", "").strip(), "nachname": row.get("Nachname", "").strip(), "familienzweig": row.get("Familienzweig", "hauptzweig").strip(), "email": row.get("E-Mail", "").strip() or None, "telefon": row.get("Telefon", "").strip() or None, "iban": row.get("IBAN", "").strip() or None, "adresse": row.get("Adresse", "").strip() or None, "notizen": row.get("Notizen", "").strip() or None, "aktiv": row.get("Aktiv", "true").lower() == "true", } # Handle date fields if row.get("Geburtsdatum"): try: person_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%d.%m.%Y" ).date() except ValueError: try: person_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%Y-%m-%d" ).date() except ValueError: person_data["geburtsdatum"] = None # Validate required fields if not person_data["vorname"] or not person_data["nachname"]: error_log.append( f"Zeile {row_num}: Vorname und Nachname sind erforderlich" ) failed_rows += 1 continue # Check if person already exists existing_person = Person.objects.filter( vorname__iexact=person_data["vorname"], nachname__iexact=person_data["nachname"], ).first() if existing_person: # Update existing person for field, value in person_data.items(): if value is not None: setattr(existing_person, field, value) existing_person.save() else: # Create new person Person.objects.create(**person_data) imported_rows += 1 except Exception as e: error_log.append(f"Zeile {row_num}: {str(e)}") failed_rows += 1 # Determine status if failed_rows == 0: status = "completed" elif imported_rows > 0: status = "partial" else: status = "failed" return { "total_rows": total_rows, "imported_rows": imported_rows, "failed_rows": failed_rows, "error_log": "\n".join(error_log) if error_log else None, "status": status, } def process_destinataere_csv(csv_file, csv_import): """Process CSV file for Destinatäre import""" decoded_file = csv_file.read().decode("utf-8") # Handle both comma and semicolon separated files if ";" in decoded_file.split("\n")[0]: csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=";") else: csv_data = csv.DictReader(io.StringIO(decoded_file)) total_rows = 0 imported_rows = 0 failed_rows = 0 error_log = [] for row_num, row in enumerate( csv_data, start=2 ): # Start at 2 because row 1 is header total_rows += 1 try: # Helper function to parse boolean values from CSV def parse_boolean(value, default=False): """Parse boolean values from CSV with multiple accepted formats""" if not value: return default value_str = str(value).strip().lower() # Accept various true values true_values = ['true', 'ja', 'yes', '1', 'wahr', 'x'] # Accept various false values false_values = ['false', 'nein', 'no', '0', 'falsch', ''] if value_str in true_values: return True elif value_str in false_values: return False else: # If unclear, return default return default # Map CSV columns to model fields destinataer_data = { "vorname": row.get("Vorname", "").strip(), "nachname": row.get("Nachname", "").strip(), "familienzweig": row.get("Familienzweig", "hauptzweig").strip(), "email": row.get("E-Mail", "").strip() or None, "telefon": row.get("Telefon", "").strip() or None, "iban": row.get("IBAN", "").strip() or None, "strasse": row.get("Straße", "").strip() or None, "plz": row.get("PLZ", "").strip() or None, "ort": row.get("Ort", "").strip() or None, "berufsgruppe": row.get("Berufsgruppe", "andere").strip(), "ausbildungsstand": row.get("Ausbildungsstand", "").strip() or None, "institution": row.get("Institution", "").strip() or None, "projekt_beschreibung": row.get("Projektbeschreibung", "").strip() or None, "jaehrliches_einkommen": ( float(row.get("Jährliches_Einkommen", 0)) if row.get("Jährliches_Einkommen") else None ), "notizen": row.get("Notizen", "").strip() or None, # Boolean fields with improved parsing "finanzielle_notlage": parse_boolean(row.get("Finanzielle_Notlage"), False), "aktiv": parse_boolean(row.get("Aktiv"), True), "ist_abkoemmling": parse_boolean(row.get("Ist_Abkömmling"), False), "unterstuetzung_bestaetigt": parse_boolean(row.get("Unterstützung_bestätigt"), False), "studiennachweis_erforderlich": parse_boolean(row.get("Studiennachweis_erforderlich"), False), } # Handle numeric fields if row.get("Haushaltsgröße"): try: destinataer_data["haushaltsgroesse"] = int(row["Haushaltsgröße"]) except ValueError: pass if row.get("Monatliche_Bezüge"): try: destinataer_data["monatliche_bezuege"] = float(row["Monatliche_Bezüge"]) except ValueError: pass if row.get("Vermögen"): try: destinataer_data["vermoegen"] = float(row["Vermögen"]) except ValueError: pass if row.get("Vierteljährlicher_Betrag"): try: destinataer_data["vierteljaehrlicher_betrag"] = float(row["Vierteljährlicher_Betrag"]) except ValueError: pass # Handle date fields if row.get("Geburtsdatum"): try: destinataer_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%d.%m.%Y" ).date() except ValueError: try: destinataer_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%Y-%m-%d" ).date() except ValueError: destinataer_data["geburtsdatum"] = None if row.get("Letzter_Studiennachweis"): try: destinataer_data["letzter_studiennachweis"] = datetime.strptime( row["Letzter_Studiennachweis"], "%d.%m.%Y" ).date() except ValueError: try: destinataer_data["letzter_studiennachweis"] = datetime.strptime( row["Letzter_Studiennachweis"], "%Y-%m-%d" ).date() except ValueError: destinataer_data["letzter_studiennachweis"] = None # Validate required fields if not destinataer_data["vorname"] or not destinataer_data["nachname"]: error_log.append( f"Zeile {row_num}: Vorname und Nachname sind erforderlich" ) failed_rows += 1 continue # Check if destinataer already exists existing_destinataer = Destinataer.objects.filter( vorname__iexact=destinataer_data["vorname"], nachname__iexact=destinataer_data["nachname"], ).first() if existing_destinataer: # Update existing destinataer for field, value in destinataer_data.items(): if value is not None: setattr(existing_destinataer, field, value) existing_destinataer.save() else: # Create new destinataer Destinataer.objects.create(**destinataer_data) imported_rows += 1 except Exception as e: error_log.append(f"Zeile {row_num}: {str(e)}") failed_rows += 1 # Determine status if failed_rows == 0: status = "completed" elif imported_rows > 0: status = "partial" else: status = "failed" return { "total_rows": total_rows, "imported_rows": imported_rows, "failed_rows": failed_rows, "error_log": "\n".join(error_log) if error_log else None, "status": status, } def process_paechter_csv(csv_file, csv_import): """Process CSV file for Paechter import""" decoded_file = csv_file.read().decode("utf-8") # Handle both comma and semicolon separated files if ";" in decoded_file.split("\n")[0]: csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=";") else: csv_data = csv.DictReader(io.StringIO(decoded_file)) total_rows = 0 imported_rows = 0 failed_rows = 0 error_log = [] for row_num, row in enumerate( csv_data, start=2 ): # Start at 2 because row 1 is header total_rows += 1 try: # Get raw values from CSV - handle both semicolon and comma separated # Handle BOM in column names vorname_raw = row.get("Vorname", "") or row.get("\ufeffVorname", "") nachname_raw = row.get("Nachname", "") personentyp_raw = row.get("Personentyp", "") # Clean up the values (remove extra whitespace but keep empty strings) vorname_raw = vorname_raw.strip() if vorname_raw else "" nachname_raw = nachname_raw.strip() if nachname_raw else "" personentyp_raw = personentyp_raw.strip() if personentyp_raw else "" # Debug: Log raw values and available columns error_log.append(f"Zeile {row_num}: Available columns: {list(row.keys())}") error_log.append( f"Zeile {row_num}: RAW Vorname='{vorname_raw}', Nachname='{nachname_raw}', Personentyp='{personentyp_raw}'" ) # Determine personentyp based on the data if personentyp_raw in ["Gesellschaft", "KG", "GbR", "GmbH"]: personentyp = "gesellschaft" elif personentyp_raw in ["Herrn", "Frau"]: personentyp = "natuerlich" else: # Fallback: analyze the Nachname to detect companies nachname_lower = nachname_raw.lower() if any( keyword in nachname_lower for keyword in [ "kg", "gbr", "gmbh", "ag", "ohg", "e.v.", "stiftung", "genossenschaft", ] ): personentyp = "gesellschaft" else: personentyp = "natuerlich" # Handle Vorname - keep original value unless it's 'N/A' vorname = vorname_raw if vorname_raw and vorname_raw != "N/A" else "" # Debug: Log processed values error_log.append( f"Zeile {row_num}: PROCESSED Vorname='{vorname}', Nachname='{nachname_raw}', Personentyp='{personentyp}'" ) paechter_data = { "vorname": vorname, "nachname": nachname_raw, "email": row.get("E-Mail", "").strip() or None, "telefon": row.get("Telefon", "").strip() or None, "iban": row.get("IBAN", "").strip() or None, "strasse": row.get("Straße", "").strip() or None, "plz": row.get("PLZ", "").strip() or None, "ort": row.get("Ort", "").strip() or None, "personentyp": personentyp, "pachtnummer": row.get("Pachtnummer", "").strip() or None, "landwirtschaftliche_ausbildung": row.get( "Landwirtschaftliche_Ausbildung", "false" ).lower() == "true", "berufserfahrung_jahre": ( int(row.get("Berufserfahrung_Jahre", 0)) if row.get("Berufserfahrung_Jahre") else None ), "spezialisierung": row.get("Spezialisierung", "").strip() or None, "notizen": row.get("Notizen", "").strip() or None, "aktiv": row.get("Aktiv", "true").lower() in ["true", "wahr", "ja", "1"], } # Handle date fields if row.get("Geburtsdatum"): try: paechter_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%d.%m.%Y" ).date() except ValueError: try: paechter_data["geburtsdatum"] = datetime.strptime( row["Geburtsdatum"], "%Y-%m-%d" ).date() except ValueError: paechter_data["geburtsdatum"] = None if row.get("Pachtbeginn_Erste"): try: paechter_data["pachtbeginn_erste"] = datetime.strptime( row["Pachtbeginn_Erste"], "%d.%m.%Y" ).date() except ValueError: try: paechter_data["pachtbeginn_erste"] = datetime.strptime( row["Pachtbeginn_Erste"], "%Y-%m-%d" ).date() except ValueError: paechter_data["pachtbeginn_erste"] = None if row.get("Pachtende_Letzte"): try: paechter_data["pachtende_letzte"] = datetime.strptime( row["Pachtende_Letzte"], "%d.%m.%Y" ).date() except ValueError: try: paechter_data["pachtende_letzte"] = datetime.strptime( row["Pachtende_Letzte"], "%Y-%m-%d" ).date() except ValueError: paechter_data["pachtende_letzte"] = None # Handle decimal fields if row.get("Pachtzins_Aktuell"): try: paechter_data["pachtzins_aktuell"] = float(row["Pachtzins_Aktuell"]) except ValueError: paechter_data["pachtzins_aktuell"] = None # Validate required fields if personentyp == "gesellschaft": # For companies, only Nachname is required if not paechter_data["nachname"]: error_log.append( f"Zeile {row_num}: Nachname ist für Gesellschaften erforderlich" ) failed_rows += 1 continue else: # For natural persons, only Nachname is required if not paechter_data["nachname"]: error_log.append( f"Zeile {row_num}: Nachname ist für natürliche Personen erforderlich" ) failed_rows += 1 continue # Check if paechter already exists if personentyp == "gesellschaft": # For companies, search by Nachname only existing_paechter = Paechter.objects.filter( nachname__iexact=paechter_data["nachname"], personentyp="gesellschaft", ).first() else: # For natural persons, search by Nachname only (since Vorname can be empty) existing_paechter = Paechter.objects.filter( nachname__iexact=paechter_data["nachname"], personentyp="natuerlich" ).first() if existing_paechter: # Update existing paechter for field, value in paechter_data.items(): if value is not None: setattr(existing_paechter, field, value) existing_paechter.save() else: # Create new paechter Paechter.objects.create(**paechter_data) imported_rows += 1 except Exception as e: error_log.append(f"Zeile {row_num}: {str(e)}") failed_rows += 1 # Determine status if failed_rows == 0: status = "completed" elif imported_rows > 0: status = "partial" else: status = "failed" return { "total_rows": total_rows, "imported_rows": imported_rows, "failed_rows": failed_rows, "error_log": "\n".join(error_log) if error_log else None, "status": status, } def process_laendereien_csv(csv_file, csv_import): """Process CSV file for Ländereien import""" decoded_file = csv_file.read().decode("utf-8") # Handle both comma and semicolon separated files if ";" in decoded_file.split("\n")[0]: csv_data = csv.DictReader(io.StringIO(decoded_file), delimiter=";") else: csv_data = csv.DictReader(io.StringIO(decoded_file)) total_rows = 0 imported_rows = 0 failed_rows = 0 error_log = [] last_gemeinde = None for row_num, row in enumerate(csv_data, start=2): total_rows += 1 try: # Build case-insensitive access helpers (strip BOM, normalize separators) def clean_key(key: str) -> str: return (key or "").replace("\ufeff", "").replace("\ufeff", "").strip() normalized_row = {clean_key(k): (v or "").strip() for k, v in row.items()} lower_row = { clean_key(k).lower(): (v or "").strip() for k, v in row.items() } sanitized_row = { clean_key(k) .lower() .replace("-", "_") .replace(" ", "_"): (v or "") .strip() for k, v in row.items() } def get_val(*keys): # Try exact keys first, then case-insensitive for key in keys: if key in normalized_row: return normalized_row[key] for key in keys: lk = key.lower() if lk in lower_row: return lower_row[lk] sk = lk.replace("-", "_").replace(" ", "_") if sk in sanitized_row: return sanitized_row[sk] return "" def parse_float(value): if not value: return 0 # replace comma decimal if present v = ( value.replace(".", "").replace(",", ".") if value.count(",") == 1 and value.count(".") > 1 else value.replace(",", ".") ) try: return float(v) except ValueError: return 0 # Map CSV columns to model fields (robust to header variants) lfd_nr_val = get_val( "Lfd_Nr", "lfd_nr", "LfdNr", "lfdnr", "laufende_nummer", "laufende-nummer", ) land_data = { "lfd_nr": lfd_nr_val, "ew_nummer": get_val("EW_Nummer", "ew_nummer") or None, "amtsgericht": get_val("Amtsgericht", "amtsgericht"), "gemeinde": get_val("Gemeinde", "gemeinde"), "gemarkung": get_val("Gemarkung", "gemarkung"), "flur": get_val("Flur", "flur"), "flurstueck": get_val( "Flurstück", "Flurstueck", "flurstück", "flurstueck" ), "groesse_qm": parse_float( get_val("Größe_qm", "Groesse_qm", "groesse_qm", "größe_qm") ), "gruenland_qm": parse_float( get_val( "Grünland_qm", "Gruenland_qm", "gruenland_qm", "grünland_qm" ) ), "acker_qm": parse_float(get_val("Acker_qm", "acker_qm")), "wald_qm": parse_float(get_val("Wald_qm", "wald_qm")), "sonstiges_qm": parse_float(get_val("Sonstiges_qm", "sonstiges_qm")), "verpachtete_gesamtflaeche": parse_float( get_val( "Verpachtete_Gesamtfläche_qm", "Verpachtete_Gesamtflaeche_qm", "verpachtete_gesamtfläche_qm", "verpachtete_gesamtflaeche_qm", ) ), "verp_flaeche_aktuell": parse_float( get_val( "Verp_Fläche_aktuell_qm", "Verp_Flaeche_aktuell_qm", "verp_flaeche_aktuell_qm", "verp_fläche_aktuell_qm", ) ), "aktiv": get_val("Aktiv", "aktiv").lower() in ["true", "wahr", "ja", "1"], "notizen": get_val("Notizen", "notizen") or None, } # Fallback for missing 'Gemeinde' -> set explicit placeholder if not land_data["gemeinde"]: land_data["gemeinde"] = "FEHLT" # Validate required fields required_fields = ["lfd_nr", "gemeinde", "gemarkung", "flur", "flurstueck"] missing_fields = [ field for field in required_fields if not land_data[field] ] if missing_fields: # Log header diagnostics on first failure only to help debugging if row_num == 2: error_log.append(f"Erkannte Spalten: {list(normalized_row.keys())}") error_log.append( f"Zeile {row_num}: Fehlende Pflichtfelder: {', '.join(missing_fields)}" ) failed_rows += 1 continue # Check if land already exists existing_land = Land.objects.filter(lfd_nr=land_data["lfd_nr"]).first() if existing_land: # Update existing land for field, value in land_data.items(): if value is not None: setattr(existing_land, field, value) existing_land.save() else: # Create new land Land.objects.create(**land_data) imported_rows += 1 if land_data["gemeinde"]: last_gemeinde = land_data["gemeinde"] except Exception as e: error_log.append(f"Zeile {row_num}: {str(e)}") failed_rows += 1 # Determine status if failed_rows == 0: status = "completed" elif imported_rows > 0: status = "partial" else: status = "failed" return { "total_rows": total_rows, "imported_rows": imported_rows, "failed_rows": failed_rows, "error_log": "\n".join(error_log) if error_log else None, "status": status, } # Person Views @api_view(["GET"]) def gramps_search_api(request): """Probe-Endpoint: Suche Personen in Gramps Web nach q (Nachname, Vorname).""" q = request.GET.get("q", "") if not q: return Response({"error": "Parameter q erforderlich"}, status=400) client = get_gramps_client() result = client.search_people(q) return Response(result) # Geschäftsführung Views @login_required def administration(request): """Administration Dashboard""" from datetime import datetime, timedelta from django.db.models import Count from stiftung.models import AuditLog, BackupJob # Recent audit activity recent_audit = AuditLog.objects.all()[:10] # Audit statistics heute = datetime.now().date() stats = { "total_logs": AuditLog.objects.count(), "logs_today": AuditLog.objects.filter(timestamp__date=heute).count(), "logs_week": AuditLog.objects.filter( timestamp__gte=heute - timedelta(days=7) ).count(), "recent_backups": BackupJob.objects.all()[:5], "last_backup": BackupJob.objects.filter(status="completed").first(), } # User activity summary user_activity = ( AuditLog.objects.values("username") .annotate(count=Count("id")) .order_by("-count")[:10] ) context = { "recent_audit": recent_audit, "stats": stats, "user_activity": user_activity, } return render(request, "stiftung/administration.html", context) @login_required def audit_log_list(request): """Liste aller Audit Log Einträge""" from django.core.paginator import Paginator from stiftung.models import AuditLog logs = AuditLog.objects.all() # Filter user_filter = request.GET.get("user") if user_filter: logs = logs.filter(username__icontains=user_filter) action_filter = request.GET.get("action") if action_filter: logs = logs.filter(action=action_filter) entity_filter = request.GET.get("entity_type") if entity_filter: logs = logs.filter(entity_type=entity_filter) date_from = request.GET.get("date_from") if date_from: logs = logs.filter(timestamp__date__gte=date_from) date_to = request.GET.get("date_to") if date_to: logs = logs.filter(timestamp__date__lte=date_to) # Pagination paginator = Paginator(logs, 50) page_number = request.GET.get("page") page_obj = paginator.get_page(page_number) context = { "page_obj": page_obj, "action_choices": AuditLog.ACTION_TYPES, "entity_choices": AuditLog.ENTITY_TYPES, "user_filter": user_filter, "action_filter": action_filter, "entity_filter": entity_filter, "date_from": date_from, "date_to": date_to, } return render(request, "stiftung/audit_log_list.html", context) @login_required def backup_management(request): """Backup Management Interface""" from django.core.paginator import Paginator from stiftung.models import BackupJob # Handle backup creation if request.method == "POST": backup_type = request.POST.get("backup_type", "full") # Create backup job backup_job = BackupJob.objects.create( backup_type=backup_type, created_by=request.user ) # Log the backup initiation from stiftung.audit import log_system_action log_system_action( request=request, action="backup", description=f"Backup-Job erstellt: {backup_job.get_backup_type_display()}", details={"backup_job_id": str(backup_job.id), "backup_type": backup_type}, ) # Start backup process asynchronously (we'll create a simple version for now) import threading from stiftung.backup_utils import run_backup backup_thread = threading.Thread(target=run_backup, args=(str(backup_job.id),)) backup_thread.start() messages.success( request, f'Backup-Job "{backup_job.get_backup_type_display()}" wurde gestartet.', ) return redirect("stiftung:backup_management") # List backup jobs backup_jobs = BackupJob.objects.all() # Pagination paginator = Paginator(backup_jobs, 20) page_number = request.GET.get("page") page_obj = paginator.get_page(page_number) context = { "page_obj": page_obj, "backup_types": BackupJob.TYPE_CHOICES, } return render(request, "stiftung/backup_management.html", context) @login_required def backup_download(request, backup_id): """Download a backup file""" import os from django.http import FileResponse, Http404 from stiftung.models import BackupJob try: backup_job = BackupJob.objects.get(id=backup_id, status="completed") except BackupJob.DoesNotExist: raise Http404("Backup nicht gefunden oder nicht vollständig") backup_path = os.path.join("/app/backups", backup_job.backup_filename) if not os.path.exists(backup_path): raise Http404("Backup-Datei nicht gefunden") # Log download from stiftung.audit import log_system_action log_system_action( request=request, action="export", description=f"Backup heruntergeladen: {backup_job.backup_filename}", details={"backup_job_id": str(backup_job.id)}, ) response = FileResponse( open(backup_path, "rb"), as_attachment=True, filename=backup_job.backup_filename ) return response @login_required def backup_restore(request): """Restore from backup""" if request.method == "POST": from stiftung.models import BackupJob backup_file = request.FILES.get("backup_file") if not backup_file: messages.error(request, "Bitte wählen Sie eine Backup-Datei aus.") return redirect("stiftung:backup_management") # Validate file format if not backup_file.name.endswith(".tar.gz"): messages.error( request, "Ungültiges Backup-Format. Nur .tar.gz Dateien sind erlaubt." ) return redirect("stiftung:backup_management") # Save uploaded file to temporary location import os import tempfile temp_dir = tempfile.mkdtemp() backup_path = os.path.join(temp_dir, backup_file.name) try: with open(backup_path, "wb+") as destination: for chunk in backup_file.chunks(): destination.write(chunk) # Validate the backup file from stiftung.backup_utils import validate_backup_file is_valid, message = validate_backup_file(backup_path) if not is_valid: messages.error(request, f"Ungültiges Backup: {message}") return redirect("stiftung:backup_management") # Show validation success messages.info(request, f"Backup validiert: {message}") # Create restore job restore_job = BackupJob.objects.create( operation="restore", backup_type="full", created_by=request.user, backup_filename=backup_file.name, ) # Log restore initiation from stiftung.audit import log_system_action log_system_action( request=request, action="restore", description=f"Wiederherstellung gestartet von: {backup_file.name}", details={ "restore_job_id": str(restore_job.id), "filename": backup_file.name, }, ) # Start restore process import threading from stiftung.backup_utils import run_restore restore_thread = threading.Thread( target=run_restore, args=(str(restore_job.id), backup_path) ) restore_thread.start() messages.success( request, f'Wiederherstellung von "{backup_file.name}" wurde gestartet. ' f'Überwachen Sie den Fortschritt in der Backup-Historie.' ) return redirect("stiftung:backup_management") except Exception as e: messages.error(request, f"Fehler beim Verarbeiten der Backup-Datei: {e}") return redirect("stiftung:backup_management") return redirect("stiftung:backup_management") @login_required def backup_cancel(request, backup_id): """Cancel a running backup job""" from stiftung.models import BackupJob import traceback try: print(f"DEBUG: Attempting to cancel backup job {backup_id}") backup_job = BackupJob.objects.get(id=backup_id) print(f"DEBUG: Found backup job - ID: {backup_job.id}, Status: {backup_job.status}") # Use created_by_id instead of created_by to avoid triggering the foreign key lookup print(f"DEBUG: Created by ID: {backup_job.created_by_id}, Current user ID: {request.user.id}") # Only allow cancelling running or pending jobs if backup_job.status not in ['running', 'pending']: messages.error(request, "Nur laufende oder wartende Backups können abgebrochen werden.") return redirect("stiftung:backup_management") # Check if user has permission to cancel (either own job or admin) # Use created_by_id to avoid database lookup for potentially non-existent user print(f"DEBUG: Checking permissions - created_by_id: {backup_job.created_by_id}, is_staff: {request.user.is_staff}") if backup_job.created_by_id is not None and backup_job.created_by_id != request.user.id and not request.user.is_staff: messages.error(request, "Sie können nur Ihre eigenen Backup-Jobs abbrechen.") return redirect("stiftung:backup_management") # Mark as cancelled print("DEBUG: About to mark job as cancelled") from django.utils import timezone backup_job.status = "cancelled" backup_job.completed_at = timezone.now() print(f"DEBUG: About to set error message with username: {request.user.username}") backup_job.error_message = f"Abgebrochen von {request.user.username}" print("DEBUG: About to save backup job") backup_job.save() print("DEBUG: Backup job saved successfully") # Log the cancellation (with error handling) try: print("DEBUG: About to log system action") from stiftung.audit import log_system_action print(f"DEBUG: About to call get_backup_type_display") backup_type_display = backup_job.get_backup_type_display() print(f"DEBUG: Backup type display: {backup_type_display}") log_system_action( request=request, action="backup_cancel", description=f"Backup-Job abgebrochen: {backup_type_display}", details={"backup_job_id": str(backup_job.id)}, ) print("DEBUG: System action logged successfully") except Exception as audit_error: print(f"ERROR in audit logging: {audit_error}") print(f"ERROR traceback: {traceback.format_exc()}") # Don't fail the cancellation if logging fails messages.success(request, f"Backup-Job wurde abgebrochen.") except BackupJob.DoesNotExist: print(f"ERROR: Backup job {backup_id} not found") messages.error(request, "Backup-Job nicht gefunden.") except Exception as e: print(f"ERROR: Unexpected error in backup_cancel: {e}") print(f"ERROR traceback: {traceback.format_exc()}") messages.error(request, f"Fehler beim Abbrechen des Backup-Jobs: {e}") return redirect("stiftung:backup_management") # ============================================================================= # USER MANAGEMENT VIEWS # ============================================================================= @login_required def user_management(request): """User Management Dashboard""" from django.contrib.auth.models import User from django.core.paginator import Paginator from django.db.models import Q # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") users = User.objects.all().order_by("username") # Search functionality search = request.GET.get("search") if search: users = users.filter( Q(username__icontains=search) | Q(email__icontains=search) | Q(first_name__icontains=search) | Q(last_name__icontains=search) ) # Filter by status status_filter = request.GET.get("status") if status_filter == "active": users = users.filter(is_active=True) elif status_filter == "inactive": users = users.filter(is_active=False) elif status_filter == "staff": users = users.filter(is_staff=True) # Pagination paginator = Paginator(users, 20) page_number = request.GET.get("page") page_obj = paginator.get_page(page_number) # Statistics stats = { "total_users": User.objects.count(), "active_users": User.objects.filter(is_active=True).count(), "staff_users": User.objects.filter(is_staff=True).count(), "inactive_users": User.objects.filter(is_active=False).count(), } context = { "page_obj": page_obj, "stats": stats, "search": search, "status_filter": status_filter, } return render(request, "stiftung/user_management.html", context) @login_required def user_create(request): """Create a new user""" from django.contrib.auth.models import User from stiftung.forms import UserCreationForm # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") if request.method == "POST": form = UserCreationForm(request.POST) if form.is_valid(): # Create user user = User.objects.create_user( username=form.cleaned_data["username"], email=form.cleaned_data["email"], password=form.cleaned_data["password1"], first_name=form.cleaned_data["first_name"], last_name=form.cleaned_data["last_name"], is_active=form.cleaned_data["is_active"], is_staff=form.cleaned_data["is_staff"], ) # Log user creation from stiftung.audit import log_action log_action( request=request, action="create", entity_type="user", entity_id=str(user.pk), entity_name=user.username, description=f'Neuer Benutzer "{user.username}" wurde erstellt', ) messages.success( request, f'Benutzer "{user.username}" wurde erfolgreich erstellt.' ) return redirect("stiftung:user_detail", pk=user.pk) else: form = UserCreationForm() context = {"form": form, "title": "Neuen Benutzer erstellen"} return render(request, "stiftung/user_form.html", context) @login_required def user_detail(request, pk): """User detail view""" from django.contrib.auth.models import User # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") user = get_object_or_404(User, pk=pk) # Get user's permissions user_permissions = user.get_all_permissions() stiftung_permissions = [ perm for perm in user_permissions if perm.startswith("stiftung.") ] # Get recent audit activity from stiftung.models import AuditLog recent_activity = AuditLog.objects.filter(user=user).order_by("-timestamp")[:10] context = { "user_obj": user, # Use user_obj to avoid conflict with request.user "stiftung_permissions": stiftung_permissions, "recent_activity": recent_activity, } return render(request, "stiftung/user_detail.html", context) @login_required def user_edit(request, pk): """Edit user""" from django.contrib.auth.models import User from stiftung.forms import UserUpdateForm # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") user = get_object_or_404(User, pk=pk) if request.method == "POST": form = UserUpdateForm(request.POST, instance=user) if form.is_valid(): # Track changes from stiftung.audit import log_action, track_model_changes old_user = User.objects.get(pk=user.pk) updated_user = form.save() # Log changes changes = track_model_changes(old_user, updated_user) if changes: log_action( request=request, action="update", entity_type="user", entity_id=str(updated_user.pk), entity_name=updated_user.username, description=f'Benutzer "{updated_user.username}" wurde aktualisiert', changes=changes, ) messages.success( request, f'Benutzer "{updated_user.username}" wurde erfolgreich aktualisiert.', ) return redirect("stiftung:user_detail", pk=updated_user.pk) else: form = UserUpdateForm(instance=user) context = { "form": form, "user_obj": user, "title": f'Benutzer "{user.username}" bearbeiten', } return render(request, "stiftung/user_form.html", context) @login_required def user_change_password(request, pk): """Change user password""" from django.contrib.auth.models import User from stiftung.forms import PasswordChangeForm # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") user = get_object_or_404(User, pk=pk) if request.method == "POST": form = PasswordChangeForm(request.POST) if form.is_valid(): user.set_password(form.cleaned_data["new_password1"]) user.save() # Log password change from stiftung.audit import log_action log_action( request=request, action="update", entity_type="user", entity_id=str(user.pk), entity_name=user.username, description=f'Passwort für Benutzer "{user.username}" wurde geändert', ) messages.success( request, f'Passwort für Benutzer "{user.username}" wurde erfolgreich geändert.', ) return redirect("stiftung:user_detail", pk=user.pk) else: form = PasswordChangeForm() context = { "form": form, "user_obj": user, "title": f'Passwort für "{user.username}" ändern', } return render(request, "stiftung/user_change_password.html", context) @login_required def user_permissions(request, pk): """Manage user permissions""" from django.contrib.auth.models import Permission, User from stiftung.forms import UserPermissionForm # Check permission if not request.user.has_perm("stiftung.manage_permissions"): messages.error( request, "Sie haben keine Berechtigung für die Berechtigungsverwaltung." ) return redirect("stiftung:administration") user = get_object_or_404(User, pk=pk) if request.method == "POST": form = UserPermissionForm(request.POST, user=user) if form.is_valid(): # Get selected permissions selected_perms = [] for field_name, value in form.cleaned_data.items(): if field_name.startswith("perm_") and value: perm_id = field_name.replace("perm_", "") selected_perms.append(int(perm_id)) # Get current stiftung permissions current_perms = user.user_permissions.filter( content_type__app_label="stiftung" ) current_perm_ids = set(current_perms.values_list("id", flat=True)) selected_perm_ids = set(selected_perms) # Remove permissions that are no longer selected to_remove = current_perm_ids - selected_perm_ids if to_remove: user.user_permissions.remove( *Permission.objects.filter(id__in=to_remove) ) # Add new permissions to_add = selected_perm_ids - current_perm_ids if to_add: user.user_permissions.add(*Permission.objects.filter(id__in=to_add)) # Log permission changes from stiftung.audit import log_action if to_remove or to_add: changes = { "removed_permissions": list( Permission.objects.filter(id__in=to_remove).values_list( "name", flat=True ) ), "added_permissions": list( Permission.objects.filter(id__in=to_add).values_list( "name", flat=True ) ), } log_action( request=request, action="update", entity_type="user", entity_id=str(user.pk), entity_name=user.username, description=f'Berechtigungen für Benutzer "{user.username}" wurden aktualisiert', changes=changes, ) messages.success( request, f'Berechtigungen für Benutzer "{user.username}" wurden erfolgreich aktualisiert.', ) return redirect("stiftung:user_detail", pk=user.pk) else: form = UserPermissionForm(user=user) context = { "form": form, "user_obj": user, "permission_groups": form.get_permission_groups(), "title": f'Berechtigungen für "{user.username}"', } return render(request, "stiftung/user_permissions.html", context) @login_required def user_delete(request, pk): """Delete user""" from django.contrib.auth.models import User # Check permission if not request.user.has_perm("stiftung.manage_users"): messages.error( request, "Sie haben keine Berechtigung für die Benutzerverwaltung." ) return redirect("stiftung:administration") user = get_object_or_404(User, pk=pk) # Prevent deletion of current user if user == request.user: messages.error(request, "Sie können sich nicht selbst löschen.") return redirect("stiftung:user_detail", pk=pk) if request.method == "POST": username = user.username # Log deletion before deleting from stiftung.audit import log_action log_action( request=request, action="delete", entity_type="user", entity_id=str(user.pk), entity_name=username, description=f'Benutzer "{username}" wurde gelöscht', ) user.delete() messages.success(request, f'Benutzer "{username}" wurde erfolgreich gelöscht.') return redirect("stiftung:user_management") context = {"user_obj": user, "title": f'Benutzer "{user.username}" löschen'} return render(request, "stiftung/user_delete.html", context) # ============================================================================= # AUTHENTICATION VIEWS # ============================================================================= def user_login(request): """User login view""" from django.contrib.auth import authenticate, login from django.contrib.auth.forms import AuthenticationForm if request.user.is_authenticated: return redirect("stiftung:home") if request.method == "POST": form = AuthenticationForm(request, data=request.POST) if form.is_valid(): username = form.cleaned_data.get("username") password = form.cleaned_data.get("password") user = authenticate(username=username, password=password) if user is not None: login(request, user) # Log the login from stiftung.audit import log_login log_login(request, user) # Determine redirect target next_param = request.GET.get("next") or request.POST.get("next") if not next_param or not next_param.startswith("/"): next_param = reverse("stiftung:home") # Check if user has 2FA enabled - redirect to verification first has_2fa = TOTPDevice.objects.filter(user=user, confirmed=True).exists() if has_2fa: from urllib.parse import urlencode verify_url = reverse("stiftung:two_factor_verify") + "?" + urlencode({"next": next_param}) return redirect(verify_url) messages.success(request, f"Willkommen zurück, {user.username}!") return redirect(next_param) else: messages.error(request, "Ungültige Anmeldedaten.") else: messages.error(request, "Bitte korrigieren Sie die Fehler im Formular.") else: form = AuthenticationForm() context = {"form": form, "next": request.GET.get("next", "")} return render(request, "stiftung/login.html", context) @login_required def user_logout(request): """User logout view""" from django.contrib.auth import logout # Log the logout before actually logging out from stiftung.audit import log_logout log_logout(request, request.user) username = request.user.username logout(request) messages.success(request, f"Sie wurden erfolgreich abgemeldet, {username}.") return redirect("stiftung:login") # ============================================================================ # LANDABRECHNUNGS VIEWS # ============================================================================ @login_required def app_settings(request): """Application settings management interface""" # Group settings by category categories = {} for setting in AppConfiguration.objects.filter(is_active=True).order_by( "category", "order", "display_name" ): if setting.category not in categories: categories[setting.category] = [] categories[setting.category].append(setting) if request.method == "POST": # Handle form submission updated_count = 0 for key, value in request.POST.items(): if key.startswith("setting_"): setting_key = key.replace("setting_", "") try: setting = AppConfiguration.objects.get( key=setting_key, is_active=True ) if not setting.is_system and setting.value != value: setting.value = value setting.save() updated_count += 1 except AppConfiguration.DoesNotExist: continue if updated_count > 0: messages.success(request, f"Successfully updated {updated_count} settings!") else: messages.info(request, "No changes were made.") return redirect("stiftung:app_settings") context = { "categories": categories, "title": "Application Settings", } return render(request, "stiftung/app_settings.html", context) # Unterstützungen Views (Destinataer-focused) @login_required def edit_help_box(request): """Bearbeite oder erstelle eine Hilfs-Infobox""" from stiftung.models import HelpBox # Nur root oder Superuser dürfen bearbeiten if request.user.username != "root" and not request.user.is_superuser: messages.error( request, "Sie haben keine Berechtigung, Hilfsboxen zu bearbeiten." ) return redirect("stiftung:home") if request.method == "POST": page_key = request.POST.get("page_key") title = request.POST.get("title") content = request.POST.get("content") is_active = request.POST.get("is_active") == "on" if not page_key or not title or not content: messages.error(request, "Alle Felder sind erforderlich.") return redirect(request.META.get("HTTP_REFERER", "stiftung:home")) # Hilfsbox erstellen oder aktualisieren help_box, created = HelpBox.objects.get_or_create( page_key=page_key, defaults={ "title": title, "content": content, "is_active": is_active, "created_by": request.user.username, "updated_by": request.user.username, }, ) if not created: # Existierende Hilfsbox aktualisieren help_box.title = title help_box.content = content help_box.is_active = is_active help_box.updated_by = request.user.username help_box.save() messages.success(request, f'Hilfsbox "{title}" wurde aktualisiert.') else: messages.success(request, f'Hilfsbox "{title}" wurde erstellt.') # Zurück zur vorherigen Seite return redirect(request.META.get("HTTP_REFERER", "stiftung:home")) # GET Request - Zeige Admin-Übersicht der Hilfsboxen help_boxes = HelpBox.objects.all().order_by("page_key", "-updated_at") # Statistiken berechnen active_count = help_boxes.filter(is_active=True).count() inactive_count = help_boxes.filter(is_active=False).count() existing_pages = set(help_boxes.values_list("page_key", flat=True)) # Verfügbare Seiten aus dem Model holen available_pages = HelpBox.PAGE_CHOICES context = { "help_boxes": help_boxes, "active_count": active_count, "inactive_count": inactive_count, "existing_pages": existing_pages, "available_pages": available_pages, "title": "Hilfs-Infoboxen verwalten", } return render(request, "stiftung/help_boxes_admin.html", context) # ============================================================================= # Verpachtung Management Views (Standalone CRUD) # ============================================================================= @login_required def two_factor_setup(request): """Setup or manage TOTP 2FA for the current user""" # Check if user already has TOTP device device = TOTPDevice.objects.filter(user=request.user, confirmed=True).first() static_device = StaticDevice.objects.filter(user=request.user).first() if device: # User has 2FA enabled - show management options context = { 'has_2fa': True, 'device': device, 'backup_token_count': static_device.token_set.count() if static_device else 0, 'title': 'Zwei-Faktor-Authentifizierung verwalten' } return render(request, 'stiftung/auth/two_factor_manage.html', context) # User doesn't have 2FA - show setup # Get or create unconfirmed TOTP device device, created = TOTPDevice.objects.get_or_create( user=request.user, name='default', defaults={'confirmed': False} ) if request.method == "POST": token = request.POST.get('token', '').strip() if device.verify_token(token): device.confirmed = True device.save() # Generate backup tokens static_device = StaticDevice.objects.create( user=request.user, name='backup' ) backup_tokens = [] for _ in range(10): # Generate 10 backup codes token_value = random_hex()[:8] # 8 character backup codes StaticToken.objects.create( device=static_device, token=token_value ) backup_tokens.append(token_value) messages.success( request, "Zwei-Faktor-Authentifizierung wurde erfolgreich aktiviert! " "Bitte speichern Sie Ihre Backup-Codes sicher." ) return render(request, 'stiftung/auth/backup_tokens.html', { 'backup_tokens': backup_tokens, 'title': 'Backup-Codes' }) else: messages.error(request, "Ungültiger Bestätigungscode. Bitte versuchen Sie es erneut.") # Generate QR code URL qr_url = device.config_url context = { 'device': device, 'qr_url': qr_url, 'title': 'Zwei-Faktor-Authentifizierung einrichten' } return render(request, 'stiftung/auth/two_factor_setup.html', context) @login_required def two_factor_qr(request): """Generate QR code for TOTP setup""" device = TOTPDevice.objects.filter(user=request.user, confirmed=False).first() if not device: return HttpResponse("Kein Setup-Device gefunden", status=404) # Generate QR code qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(device.config_url) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") response = HttpResponse(content_type="image/png") img.save(response, "PNG") return response @login_required def two_factor_verify(request): """Verify TOTP token during login process""" if request.method == "POST": token = request.POST.get('otp_token', '').strip() # Check TOTP devices devices = TOTPDevice.objects.filter(user=request.user, confirmed=True) for device in devices: if device.verify_token(token): request.session['2fa_verified'] = True messages.success(request, "Zwei-Faktor-Authentifizierung erfolgreich.") return redirect(request.GET.get('next', 'stiftung:home')) # Check static backup tokens static_devices = StaticDevice.objects.filter(user=request.user) for device in static_devices: if device.verify_token(token): request.session['2fa_verified'] = True messages.success(request, "Backup-Code erfolgreich verwendet.") return redirect(request.GET.get('next', 'stiftung:home')) messages.error(request, "Ungültiger Code. Bitte versuchen Sie es erneut.") context = { 'title': 'Zwei-Faktor-Authentifizierung', 'next': request.GET.get('next', '') } return render(request, 'stiftung/auth/two_factor_verify.html', context) @login_required def two_factor_disable(request): """Disable TOTP 2FA for the current user""" if request.method == "POST": password = request.POST.get('password', '') if request.user.check_password(password): # Remove all TOTP devices TOTPDevice.objects.filter(user=request.user).delete() # Remove all static backup token devices StaticDevice.objects.filter(user=request.user).delete() messages.success( request, "Zwei-Faktor-Authentifizierung wurde erfolgreich deaktiviert." ) return redirect("stiftung:home") else: messages.error(request, "Ungültiges Passwort.") context = { 'title': 'Zwei-Faktor-Authentifizierung deaktivieren' } return render(request, 'stiftung/auth/two_factor_disable.html', context) @login_required def backup_tokens(request): """Display or regenerate backup tokens""" static_device = StaticDevice.objects.filter(user=request.user).first() if request.method == "POST" and 'regenerate' in request.POST: password = request.POST.get('password', '') if request.user.check_password(password): # Delete old tokens if static_device: static_device.delete() # Generate new backup tokens static_device = StaticDevice.objects.create( user=request.user, name='backup' ) backup_tokens = [] for _ in range(10): # Generate 10 backup codes token_value = random_hex()[:8] # 8 character backup codes StaticToken.objects.create( device=static_device, token=token_value ) backup_tokens.append(token_value) messages.success( request, "Neue Backup-Codes wurden generiert. Bitte speichern Sie diese sicher." ) context = { 'backup_tokens': backup_tokens, 'title': 'Neue Backup-Codes' } return render(request, 'stiftung/auth/backup_tokens.html', context) else: messages.error(request, "Ungültiges Passwort.") # Show existing tokens (count only for security) token_count = 0 if static_device: token_count = static_device.token_set.count() context = { 'token_count': token_count, 'has_tokens': token_count > 0, 'title': 'Backup-Codes' } return render(request, 'stiftung/auth/backup_tokens_manage.html', context) # Geschichte (History) Views from stiftung.models import GeschichteSeite, GeschichteBild from stiftung.forms import GeschichteSeiteForm, GeschichteBildForm # ============================================================================= # Phase 4: GLOBALE SUCHE (Cmd+K) # ============================================================================= @login_required def globale_suche_api(request): """Phase 4: AJAX-Endpunkt für globale Suche über alle Bereiche.""" from django.http import JsonResponse from stiftung.models import ( Destinataer, Paechter, Land, LandVerpachtung, Foerderung, ) from stiftung.models.dokumente import DokumentDatei q = request.GET.get("q", "").strip() if len(q) < 2: return JsonResponse({"results": []}) results = [] # Destinatäre for d in Destinataer.objects.filter( Q(vorname__icontains=q) | Q(nachname__icontains=q) | Q(email__icontains=q) )[:5]: results.append({ "typ": "Destinatär", "titel": d.get_full_name(), "untertitel": d.email or "", "url": f"/destinataere/{d.pk}/", "icon": "fas fa-user", }) # Pächter for p in Paechter.objects.filter( Q(vorname__icontains=q) | Q(nachname__icontains=q) | Q(email__icontains=q) )[:5]: results.append({ "typ": "Pächter", "titel": p.get_full_name(), "untertitel": p.email or "", "url": f"/paechter/{p.pk}/", "icon": "fas fa-user-tie", }) # Ländereien for l in Land.objects.filter( Q(gemeinde__icontains=q) | Q(gemarkung__icontains=q) | Q(lfd_nr__icontains=q) )[:5]: results.append({ "typ": "Länderei", "titel": str(l), "untertitel": l.gemeinde or "", "url": f"/laendereien/{l.pk}/", "icon": "fas fa-map", }) # Förderungen – suche über Destinatär-Name oder Bemerkungen for f in Foerderung.objects.filter( Q(destinataer__vorname__icontains=q) | Q(destinataer__nachname__icontains=q) | Q(bemerkungen__icontains=q) ).select_related("destinataer", "person")[:5]: empfaenger = ( f.destinataer.get_full_name() if f.destinataer else (f.person.get_full_name() if f.person else "Unbekannt") ) results.append({ "typ": "Förderung", "titel": f"{empfaenger} ({f.jahr})", "untertitel": f"€{f.betrag} · {f.get_status_display()}", "url": f"/foerderungen/{f.pk}/", "icon": "fas fa-gift", }) # Dokumente (DMS) try: for dok in DokumentDatei.objects.filter( Q(titel__icontains=q) | Q(beschreibung__icontains=q) )[:5]: results.append({ "typ": "Dokument", "titel": dok.titel, "untertitel": dok.get_kontext_display(), "url": f"/dms/{dok.pk}/", "icon": "fas fa-file-alt", }) except Exception: pass return JsonResponse({"results": results, "query": q})