""" Audit-Integration für MCP-Aktionen. Alle MCP-Aktionen werden im bestehenden AuditLog erfasst. Da MCP kein HTTP-Request-Objekt hat, werden Felder direkt gesetzt: - user_agent = "MCP/" - session_key = "mcp" - ip_address = None """ from __future__ import annotations def log_mcp_action( role: str, action: str, entity_type: str, entity_id: str, entity_name: str, description: str, changes: dict | None = None, ) -> None: """ Schreibt einen Audit-Log-Eintrag für eine MCP-Aktion. Args: role: Aktuelle MCP-Rolle ("readonly", "editor", "admin") action: Aktionstyp (aus AuditLog.ACTION_TYPES) entity_type: Entitätstyp (aus AuditLog.ENTITY_TYPES oder freier Text) entity_id: ID der Entität entity_name: Lesbarer Name der Entität description: Beschreibung der Aktion changes: Optionales Dict mit Änderungen """ # Import hier, damit Django bereits initialisiert ist wenn diese Funktion aufgerufen wird from stiftung.models import AuditLog # Normalisiere entity_type: muss in den AuditLog.ENTITY_TYPES-Choices sein # oder auf "system" fallen, da AuditLog choices-Validierung ggf. nicht hart durchgesetzt wird valid_entity_types = {choice[0] for choice in AuditLog.ENTITY_TYPES} if entity_type not in valid_entity_types: entity_type = "system" # Normalisiere action: muss in ACTION_TYPES sein valid_actions = {choice[0] for choice in AuditLog.ACTION_TYPES} if action not in valid_actions: action = "export" # Generischer Fallback für MCP-Leseoperationen AuditLog.objects.create( user=None, username=f"mcp:{role}", action=action, entity_type=entity_type, entity_id=str(entity_id) if entity_id else "", entity_name=entity_name, description=description, changes=changes, ip_address=None, user_agent=f"MCP/{role}", session_key="mcp", ) def log_mcp_read(role: str, entity_type: str, entity_name: str, description: str) -> None: """Loggt eine Leseoperation via MCP (als 'export'-Aktion).""" log_mcp_action( role=role, action="export", entity_type=entity_type, entity_id="", entity_name=entity_name, description=description, ) def log_mcp_create( role: str, entity_type: str, entity_id: str, entity_name: str ) -> None: """Loggt eine Erstellungsoperation via MCP.""" log_mcp_action( role=role, action="create", entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, description=f"[MCP] {entity_type} '{entity_name}' erstellt", ) def log_mcp_update( role: str, entity_type: str, entity_id: str, entity_name: str, changes: dict ) -> None: """Loggt eine Aktualisierungsoperation via MCP.""" changed_fields = ", ".join(changes.keys()) if changes else "" log_mcp_action( role=role, action="update", entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, description=f"[MCP] {entity_type} '{entity_name}' aktualisiert: {changed_fields}", changes=changes, )