Merge: fix(paliadin): one-shot fallback when persona lacks streaming (unblock chat)
Some checks failed
Paliad CI gate / build (push) Has been cancelled
Paliad CI gate / test-go (push) Has been cancelled
Paliad CI gate / deploy (push) Has been cancelled

This commit is contained in:
mAi
2026-05-26 19:24:42 +02:00

View File

@@ -220,6 +220,14 @@ func (s *AichatPaliadinService) RunTurnStream(ctx context.Context, req TurnReque
}
if streamErr != nil {
// Aichat persona without streaming support — graceful fallback to
// the one-shot /chat/turn endpoint. Same body shape; we adapt the
// non-streaming response into a single StreamChunk so the caller
// sees identical event ordering.
if strings.Contains(streamErr.Error(), "unsupported_streaming") {
log.Printf("paliadin: persona %q lacks streaming support — falling back to one-shot turn %s", s.cfg.Persona, turnID)
return s.fallbackOneShotFromStream(ctx, turnID, body, events, startedAt, session)
}
// Don't overwrite an existing error_code we may have set above.
_ = s.markTurnError(ctx, turnID, classifyAichatError(streamErr))
return nil, streamErr
@@ -255,6 +263,80 @@ func (s *AichatPaliadinService) RunTurnStream(ctx context.Context, req TurnReque
}, nil
}
// fallbackOneShotFromStream runs the same `body` against aichat's
// non-streaming /chat/turn endpoint and adapts the response into the
// StreamingPaliadin contract — a single StreamChunk + StreamMeta +
// StreamConversation, followed by `events` being closed by the
// outer RunTurnStream's defer. Used when the configured persona doesn't
// support streaming (aichat returns HTTP 400 unsupported_streaming).
//
// Identical persistence shape as the one-shot RunTurn: completeTurn +
// markPrimed/clearPrimed. No new turn row (already inserted by
// RunTurnStream). No primer rebuild (already in body).
func (s *AichatPaliadinService) fallbackOneShotFromStream(
ctx context.Context,
turnID uuid.UUID,
body aichatTurnRequest,
events chan<- StreamEvent,
startedAt time.Time,
session string,
) (*TurnResult, error) {
var resp aichatTurnResponse
if err := s.callHTTP(ctx, http.MethodPost, "/chat/turn", body, &resp); err != nil {
_ = s.markTurnError(ctx, turnID, classifyAichatError(err))
safeSendStream(ctx, events, StreamEvent{
Kind: StreamError,
Code: classifyAichatError(err),
Message: err.Error(),
})
return nil, err
}
if resp.PaneSpawned {
s.clearPrimed(session)
} else {
s.markPrimed(session)
}
cleanBody := resp.Response
tokens := approxTokenCount(cleanBody)
chipCount := countChips(cleanBody)
finished := time.Now().UTC()
durationMS := int(finished.Sub(startedAt) / time.Millisecond)
tmeta := trailerMeta{
UsedTools: resp.Meta.UsedTools,
ClassifierTag: resp.Meta.ClassifierTag,
RowsSeen: coerceAichatRowsSeen(resp.Meta.RowsSeen),
}
// Emit the response as a single chunk so the frontend renders it.
safeSendStream(ctx, events, StreamEvent{
Kind: StreamChunk,
Content: cleanBody,
})
safeSendStream(ctx, events, StreamEvent{
Kind: StreamMeta,
UsedTools: tmeta.UsedTools,
ClassifierTag: tmeta.ClassifierTag,
RowsSeen: tmeta.RowsSeen,
})
if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, tmeta, chipCount); err != nil {
log.Printf("paliadin: complete turn %s (fallback one-shot): %v", turnID, err)
}
return &TurnResult{
TurnID: turnID,
Response: cleanBody,
UsedTools: tmeta.UsedTools,
RowsSeen: tmeta.RowsSeen,
ChipCount: chipCount,
ClassifierTag: tmeta.ClassifierTag,
DurationMS: durationMS,
}, nil
}
// streamFrame is one decoded SSE event.
type streamFrame struct {
event string // "" → default (data:) event