Replaces the MVP Passthrough with scheduler.Locked: a capacity-1 channel serialises every consumer's GPU work end-to-end. main.go switches to it. Behavioural contract: - Jobs that arrive while another job holds the GPU block on the channel until the holder finishes. Context cancellation aborts the wait cleanly (no leaked tokens, queue depth decremented). - Stats track queue_depth, in_flight, total_jobs, last_wait_ms, last_run_ms, oldest_queued — surfaced through /v1/status. - One lock for ALL consumers (not per-consumer): the design (§4.3) is explicit that coarse-grained locking beats GPU-stream-granular locking on single-GPU single-user hardware. mvoice + ollama + comfyui never run truly concurrently any more, which is the whole point — running them concurrently is what produced the CUDA-OOM under load. Tests: - 5 goroutines hammer the scheduler concurrently → max in-flight = 1. - Cancellation while parked on the lock returns ctx.Err() and frees the queue slot. - Stats reflect in-flight + queue-depth transitions correctly. - Race detector clean. Step 5 will compose this with VRAM-pressure eviction: before acquiring the lock, check whether the target consumer's resident cost fits under the current GPU headroom; if not, unload the LRU non-coexistent consumer first. Refs: m/mGPUmanager#1 (Schritt 4).
173 lines
5.4 KiB
Go
173 lines
5.4 KiB
Go
// Package config loads the mGPUmanager consumer registry from YAML.
|
||
//
|
||
// The consumers.yaml file declares every GPU consumer (mvoice, whisper-server,
|
||
// ollama, comfyui), how to route the four logical endpoint kinds (tts, stt,
|
||
// llm, image) to a consumer, how to probe its health, and how to load/unload
|
||
// it from VRAM. The scheduler (steps 4–5) reads vram_resident_mib +
|
||
// can_coexist_with to drive eviction.
|
||
package config
|
||
|
||
import (
|
||
"fmt"
|
||
"net/url"
|
||
"os"
|
||
"strings"
|
||
"time"
|
||
|
||
"gopkg.in/yaml.v3"
|
||
)
|
||
|
||
// EndpointKind names one of the four logical broker endpoints exposed on
// /v1/*. Its string value is the key used in the YAML routing and paths maps.
type EndpointKind string

// The supported endpoint kinds.
const (
	KindTTS   EndpointKind = "tts"
	KindSTT   EndpointKind = "stt"
	KindLLM   EndpointKind = "llm"
	KindImage EndpointKind = "image"
)

// AllKinds lists every endpoint kind in the canonical order used by
// /v1/status and tests.
var AllKinds = []EndpointKind{
	KindTTS,
	KindSTT,
	KindLLM,
	KindImage,
}
|
||
|
||
// Route is an HTTP method + path pair addressed on a consumer, optionally
// carrying a fixed request body.
type Route struct {
	// Method is the HTTP verb for the request.
	Method string `yaml:"method"`
	// Path is the request path on the consumer.
	Path string `yaml:"path"`
	// Body is an optional fixed request body for admin operations
	// (e.g. ComfyUI's /api/free expects {"unload_models":true,"free_memory":true}).
	Body string `yaml:"body,omitempty"`
}
|
||
|
||
// Consumer describes a single GPU consumer behind the broker.
|
||
type Consumer struct {
|
||
URL string `yaml:"url"`
|
||
Health Route `yaml:"health"`
|
||
Paths map[EndpointKind]Route `yaml:"paths"`
|
||
VRAMResidentMiB int `yaml:"vram_resident_mib"`
|
||
VRAMManaged bool `yaml:"vram_managed"` // self-managed LRU (ollama)
|
||
Load *Route `yaml:"load,omitempty"`
|
||
Unload *Route `yaml:"unload,omitempty"`
|
||
SystemdUnit string `yaml:"systemd_unit,omitempty"` // fallback unload (whisper-server)
|
||
CanCoexistWith []string `yaml:"can_coexist_with"`
|
||
Priority int `yaml:"priority"`
|
||
MaxConcurrency int `yaml:"max_concurrency"`
|
||
}
|
||
|
||
// GPU is the host's GPU envelope: total and reserved VRAM plus the polling
// cadence.
type GPU struct {
	// TotalMiB is the total VRAM in MiB.
	TotalMiB int `yaml:"total_mib"`
	// ReservedMiB is the system-reserved headroom in MiB.
	ReservedMiB int `yaml:"reserved_mib"`
	// PollIntervalSeconds is the polling cadence in seconds; values <= 0
	// select the default (see PollInterval).
	PollIntervalSeconds int `yaml:"poll_interval_seconds"`
}

