Files
stiftung-management-system/app/stiftung/audit.py
Stiftung Development e0c7d0e351 Format code with Black and isort for CI/CD compliance
- Apply Black formatting to all Python files in core and stiftung modules
- Fix import statement ordering with isort
- Ensure all code meets automated quality standards
- Resolve CI/CD pipeline formatting failures
- Maintain consistent code style across the entire codebase
2025-09-06 21:04:07 +02:00

308 lines
8.8 KiB
Python

"""
Audit Logging Utilities
Provides functions to log user actions throughout the application
"""
import json
from django.contrib.auth import get_user_model
from django.utils import timezone
from stiftung.models import AuditLog
User = get_user_model()
def get_client_ip(request):
    """
    Extract the client IP address from the request.

    The first (left-most) entry of the ``X-Forwarded-For`` header is
    preferred; ``REMOTE_ADDR`` is used when the header is absent or its
    first entry is blank.

    Args:
        request: Object exposing a ``META`` mapping (a Django request).

    Returns:
        The IP address string, or ``None`` when neither value is present.
    """
    forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
    if forwarded_for:
        # The header is a comma-separated list, commonly written with a
        # space after each comma, so surrounding whitespace is removed
        # before the value is handed to the caller.
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop
    # Header missing or its first entry empty: use the socket peer address.
    return request.META.get("REMOTE_ADDR")
def log_action(
    request, action, entity_type, entity_id, entity_name, description, changes=None
):
    """
    Log a user action to the audit log.

    Creates one ``AuditLog`` row from the given arguments plus metadata
    read from the request (user, IP address, user agent, session key).

    Args:
        request: Django request object
        action: Action type (create, update, delete, etc.)
        entity_type: Type of entity (destinataer, land, etc.)
        entity_id: ID of the entity; ``None`` is stored as an empty string,
            any other value (including ``0``) is stored as ``str(entity_id)``
        entity_name: Human-readable name of the entity
        description: Description of the action
        changes: Dictionary of field changes (optional)

    Returns:
        The created ``AuditLog`` entry.
    """
    # Unauthenticated requests are recorded without a user, as "Anonymous".
    user = request.user if request.user.is_authenticated else None
    username = user.username if user else "Anonymous"

    # Get request metadata
    ip_address = get_client_ip(request)
    user_agent = request.META.get("HTTP_USER_AGENT", "")
    # The request may have no session at all, and an existing session can
    # report a key of None; both cases are normalised to an empty string so
    # the stored value is always a string.
    session = getattr(request, "session", None)
    session_key = getattr(session, "session_key", None) or ""

    # Create audit log entry
    audit_entry = AuditLog.objects.create(
        user=user,
        username=username,
        action=action,
        entity_type=entity_type,
        # Compare against None rather than truthiness so an id of 0 is kept.
        entity_id="" if entity_id is None else str(entity_id),
        entity_name=entity_name,
        description=description,
        changes=changes,
        ip_address=ip_address,
        user_agent=user_agent[:500],  # Truncate to avoid very long user agents
        session_key=session_key,
    )
    return audit_entry
def log_create(request, entity_type, entity_id, entity_name, description=None):
    """
    Record the creation of an entity in the audit log.

    When no description is supplied, a default one is built from the
    entity type (underscores replaced by spaces, title-cased) and the
    entity name.

    Returns:
        Whatever ``log_action`` returns for the "create" action.
    """
    # The default text is only built when the caller gave no description.
    message = (
        description
        or f"Neue {entity_type.replace('_', ' ').title()} '{entity_name}' wurde erstellt"
    )
    return log_action(
        request,
        "create",
        entity_type,
        entity_id,
        entity_name,
        description=message,
    )
def log_update(request, entity_type, entity_id, entity_name, changes, description=None):
    """
    Record an update of an entity, including its field changes.

    When no description is supplied, a default one is built that lists the
    keys of ``changes`` separated by ", ".

    Returns:
        Whatever ``log_action`` returns for the "update" action.
    """
    message = description
    if not message:
        # An empty or missing ``changes`` yields an empty field list.
        fields_str = ", ".join(changes.keys() if changes else ())
        message = f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde aktualisiert: {fields_str}"
    return log_action(
        request,
        "update",
        entity_type,
        entity_id,
        entity_name,
        description=message,
        changes=changes,
    )
