Files
paliad/frontend/src/client/paliadin.ts
mAi cdd27d674e feat(paliadin): stream + honest late-recovery (t-paliad-235)
m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" but never delivered. The aichat backend
finished the turn upstream; paliad's HTTP client had given up at 130 s
and the legacy filesystem janitor never ran for the aichat path.

Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:

1. Switch the aichat backend to /chat/turn/stream
   - new AichatPaliadinService.RunTurnStream relays incremental chunks
   - SSE parser handles default `data:` frames (chunk/meta/done/error)
     and named `event: heartbeat` frames per the upstream contract
   - no more 130 s hard ceiling — stream stays open as long as data or
     heartbeats flow; silenceTimeout (90 s) catches a true upstream
     stall instead

2. Proof-of-life thinking events
   - handler emits `event: thinking` every 5 s while the upstream is
     silent (synthesised locally) AND relays aichat's `heartbeat`
     events as thinking pings
   - frontend renders a lime-dot pulse + monospace counter inside the
     assistant bubble — the user can SEE the chat is still working

3. Honest disconnect copy + real late-recovery
   - new dispatching endpoint GET /api/paliadin/turns/{id}/recover
   - aichat backend: asks aichat via GET /chat/conversations and
     /chat/conversations/{id}/turns whether the turn actually finished
   - legacy backend: falls through to the local row read (janitor)
   - frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
     while the recovery polls; on confirmed "lost" swaps to
     "Antwort konnte nicht zugestellt werden — bitte erneut stellen"
   - migration 118 adds aichat_conversation_id to paliadin_turns so
     the recovery has a fast path when the done frame arrived before
     the drop

Streaming + recovery are a no-op for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, the
LocalPaliadinService stays on the one-shot RunTurn + filesystem
janitor path.

13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.

go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
2026-05-22 15:17:24 +02:00

618 lines
21 KiB
TypeScript

import { initI18n, getLang, t } from "./i18n";
import { initSidebar } from "./sidebar";
import { renderResponseHTML } from "./paliadin-render";
import { pollForLateResponse, type LateTurn, type LatePollHandle } from "./paliadin-late-poll";
// Paliadin chat panel client (t-paliad-146 PoC, streaming upgrade
// t-paliad-235).
//
// State machine: empty → typing → sending → thinking → streaming → done.
// History lives in localStorage under "paliadin:history:<sessionId>"
// — design §0.5.4 session-only persistence.
//
// SSE consumer subscribes to `event: meta`, `event: content`,
// `event: thinking`, `event: end`, `event: error`, `event: ping`.
//
// `content` events from the aichat backend arrive as incremental
// `{delta: "..."}` chunks; the bubble accumulates them in real time —
// no typewriter simulation needed. Legacy backends still emit a single
// `{text: "..."}` payload and we fall back to the typewriter for that
// shape.
//
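// `thinking` events fire while the upstream is alive but silent. Until
// the first content chunk arrives, the bubble renders a pulse + counter
// so the user can SEE the chat is still working; once content has
// started, the pulse is removed and later `thinking` pings (e.g. during
// a mid-stream stall) no longer show anything.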
/**
 * One chat message as cached in localStorage and rendered in the panel.
 * Assistant entries carry optional per-turn metadata; user entries are
 * stored without it.
 */
interface HistoryEntry {
  /** Who authored the message. */
  role: "user" | "assistant";
  /**
   * Raw message body. For assistant replies this is the unrendered
   * Markdown (chip markers intact) so it can be re-rendered on reload.
   */
  text: string;
  /** Creation timestamp as an ISO-8601 string. */
  ts: string;
  /** Optional turn metadata reported for an assistant reply. */
  meta?: {
    used_tools?: string[];
    rows_seen?: number[];
    classifier_tag?: string;
    duration_ms?: number;
    chip_count?: number;
  };
}
// localStorage keys: the session id lives under SESSION_KEY, the
// per-session message cache under HISTORY_PREFIX + sessionId.
const SESSION_KEY = "paliadin:session";
const HISTORY_PREFIX = "paliadin:history:";

// Mutable panel state. `sessionId` is assigned by bootSession();
// `history` is the in-memory copy that saveHistory() persists.
let sessionId: string;
let history: HistoryEntry[] = [];

// SSE stream and turn id of the most recently started turn; reset to
// null by cleanupTurn().
let currentEventSource: EventSource | null = null;
let currentTurnId: string | null = null;

// Late-response recovery pollers keyed by turn_id. An entry is dropped as
// soon as its poller delivers a response, reports the turn lost, or gives
// up. Pollers outlive the turn that spawned them, so m can keep chatting
// while a slow reply is still being recovered.
const latePolls: Map<string, LatePollHandle> = new Map();
// Page bootstrap, run once the DOM has been parsed. The synchronous steps
// run in exactly the listed order (i18n, sidebar, session + cached
// history, form / starter / reset wiring, first paint from the local
// cache); afterwards the server copy is fetched in the background.
document.addEventListener("DOMContentLoaded", function onPaliadinReady(): void {
  const bootSteps: ReadonlyArray<() => void> = [
    initI18n,
    initSidebar,
    bootSession,
    wireForm,
    wireStarters,
    wireReset,
    renderHistory,
  ];
  for (const step of bootSteps) step();
  // Pull the canonical conversation from the DB so a turn typed in the
  // inline drawer (which shares this session id) shows up here on mount.
  // DB > localStorage when both have data.
  void hydrateFromServer();
});
/**
 * Load (or mint and store) the persistent session id, then restore that
 * session's cached history from localStorage.
 *
 * A corrupt cache entry, or one that parses to something other than an
 * array (e.g. `null`), is discarded so `history` is always an array —
 * the rest of the module calls `history.push` unconditionally.
 */
function bootSession(): void {
  let stored = localStorage.getItem(SESSION_KEY);
  if (!stored) {
    stored = crypto.randomUUID();
    localStorage.setItem(SESSION_KEY, stored);
  }
  sessionId = stored;
  const cached = localStorage.getItem(HISTORY_PREFIX + sessionId);
  if (!cached) return;
  try {
    const parsed: unknown = JSON.parse(cached);
    history = Array.isArray(parsed) ? (parsed as HistoryEntry[]) : [];
  } catch {
    // Unparseable cache — start fresh rather than break the page boot.
    history = [];
  }
}
/**
 * Wire the chat form: submitting sends the trimmed input as a new turn
 * and clears the field; Enter submits, Shift+Enter inserts a newline.
 */
function wireForm(): void {
  const form = document.getElementById("paliadin-form") as HTMLFormElement | null;
  const input = document.getElementById("paliadin-input") as HTMLTextAreaElement | null;
  if (!form || !input) return;
  form.addEventListener("submit", (e) => {
    e.preventDefault();
    const text = input.value.trim();
    if (!text) return;
    input.value = "";
    sendTurn(text);
  });
  input.addEventListener("keydown", (e) => {
    // Enter while an IME composition is active (e.g. confirming a
    // candidate in a CJK input method) belongs to the IME — do not send
    // the half-composed message.
    if (e.key !== "Enter" || e.shiftKey || e.isComposing) return;
    e.preventDefault();
    form.dispatchEvent(new Event("submit"));
  });
}
// Starter chips: a click sends the prompt for the current UI language
// (data-prompt-en / data-prompt-de), falling back to the button's visible
// label. The language is read at click time, not at wiring time.
function wireStarters(): void {
  document.querySelectorAll<HTMLButtonElement>(".paliadin-starter").forEach((starter) => {
    starter.addEventListener("click", () => {
      const localized = getLang() === "en" ? starter.dataset.promptEn : starter.dataset.promptDe;
      const prompt = localized || starter.textContent?.trim() || "";
      if (prompt) sendTurn(prompt);
    });
  });
}
/**
 * Wire the reset button: clear the local conversation and ask the server
 * to reset the session.
 *
 * Before the history is wiped, the in-flight stream (if any) is closed and
 * every pending late-response poller is cancelled. Otherwise a reply that
 * belongs to the discarded conversation would later be appended to the
 * fresh history and show up after the reset.
 */
function wireReset(): void {
  const btn = document.getElementById("paliadin-reset");
  if (!btn) return;
  btn.addEventListener("click", async () => {
    cleanupTurn();
    latePolls.forEach((poll) => poll.cancel());
    latePolls.clear();
    history = [];
    saveHistory();
    renderHistory();
    try {
      await fetch("/api/paliadin/reset", { method: "POST", credentials: "same-origin" });
    } catch {
      // Reset failure is non-fatal — the next turn will spin up a fresh pane anyway.
    }
  });
}
/**
 * Send one user message as a new Paliadin turn and stream the reply into
 * a fresh assistant bubble.
 *
 * Flow: persist + render the user message, POST /api/paliadin/turn, then
 * consume the returned SSE stream (`meta`, `thinking`, `content`, `end`,
 * `error`, `ping`). On `error` the bubble switches to the "checking" copy
 * and a late-response poller is started for this turn.
 *
 * Every handler closes over this turn's own EventSource and turn id.
 * Enter and the starter chips are not blocked while a turn streams, so a
 * second turn can start before this one ends; with per-turn state the
 * older turn can neither close the newer stream nor start recovery for
 * the newer turn's id.
 */
async function sendTurn(text: string): Promise<void> {
  // Hide empty state on first send.
  const empty = document.getElementById("paliadin-empty");
  if (empty) empty.style.display = "none";
  // Append user bubble.
  history.push({ role: "user", text, ts: new Date().toISOString() });
  saveHistory();
  appendBubble("user", text);
  // Insert placeholder assistant bubble.
  const placeholder = appendBubble("assistant", "");
  placeholder.dataset.streaming = "true";
  placeholder.querySelector(".paliadin-bubble-text")!.textContent = "Paliadin denkt nach …";
  toggleStopButton(true);
  // Kick off the turn.
  let turnRes: { turn_id: string; sse_url: string };
  try {
    const r = await fetch("/api/paliadin/turn", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      credentials: "same-origin",
      body: JSON.stringify({
        user_message: text,
        session_id: sessionId,
        page_origin: "/paliadin",
      }),
    });
    if (!r.ok) throw new Error("HTTP " + r.status);
    turnRes = await r.json();
  } catch {
    placeholder.querySelector(".paliadin-bubble-text")!.textContent =
      t("paliadin.error.upstream");
    placeholder.dataset.streaming = "false";
    placeholder.classList.add("paliadin-bubble--error");
    // Keep the stop button if another turn is still streaming.
    if (currentEventSource === null) toggleStopButton(false);
    return;
  }
  const turnId = turnRes.turn_id;
  currentTurnId = turnId;
  // Open SSE.
  const es = new EventSource(turnRes.sse_url);
  currentEventSource = es;

  // Close THIS turn's stream. The shared state (and the stop button) is
  // reset only while this is still the current turn — a newer turn keeps
  // its own stream open.
  const finishTurn = (): void => {
    es.close();
    if (currentEventSource === es) cleanupTurn();
  };

  // Parse an SSE frame's JSON payload; null when absent, malformed, or
  // not a JSON object.
  const parseEventData = (ev: Event): Record<string, unknown> | null => {
    try {
      const parsed: unknown = JSON.parse((ev as MessageEvent).data);
      return typeof parsed === "object" && parsed !== null
        ? (parsed as Record<string, unknown>)
        : null;
    } catch {
      return null;
    }
  };

  // Show the thinking pulse immediately — the visible pulse + counter is
  // the live proof-of-life signal m needs to trust that the chat works.
  startThinkingIndicator(placeholder);
  // Reset the streamed accumulator for this turn.
  placeholder.dataset.fullText = "";

  es.addEventListener("meta", () => {
    // Nothing to show yet; the thinking indicator already covers this phase.
  });
  es.addEventListener("thinking", (ev) => {
    const elapsed = parseEventData(ev)?.elapsed_seconds;
    updateThinkingIndicator(placeholder, typeof elapsed === "number" ? elapsed : 0);
  });
  es.addEventListener("content", (ev) => {
    const data = parseEventData(ev);
    // Drop a malformed frame instead of letting it wipe streamed text.
    if (!data) return;
    const delta = typeof data.delta === "string" ? data.delta : "";
    if (delta) {
      // Aichat streaming path — accumulate the delta into the bubble.
      stopThinkingIndicator(placeholder);
      const next = (placeholder.dataset.fullText ?? "") + delta;
      placeholder.dataset.fullText = next;
      writeStreamedText(placeholder, next);
      return;
    }
    // Legacy one-shot path — full body in `text`.
    const body = String(data.text || "");
    placeholder.dataset.fullText = body;
    stopThinkingIndicator(placeholder);
    typewriter(placeholder, body);
  });
  es.addEventListener("end", (ev) => {
    // Finish even if the payload is missing or malformed — otherwise the
    // bubble would stay in the streaming state forever.
    const data: Record<string, unknown> = parseEventData(ev) ?? {};
    placeholder.dataset.streaming = "false";
    stopThinkingIndicator(placeholder);
    finishBubble(placeholder, data);
    history.push({
      role: "assistant",
      // Save the raw Markdown body (with [#deadline-OPEN:...] chip markers
      // intact), not the rendered textContent. Otherwise on reload the
      // chip-anchor text replaces the markers and renderResponseHTML can
      // no longer reconstruct the links (m, 2026-05-08 14:11 — links
      // disappeared on second load).
      text: placeholder.dataset.fullText ?? getBubbleText(placeholder),
      // Unvalidated server metadata, stored as received.
      meta: {
        used_tools: data.used_tools as string[] | undefined,
        rows_seen: data.rows_seen as number[] | undefined,
        classifier_tag: data.classifier_tag as string | undefined,
        duration_ms: data.duration_ms as number | undefined,
        chip_count: data.chip_count as number | undefined,
      },
      ts: new Date().toISOString(),
    });
    saveHistory();
    finishTurn();
  });
  es.addEventListener("error", (ev) => {
    const errText = friendlyErrorMessage((ev as MessageEvent).data);
    stopThinkingIndicator(placeholder);
    // Honest copy: we don't claim "nachgereicht" because the recovery
    // path may report "lost". Frame it as "checking" while we ask the
    // backend whether the turn actually completed upstream.
    placeholder.querySelector(".paliadin-bubble-text")!.textContent =
      errText + " " + t("paliadin.late.checking");
    placeholder.classList.add("paliadin-bubble--error", "paliadin-bubble--late-pending");
    placeholder.dataset.streaming = "false";
    placeholder.dataset.errorText = errText;
    if (turnId) {
      placeholder.dataset.turnId = turnId;
      startLatePoll(turnId, placeholder);
    }
    finishTurn();
  });
  es.addEventListener("ping", () => {
    // heartbeat — no-op
  });
}
// =============================================================================
// thinking indicator — proof-of-life pulse + elapsed counter
// =============================================================================
/**
 * Show the proof-of-life indicator (pulse dot, "thinking" label, elapsed
 * counter) inside an assistant bubble. No-op if one is already present.
 *
 * The indicator is appended as its own child of the bubble, not inside
 * `.paliadin-bubble-text`, so code that rewrites the text node (typewriter,
 * streamed deltas) cannot clobber it. The static "denkt nach" placeholder
 * text is cleared because the indicator's label now carries that message.
 *
 * The counter starts at 0 s. It only changes when a caller invokes
 * updateThinkingIndicator (sendTurn does so per SSE `thinking` event); no
 * local timer advances it.
 */
function startThinkingIndicator(bubble: HTMLElement): void {
  if (bubble.querySelector(".paliadin-thinking")) return; // already running
  const textNode = bubble.querySelector(".paliadin-bubble-text");
  if (textNode) textNode.textContent = "";
  const node = document.createElement("div");
  node.className = "paliadin-thinking";
  node.innerHTML = `
    <span class="paliadin-thinking-dot" aria-hidden="true"></span>
    <span class="paliadin-thinking-label"></span>
    <span class="paliadin-thinking-elapsed"></span>
  `;
  const label = node.querySelector(".paliadin-thinking-label");
  if (label) label.textContent = t("paliadin.thinking");
  bubble.appendChild(node);
  updateThinkingIndicator(bubble, 0);
}
// Refresh the elapsed-seconds counter of the bubble's thinking indicator.
// Silently does nothing once the indicator has been removed.
function updateThinkingIndicator(bubble: HTMLElement, elapsedSeconds: number): void {
  const indicator = bubble.querySelector<HTMLElement>(".paliadin-thinking");
  const counter = indicator?.querySelector(".paliadin-thinking-elapsed") ?? null;
  if (counter === null) return;
  counter.textContent = formatThinkingSeconds(elapsedSeconds);
}
// Remove the bubble's thinking indicator, if one is present.
function stopThinkingIndicator(bubble: HTMLElement): void {
  const indicator = bubble.querySelector(".paliadin-thinking");
  if (indicator !== null) indicator.remove();
}
// Localised counter label: negative input is clamped to 0, the value is
// rounded to whole seconds and substituted for "{seconds}".
function formatThinkingSeconds(s: number): string {
  const wholeSeconds = Math.round(s < 0 ? 0 : s);
  const template = t("paliadin.thinking.seconds");
  return template.replace("{seconds}", String(wholeSeconds));
}
// writeStreamedText shows the raw text accumulated so far from streamed
// deltas and keeps the chat scrolled to the bottom. The wire already
// provides the real cadence, so no typewriter simulation is involved.
function writeStreamedText(bubble: HTMLElement, text: string): void {
  const target = bubble.querySelector(".paliadin-bubble-text");
  if (target === null) return;
  target.textContent = text;
  const scroller = document.getElementById("paliadin-stream");
  if (scroller !== null) scroller.scrollTop = scroller.scrollHeight;
}
// Known server-side SSE error codes → i18n keys for the bubble copy.
//   tmux_unavailable   — local PoC path on a host without tmux/claude
//                        (typically the legacy laptop-only build).
//   mriver_unreachable — t-paliad-151: prod path's mRiver is offline
//                        (laptop asleep, off tailnet, or paliadin-shim missing).
//   shim_auth_failed   — SSH key wrong or authorized_keys drifted.
//   shim_error / bootstrap_failed — generic remote shim failure or
//                        system-prompt bootstrap error.
//   timeout            — the turn timed out.
const PALIADIN_ERROR_KEYS = new Map<string, string>([
  ["tmux_unavailable", "paliadin.error.local_only"],
  ["mriver_unreachable", "paliadin.error.mriver_unreachable"],
  ["shim_auth_failed", "paliadin.error.shim_auth_failed"],
  ["shim_error", "paliadin.error.shim_error"],
  ["bootstrap_failed", "paliadin.error.shim_error"],
  ["timeout", "paliadin.error.timeout"],
]);

// friendlyErrorMessage turns an SSE error payload (JSON `{code, message}`)
// into localised copy. Anything unrecognised — unknown codes, non-JSON
// text, or a raw EventSource transport error without data — falls back to
// the generic "connection lost" text, so raw payloads never reach the UI.
function friendlyErrorMessage(data: unknown): string {
  let code: unknown;
  if (typeof data === "string" && data !== "") {
    try {
      const parsed = JSON.parse(data) as { code?: unknown } | null;
      code = parsed?.code;
    } catch {
      // Not JSON — leave `code` unset so the generic copy is used.
    }
  }
  const key = typeof code === "string" ? PALIADIN_ERROR_KEYS.get(code) : undefined;
  return t(key ?? "paliadin.error.connection_lost");
}
// Tear down the current turn: close its SSE stream (if any), forget the
// stream and turn id, and swap the stop button back to the send button.
function cleanupTurn(): void {
  currentEventSource?.close();
  currentEventSource = null;
  currentTurnId = null;
  toggleStopButton(false);
}
// Show the stop button instead of the send button while a turn streams,
// and the reverse afterwards. The stop button's click handler is
// (re)assigned on every call and tears the current turn down.
function toggleStopButton(streaming: boolean): void {
  const sendBtn = document.getElementById("paliadin-send") as HTMLButtonElement | null;
  const stopBtn = document.getElementById("paliadin-stop") as HTMLButtonElement | null;
  if (sendBtn) {
    sendBtn.style.display = streaming ? "none" : "";
  }
  if (!stopBtn) return;
  stopBtn.style.display = streaming ? "" : "none";
  stopBtn.onclick = () => {
    cleanupTurn();
  };
}
// Append a chat bubble (role label, text, hidden meta row) to the stream,
// scroll to the bottom and return the new element. The text is set via
// textContent, so it is never interpreted as HTML.
function appendBubble(role: "user" | "assistant", text: string): HTMLElement {
  const stream = document.getElementById("paliadin-stream")!;
  const roleLabel = role === "user" ? "Du" : "Paliadin";
  const bubble = document.createElement("div");
  bubble.className = `paliadin-bubble paliadin-bubble--${role}`;
  bubble.innerHTML = `
    <div class="paliadin-bubble-role">${roleLabel}</div>
    <div class="paliadin-bubble-text"></div>
    <div class="paliadin-bubble-meta" style="display:none"></div>
  `;
  bubble.querySelector(".paliadin-bubble-text")!.textContent = text;
  stream.appendChild(bubble);
  stream.scrollTop = stream.scrollHeight;
  return bubble;
}
/**
 * Simulate streaming for a one-shot (legacy) reply: reveal `text` in
 * 8-character steps, scheduling the next step 6 ms after the previous one,
 * and keep the chat scrolled to the bottom.
 *
 * The first step is written synchronously. The loop stops as soon as the
 * bubble is no longer marked `data-streaming="true"`: by then finishBubble
 * has rendered the full Markdown from `dataset.fullText`, and a late tick
 * must not overwrite that HTML with raw text.
 */
function typewriter(bubble: HTMLElement, text: string): void {
  const node = bubble.querySelector(".paliadin-bubble-text");
  if (!node) return;
  node.textContent = "";
  const charsPerTick = 8;
  const tickDelayMs = 6;
  let shown = 0;
  const tick = (): void => {
    if (bubble.dataset.streaming !== "true") return;
    if (shown >= text.length) return;
    shown = Math.min(shown + charsPerTick, text.length);
    node.textContent = text.slice(0, shown);
    // Guard instead of asserting: a missing stream element must not turn
    // into an uncaught error inside a timer callback.
    const stream = document.getElementById("paliadin-stream");
    if (stream) stream.scrollTop = stream.scrollHeight;
    setTimeout(tick, tickDelayMs);
  };
  tick();
}
// Current plain text of a bubble's text node ("" when missing or empty).
function getBubbleText(bubble: HTMLElement): string {
  const textEl = bubble.querySelector(".paliadin-bubble-text");
  if (!textEl) return "";
  return textEl.textContent || "";
}
/**
 * Finalise an assistant bubble: render its raw Markdown body (citation /
 * chip markers become anchor buttons via renderResponseHTML) and, when the
 * turn used tools, show them in the meta row, e.g.
 * "▸ search_my_deadlines (3)".
 *
 * The raw body comes from `dataset.fullText` when set, because the text
 * node may still hold a partial typewriter state.
 *
 * @param data Turn metadata (`used_tools`, `rows_seen`, …) from an SSE
 *   `end` frame, a recovered late turn, or the history cache. Missing or
 *   malformed fields (and a null payload) are tolerated. Tool names are
 *   written with textContent so they can never inject markup.
 */
function finishBubble(bubble: HTMLElement, data: Record<string, unknown> | null | undefined): void {
  const textNode = bubble.querySelector<HTMLElement>(".paliadin-bubble-text");
  if (textNode) {
    const raw = bubble.dataset.fullText ?? textNode.textContent ?? "";
    textNode.innerHTML = renderResponseHTML(raw);
  }
  const metaEl = bubble.querySelector<HTMLElement>(".paliadin-bubble-meta");
  if (!metaEl) return;
  const usedTools = data?.used_tools;
  const rowsSeen = data?.rows_seen;
  const tools: unknown[] = Array.isArray(usedTools) ? usedTools : [];
  const rows: unknown[] = Array.isArray(rowsSeen) ? rowsSeen : [];
  if (tools.length === 0) return;
  const parts = tools.map((tool, i) => {
    const count = rows[i];
    return count != null ? `${String(tool)} (${String(count)})` : String(tool);
  });
  metaEl.textContent = "▸ " + parts.join(" · ");
  metaEl.style.display = "";
}
// startLatePoll starts the recovery-endpoint poller for one errored turn.
// Any poller already registered for the same turn id is cancelled first
// (e.g. when the SSE error fires twice for one dropped connection). A
// recovered reply is rendered into `bubble` by applyLateResponse, which
// also appends it to history because the failed turn never wrote an
// assistant entry. A turn confirmed "lost", or a poller that gives up,
// switches the bubble to the honest "verloren" copy via applyLostResponse.
// The registry entry is removed as soon as the poller settles.
function startLatePoll(turnId: string, bubble: HTMLElement): void {
  latePolls.get(turnId)?.cancel();
  const forget = (): void => {
    latePolls.delete(turnId);
  };
  const markLost = (): void => {
    forget();
    applyLostResponse(bubble);
  };
  const handle = pollForLateResponse({
    turnId,
    onLateResponse: (turn) => {
      forget();
      applyLateResponse(bubble, turn);
    },
    onLost: markLost,
    onGiveUp: markLost,
  });
  latePolls.set(turnId, handle);
}
// Mark a pending bubble as definitively lost: swap the pending style for
// the "lost" style and replace its text with the localised lost copy.
function applyLostResponse(bubble: HTMLElement): void {
  const { classList } = bubble;
  classList.remove("paliadin-bubble--late-pending");
  classList.add("paliadin-bubble--lost");
  const textEl = bubble.querySelector(".paliadin-bubble-text");
  if (textEl === null) return;
  textEl.textContent = t("paliadin.late.lost");
}
// applyLateResponse patches an errored bubble with a reply recovered after
// the stream dropped. It swaps the error styling for the "late" style,
// renders body and meta row, appends a "(verspätet)" marker to the meta
// row, and appends the reply to the persisted history so a reload shows
// it. Does nothing when the recovered turn carries no response text.
// NOTE(review): the entry is appended at the end of history, so after a
// reload it appears behind any turns sent while recovery was pending,
// not next to its own question.
function applyLateResponse(bubble: HTMLElement, turn: LateTurn): void {
  const response = turn.response;
  if (!response) return;

  const { classList, dataset } = bubble;
  classList.remove("paliadin-bubble--error", "paliadin-bubble--late-pending");
  classList.add("paliadin-bubble--late");
  dataset.fullText = response;
  dataset.streaming = "false";
  finishBubble(bubble, {
    used_tools: turn.used_tools,
    rows_seen: turn.rows_seen,
    classifier_tag: turn.classifier_tag,
    duration_ms: turn.duration_ms,
    chip_count: turn.chip_count,
  });

  // Visible at a glance: this bubble was patched after the fact.
  const metaRow = bubble.querySelector<HTMLElement>(".paliadin-bubble-meta");
  if (metaRow !== null) {
    const marker = document.createElement("span");
    marker.className = "paliadin-bubble-late-tag";
    marker.textContent = ` · ${t("paliadin.late.marker")}`;
    metaRow.appendChild(marker);
    metaRow.style.display = "";
  }

  // Persist so a reload shows the late response in place of the error.
  const entry: HistoryEntry = {
    role: "assistant",
    text: response,
    meta: {
      used_tools: turn.used_tools,
      rows_seen: turn.rows_seen,
      classifier_tag: turn.classifier_tag ?? undefined,
      duration_ms: turn.duration_ms ?? undefined,
      chip_count: turn.chip_count,
    },
    ts: new Date().toISOString(),
  };
  history.push(entry);
  saveHistory();
}
/**
 * Persist the in-memory history of the current session to localStorage.
 *
 * Best-effort: `setItem` can throw (e.g. QuotaExceededError once a long
 * conversation outgrows the storage quota, or when storage is disabled).
 * sendTurn calls this before it renders and sends the turn, so a throw
 * here must not abort the caller; the failure is logged instead. The
 * server-side history (hydrateFromServer) remains the canonical copy.
 */
function saveHistory(): void {
  try {
    localStorage.setItem(HISTORY_PREFIX + sessionId, JSON.stringify(history));
  } catch (err: unknown) {
    console.warn("paliadin: could not persist chat history", err);
  }
}
/**
 * One turn as returned by GET /api/paliadin/history (services.PaliadinTurn
 * on the server). Only the fields this panel reads are declared; nullable
 * fields may also be absent.
 */
interface PaliadinTurnRow {
  /** Identity of the turn and the session it belongs to. */
  turn_id: string;
  session_id: string;
  /** Start timestamp; reused as `ts` for both reconstructed entries. */
  started_at: string;
  /** The user's message for this turn. */
  user_message: string;
  /** Assistant reply; null/absent while the turn has no response. */
  response?: string | null;
  /** Per-turn metadata, mirrored into HistoryEntry.meta. */
  used_tools?: string[] | null;
  rows_seen?: number[] | null;
  classifier_tag?: string | null;
  duration_ms?: number | null;
  chip_count?: number | null;
}
/**
 * Hydrate from /api/paliadin/history, replacing the localStorage cache
 * when the DB returns rows. Fail-quiet on network / auth errors —
 * localStorage is a perfectly good offline fallback.
 *
 * The server copy is applied only if the local history was left untouched
 * while the request was in flight: no turn sent, no late reply appended,
 * no reset. Overwriting it in that case would drop those entries and
 * re-render the stream underneath a bubble that is still streaming. The
 * server copy is picked up again on the next load.
 */
async function hydrateFromServer(): Promise<void> {
  const historyAtStart = history;
  const lengthAtStart = history.length;
  let rows: PaliadinTurnRow[] = [];
  try {
    const r = await fetch(
      "/api/paliadin/history?session=" + encodeURIComponent(sessionId) + "&limit=50",
      { credentials: "same-origin" },
    );
    if (!r.ok) return;
    const body = (await r.json()) as PaliadinTurnRow[] | null;
    rows = Array.isArray(body) ? body : [];
  } catch {
    return;
  }
  if (!rows.length) return;
  // Local state changed during the fetch — keep it (see doc comment).
  if (history !== historyAtStart || history.length !== lengthAtStart) return;
  const reconstructed: HistoryEntry[] = [];
  for (const row of rows) {
    reconstructed.push({ role: "user", text: row.user_message, ts: row.started_at });
    if (typeof row.response === "string" && row.response.length > 0) {
      reconstructed.push({
        role: "assistant",
        text: row.response,
        ts: row.started_at,
        meta: {
          used_tools: row.used_tools ?? undefined,
          rows_seen: row.rows_seen ?? undefined,
          classifier_tag: row.classifier_tag ?? undefined,
          duration_ms: row.duration_ms ?? undefined,
          chip_count: row.chip_count ?? undefined,
        },
      });
    }
  }
  history = reconstructed;
  saveHistory();
  renderHistory();
}
// Rebuild the chat stream from `history`. Every rendered child is removed
// except elements with the `paliadin-empty` class. Then the empty state is
// shown (no entries) or hidden, and one bubble is appended per entry.
// Assistant entries that carry meta are finalised (Markdown + tool line)
// via finishBubble.
function renderHistory(): void {
  const stream = document.getElementById("paliadin-stream");
  if (!stream) return;
  for (const child of Array.from(stream.children)) {
    if (!child.classList.contains("paliadin-empty")) child.remove();
  }
  const emptyState = document.getElementById("paliadin-empty");
  const hasEntries = history.length > 0;
  if (emptyState) emptyState.style.display = hasEntries ? "none" : "";
  if (!hasEntries) return;
  for (const entry of history) {
    const bubble = appendBubble(entry.role, entry.text);
    if (entry.role !== "assistant" || !entry.meta) continue;
    bubble.dataset.streaming = "false";
    finishBubble(bubble, {
      used_tools: entry.meta.used_tools,
      rows_seen: entry.meta.rows_seen,
    });
  }
}