"""
LLM provider abstraction for the AI agent.

Supported providers:
- Ollama (default; OpenAI-compatible Chat Completions API via httpx)
- OpenAI (Chat Completions API)
- Anthropic (native Messages API; OpenAI-style tool definitions and
  OpenAI-style tool-call / tool-result messages are converted before sending)

Every provider implements synchronous streaming via a generator that yields
provider-independent chunk dicts (see ``BaseLLMProvider.chat_stream``).
"""

from __future__ import annotations

import json
import logging
from typing import Any, Callable, Generator

import httpx

logger = logging.getLogger(__name__)

# Defaults used when a provider is constructed without explicit settings.
DEFAULT_TIMEOUT_SECONDS = 120.0
DEFAULT_MAX_TOKENS = 4096  # The Anthropic Messages API requires max_tokens.


class LLMError(Exception):
    """Raised when communication with an LLM provider fails."""


class BaseLLMProvider:
    """Common interface implemented by every LLM provider."""

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """
        Stream response chunks as dicts.

        Chunk types:
            {"type": "text", "content": "..."}
            {"type": "tool_call", "id": "...", "name": "...", "arguments": {...}}
            {"type": "done"}
            {"type": "error", "message": "..."}

        The providers in this module raise ``LLMError`` for transport failures
        and non-200 responses, and emit an ``error`` chunk (then stop) when the
        server reports an error inside an otherwise successful stream.
        """
        raise NotImplementedError


class OllamaProvider(BaseLLMProvider):
    """Ollama via its OpenAI-compatible Chat Completions endpoint."""

    def __init__(
        self,
        base_url: str,
        model: str,
        *,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream a chat completion from Ollama (see ``BaseLLMProvider.chat_stream``).

        Raises:
            LLMError: on a non-200 response or any httpx transport error.
        """
        url = f"{self.base_url}/v1/chat/completions"
        payload = _build_openai_payload(self.model, messages, tools)
        try:
            yield from _stream_post(
                url,
                payload,
                headers=None,
                timeout=self.timeout,
                provider_label="Ollama",
                parse=_parse_openai_stream,
            )
        except httpx.ConnectError as exc:
            raise LLMError(
                f"Verbindung zu Ollama ({self.base_url}) fehlgeschlagen. "
                "Ist der Ollama-Dienst gestartet?"
            ) from exc
        except httpx.RemoteProtocolError as exc:
            raise LLMError(
                "Ollama-Verbindung abgebrochen. "
                "Möglicherweise nicht genug RAM für dieses Modell mit Tool-Calling."
            ) from exc
        except httpx.TimeoutException as exc:
            raise LLMError("Ollama-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            # Any other transport failure (e.g. ReadError) still honours the
            # "failures surface as LLMError" contract.
            raise LLMError(f"Ollama-Kommunikationsfehler: {exc}") from exc


class OpenAIProvider(BaseLLMProvider):
    """OpenAI Chat Completions API."""

    BASE_URL = "https://api.openai.com"

    def __init__(
        self,
        api_key: str,
        model: str,
        *,
        base_url: str | None = None,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        self.api_key = api_key
        self.model = model
        # Allows OpenAI-compatible gateways; defaults to the public API.
        self.base_url = (base_url or self.BASE_URL).rstrip("/")
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream a chat completion from OpenAI (see ``BaseLLMProvider.chat_stream``).

        Raises:
            LLMError: on a non-200 response or any httpx transport error.
        """
        url = f"{self.base_url}/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = _build_openai_payload(self.model, messages, tools)
        try:
            yield from _stream_post(
                url,
                payload,
                headers=headers,
                timeout=self.timeout,
                provider_label="OpenAI",
                parse=_parse_openai_stream,
            )
        except httpx.TimeoutException as exc:
            raise LLMError("OpenAI-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            raise LLMError(f"OpenAI-Kommunikationsfehler: {exc}") from exc


class AnthropicProvider(BaseLLMProvider):
    """Anthropic Messages API (native, not OpenAI-compatible).

    Accepts the same OpenAI-style ``messages``/``tools`` as the other providers
    and converts them: system messages go into the top-level ``system`` field,
    tool definitions get an ``input_schema``, assistant ``tool_calls`` become
    ``tool_use`` blocks and ``tool`` messages become ``tool_result`` blocks.
    Messages already in Anthropic format are passed through unchanged.
    """

    BASE_URL = "https://api.anthropic.com"
    API_VERSION = "2023-06-01"

    def __init__(
        self,
        api_key: str,
        model: str,
        *,
        base_url: str | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        self.api_key = api_key
        self.model = model
        self.base_url = (base_url or self.BASE_URL).rstrip("/")
        self.max_tokens = max_tokens
        self.timeout = timeout

    def chat_stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> Generator[dict, None, None]:
        """Stream a message from Anthropic (see ``BaseLLMProvider.chat_stream``).

        Raises:
            LLMError: on a non-200 response or any httpx transport error.
        """
        url = f"{self.base_url}/v1/messages"
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": self.API_VERSION,
            "Content-Type": "application/json",
        }
        system, chat_messages = _to_anthropic_messages(messages)
        anthropic_tools = _to_anthropic_tools(tools)

        payload: dict[str, Any] = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "messages": chat_messages,
            "stream": True,
        }
        if system:
            payload["system"] = system
        if anthropic_tools:
            payload["tools"] = anthropic_tools

        try:
            yield from _stream_post(
                url,
                payload,
                headers=headers,
                timeout=self.timeout,
                provider_label="Anthropic",
                parse=_parse_anthropic_stream,
            )
        except httpx.TimeoutException as exc:
            raise LLMError("Anthropic-Anfrage hat das Zeitlimit überschritten.") from exc
        except httpx.HTTPError as exc:
            raise LLMError(f"Anthropic-Kommunikationsfehler: {exc}") from exc


# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------


def _build_openai_payload(
    model: str,
    messages: list[dict],
    tools: list[dict] | None,
) -> dict[str, Any]:
    """Build a streaming Chat Completions request body (OpenAI / Ollama)."""
    payload: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "stream": True,
    }
    if tools:
        payload["tools"] = tools
        payload["tool_choice"] = "auto"
    return payload