def log_delete(request, entity_type, entity_id, entity_name, description=None):
    """
    Record the deletion of an entity in the audit log.

    When no description is supplied, a default one is built from the
    entity type (underscores replaced by spaces, title-cased) and the
    entity name.

    Returns:
        Whatever ``log_action`` returns for the "delete" action.
    """
    # The default text is only built when the caller gave no description.
    message = (
        description
        or f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde gelöscht"
    )
    return log_action(
        request,
        "delete",
        entity_type,
        entity_id,
        entity_name,
        description=message,
    )
def log_link(
    request,
    entity_type,
    entity_id,
    entity_name,
    target_type,
    target_name,
    description=None,
):
    """
    Record that an entity was linked to a target.

    When no description is supplied, a default one is built that names
    both the entity and the target. The target type and name only appear
    in that default description; they are not passed on separately.

    Returns:
        Whatever ``log_action`` returns for the "link" action.
    """
    # The default text is only built when the caller gave no description.
    message = (
        description
        or f"{entity_type.replace('_', ' ').title()} '{entity_name}' wurde mit {target_type.replace('_', ' ')} '{target_name}' verknüpft"
    )
    return log_action(
        request,
        "link",
        entity_type,
        entity_id,
        entity_name,
        description=message,
    )
def log_unlink(
    request,
    entity_type,
    entity_id,
    entity_name,
    target_type,
    target_name,
    description=None,
):
    """
    Record that the link between an entity and a target was removed.

    When no description is supplied, a default one is built that names
    both the entity and the target. The target type and name only appear
    in that default description; they are not passed on separately.

    Returns:
        Whatever ``log_action`` returns for the "unlink" action.
    """
    # The default text is only built when the caller gave no description.
    message = (
        description
        or f"Verknüpfung zwischen {entity_type.replace('_', ' ').title()} '{entity_name}' und {target_type.replace('_', ' ')} '{target_name}' wurde entfernt"
    )
    return log_action(
        request,
        "unlink",
        entity_type,
        entity_id,
        entity_name,
        description=message,
    )
def log_system_action(request, action, description, details=None):
    """
    Log system-level actions like backup, restore, etc.

    The entry is written with entity type "system", an empty entity id and
    the entity name "System"; ``details`` is forwarded as the ``changes``
    payload.

    Returns:
        Whatever ``log_action`` returns.
    """
    # Fixed placeholders: system actions are not tied to a concrete entity.
    system_entity = {
        "entity_type": "system",
        "entity_id": "",
        "entity_name": "System",
    }
    return log_action(
        request,
        action,
        description=description,
        changes=details,
        **system_entity,
    )
def track_model_changes(old_instance, new_instance, exclude_fields=None):
    """
    Compare two model instances field by field.

    The fields are taken from ``new_instance._meta.fields``. Values are
    compared by their string form (``None`` stays ``None``), and the string
    forms are what ends up in the result.

    Args:
        old_instance: Original model instance
        new_instance: Updated model instance
        exclude_fields: Field names to skip; defaults to the id and the
            created/updated timestamp fields

    Returns:
        Dictionary of changes in format
        {field: {'old': old_value, 'new': new_value}}; empty when either
        instance is falsy or nothing differs.
    """
    skipped = exclude_fields
    if skipped is None:
        skipped = ["id", "erstellt_am", "aktualisiert_am", "created_at", "updated_at"]

    diff = {}
    # Without both sides there is nothing to compare.
    if not (old_instance and new_instance):
        return diff

    def _as_text(value):
        # Keep None distinguishable from the literal string "None".
        return None if value is None else str(value)

    for model_field in new_instance._meta.fields:
        name = model_field.name
        if name in skipped:
            continue

        before_raw = getattr(old_instance, name, None)
        after_raw = getattr(new_instance, name, None)
        before = _as_text(before_raw)
        after = _as_text(after_raw)

        if before != after:
            diff[name] = {"old": before, "new": after}

    return diff
