diff --git a/README.md b/README.md index 461a390..7b84175 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ Codes: `consumer_unreachable`, `no_consumer`, `scheduler_error`, `bad_consumer_u - ✅ Schritt 0 — ComfyUI persistent (`systemd: comfyui.service`) - ✅ Schritt 1 — `mvoice /api/admin/{load,unload}` (mai/knuth/admin-load-unload @ mVoice) -- ✅ Schritt 2 — Routing-Façade + `/v1/status` (passthrough scheduler) -- ☐ Schritt 3 — wa.sh auf Broker umgestellt -- ☐ Schritt 4 — Queue + globaler GPU-Lock -- ☐ Schritt 5 — Coexistenz-Gruppen + LRU-Eviction +- ✅ Schritt 2 — Routing-Façade + `/v1/status` +- ✅ Schritt 3 — wa.sh auf Broker umgestellt (m/mAi `mai/knuth/wa-tts-broker`) +- ✅ Schritt 4 — Queue + globaler GPU-Lock +- ✅ Schritt 5 — Coexistenz-Gruppen + LRU-Eviction diff --git a/cmd/mgpumanager/main.go b/cmd/mgpumanager/main.go index 3b43c84..cf7a8fa 100644 --- a/cmd/mgpumanager/main.go +++ b/cmd/mgpumanager/main.go @@ -61,9 +61,10 @@ func main() { reg := registry.New(cfg, logger.With("component", "registry")) gpuPoller := gpu.NewPoller(cfg.GPU.PollInterval(), logger.With("component", "gpu")) - // Phase 1 always runs a single-slot global GPU lock. Schritt 5's - // eviction-aware scheduler wraps this same lock with VRAM pressure logic. - sched := scheduler.NewLocked(reg, 1) + // Schritt 5: VRAM-pressure-aware scheduler. Wraps the global GPU lock + // with eviction logic — see internal/scheduler/evicting.go. + sched := scheduler.NewEvicting(cfg, reg, gpuPoller, + logger.With("component", "scheduler")) go reg.Run(ctx) go gpuPoller.Run(ctx) diff --git a/internal/gpu/gpu.go b/internal/gpu/gpu.go index f8b77ae..09a5899 100644 --- a/internal/gpu/gpu.go +++ b/internal/gpu/gpu.go @@ -62,6 +62,18 @@ func (p *Poller) Last() Sample { return p.last } +// SetSampleForTest injects a synthetic VRAM reading. Used from tests that +// must drive the scheduler's eviction logic without a real GPU or +// nvidia-smi. Production callers should never reach this. 
+func SetSampleForTest(p *Poller, freeMiB, totalMiB int) {
+	p.store(Sample{
+		FreeMiB:  freeMiB,
+		TotalMiB: totalMiB,
+		UsedMiB:  totalMiB - freeMiB, // derived from the two inputs; negative if freeMiB > totalMiB
+		At:       time.Now(),
+	})
+}
+
 func (p *Poller) sampleOnce(ctx context.Context) {
 	cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
 	defer cancel()
diff --git a/internal/scheduler/evicting.go b/internal/scheduler/evicting.go
new file mode 100644
index 0000000..e0813df
--- /dev/null
+++ b/internal/scheduler/evicting.go
@@ -0,0 +1,329 @@
+package scheduler
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log/slog"
+	"net/http"
+	"slices"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"mgit.msbls.de/m/mGPUmanager/internal/config"
+	"mgit.msbls.de/m/mGPUmanager/internal/gpu"
+	"mgit.msbls.de/m/mGPUmanager/internal/registry"
+)
+
+// vramCushionMiB is the minimum free VRAM the scheduler insists on having
+// AFTER the target consumer is loaded. Keeps cudaMalloc headers from OOM-ing
+// at the very edge of available memory.
+const vramCushionMiB = 256
+
+// maxEvictAttempts caps how many consumers the scheduler will unload in a
+// single ensureFits cycle before giving up and returning an error. Five is
+// generous — we only have four consumers configured.
+const maxEvictAttempts = 5
+
+// Evicting is the step-5 ("Schritt 5") scheduler: it wraps a Locked scheduler
+// with VRAM-pressure-aware eviction.
+//
+// Flow per job:
+//  1. ensureFits — if the live free VRAM minus a 256 MiB cushion is below
+//     the target consumer's vram_resident_mib AND the target is not already
+//     resident, unload the LRU non-coexistent consumer. Repeat until fit.
+//  2. ensureLoaded — if the target was previously unloaded, call its
+//     load endpoint (mvoice) or rely on implicit cold-start (whisper, etc.).
+//  3. inner.Run — acquire the global GPU lock and run the job.
+// +// Eviction state is scheduler-local: registry.Loaded (polled every 5 s) is +// authoritative when the consumer reports it, but for the seconds between an +// unload call and the next probe we rely on our own bookkeeping. +type Evicting struct { + cfg *config.Config + reg *registry.Registry + gpu *gpu.Poller + inner *Locked + logger *slog.Logger + client *http.Client + + mu sync.Mutex + loaded map[string]bool // consumer name -> believed-resident + lastUsed map[string]time.Time + evictions int64 +} + +// NewEvicting builds the Schritt 5 scheduler. All consumers are assumed +// resident at startup — the first health probe will correct any consumers +// that actually aren't (e.g. mvoice in 'unloaded' state). +func NewEvicting(cfg *config.Config, reg *registry.Registry, gpuPoller *gpu.Poller, logger *slog.Logger) *Evicting { + e := &Evicting{ + cfg: cfg, + reg: reg, + gpu: gpuPoller, + inner: NewLocked(reg, 1), + logger: logger, + client: &http.Client{Timeout: 30 * time.Second}, + loaded: make(map[string]bool, len(cfg.Consumers)), + lastUsed: make(map[string]time.Time, len(cfg.Consumers)), + } + for name, cons := range cfg.Consumers { + // Self-managed VRAM consumers (ollama) are always 'loaded' from + // the scheduler's perspective — we never evict them via HTTP. + e.loaded[name] = !cons.VRAMManaged || true + } + return e +} + +// Run is the public Scheduler interface: ensure room + load + serialise. +func (e *Evicting) Run(ctx context.Context, consumer string, fn Job) error { + if err := e.ensureFits(ctx, consumer); err != nil { + return fmt.Errorf("eviction: %w", err) + } + if err := e.ensureLoaded(ctx, consumer); err != nil { + return fmt.Errorf("load %s: %w", consumer, err) + } + err := e.inner.Run(ctx, consumer, fn) + if err == nil { + e.mu.Lock() + e.lastUsed[consumer] = time.Now() + e.mu.Unlock() + } + return err +} + +// Stats forwards from the inner scheduler and adds the eviction counter. 
+func (e *Evicting) Stats() Stats {
+	s := e.inner.Stats()
+	s.Evictions = atomic.LoadInt64(&e.evictions)
+	return s
+}
+
+// ───── ensureFits ────────────────────────────────────────────────────────
+
+// ensureFits unloads other consumers until target is expected to fit.
+//
+// It returns nil without evicting anything when target is VRAM-managed, has
+// no configured size, or is already believed resident. Otherwise it performs
+// up to maxEvictAttempts rounds of "check fits, unload one victim, wait 1 s".
+// When no victim is available it logs a warning and returns nil, so the job
+// is still attempted (best effort).
+//
+// Errors: unknown target, a failed unload (wrapped as "unload <victim>: …"),
+// ctx cancellation during the wait, or headroom that is still insufficient
+// after the last eviction.
+func (e *Evicting) ensureFits(ctx context.Context, target string) error {
+	cons := e.cfg.Consumers[target]
+	if cons == nil {
+		return fmt.Errorf("unknown consumer %q", target)
+	}
+	if cons.VRAMResidentMiB == 0 || cons.VRAMManaged {
+		// Self-managed (ollama) or unknown size — let the consumer figure
+		// it out; no preemptive eviction.
+		return nil
+	}
+	// Already resident? No eviction needed.
+	e.mu.Lock()
+	resident := e.loaded[target]
+	e.mu.Unlock()
+	if resident {
+		return nil
+	}
+
+	for range maxEvictAttempts {
+		if e.fits(cons) {
+			return nil
+		}
+		victim := e.pickLRUVictim(target, cons)
+		if victim == "" {
+			// Nothing left to evict that we're allowed to touch.
+			e.logger.Warn("no eviction candidates", "target", target,
+				"need_mib", cons.VRAMResidentMiB,
+				"free_mib", e.gpu.Last().FreeMiB)
+			return nil
+		}
+		if err := e.unload(ctx, victim); err != nil {
+			e.logger.Warn("evict failed", "victim", victim, "err", err)
+			return fmt.Errorf("unload %s: %w", victim, err)
+		}
+		atomic.AddInt64(&e.evictions, 1)
+		// free_mib_after is whatever the poller last sampled; it may predate
+		// the unload that just completed.
+		e.logger.Info("evicted consumer",
+			"victim", victim, "target", target,
+			"free_mib_after", e.gpu.Last().FreeMiB,
+			"need_mib", cons.VRAMResidentMiB)
+		// Give the GPU a moment to actually free the VRAM before re-checking.
+		select {
+		case <-time.After(1 * time.Second):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	// The loop only checks fits at the top of each round, so the effect of
+	// the final eviction has not been looked at yet. Check once more before
+	// reporting failure.
+	if e.fits(cons) {
+		return nil
+	}
+	return fmt.Errorf("VRAM headroom still insufficient after %d evictions", maxEvictAttempts)
+}
+
+// fits returns true when the live nvidia-smi free VRAM minus the safety
+// cushion is enough for the target consumer's predicted footprint.
+//
+// Falls back to the static budget (cfg.GPU.AvailableMiB() minus the
+// non-coexistent loaded set) if the GPU poller has not produced a sample
+// yet (e.g.
during the first second of process lifetime).
+func (e *Evicting) fits(cons *config.Consumer) bool {
+	sample := e.gpu.Last()
+	// A sample with both fields at zero is treated as "no sample yet".
+	if sample.FreeMiB > 0 || sample.TotalMiB > 0 {
+		return sample.FreeMiB >= cons.VRAMResidentMiB+vramCushionMiB
+	}
+	return e.fitsByBudget(cons)
+}
+
+// fitsByBudget is the sample-less fallback for fits. Starting from
+// cfg.GPU.AvailableMiB(), it subtracts the configured footprint of every
+// consumer believed loaded, skipping VRAM-managed consumers and those listed
+// in cons.CanCoexistWith, and reports whether cons still fits in the rest.
+// No cushion is applied on this path.
+func (e *Evicting) fitsByBudget(cons *config.Consumer) bool {
+	headroom := e.cfg.GPU.AvailableMiB()
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	for name, loaded := range e.loaded {
+		if !loaded {
+			continue
+		}
+		other := e.cfg.Consumers[name]
+		if other == nil || other.VRAMManaged {
+			continue
+		}
+		if slices.Contains(cons.CanCoexistWith, name) {
+			continue
+		}
+		headroom -= other.VRAMResidentMiB
+	}
+	return headroom >= cons.VRAMResidentMiB
+}
+
+// pickLRUVictim returns the name of the loaded consumer with the oldest
+// LastUsed that is NOT in target's can_coexist_with list, NOT the target
+// itself, NOT VRAM-managed, and has *some* way to be evicted. It returns ""
+// when no consumer qualifies.
+func (e *Evicting) pickLRUVictim(target string, cons *config.Consumer) string {
+	snap := e.reg.Snapshot()
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	var best string
+	var bestTime time.Time
+	for name, loaded := range e.loaded {
+		if !loaded || name == target {
+			continue
+		}
+		other := e.cfg.Consumers[name]
+		if other == nil || other.VRAMManaged {
+			continue
+		}
+		if slices.Contains(cons.CanCoexistWith, name) {
+			continue
+		}
+		if other.Unload == nil && other.SystemdUnit == "" {
+			continue
+		}
+		// LastUsed: prefer scheduler-local (set on successful job exit) over
+		// registry (set on probe completion). Scheduler-local is more
+		// meaningful for LRU because it reflects real GPU work, not health
+		// chatter.
+		t := e.lastUsed[name]
+		if t.IsZero() {
+			t = snap[name].LastUsed
+		}
+		if best == "" || t.Before(bestTime) {
+			best = name
+			bestTime = t
+		}
+	}
+	return best
+}
+
+// ───── unload + load ─────────────────────────────────────────────────────
+
+// callRoute sends one load/unload admin request to baseURL+route.Path.
+//
+// action ("load" or "unload") is only used to label the error returned for a
+// response status >= 400. When the route carries a body it is sent with a
+// JSON Content-Type (assumes admin bodies are JSON, as the unload path
+// already did — confirm for load bodies).
+func (e *Evicting) callRoute(ctx context.Context, action, name, baseURL string, route *config.Route) error {
+	var body io.Reader
+	if route.Body != "" {
+		body = strings.NewReader(route.Body)
+	}
+	req, err := http.NewRequestWithContext(ctx, route.Method, baseURL+route.Path, body)
+	if err != nil {
+		return err
+	}
+	if route.Body != "" {
+		req.Header.Set("Content-Type", "application/json")
+	}
+	resp, err := e.client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	// Best-effort drain so the connection can be reused; the outcome is
+	// decided by the status code, so a read error here is ignored on purpose.
+	_, _ = io.Copy(io.Discard, resp.Body)
+	if resp.StatusCode >= 400 {
+		return fmt.Errorf("%s %s returned status %d", action, name, resp.StatusCode)
+	}
+	return nil
+}
+
+// unload evicts the named consumer and marks it as not resident.
+//
+// With an Unload route configured, the route is called and the consumer is
+// marked unloaded only if the call succeeds. With only a SystemdUnit, nothing
+// is executed and the consumer is merely marked unloaded. With neither, or
+// for an unknown name, an error is returned.
+func (e *Evicting) unload(ctx context.Context, name string) error {
+	cons := e.cfg.Consumers[name]
+	if cons == nil {
+		// Same guard as ensureFits/ensureLoaded; avoids a nil dereference.
+		return fmt.Errorf("unknown consumer %q", name)
+	}
+	if cons.Unload == nil {
+		// systemd-unit-based unload is whisper-server's path; we don't shell
+		// out to sudo from a server daemon in Phase 1. Mark unloaded so we
+		// don't keep picking it as a victim, and let the next request
+		// cold-start via systemd (whisper-server boots in <2 s).
+		if cons.SystemdUnit != "" {
+			e.mu.Lock()
+			e.loaded[name] = false
+			e.mu.Unlock()
+			return nil
+		}
+		return fmt.Errorf("consumer %s: no unload route configured", name)
+	}
+
+	if err := e.callRoute(ctx, "unload", name, cons.URL, cons.Unload); err != nil {
+		return err
+	}
+	e.mu.Lock()
+	e.loaded[name] = false
+	e.mu.Unlock()
+	return nil
+}
+
+// ensureLoaded makes sure the named consumer is marked resident.
+//
+// It is a no-op when the consumer is already believed loaded. Without a Load
+// route the consumer is marked loaded optimistically; with one, the route is
+// called and the consumer is marked loaded only if the call succeeds.
+func (e *Evicting) ensureLoaded(ctx context.Context, name string) error {
+	cons := e.cfg.Consumers[name]
+	if cons == nil {
+		return fmt.Errorf("unknown consumer %q", name)
+	}
+	e.mu.Lock()
+	resident := e.loaded[name]
+	e.mu.Unlock()
+	if resident {
+		return nil
+	}
+
+	// No explicit load endpoint — rely on the consumer's own cold-start
+	// behaviour (mvoice would auto-load if a request arrived, comfyui as
+	// well). Mark loaded optimistically.
+	if cons.Load != nil {
+		if err := e.callRoute(ctx, "load", name, cons.URL, cons.Load); err != nil {
+			return err
+		}
+	}
+	e.mu.Lock()
+	e.loaded[name] = true
+	e.mu.Unlock()
+	return nil
+}
+
+// SetLoadedForTest overrides the believed-loaded state for one consumer.
+// Test-only. In this file the state is otherwise changed only by unload and
+// ensureLoaded.
+func (e *Evicting) SetLoadedForTest(name string, loaded bool) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.loaded[name] = loaded
+}
+
+// Compile-time interface guard.
+var _ Scheduler = (*Evicting)(nil)
diff --git a/internal/scheduler/evicting_test.go b/internal/scheduler/evicting_test.go
new file mode 100644
index 0000000..153e01a
--- /dev/null
+++ b/internal/scheduler/evicting_test.go
@@ -0,0 +1,247 @@
+package scheduler
+
+import (
+	"context"
+	"io"
+	"log/slog"
+	"net/http"
+	"net/http/httptest"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"mgit.msbls.de/m/mGPUmanager/internal/config"
+	"mgit.msbls.de/m/mGPUmanager/internal/gpu"
+	"mgit.msbls.de/m/mGPUmanager/internal/registry"
+)
+
+// silentLogger returns a logger whose output is discarded.
+func silentLogger() *slog.Logger {
+	return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
+}
+
+// gpuStub implements just enough of gpu.Poller's surface for the evicting
+// scheduler. We use the real Poller type (no interface yet) by hand-loading
+// a Sample via a tiny wrapper.
+//
+// In practice we set gpu.Poller's internal sample via NewPoller + a goroutine.
+// For tests we sidestep that by using a real Poller with a fake nvidia-smi — +// but the simpler path is to construct a Poller, store a Sample, and skip +// Run. We do that by exposing a tiny helper here. + +// makeGPU returns a Poller pre-loaded with the given free/total values. +// It never calls nvidia-smi. +func makeGPU(t *testing.T, freeMiB, totalMiB int) *gpu.Poller { + t.Helper() + p := gpu.NewPoller(time.Hour, silentLogger()) + // gpu.Poller.Last() reads from an internal Sample. We can't poke it + // directly without exporting state, so we use a sub-test trick: run + // sampleOnce against a fake nvidia-smi command. But that needs a PATH + // override and is brittle. Instead, expose a SetForTest helper. + gpu.SetSampleForTest(p, freeMiB, totalMiB) + return p +} + +// fakeConsumer hosts /api/admin/{load,unload} so the evicting scheduler can +// exercise the HTTP eviction path. +type fakeConsumer struct { + srv *httptest.Server + unloadHit atomic.Int32 + loadHit atomic.Int32 +} + +func newFakeConsumer(t *testing.T) *fakeConsumer { + t.Helper() + fc := &fakeConsumer{} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/health", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"loaded":true,"gpu_resident_mib":2800}`)) + }) + mux.HandleFunc("POST /api/admin/unload", func(w http.ResponseWriter, _ *http.Request) { + fc.unloadHit.Add(1) + w.WriteHeader(200) + }) + mux.HandleFunc("POST /api/admin/load", func(w http.ResponseWriter, _ *http.Request) { + fc.loadHit.Add(1) + w.WriteHeader(200) + }) + mux.HandleFunc("POST /prompt", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + }) + mux.HandleFunc("POST /api/free", func(w http.ResponseWriter, _ *http.Request) { + fc.unloadHit.Add(1) + w.WriteHeader(200) + }) + fc.srv = httptest.NewServer(mux) + return fc +} + +func buildCfg(mvoiceURL, comfyURL string) *config.Config { + return &config.Config{ + Listen: "127.0.0.1:0", + 
GPU: config.GPU{TotalMiB: 16376, ReservedMiB: 1024, PollIntervalSeconds: 2}, + Routing: map[config.EndpointKind]string{ + config.KindTTS: "mvoice", + config.KindImage: "comfyui", + }, + Consumers: map[string]*config.Consumer{ + "mvoice": { + URL: mvoiceURL, + Health: config.Route{Method: "GET", Path: "/api/health"}, + Paths: map[config.EndpointKind]config.Route{ + config.KindTTS: {Method: "POST", Path: "/api/synthesize"}, + }, + VRAMResidentMiB: 2800, + Load: &config.Route{Method: "POST", Path: "/api/admin/load"}, + Unload: &config.Route{Method: "POST", Path: "/api/admin/unload"}, + CanCoexistWith: []string{"whisper-server", "ollama"}, + Priority: 3, + MaxConcurrency: 1, + }, + "comfyui": { + URL: comfyURL, + Health: config.Route{Method: "GET", Path: "/system_stats"}, + Paths: map[config.EndpointKind]config.Route{ + config.KindImage: {Method: "POST", Path: "/prompt"}, + }, + VRAMResidentMiB: 13000, + Unload: &config.Route{ + Method: "POST", + Path: "/api/free", + Body: `{"unload_models":true,"free_memory":true}`, + }, + CanCoexistWith: []string{}, + Priority: 1, + MaxConcurrency: 1, + }, + }, + } +} + +// TestEvictingSkipsWhenAlreadyResident verifies the no-op fast path: a job +// for an already-loaded consumer with plenty of free VRAM runs without any +// unload call. 
+func TestEvictingSkipsWhenAlreadyResident(t *testing.T) {
+	mvoice := newFakeConsumer(t)
+	defer mvoice.srv.Close()
+	comfy := newFakeConsumer(t)
+	defer comfy.srv.Close()
+
+	cfg := buildCfg(mvoice.srv.URL, comfy.srv.URL)
+	// 8192 MiB free out of 16376: plenty of headroom.
+	sched := NewEvicting(cfg, registry.New(cfg, silentLogger()), makeGPU(t, 8192, 16376), silentLogger())
+
+	noop := func(context.Context) error { return nil }
+	if err := sched.Run(context.Background(), "mvoice", noop); err != nil {
+		t.Fatal(err)
+	}
+
+	checks := []struct {
+		name string
+		fake *fakeConsumer
+	}{
+		{"mvoice", mvoice},
+		{"comfyui", comfy},
+	}
+	for _, c := range checks {
+		if hits := c.fake.unloadHit.Load(); hits != 0 {
+			t.Errorf("unexpected unload hits on %s: %d", c.name, hits)
+		}
+	}
+}
+
+// TestEvictingFreesNonCoexistentVictim simulates the canonical scenario from
+// the design: a TTS request comes in while comfyui is hogging 13 GiB. mvoice
+// is not coexistent with comfyui (per cfg), so the scheduler must call
+// comfyui's /api/free before letting the TTS job run.
+func TestEvictingFreesNonCoexistentVictim(t *testing.T) {
+	mvoice := newFakeConsumer(t)
+	defer mvoice.srv.Close()
+	comfy := newFakeConsumer(t)
+	defer comfy.srv.Close()
+
+	cfg := buildCfg(mvoice.srv.URL, comfy.srv.URL)
+	reg := registry.New(cfg, silentLogger())
+
+	// Only 1 GiB free — mvoice (2.8 GiB) won't fit until comfyui (13 GiB)
+	// is evicted.
+	poller := makeGPU(t, 1024, 16376)
+	sched := NewEvicting(cfg, reg, poller, silentLogger())
+
+	// Force the believed-loaded state so eviction kicks in (Run treats
+	// 'already loaded' as a no-op fast path).
+	sched.SetLoadedForTest("mvoice", false)
+	sched.SetLoadedForTest("comfyui", true)
+
+	// Every request that reaches the comfyui fake first bumps the GPU sample
+	// to 14000 MiB free, so the fits() check that follows the unload call
+	// sees the memory as released.
+	freed := func() { gpu.SetSampleForTest(poller, 14000, 16376) }
+	comfy.srv.Config.Handler = withHook(comfy.srv.Config.Handler, freed)
+
+	noop := func(context.Context) error { return nil }
+	if err := sched.Run(context.Background(), "mvoice", noop); err != nil {
+		t.Fatal(err)
+	}
+
+	if got := comfy.unloadHit.Load(); got != 1 {
+		t.Errorf("comfyui unload hit count = %d, want 1", got)
+	}
+	if got := mvoice.loadHit.Load(); got != 1 {
+		t.Errorf("mvoice load hit count = %d, want 1", got)
+	}
+	if got := sched.Stats().Evictions; got != 1 {
+		t.Errorf("stats.Evictions = %d, want 1", got)
+	}
+}
+
+// TestEvictingHonoursCoexistence ensures we never evict a consumer that the
+// target declared compatible. mvoice can coexist with ollama, so ollama must
+// not be picked even if it's the LRU candidate.
+func TestEvictingHonoursCoexistence(t *testing.T) {
+	mvoice := newFakeConsumer(t)
+	defer mvoice.srv.Close()
+	comfy := newFakeConsumer(t)
+	defer comfy.srv.Close()
+
+	cfg := buildCfg(mvoice.srv.URL, comfy.srv.URL)
+	// Add a stub ollama with an unload endpoint, mark coexistent.
+	ollama := newFakeConsumer(t)
+	defer ollama.srv.Close()
+	cfg.Consumers["ollama"] = &config.Consumer{
+		URL:             ollama.srv.URL,
+		Health:          config.Route{Method: "GET", Path: "/api/health"},
+		Paths:           map[config.EndpointKind]config.Route{},
+		VRAMResidentMiB: 2000,
+		Unload:          &config.Route{Method: "POST", Path: "/api/admin/unload"},
+		CanCoexistWith:  []string{"mvoice"},
+		MaxConcurrency:  1,
+	}
+
+	reg := registry.New(cfg, silentLogger())
+	poller := makeGPU(t, 1000, 16376)
+	sched := NewEvicting(cfg, reg, poller, silentLogger())
+	sched.SetLoadedForTest("mvoice", false)
+	sched.SetLoadedForTest("comfyui", true)
+	sched.SetLoadedForTest("ollama", true)
+
+	// Same trick as above: any request to the comfyui fake reports the VRAM
+	// as freed.
+	freed := func() { gpu.SetSampleForTest(poller, 14000, 16376) }
+	comfy.srv.Config.Handler = withHook(comfy.srv.Config.Handler, freed)
+
+	noop := func(context.Context) error { return nil }
+	if err := sched.Run(context.Background(), "mvoice", noop); err != nil {
+		t.Fatal(err)
+	}
+
+	if got := ollama.unloadHit.Load(); got != 0 {
+		t.Errorf("ollama (coexistent) unloaded %d times; should be 0", got)
+	}
+	if got := comfy.unloadHit.Load(); got != 1 {
+		t.Errorf("comfyui unload hit count = %d, want 1", got)
+	}
+}
+
+// ───── helpers ────────────────────────────────────────────────────────────
+
+// withHook returns a handler that runs hook() and then delegates to h, for
+// every request it receives. The tests use it to simulate VRAM being freed
+// the moment comfyui's unload route is called.
+func withHook(h http.Handler, hook func()) http.Handler {
+	wrapped := func(w http.ResponseWriter, r *http.Request) {
+		hook()
+		h.ServeHTTP(w, r)
+	}
+	return http.HandlerFunc(wrapped)
+}