def _stream_post(
    url: str,
    payload: dict[str, Any],
    *,
    headers: dict[str, str] | None,
    timeout: float,
    provider_label: str,
    parse: Callable[[Any], Generator[dict, None, None]],
) -> Generator[dict, None, None]:
    """POST *payload* as JSON and yield the chunks produced by *parse*.

    Raises:
        LLMError: if the status is not 200; the message is
            "<provider_label>-Fehler <status>: <first 200 chars of body>".
        httpx.HTTPError: transport errors propagate so each provider can
            translate them into its own ``LLMError`` message.
    """
    with httpx.Client(timeout=timeout) as client:
        with client.stream("POST", url, json=payload, headers=headers) as response:
            if response.status_code != 200:
                # errors="replace": never let a badly encoded error body mask the
                # actual HTTP failure with a UnicodeDecodeError.
                body = response.read().decode("utf-8", errors="replace")
                raise LLMError(
                    f"{provider_label}-Fehler {response.status_code}: {body[:200]}"
                )
            yield from parse(response)


# ---------------------------------------------------------------------------
# Anthropic request conversion
# ---------------------------------------------------------------------------


def _to_anthropic_tools(tools: list[dict] | None) -> list[dict]:
    """Convert OpenAI-style ``{"type": "function", "function": {...}}`` tools.

    A tool without a ``function`` wrapper is read directly (so tools already in
    Anthropic shape keep their ``input_schema``). A missing or empty schema is
    replaced by an empty object schema.
    """
    converted: list[dict] = []
    for tool in tools or []:
        fn = tool.get("function") or tool
        schema = (
            fn.get("parameters")
            or fn.get("input_schema")
            or {"type": "object", "properties": {}}
        )
        converted.append({
            "name": fn.get("name"),
            "description": fn.get("description") or "",
            "input_schema": schema,
        })
    return converted


def _to_anthropic_messages(messages: list[dict]) -> tuple[Any, list[dict]]:
    """Split *messages* into an Anthropic ``system`` value and chat messages.

    - ``system`` messages are removed; a single one is passed through as-is,
      several are joined with blank lines (previously only the last survived).
    - OpenAI-style ``tool`` messages become ``tool_result`` blocks; consecutive
      ones are merged into a single ``user`` message.
    - Assistant messages carrying ``tool_calls`` become text + ``tool_use`` blocks.
    - All other messages are passed through untouched (never mutated).
    """
    system_parts: list[Any] = []
    converted: list[dict] = []
    # Content list of the user message we created for the current run of
    # tool results; None when the previous message was not a tool result.
    open_tool_results: list[dict] | None = None

    for msg in messages:
        role = msg.get("role")
        if role == "system":
            system_parts.append(msg.get("content"))
            continue
        if role == "tool":
            if open_tool_results is None:
                open_tool_results = []
                converted.append({"role": "user", "content": open_tool_results})
            open_tool_results.append({
                "type": "tool_result",
                "tool_use_id": msg.get("tool_call_id", ""),
                "content": _tool_result_content(msg.get("content")),
            })
            continue

        open_tool_results = None
        if role == "assistant" and msg.get("tool_calls"):
            converted.append(_assistant_tool_calls_to_anthropic(msg))
        else:
            converted.append(msg)

    if not system_parts:
        system: Any = ""
    elif len(system_parts) == 1:
        system = system_parts[0]
    else:
        system = "\n\n".join(str(part) for part in system_parts if part)
    return system, converted