class AuditLogMixin:
    """
    Mixin for views that provides audit logging functionality.

    The logging methods read ``self.request``; the entity type is taken
    from ``audit_entity_type`` or, when that is unset, derived from
    ``self.model`` if the host class defines one.
    """

    # Explicit entity type label; when falsy it is derived from ``self.model``.
    audit_entity_type = None
    # Name of the instance attribute used as the human-readable entity name.
    audit_entity_name_field = "name"

    def get_audit_entity_type(self):
        """
        Get the entity type for audit logging.

        Returns:
            ``audit_entity_type`` when set, otherwise the lower-cased class
            name of ``self.model``, otherwise "unknown".
        """
        if self.audit_entity_type:
            return self.audit_entity_type

        # Try to derive from model name
        model = getattr(self, "model", None)
        if model:
            return model.__name__.lower()

        return "unknown"

    def get_audit_entity_name(self, instance):
        """
        Get the entity name for audit logging.

        Resolution order:
            1. the attribute named by ``audit_entity_name_field``, when it
               is present and not None/blank,
            2. ``str(instance)``, when that is not blank,
            3. the label "<entity type> #<pk>".

        Returns:
            A non-empty string describing the instance.
        """
        value = getattr(instance, self.audit_entity_name_field, None)
        if value is not None:
            name = str(value)
            # A None or blank name would be logged as "None" or "", which
            # does not identify the entity, so fall through instead.
            if name.strip():
                return name

        text = str(instance)
        if text.strip():
            return text

        return f"{self.get_audit_entity_type()} #{getattr(instance, 'pk', None)}"

    def log_create_action(self, instance):
        """Log creation of an instance."""
        log_create(
            request=self.request,
            entity_type=self.get_audit_entity_type(),
            entity_id=instance.pk,
            entity_name=self.get_audit_entity_name(instance),
        )

    def log_update_action(self, old_instance, new_instance):
        """
        Log update of an instance.

        The field differences are computed with ``track_model_changes``;
        nothing is logged when there are none.
        """
        changes = track_model_changes(old_instance, new_instance)
        if changes:  # Only log if there are actual changes
            log_update(
                request=self.request,
                entity_type=self.get_audit_entity_type(),
                entity_id=new_instance.pk,
                entity_name=self.get_audit_entity_name(new_instance),
                changes=changes,
            )

    def log_delete_action(self, instance):
        """Log deletion of an instance."""
        log_delete(
            request=self.request,
            entity_type=self.get_audit_entity_type(),
            entity_id=instance.pk,
            entity_name=self.get_audit_entity_name(instance),
        )
# Simple login/logout audit helpers expected by views
def log_login(request, user):
    """
    Log a successful user login.

    This is best effort: a failure to write the audit entry is not
    propagated to the caller. The failure is reported through the
    ``logging`` module instead of being dropped silently.

    Args:
        request: Django request object
        user: The user that logged in

    Returns:
        The value returned by ``log_action``, or ``None`` when writing the
        entry failed.
    """
    # Imported locally so this function does not depend on a module-level
    # logging import being present.
    import logging

    try:
        username = user.get_username()
        return log_action(
            request=request,
            action="login",
            entity_type="user",
            entity_id=user.pk,
            entity_name=username,
            description=f"User '{username}' logged in",
        )
    except Exception:
        # Keep the swallow-and-return-None contract, but leave a trace
        # (with traceback) so missing audit entries can be diagnosed.
        logging.getLogger(__name__).exception("Failed to write login audit entry")
        return None
def log_logout(request, user):
    """
    Log a successful user logout.

    This is best effort: a failure to write the audit entry is not
    propagated to the caller. The failure is reported through the
    ``logging`` module instead of being dropped silently.

    Args:
        request: Django request object
        user: The user that logged out; a falsy value is recorded under
            the name "Unknown" with an empty entity id

    Returns:
        The value returned by ``log_action``, or ``None`` when writing the
        entry failed.
    """
    # Imported locally so this function does not depend on a module-level
    # logging import being present.
    import logging

    try:
        username = user.get_username() if user else "Unknown"
        return log_action(
            request=request,
            action="logout",
            entity_type="user",
            entity_id=getattr(user, "pk", ""),
            entity_name=username,
            description=f"User '{username}' logged out",
        )
    except Exception:
        # Keep the swallow-and-return-None contract, but leave a trace
        # (with traceback) so missing audit entries can be diagnosed.
        logging.getLogger(__name__).exception("Failed to write logout audit entry")
        return None