282 lines
8.9 KiB
Python
282 lines
8.9 KiB
Python
"""
|
|
Audit Logging Utilities
|
|
Provides functions to log user actions throughout the application
|
|
"""
|
|
|
|
import json
|
|
from django.utils import timezone
|
|
from django.contrib.auth import get_user_model
|
|
from stiftung.models import AuditLog
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
def get_client_ip(request):
    """Extract the client IP address from the request.

    The first entry of the ``X-Forwarded-For`` header is used when the header
    is present and that entry is non-empty; otherwise ``REMOTE_ADDR`` is used.

    Args:
        request: Object exposing a ``META`` mapping (e.g. a Django request).

    Returns:
        The IP address as a string, or ``None`` when neither value is present.
    """
    # NOTE(review): X-Forwarded-For can be set by the client unless a trusted
    # proxy overwrites it -- confirm the deployment before relying on this value.
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        # Entries are comma separated and usually padded with spaces
        # ("client, proxy1, proxy2"), so trim the first hop.
        first_hop = forwarded.split(',')[0].strip()
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')
|
|
|
|
|
|
def log_action(request, action, entity_type, entity_id, entity_name, description, changes=None):
    """Log a user action to the audit log.

    Args:
        request: Django request object; ``request.user`` and ``request.META``
            are read, ``request.session`` is read when present.
        action: Action type (create, update, delete, etc.).
        entity_type: Type of entity (destinataer, land, etc.).
        entity_id: ID of the entity; ``None`` is stored as an empty string,
            any other value is stored as ``str(entity_id)``.
        entity_name: Human-readable name of the entity.
        description: Description of the action.
        changes: Dictionary of field changes (optional).

    Returns:
        The object returned by ``AuditLog.objects.create``.
    """
    user = request.user if request.user.is_authenticated else None
    username = user.username if user else 'Anonymous'

    # Request metadata
    ip_address = get_client_ip(request)
    user_agent = request.META.get('HTTP_USER_AGENT', '')

    # ``session_key`` may be None (e.g. before the session is first saved);
    # store '' in that case, the same value used when there is no session.
    session = getattr(request, 'session', None)
    session_key = getattr(session, 'session_key', None) or ''

    # Only a missing id collapses to ''; a legitimate id of 0 is kept as '0'.
    entity_id_text = '' if entity_id is None else str(entity_id)

    return AuditLog.objects.create(
        user=user,
        username=username,
        action=action,
        entity_type=entity_type,
        entity_id=entity_id_text,
        entity_name=entity_name,
        description=description,
        changes=changes,
        ip_address=ip_address,
        user_agent=user_agent[:500],  # Truncate to avoid very long user agents
        session_key=session_key,
    )
|
|
|
|
|
|
def log_create(request, entity_type, entity_id, entity_name, description=None):
    """Record a 'create' audit entry via ``log_action``.

    When ``description`` is empty or omitted, a default message is built from
    the entity type (underscores replaced by spaces, title-cased) and name.

    Returns:
        Whatever ``log_action`` returns.
    """
    if description:
        message = description
    else:
        label = entity_type.replace('_', ' ').title()
        message = f"Neue {label} '{entity_name}' wurde erstellt"

    return log_action(request, 'create', entity_type, entity_id, entity_name, message)
|
|
|
|
|
|
def log_update(request, entity_type, entity_id, entity_name, changes, description=None):
    """Record an 'update' audit entry, including the field changes.

    When ``description`` is empty or omitted, a default message is built that
    lists the keys of ``changes`` separated by ", ".

    Returns:
        Whatever ``log_action`` returns.
    """
    message = description
    if not message:
        field_list = ", ".join(changes.keys()) if changes else ""
        label = entity_type.replace('_', ' ').title()
        message = f"{label} '{entity_name}' wurde aktualisiert: {field_list}"

    return log_action(
        request, 'update', entity_type, entity_id, entity_name, message, changes
    )
