- Apply Black formatting to all Python files in core and stiftung modules - Fix import statement ordering with isort - Ensure all code meets automated quality standards - Resolve CI/CD pipeline formatting failures - Maintain consistent code style across the entire codebase
219 lines
6.6 KiB
Python
219 lines
6.6 KiB
Python
"""
|
|
Audit Middleware
|
|
Automatically tracks all model changes throughout the application
|
|
"""
|
|
|
|
import threading
|
|
|
|
from django.contrib.auth.signals import user_logged_in, user_logged_out
|
|
from django.db.models.signals import post_delete, post_save, pre_save
|
|
from django.dispatch import receiver
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
|
|
from stiftung.audit import get_client_ip, log_action, track_model_changes
|
|
|
|
# Thread-local storage for request context
|
|
# Two attributes live on this object: `request` and `user_changes`. Both are
# set in AuditMiddleware.process_request and removed in process_response.
_local = threading.local()
|
|
|
|
|
|
class AuditMiddleware(MiddlewareMixin):
    """
    Request-scoped context provider for the audit signal handlers.

    The incoming request is parked in thread-local storage so that signal
    handlers can reach it, and is removed again when the response goes out.
    """

    def process_request(self, request):
        """Remember the request and start with an empty pre-save snapshot store."""
        _local.user_changes = {}  # Pre-save snapshots used for change tracking
        _local.request = request
        return None

    def process_response(self, request, response):
        """Remove everything this middleware placed in thread-local storage."""
        for attr_name in ("request", "user_changes"):
            if hasattr(_local, attr_name):
                delattr(_local, attr_name)
        return response
|
|
|
|
|
|
def get_current_request():
    """Return the request stored for the current thread, or None if there is none."""
    try:
        return _local.request
    except AttributeError:
        # Nothing has been stored for this thread.
        return None
|
|
|
|
|
|
def get_entity_type_from_model(model):
    """
    Translate a model class into its audit entity type.

    The lower-cased class name is the entity type for every known model;
    the legacy ``person`` model is reported as ``destinataer``. Anything
    else yields ``"unknown"``.
    """
    known_types = (
        "destinataer",
        "land",
        "paechter",
        "verpachtung",
        "foerderung",
        "rentmeister",
        "stiftungskonto",
        "verwaltungskosten",
        "banktransaction",
        "dokumentlink",
        "user",
    )
    # Names whose entity type differs from the class name itself
    aliases = {"person": "destinataer"}

    key = model.__name__.lower()
    if key in known_types:
        return key
    return aliases.get(key, "unknown")
|
|
|
|
|
|
def get_entity_name(instance):
    """
    Get a human-readable name for an entity.

    Resolution order:

    1. ``instance.get_full_name()`` if that attribute exists, is callable and
       returns a non-empty value.
    2. ``str(instance)`` if it is non-empty.
    3. ``"<ClassName> #<pk>"`` as a last resort.

    Every object has ``__str__``, so testing for its presence can never
    select the last fallback; the checks are therefore made on the *values*
    instead. This also prevents a blank full name from ending up as an empty
    entity name.
    """
    get_full_name = getattr(instance, "get_full_name", None)
    if callable(get_full_name):
        full_name = get_full_name()
        if full_name:
            return full_name

    text = str(instance)
    if text:
        return text

    return f"{instance.__class__.__name__} #{getattr(instance, 'pk', None)}"
|
|
|
|
|
|
# Signal handlers for automatic audit logging
|
|
@receiver(pre_save)
def store_pre_save_state(sender, instance, **kwargs):
    """
    Store the pre-save state of an existing row for change tracking.

    Runs before every model save. When an authenticated request is active
    and the instance already has a primary key, the currently persisted
    version is loaded and kept in thread-local storage under
    ``instance.pk`` so the post_save handler can compare old and new values.

    Args:
        sender: The model class being saved.
        instance: The instance about to be saved.
        **kwargs: Remaining signal arguments (unused).
    """
    request = get_current_request()
    if not request or not hasattr(request, "user"):
        return

    # Skip if user is not authenticated
    if not request.user.is_authenticated:
        return

    # Skip audit log entries themselves to avoid infinite loops, and the
    # system models that the post_save/post_delete handlers never log.
    # Loading a snapshot for them would only cost a query.
    if sender.__name__ in (
        "AuditLog",
        "Session",
        "LogEntry",
        "ContentType",
        "Permission",
    ):
        return

    # Only updates have a previous state
    if not instance.pk:
        return

    try:
        # _default_manager works even when the model's manager is not
        # called "objects".
        old_instance = sender._default_manager.get(pk=instance.pk)
    except sender.DoesNotExist:
        # A pk was assigned but no row exists yet: this is an insert.
        return

    if not hasattr(_local, "user_changes"):
        _local.user_changes = {}
    _local.user_changes[instance.pk] = old_instance
