- Add django-otp and qrcode dependencies - Create comprehensive 2FA views and templates in German - Add 2FA setup, verification, and management interfaces - Implement backup token system with 10 recovery codes - Add TwoFactorMiddleware for session enforcement - Integrate 2FA controls into user navigation menu - Support QR code generation for authenticator apps - Add forms for secure 2FA operations with validation - Configure OTP settings and admin site integration Features: - Optional 2FA (users can enable/disable) - TOTP compatible with Google Authenticator, Authy, etc. - Backup codes for emergency access - German language interface - Session-based 2FA enforcement - Password confirmation for sensitive operations - Production-ready with HTTPS support
256 lines
7.9 KiB
Python
256 lines
7.9 KiB
Python
"""
|
|
Audit Middleware
|
|
Automatically tracks all model changes throughout the application
|
|
"""
|
|
|
|
import threading
|
|
|
|
from django.contrib.auth.signals import user_logged_in, user_logged_out
|
|
from django.db.models.signals import post_delete, post_save, pre_save
|
|
from django.dispatch import receiver
|
|
from django.http import HttpResponseRedirect
|
|
from django.shortcuts import reverse
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
from django_otp.plugins.otp_totp.models import TOTPDevice
|
|
|
|
from stiftung.audit import get_client_ip, log_action, track_model_changes
|
|
|
|
# Thread-local storage for request context
|
|
_local = threading.local()
|
|
|
|
|
|
class AuditMiddleware(MiddlewareMixin):
|
|
"""
|
|
Middleware that sets up request context for audit logging
|
|
"""
|
|
|
|
def process_request(self, request):
|
|
"""Store request in thread-local storage for access in signal handlers"""
|
|
_local.request = request
|
|
_local.user_changes = {} # Store pre-save state for change tracking
|
|
return None
|
|
|
|
def process_response(self, request, response):
|
|
"""Clean up thread-local storage"""
|
|
if hasattr(_local, "request"):
|
|
delattr(_local, "request")
|
|
if hasattr(_local, "user_changes"):
|
|
delattr(_local, "user_changes")
|
|
return response
|
|
|
|
|
|
class TwoFactorMiddleware(MiddlewareMixin):
|
|
"""
|
|
Middleware that enforces 2FA verification for users with 2FA enabled
|
|
"""
|
|
|
|
def process_request(self, request):
|
|
"""Check if user needs 2FA verification"""
|
|
# Skip if user is not authenticated
|
|
if not request.user.is_authenticated:
|
|
return None
|
|
|
|
# Skip for admin URLs and 2FA URLs themselves
|
|
if (request.path.startswith('/admin/') or
|
|
request.path.startswith('/two-factor/') or
|
|
request.path.startswith('/auth/2fa/') or
|
|
request.path == '/logout/' or
|
|
request.path.startswith('/static/') or
|
|
request.path.startswith('/media/')):
|
|
return None
|
|
|
|
# Check if user has 2FA enabled
|
|
has_2fa = TOTPDevice.objects.filter(user=request.user, confirmed=True).exists()
|
|
|
|
if has_2fa:
|
|
# Check if user has completed 2FA verification in this session
|
|
if not request.session.get('2fa_verified', False):
|
|
# Redirect to 2FA verification page
|
|
return HttpResponseRedirect(
|
|
reverse('stiftung:two_factor_verify') + f'?next={request.path}'
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def get_current_request():
|
|
"""Get the current request from thread-local storage"""
|
|
return getattr(_local, "request", None)
|
|
|
|
|
|
def get_entity_type_from_model(model):
|
|
"""Map Django model to audit entity type"""
|
|
model_name = model.__name__.lower()
|
|
|
|
mapping = {
|
|
"destinataer": "destinataer",
|
|
"land": "land",
|
|
"paechter": "paechter",
|
|
"verpachtung": "verpachtung",
|
|
"foerderung": "foerderung",
|
|
"rentmeister": "rentmeister",
|
|
"stiftungskonto": "stiftungskonto",
|
|
"verwaltungskosten": "verwaltungskosten",
|
|
"banktransaction": "banktransaction",
|
|
"dokumentlink": "dokumentlink",
|
|
"user": "user",
|
|
"person": "destinataer", # Legacy model maps to destinataer
|
|
}
|
|
|
|
return mapping.get(model_name, "unknown")
|
|
|
|
|
|
def get_entity_name(instance):
|
|
"""Get a human-readable name for an entity"""
|
|
if hasattr(instance, "get_full_name") and callable(instance.get_full_name):
|
|
return instance.get_full_name()
|
|
elif hasattr(instance, "__str__"):
|
|
return str(instance)
|
|
else:
|
|
return f"{instance.__class__.__name__} #{instance.pk}"
|
|
|
|
|
|
# Signal handlers for automatic audit logging
|
|
@receiver(pre_save)
|
|
def store_pre_save_state(sender, instance, **kwargs):
|
|
"""Store the pre-save state for change tracking"""
|
|
request = get_current_request()
|
|
if not request or not hasattr(request, "user"):
|
|
return
|
|
|
|
# Skip if user is not authenticated
|
|
if not request.user.is_authenticated:
|
|
return
|
|
|
|
# Skip audit log entries themselves to avoid infinite loops
|
|
if sender.__name__ == "AuditLog":
|
|
return
|
|
|
|
# Store the current state if this is an update
|
|
if instance.pk:
|
|
try:
|
|
old_instance = sender.objects.get(pk=instance.pk)
|
|
if not hasattr(_local, "user_changes"):
|
|
_local.user_changes = {}
|
|
_local.user_changes[instance.pk] = old_instance
|
|
except sender.DoesNotExist:
|
|
pass
|
|
|
|
|
|
@receiver(post_save)
|
|
def log_model_save(sender, instance, created, **kwargs):
|
|
"""Log model creation and updates"""
|
|
request = get_current_request()
|
|
if not request or not hasattr(request, "user"):
|
|
return
|
|
|
|
# Skip if user is not authenticated
|
|
if not request.user.is_authenticated:
|
|
return
|
|
|
|
# Skip audit log entries themselves to avoid infinite loops
|
|
if sender.__name__ == "AuditLog":
|
|
return
|
|
|
|
# Skip certain system models
|
|
if sender.__name__ in ["Session", "LogEntry", "ContentType", "Permission"]:
|
|
return
|
|
|
|
entity_type = get_entity_type_from_model(sender)
|
|
entity_name = get_entity_name(instance)
|
|
entity_id = str(instance.pk)
|
|
|
|
if created:
|
|
# Log creation
|
|
description = f"Neue {entity_type.replace('_', ' ').title()} '{entity_name}' wurde erstellt"
|
|
log_action(
|
|
request=request,
|
|
action="create",
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
entity_name=entity_name,
|
|
description=description,
|
|
)
|
|
else:
|
|
# Log update with changes
|
|
changes = {}
|
|
if hasattr(_local, "user_changes") and instance.pk in _local.user_changes:
|
|
old_instance = _local.user_changes[instance.pk]
|
|
changes = track_model_changes(old_instance, instance)
|
|
|
|
if changes: # Only log if there are actual changes
|
|
description = f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde aktualisiert"
|
|
log_action(
|
|
request=request,
|
|
action="update",
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
entity_name=entity_name,
|
|
description=description,
|
|
changes=changes,
|
|
)
|
|
|
|
|
|
@receiver(post_delete)
|
|
def log_model_delete(sender, instance, **kwargs):
|
|
"""Log model deletion"""
|
|
request = get_current_request()
|
|
if not request or not hasattr(request, "user"):
|
|
return
|
|
|
|
# Skip if user is not authenticated
|
|
if not request.user.is_authenticated:
|
|
return
|
|
|
|
# Skip audit log entries themselves
|
|
if sender.__name__ == "AuditLog":
|
|
return
|
|
|
|
# Skip certain system models
|
|
if sender.__name__ in ["Session", "LogEntry", "ContentType", "Permission"]:
|
|
return
|
|
|
|
entity_type = get_entity_type_from_model(sender)
|
|
entity_name = get_entity_name(instance)
|
|
entity_id = str(instance.pk)
|
|
|
|
description = (
|
|
f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde gelöscht"
|
|
)
|
|
log_action(
|
|
request=request,
|
|
action="delete",
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
entity_name=entity_name,
|
|
description=description,
|
|
)
|
|
|
|
|
|
# Authentication logging
|
|
@receiver(user_logged_in)
|
|
def log_user_login(sender, request, user, **kwargs):
|
|
"""Log user login"""
|
|
log_action(
|
|
request=request,
|
|
action="login",
|
|
entity_type="user",
|
|
entity_id=str(user.pk),
|
|
entity_name=user.username,
|
|
description=f"Benutzer {user.username} hat sich angemeldet",
|
|
)
|
|
|
|
|
|
@receiver(user_logged_out)
|
|
def log_user_logout(sender, request, user, **kwargs):
|
|
"""Log user logout"""
|
|
if user: # user might be None if session expired
|
|
log_action(
|
|
request=request,
|
|
action="logout",
|
|
entity_type="user",
|
|
entity_id=str(user.pk),
|
|
entity_name=user.username,
|
|
description=f"Benutzer {user.username} hat sich abgemeldet",
|
|
)
|