// PollInterval reports the GPU polling cadence as a Duration. A configured
// value of zero or less yields the 2s default.
func (g GPU) PollInterval() time.Duration {
	if secs := g.PollIntervalSeconds; secs > 0 {
		return time.Duration(secs) * time.Second
	}
	return 2 * time.Second
}

// AvailableMiB reports total VRAM minus the system-reserved headroom, never
// going below zero. It returns 0 when TotalMiB is zero or negative.
func (g GPU) AvailableMiB() int {
	if g.TotalMiB <= 0 {
		return 0
	}
	if free := g.TotalMiB - g.ReservedMiB; free > 0 {
		return free
	}
	return 0
}
|
||
|
||
// Config is the parsed mGPUmanager configuration.
|
||
type Config struct {
|
||
Listen string `yaml:"listen"`
|
||
GPU GPU `yaml:"gpu"`
|
||
Routing map[EndpointKind]string `yaml:"routing"`
|
||
AudioProxy string `yaml:"audio_proxy"`
|
||
AudioPathPrefix string `yaml:"audio_path_prefix"`
|
||
Consumers map[string]*Consumer `yaml:"consumers"`
|
||
}
|
||
|
||
// Load reads and validates a consumers.yaml file from disk.
|
||
func Load(path string) (*Config, error) {
|
||
b, err := os.ReadFile(path)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("read %s: %w", path, err)
|
||
}
|
||
var cfg Config
|
||
if err := yaml.Unmarshal(b, &cfg); err != nil {
|
||
return nil, fmt.Errorf("parse %s: %w", path, err)
|
||
}
|
||
if err := cfg.validate(); err != nil {
|
||
return nil, fmt.Errorf("validate %s: %w", path, err)
|
||
}
|
||
return &cfg, nil
|
||
}
|
||
|
||
func (c *Config) validate() error {
|
||
if c.Listen == "" {
|
||
c.Listen = "127.0.0.1:8770"
|
||
}
|
||
if len(c.Consumers) == 0 {
|
||
return fmt.Errorf("no consumers declared")
|
||
}
|
||
for name, cons := range c.Consumers {
|
||
if cons.URL == "" {
|
||
return fmt.Errorf("consumer %q: url is required", name)
|
||
}
|
||
if _, err := url.Parse(cons.URL); err != nil {
|
||
return fmt.Errorf("consumer %q: invalid url %q: %w", name, cons.URL, err)
|
||
}
|
||
if cons.Health.Path == "" {
|
||
return fmt.Errorf("consumer %q: health.path is required", name)
|
||
}
|
||
if cons.Health.Method == "" {
|
||
cons.Health.Method = "GET"
|
||
}
|
||
cons.Health.Method = strings.ToUpper(cons.Health.Method)
|
||
for kind, route := range cons.Paths {
|
||
if route.Path == "" {
|
||
return fmt.Errorf("consumer %q: paths.%s.path is required", name, kind)
|
||
}
|
||
if route.Method == "" {
|
||
route.Method = "POST"
|
||
}
|
||
route.Method = strings.ToUpper(route.Method)
|
||
cons.Paths[kind] = route
|
||
}
|
||
if cons.MaxConcurrency <= 0 {
|
||
cons.MaxConcurrency = 1
|
||
}
|
||
}
|
||
for kind, consName := range c.Routing {
|
||
if _, ok := c.Consumers[consName]; !ok {
|
||
return fmt.Errorf("routing.%s: unknown consumer %q", kind, consName)
|
||
}
|
||
}
|
||
if c.AudioProxy != "" {
|
||
if _, ok := c.Consumers[c.AudioProxy]; !ok {
|
||
return fmt.Errorf("audio_proxy: unknown consumer %q", c.AudioProxy)
|
||
}
|
||
if c.AudioPathPrefix == "" {
|
||
c.AudioPathPrefix = "/api/audio/"
|
||
}
|
||
if !strings.HasPrefix(c.AudioPathPrefix, "/") || !strings.HasSuffix(c.AudioPathPrefix, "/") {
|
||
return fmt.Errorf("audio_path_prefix must start and end with '/': %q", c.AudioPathPrefix)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ConsumerForKind returns the consumer designated to handle a given endpoint
|
||
// kind, or nil if routing is unset.
|
||
func (c *Config) ConsumerForKind(kind EndpointKind) (string, *Consumer) {
|
||
name, ok := c.Routing[kind]
|
||
if !ok {
|
||
return "", nil
|
||
}
|
||
return name, c.Consumers[name]
|
||
}
|