def _tool_result_content(content: Any) -> Any:
    """Return tool output in a form accepted as ``tool_result`` content."""
    if content is None:
        return ""
    if isinstance(content, (str, list)):
        # Strings and content-block lists are both valid tool_result content.
        return content
    return json.dumps(content, ensure_ascii=False)


def _assistant_tool_calls_to_anthropic(msg: dict) -> dict:
    """Convert an assistant message with ``tool_calls`` into content blocks.

    Each tool call may be OpenAI-shaped (``{"id", "function": {"name",
    "arguments"}}``) or flat (``{"id", "name", "arguments"}``); ``arguments``
    may be a JSON string or a dict.
    """
    blocks: list[dict] = []
    content = msg.get("content")
    if isinstance(content, str):
        if content:
            blocks.append({"type": "text", "text": content})
    elif isinstance(content, list):
        blocks.extend(content)

    for call in msg.get("tool_calls") or []:
        fn = call.get("function") or call
        arguments = fn.get("arguments")
        if isinstance(arguments, str):
            arguments = _parse_tool_arguments(arguments)
        elif not isinstance(arguments, dict):
            arguments = {}
        blocks.append({
            "type": "tool_use",
            "id": call.get("id", ""),
            "name": fn.get("name", ""),
            "input": arguments,
        })
    return {"role": "assistant", "content": blocks}


# ---------------------------------------------------------------------------
# Stream parsing
# ---------------------------------------------------------------------------


def _iter_sse_json(response) -> Generator[dict, None, None]:
    """Yield each JSON object carried by a server-sent-event stream.

    ``data:`` prefixes are stripped (with or without a following space) and
    bare JSON lines are accepted too. Blank lines, SSE comments (``:``) and
    ``event:``/``id:``/``retry:`` fields are skipped. ``[DONE]`` ends the
    stream. Lines that are not a JSON object are skipped.
    """
    for raw_line in response.iter_lines():
        line = raw_line.strip()
        if not line or line.startswith(":"):
            continue
        if line.startswith("data:"):
            line = line[len("data:"):].strip()
        elif line.startswith(("event:", "id:", "retry:")):
            continue
        if not line:
            continue
        if line == "[DONE]":
            return
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            logger.debug("Skipping non-JSON stream line: %r", line[:200])
            continue
        if isinstance(event, dict):
            yield event


def _error_message(error: Any) -> str:
    """Extract a human-readable message from an in-stream error payload."""
    if isinstance(error, dict):
        return str(error.get("message") or error)
    return str(error)


def _parse_tool_arguments(raw: str) -> dict:
    """Decode streamed tool-call arguments; fall back to ``{}`` if unusable."""
    if not raw:
        return {}
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("Could not parse tool-call arguments as JSON: %r", raw[:200])
        return {}
    return value if isinstance(value, dict) else {}


def _merge_tool_call_fragment(
    pending: dict[int, dict],
    fragment: Any,
    position: int,
) -> None:
    """Fold one streamed ``tool_calls`` delta entry into *pending* (keyed by index)."""
    if not isinstance(fragment, dict):
        return
    index = fragment.get("index")
    if index is None:
        # Some OpenAI-compatible servers omit "index"; fall back to the entry's
        # position within this delta so parallel calls are not merged.
        index = position
    call = pending.setdefault(index, {"id": "", "name": "", "arguments": ""})

    if fragment.get("id"):
        # Ids are never split across fragments, but some servers repeat them on
        # every fragment, so overwrite instead of appending.
        call["id"] = fragment["id"]

    function = fragment.get("function") or {}
    if function.get("name"):
        call["name"] += function["name"]
    arguments = function.get("arguments")
    if isinstance(arguments, dict):
        call["arguments"] += json.dumps(arguments)
    elif isinstance(arguments, str) and arguments:
        call["arguments"] += arguments


