Files
stiftung-management-system/app/stiftung/middleware.py
Jan Remmer Siebels ed6a02232e feat: Implement TOTP-based Two-Factor Authentication
- Add django-otp and qrcode dependencies
- Create comprehensive 2FA views and templates in German
- Add 2FA setup, verification, and management interfaces
- Implement backup token system with 10 recovery codes
- Add TwoFactorMiddleware for session enforcement
- Integrate 2FA controls into user navigation menu
- Support QR code generation for authenticator apps
- Add forms for secure 2FA operations with validation
- Configure OTP settings and admin site integration

Features:
- Optional 2FA (users can enable/disable)
- TOTP compatible with Google Authenticator, Authy, etc.
- Backup codes for emergency access
- German language interface
- Session-based 2FA enforcement
- Password confirmation for sensitive operations
- Production-ready with HTTPS support
2025-09-30 00:10:02 +02:00

256 lines
7.9 KiB
Python

"""
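Audit and Two-Factor Middleware

AuditMiddleware exposes the current request to the signal handlers in this
module, which write audit log entries for model changes and for user
logins/logouts. TwoFactorMiddleware redirects users who have a confirmed TOTP
device to the 2FA verification page until their session is marked as verified.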
"""
import threading
from urllib.parse import urlencode

from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver
from django.http import HttpResponseRedirect
from django.shortcuts import reverse
from django.utils.deprecation import MiddlewareMixin
from django_otp.plugins.otp_totp.models import TOTPDevice

from stiftung.audit import get_client_ip, log_action, track_model_changes
# Thread-local storage for request context.
# AuditMiddleware sets `request` and `user_changes` on this object at the start
# of every request and removes them again in process_response. The signal
# handlers in this module read the request through get_current_request() and
# use `user_changes` to hand pre-save snapshots from pre_save to post_save.
_local = threading.local()
class AuditMiddleware(MiddlewareMixin):
    """
    Make the active request available to the audit signal handlers.

    Signal receivers are not passed the request, so it is parked in the
    module-level thread-local for the duration of the request/response cycle.
    """

    def process_request(self, request):
        """Remember the request and start with an empty pre-save snapshot store."""
        # Snapshots of rows taken by the pre_save handler, consumed in post_save.
        _local.user_changes = {}
        _local.request = request
        return None

    def process_response(self, request, response):
        """Remove the per-request thread-local state and pass the response through."""
        for attr_name in ("request", "user_changes"):
            if hasattr(_local, attr_name):
                delattr(_local, attr_name)
        return response
class TwoFactorMiddleware(MiddlewareMixin):
    """
    Enforce 2FA verification for users that have a confirmed TOTP device.

    Authenticated users with a confirmed ``TOTPDevice`` are redirected to the
    ``stiftung:two_factor_verify`` view until the session carries a truthy
    ``2fa_verified`` flag. Users without a confirmed device are not affected.

    NOTE(review): everything under /admin/ is exempt from this check;
    presumably the admin site enforces OTP on its own - confirm.
    """

    # Requests whose path starts with one of these prefixes are never redirected
    # (admin, the 2FA pages themselves, and static/media assets).
    EXEMPT_PATH_PREFIXES = (
        "/admin/",
        "/two-factor/",
        "/auth/2fa/",
        "/static/",
        "/media/",
    )
    # Exact paths that are never redirected.
    EXEMPT_PATHS = ("/logout/",)

    def process_request(self, request):
        """
        Redirect to the 2FA verification page when verification is still pending.

        Returns:
            ``None`` to let the request continue, or an ``HttpResponseRedirect``
            to the verification view with the originally requested URL in the
            ``next`` query parameter.
        """
        # Skip if user is not authenticated
        if not request.user.is_authenticated:
            return None

        path = request.path
        if path in self.EXEMPT_PATHS or path.startswith(self.EXEMPT_PATH_PREFIXES):
            return None

        # Cheap session lookup first: an already verified session never needs
        # the database query below.
        if request.session.get("2fa_verified", False):
            return None

        has_2fa = TOTPDevice.objects.filter(
            user=request.user, confirmed=True
        ).exists()
        if not has_2fa:
            return None

        # Percent-encode the target so characters such as "&", "#" or "?" cannot
        # truncate or corrupt the ``next`` parameter, and keep the original
        # query string so the user returns to exactly the page requested.
        query = urlencode({"next": request.get_full_path()}, safe="/")
        return HttpResponseRedirect(
            f"{reverse('stiftung:two_factor_verify')}?{query}"
        )
def get_current_request():
    """Return the request stored in thread-local storage, or None if there is none."""
    try:
        return _local.request
    except AttributeError:
        # Nothing has been stored for this thread.
        return None
def get_entity_type_from_model(model):
    """
    Translate a model class into its audit entity type.

    The lookup is based on the lowercased class name. Models that are not
    known here are reported as "unknown".
    """
    # Entity types that are spelled exactly like the lowercased model name.
    same_named = (
        "destinataer",
        "land",
        "paechter",
        "verpachtung",
        "foerderung",
        "rentmeister",
        "stiftungskonto",
        "verwaltungskosten",
        "banktransaction",
        "dokumentlink",
        "user",
    )
    # Legacy model maps to destinataer
    aliases = {"person": "destinataer"}

    key = model.__name__.lower()
    if key in same_named:
        return key
    return aliases.get(key, "unknown")
def get_entity_name(instance):
    """
    Return a human-readable name for *instance*.

    The first non-empty candidate wins:

    1. the result of ``instance.get_full_name()`` if that method exists,
    2. ``str(instance)``,
    3. ``"<ClassName> #<pk>"`` as a last resort.

    Every object has ``__str__``, so the fallback is chosen by emptiness of
    the result rather than by the presence of the method.
    """
    full_name_getter = getattr(instance, "get_full_name", None)
    if callable(full_name_getter):
        full_name = full_name_getter()
        # An empty full name would leave the audit entry without a usable
        # label, so fall through to the next candidate.
        if full_name:
            return full_name

    text = str(instance)
    if text:
        return text

    return f"{instance.__class__.__name__} #{getattr(instance, 'pk', None)}"
# Signal handlers for automatic audit logging
# Models that never produce audit entries: the audit log itself (logging it
# would recurse) and framework bookkeeping models.
_UNAUDITED_MODEL_NAMES = frozenset(
    {"AuditLog", "Session", "LogEntry", "ContentType", "Permission"}
)


@receiver(pre_save)
def store_pre_save_state(sender, instance, **kwargs):
    """
    Snapshot the stored row before an update so post_save can diff against it.

    Nothing is stored when there is no current request, the request user is
    not authenticated, the model is never audited, the instance has no primary
    key yet, or no row with that primary key exists.
    """
    request = get_current_request()
    if not request or not hasattr(request, "user"):
        return
    # Skip if user is not authenticated
    if not request.user.is_authenticated:
        return
    # log_model_save ignores these models, so loading a snapshot for them
    # would only cost a query.
    if sender.__name__ in _UNAUDITED_MODEL_NAMES:
        return
    # Without a primary key this is a creation; there is no old state.
    if not instance.pk:
        return

    try:
        # _default_manager exists on every model, whereas a manager named
        # ``objects`` is only a convention.
        old_instance = sender._default_manager.get(pk=instance.pk)
    except sender.DoesNotExist:
        return

    if not hasattr(_local, "user_changes"):
        _local.user_changes = {}
    # Key by model AND primary key: primary keys are only unique per model, so
    # a bare pk would let snapshots of different models overwrite each other.
    _local.user_changes[(sender, instance.pk)] = old_instance


@receiver(post_save)
def log_model_save(sender, instance, created, **kwargs):
    """
    Write an audit entry for a created or updated model instance.

    Creations are always logged. Updates are logged only when a pre-save
    snapshot exists and ``track_model_changes`` reports differences. Nothing is
    logged without a current request, for unauthenticated users, or for models
    that are never audited.
    """
    request = get_current_request()
    if not request or not hasattr(request, "user"):
        return
    # Skip if user is not authenticated
    if not request.user.is_authenticated:
        return
    # Skip audit log entries themselves (infinite loop) and system models
    if sender.__name__ in _UNAUDITED_MODEL_NAMES:
        return

    # Consume the snapshot so it cannot be reused by a later, unrelated save.
    snapshots = getattr(_local, "user_changes", None) or {}
    old_instance = snapshots.pop((sender, instance.pk), None)

    entity_type = get_entity_type_from_model(sender)
    entity_name = get_entity_name(instance)
    entity_id = str(instance.pk)
    label = entity_type.replace("_", " ").title()

    if created:
        log_action(
            request=request,
            action="create",
            entity_type=entity_type,
            entity_id=entity_id,
            entity_name=entity_name,
            description=f"Neue {label} '{entity_name}' wurde erstellt",
        )
        return

    changes = {}
    if old_instance is not None:
        changes = track_model_changes(old_instance, instance)
    # Only log if there are actual changes
    if not changes:
        return

    log_action(
        request=request,
        action="update",
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        description=f"{label} '{entity_name}' wurde aktualisiert",
        changes=changes,
    )
@receiver(post_delete)
def log_model_delete(sender, instance, **kwargs):
    """
    Write an audit entry for a deleted model instance.

    Nothing is logged without a current request, for unauthenticated users,
    for audit log entries themselves, or for the listed system models.
    """
    request = get_current_request()
    # Only deletions performed by an authenticated request user are logged.
    if (
        not request
        or not hasattr(request, "user")
        or not request.user.is_authenticated
    ):
        return

    # Audit log entries themselves and certain system models are skipped.
    model_name = sender.__name__
    if model_name == "AuditLog" or model_name in (
        "Session",
        "LogEntry",
        "ContentType",
        "Permission",
    ):
        return

    entity_type = get_entity_type_from_model(sender)
    entity_name = get_entity_name(instance)
    label = entity_type.replace("_", " ").title()

    log_action(
        request=request,
        action="delete",
        entity_type=entity_type,
        entity_id=str(instance.pk),
        entity_name=entity_name,
        description=f"{label} '{entity_name}' wurde gelöscht",
    )
# Authentication logging
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
    """Write a "login" audit entry for the user that just signed in."""
    username = user.username
    audit_fields = {
        "action": "login",
        "entity_type": "user",
        "entity_id": str(user.pk),
        "entity_name": username,
        "description": f"Benutzer {username} hat sich angemeldet",
    }
    log_action(request=request, **audit_fields)
@receiver(user_logged_out)
def log_user_logout(sender, request, user, **kwargs):
    """Write a "logout" audit entry, unless no user is attached to the signal."""
    # user might be None if session expired
    if not user:
        return

    username = user.username
    audit_fields = {
        "action": "logout",
        "entity_type": "user",
        "entity_id": str(user.pk),
        "entity_name": username,
        "description": f"Benutzer {username} hat sich abgemeldet",
    }
    log_action(request=request, **audit_fields)