|
|
|
|
|
|
@receiver(post_save)
def log_model_save(sender, instance, created, **kwargs):
    """
    Log model creation and updates.

    Creations are always logged. Updates are logged only when a pre-save
    snapshot exists for the instance and ``track_model_changes`` reports a
    non-empty set of changes.

    Args:
        sender: The model class that was saved.
        instance: The saved instance.
        created: True when a new row was inserted.
        **kwargs: Remaining signal arguments (unused).
    """
    request = get_current_request()
    if not request or not hasattr(request, "user"):
        return

    # Skip if user is not authenticated
    if not request.user.is_authenticated:
        return

    # Skip audit log entries themselves to avoid infinite loops
    if sender.__name__ == "AuditLog":
        return

    # Consume the pre-save snapshot. Snapshots are keyed by pk only, so one
    # left behind could be picked up by a later save of a different model
    # with the same pk, and would stay in memory until the request ends.
    snapshots = getattr(_local, "user_changes", None)
    old_instance = snapshots.pop(instance.pk, None) if snapshots else None

    # Skip certain system models
    if sender.__name__ in ["Session", "LogEntry", "ContentType", "Permission"]:
        return

    entity_type = get_entity_type_from_model(sender)
    entity_name = get_entity_name(instance)
    entity_id = str(instance.pk)

    if created:
        # Log creation
        description = f"Neue {entity_type.replace('_', ' ').title()} '{entity_name}' wurde erstellt"
        log_action(
            request=request,
            action="create",
            entity_type=entity_type,
            entity_id=entity_id,
            entity_name=entity_name,
            description=description,
        )
        return

    # Log update with changes; without a snapshot there is nothing to compare
    changes = {}
    if old_instance is not None:
        changes = track_model_changes(old_instance, instance)

    if not changes:  # Only log if there are actual changes
        return

    description = f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde aktualisiert"
    log_action(
        request=request,
        action="update",
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        description=description,
        changes=changes,
    )
|
|
|
|
|
|
@receiver(post_delete)
def log_model_delete(sender, instance, **kwargs):
    """
    Write an audit entry for a deleted model instance.

    Nothing is logged without an active request carrying an authenticated
    user, nor for AuditLog rows or the listed system models.
    """
    request = get_current_request()
    has_authenticated_user = (
        request and hasattr(request, "user") and request.user.is_authenticated
    )
    if not has_authenticated_user:
        return

    model_name = sender.__name__
    # Audit log entries themselves are never audited
    if model_name == "AuditLog":
        return
    # Neither are certain system models
    if model_name in ["Session", "LogEntry", "ContentType", "Permission"]:
        return

    entity_type = get_entity_type_from_model(sender)
    entity_name = get_entity_name(instance)
    entity_id = str(instance.pk)

    log_action(
        request=request,
        action="delete",
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        description=f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde gelöscht",
    )
|
|
|
|
|
|
# Authentication logging
|
|
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
    """Write a "login" audit entry for the user who just signed in."""
    user_id = str(user.pk)
    username = user.username
    log_action(
        request=request,
        action="login",
        entity_type="user",
        entity_id=user_id,
        entity_name=username,
        description=f"Benutzer {user.username} hat sich angemeldet",
    )
|
|
|
|
|
|
@receiver(user_logged_out)
def log_user_logout(sender, request, user, **kwargs):
    """Write a "logout" audit entry, unless no user is attached to the signal."""
    # user might be None if session expired
    if not user:
        return

    user_id = str(user.pk)
    username = user.username
    log_action(
        request=request,
        action="logout",
        entity_type="user",
        entity_id=user_id,
        entity_name=username,
        description=f"Benutzer {user.username} hat sich abgemeldet",
    )
|