def _drain_tool_calls(pending: dict[int, dict]) -> Generator[dict, None, None]:
    """Emit every accumulated tool call in index order and reset *pending*."""
    calls = [pending[index] for index in sorted(pending)]
    pending.clear()
    for call in calls:
        yield {
            "type": "tool_call",
            "id": call["id"],
            "name": call["name"],
            "arguments": _parse_tool_arguments(call["arguments"]),
        }


def _parse_openai_stream(response) -> Generator[dict, None, None]:
    """Translate an OpenAI-compatible Chat Completions SSE stream into chunks.

    Tool calls arrive in fragments keyed by ``index``; they are accumulated and
    emitted as complete ``tool_call`` chunks when any ``finish_reason`` arrives
    or the stream ends. ``finish_reason == "stop"`` ends parsing immediately.
    """
    pending: dict[int, dict] = {}

    for chunk in _iter_sse_json(response):
        if chunk.get("error"):
            # Error reported inside the stream (e.g. Ollama sends {"error": {...}}).
            yield {"type": "error", "message": _error_message(chunk["error"])}
            return

        # Usage-only chunks (stream_options.include_usage) carry an empty list.
        choices = chunk.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            continue
        choice = choices[0]
        delta = choice.get("delta") or {}
        finish_reason = choice.get("finish_reason")

        if delta.get("content"):
            yield {"type": "text", "content": delta["content"]}

        for position, fragment in enumerate(delta.get("tool_calls") or []):
            _merge_tool_call_fragment(pending, fragment, position)

        if finish_reason is not None:
            # Flush on every finish reason ("length", "content_filter", ...),
            # not just "tool_calls"/"stop", so accumulated calls are not lost.
            yield from _drain_tool_calls(pending)
            if finish_reason == "stop":
                yield {"type": "done"}
                return

    # Stream closed without a finish_reason: still deliver what was collected.
    yield from _drain_tool_calls(pending)
    yield {"type": "done"}


def _parse_anthropic_stream(response) -> Generator[dict, None, None]:
    """Translate an Anthropic Messages SSE stream into chunks."""
    current_tool: dict | None = None
    tool_input_parts: list[str] = []

    for event in _iter_sse_json(response):
        event_type = event.get("type", "")

        if event_type == "content_block_start":
            block = event.get("content_block") or {}
            if block.get("type") == "tool_use":
                current_tool = {"id": block.get("id", ""), "name": block.get("name", "")}
                tool_input_parts = []

        elif event_type == "content_block_delta":
            delta = event.get("delta") or {}
            delta_type = delta.get("type")
            if delta_type == "text_delta":
                yield {"type": "text", "content": delta.get("text", "")}
            elif delta_type == "input_json_delta":
                tool_input_parts.append(delta.get("partial_json", ""))

        elif event_type == "content_block_stop":
            if current_tool is not None:
                yield {
                    "type": "tool_call",
                    "id": current_tool["id"],
                    "name": current_tool["name"],
                    "arguments": _parse_tool_arguments("".join(tool_input_parts)),
                }
                current_tool = None
                tool_input_parts = []

        elif event_type == "message_stop":
            yield {"type": "done"}
            return

        elif event_type == "error":
            # Error event sent mid-stream (e.g. an overloaded API).
            yield {"type": "error", "message": _error_message(event.get("error"))}
            return

    yield {"type": "done"}


# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------


def get_provider(config) -> BaseLLMProvider:
    """Create the LLM provider selected by ``config.provider``.

    Reads ``config.provider`` and ``config.model_name`` plus, depending on the
    provider, ``config.ollama_url``, ``config.openai_api_key`` or
    ``config.anthropic_api_key``.

    Raises:
        LLMError: if the required API key is missing or the provider is unknown.
    """
    provider = config.provider
    if provider == "ollama":
        return OllamaProvider(base_url=config.ollama_url, model=config.model_name)
    if provider == "openai":
        if not config.openai_api_key:
            raise LLMError("OpenAI API-Key ist nicht konfiguriert.")
        return OpenAIProvider(api_key=config.openai_api_key, model=config.model_name)
    if provider == "anthropic":
        if not config.anthropic_api_key:
            raise LLMError("Anthropic API-Key ist nicht konfiguriert.")
        return AnthropicProvider(api_key=config.anthropic_api_key, model=config.model_name)
    raise LLMError(f"Unbekannter Provider: {config.provider}")