- Save cover email body as DMS document with new 'email' context type
- Show email body separately from attachments in email detail view
- Add per-category DMS document assignment in quarterly confirmation (Studiennachweis, Einkommenssituation, Vermögenssituation)
- Add VERSION file and context processor for automatic version display
- Add MCP server, agent system, import/export, and new migrations
- Update compose files and production environment template

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
324 lines
11 KiB
Python
324 lines
11 KiB
Python
"""
|
||
LLM-Provider-Abstraktion für den AI Agent.
|
||
|
||
Unterstützt:
|
||
- Ollama (Standard, OpenAI-kompatibles API via httpx)
|
||
- OpenAI
|
||
- Anthropic (über OpenAI-kompatible Schnittstelle)
|
||
|
||
Alle Provider implementieren synchrones Streaming via Generator.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from typing import Generator, Any
|
||
|
||
import httpx
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class LLMError(Exception):
    """Raised when communication with an LLM provider fails."""
|
||
|
||
|
||
class BaseLLMProvider:
    """Abstract interface shared by all LLM providers.

    Subclasses implement ``chat_stream`` as synchronous streaming via a
    generator.
    """

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """
        Stream response chunks as dicts.

        Chunk types:
        {"type": "text", "content": "..."}
        {"type": "tool_call", "id": "...", "name": "...", "arguments": {...}}
        {"type": "done"}
        {"type": "error", "message": "..."}
        """
        raise NotImplementedError
|
||
|
||
|
||
class OllamaProvider(BaseLLMProvider):
    """Ollama backend via its OpenAI-compatible chat-completion endpoint.

    Args:
        base_url: Base URL of the Ollama server (trailing slash stripped).
        model: Model name to request.
        timeout: Request timeout in seconds (default 120, the previously
            hard-coded value).
    """

    def __init__(self, base_url: str, model: str, timeout: float = 120.0):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream chat chunks from Ollama.

        Yields chunk dicts as described on ``BaseLLMProvider.chat_stream``.

        Raises:
            LLMError: on non-200 responses, connection failure, aborted
                connections, or timeout.
        """
        url = f"{self.base_url}/v1/chat/completions"
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "stream": True,
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"

        try:
            with httpx.Client(timeout=self.timeout) as client:
                with client.stream("POST", url, json=payload) as response:
                    if response.status_code != 200:
                        # Drain the streamed body so the error message is
                        # informative (truncated to 200 chars).
                        body = response.read().decode()
                        raise LLMError(
                            f"Ollama-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_openai_stream(response)
        except httpx.ConnectError as exc:
            # Chain the original exception so the root cause stays visible.
            raise LLMError(
                f"Verbindung zu Ollama ({self.base_url}) fehlgeschlagen. "
                "Ist der Ollama-Dienst gestartet?"
            ) from exc
        except httpx.RemoteProtocolError as exc:
            raise LLMError(
                "Ollama-Verbindung abgebrochen. "
                "Möglicherweise nicht genug RAM für dieses Modell mit Tool-Calling."
            ) from exc
        except httpx.TimeoutException as exc:
            raise LLMError("Ollama-Anfrage hat das Zeitlimit überschritten.") from exc
|
||
|
||
|
||
class OpenAIProvider(BaseLLMProvider):
    """OpenAI Chat-Completion API backend.

    Args:
        api_key: OpenAI API key (sent as a Bearer token).
        model: Model name to request.
    """

    BASE_URL = "https://api.openai.com"

    def __init__(self, api_key: str, model: str):
        self.api_key = api_key
        self.model = model

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream chat chunks from OpenAI.

        Yields chunk dicts as described on ``BaseLLMProvider.chat_stream``.

        Raises:
            LLMError: on non-200 responses, connection failure, or timeout.
        """
        url = f"{self.BASE_URL}/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "stream": True,
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"

        try:
            with httpx.Client(timeout=120.0) as client:
                with client.stream("POST", url, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # Drain the streamed body so the error message is
                        # informative (truncated to 200 chars).
                        body = response.read().decode()
                        raise LLMError(
                            f"OpenAI-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_openai_stream(response)
        except httpx.ConnectError as exc:
            # Consistent with OllamaProvider: surface connection failures
            # as LLMError instead of leaking a raw httpx exception.
            raise LLMError("Verbindung zu OpenAI fehlgeschlagen.") from exc
        except httpx.TimeoutException as exc:
            raise LLMError("OpenAI-Anfrage hat das Zeitlimit überschritten.") from exc
|
||
|
||
|
||
class AnthropicProvider(BaseLLMProvider):
    """Anthropic Messages API backend (native protocol, not OpenAI-compatible).

    Args:
        api_key: Anthropic API key (sent as ``x-api-key``).
        model: Model name to request.
        max_tokens: Upper bound on generated tokens per response
            (default 4096, the previously hard-coded value).
    """

    BASE_URL = "https://api.anthropic.com"

    def __init__(self, api_key: str, model: str, max_tokens: int = 4096):
        self.api_key = api_key
        self.model = model
        self.max_tokens = max_tokens

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream chat chunks from Anthropic.

        Yields chunk dicts as described on ``BaseLLMProvider.chat_stream``.

        Raises:
            LLMError: on non-200 responses or timeout.
        """
        url = f"{self.BASE_URL}/v1/messages"
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }

        # Anthropic takes the system prompt as a top-level field rather than
        # a message, so split it out of the OpenAI-style message list.
        # NOTE: if several system messages are present, the last one wins.
        system = ""
        chat_messages = []
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                chat_messages.append(msg)

        # Convert OpenAI tool definitions to Anthropic's tool schema
        # (parameters -> input_schema).
        anthropic_tools = []
        if tools:
            for t in tools:
                fn = t.get("function", {})
                anthropic_tools.append({
                    "name": fn.get("name"),
                    "description": fn.get("description", ""),
                    "input_schema": fn.get("parameters", {}),
                })

        payload: dict[str, Any] = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "messages": chat_messages,
            "stream": True,
        }
        if system:
            payload["system"] = system
        if anthropic_tools:
            payload["tools"] = anthropic_tools

        try:
            with httpx.Client(timeout=120.0) as client:
                with client.stream("POST", url, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        # Drain the streamed body so the error message is
                        # informative (truncated to 200 chars).
                        body = response.read().decode()
                        raise LLMError(
                            f"Anthropic-Fehler {response.status_code}: {body[:200]}"
                        )
                    yield from _parse_anthropic_stream(response)
        except httpx.TimeoutException as exc:
            # Chain the original exception so the root cause stays visible.
            raise LLMError("Anthropic-Anfrage hat das Zeitlimit überschritten.") from exc
|
||
|
||
|
||
def _parse_openai_stream(response) -> Generator[dict, None, None]:
|
||
"""Parst OpenAI-kompatibles SSE-Streaming-Format."""
|
||
accumulated_tool_calls: dict[int, dict] = {}
|
||
|
||
for line in response.iter_lines():
|
||
if not line or line == "data: [DONE]":
|
||
continue
|
||
if line.startswith("data: "):
|
||
line = line[6:]
|
||
try:
|
||
chunk = json.loads(line)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
choice = chunk.get("choices", [{}])[0]
|
||
delta = choice.get("delta", {})
|
||
finish_reason = choice.get("finish_reason")
|
||
|
||
# Text content
|
||
if delta.get("content"):
|
||
yield {"type": "text", "content": delta["content"]}
|
||
|
||
# Tool calls (streaming – parts arrive incrementally)
|
||
tool_calls_delta = delta.get("tool_calls", [])
|
||
for tc_delta in tool_calls_delta:
|
||
idx = tc_delta.get("index", 0)
|
||
if idx not in accumulated_tool_calls:
|
||
accumulated_tool_calls[idx] = {
|
||
"id": "",
|
||
"name": "",
|
||
"arguments": "",
|
||
}
|
||
tc = accumulated_tool_calls[idx]
|
||
if tc_delta.get("id"):
|
||
tc["id"] += tc_delta["id"]
|
||
fn = tc_delta.get("function", {})
|
||
if fn.get("name"):
|
||
tc["name"] += fn["name"]
|
||
if fn.get("arguments"):
|
||
tc["arguments"] += fn["arguments"]
|
||
|
||
if finish_reason in ("tool_calls", "stop"):
|
||
# Emit completed tool calls
|
||
for tc in accumulated_tool_calls.values():
|
||
try:
|
||
args = json.loads(tc["arguments"]) if tc["arguments"] else {}
|
||
except json.JSONDecodeError:
|
||
args = {}
|
||
yield {
|
||
"type": "tool_call",
|
||
"id": tc["id"],
|
||
"name": tc["name"],
|
||
"arguments": args,
|
||
}
|
||
accumulated_tool_calls.clear()
|
||
|
||
if finish_reason == "stop":
|
||
yield {"type": "done"}
|
||
return
|
||
|
||
yield {"type": "done"}
|
||
|
||
|
||
def _parse_anthropic_stream(response) -> Generator[dict, None, None]:
|
||
"""Parst Anthropic SSE-Streaming-Format."""
|
||
current_tool: dict | None = None
|
||
tool_input_str = ""
|
||
|
||
for line in response.iter_lines():
|
||
if not line or line.startswith("event:"):
|
||
continue
|
||
if line.startswith("data: "):
|
||
line = line[6:]
|
||
try:
|
||
event = json.loads(line)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
event_type = event.get("type", "")
|
||
|
||
if event_type == "content_block_start":
|
||
block = event.get("content_block", {})
|
||
if block.get("type") == "tool_use":
|
||
current_tool = {"id": block.get("id", ""), "name": block.get("name", "")}
|
||
tool_input_str = ""
|
||
|
||
elif event_type == "content_block_delta":
|
||
delta = event.get("delta", {})
|
||
if delta.get("type") == "text_delta":
|
||
yield {"type": "text", "content": delta.get("text", "")}
|
||
elif delta.get("type") == "input_json_delta":
|
||
tool_input_str += delta.get("partial_json", "")
|
||
|
||
elif event_type == "content_block_stop":
|
||
if current_tool is not None:
|
||
try:
|
||
args = json.loads(tool_input_str) if tool_input_str else {}
|
||
except json.JSONDecodeError:
|
||
args = {}
|
||
yield {
|
||
"type": "tool_call",
|
||
"id": current_tool["id"],
|
||
"name": current_tool["name"],
|
||
"arguments": args,
|
||
}
|
||
current_tool = None
|
||
tool_input_str = ""
|
||
|
||
elif event_type == "message_stop":
|
||
yield {"type": "done"}
|
||
return
|
||
|
||
yield {"type": "done"}
|
||
|
||
|
||
def get_provider(config) -> BaseLLMProvider:
    """Build the LLM provider selected by *config*.

    Raises:
        LLMError: if the provider name is unknown or the required API key
            is not configured.
    """
    name = config.provider

    if name == "ollama":
        return OllamaProvider(base_url=config.ollama_url, model=config.model_name)

    if name == "openai":
        if not config.openai_api_key:
            raise LLMError("OpenAI API-Key ist nicht konfiguriert.")
        return OpenAIProvider(api_key=config.openai_api_key, model=config.model_name)

    if name == "anthropic":
        if not config.anthropic_api_key:
            raise LLMError("Anthropic API-Key ist nicht konfiguriert.")
        return AnthropicProvider(api_key=config.anthropic_api_key, model=config.model_name)

    raise LLMError(f"Unbekannter Provider: {config.provider}")
|