|
|
|
|
|
|
def log_delete(request, entity_type, entity_id, entity_name, description=None):
    """Record a 'delete' audit entry via ``log_action``.

    When ``description`` is empty or omitted, a default message is built from
    the entity type (underscores replaced by spaces, title-cased) and name.

    Returns:
        Whatever ``log_action`` returns.
    """
    if description:
        message = description
    else:
        label = entity_type.replace('_', ' ').title()
        message = f"{label} '{entity_name}' wurde gelöscht"

    return log_action(request, 'delete', entity_type, entity_id, entity_name, message)
|
|
|
|
|
|
def log_link(request, entity_type, entity_id, entity_name, target_type, target_name, description=None):
    """Record a 'link' audit entry via ``log_action``.

    ``target_type`` and ``target_name`` are only used to build the default
    message; they are not passed on to ``log_action``.

    Returns:
        Whatever ``log_action`` returns.
    """
    message = description
    if not message:
        source_label = entity_type.replace('_', ' ').title()
        target_label = target_type.replace('_', ' ')
        message = f"{source_label} '{entity_name}' wurde mit {target_label} '{target_name}' verknüpft"

    return log_action(request, 'link', entity_type, entity_id, entity_name, message)
|
|
|
|
|
|
def log_unlink(request, entity_type, entity_id, entity_name, target_type, target_name, description=None):
    """Record an 'unlink' audit entry via ``log_action``.

    ``target_type`` and ``target_name`` are only used to build the default
    message; they are not passed on to ``log_action``.

    Returns:
        Whatever ``log_action`` returns.
    """
    message = description
    if not message:
        source_label = entity_type.replace('_', ' ').title()
        target_label = target_type.replace('_', ' ')
        message = f"Verknüpfung zwischen {source_label} '{entity_name}' und {target_label} '{target_name}' wurde entfernt"

    return log_action(request, 'unlink', entity_type, entity_id, entity_name, message)
|
|
|
|
|
|
def log_system_action(request, action, description, details=None):
    """Record a system-level action (backup, restore, etc.).

    The entry uses the fixed entity type 'system', an empty entity id and the
    entity name 'System'; ``details`` is passed as the ``changes`` payload.

    Returns:
        Whatever ``log_action`` returns.
    """
    system_entity = ('system', '', 'System')
    return log_action(request, action, *system_entity, description, details)
|
|
|
|
|
|
def track_model_changes(old_instance, new_instance, exclude_fields=None):
    """Compute the field-level differences between two model instances.

    Fields are taken from ``new_instance._meta.fields``. Values are compared
    by their ``str()`` form, with ``None`` kept as ``None``.

    Args:
        old_instance: Original model instance.
        new_instance: Updated model instance.
        exclude_fields: Field names to skip; defaults to the id and
            created/updated timestamp names listed below.

    Returns:
        Dictionary in the format {field: {'old': old_value, 'new': new_value}}
        holding the stringified values; empty when either instance is falsy.
    """
    skipped = exclude_fields
    if skipped is None:
        skipped = ['id', 'erstellt_am', 'aktualisiert_am', 'created_at', 'updated_at']

    if not (old_instance and new_instance):
        return {}

    def as_text(value):
        # None stays None so "unset" is distinguishable from the text 'None'.
        return None if value is None else str(value)

    diff = {}
    for name in (model_field.name for model_field in new_instance._meta.fields):
        if name in skipped:
            continue

        old_raw = getattr(old_instance, name, None)
        new_raw = getattr(new_instance, name, None)
        before, after = as_text(old_raw), as_text(new_raw)

        if before != after:
            diff[name] = {'old': before, 'new': after}

    return diff
