""" Audit Middleware Automatically tracks all model changes throughout the application """ import threading from django.contrib.auth.signals import user_logged_in, user_logged_out from django.db.models.signals import post_delete, post_save, pre_save from django.dispatch import receiver from django.http import HttpResponseRedirect from django.shortcuts import reverse from django.utils.deprecation import MiddlewareMixin from django_otp.plugins.otp_totp.models import TOTPDevice from stiftung.audit import get_client_ip, log_action, track_model_changes # Thread-local storage for request context _local = threading.local() class AuditMiddleware(MiddlewareMixin): """ Middleware that sets up request context for audit logging """ def process_request(self, request): """Store request in thread-local storage for access in signal handlers""" _local.request = request _local.user_changes = {} # Store pre-save state for change tracking return None def process_response(self, request, response): """Clean up thread-local storage""" if hasattr(_local, "request"): delattr(_local, "request") if hasattr(_local, "user_changes"): delattr(_local, "user_changes") return response class TwoFactorMiddleware(MiddlewareMixin): """ Middleware that enforces 2FA verification for users with 2FA enabled """ def process_request(self, request): """Check if user needs 2FA verification""" # Skip if user is not authenticated if not request.user.is_authenticated: return None # Skip for admin URLs and 2FA URLs themselves if (request.path.startswith('/admin/') or request.path.startswith('/two-factor/') or request.path.startswith('/auth/2fa/') or request.path == '/logout/' or request.path.startswith('/static/') or request.path.startswith('/media/')): return None # Check if user has 2FA enabled has_2fa = TOTPDevice.objects.filter(user=request.user, confirmed=True).exists() if has_2fa: # Check if user has completed 2FA verification in this session if not request.session.get('2fa_verified', False): # Redirect to 2FA verification page return HttpResponseRedirect( reverse('stiftung:two_factor_verify') + f'?next={request.path}' ) return None def get_current_request(): """Get the current request from thread-local storage""" return getattr(_local, "request", None) def get_entity_type_from_model(model): """Map Django model to audit entity type""" model_name = model.__name__.lower() mapping = { "destinataer": "destinataer", "land": "land", "paechter": "paechter", "verpachtung": "verpachtung", "foerderung": "foerderung", "rentmeister": "rentmeister", "stiftungskonto": "stiftungskonto", "verwaltungskosten": "verwaltungskosten", "banktransaction": "banktransaction", "dokumentlink": "dokumentlink", "user": "user", "person": "destinataer", # Legacy model maps to destinataer } return mapping.get(model_name, "unknown") def get_entity_name(instance): """Get a human-readable name for an entity""" if hasattr(instance, "get_full_name") and callable(instance.get_full_name): return instance.get_full_name() elif hasattr(instance, "__str__"): return str(instance) else: return f"{instance.__class__.__name__} #{instance.pk}" # Signal handlers for automatic audit logging @receiver(pre_save) def store_pre_save_state(sender, instance, **kwargs): """Store the pre-save state for change tracking""" request = get_current_request() if not request or not hasattr(request, "user"): return # Skip if user is not authenticated if not request.user.is_authenticated: return # Skip audit log entries themselves to avoid infinite loops if sender.__name__ == "AuditLog": return # Store the current state if this is an update if instance.pk: try: old_instance = sender.objects.get(pk=instance.pk) if not hasattr(_local, "user_changes"): _local.user_changes = {} _local.user_changes[instance.pk] = old_instance except sender.DoesNotExist: pass @receiver(post_save) def log_model_save(sender, instance, created, **kwargs): """Log model creation and updates""" request = get_current_request() if not request or not hasattr(request, "user"): return # Skip if user is not authenticated if not request.user.is_authenticated: return # Skip audit log entries themselves to avoid infinite loops if sender.__name__ == "AuditLog": return # Skip certain system models if sender.__name__ in ["Session", "LogEntry", "ContentType", "Permission"]: return entity_type = get_entity_type_from_model(sender) entity_name = get_entity_name(instance) entity_id = str(instance.pk) if created: # Log creation description = f"Neue {entity_type.replace('_', ' ').title()} '{entity_name}' wurde erstellt" log_action( request=request, action="create", entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, description=description, ) else: # Log update with changes changes = {} if hasattr(_local, "user_changes") and instance.pk in _local.user_changes: old_instance = _local.user_changes[instance.pk] changes = track_model_changes(old_instance, instance) if changes: # Only log if there are actual changes description = f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde aktualisiert" log_action( request=request, action="update", entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, description=description, changes=changes, ) @receiver(post_delete) def log_model_delete(sender, instance, **kwargs): """Log model deletion""" request = get_current_request() if not request or not hasattr(request, "user"): return # Skip if user is not authenticated if not request.user.is_authenticated: return # Skip audit log entries themselves if sender.__name__ == "AuditLog": return # Skip certain system models if sender.__name__ in ["Session", "LogEntry", "ContentType", "Permission"]: return entity_type = get_entity_type_from_model(sender) entity_name = get_entity_name(instance) entity_id = str(instance.pk) description = ( f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde gelöscht" ) log_action( request=request, action="delete", entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, description=description, ) # Authentication logging @receiver(user_logged_in) def log_user_login(sender, request, user, **kwargs): """Log user login""" log_action( request=request, action="login", entity_type="user", entity_id=str(user.pk), entity_name=user.username, description=f"Benutzer {user.username} hat sich angemeldet", ) @receiver(user_logged_out) def log_user_logout(sender, request, user, **kwargs): """Log user logout""" if user: # user might be None if session expired log_action( request=request, action="logout", entity_type="user", entity_id=str(user.pk), entity_name=user.username, description=f"Benutzer {user.username} hat sich abgemeldet", )