Files
stiftung-management-system/app/stiftung/agent/providers.py
SysAdmin Agent e0b377014c
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
CI/CD Pipeline / deploy (push) Has been cancelled
Code Quality / quality (push) Has been cancelled
v4.1.0: DMS email documents, category-specific Nachweis linking, version system
- Save cover email body as DMS document with new 'email' context type
- Show email body separately from attachments in email detail view
- Add per-category DMS document assignment in quarterly confirmation
  (Studiennachweis, Einkommenssituation, Vermögenssituation)
- Add VERSION file and context processor for automatic version display
- Add MCP server, agent system, import/export, and new migrations
- Update compose files and production environment template

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 18:48:52 +00:00

324 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM-Provider-Abstraktion für den AI Agent.
Unterstützt:
- Ollama (Standard, OpenAI-kompatibles API via httpx)
- OpenAI
- Anthropic (über OpenAI-kompatible Schnittstelle)
Alle Provider implementieren synchrones Streaming via Generator.
"""
from __future__ import annotations
import json
import logging
from typing import Generator, Any
import httpx
logger = logging.getLogger(__name__)
class LLMError(Exception):
    """Raised when communication with an LLM backend fails."""
class BaseLLMProvider:
def chat_stream(
self,
messages: list[dict],
tools: list[dict] | None = None,
) -> Generator[dict, None, None]:
"""
Streamt Antwort-Chunks als Dicts.
Chunk-Typen:
{"type": "text", "content": "..."}
{"type": "tool_call", "id": "...", "name": "...", "arguments": {...}}
{"type": "done"}
{"type": "error", "message": "..."}
"""
raise NotImplementedError
class OllamaProvider(BaseLLMProvider):
    """Ollama backend, reached via its OpenAI-compatible chat-completions endpoint."""

    def __init__(self, base_url: str, model: str, timeout: float = 120.0):
        """
        Args:
            base_url: Root URL of the Ollama server; a trailing slash is removed.
            model: Model name sent in the request payload.
            timeout: httpx timeout in seconds (defaults to the previously
                hard-coded 120 s).
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """
        POST a streaming chat completion and yield the parsed chunks.

        Args:
            messages: Chat messages, forwarded unchanged in the payload.
            tools: Optional tool definitions; when given, ``tool_choice`` is
                set to ``"auto"``.

        Yields:
            Chunk dicts produced by ``_parse_openai_stream``.

        Raises:
            LLMError: on a non-200 response or any httpx transport failure.
        """
        url = f"{self.base_url}/v1/chat/completions"
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "stream": True,
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"

        try:
            with httpx.Client(timeout=self.timeout) as client:
                with client.stream("POST", url, json=payload) as response:
                    if response.status_code != 200:
                        # errors="replace": a non-UTF-8 error body must not
                        # mask the real failure with a UnicodeDecodeError.
                        body = response.read().decode("utf-8", errors="replace")
                        raise LLMError(
                            f"Ollama-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_openai_stream(response)
        except httpx.ConnectError as exc:
            raise LLMError(
                f"Verbindung zu Ollama ({self.base_url}) fehlgeschlagen. "
                "Ist der Ollama-Dienst gestartet?"
            ) from exc
        except httpx.RemoteProtocolError as exc:
            raise LLMError(
                "Ollama-Verbindung abgebrochen. "
                "Möglicherweise nicht genug RAM für dieses Modell mit Tool-Calling."
            ) from exc
        except httpx.TimeoutException as exc:
            raise LLMError("Ollama-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            # Remaining transport failures (read/write errors etc.) are also
            # reported as LLMError so callers handle one exception type.
            raise LLMError(f"Ollama-Anfrage fehlgeschlagen: {exc}") from exc
class OpenAIProvider(BaseLLMProvider):
    """OpenAI chat-completions API."""

    BASE_URL = "https://api.openai.com"

    def __init__(self, api_key: str, model: str, timeout: float = 120.0):
        """
        Args:
            api_key: Key sent as a Bearer token in the Authorization header.
            model: Model name sent in the request payload.
            timeout: httpx timeout in seconds (defaults to the previously
                hard-coded 120 s).
        """
        self.api_key = api_key
        self.model = model
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """
        POST a streaming chat completion and yield the parsed chunks.

        Args:
            messages: Chat messages, forwarded unchanged in the payload.
            tools: Optional tool definitions; when given, ``tool_choice`` is
                set to ``"auto"``.

        Yields:
            Chunk dicts produced by ``_parse_openai_stream``.

        Raises:
            LLMError: on a non-200 response or any httpx transport failure.
        """
        url = f"{self.BASE_URL}/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "stream": True,
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"

        try:
            with httpx.Client(timeout=self.timeout) as client:
                with client.stream("POST", url, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # errors="replace": a non-UTF-8 error body must not
                        # mask the real failure with a UnicodeDecodeError.
                        body = response.read().decode("utf-8", errors="replace")
                        raise LLMError(
                            f"OpenAI-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_openai_stream(response)
        except httpx.TimeoutException as exc:
            raise LLMError("OpenAI-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            # Connection/protocol failures previously escaped as raw httpx
            # exceptions; report them as LLMError like the Ollama provider does.
            raise LLMError(f"OpenAI-Anfrage fehlgeschlagen: {exc}") from exc
class AnthropicProvider(BaseLLMProvider):
    """Anthropic Messages API (native, not OpenAI-compatible)."""

    BASE_URL = "https://api.anthropic.com"

    def __init__(
        self,
        api_key: str,
        model: str,
        max_tokens: int = 4096,
        timeout: float = 120.0,
    ):
        """
        Args:
            api_key: Key sent in the ``x-api-key`` header.
            model: Model name sent in the request payload.
            max_tokens: Value of the payload's ``max_tokens`` field (defaults
                to the previously hard-coded 4096).
            timeout: httpx timeout in seconds (defaults to the previously
                hard-coded 120 s).
        """
        self.api_key = api_key
        self.model = model
        self.max_tokens = max_tokens
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """
        POST a streaming Messages request and yield the parsed chunks.

        System messages are removed from ``messages`` and sent through the
        top-level ``system`` field; tool definitions in OpenAI format are
        converted to Anthropic's ``name``/``description``/``input_schema`` form.

        Args:
            messages: Chat messages, possibly including ``system`` entries.
            tools: Optional tool definitions in OpenAI ``{"function": {...}}`` form.

        Yields:
            Chunk dicts produced by ``_parse_anthropic_stream``.

        Raises:
            LLMError: on a non-200 response or any httpx transport failure.
        """
        url = f"{self.BASE_URL}/v1/messages"
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }

        # Split system prompts from the conversation. All system messages are
        # kept (previously a later one silently replaced an earlier one).
        # NOTE(review): non-system messages are forwarded unchanged; presumably
        # the caller already supplies roles/content the Messages API accepts
        # (OpenAI-style "tool" messages are not converted here) - confirm.
        system_parts = []
        chat_messages = []
        for msg in messages:
            if msg["role"] == "system":
                if msg.get("content"):
                    system_parts.append(msg["content"])
            else:
                chat_messages.append(msg)
        if len(system_parts) == 1:
            system = system_parts[0]
        else:
            system = "\n\n".join(str(part) for part in system_parts)

        # Convert OpenAI tool format to Anthropic format.
        anthropic_tools = []
        for tool in tools or []:
            fn = tool.get("function", {})
            anthropic_tools.append({
                "name": fn.get("name"),
                "description": fn.get("description", ""),
                "input_schema": fn.get("parameters", {}),
            })

        payload: dict[str, Any] = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "messages": chat_messages,
            "stream": True,
        }
        if system:
            payload["system"] = system
        if anthropic_tools:
            payload["tools"] = anthropic_tools

        try:
            with httpx.Client(timeout=self.timeout) as client:
                with client.stream("POST", url, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # errors="replace": a non-UTF-8 error body must not
                        # mask the real failure with a UnicodeDecodeError.
                        body = response.read().decode("utf-8", errors="replace")
                        raise LLMError(
                            f"Anthropic-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_anthropic_stream(response)
        except httpx.TimeoutException as exc:
            raise LLMError("Anthropic-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            # Connection/protocol failures previously escaped as raw httpx
            # exceptions; report them as LLMError like the Ollama provider does.
            raise LLMError(f"Anthropic-Anfrage fehlgeschlagen: {exc}") from exc
def _parse_openai_stream(response) -> Generator[dict, None, None]:
    """
    Parse an OpenAI-compatible SSE chat-completion stream into agent chunks.

    Args:
        response: Object whose ``iter_lines()`` yields the stream's text lines.

    Yields:
        ``{"type": "text", ...}`` for each non-empty content delta,
        ``{"type": "tool_call", ...}`` for each tool call once a
        ``finish_reason`` of ``"tool_calls"`` or ``"stop"`` is seen,
        ``{"type": "error", ...}`` (then the generator ends) when the stream
        carries an ``error`` payload, and otherwise a final ``{"type": "done"}``.
    """
    # Tool calls arrive in fragments keyed by their "index"; collect them here
    # until a finish_reason says they are complete.
    pending_calls: dict[int, dict] = {}

    for line in response.iter_lines():
        if not line:
            continue
        # SSE allows "data:" with or without a following space.
        if line.startswith("data:"):
            line = line[5:]
        line = line.strip()
        if line == "[DONE]":
            break
        try:
            chunk = json.loads(line)
        except json.JSONDecodeError:
            # Lines that are not JSON carry no payload for us; skip them.
            continue
        if not isinstance(chunk, dict):
            continue

        # Report an in-stream error payload instead of silently dropping it.
        error = chunk.get("error")
        if error:
            message = error.get("message") if isinstance(error, dict) else error
            yield {"type": "error", "message": str(message or error)}
            return

        # "choices" may be missing, null or an empty list; treat all of these
        # as an empty choice instead of raising IndexError/TypeError.
        choices = chunk.get("choices") or [{}]
        choice = choices[0] or {}
        delta = choice.get("delta") or {}
        finish_reason = choice.get("finish_reason")

        # Text content
        if delta.get("content"):
            yield {"type": "text", "content": delta["content"]}

        # Tool calls (streaming parts arrive incrementally); the field may be null.
        for tc_delta in delta.get("tool_calls") or []:
            idx = tc_delta.get("index", 0)
            call = pending_calls.setdefault(
                idx, {"id": "", "name": "", "arguments": ""}
            )
            if tc_delta.get("id"):
                call["id"] += tc_delta["id"]
            fn = tc_delta.get("function") or {}
            if fn.get("name"):
                call["name"] += fn["name"]
            if fn.get("arguments"):
                call["arguments"] += fn["arguments"]

        if finish_reason in ("tool_calls", "stop"):
            # Emit completed tool calls
            for call in pending_calls.values():
                try:
                    args = json.loads(call["arguments"]) if call["arguments"] else {}
                except json.JSONDecodeError:
                    args = None
                if not isinstance(args, dict):
                    # Keep the best-effort fallback to empty arguments, but
                    # leave a trace instead of swallowing the problem.
                    logging.getLogger(__name__).warning(
                        "Discarding malformed tool-call arguments for %r",
                        call["name"],
                    )
                    args = {}
                yield {
                    "type": "tool_call",
                    "id": call["id"],
                    "name": call["name"],
                    "arguments": args,
                }
            pending_calls.clear()
            if finish_reason == "stop":
                yield {"type": "done"}
                return

    yield {"type": "done"}
def _parse_anthropic_stream(response) -> Generator[dict, None, None]:
    """
    Parse an Anthropic Messages SSE stream into agent chunks.

    Args:
        response: Object whose ``iter_lines()`` yields the stream's text lines.

    Yields:
        ``{"type": "text", ...}`` for each ``text_delta``,
        ``{"type": "tool_call", ...}`` when a ``tool_use`` content block stops,
        ``{"type": "error", ...}`` (then the generator ends) for an ``error``
        event, and otherwise a final ``{"type": "done"}``.
    """
    current_tool: dict | None = None
    tool_input_str = ""

    for line in response.iter_lines():
        # The event name is repeated in the JSON "type" field, so the
        # "event:" lines themselves are not needed.
        if not line or line.startswith("event:"):
            continue
        # SSE allows "data:" with or without a following space.
        if line.startswith("data:"):
            line = line[5:]
        try:
            event = json.loads(line.strip())
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            continue

        event_type = event.get("type", "")

        if event_type == "error":
            # Report in-stream errors instead of silently ending with "done".
            err = event.get("error") or {}
            message = err.get("message") if isinstance(err, dict) else str(err)
            yield {
                "type": "error",
                "message": message or "Unbekannter Fehler im Anthropic-Stream.",
            }
            return

        if event_type == "content_block_start":
            block = event.get("content_block") or {}
            if block.get("type") == "tool_use":
                current_tool = {"id": block.get("id", ""), "name": block.get("name", "")}
                tool_input_str = ""

        elif event_type == "content_block_delta":
            delta = event.get("delta") or {}
            if delta.get("type") == "text_delta":
                yield {"type": "text", "content": delta.get("text", "")}
            elif delta.get("type") == "input_json_delta":
                # Tool input arrives as JSON fragments; concatenate until the
                # block stops.
                tool_input_str += delta.get("partial_json", "")

        elif event_type == "content_block_stop":
            if current_tool is not None:
                try:
                    args = json.loads(tool_input_str) if tool_input_str else {}
                except json.JSONDecodeError:
                    args = None
                if not isinstance(args, dict):
                    # Keep the best-effort fallback to empty arguments, but
                    # leave a trace instead of swallowing the problem.
                    logging.getLogger(__name__).warning(
                        "Discarding malformed tool input for %r",
                        current_tool["name"],
                    )
                    args = {}
                yield {
                    "type": "tool_call",
                    "id": current_tool["id"],
                    "name": current_tool["name"],
                    "arguments": args,
                }
                current_tool = None
                tool_input_str = ""

        elif event_type == "message_stop":
            yield {"type": "done"}
            return

    yield {"type": "done"}
def get_provider(config) -> BaseLLMProvider:
    """
    Build the LLM provider selected by ``config.provider``.

    Reads ``config.model_name`` plus, depending on the provider,
    ``config.ollama_url``, ``config.openai_api_key`` or
    ``config.anthropic_api_key``.

    Raises:
        LLMError: if the required API key is empty or the provider name is
            not one of ``"ollama"``, ``"openai"``, ``"anthropic"``.
    """
    selected = config.provider

    if selected == "ollama":
        return OllamaProvider(base_url=config.ollama_url, model=config.model_name)

    if selected == "openai":
        openai_key = config.openai_api_key
        if openai_key:
            return OpenAIProvider(api_key=openai_key, model=config.model_name)
        raise LLMError("OpenAI API-Key ist nicht konfiguriert.")

    if selected == "anthropic":
        anthropic_key = config.anthropic_api_key
        if anthropic_key:
            return AnthropicProvider(api_key=anthropic_key, model=config.model_name)
        raise LLMError("Anthropic API-Key ist nicht konfiguriert.")

    raise LLMError(f"Unbekannter Provider: {selected}")