|
|
|
|
|
|
class AuditLogMixin:
    """Mixin for views that provides audit logging functionality.

    The ``log_*_action`` helpers read ``self.request``; the entity type is
    taken from ``audit_entity_type`` or derived from ``self.model`` when the
    host class defines one.
    """

    # Explicit entity type; when left as None it is derived from ``self.model``.
    audit_entity_type = None
    # Name of the instance attribute used as the human-readable entity name.
    audit_entity_name_field = 'name'

    def get_audit_entity_type(self):
        """Return the entity type for audit logging.

        Uses ``audit_entity_type`` when set, otherwise the lower-cased class
        name of ``self.model``, otherwise ``'unknown'``.
        """
        if self.audit_entity_type:
            return self.audit_entity_type

        # Try to derive from model name
        model = getattr(self, 'model', None)
        if model:
            return model.__name__.lower()

        return 'unknown'

    def get_audit_entity_name(self, instance):
        """Return a human-readable name for ``instance``.

        Resolution order: the attribute named by ``audit_entity_name_field``
        (skipped when it is missing, ``None`` or renders as an empty string),
        then ``str(instance)``, then ``"<entity type> #<pk>"``.
        """
        value = getattr(instance, self.audit_entity_name_field, None)
        if value is not None:
            text = str(value)
            if text:
                return text

        # Every object supports str(), so the pk-based label is only needed
        # when that representation is empty.
        text = str(instance)
        if text:
            return text

        return f"{self.get_audit_entity_type()} #{instance.pk}"

    def log_create_action(self, instance):
        """Log creation of ``instance`` using the current request."""
        log_create(
            request=self.request,
            entity_type=self.get_audit_entity_type(),
            entity_id=instance.pk,
            entity_name=self.get_audit_entity_name(instance),
        )

    def log_update_action(self, old_instance, new_instance):
        """Log an update of an instance; nothing is logged without changes."""
        changes = track_model_changes(old_instance, new_instance)
        if changes:  # Only log if there are actual changes
            log_update(
                request=self.request,
                entity_type=self.get_audit_entity_type(),
                entity_id=new_instance.pk,
                entity_name=self.get_audit_entity_name(new_instance),
                changes=changes,
            )

    def log_delete_action(self, instance):
        """Log deletion of ``instance`` using the current request."""
        log_delete(
            request=self.request,
            entity_type=self.get_audit_entity_type(),
            entity_id=instance.pk,
            entity_name=self.get_audit_entity_name(instance),
        )
|
|
|
|
|
|
# Simple login/logout audit helpers expected by views
|
|
def log_login(request, user):
    """Log a successful user login.

    This is best-effort: any exception raised while building or writing the
    entry is recorded through the ``logging`` module and then suppressed, so
    callers never see an exception from this function.

    Args:
        request: Django request object, passed on to ``log_action``.
        user: The user that logged in; ``pk`` and ``get_username()`` are read.

    Returns:
        Whatever ``log_action`` returns, or ``None`` when logging failed.
    """
    try:
        username = user.get_username()
        return log_action(
            request=request,
            action='login',
            entity_type='user',
            entity_id=user.pk,
            entity_name=username,
            description=f"User '{username}' logged in",
        )
    except Exception:
        # Keep the failure visible instead of dropping it silently.
        # Function-scope import keeps this block self-contained.
        import logging

        logging.getLogger(__name__).exception("Failed to write login audit entry")
        return None
|
|
|
|
|
|
def log_logout(request, user):
    """Log a successful user logout.

    This is best-effort: any exception raised while building or writing the
    entry is recorded through the ``logging`` module and then suppressed, so
    callers never see an exception from this function.

    Args:
        request: Django request object, passed on to ``log_action``.
        user: The user that logged out; may be falsy, in which case the name
            'Unknown' is used and the entity id falls back to ''.

    Returns:
        Whatever ``log_action`` returns, or ``None`` when logging failed.
    """
    try:
        username = user.get_username() if user else 'Unknown'
        return log_action(
            request=request,
            action='logout',
            entity_type='user',
            entity_id=getattr(user, 'pk', ''),
            entity_name=username,
            description=f"User '{username}' logged out",
        )
    except Exception:
        # Keep the failure visible instead of dropping it silently.
        # Function-scope import keeps this block self-contained.
        import logging

        logging.getLogger(__name__).exception("Failed to write logout audit entry")
        return None
|